JobProgressListener Spark Listener

JobProgressListener is a SparkListener that collects information about job executions for the web UI.

JobProgressListener intercepts the following Spark events.

Table 1. JobProgressListener Events
Handler Purpose

onJobStart

Creates a JobUIData. It updates jobGroupToJobIds, pendingStages, jobIdToData, activeJobs, stageIdToActiveJobIds, stageIdToInfo and stageIdToData.

onJobEnd

Removes an entry in activeJobs. It also removes entries in pendingStages and stageIdToActiveJobIds. It updates completedJobs, numCompletedJobs, failedJobs, numFailedJobs and skippedStages.

onStageCompleted

Updates the StageUIData and JobUIData.

onTaskStart

Updates the task’s StageUIData and JobUIData, and registers a new TaskUIData.

onTaskEnd

Updates the task’s StageUIData (and TaskUIData), ExecutorSummary, and JobUIData.

onExecutorMetricsUpdate

onEnvironmentUpdate

Sets schedulingMode property using the current spark.scheduler.mode (from Spark Properties environment details).

Used in Jobs tab (for the Scheduling Mode), and to display pools in JobsTab and StagesTab.

FIXME: Add the links/screenshots for pools.

onBlockManagerAdded

Records an executor and its block manager in the internal executorIdToBlockManagerId registry.

onBlockManagerRemoved

Removes the executor from the internal executorIdToBlockManagerId registry.

onApplicationStart

Records a Spark application’s start time (in the internal startTime).

Used in Jobs tab (for a total uptime and the event timeline) and Job page (for the event timeline).

onApplicationEnd

Records a Spark application’s end time (in the internal endTime).

Used in Jobs tab (for a total uptime).

onTaskGettingResult

Does nothing.

FIXME: Why is this event intercepted at all?!

updateAggregateMetrics Method

Caution
FIXME

Registries and Counters

JobProgressListener uses registries to collect information about job executions.

Table 2. JobProgressListener Registries and Counters
Name Description

numCompletedStages

numFailedStages

stageIdToData

Holds StageUIData per stage, keyed by the stage and stage attempt ids.

stageIdToInfo

stageIdToActiveJobIds

poolToActiveStages

activeJobs

completedJobs

failedJobs

jobIdToData

jobGroupToJobIds

pendingStages

activeStages

completedStages

skippedStages

failedStages

executorIdToBlockManagerId

The lookup table for BlockManagerId per executor id.

Used to track block managers so the Stage page can display Address in Aggregated Metrics by Executor.

FIXME: How does Executors page collect the very same information?

onJobStart Callback

onJobStart(jobStart: SparkListenerJobStart): Unit

onJobStart reads the optional Spark Job group id as spark.jobGroup.id (from properties in the input jobStart).

onJobStart then creates a JobUIData using the input jobStart with status attribute set to JobExecutionStatus.RUNNING and records it in jobIdToData and activeJobs registries.

onJobStart looks up the job ids for the group id (in the jobGroupToJobIds registry) and adds the new job id.

For every StageInfo in the SparkListenerJobStart.stageInfos collection, the internal pendingStages registry is updated with the StageInfo under its stage id.

onJobStart records the stages of the job in stageIdToActiveJobIds.

onJobStart records StageInfos in stageIdToInfo and stageIdToData.
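The bookkeeping above can be sketched as a simplified, hypothetical model. The registry names mirror the listener's, but the value types are reduced to plain strings and ids for illustration:

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the registries onJobStart updates.
object OnJobStartSketch {
  val jobIdToData = mutable.HashMap.empty[Int, String]             // jobId -> status
  val activeJobs = mutable.HashMap.empty[Int, String]
  val jobGroupToJobIds = mutable.HashMap.empty[String, mutable.Set[Int]]
  val pendingStages = mutable.HashMap.empty[Int, String]           // stageId -> stage name
  val stageIdToActiveJobIds = mutable.HashMap.empty[Int, mutable.Set[Int]]

  def onJobStart(jobId: Int, groupId: Option[String], stages: Seq[(Int, String)]): Unit = {
    // Create the job's UI data with a RUNNING status and register it twice.
    val jobData = "RUNNING"
    jobIdToData(jobId) = jobData
    activeJobs(jobId) = jobData
    // Record the job under its (optional) job group.
    groupId.foreach { g =>
      jobGroupToJobIds.getOrElseUpdate(g, mutable.Set.empty) += jobId
    }
    // Every stage of the job is pending until it is submitted.
    stages.foreach { case (stageId, name) =>
      pendingStages(stageId) = name
      stageIdToActiveJobIds.getOrElseUpdate(stageId, mutable.Set.empty) += jobId
    }
  }
}
```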

onJobEnd Method

onJobEnd(jobEnd: SparkListenerJobEnd): Unit

onJobEnd removes the job from the activeJobs registry and its stages from the pendingStages registry. It also updates stageIdToActiveJobIds, completedJobs, numCompletedJobs, failedJobs, numFailedJobs and skippedStages.

When completed successfully, the job is added to completedJobs registry with status attribute set to JobExecutionStatus.SUCCEEDED. numCompletedJobs gets incremented.

When failed, the job is added to failedJobs registry with status attribute set to JobExecutionStatus.FAILED. numFailedJobs gets incremented.

For every stage of the job, the job id is removed from the stage's set of active jobs (in stageIdToActiveJobIds); the entire entry is removed when no active jobs remain.

Every pending stage in stageIdToInfo gets added to skippedStages.
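A minimal sketch of this bookkeeping, with the same registry names but simplified value types (the success flag stands in for SparkListenerJobEnd.jobResult):

```scala
import scala.collection.mutable

// Hypothetical sketch of the onJobEnd bookkeeping.
object OnJobEndSketch {
  val activeJobs = mutable.HashMap.empty[Int, String]
  val completedJobs = mutable.ListBuffer.empty[Int]
  val failedJobs = mutable.ListBuffer.empty[Int]
  var numCompletedJobs = 0
  var numFailedJobs = 0
  val pendingStages = mutable.HashMap.empty[Int, String]
  val skippedStages = mutable.ListBuffer.empty[Int]
  val stageIdToActiveJobIds = mutable.HashMap.empty[Int, mutable.Set[Int]]

  def onJobEnd(jobId: Int, succeeded: Boolean, stageIds: Seq[Int]): Unit = {
    activeJobs.remove(jobId)
    stageIds.foreach { stageId =>
      // Stages still pending when the job ends never ran: count them as skipped.
      if (pendingStages.remove(stageId).isDefined) skippedStages += stageId
      // Drop the job from the stage's active jobs; remove the entry when empty.
      stageIdToActiveJobIds.get(stageId).foreach { jobs =>
        jobs -= jobId
        if (jobs.isEmpty) stageIdToActiveJobIds.remove(stageId)
      }
    }
    if (succeeded) { completedJobs += jobId; numCompletedJobs += 1 }
    else { failedJobs += jobId; numFailedJobs += 1 }
  }
}
```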

onExecutorMetricsUpdate Method

onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit

onTaskStart Method

onTaskStart(taskStart: SparkListenerTaskStart): Unit

onTaskStart updates StageUIData and JobUIData, and registers a new TaskUIData.

onTaskStart takes TaskInfo from the input taskStart.

onTaskStart looks up the StageUIData for the stage and stage attempt ids (in the stageIdToData registry).

onTaskStart increments numActiveTasks and puts a TaskUIData for the task in stageData.taskData.

Ultimately, onTaskStart looks the stage up in the internal stageIdToActiveJobIds registry and, for each active job, reads its JobUIData (from the jobIdToData registry) and increments its numActiveTasks.
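The counters above can be modelled as follows; TaskUIData is reduced here to a status string, and StageUIData and JobUIData to the single counter this callback touches:

```scala
import scala.collection.mutable

// Simplified model of the counters onTaskStart bumps.
object OnTaskStartSketch {
  final class StageData {
    var numActiveTasks = 0
    val taskData = mutable.HashMap.empty[Long, String]   // taskId -> status
  }
  final class JobData { var numActiveTasks = 0 }

  val stageIdToData = mutable.HashMap.empty[(Int, Int), StageData]  // (stageId, attemptId)
  val stageIdToActiveJobIds = mutable.HashMap.empty[Int, Set[Int]]
  val jobIdToData = mutable.HashMap.empty[Int, JobData]

  def onTaskStart(stageId: Int, attemptId: Int, taskId: Long): Unit = {
    // Look the stage's data up and record the new running task.
    val stageData = stageIdToData.getOrElseUpdate((stageId, attemptId), new StageData)
    stageData.numActiveTasks += 1
    stageData.taskData(taskId) = "RUNNING"
    // Propagate the active-task count to every job the stage belongs to.
    for {
      jobIds  <- stageIdToActiveJobIds.get(stageId)
      jobId   <- jobIds
      jobData <- jobIdToData.get(jobId)
    } jobData.numActiveTasks += 1
  }
}
```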

onTaskEnd Method

onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit

onTaskEnd updates the StageUIData (and TaskUIData), ExecutorSummary, and JobUIData.

onTaskEnd takes TaskInfo from the input taskEnd.

Note
onTaskEnd does its processing when the TaskInfo is available and stageAttemptId is not -1.

onTaskEnd looks up the StageUIData for the stage and stage attempt ids (in the stageIdToData registry).

onTaskEnd saves accumulables in the StageUIData.

onTaskEnd reads the ExecutorSummary for the executor (that the task finished on).

Depending on the task end's reason, onTaskEnd increments the succeededTasks, killedTasks or failedTasks counter.

onTaskEnd adds the task’s duration to taskTime.

onTaskEnd decrements the number of active tasks (in the StageUIData).

Again depending on the task end's reason, onTaskEnd computes errorMessage and updates StageUIData.

Caution
FIXME Why is the same information in two different registries — stageData and execSummary?!

If taskMetrics is available, updateAggregateMetrics is executed.

The task’s TaskUIData is looked up in stageData.taskData and updateTaskInfo and updateTaskMetrics are executed. errorMessage is updated.

onTaskEnd makes sure that the number of tasks in StageUIData (stageData.taskData) is not above spark.ui.retainedTasks and drops the excess.

Ultimately, onTaskEnd looks the stage up in the internal stageIdToActiveJobIds registry and, for each active job, reads its JobUIData (from the jobIdToData registry). It then decrements numActiveTasks and increments numCompletedTasks, numKilledTasks or numFailedTasks depending on the task's end reason.
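The reason-dependent counter updates can be sketched like this; the EndReason hierarchy below is a stand-in for Spark's actual task end reasons (Success, TaskKilled, the failure reasons), and ExecutorSummary is reduced to the fields this callback touches:

```scala
// Hypothetical dispatch on the task end reason.
object OnTaskEndSketch {
  sealed trait EndReason
  case object Success extends EndReason
  case object Killed extends EndReason
  final case class Failed(error: String) extends EndReason

  final class ExecSummary {
    var succeededTasks = 0
    var killedTasks = 0
    var failedTasks = 0
    var taskTime = 0L
  }

  // Returns the error message (if any) that ends up as errorMessage.
  def record(summary: ExecSummary, reason: EndReason, durationMs: Long): Option[String] = {
    summary.taskTime += durationMs
    reason match {
      case Success       => summary.succeededTasks += 1; None
      case Killed        => summary.killedTasks += 1; None
      case Failed(error) => summary.failedTasks += 1; Some(error)
    }
  }
}
```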

onStageSubmitted Method

onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit

onStageCompleted Method

onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit

onStageCompleted updates the StageUIData and JobUIData.

onStageCompleted reads stageInfo from the input stageCompleted and records it in stageIdToInfo registry.

onStageCompleted looks up the StageUIData for the stage and stage attempt ids (in the stageIdToData registry).

onStageCompleted records accumulables in StageUIData.

onStageCompleted removes the stage from poolToActiveStages and activeStages registries.

If the stage completed successfully (i.e. has no failureReason), onStageCompleted adds the stage to completedStages registry and increments numCompletedStages counter. It trims completedStages.

Otherwise, when the stage failed, onStageCompleted adds the stage to failedStages registry and increments numFailedStages counter. It trims failedStages.

Ultimately, onStageCompleted looks the stage up in the internal stageIdToActiveJobIds registry and, for each active job, reads its JobUIData (from the jobIdToData registry). It then decrements numActiveStages. When completed successfully, it adds the stage to completedStageIndices. With failure, numFailedStages gets incremented.
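The completed/failed branch, including the trimming against the retained-stages cap, can be sketched as follows (the trim here is a simplification that drops only the excess; the actual listener removes entries in larger batches to avoid trimming on every event):

```scala
import scala.collection.mutable

// Hypothetical sketch of onStageCompleted's completed/failed bookkeeping.
object OnStageCompletedSketch {
  val retainedStages = 1000   // spark.ui.retainedStages
  val completedStages = mutable.ListBuffer.empty[Int]
  val failedStages = mutable.ListBuffer.empty[Int]
  var numCompletedStages = 0
  var numFailedStages = 0

  def onStageCompleted(stageId: Int, failureReason: Option[String]): Unit = {
    if (failureReason.isEmpty) {
      // No failureReason means the stage completed successfully.
      completedStages += stageId
      numCompletedStages += 1
      trim(completedStages)
    } else {
      failedStages += stageId
      numFailedStages += 1
      trim(failedStages)
    }
  }

  // Drop the oldest entries once the cap is exceeded.
  private def trim(stages: mutable.ListBuffer[Int]): Unit =
    if (stages.size > retainedStages) stages.remove(0, stages.size - retainedStages)
}
```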

JobUIData

Caution
FIXME

blockManagerIds method

blockManagerIds: Seq[BlockManagerId]
Caution
FIXME

StageUIData

Caution
FIXME

Settings

Table 3. Spark Properties
Setting Default Value Description

spark.ui.retainedJobs

1000

The number of jobs to hold information about

spark.ui.retainedStages

1000

The number of stages to hold information about

spark.ui.retainedTasks

100000

The number of tasks to hold information about
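These limits can be set on a SparkConf before the SparkContext (and hence the listener) is created; the values below are illustrative, not recommendations:

```scala
import org.apache.spark.SparkConf

// Example of tuning the web UI retention limits.
val conf = new SparkConf()
  .set("spark.ui.retainedJobs", "500")
  .set("spark.ui.retainedStages", "500")
  .set("spark.ui.retainedTasks", "50000")
```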
