LocalEndpoint — RPC Endpoint for LocalSchedulerBackend

LocalEndpoint is the ThreadSafeRpcEndpoint for LocalSchedulerBackend and is registered under the LocalSchedulerBackendEndpoint name.

LocalEndpoint is created exclusively when LocalSchedulerBackend is requested to start.

Put simply, LocalEndpoint is the communication channel between TaskSchedulerImpl and LocalSchedulerBackend. It is a thread-safe RpcEndpoint that hosts the single executor of Spark local mode (with the driver executor ID and the localhost hostname).

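Table 1. LocalEndpoint’s RPC Messages

Message        Description
KillTask       Requests the executor to kill a given task
ReviveOffers   Calls reviveOffers
StatusUpdate   Requests TaskSchedulerImpl to handle a task status update (and, for a finished task, frees its CPU cores and calls reviveOffers)
StopExecutor   Requests the executor to stop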

When LocalEndpoint is created (as part of the initialization of Spark local mode), its Executor prints out the following INFO messages to the logs:

INFO Executor: Starting executor ID driver on host localhost
INFO Executor: Using REPL class URI: http://192.168.1.4:56131

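LocalEndpoint creates a single Executor with the following properties: driver as the executor ID, localhost as the hostname, the current SparkEnv, the user-defined class path, and the isLocal flag enabled.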

The executor is then used when LocalEndpoint is requested to handle the KillTask and StopExecutor RPC messages, and to reviveOffers.

Table 2. LocalEndpoint’s Internal Properties (e.g. Registries, Counters and Flags)

Name        Description
freeCores   The number of CPU cores that are free to use (to schedule tasks)
            Default: Initial number of CPU cores (aka totalCores)
            Increments when LocalEndpoint is requested to handle a StatusUpdate RPC message with a finished state
            Decrements when LocalEndpoint is requested to reviveOffers and there were tasks to execute
            Note: A single task to execute costs spark.task.cpus configuration (default: 1).
            Used when LocalEndpoint is requested to reviveOffers
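The freeCores bookkeeping can be simulated in isolation. The following is a minimal Scala sketch, not Spark code: FreeCoresModel, launch and finish are hypothetical names, and the sketch assumes (as a simplification) that a task is only handed out while at least spark.task.cpus cores are free. In this model, with 4 total cores and spark.task.cpus set to 2, at most two tasks run at the same time.

```scala
// Hypothetical model of LocalEndpoint's freeCores counter (not Spark code).
final class FreeCoresModel(totalCores: Int, cpusPerTask: Int) {
  // Default: the initial number of CPU cores (totalCores)
  var freeCores: Int = totalCores

  // Stands in for reviveOffers: launch pending tasks while enough cores are free
  // (assumption: a task is only handed out if cpusPerTask cores are available).
  def launch(pendingTasks: Int): Int = {
    var launched = 0
    while (launched < pendingTasks && freeCores >= cpusPerTask) {
      freeCores -= cpusPerTask // decrements by spark.task.cpus per task
      launched += 1
    }
    launched
  }

  // Stands in for a StatusUpdate with a finished state: the task's cores come back
  def finish(): Unit = { freeCores += cpusPerTask }
}

val model = new FreeCoresModel(totalCores = 4, cpusPerTask = 2)
val first = model.launch(pendingTasks = 3) // only two tasks fit into 4 cores
model.finish()                             // one of them finishes
val second = model.launch(pendingTasks = 1)
println(s"first=$first second=$second freeCores=${model.freeCores}")
// first=2 second=1 freeCores=0
```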

Tip

Enable INFO logging level for org.apache.spark.scheduler.local.LocalEndpoint logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.local.LocalEndpoint=INFO

Refer to Logging.

Creating LocalEndpoint Instance

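LocalEndpoint takes the following to be created: RpcEnv, the user-defined class path (userClassPath), TaskSchedulerImpl, LocalSchedulerBackend, and the total number of CPU cores (totalCores).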

LocalEndpoint initializes the internal registries and counters.

Processing Receive-Only RPC Messages — receive Method

receive: PartialFunction[Any, Unit]
Note
receive is part of the RpcEndpoint Contract to process receive-only RPC messages.

receive handles (processes) ReviveOffers, StatusUpdate, and KillTask RPC messages.
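A minimal sketch of the dispatch shape of receive, assuming hypothetical stand-in message classes rather than Spark's own (the task state is simplified to a String, and each handler only records what it would do). Because receive is a PartialFunction[Any, Unit], it is simply not defined for StopExecutor, which is left to receiveAndReply.

```scala
// Hypothetical stand-ins for LocalEndpoint's RPC messages (not Spark's classes).
sealed trait Message
case object ReviveOffers extends Message
final case class StatusUpdate(taskId: Long, state: String, serializedData: java.nio.ByteBuffer) extends Message
final case class KillTask(taskId: Long, interruptThread: Boolean, reason: String) extends Message
case object StopExecutor extends Message

// A log of what the sketch "did" in response to each message
val handled = scala.collection.mutable.ListBuffer.empty[String]

// Same shape as receive: matches only the receive-only messages
val receive: PartialFunction[Any, Unit] = {
  case ReviveOffers => handled += "reviveOffers()"
  case StatusUpdate(taskId, state, _) => handled += s"statusUpdate($taskId, $state)"
  case KillTask(taskId, interruptThread, reason) => handled += s"killTask($taskId, $interruptThread, $reason)"
}

receive(ReviveOffers)
receive(StatusUpdate(3L, "FINISHED", java.nio.ByteBuffer.allocate(0)))
receive(KillTask(7L, interruptThread = true, "stage cancelled"))
println(handled.mkString(", "))
println(receive.isDefinedAt(StopExecutor)) // false
```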

ReviveOffers RPC Message

ReviveOffers()

When received, LocalEndpoint calls reviveOffers.

Note
ReviveOffers RPC message is sent out exclusively when LocalSchedulerBackend is requested to reviveOffers.

StatusUpdate RPC Message

StatusUpdate(
  taskId: Long,
  state: TaskState,
  serializedData: ByteBuffer)

When received, LocalEndpoint requests the TaskSchedulerImpl to handle a task status update (given the taskId, the task state and the data).

If the given TaskState is a finished state (one of FINISHED, FAILED, KILLED or LOST), LocalEndpoint adds the value of the spark.task.cpus configuration property (default: 1) to the freeCores counter and then calls reviveOffers.

Note
StatusUpdate RPC message is sent out exclusively when LocalSchedulerBackend is requested to statusUpdate.
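The finished-state check and the freeCores update can be sketched as follows. TaskState here is a hypothetical mirror of a few task states: FINISHED, FAILED, KILLED and LOST come from the text above, and RUNNING is included only as a non-finished example. StatusUpdateModel is a hypothetical name, and the call to TaskSchedulerImpl is reduced to recording the update.

```scala
// Hypothetical mirror of a few task states (not Spark's TaskState enumeration).
sealed trait TaskState
case object RUNNING extends TaskState
case object FINISHED extends TaskState
case object FAILED extends TaskState
case object KILLED extends TaskState
case object LOST extends TaskState

// The finished states named in the text: FINISHED, FAILED, KILLED, LOST
val finishedStates: Set[TaskState] = Set(FINISHED, FAILED, KILLED, LOST)
def isFinished(state: TaskState): Boolean = finishedStates.contains(state)

// Hypothetical endpoint model: only freeCores and reviveOffers calls are tracked.
final class StatusUpdateModel(initialFreeCores: Int, cpusPerTask: Int) {
  var freeCores: Int = initialFreeCores
  var reviveCalls: Int = 0
  val updates = scala.collection.mutable.ListBuffer.empty[(Long, TaskState)]

  def onStatusUpdate(taskId: Long, state: TaskState): Unit = {
    updates += ((taskId, state)) // stands in for TaskSchedulerImpl handling the update
    if (isFinished(state)) {
      freeCores += cpusPerTask   // return the task's cores (spark.task.cpus)
      reviveCalls += 1           // followed by reviveOffers
    }
  }
}

val ep = new StatusUpdateModel(initialFreeCores = 0, cpusPerTask = 1)
ep.onStatusUpdate(1L, RUNNING)  // not finished: nothing freed
ep.onStatusUpdate(1L, FINISHED) // finished: one core back, reviveOffers
ep.onStatusUpdate(2L, KILLED)   // KILLED is a finished state, too
println(s"freeCores=${ep.freeCores} reviveCalls=${ep.reviveCalls}")
// freeCores=2 reviveCalls=2
```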

KillTask RPC Message

KillTask(
  taskId: Long,
  interruptThread: Boolean,
  reason: String)

When received, LocalEndpoint requests the single Executor to kill a task (given the taskId, the interruptThread flag and the reason).

Note
KillTask RPC message is sent out exclusively when LocalSchedulerBackend is requested to kill a task.
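Handling KillTask is plain delegation. In the hypothetical sketch below (RecordingExecutor and KillTaskHandler are illustrative names, not Spark classes), the endpoint forwards the three fields of the message to the executor unchanged. freeCores is not touched here: per the StatusUpdate handling, the task's cores come back only once the killed task reports the KILLED finished state.

```scala
// Hypothetical stand-in for the single local Executor (not Spark's Executor).
final class RecordingExecutor {
  val killRequests = scala.collection.mutable.ListBuffer.empty[(Long, Boolean, String)]
  def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit =
    killRequests += ((taskId, interruptThread, reason))
}

final case class KillTask(taskId: Long, interruptThread: Boolean, reason: String)

final class KillTaskHandler(executor: RecordingExecutor) {
  // Forward taskId, interruptThread and reason to the executor as-is
  val receive: PartialFunction[Any, Unit] = {
    case KillTask(taskId, interruptThread, reason) =>
      executor.killTask(taskId, interruptThread, reason)
  }
}

val executor = new RecordingExecutor
new KillTaskHandler(executor).receive(KillTask(42L, interruptThread = false, "speculative copy finished"))
println(executor.killRequests.toList)
// List((42,false,speculative copy finished))
```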

Reviving Offers — reviveOffers Method

reviveOffers(): Unit

reviveOffers…FIXME

Note
reviveOffers is used when LocalEndpoint is requested to handle RPC messages (namely ReviveOffers and StatusUpdate).
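Since the description of reviveOffers is still a FIXME, the following is only a sketch inferred from the freeCores entry in Table 2: offer the free cores of the single executor (driver on localhost) once, and for every task handed back, subtract spark.task.cpus and launch the task. Offer, TaskToRun, ToyScheduler and ReviveSketch are hypothetical, and ToyScheduler's policy of filling the offer greedily is an assumption, not TaskSchedulerImpl's actual algorithm.

```scala
// Hypothetical stand-ins; none of these are Spark's classes.
final case class Offer(executorId: String, host: String, cores: Int)
final case class TaskToRun(taskId: Long)

// Assumed scheduler behaviour: hand out as many pending tasks as the offered cores allow
final class ToyScheduler(var pending: List[TaskToRun], val cpusPerTask: Int) {
  def offer(o: Offer): Seq[TaskToRun] = {
    val fit = math.min(pending.size, o.cores / cpusPerTask)
    val (chosen, rest) = pending.splitAt(fit)
    pending = rest
    chosen
  }
}

final class ReviveSketch(scheduler: ToyScheduler, totalCores: Int) {
  var freeCores: Int = totalCores
  val launched = scala.collection.mutable.ListBuffer.empty[Long]

  def reviveOffers(): Unit = {
    // A single offer: the one executor with ID "driver" on "localhost"
    val tasks = scheduler.offer(Offer("driver", "localhost", freeCores))
    for (task <- tasks) {
      freeCores -= scheduler.cpusPerTask // each task costs spark.task.cpus
      launched += task.taskId            // stands in for launching on the executor
    }
  }
}

val sched = new ToyScheduler(List(TaskToRun(0L), TaskToRun(1L), TaskToRun(2L)), cpusPerTask = 1)
val endpoint = new ReviveSketch(sched, totalCores = 2)
endpoint.reviveOffers()
println(s"launched=${endpoint.launched.toList} freeCores=${endpoint.freeCores}")
// launched=List(0, 1) freeCores=0
```

The third task stays pending until a StatusUpdate with a finished state frees a core and triggers another round of reviveOffers.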

Processing Receive-Reply RPC Messages — receiveAndReply Method

receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]
Note
receiveAndReply is part of the RpcEndpoint Contract to process receive-reply RPC messages.

receiveAndReply handles (processes) the StopExecutor RPC message only.

StopExecutor RPC Message

StopExecutor()

When received, LocalEndpoint requests the single Executor to stop and requests the given RpcCallContext to reply with true (as the response).

Note
StopExecutor RPC message is sent out exclusively when LocalSchedulerBackend is requested to stop.
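A minimal sketch of the receive-reply flow for StopExecutor, assuming hypothetical stand-ins (ReplyContext for RpcCallContext, StoppableExecutor for Executor). The partial function closes over the call context passed to receiveAndReply, which is how the handler can answer the caller.

```scala
// Hypothetical stand-ins (not Spark's RpcCallContext or Executor).
trait ReplyContext { def reply(response: Any): Unit }
final class StoppableExecutor {
  var stopped = false
  def stop(): Unit = { stopped = true }
}
case object StopExecutor

final class StopSketch(executor: StoppableExecutor) {
  // Same shape as receiveAndReply: the call context is captured by the partial function
  def receiveAndReply(context: ReplyContext): PartialFunction[Any, Unit] = {
    case StopExecutor =>
      executor.stop()     // stop the single executor first
      context.reply(true) // then reply with true
  }
}

var answer: Option[Any] = None
val ctx = new ReplyContext {
  def reply(response: Any): Unit = { answer = Some(response) }
}
val exec = new StoppableExecutor
new StopSketch(exec).receiveAndReply(ctx)(StopExecutor)
println(s"stopped=${exec.stopped} reply=$answer")
// stopped=true reply=Some(true)
```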
