Following on from the previous article, we continue our analysis of the Executor startup process. This article covers two parts:
Sending the LaunchExecutor message to the Worker
After the launch completes, sending the ExecutorAdded message to the driver (the driver here is the ClientEndpoint)
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
Launching the Executor
First, let's look at what the Worker does after receiving the LaunchExecutor message:
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  // First check that the message comes from the active Master
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.get(appId).getOrElse {
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq
      }
      appDirectories(appId) = appLocalDirs
      // Instantiate the ExecutorRunner
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs,
        ExecutorState.RUNNING)
      // Record it in the executors map
      executors(appId + "/" + execId) = manager
      // Call the ExecutorRunner's start method
      manager.start()
      // Update the resource usage bookkeeping
      coresUsed += cores_
      memoryUsed += memory_
      // Notify the Master with an ExecutorStateChanged message
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception => {
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
      }
    }
  }
The Worker first instantiates an ExecutorRunner, which in Standalone mode is responsible for managing the execution of a single executor process, and then calls the ExecutorRunner's start() method:
private[worker] def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = ShutdownHookManager.addShutdownHook { () =>
    // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
    // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
    if (state == ExecutorState.RUNNING) {
      state = ExecutorState.FAILED
    }
    killProcess(Some("Worker shutting down"))
  }
}
As you can see, a thread is created internally to execute the fetchAndRunExecutor method; when the thread's start method is called, its run method executes, which means fetchAndRunExecutor() begins to run:
private def fetchAndRunExecutor() {
  try {
    // Launch the process
    // First build the launch command
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    // Set the executor's working directory and some environment variables
    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    // Start a new process to run the command
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException => {
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    }
    case e: Exception => {
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
    }
  }
}
The most important line here is process = builder.start(): it launches a brand-new process (a separate JVM) to run the command we built, that is, to run the main method of the class "org.apache.spark.executor.CoarseGrainedExecutorBackend". Do you remember where this was set up? That's right, in the start() method of SparkDeploySchedulerBackend; a rough sketch of that command construction is given below, followed by the main method of CoarseGrainedExecutorBackend:
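The following is only an excerpt-style sketch based on the Spark 1.6 source of SparkDeploySchedulerBackend.start(), not a verbatim quote; treat the surrounding values (classPathEntries, libraryPathEntries, javaOpts, maxCores, appUIAddress, coresPerExecutor) as coming from that method:

// Sketch (Spark 1.6 SparkDeploySchedulerBackend.start(), approximate): the Command names
// CoarseGrainedExecutorBackend as the main class of the new JVM, and `args` is the Seq
// shown later in this article (--driver-url, --executor-id, --hostname, --cores, ...).
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
  args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
// The command is wrapped in the ApplicationDescription (the appDesc mentioned above) that
// the Master eventually hands to the Worker, which passes it on to the ExecutorRunner.
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
  command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)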
def main(args: Array[String]) {
  var driverUrl: String = null
  var executorId: String = null
  var hostname: String = null
  var cores: Int = 0
  var appId: String = null
  var workerUrl: Option[String] = None
  val userClassPath = new mutable.ListBuffer[URL]()

  var argv = args.toList
  // Initialize the variables above from the arguments that were passed in via the command we built
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        // scalastyle:off println
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        // scalastyle:on println
        printUsageAndExit()
    }
  }

  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
    appId == null) {
    printUsageAndExit()
  }

  // If the arguments are valid, call run
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
One thing to clarify first: CoarseGrainedExecutorBackend is an implementation of ExecutorBackend. ExecutorBackend has three different implementations depending on the cluster's run mode: CoarseGrainedExecutorBackend, LocalBackend, and MesosExecutorBackend. CoarseGrainedExecutorBackend is the concrete implementation used in Standalone mode, and in Standalone mode it is the ExecutorRunner that starts a process to run CoarseGrainedExecutorBackend's main method.
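For reference, the ExecutorBackend contract itself is tiny; the following is a sketch of the trait as it appears in the Spark 1.6 source (modifiers and comment wording approximate), and it shows that a backend's only obligation is to pass task status updates back to the scheduler:

import java.nio.ByteBuffer

import org.apache.spark.TaskState.TaskState

// A pluggable interface used by the Executor to send updates to the cluster scheduler
// (sketch of the Spark 1.6 trait; CoarseGrainedExecutorBackend, LocalBackend and
// MesosExecutorBackend all implement this method).
private[spark] trait ExecutorBackend {
  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
}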
Next, the run method is called:
private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {

  SignalLogger.register(log)

  SparkHadoopUtil.get.runAsSparkUser { () =>
    // Debug code
    Utils.checkHost(hostname)

    // Bootstrap to fetch the driver's Spark properties.
    val executorConf = new SparkConf
    val port = executorConf.getInt("spark.executor.port", 0)
    val fetcher = RpcEnv.create(
      "driverPropsFetcher",
      hostname,
      port,
      executorConf,
      new SecurityManager(executorConf),
      clientMode = true)
    val driver = fetcher.setupEndpointRefByURI(driverUrl)
    val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
      Seq[(String, String)](("spark.app.id", appId))
    fetcher.shutdown()

    // Create SparkEnv using properties we fetched from the driver.
    val driverConf = new SparkConf()
    for ((key, value) <- props) {
      // this is required for SSL in standalone mode
      if (SparkConf.isExecutorStartupConf(key)) {
        driverConf.setIfMissing(key, value)
      } else {
        driverConf.set(key, value)
      }
    }
    if (driverConf.contains("spark.yarn.credentials.file")) {
      logInfo("Will periodically update credentials from: " +
        driverConf.get("spark.yarn.credentials.file"))
      SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
    }

    val env = SparkEnv.createExecutorEnv(
      driverConf, executorId, hostname, port, cores, isLocal = false)

    // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
    // connections (e.g., if it's using akka). Otherwise, the executor is running in
    // client mode only, and does not accept incoming connections.
    val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
      hostname + ":" + port
    }.orNull
    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
      env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
    workerUrl.foreach { url =>
      env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
    }
    env.rpcEnv.awaitTermination()
    SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
  }
}
The source above breaks down into the following steps:
Fetch some Spark properties from the Driver
Use that information to create the ExecutorEnv, i.e. the Executor's runtime environment
Instantiate CoarseGrainedExecutorBackend and register it with the RpcEnv
During registration, CoarseGrainedExecutorBackend's onStart method is called
We will not analyze the WorkerWatcher part here; let's look at CoarseGrainedExecutorBackend's onStart method:
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    // Send the RegisterExecutor message to the Driver
    ref.ask[RegisterExecutorResponse](
      RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) => Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
    }
    case Failure(e) => {
      logError(s"Cannot register with driver: $driverUrl", e)
      System.exit(1)
    }
  }(ThreadUtils.sameThread)
}
What we need to figure out here is who this driver actually is, in other words, what driverUrl points to.
Let's trace it back: driverUrl is passed in when CoarseGrainedExecutorBackend is instantiated; that value comes from the run method, which in turn receives it from the main method; main parses it from its arguments, i.e. from the command used to start the new process; that command is built from appDesc.command, and appDesc is constructed in the start method of SparkDeploySchedulerBackend, as shown below:
// The endpoint for executors to talk to us
val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
  RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
  CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
val args = Seq(
  "--driver-url", driverUrl,
  "--executor-id", "{{EXECUTOR_ID}}",
  "--hostname", "{{HOSTNAME}}",
  "--cores", "{{CORES}}",
  "--app-id", "{{APP_ID}}",
  "--worker-url", "{{WORKER_URL}}")
Here CoarseGrainedSchedulerBackend.ENDPOINT_NAME is "CoarseGrainedScheduler":
private[spark] object CoarseGrainedSchedulerBackend {
  val ENDPOINT_NAME = "CoarseGrainedScheduler"
}
And the DriverEndpoint is registered under that same ENDPOINT_NAME:
driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
So the driverUrl here refers to the DriverEndpoint. When the DriverEndpoint receives the RegisterExecutor message, it does the following:
case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
  } else {
    // If the executor's rpc env is not listening for incoming connections, `hostPort`
    // will be null, and the client connection should be used to contact the executor.
    val executorAddress = if (executorRef.address != null) {
        executorRef.address
      } else {
        context.senderAddress
      }
    logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
    addressToExecutorId(executorAddress) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
      cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    // Note: some tests expect the reply to come after we put the executor in the map
    context.reply(RegisteredExecutor(executorAddress.host))
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    makeOffers()
  }
If everything goes well, the DriverEndpoint replies to CoarseGrainedExecutorBackend with a RegisteredExecutor message, and upon receiving it CoarseGrainedExecutorBackend instantiates an Executor (a rough sketch of the receiving code is given below). The two most important parts of that instantiation are initializing the thread pool that runs tasks and sending heartbeat messages to the driver; the relevant excerpt follows:
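The reply is handled roughly as in this sketch, based on the Spark 1.6 receive handler (the exact shape of RegisteredExecutor and the omitted cases are approximate):

// Sketch (Spark 1.6, approximate): a successful registration creates the Executor,
// a failed one terminates the JVM.
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor(hostname) =>
    logInfo("Successfully registered with driver")
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

  case RegisterExecutorFailed(message) =>
    logError("Slave registration failed: " + message)
    System.exit(1)

  // ... LaunchTask, KillTask, StopExecutor and other cases omitted
}

Inside the Executor constructor, the parts worth highlighting are these: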
...
// Start worker thread pool: the pool that runs the submitted tasks
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
private val executorSource = new ExecutorSource(threadPool, executorId)
...
// A dedicated single-thread scheduler is used to send heartbeats
// Executor for the heartbeat task.
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")

// Heartbeats are received by the driver's HeartbeatReceiver, which is actually created
// when SparkContext is instantiated
// must be initialized before running startDriverHeartbeat()
private val heartbeatReceiverRef =
  RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)

/**
 * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
 * times, it should kill itself. The default value is 60. It means we will retry to send
 * heartbeats about 10 minutes because the heartbeat interval is 10s.
 */
// As the comment above explains: at most 60 failures, with a 10s heartbeat interval,
// which amounts to roughly 10 minutes of retries
private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)

/**
 * Count the failure times of heartbeat. It should only be acessed in the heartbeat thread. Each
 * successful heartbeat will reset it to 0.
 */
private var heartbeatFailures = 0

// Start sending heartbeats
startDriverHeartbeater()
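For context, this is roughly how startDriverHeartbeater() schedules those heartbeats; it is a sketch based on the Spark 1.6 source (reportHeartBeat() is the private method that actually talks to heartbeatReceiverRef), so treat the details as approximate:

// Sketch (Spark 1.6, approximate): schedule a task on the single-thread `heartbeater`
// that reports to the driver every spark.executor.heartbeatInterval (10s by default).
private def startDriverHeartbeater(): Unit = {
  val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")

  // Wait a random interval so the heartbeats don't end up in sync
  val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

  val heartbeatTask = new Runnable() {
    override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
  }
  heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}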
Beyond that sketch we will not dig into the heartbeat mechanism any further, and the parts of the code above that reply to the Master, Worker, and Driver are left for you to read on your own; the principle is the same everywhere. It is much like everyday work: when a new colleague finishes preparing and feels ready to work, they report to their manager, and the manager then assigns them concrete tasks.
A diagram is attached to help with understanding (note that it only shows the main flow; for readability, the RPC communication is simply drawn as "A sends a message to B").
Sending a message to the driver
Below is the part that sends the message to the driver; note that the driver here refers to the ClientEndpoint:
exec.application.driver.send(
  ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
The ClientEndpoint does the following when it receives the message:
case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
  val fullId = appId + "/" + id
  logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
    cores))
  listener.executorAdded(fullId, workerId, hostPort, cores, memory)
This is mainly logging work. The listener here is the SparkDeploySchedulerBackend on the driver side (it implements AppClientListener), and its executorAdded callback does little more than write a log line, as the sketch below suggests.
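For completeness, here is a sketch of that callback, based on Spark 1.6's SparkDeploySchedulerBackend (approximate, but it really is just a log statement):

// Sketch (Spark 1.6 SparkDeploySchedulerBackend, approximate): the AppClientListener
// callback invoked via ClientEndpoint when an executor has been granted.
override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
    memory: Int) {
  logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
    fullId, hostPort, cores, Utils.megabytesToString(memory)))
}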
With that, we have walked through the overall flow of Application registration and Executor launch and registration. Next comes task submission and execution.
This article is based on the Spark 1.6.3 source code; a link to the Spark 2.1.0 version is also provided:
This is an original article. You are welcome to repost it, but please credit the source and the author. Thank you!
Author: sun4lower
Link: https://www.jianshu.com/p/16d84f5d5595