摘要Spark 内核泛指 Spark 的核心运行机制包括核心组件的运行机制、任务调度机制、内存管理机制、Shuffle 原理等。本文将深入剖析 Spark 内核的方方面面帮助你理解 Spark 的底层原理从而更好地完成代码设计并准确锁定项目运行中的问题。一、Spark 核心组件回顾1.1 DriverSpark 驱动器节点用于执行 Spark 任务中的 main 方法负责实际代码的执行工作。Driver 在 Spark 作业执行时主要负责将用户程序转化为作业Job在 Executor 之间调度任务Task跟踪 Executor 的执行情况通过 UI 展示查询运行情况1.2 ExecutorSpark Executor 对象负责在 Spark 作业中运行具体任务任务彼此之间相互独立。Spark 应用启动时ExecutorBackend 节点被同时启动并且始终伴随着整个 Spark 应用的生命周期而存在。Executor 有两个核心功能负责运行组成 Spark 应用的任务并将结果返回给驱动器Driver通过自身的块管理器Block Manager为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的因此任务可以在运行时充分利用缓存数据加速运算二、Spark 通用运行流程概述Spark 通用运行流程的核心步骤任务提交后都会先启动 Driver 程序Driver 向集群管理器注册应用程序集群管理器根据此任务的配置文件分配 Executor 并启动Driver 开始执行 main 函数Spark 查询为懒执行当执行到 Action 算子时开始反向推算根据宽依赖进行 Stage 的划分随后每一个 Stage 对应一个 TaskSetTaskSet 中有多个 Task查找可用资源 Executor 进行调度根据本地化原则Task 会被分发到指定的 Executor 去执行在任务执行的过程中Executor 也会不断与 Driver 进行通信报告任务运行情况三、Spark 部署模式Spark 支持多种集群管理器Cluster Manager模式说明StandaloneSpark 原生的简单集群管理器自带完整的服务可单独部署到一个集群中Hadoop YARN统一的资源管理机制根据 Driver 在集群中的位置不同分为 yarn client 和 yarn clusterApache Mesos强大的分布式资源管理框架允许多种不同的框架部署在其上K8S容器式部署环境3.1 YARN Cluster 模式运行流程执行脚本提交任务实际是启动一个 SparkSubmit 的 JVM 进程SparkSubmit 类中的 main 方法反射调用 YarnClusterApplication 的 main 方法YarnClusterApplication 创建 Yarn 客户端然后向 Yarn 服务器发送执行指令bin/java ApplicationMasterYarn 框架收到指令后会在指定的 NM 中启动 ApplicationMasterApplicationMaster 启动 Driver 线程执行用户的作业AM 向 RM 注册申请资源获取资源后 AM 向 NM 发送指令bin/java YarnCoarseGrainedExecutorBackendCoarseGrainedExecutorBackend 进程会接收消息跟 Driver 通信注册已经启动的 Executor然后启动计算对象 Executor 等待接收任务Driver 线程继续执行完成作业的调度和任务的执行Driver 分配任务并监控任务的执行注意SparkSubmit、ApplicationMaster 和 CoarseGrainedExecutorBackend 是独立的进程Driver 是独立的线程Executor 和 YarnClusterApplication 是对象。3.2 YARN Client 模式运行流程执行脚本提交任务实际是启动一个 SparkSubmit 的 JVM 进程SparkSubmit 类中的 main 方法反射调用用户代码的 main 方法启动 Driver 线程执行用户的作业并创建 ScheduleBackendYarnClientSchedulerBackend 向 RM 发送指令bin/java ExecutorLauncherYarn 框架收到指令后会在指定的 NM 中启动 ExecutorLauncher实际上还是调用 ApplicationMaster 的 main 方法AM 向 RM 注册申请资源获取资源后 AM 向 NM 发送指令bin/java CoarseGrainedExecutorBackendCoarseGrainedExecutorBackend 进程会接收消息跟 Driver 通信注册已经启动的 Executor然后启动计算对象 Executor 等待接收任务Driver 分配任务并监控任务的执行3.3 Standalone Client 模式在 Standalone Client 模式下Driver 在任务提交的本地机器上运行。Driver 启动后向 Master 注册应用程序Master 根据 submit 脚本的资源需求找到内部资源至少可以启动一个 Executor 的所有 Worker然后在这些 Worker 之间分配 ExecutorWorker 上的 Executor 启动后会向 Driver 反向注册所有的 Executor 注册完成后Driver 开始执行 main 函数之后执行到 Action 算子时开始划分 Stage每个 Stage 生成对应的 TaskSet之后将 Task 分发到各个 Executor 上执行。四、Spark 通讯架构4.1 Spark 通信架构概述Spark 中通信框架的发展版本通信框架说明Spark 早期Akka内部通信部件Spark 1.3Netty引入 Netty 通信框架解决 Shuffle 大数据传输问题Spark 1.6Akka Netty可配置使用Netty 完全实现了 Akka 在 Spark 中的功能Spark 2.xNetty抛弃 Akka使用 Netty 作为内部通讯组件Spark 基于 Netty 新的 RPC 框架借鉴了 Akka 中的设计它是基于Actor 模型。Spark 通讯框架中各个组件Client/Master/Worker可以认为是一个个独立的实体各个实体之间通过消息来进行通信。4.2 Spark 通信终端EndpointClient/Master/Worker有1 个 InBox和N 个 OutBoxN1N 取决于当前 Endpoint 与多少其他的 Endpoint 进行通信一个与其通讯的其他 Endpoint 对应一个 OutBox。Endpoint 接收到的消息被写入 InBox发送出去的消息写入 OutBox 并被发送到其他 Endpoint 的 InBox 中。核心组件组件说明RpcEndpointRPC 通信终端。Spark 针对每个节点Client/Master/Worker都称之为一个 RPC 终端实现 RpcEndpoint 接口RpcEnvRPC 上下文环境每个 RPC 终端运行时依赖的上下文环境Dispatcher消息调度分发器将消息分发至对应的指令收件箱或发件箱Inbox指令消息收件箱。一个本地 RpcEndpoint 对应一个收件箱RpcEndpointRef对远程 RpcEndpoint 的一个引用OutBox指令消息发件箱。一个目标 RpcEndpoint 对应一个发件箱TransportClientNetty 通信客户端一个 OutBox 对应一个 TransportClientTransportServerNetty 通信服务端一个 RpcEndpoint 对应一个 TransportServer生命周期Constructor - onStart - receive* - onStop4.3 核心通信终端// Driver 端classDriverEndpointextendsIsolatedRpcEndpoint// Executor 端classCoarseGrainedExecutorBackendextendsIsolatedRpcEndpoint五、Spark 任务调度机制5.1 任务调度概述在生产环境下Spark 集群的部署方式一般为YARN-Cluster 模式。Driver 线程主要初始化 SparkContext 对象准备运行所需的上下文然后一方面保持与 ApplicationMaster 的 RPC 连接通过 ApplicationMaster 申请资源另一方面根据用户业务逻辑开始调度任务将任务下发到已有的空闲 Executor 上。当 ResourceManager 向 ApplicationMaster 返回 Container 资源时ApplicationMaster 就尝试在对应的 Container 上启动 Executor 进程Executor 进程起来后会向 Driver 反向注册注册成功后保持与 Driver 的心跳同时等待 Driver 分发任务当分发的任务执行完毕后将任务状态上报给 Driver。5.2 Job / Stage / Task 概念概念说明Job以 Action 方法为界遇到一个 Action 方法则触发一个 JobStageJob 的子集以 RDD 宽依赖即 Shuffle为界遇到 Shuffle 做一次划分TaskStage 的子集以并行度分区数来衡量分区数是多少则有多少个 Task5.3 Stage 级调度DAGSchedulerSpark 的任务调度总体来说分两路进行Stage 级的调度由 DAGScheduler 完成Task 级的调度由 TaskScheduler 完成DAGScheduler 的主要工作根据 RDD 的血缘关系构建 DAG根据宽依赖Shuffle划分 Stage将 Stage 打包成 TaskSet 提交给 TaskScheduler监控 Stage 的运行状态5.4 Task 级调度TaskSchedulerTaskScheduler 将 TaskSet 封装为TaskSetManager加入到调度队列中。TaskSetManager 负责监控管理同一个 Stage 中的 Tasks。调度流程TaskSetManager 加入 rootPool 调度池中调用 SchedulerBackend 的 reviveOffers 方法driverEndpoint 收到 ReviveOffer 消息后调用 makeOffers根据本地化原则选择 Task 分发到 Executor5.5 本地化调度DAGScheduler 切割 Job、划分 Stage通过调用 submitStage 来提交一个 Stage 对应的 tasks。submitStage 会调用 submitMissingTasks确定每个需要计算的 task 的 preferredLocations。Task 的 Locality 级别优先级由高到低级别说明PROCESS_LOCAL进程本地化task 和数据在同一个 Executor 中性能最好NODE_LOCAL节点本地化task 和数据在同一个节点中但不在同一个 Executor 中RACK_LOCAL机架本地化task 和数据在同一个机架的两个节点上NO_PREF对于 task 来说从哪里获取都一样ANYtask 和数据可以在集群的任何地方性能最差调度策略Spark 调度总是会尽量让每个 task 以最高的本地性级别来启动。当一个 task 以 X 本地性级别启动但是该本地性级别对应的所有节点都没有空闲资源而启动失败此时并不会马上降低本地性级别启动而是在某个时间长度内再次以 X 本地性级别来启动该 task若超过限时时间则降级启动。可以通过调大每个类别的最大容忍延迟时间在等待阶段对应的 Executor 可能就会有相应的资源去执行此 task。5.6 失败重试与黑名单机制Task 被提交到 Executor 启动执行后Executor 会将执行状态上报给 SchedulerBackendSchedulerBackend 则告诉 TaskSchedulerTaskScheduler 找到该 Task 对应的 TaskSetManager并通知到该 TaskSetManager。对于失败的 Task记录失败次数如果失败次数未超过最大重试次数放回待调度的 Task 池子中否则整个 Application 失败黑名单机制记录 Task 上一次失败所在的 Executor Id 和 Host下次再调度这个 Task 时避免它被调度到上一次失败的节点上。六、Spark Shuffle 原理6.1 Shuffle 概述Shuffle 是 Spark 中数据重新分配的过程通常发生在宽依赖如 groupByKey、reduceByKey操作中。Shuffle 阶段Shuffle Write上游 Task 将数据写入临时文件Shuffle Read下游 Task 从临时文件中读取数据6.2 SortShuffle6.2.1 普通 SortShuffle在该模式下数据会先写入一个数据结构reduceByKey写入 Map一边通过 Map 局部聚合一边写入内存Join算子写入 ArrayList 直接写入内存中然后需要判断是否达到阈值如果达到就会将内存数据结构的数据写入到磁盘清空内存数据结构。在溢写磁盘前先根据 key 进行排序排序过后的数据会分批写入到磁盘文件中。默认批次为 10000 条。最后在每个 Task 中将所有的临时文件合并merge 过程单独写一份索引文件标识下游各个 Task 的数据在文件中的索引。6.2.2 Bypass SortShufflebypass 运行机制的触发条件shuffle reduce task 数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值默认为 200不是聚合类的 shuffle 算子比如 reduceByKey此时 task 会为每个 reduce 端的 task 都创建一个临时磁盘文件并将数据按 key 进行 hash然后根据 key 的 hash 值将 key 写入对应的磁盘文件之中。该机制与普通 SortShuffleManager 运行机制的不同在于不会进行排序。启用该机制的最大好处在于shuffle write 过程中不需要进行数据的排序操作节省掉了这部分的性能开销。七、Spark 内存管理7.1 内存管理概述Spark 1.6 之前采用静态内存管理机制Spark 1.6 之后引入统一内存管理机制。7.2 静态内存管理在 Spark 最初采用的静态内存管理机制下存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的。计算公式可用的存储内存 systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction 可用的执行内存 systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction其中systemMaxMemory取决于当前 JVM 堆内内存的大小safetyFraction参数在逻辑上预留出保险区域降低因实际内存超出当前预设范围而导致 OOM 的风险。7.3 统一内存管理Spark 1.6 之后引入的统一内存管理机制与静态内存管理的区别在于存储内存和执行内存可以共享同一块空间可以动态占用对方的空闲区域。动态占用机制规则设定基本的存储内存和执行内存区域spark.storage.storageFraction参数确定双方各自拥有的空间的范围双方的空间都不足时则存储到硬盘若己方空间不足而对方空余时可借用对方的空间执行内存的空间被对方占用后可让对方将占用的部分转存到硬盘然后归还借用的空间存储内存的空间被对方占用后无法让对方归还因为需要考虑 Shuffle 过程中的很多因素统一内存管理的动态占用机制与静态内存管理一样当双方空间都被占满后若有新增内容双方都需要将其存储到硬盘若己方不足对方空余则可占用对方Storage 占用对方的内存可被淘汰Execution 占用对方的内存不可被淘汰只能等待释放7.4 堆外内存Spark 1.6 开始引入了 Off-heap 内存。堆外内存意味着把内存对象分配在 Java 虚拟机的堆以外的内存这些内存直接受操作系统管理而不是 JVM。在默认情况下堆外内存并不启用可通过配置启用spark.memory.offHeap.enabledtrue spark.memory.offHeap.size512m除了没有 other 空间堆外内存与堆内内存的划分方式相同所有运行中的并发任务共享存储内存和执行内存。7.5 RDD 持久化与存储级别在对 RDD 持久化时Spark 规定了 7 种不同的存储级别持久化级别含义MEMORY_ONLY以非序列化的 Java 对象方式持久化在 JVM 内存中MEMORY_AND_DISK同上但某些 partition 无法存储在内存中时会持久化到磁盘中MEMORY_ONLY_SER同 MEMORY_ONLY但使用 Java 序列化方式减少内存开销MEMORY_AND_DISK_SER同 MEMORY_AND_DISK但使用序列化方式持久化DISK_ONLY使用非序列化 Java 对象方式持久化完全存储到磁盘上MEMORY_ONLY_2将持久化数据复用一份保存到其他节点MEMORY_AND_DISK_2同上同时保存到磁盘和内存并复用一份存储级别从三个维度定义了 RDD 的 Partition同时也就是 Block的存储方式存储位置磁盘 / 堆内内存 / 堆外内存存储形式是否为非序列化的形式副本数量大于 1 时需要远程冗余备份到其他节点7.6 Tungsten 内存优化Shuffle Write 阶段中用到的 Tungsten 是 Databricks 公司提出的对 Spark 优化内存和 CPU 使用的计划钨丝计划解决了一些 JVM 在性能上的限制和弊端。Tungsten 采用的页式内存管理机制建立在 MemoryManager 之上Tungsten 对执行内存的使用进行了进一步的抽象这样在 Shuffle 过程中无需关心数据具体存储在堆内还是堆外。每个内存页用一个MemoryBlock来定义并用Object obj和long offset这两个变量统一标识一个内存页在系统内存中的地址。Tungsten 页式管理下的所有内存用64 位的逻辑地址表示由页号13 位和页内偏移量51 位组成。有了统一的寻址方式Spark 可以用 64 位逻辑地址的指针定位到堆内或堆外的内存整个 Shuffle Write 排序的过程只需要对指针进行排序并且无需反序列化整个过程非常高效。八、总结特性说明核心组件Driver调度中心、Executor任务执行部署模式Standalone / YARN-Client / YARN-Cluster / Mesos / K8S通信框架Spark 2.x 使用 Netty 替代 Akka任务调度DAGSchedulerStage 级 TaskSchedulerTask 级本地化级别PROCESS_LOCAL NODE_LOCAL RACK_LOCAL NO_PREF ANYShuffle 机制SortShuffle普通 / bypass内存管理统一内存管理存储内存 执行内存动态共享优化技术Tungsten 页式内存管理、堆外内存深入理解 Spark 内核原理能够帮助我们更好地完成 Spark 代码设计写出更高效的程序准确锁定项目运行过程中出现的问题的症结所在合理调优 Spark 作业提升集群资源利用率理解 Spark 的容错机制保障生产环境稳定运行