Spark Job 对象 详解

news/2024/9/28 13:36:07 标签: spark, 大数据, 分布式

        在 Apache Spark 中,Job 对象是执行逻辑的核心组件之一,它代表了对一系列数据操作(如 transformations 和 actions)的提交。理解 Job 的本质和它在 Spark 中的运行机制,有助于深入理解 Spark 的任务调度、执行模型和容错机制。

Spark Job 对象的定义与作用

        Spark 中的 Job 主要用于表示一个具体的计算作业,它是由用户提交的 Action(例如 count()collect()saveAsTextFile() 等)触发的。这些动作会生成一个 Job 对象,最终调度并执行一系列与之相关的任务。

主要作用
  • 调度的基本单元Job 是 Spark 中由调度器提交给集群调度系统的最小执行单元。每次用户调用 Action 时都会触发一个新的 Job
  • 执行依赖解析:在 Job 中,Spark 会解析由 RDD transformations 构建的执行 DAG(Directed Acyclic Graph,有向无环图),将整个 DAG 划分为多个阶段(Stages),并将每个阶段的计算划分为多个任务(Tasks)。
  • 生命周期管理Job 还负责跟踪其执行状态,包括成功、失败、重试等。调度器负责管理 Job 的整个生命周期。
  • 结果汇总与返回Job 的最终结果会返回给提交的客户端,并供用户程序使用。

底层架构与执行流程

Spark 中 Job 的执行流程可以分为以下几个步骤:

  1. 用户触发 Action

    当用户调用 RDD 的 Action 操作(如 collect())时,Spark 会触发一个 Job 的创建。每个 Job 与一个 Action 一一对应。
  2. DAG 划分

    Spark 的调度器会将 RDD 的 transformations 构建的 DAG 划分为多个阶段(Stages)。这些阶段之间通过宽依赖(Shuffle Dependencies)进行划分,每个 Stage 是一组可以并行执行的操作。
  3. 生成任务(Task)

    每个 Stage 会被进一步分解为多个 Task。这些 Task 通常与数据分区(Partition)相对应。每个 Task 会在集群的不同节点上执行,并行处理数据。
  4. 调度执行

    每个 Stage 中的 Task 通过 TaskSet 被提交到 TaskScheduler,由调度器在集群中的不同节点上执行。调度器会根据可用资源、节点健康状况等因素进行调度。
  5. 结果返回与 Job 完成

    在所有 Stage 完成后,Job 被标记为完成,最后的结果会被返回给用户,供进一步处理。

代码层面解释

        在 Spark 源码中,Job 的相关实现可以在 DAGScheduler 和 Job 类中找到。DAGScheduler 是调度层的核心组件,它负责将用户的高层操作分解为具体的作业(Job)和任务(Task)。

1. Job 对象的类结构

在 Spark 代码中,Job 由 DAGScheduler 负责创建。每个 Job 都有一个唯一的 jobId。其定义主要存在于 DAGScheduler.scala 文件中。

// DAGScheduler.scala (部分代码)
class Job(
  val jobId: Int,
  val finalStage: Stage,
  val callSite: CallSite,
  val listener: JobListener,
  val properties: Properties) {
    
  def finished(result: JobResult): Unit = {
    listener.jobSucceeded(result)
  }
}

在上述代码中,Job 对象中有几个关键字段:

  • jobId:作业的唯一标识符。
  • finalStage:该 Job 的最后一个 Stage,作业的完成意味着该阶段的完成。
  • callSite:作业执行时的代码位置信息。
  • listener:用于监听 Job 执行状态的监听器,通常用于执行完成时通知上层。
  • properties:包含一些与作业相关的配置信息。
2. DAGScheduler 的作用

DAGScheduler 是 Spark 调度器的核心组件,负责管理 Job 的生命周期,包括划分阶段、提交任务、重试失败任务等。

DAGScheduler 的部分代码如下:

// DAGScheduler.scala (简化示例)
private[scheduler] class DAGScheduler(
  taskScheduler: TaskScheduler,
  listenerBus: LiveListenerBus,
  mapOutputTracker: MapOutputTracker,
  blockManagerMaster: BlockManagerMaster,
  env: SparkEnv,
  clock: Clock = new SystemClock()) extends Logging {

  private val jobIdToActiveJob = new HashMap[Int, ActiveJob]

  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {

    // 创建一个新的 Job
    val jobId = nextJobId.getAndIncrement()
    val finalStage = createResultStage(rdd, func, partitions, jobId, callSite)
    val job = new Job(jobId, finalStage, callSite, resultHandler, properties)

    // 提交 Job
    jobIdToActiveJob(jobId) = new ActiveJob(job, finalStage)
    submitStage(finalStage)
    
    return job.waiter
  }
}

