168大数据

标题: Spark技术内幕之任务调度:从SparkContext开始 [打印本页]

作者: 乔帮主    时间: 2015-5-20 15:26
标题: Spark技术内幕之任务调度:从SparkContext开始

SparkContext是开发Spark应用的入口,它负责和整个集群的交互,包括创建RDD、accumulators和broadcast variables。理解Spark的架构,需要从这个入口开始。下图是官网的架构图。


DriverProgram就是用户提交的程序,这里边定义了SparkContext的实例。SparkContext定义在core/src/main/scala/org/apache/spark/SparkContext.scala。

SparkContext默认的构造函数接受org.apache.spark.SparkConf, 通过这个参数我们可以自定义本次提交的参数,这个参数会覆盖系统的默认配置。

先上一张与SparkContext相关的类图:



下面是SparkContext非常重要的数据成员的定义:


[AppleScript] 纯文本查看 复制代码
 // Create and start the scheduler
// The concrete TaskScheduler is picked by SparkContext.createTaskScheduler from the master URL.
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
// Actor created on the environment's actor system under the name "HeartbeatReceiver";
// it is constructed with the task scheduler created above.
private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
// Declared uninitialized and assigned inside the try block below.
@volatile private[spark] var dagScheduler: DAGScheduler = _
// Any Exception thrown while constructing the DAGScheduler is rethrown as a SparkException
// whose message embeds the original message text.
// NOTE(review): the original exception is not passed on as the cause, so its stack trace is lost.
try {
dagScheduler = new DAGScheduler(this)
} catch {
case e: Exception => throw
new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
}

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
taskScheduler.start()


通过createTaskScheduler,我们可以获得不同资源管理类型或者部署类型的调度器。看一下现在支持的部署方法:

[AppleScript] 纯文本查看 复制代码
 /** Creates a task scheduler based on a given master URL. Extracted for testing. */
