Spark 作业调度

概述

Spark 有好几种计算资源调度的方式。首先,回忆一下 集群模式概述 中每个 Spark 应用(包含一个 SparkContext 实例)中运行了一些其独占的执行器(executor)进程。集群管理器提供了 Spark 应用之间的资源调度(scheduling across applications)。其次,在各个Spark 应用内部,各个线程可能并发地通过 action 算子提交多个 Spark 作业(job)。如果你的应用服务于网络请求,那这种情况是很常见的。在 Spark 应用内部(对应同一个 SparkContext)各个作业之间,Spark 默认 FIFO 调度,同时也可以支持公平调度(fair scheduler)。

跨应用调度

如果在集群上运行,每个 Spark 应用都会 SparkContext 获得一批独占的执行器 JVM,来运行其任务并存储数据。如果有多个用户共享集群,那么会有很多资源分配相关的选项,如何设置还取决于具体的集群管理器。

对 Spark 所支持的各个集群管理器而言,最简单的资源分配,就是对资源静态划分。这种方式就意味着,每个 Spark 应用都是设定一个最大可用资源总量,并且该应用在整个生命周期内都会占住这些资源。这种方式在 Spark 独立部署YARN 调度,以及 Mesos 粗粒度模式(coarse-grained Mesos mode)下都可用。

  • Standalone mode: 默认情况下,Spark 应用在独立部署的集群中都会以 FIFO(first-in-first-out)模式顺序提交运行,并且每个 Spark 应用都会占用集群中所有可用节点。不过你可以通过设置 spark.cores.max 或者 spark.deploy.defaultCores 来限制单个应用所占用的节点个数。最后,除了可以控制对 CPU 的使用数量之外,还可以通过 spark.executor.memory 来控制各个应用的内存占用量。
  • Mesos: 在 Mesos 中要使用静态划分的话,需要将 spark.mesos.coarse 设为 true,同样,你也需要设置 spark.cores.max 来控制各个应用的 CPU 总数,以及 spark.executor.memory 来控制各个应用的内存占用。
  • YARN: 在 YARN 中需要使用 –num-executors 选项来控制 Spark 应用在集群中分配的执行器的个数,对于单个执行器(executor)所占用的资源,可以使用 –executor-memory 和 –executor-cores 来控制,详见YARN Spark Properties

Mesos 上还有一种动态共享 CPU 的方式。在这种模式下,每个 Spark 应用的内存占用仍然是固定且独占的(仍由 spark.executor.memory 决定),但是如果该 Spark 应用没有在某个机器上执行任务的话,那么其他应用可以占用该机器上的 CPU。这种模式对集群中有大量不是很活跃应用的场景非常有效,例如: 集群中有很多不同用户的 Spark shell session。但这种模式不适用于低延迟的场景,因为当 Spark 应用需要使用 CPU 的时候,可能需要等待一段时间才能取得 CPU 的使用权。要使用这种模式,只需要在 mesos:// URL 上设置 spark.mesos.coarse 属性为 false 即可。

值得注意的是,目前还没有任何一种资源分配模式能支持跨 Spark 应用的内存共享。如果你想通过这种方式共享数据,我们建议你可以单独使用一个服务(例如 alluxio),这样就能实现多应用访问同一个 RDD 的数据。

动态资源分配

Spark 提供了一种基于负载来动态调节 Spark 应用资源占用的机制。这意味着,你的应用会在资源空闲的时候将其释放给集群,需要时再重新申请。这一特性在多个应用共享 Spark 集群资源的情况下特别有用。

这个特性默认是禁用的,但是在所有的粗粒度集群管理器上都是可用的,如: 独立部署模式YARN 模式以及 Mesos 粗粒度模式(Mesos coarse-grained mode)。

配置和部署

要使用这一特性有两个前提条件。首先,你的应用必须设置 spark.dynamicAllocation.enabled 为 true。其次,你必须在每个节点上启动 external shuffle service,并在你的应用中将 spark.shuffle.service.enabled 设为 true。external shuffle service 的目的是在移除 executor 的时候,能够保留 executor 输出的 shuffle 文件(本文 后续有更详细的描述)。启用 external shuffle service 的方式在各个集群管理器上各不相同 :

在 Spark 独立部署的集群中,你只需要在 worker 启动前设置 spark.shuffle.server.enabled 为 true 即可。

在 Mesos 粗粒度模式下,你需要在各个节点上运行 ${SPARK_HOME}/sbin/start-mesos-shuffle-service.sh 并设置 spark.shuffle.service.enabled 为 true 即可。例如,你可以用 Marathon 来启用这一功能。

