Spark Stage - An Introduction to the Physical Execution Plan

1. Objective

A stage is a step in a physical execution plan, a physical unit of execution. This document covers the whole concept of an Apache Spark stage, including the two types of stages, ShuffleMapStage and ResultStage. There is also a method to create a new Spark stage, which we will learn in detail.

Task and submitting a job in a Spark stage

2. Stages in Spark

A stage is a step in a physical execution plan, a physical unit of the execution plan. In other words, a stage is a set of parallel tasks, one task per partition. Basically, each job gets divided into smaller sets of tasks, and each such set is a stage. Stages depend on one another, much like the map and reduce stages in MapReduce.

A Spark stage can be associated with many dependent parent stages; however, it can only work on the partitions of a single RDD. The boundary of a stage is marked by shuffle dependencies.
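To make the shuffle-boundary idea concrete, here is a toy model, not Spark's actual code: walk a lineage of operations in order and start a new stage whenever a wide (shuffle) dependency is crossed. The operation names and the `split_into_stages` helper are hypothetical, for illustration only.

```python
def split_into_stages(lineage):
    """lineage: list of (op_name, is_shuffle) pairs in execution order."""
    stages, current = [], []
    for op, is_shuffle in lineage:
        if is_shuffle and current:
            stages.append(current)   # a shuffle dependency marks a stage boundary
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages

lineage = [
    ("textFile", False),
    ("map", False),
    ("filter", False),
    ("reduceByKey", True),   # wide dependency -> new stage starts here
    ("map", False),
    ("collect", False),
]
print(split_into_stages(lineage))
# → [['textFile', 'map', 'filter'], ['reduceByKey', 'map', 'collect']]
```

The narrow operations before the shuffle end up in one stage, and everything from the shuffle onward lands in the next.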

Ultimately, submission of a Spark stage triggers the execution of a series of dependent parent stages. Every stage has a firstJobId, the id of the job that submitted the stage.

Submitting a job triggers execution of the stage and its parent Spark stages

3. Types of Spark Stage

Basically, stages in Apache Spark fall into two categories:

a. ShuffleMapStage in Spark

b. ResultStage in Spark

Let’s discuss each type of Spark stage in detail:

a. ShuffleMapStage in Spark

ShuffleMapStage is an intermediate Spark stage in the physical execution of the DAG. Basically, it produces data for other stage(s): for a shuffle, it writes map output files. Moreover, in a job using Adaptive Query Planning / Adaptive Scheduling, it can be the final stage, and it is possible to submit it independently as a Spark job for Adaptive Query Planning.

In addition, at the time of execution, a Spark ShuffleMapStage saves its map output files, which can later be fetched by reduce tasks. The ShuffleMapStage is considered ready when all of its map outputs are available. Sometimes, however, output locations can be missing, from which we can infer one of two things: those partitions either have not been calculated yet or have been lost. The stage tracks how many shuffle map outputs are available using the outputLocs and _numAvailableOutputs internal registries.
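The readiness bookkeeping described above can be sketched with a simplified model. This is not Spark's actual implementation (the real stage uses the outputLocs and _numAvailableOutputs registries); the `ToyShuffleMapStage` class below is a hypothetical stand-in that shows the logic.

```python
class ToyShuffleMapStage:
    """Toy sketch: track which partitions have registered map outputs."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self.output_locs = {}   # partition id -> map output location

    def add_output(self, partition, location):
        self.output_locs[partition] = location

    def remove_output(self, partition):
        # e.g. an executor holding the output was lost
        self.output_locs.pop(partition, None)

    def find_missing_partitions(self):
        # partitions not yet calculated, or whose outputs were lost
        return [p for p in range(self.num_partitions) if p not in self.output_locs]

    def is_ready(self):
        # ready when every partition has a registered map output
        return len(self.output_locs) == self.num_partitions


stage = ToyShuffleMapStage(3)
stage.add_output(0, "host-a")
stage.add_output(1, "host-b")
print(stage.is_ready(), stage.find_missing_partitions())   # False [2]
stage.add_output(2, "host-c")
print(stage.is_ready())                                    # True
```

Losing an output (remove_output) flips the stage back to not-ready, which is why missing output locations mean partitions must be recomputed.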

Moreover, a ShuffleMapStage serves as input for the following Spark stages in the DAG of stages; basically, it is the map side of a shuffle dependency. A ShuffleMapStage can contain any number of pipelined operations, such as map and filter, before the shuffle operation. Furthermore, a single ShuffleMapStage can be shared among different jobs.
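The pipelining of narrow operations before the shuffle can be illustrated with a small sketch, assuming a made-up `pipelined` helper (not a Spark API): the map and filter steps are fused and applied element by element over a partition, with no intermediate collection materialized.

```python
def pipelined(partition, ops):
    """Apply a chain of ("map"|"filter", fn) ops lazily over one partition."""
    for x in partition:
        keep = True
        for kind, fn in ops:
            if kind == "map":
                x = fn(x)
            elif kind == "filter" and not fn(x):
                keep = False   # element dropped; later ops never run on it
                break
        if keep:
            yield x


ops = [("map", lambda x: x * 2), ("filter", lambda x: x > 4)]
print(list(pipelined([1, 2, 3, 4], ops)))   # [6, 8]
```

Everything up to the shuffle runs in this single fused pass, which is why all of these operations live inside one ShuffleMapStage.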

DAGScheduler and Spark stages for a job

b. ResultStage in Spark

A ResultStage is the stage that executes a Spark action in a user program by running a function on an RDD. It is the final stage in a job: it applies the function to one or many partitions of the target RDD and computes the result of the action.
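As a sketch of what a ResultStage does, the action's function runs once per target partition and the per-partition results are then combined. The `run_result_stage` helper below is hypothetical, a toy model rather than Spark's code; here it models a count()-style action.

```python
def run_result_stage(partitions, func, combine):
    """Apply func to each target partition (one task each), then combine."""
    per_partition = [func(p) for p in partitions]
    return combine(per_partition)


partitions = [[1, 2, 3], [4, 5], [6]]
total = run_result_stage(partitions, len, sum)   # count()-style action
print(total)   # 6
```

Each task computes a partial result on its partition, and the combine step produces the value the action returns to the user program.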

Graph of Spark stages with DAGScheduler

4. Getting StageInfo For Most Recent Attempt

There is one more method, latestInfo, which returns the StageInfo for the most recent attempt:

latestInfo: StageInfo

5. Stage Contract

Stage is a private[scheduler] abstract contract:

abstract class Stage {

def findMissingPartitions(): Seq[Int]

}


6. Method To Create New Apache Spark Stage

There is a basic method by which we can create a new stage attempt in Spark, makeNewStageAttempt:


makeNewStageAttempt(

numPartitionsToCompute: Int,

taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit

Basically, it creates a new TaskMetrics and, with the help of the RDD’s SparkContext, registers the internal accumulators. The RDD used is the same one that was given when the stage was created.

In addition, it sets latestInfo to a new StageInfo built from nextAttemptId, numPartitionsToCompute, and taskLocalityPreferences, and increments the nextAttemptId counter.

The very important thing to note is that this method is used only when DAGScheduler submits missing tasks for a Spark stage.

7. Conclusion

In this blog, we have studied the whole concept of Apache Spark stages: what a stage is, the two types of stages (ShuffleMapStage and ResultStage), and how a new stage attempt is created. We hope this document has satisfied your curiosity about stages in Spark.