private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
  // Regular expression used for local[N] and local[*] master formats
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  // Regular expression for local[N, maxRetries], used in tests with failing tasks
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
  // Regular expression for connecting to Spark deploy clusters
  val SPARK_REGEX = """spark://(.*)""".r
  // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
  val MESOS_REGEX = """(mesos|zk)://.*""".r
  // Regular expression for connection to Simr cluster
  val SIMR_REGEX = """simr://(.*)""".r

  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1

  master match {
    case "local" =>
      // Plain "local": a LocalBackend created with 1 as its thread-count argument.
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalBackend(scheduler, 1)
      scheduler.initialize(backend)
      scheduler

    case LOCAL_N_REGEX(threads) =>
      def localCpuCount = Runtime.getRuntime.availableProcessors()
      // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalBackend(scheduler, threadCount)
      scheduler.initialize(backend)
      scheduler

    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      def localCpuCount = Runtime.getRuntime.availableProcessors()
      // local[*, M] means the number of cores on the computer with M failures
      // local[N, M] means exactly N threads with M failures
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
      val backend = new LocalBackend(scheduler, threadCount)
      scheduler.initialize(backend)
      scheduler

    case SPARK_REGEX(sparkUrl) =>
      // The part after spark:// may list several comma-separated masters; each entry gets
      // the spark:// prefix put back before being handed to the backend.
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      scheduler

    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
      // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
      val memoryPerSlaveInt = memoryPerSlave.toInt
      if (sc.executorMemory > memoryPerSlaveInt) {
        throw new SparkException(
          "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
            memoryPerSlaveInt, sc.executorMemory))
      }

      val scheduler = new TaskSchedulerImpl(sc)
      val localCluster = new LocalSparkCluster(
        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
      val masterUrls = localCluster.start()
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      // Stop the locally started cluster when the backend's shutdown callback is invoked.
      backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
        localCluster.stop()
      }
      scheduler

    case "yarn-standalone" | "yarn-cluster" =>
      if (master == "yarn-standalone") {
        logWarning(
          "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
      }
      // The YARN scheduler and backend classes are looked up by name via reflection;
      // any failure to load or construct them is reported as a SparkException.
      // NOTE(review): presumably reflection is used because YARN support is an optional
      // module that may be absent from the classpath; confirm against the build.
      val scheduler = try {
        val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
        val cons = clazz.getConstructor(classOf[SparkContext])
        cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
      } catch {
        // TODO: Enumerate the exact reasons why it can fail
        // But irrespective of it, it means we cannot proceed !
        case e: Exception => {
          throw new SparkException("YARN mode not available ?", e)
        }
      }
      val backend = try {
        val clazz =
          Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
        val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
        cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
      } catch {
        case e: Exception => {
          throw new SparkException("YARN mode not available ?", e)
        }
      }
      scheduler.initialize(backend)
      scheduler

    case "yarn-client" =>
      // Same reflective lookup as the yarn-cluster case, with the client-mode classes.
      val scheduler = try {
        val clazz =
          Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
        val cons = clazz.getConstructor(classOf[SparkContext])
        cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

      } catch {
        case e: Exception => {
          throw new SparkException("YARN mode not available ?", e)
        }
      }

      val backend = try {
        val clazz =
          Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
        val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
        cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
      } catch {
        case e: Exception => {
          throw new SparkException("YARN mode not available ?", e)
        }
      }

      scheduler.initialize(backend)
      scheduler

    case mesosUrl @ MESOS_REGEX(_) =>
      MesosNativeLibrary.load()
      val scheduler = new TaskSchedulerImpl(sc)
      // spark.mesos.coarse (default false) selects the coarse-grained Mesos backend.
      val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
      val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
      val backend = if (coarseGrained) {
        new CoarseMesosSchedulerBackend(scheduler, sc, url)
      } else {
        new MesosSchedulerBackend(scheduler, sc, url)
      }
      scheduler.initialize(backend)
      scheduler

    case SIMR_REGEX(simrUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
      scheduler.initialize(backend)
      scheduler

    case _ =>
      // No known master URL format matched.
      throw new SparkException("Could not parse Master URL: '" + master + "'")
  }
}
    }

    主要的逻辑从line 20开始:通过传入的Master URL来生成Scheduler和SchedulerBackend。对于常见的Standalone部署方式,我们看一下它生成的Scheduler和SchedulerBackend:


    [AppleScript] 纯文本查看 复制代码
     case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    scheduler


    org.apache.spark.scheduler.TaskSchedulerImpl通过一个SchedulerBackend管理了所有的cluster的调度;它主要实现了通用的逻辑。对于系统刚启动时,需要理解两个接口,一个是initialize,一个是start。这个也是在SparkContext初始化时调用的:


    [AppleScript] 纯文本查看 复制代码
     /**
      * Stores the given backend, creates the root scheduling pool and lets the builder that
      * matches the configured scheduling mode build its pools.
      *
      * @param backend the SchedulerBackend this scheduler keeps a reference to
      */
     def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0)
    // Only FIFO and FAIR are handled here.
    // NOTE(review): any other SchedulingMode value would fail with a MatchError; confirm
    // no other mode can reach this point.
    schedulableBuilder = {
    schedulingMode match {
    case SchedulingMode.FIFO =>
    new FIFOSchedulableBuilder(rootPool)
    case SchedulingMode.FAIR =>
    new FairSchedulableBuilder(rootPool, conf)
    }
    }
    schedulableBuilder.buildPools()
    }


    由此可见,初始化主要是SchedulerBackend的初始化,它主要是通过集群的配置来获得调度模式,现在支持的调度模式是FIFO和公平调度(FAIR),默认的是FIFO。

    [AppleScript] 纯文本查看 复制代码
    // default scheduler is FIFO
    // Raw value of spark.scheduler.mode; "FIFO" is used when the key is not set.
    private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
    // The configured value is upper-cased before the lookup, so it is matched
    // case-insensitively. A name that SchedulingMode does not know is reported as a
    // SparkException that quotes the value as it was configured.
    val schedulingMode: SchedulingMode = try {
    SchedulingMode.withName(schedulingModeConf.toUpperCase)
    } catch {
    case e: java.util.NoSuchElementException =>
    throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
    }


    start的实现如下:


    [AppleScript] 纯文本查看 复制代码
     /**
      * Starts the backend and, when not running locally and spark.speculation is enabled
      * (default false), schedules a recurring call to checkSpeculatableTasks().
      */
     override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    // Brings the actor system's dispatcher into scope for the schedule(...) call below.
    import sc.env.actorSystem.dispatcher
    // SPECULATION_INTERVAL (in milliseconds) is passed for both duration arguments,
    // i.e. the initial delay and the repeat interval of the scheduler's schedule call.
    sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
    SPECULATION_INTERVAL milliseconds) {
    // NOTE(review): presumably tryOrExit terminates the process if the check throws;
    // verify against Utils.
    Utils.tryOrExit { checkSpeculatableTasks() }
    }
    }
    }

    主要是backend的启动。对于非本地模式,并且设置了spark.speculation为true,那么对于指定时间未返回的task将会启动另外的task来执行。其实对于一般的应用,这个的确可能会减少任务的执行时间,但是也浪费了集群的计算资源。因此对于离线应用来说,这个设置是不推荐的。


    org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend是Standalone模式的SchedulerBackend。它的定义如下:

    [AppleScript] 纯文本查看 复制代码
    private[spark] class SparkDeploySchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
    extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
    with AppClientListener
    with Logging {


    看一下它的start:


    [AppleScript] 纯文本查看 复制代码
     /**
      * Starts the parent backend, assembles the command line and application description
      * for the executor backend process, starts an AppClient with that description and
      * then calls waitForRegistration().
      */
     override def start() {
    super.start()

    // The endpoint for executors to talk to us
    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
    SparkEnv.driverActorSystemName,
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port"),
    CoarseGrainedSchedulerBackend.ACTOR_NAME)
    // The {{...}} entries are passed on as literal strings here.
    // NOTE(review): presumably they are placeholders substituted when an executor is
    // actually launched; confirm where that happens.
    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
    // Optional user-supplied JVM options for executors; empty when the key is not set.
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
    // Optional extra class path / library path settings, split on the platform's path
    // separator; both are empty when the corresponding key is not set.
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
    cp.split(java.io.File.pathSeparator)
    }
    val libraryPathEntries =
    sc.conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>
    cp.split(java.io.File.pathSeparator)
    }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    // Spark-derived options come first, followed by the user-supplied extra options.
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))

    // This backend passes itself as an argument to the client
    // (the enclosing class mixes in AppClientListener).
    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()

    // NOTE(review): presumably blocks until the application is registered; confirm in
    // waitForRegistration's definition.
    waitForRegistration()
    }


    接下来,我们将对TaskScheduler,SchedulerBackend和DAGScheduler进行详解,来逐步揭开它们的神秘面纱。










    欢迎光临 168大数据 (http://www.bi168.cn/) Powered by Discuz! X3.2