HeartbeatReceiver RPC Endpoint

HeartbeatReceiver is a ThreadSafeRpcEndpoint and a SparkListener.

HeartbeatReceiver is registered as the HeartbeatReceiver RPC endpoint when SparkContext is created.

HeartbeatReceiver tracks executors and informs TaskScheduler and SparkContext about lost executors.

Figure 1. HeartbeatReceiver RPC Endpoint and Heartbeats from Executors
Table 1. HeartbeatReceiver RPC Endpoint’s Messages (in alphabetical order)

ExecutorRegistered
  Posted when HeartbeatReceiver is notified that a new executor has been registered (with a Spark application).

ExecutorRemoved
  FIXME

ExpireDeadHosts
  FIXME

Heartbeat
  Posted when an executor informs that it is alive and reports task metrics.

TaskSchedulerIsSet
  Posted when SparkContext informs that TaskScheduler is available.

Table 2. HeartbeatReceiver’s Internal Registries and Counters

executorLastSeen
  Executor ids and the timestamps of when the last heartbeat was received.

scheduler
  The TaskScheduler (set when the TaskSchedulerIsSet message arrives).

Tip

Enable DEBUG or TRACE logging levels for org.apache.spark.HeartbeatReceiver to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.HeartbeatReceiver=TRACE

Refer to Logging.

Creating HeartbeatReceiver Instance

HeartbeatReceiver takes the following when created:

* SparkContext
* Clock

HeartbeatReceiver initializes the internal registries and counters.

Starting HeartbeatReceiver RPC Endpoint — onStart Method

When called, HeartbeatReceiver schedules the checking task on eventLoopThread that sends a blocking ExpireDeadHosts message to itself every spark.network.timeoutInterval.

Note
onStart is part of the RpcEndpoint contract.

Stopping HeartbeatReceiver RPC Endpoint — onStop Method

Note
onStop is part of the RpcEndpoint Contract

When called, HeartbeatReceiver cancels the checking task (the one scheduled in onStart that sends a blocking ExpireDeadHosts message every spark.network.timeoutInterval on eventLoopThread, the Heartbeat Receiver Event Loop Thread) and shuts down the eventLoopThread and killExecutorThread thread pools.
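The shutdown sequence above can be sketched with plain java.util.concurrent primitives. This is a simplified sketch, not the actual Spark code; the names expireDeadHostsTask, eventLoopThread and killExecutorThread follow the description in the text.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Stand-ins for the two thread pools described below
val eventLoopThread = Executors.newSingleThreadScheduledExecutor()
val killExecutorThread = Executors.newSingleThreadExecutor()

// The periodic checking task scheduled in onStart (body elided here)
val expireDeadHostsTask: ScheduledFuture[_] =
  eventLoopThread.scheduleAtFixedRate(() => (), 0L, 60L, TimeUnit.SECONDS)

// onStop-style cleanup: cancel the periodic check, then shut both pools down
expireDeadHostsTask.cancel(true)
eventLoopThread.shutdownNow()
killExecutorThread.shutdownNow()
```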

killExecutorThread — Kill Executor Thread

killExecutorThread is a daemon ScheduledThreadPoolExecutor with a single thread.

The name of the thread pool is kill-executor-thread.

Note
It is used to request SparkContext to kill the executor.

eventLoopThread — Heartbeat Receiver Event Loop Thread

eventLoopThread is a daemon ScheduledThreadPoolExecutor with a single thread.

The name of the thread pool is heartbeat-receiver-event-loop-thread.
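Both thread pools can be built the same way. Below is a minimal sketch of a named, daemon, single-thread ScheduledThreadPoolExecutor; the helper name newDaemonSingleThreadScheduledExecutor is modeled on the description above, not copied from the sources.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}

def newDaemonSingleThreadScheduledExecutor(name: String): ScheduledThreadPoolExecutor = {
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true) // daemon, so it never blocks JVM shutdown
      t
    }
  }
  new ScheduledThreadPoolExecutor(1, factory) // single thread
}

val eventLoopThread = newDaemonSingleThreadScheduledExecutor(
  "heartbeat-receiver-event-loop-thread")
```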

ExecutorRegistered

ExecutorRegistered(executorId: String)

When received, HeartbeatReceiver registers the executorId executor and the current time (in executorLastSeen internal registry).

Note
HeartbeatReceiver uses the internal Clock to know the current time.
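The registration step can be sketched with executorLastSeen as a plain mutable map, updated with the clock's current time when ExecutorRegistered arrives (a simplified sketch; currentTimeMillis stands in for the internal Clock).

```scala
import scala.collection.mutable

// executorLastSeen: executor id -> timestamp of the last heartbeat
val executorLastSeen = mutable.Map.empty[String, Long]

def currentTimeMillis(): Long = System.currentTimeMillis() // stand-in for Clock

// Handling ExecutorRegistered(executorId)
def handleExecutorRegistered(executorId: String): Unit =
  executorLastSeen(executorId) = currentTimeMillis()

handleExecutorRegistered("exec-1")
```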

ExecutorRemoved

ExecutorRemoved(executorId: String)

When ExecutorRemoved arrives, the executorId is simply removed from the executorLastSeen internal registry.

Note
HeartbeatReceiver itself sends an ExecutorRemoved message to itself (from the removeExecutor internal method) as a follow-up to SparkListener.onExecutorRemoved when the driver removes an executor.
Note
It is an internal message.

ExpireDeadHosts

ExpireDeadHosts

When ExpireDeadHosts arrives, the following TRACE message is printed out to the logs:

TRACE HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver.

For each executor (in the executorLastSeen registry), HeartbeatReceiver checks whether the time since the last heartbeat exceeds spark.network.timeout.

For any such executor, the following WARN message is printed out to the logs:

WARN HeartbeatReceiver: Removing executor [executorId] with no recent heartbeats: [time] ms exceeds timeout [timeout] ms

TaskScheduler.executorLost is called (with SlaveLost("Executor heartbeat timed out after [timeout] ms")).

SparkContext.killAndReplaceExecutor is asynchronously called for the executor (i.e. on killExecutorThread).

The executor is removed from executorLastSeen.

Note
It is an internal message.
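The expiry pass over executorLastSeen can be sketched as follows. This is a simplified sketch: executorTimeoutMs mirrors spark.network.timeout, and the onLost and killAndReplace callbacks stand in for TaskScheduler.executorLost and SparkContext.killAndReplaceExecutor.

```scala
import scala.collection.mutable

val executorTimeoutMs = 120000L // stand-in for spark.network.timeout
val executorLastSeen = mutable.Map("exec-1" -> 0L, "exec-2" -> 119000L)

def expireDeadHosts(now: Long,
    onLost: String => Unit,          // TaskScheduler.executorLost(SlaveLost(...))
    killAndReplace: String => Unit   // asynchronously, on killExecutorThread
): Unit = {
  // iterate over a snapshot so removal during the loop is safe
  for ((executorId, lastSeen) <- executorLastSeen.toSeq
       if now - lastSeen > executorTimeoutMs) {
    onLost(executorId)
    killAndReplace(executorId)
    executorLastSeen -= executorId
  }
}

// exec-1 has not been seen for 121s > 120s, so it is expired; exec-2 survives
expireDeadHosts(now = 121000L, onLost = _ => (), killAndReplace = _ => ())
```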

Heartbeat

Heartbeat(executorId: String,
  accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
  blockManagerId: BlockManagerId)

When received, HeartbeatReceiver finds the executorId executor (in executorLastSeen registry).

When the executor is found, HeartbeatReceiver updates the time the heartbeat was received (in executorLastSeen).

Note
HeartbeatReceiver uses the internal Clock to know the current time.

HeartbeatReceiver then submits an asynchronous task to notify TaskScheduler that the heartbeat was received from the executor. HeartbeatReceiver posts a HeartbeatResponse back to the executor with TaskScheduler’s answer on whether the executor is still registered, so the executor may learn that it needs to re-register.

If, however, the executor was not found in the executorLastSeen registry (i.e. the executor was not registered before), you should see the following DEBUG message in the logs and the response notifies the executor to re-register.

DEBUG Received heartbeat from unknown executor [executorId]

In a very rare case, when TaskScheduler is not yet assigned to HeartbeatReceiver, you should see the following WARN message in the logs and the response is to notify the executor to re-register.

WARN Dropping [heartbeat] because TaskScheduler is not ready yet
Note
TaskScheduler can be unassigned when TaskSchedulerIsSet has not been received yet.
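The decision logic for the response can be sketched as below. This is a simplified sketch: the HeartbeatResponse case class here carries only the re-register flag, and the asynchronous TaskScheduler notification is reduced to a comment.

```scala
import scala.collection.mutable

// Simplified response: should the executor re-register?
case class HeartbeatResponse(reregisterBlockManager: Boolean)

val executorLastSeen = mutable.Map("exec-1" -> 0L)

def handleHeartbeat(executorId: String, now: Long): HeartbeatResponse =
  executorLastSeen.get(executorId) match {
    case Some(_) =>
      executorLastSeen(executorId) = now // refresh the last-seen timestamp
      // an asynchronous task would notify TaskScheduler here
      HeartbeatResponse(reregisterBlockManager = false)
    case None =>
      // "Received heartbeat from unknown executor" -- ask it to re-register
      HeartbeatResponse(reregisterBlockManager = true)
  }
```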

TaskSchedulerIsSet

TaskSchedulerIsSet

When received, HeartbeatReceiver sets the internal reference to TaskScheduler.

Note
HeartbeatReceiver uses the SparkContext (given when HeartbeatReceiver is created) to access the TaskScheduler.

onExecutorAdded Method

onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit

onExecutorAdded simply sends an ExecutorRegistered message to itself (that in turn registers an executor).

Note
onExecutorAdded is part of the SparkListener contract to announce that a new executor was registered with a Spark application.

Sending ExecutorRegistered Message to Itself — addExecutor Internal Method

addExecutor(executorId: String): Option[Future[Boolean]]

addExecutor sends an ExecutorRegistered message to itself (to register the executorId executor).

Note
addExecutor is used when HeartbeatReceiver is notified that a new executor was added.

Settings

Table 3. Spark Properties

spark.storage.blockManagerTimeoutIntervalMs
  Default: 60s

spark.storage.blockManagerSlaveTimeoutMs
  Default: 120s

spark.network.timeout
  Default: spark.storage.blockManagerSlaveTimeoutMs
  See spark.network.timeout in RPC Environment (RpcEnv).

spark.network.timeoutInterval
  Default: spark.storage.blockManagerTimeoutIntervalMs
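The fallback chain between these properties can be sketched with a plain Map standing in for SparkConf (a simplified sketch; the helper name networkTimeout is an assumption): spark.network.timeout falls back to spark.storage.blockManagerSlaveTimeoutMs when unset.

```scala
// Resolve spark.network.timeout with its documented fallback chain
def networkTimeout(conf: Map[String, String]): String =
  conf.getOrElse("spark.network.timeout",
    conf.getOrElse("spark.storage.blockManagerSlaveTimeoutMs", "120s"))

networkTimeout(Map.empty)                              // "120s"
networkTimeout(Map("spark.network.timeout" -> "300s")) // "300s"
```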
