The Way of Spark (Advanced Series), Spark Source Code Reading, Section 10: The Standalone Deploy Mode Explained
Published: 2019-06-25


Spark Standalone uses a Master/Slave architecture. The main classes involved are:

Class: org.apache.spark.deploy.master.Master
Role: schedules resources for the whole cluster and manages Applications.
Messages:
- Received from the Worker: RegisterWorker, ExecutorStateChanged, WorkerSchedulerStateResponse, Heartbeat
- Sent to the Worker: RegisteredWorker, RegisterWorkerFailed, ReconnectWorker, KillExecutor, LaunchExecutor, LaunchDriver, KillDriver, ApplicationFinished
- Sent to the AppClient: RegisteredApplication, ExecutorAdded, ExecutorUpdated, ApplicationRemoved
- Received from the AppClient: RegisterApplication, UnregisterApplication, MasterChangeAcknowledged, RequestExecutors, KillExecutors
- Sent to the Driver Client: SubmitDriverResponse, KillDriverResponse, DriverStatusResponse
- Received from the Driver Client: RequestSubmitDriver, RequestKillDriver, RequestDriverStatus

Class: org.apache.spark.deploy.worker.Worker
Role: registers itself with the Master and launches CoarseGrainedExecutorBackend, which at runtime starts Executors to run Tasks.
Messages:
- Sent to the Master: RegisterWorker, ExecutorStateChanged, WorkerSchedulerStateResponse, Heartbeat
- Received from the Master: RegisteredWorker, RegisterWorkerFailed, ReconnectWorker, KillExecutor, LaunchExecutor, LaunchDriver, KillDriver, ApplicationFinished

Class: org.apache.spark.deploy.client.AppClient.ClientEndpoint
Role: registers the Application with the Master and monitors it; requests or kills Executors.
Messages:
- Sent to the Master: RegisterApplication, UnregisterApplication, MasterChangeAcknowledged, RequestExecutors, KillExecutors
- Received from the Master: RegisteredApplication, ExecutorAdded, ExecutorUpdated, ApplicationRemoved

Class: org.apache.spark.scheduler.cluster.DriverEndpoint
Role: registers Executors at runtime, launches Tasks on them, and handles status updates sent back by the Executors.
Messages:
- Sent to the Executor: LaunchTask, KillTask, RegisteredExecutor, RegisterExecutorFailed
- Received from the Executor: RegisterExecutor, StatusUpdate

Class: org.apache.spark.deploy.ClientEndpoint
Role: manages Drivers, including submitting a Driver, killing a Driver, and querying Driver status.
Messages:
- Sent to the Master: RequestSubmitDriver, RequestKillDriver, RequestDriverStatus
- Received from the Master: SubmitDriverResponse, KillDriverResponse, DriverStatusResponse
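Most of these messages are plain Scala case classes, dispatched by pattern matching in each endpoint's receive method. The following self-contained sketch (toy types and a toy receive function, not Spark's actual definitions) illustrates the pattern:

```scala
// Hypothetical sketch of the protocol style: each message is an immutable
// case class under a sealed trait, and an endpoint dispatches on the type.
sealed trait DeployMessage
case class RegisterWorker(id: String, host: String, cores: Int) extends DeployMessage
case class Heartbeat(workerId: String) extends DeployMessage
case class RegisteredWorker(masterUrl: String) extends DeployMessage

// A toy "Master" receive function mirroring the PartialFunction-style dispatch.
def receive(msg: DeployMessage): String = msg match {
  case RegisterWorker(id, host, cores) => s"registered worker $id on $host with $cores cores"
  case Heartbeat(workerId)             => s"heartbeat from $workerId"
  case RegisteredWorker(url)           => s"ack from master at $url"
}
```

One benefit of modeling a protocol this way is that a sealed trait lets the compiler warn when a receive function forgets to handle a message type.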

All of the classes above extend org.apache.spark.rpc.ThreadSafeRpcEndpoint, whose underlying implementation is currently based on Akka, as shown in the figure below:

(figure omitted)

The interactions among these classes are shown in the figure below:

(figure omitted)

1. Interaction between the AppClient and the Master

When a SparkContext is created, it calls the createTaskScheduler method to create the corresponding TaskScheduler and SchedulerBackend:

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()

In Standalone mode, the TaskScheduler and SchedulerBackend are created as follows:

/**
 * Create a task scheduler based on a given master URL.
 * Return a 2-tuple of the scheduler backend and the task scheduler.
 */
private def createTaskScheduler(
    sc: SparkContext,
    master: String): (SchedulerBackend, TaskScheduler) = {
  // other non-essential code omitted
  case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)
  // other non-essential code omitted
}
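As a side note, the SPARK_REGEX branch is what turns a comma-separated master URL such as spark://h1:7077,h2:7077 into one URL per Master. A minimal standalone sketch of that parsing, with the regex assumed to match Spark's `"""spark://(.*)""".r`:

```scala
// Assumed form of Spark's standalone-master URL regex.
val SPARK_REGEX = """spark://(.*)""".r

// Split a comma-separated master list into individual spark:// URLs,
// mirroring sparkUrl.split(",").map("spark://" + _) in createTaskScheduler.
def masterUrls(master: String): Array[String] = master match {
  case SPARK_REGEX(sparkUrl) => sparkUrl.split(",").map("spark://" + _)
  case _ => throw new IllegalArgumentException("Not a standalone master URL: " + master)
}
```

This is why an HA deployment can be addressed with a single --master argument listing every Master.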

Once the TaskScheduler and SchedulerBackend have been created, the TaskScheduler's start method is called, which in turn starts the SchedulerBackend (SparkDeploySchedulerBackend in Standalone mode):

// the start method in TaskSchedulerImpl
override def start() {
  // invoke the SchedulerBackend's start method
  backend.start()
  // other non-essential code omitted
}

The corresponding start method in SparkDeploySchedulerBackend:

override def start() {
  super.start()
  // other non-essential code omitted
  // Application information (application name, executor memory, etc.)
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
    command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
  // create the AppClient with the corresponding launch arguments
  client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  waitForRegistration()
}

The start method of the AppClient class:

// the AppClient start method
def start() {
  // Just launch an rpcEndpoint; it will call back into the listener.
  // ClientEndpoint is an inner class of AppClient
  // and serves as AppClient's rpcEndpoint
  endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
}

When ClientEndpoint starts, it registers the Application with the Master:

override def onStart(): Unit = {
  try {
    registerWithMaster(1)
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      stop()
  }
}

The registerWithMaster method is what registers the Application with the Master:

/**
 * Register with all masters asynchronously. It will call `registerWithMaster` every
 * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
 * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
 *
 * nthRetry means this is the nth attempt to register with master.
 */
private def registerWithMaster(nthRetry: Int) {
  registerMasterFutures = tryRegisterAllMasters()
  // retry on registration failure
  registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = {
      Utils.tryOrExit {
        if (registered) {
          registerMasterFutures.foreach(_.cancel(true))
          registerMasterThreadPool.shutdownNow()
        } else if (nthRetry >= REGISTRATION_RETRIES) {
          markDead("All masters are unresponsive! Giving up.")
        } else {
          registerMasterFutures.foreach(_.cancel(true))
          registerWithMaster(nthRetry + 1)
        }
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
}
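Stripped of the thread pool and the timer, the retry policy reduces to: attempt, and on failure retry until a bound is reached. A synchronous toy version of just that policy (hypothetical names; the real code drives retries from registrationRetryThread.scheduleAtFixedRate rather than direct recursion):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Toy bounded-retry registrar. `attempt` stands in for one registration
// round against the masters and reports whether it succeeded.
class ToyRegistrar(maxRetries: Int, attempt: Int => Boolean) {
  val registered = new AtomicBoolean(false)
  def registerWithMaster(nthRetry: Int): Unit = {
    if (attempt(nthRetry)) {
      registered.set(true)             // success: stop retrying
    } else if (nthRetry < maxRetries) {
      registerWithMaster(nthRetry + 1) // failure: make the next attempt
    }                                  // else: give up, like markDead(...)
  }
}

// Registration succeeds on the third attempt, within a limit of five.
val r = new ToyRegistrar(maxRetries = 5, attempt = (n: Int) => n == 3)
r.registerWithMaster(1)
```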

It registers with all Masters because the Master may be deployed for high availability (HA), for example ZooKeeper-based HA, in which case multiple Masters exist but only the active Master responds. The source:

/**
 *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        // obtain a reference to the Master rpcEndpoint
        val masterRef =
          rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
        // send the RegisterApplication message to the Master
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
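The first-responder-wins behavior can be sketched without any RPC: submit one task per master address to a pool, let the single active Master set the shared result, and have the other tasks bail out once a result exists. All names here are illustrative, not Spark code:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

// Toy first-responder-wins registration: isActive stands in for
// "this master answered our RegisterApplication message".
def tryRegisterAll(masters: Seq[String], isActive: String => Boolean): Option[String] = {
  val winner = new AtomicReference[String](null)
  val pool = Executors.newFixedThreadPool(masters.size)
  masters.foreach { m =>
    pool.submit(new Runnable {
      override def run(): Unit =
        // skip if someone already won, like the `if (registered) return` check
        if (winner.get == null && isActive(m)) winner.compareAndSet(null, m)
    })
  }
  pool.shutdown()
  pool.awaitTermination(5, TimeUnit.SECONDS)
  Option(winner.get)
}
```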

The Master receives the RegisterApplication message from the AppClient:

// org.apache.spark.deploy.master.Master.receive handles the RegisterApplication
// message sent by the AppClient
override def receive: PartialFunction[Any, Unit] = {
  case RegisterApplication(description, driver) => {
    // TODO Prevent repeated registrations from some driver
    if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
    } else {
      logInfo("Registering app " + description.name)
      // create the ApplicationInfo
      val app = createApplication(description, driver)
      // register the Application
      registerApplication(app)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      persistenceEngine.addApplication(app)
      // send the RegisteredApplication message back to the AppClient
      driver.send(RegisteredApplication(app.id, self))
      schedule()
    }
  }
  // other cases omitted
}

The AppClient's inner class ClientEndpoint receives the RegisteredApplication message from the Master:

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredApplication(appId_, masterRef) =>
    // FIXME How to handle the following cases?
    // 1. A master receives multiple registrations and sends back multiple
    // RegisteredApplications due to an unstable network.
    // 2. Receive multiple RegisteredApplication from different masters because the master is
    // changing.
    appId = appId_
    registered = true
    master = Some(masterRef)
    listener.connected(appId)
  // other non-essential code omitted
}

The above completes the registration of the Application. The other messages exchanged between the AppClient and the Master are listed below:

//------------------ Messages from the AppClient to the Master ------------------//
// the AppClient registers an Application with the Master
case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
  extends DeployMessage

// the AppClient unregisters an Application from the Master
case class UnregisterApplication(appId: String)

// after the Master recovers from a failure it sends MasterChanged to the AppClient;
// the AppClient updates its stored Master reference and replies with MasterChangeAcknowledged
case class MasterChangeAcknowledged(appId: String)

// requests a total of requestedTotal Executors for the running Application
case class RequestExecutors(appId: String, requestedTotal: Int)

// kills the given Executors of an Application
case class KillExecutors(appId: String, executorIds: Seq[String])

//------------------ Messages from the Master to the AppClient ------------------//
// tells the AppClient that the Application was registered successfully
case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage

// TODO(matei): replace hostPort with host
// sent to the AppClient after a Worker has launched an Executor
case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
  Utils.checkHostPort(hostPort, "Required hostport")
}

// sent to the AppClient after an Executor's state has changed
case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
  exitStatus: Option[Int])

// sent by the Master when an Application finishes successfully or fails;
// on receipt the AppClient stops the Application
case class ApplicationRemoved(message: String)

// when the Master changes, MasterChanged notifies the Workers and AppClients
case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)

2. Interaction between the Master and the Worker

Only the basic message exchange is given here; a detailed analysis will follow in a later article.

//------------------ Messages from the Worker to the Master ------------------//
// registers the Worker with the Master; once registration completes the Master
// replies with RegisteredWorker, after which the Worker can receive scheduling
// from the Master
case class RegisterWorker(
    id: String,
    host: String,
    port: Int,
    worker: RpcEndpointRef,
    cores: Int,
    memory: Int,
    webUiPort: Int,
    publicAddress: String)
  extends DeployMessage {
  Utils.checkHost(host, "Required hostname")
  assert (port > 0)
}

// reports an Executor state change to the Master
case class ExecutorStateChanged(
    appId: String,
    execId: Int,
    state: ExecutorState,
    message: Option[String],
    exitStatus: Option[Int])
  extends DeployMessage

// reports a Driver state change to the Master
case class DriverStateChanged(
    driverId: String,
    state: DriverState,
    exception: Option[Exception])
  extends DeployMessage

// reports the Executors and Drivers running on this Worker to the Master
case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
  driverIds: Seq[String])

// heartbeat from the Worker to the Master, used mainly to signal that the Worker is alive
case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage

//------------------ Messages from the Master to the Worker ------------------//
// reply sent to the Worker after a successful RegisterWorker registration
case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage

// reply sent to the Worker after a failed RegisterWorker registration
case class RegisterWorkerFailed(message: String) extends DeployMessage

// sent after a Worker's heartbeat times out, telling the Worker to re-register
case class ReconnectWorker(masterUrl: String) extends DeployMessage

// sent after an application finishes; on receipt the Worker kills the Executor
// with the given execId
case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage

// tells the Worker to launch an Executor
case class LaunchExecutor(
    masterUrl: String,
    appId: String,
    execId: Int,
    appDesc: ApplicationDescription,
    cores: Int,
    memory: Int)
  extends DeployMessage

// tells the Worker to launch a Driver
case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage

// kills the given Driver
case class KillDriver(driverId: String) extends DeployMessage

case class ApplicationFinished(id: String)
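The Heartbeat / ReconnectWorker pair implements a simple lease: the Master timestamps each Worker's last heartbeat and flags Workers whose lease has expired. A toy model of that bookkeeping (illustrative names, not Spark code):

```scala
// Toy liveness tracker: heartbeat() records the last time a Worker reported
// in; timedOut() returns the Workers that would be told to reconnect.
class ToyMaster(timeoutMs: Long) {
  private var lastHeartbeat = Map.empty[String, Long]

  def heartbeat(workerId: String, now: Long): Unit =
    lastHeartbeat += workerId -> now

  // Workers whose last heartbeat is older than the timeout; in the real
  // protocol these would be sent a ReconnectWorker message.
  def timedOut(now: Long): Set[String] =
    lastHeartbeat.filter { case (_, t) => now - t > timeoutMs }.keySet
}

val m = new ToyMaster(timeoutMs = 1000)
m.heartbeat("w1", 0)
m.heartbeat("w2", 500)
```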

3. Message interaction between the Driver Client and the Master

The Driver Client mainly manages Drivers: it submits a Driver to the Master, requests that a Driver be killed, and so on. Its source lives in the Client.scala source file, in the class org.apache.spark.deploy.ClientEndpoint. Note that this class is fundamentally different from org.apache.spark.deploy.client.AppClient.ClientEndpoint.

//------------------ Messages between the Driver Client and the Master ------------------//
// the Driver Client asks the Master to submit a Driver
case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage

// the Master replies with whether the submission succeeded
case class SubmitDriverResponse(
    master: RpcEndpointRef, success: Boolean, driverId: Option[String], message: String)
  extends DeployMessage

// the Driver Client asks the Master to kill a Driver
case class RequestKillDriver(driverId: String) extends DeployMessage

// the Master replies with whether the kill succeeded
case class KillDriverResponse(
    master: RpcEndpointRef, driverId: String, success: Boolean, message: String)
  extends DeployMessage

// the Driver Client asks the Master for a Driver's status
case class RequestDriverStatus(driverId: String) extends DeployMessage

// the Master replies with the requested status information
case class DriverStatusResponse(found: Boolean, state: Option[DriverState],
  workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception])
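Each request message has a dedicated response message, so the exchange is an RPC-style ask rather than fire-and-forget. A toy handler pairing a submission request with its response, including the standby-Master refusal seen in the Master.receive code earlier (all names hypothetical, not the Spark implementation):

```scala
// Toy request/response pair mirroring RequestSubmitDriver / SubmitDriverResponse.
case class ToyRequestSubmitDriver(mainClass: String)
case class ToySubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)

// A standby Master refuses the submission; the active Master assigns an id.
def handleSubmit(req: ToyRequestSubmitDriver, standby: Boolean): ToySubmitDriverResponse =
  if (standby)
    ToySubmitDriverResponse(success = false, driverId = None,
      message = "Master is in standby; cannot submit " + req.mainClass)
  else
    ToySubmitDriverResponse(success = true, driverId = Some("driver-0001"),
      message = "Driver successfully submitted")
```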

4. Message interaction between the Driver and the Executor

//------------------ Messages from the Driver to the Executor ------------------//
// launches a Task
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

// kills a Task
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
  extends CoarseGrainedClusterMessage

// the Executor registered successfully
case object RegisteredExecutor extends CoarseGrainedClusterMessage

// the Executor registration failed
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage

//------------------ Messages from the Executor to the Driver ------------------//
// registers the Executor with the Driver
case class RegisterExecutor(
    executorId: String,
    executorRef: RpcEndpointRef,
    hostPort: String,
    cores: Int,
    logUrls: Map[String, String])
  extends CoarseGrainedClusterMessage {
  Utils.checkHostPort(hostPort, "Expected host port")
}

// reports a task state change to the Driver
case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
  data: SerializableBuffer) extends CoarseGrainedClusterMessage

object StatusUpdate {
  /** Alternate factory method that takes a ByteBuffer directly for the data field */
  def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer)
    : StatusUpdate = {
    StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
  }
}
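The StatusUpdate companion above shows a common Scala idiom: an extra apply overload in the companion object acts as an alternate factory, so call sites can pass a raw ByteBuffer and have it wrapped automatically. A self-contained sketch, with a stub type standing in for Spark's SerializableBuffer:

```scala
import java.nio.ByteBuffer

// ToyBuffer stands in for SerializableBuffer.
class ToyBuffer(val buffer: ByteBuffer)
case class ToyStatusUpdate(taskId: Long, data: ToyBuffer)

object ToyStatusUpdate {
  /** Alternate factory that takes a ByteBuffer directly for the data field */
  def apply(taskId: Long, data: ByteBuffer): ToyStatusUpdate =
    ToyStatusUpdate(taskId, new ToyBuffer(data)) // delegates to the case-class apply
}
```

Because the overload's parameter types differ from the synthetic case-class apply, the compiler resolves each call unambiguously.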

Author: 周志湖 (Zhou Zhihu)

Screen name: 摇摆少年梦