在 YARN 模式下,需要按以下 步骤 在各个 NodeManager 上启动 :

  1. 首先按照YARN profile 构建 Spark。如果你已经有打好包的 Spark,可以忽略这一步。
  2. 找到 spark-\-yarn-shuffle.jar。如果你是自定义编译,其位置应该在 ${SPARK_HOME}/network/yarn/target/scala-\,否则应该可以在 lib 目录下找到这个 jar 包。
  3. 将该 jar 包添加到 NodeManager 的 classpath 路径中。
  4. 配置各个节点上的 yarn-site.xml,将 spark_shuffle 添加到 yarn.nodemanager.aux-services 中,然后将 yarn.nodemanager.aux-services.spark_shuffle.class 设为 org.apache.spark.network.yarn.YarnShuffleService,并将 spark.shuffle.service.enabled 设为 true。
  5. 最后重启各节点上的 NodeManager。

所有相关的配置都是可选的,并且都在 spark.dynamicAllocation. 和 spark.shuffle.service. 命名空间下。更详细请参考: 配置页面

资源分配策略

总体上来说,Spark 应该在执行器空闲时将其关闭,而在后续要用时再次申请。因为没有一个固定的方法,可以预测一个执行器在后续是否马上回被分配去执行任务,或者一个新分配的执行器实际上是空闲的,所以我们需要一些试探性的方法,来决定是否申请或移除一个执行器。

请求策略

一个启用了动态分配的 Spark 应用会在有等待任务需要调度的时候,申请额外的执行器。这种情况下,必定意味着已有的执行器已经不足以同时执行所有未完成的任务。

Spark 会分轮次来申请执行器。实际的资源申请,会在任务挂起 spark.dynamicAllocation.schedulerBacklogTimeout 秒后首次触发,其后如果等待队列中仍有挂起的任务,则每过 spark.dynamicAlloction.sustainedSchedulerBacklogTimeout 秒后触发一次资源申请。另外,每一轮所申请的执行器个数以指数形式增长。例如,一个 Spark 应用可能在首轮申请1个执行器,后续的轮次申请个数可能是 2个、4个、8个… … 。

采用指数级增长策略的原因有两个: 第一,对于任何一个 Spark 应用如果只是需要多申请少数几个执行器的话,那么必须非常谨慎地启动资源申请,这和 TCP 慢启动有些类似;第二,如果一旦 Spark 应用确实需要申请很多个执行器的话,那么可以确保其所需的计算资源及时地增长。

移除策略

移除执行器的策略就简单多了。Spark 应用会在某个执行器空闲超过 spark.dynamicAllocation.executorIdleTimeout 秒后将其删除。在绝大多数情况下,执行器的移除条件和申请条件都是互斥的,也就是说,执行器在有待执行任务挂起时,不应该空闲。

优雅地关闭 Executor(执行器)

非动态分配模式下,执行器可能的退出原因有执行失败或者相关 Spark 应用已经退出。不管是那种原因,执行器的所有状态都已经不再需要,可以丢弃掉。但在动态分配的情形下,执行器有可能在 Spark 应用运行期间被移除。这时候,如果 Spark 应用尝试去访问该执行器存储的状态,就必须重算这一部分数据。因此,Spark 需要一种机制,能够优雅地关闭执行器,同时还保留其状态数据。

这种需求对于 Shuffle(混洗) 操作尤其重要。在 Shuffle 过程中,Spark 执行器首先将 map 输出写到本地磁盘,同时执行器本身又是一个文件服务器,这样其他执行器就能够通过该执行器获得对应的 map 结果数据。一旦有某些任务执行时间过长,动态分配有可能在混洗结束前移除任务异常的执行器,而这些被移除的执行器对应的数据将会被重新计算,但这些重算其实是不必要的。

要解决这一问题,就需要用到 external shuffle service ,该服务在 Spark 1.2 引入。该服务在每个节点上都会启动一个不依赖于任何 Spark 应用或执行器的独立进程。一旦该服务启用,Spark 执行器不再从各个执行器上获取 shuffle 文件,转而从这个 service 获取。这意味着,任何执行器输出的混洗状态数据都可能存留时间比对应的执行器进程还长。
除了混洗文件之外,执行器也会在磁盘或者内存中缓存数。一旦执行器被移除,其缓存数据将无法访问。这个问题目前还没有解决。或许在未来的版本中,可能会采用外部混洗服务类似的方法,将缓存数据保存在堆外存储中以解决这一问题。

应用内调度