这个代码展示了 DAGScheduler 是如何接收用户的 Action 调用,创建 Job 并提交执行的:

  • submitJob 方法会基于传入的 RDD 和操作函数创建一个新的 Job
  • 调用 createResultStage 方法将 RDD DAG 分解为 Stage,并创建该 Job 的最终 Stage
  • submitStage 方法负责将阶段提交到底层的 TaskScheduler,执行该阶段中的任务。
3. Job 与 ActiveJob 的关系

Job 是一个抽象的高层次的概念,而 ActiveJob 是其运行时状态的一个封装。ActiveJob 代表一个正在运行的 Job,包含了更多的运行时状态信息。

// ActiveJob.scala
private[spark] class ActiveJob(
    val jobId: Int,
    val finalStage: Stage,
    val func: (TaskContext, Iterator[_]) => _,
    val partitions: Array[Int],
    val callSite: CallSite,
    val listener: JobListener,
    val properties: Properties) {
    
  val numTasks = partitions.length
  var numFinished = 0

  def stageFinished(stage: Stage): Unit = {
    if (numFinished == numTasks) {
      listener.jobSucceeded(this)
    }
  }
}

总结

  • Job 的核心作用Job 是 Spark 中用于管理由 Action 操作触发的计算任务。它通过 DAGScheduler 划分执行阶段(Stages),并调度相应的任务执行,最终将计算结果返回给用户。
  • 代码实现Job 在 Spark 源码中作为调度系统的一个重要组成部分,由 DAGScheduler 创建并管理。DAGScheduler 负责将用户的作业拆解为可执行的阶段和任务,并交由 TaskScheduler 执行。
  • 调度逻辑Job 包含了执行依赖、分区信息和调度状态等。通过与 Stage 和 Task 的结合,Job 的执行能够在大规模分布式环境中高效并行化。

        了解这些底层机制有助于理解 Spark 在执行任务时的调度流程和容错处理机制,也为优化 Spark 作业的性能提供了更深入的视角。


http://www.niftyadmin.cn/n/5681192.html

相关文章

ESP32 Bluedroid 篇(1)—— ibeacon 广播

前言 前面我们已经了解了 ESP32 的 BLE 整体架构,现在我们开始实际学习一下Bluedroid 从机篇的广播和扫描。本文将会以 ble_ibeacon demo 为例子进行讲解,需要注意的一点是。ibeacon 分为两个部分,一个是作为广播者,一个是作为观…

深度学习自编码器 - 提供发现潜在原因的线索篇

序言 在探索复杂数据背后的秘密时,深度学习如同一把锐利的钥匙,特别是其核心的表示学习机制,为我们打开了一扇通往未知世界的大门。表示学习不仅仅是数据的简单编码或转换,它更是深度挖掘数据内在结构、关系与规律的过程。在这一…

web前端-CSS引入方式

一、内部样式表 内部样式表(内嵌样式表)是写到html页面内部,是将所有的 CSS 代码抽取出来,单独放到一个<styie>标签中。 注意: ① <style>标签理论上可以放在 HTML文档的任何地方&#xff0c;但一般会放在文档的<head>标签中 ② 通过此种方式&#xff0c;可…

【JavaEE初阶】深入理解wait和notify以及线程饿死的解决

前言&#xff1a; &#x1f308;上期博客&#xff1a;【JavaEE初阶】深入解析死锁的产生和避免以及内存不可见问题-CSDN博客 &#x1f525;感兴趣的小伙伴看一看小编主页&#xff1a;【JavaEE初阶】深入解析死锁的产生和避免以及内存不可见问题-CSDN博客 ⭐️小编会在后端开…

Reactor 反应堆模式

Reactor 反应堆模式 1、概念 Reactor&#xff08;反应堆&#xff09;模式是一种事件驱动的设计模式&#xff0c;通常用于处理高并发的I/O操作&#xff0c;尤其是在服务器或网络编程中。它基于事件多路复用机制&#xff0c;使得单个线程能够同时管理大量并发连接&#xff0c;而…

el-table给列加单位,表头加样式,加斑马纹

<el-table ref"table" class"dataTable" :data"detailList" :header-cell-style"tableHeaderColor" :row-class-name"tableRowClassName" highlight-current-row><el-table-column label"序号" al…

git 基本原理

文章内容来源于视频 举个案例&#xff0c;家族里面有一本记载祖传秘籍的菊花宝典&#xff0c;这本菊花宝典的正本存储在家族祠堂里面&#xff0c;每一个家庭从正本复制一本存在自己家中&#xff0c;称为副本。这个过程称为clone 一个家庭需要再菊花宝典中添加技能&#xff0c…

Django 配置邮箱服务,实现发送信息到指定邮箱

一、这里以qq邮箱为例&#xff0c;打开qq邮箱的SMTP服务 二、django项目目录设置setting.py 文件 setting.py 添加如下内容&#xff1a; # 发送邮件相关配置 EMAIL_BACKEND django.core.mail.backends.smtp.EmailBackend EMAIL_USE_TLS True EMAIL_HOST smtp.qq.com EMAIL…