在指定的 Spark 应用内部(对应同一个 SparkContext 实例),多个线程可能并发地提交 Spark 作业(job)。在本节中,作业(job)是指,由 Spark action 算子(如: collect)触发的一系列计算任务的集合。Spark 调度器是完全线程安全的,并且能够支持 Spark 应用同时处理多个请求(比如: 来自不同用户的查询)。

默认,Spark 应用内部使用 FIFO 调度策略。每个作业被划分为多个阶段(stage)(例如: map 阶段和 reduce 阶段),第一个作业在其启动后会优先获取所有的可用资源,然后是第二个作业再申请,再第三个……。如果前面的作业没有把集群资源占满,则后续的作业可以立即启动运行,否则,后提交的作业会有明显的延迟等待。

不过从 Spark 0.8 开始,Spark 也能支持各个作业间的公平(Fair)调度。公平调度时,Spark 以轮询的方式给每个作业分配资源,因此所有的作业获得的资源大体上是平均分配。这意味着,即使有大作业在运行,小的作业再提交也能立即获得计算资源而不是等待前面的作业结束,大大减少了延迟时间。这种模式特别适合于多用户配置。

要启用公平调度器,只需设置一下 SparkContext 中 spark.scheduler.mode 属性为 FAIR 即可 :

1
2
3
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

公平调度资源池

公平调度器还可以支持将作业分组放入资源池(pool),然后给每个资源池配置不同的选项(如: 权重)。这样你就可以给一些比较重要的作业创建一个“高优先级”资源池,或者你也可以把每个用户的作业分到一组,这样一来就是各个用户平均分享集群资源,而不是各个作业平分集群资源。Spark 公平调度的实现方式基本都是模仿 Hadoop Fair Scheduler 来实现的。

默认情况下,新提交的作业都会进入到默认资源池中,不过作业对应于哪个资源池,可以在提交作业的线程中用 SparkContext.setLocalProperty 设定 spark.scheduler.pool 属性。示例代码如下 :

1
2
// 假设 sc 是您的 SparkContext 变量
sc.setLocalProperty("spark.scheduler.pool", "pool1")

一旦设好了局部属性,所有该线程所提交的作业(即: 在该线程中调用 action 算子,如: RDD.save, count, collect 等)都会使用这个资源池。这个设置是以线程为单位保存的,你很容易实现用同一线程来提交同一用户的所有作业到同一个资源池中。同样,如果需要清除资源池设置,只需在对应线程中调用如下代码 :

1
sc.setLocalProperty("spark.scheduler.pool", null)

资源池默认行为

默认地,各个资源池之间平分整个集群的资源(包括 default 资源池),但在资源池内部,默认情况下,作业是 FIFO 顺序执行的。举例来说,如果你为每个用户创建了一个资源池,那么久意味着各个用户之间共享整个集群的资源,但每个用户自己提交的作业是按顺序执行的,而不会出现后提交的作业抢占前面作业的资源。

配置资源池属性

资源池的属性需要通过配置文件来指定。每个资源池都支持以下3个属性 :

  • schedulingMode: 可以是 FIFO 或 FAIR,控制资源池内部的作业是如何调度的。
  • weight: 控制资源池相对其他资源池,可以分配到资源的比例。默认所有资源池的 weight 都是 1。如果你将某个资源池的 weight 设为 2,那么该资源池中的资源将是其他池子的2倍。如果将 weight 设得很高,如 1000,可以实现资源池之间的调度优先级 – 也就是说,weight=1000 的资源池总能立即启动其对应的作业。
  • minShare: 除了整体 weight 之外,每个资源池还能指定一个最小资源分配值(CPU 个数),管理员可能会需要这个设置。公平调度器总是会尝试优先满足所有活跃(active)资源池的最小资源分配值,然后再根据各个池子的 weight 来分配剩下的资源。因此,minShare 属性能够确保每个资源池都能至少获得一定量的集群资源。minShare 的默认值是 0。

资源池属性是一个 XML 文件,可以基于 conf/fairscheduler.xml.template 修改,然后在 SparkConf 的 spark.scheduler.allocation.file 属性指定文件路径:

1
conf.set("spark.scheduler.allocation.file", "/path/to/file")

资源池 XML 配置文件格式如下,其中每个池子对应一个 元素,每个资源池可以有其独立的配置 :

1
2
3
4
5
6
7
8
9
10
11
12
13
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>

完整的例子可以参考 conf/fairscheduler.xml.template。注意,没有在配置文件中配置的资源池都会使用默认配置(schedulingMode : FIFO,weight : 1,minShare : 0)。

热评文章