CoarseGrainedSchedulerBackend

CoarseGrainedSchedulerBackend is a SchedulerBackend.

CoarseGrainedSchedulerBackend is an ExecutorAllocationClient.

CoarseGrainedSchedulerBackend is responsible for requesting resources from a cluster manager for executors that it in turn uses to launch tasks (on coarse-grained executors).

CoarseGrainedSchedulerBackend holds executors for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task.

Caution
FIXME Picture with dependencies

CoarseGrainedSchedulerBackend registers the CoarseGrainedScheduler RPC endpoint that executors use for RPC communication.

Note
Active executors are executors that are not pending to be removed or lost.
Table 1. Built-In CoarseGrainedSchedulerBackends per Cluster Environment
Cluster Environment | CoarseGrainedSchedulerBackend
Spark Standalone | StandaloneSchedulerBackend
Spark on YARN | YarnSchedulerBackend
Spark on Mesos | MesosCoarseGrainedSchedulerBackend

Note
CoarseGrainedSchedulerBackend is only created indirectly through built-in implementations per cluster environment.
Table 2. CoarseGrainedSchedulerBackend’s Internal Properties
Name | Initial Value | Description
currentExecutorIdCounter | | The last (highest) identifier of all allocated executors. Used exclusively in YarnSchedulerEndpoint to respond to RetrieveLastAllocatedExecutorId message.
createTime | Current time | The time CoarseGrainedSchedulerBackend was created.
defaultAskTimeout | spark.rpc.askTimeout or spark.network.timeout or 120s | Default timeout for blocking RPC messages (aka ask messages).
driverEndpoint | (uninitialized) | RPC endpoint reference to CoarseGrainedScheduler RPC endpoint (with DriverEndpoint as the message handler). Initialized when CoarseGrainedSchedulerBackend starts. Used when CoarseGrainedSchedulerBackend executes the following (asynchronously, i.e. on a separate thread):
executorDataMap | empty | Registry of ExecutorData by executor id. NOTE: ExecutorData holds an executor’s endpoint reference, address, host, the number of free and total CPU cores, and the URL of execution logs. An element is added when DriverEndpoint receives RegisterExecutor message and removed when DriverEndpoint receives RemoveExecutor message or a remote host (with one or many executors) disconnects.
executorsPendingToRemove | empty | Executors marked as removed but the confirmation from a cluster manager has not arrived yet.
hostToLocalTaskCount | empty | Registry of hostnames and the possible number of tasks running on them.
localityAwareTasks | 0 | Number of pending tasks…FIXME
maxRegisteredWaitingTimeMs | spark.scheduler.maxRegisteredResourcesWaitingTime |
maxRpcMessageSize | spark.rpc.message.maxSize but not greater than 2047 | Maximum RPC message size in MB. When above 2047 MB you should see the following IllegalArgumentException: spark.rpc.message.maxSize should not be greater than 2047 MB
_minRegisteredRatio | spark.scheduler.minRegisteredResourcesRatio |
numPendingExecutors | 0 |
totalCoreCount | 0 | Total number of CPU cores, i.e. the sum of all the cores on all executors.
totalRegisteredExecutors | 0 | Total number of registered executors
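
As a rough illustration of the maxRpcMessageSize entry, the following sketch mimics the upper-bound check with a plain Map standing in for SparkConf. The names RpcMessageSizeSketch, MaxMessageSizeMb and maxMessageSizeMb are hypothetical and are not Spark's API; the property name, the 128 default and the error message are the ones quoted on this page.

```scala
// Hypothetical stand-in for the spark.rpc.message.maxSize check (not Spark's own code).
object RpcMessageSizeSketch {
  // Upper bound quoted on this page
  val MaxMessageSizeMb = 2047

  // conf is a plain Map used in place of SparkConf (assumption for illustration)
  def maxMessageSizeMb(conf: Map[String, String]): Int = {
    val sizeMb = conf.getOrElse("spark.rpc.message.maxSize", "128").toInt
    if (sizeMb > MaxMessageSizeMb) {
      throw new IllegalArgumentException(
        s"spark.rpc.message.maxSize should not be greater than $MaxMessageSizeMb MB")
    }
    sizeMb
  }
}
```

Calling maxMessageSizeMb with an empty Map falls back to the default, while a value above 2047 fails fast with the exception.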

Tip

Enable INFO or DEBUG logging level for org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend=DEBUG

Refer to Logging.

Killing All Executors on Node — killExecutorsOnHost Method

Caution
FIXME

Making Fake Resource Offers on Executors — makeOffers Internal Methods

makeOffers(): Unit
makeOffers(executorId: String): Unit

makeOffers takes the active executors (out of the executorDataMap internal registry) and creates WorkerOffer resource offers for each (one per executor with the executor’s id, host and free cores).

Caution
Only free cores are considered in making offers. Memory is not! Why?!

It then requests TaskSchedulerImpl to process the resource offers to create a collection of TaskDescription collections that it in turn uses to launch tasks.
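
The offer-building step can be sketched roughly as follows. OfferExecutorData and OfferSketch are hypothetical, simplified stand-ins for Spark's ExecutorData and WorkerOffer, and "active" is approximated here as "not pending removal" only.

```scala
// Hypothetical, simplified stand-ins for Spark's ExecutorData and WorkerOffer.
final case class OfferExecutorData(host: String, freeCores: Int)
final case class OfferSketch(executorId: String, host: String, cores: Int)

object MakeOffersSketch {
  // Builds one offer per active executor, carrying its id, host and free cores.
  // Only free cores are offered; memory is not part of the offer.
  def makeOffers(
      executorDataMap: Map[String, OfferExecutorData],
      executorsPendingToRemove: Set[String]): Seq[OfferSketch] =
    executorDataMap.toSeq
      .filterNot { case (id, _) => executorsPendingToRemove.contains(id) }
      .map { case (id, data) => OfferSketch(id, data.host, data.freeCores) }
}
```

In the real backend the resulting offers are handed to TaskSchedulerImpl; the sketch stops at building them.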

Creating CoarseGrainedSchedulerBackend Instance

CoarseGrainedSchedulerBackend takes the following when created:

CoarseGrainedSchedulerBackend initializes the internal registries and counters.

Getting Executor Ids — getExecutorIds Method

When called, getExecutorIds simply returns executor ids from the internal executorDataMap registry.

Note
It is called when SparkContext calculates executor ids.

CoarseGrainedSchedulerBackend Contract

class CoarseGrainedSchedulerBackend {
  def minRegisteredRatio: Double
  def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint
  def reset(): Unit
  def sufficientResourcesRegistered(): Boolean
  def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]
  def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
}
Note
CoarseGrainedSchedulerBackend is a private[spark] contract.
Table 3. FIXME Contract
Method | Description
minRegisteredRatio | Ratio between 0 and 1 (inclusive). Controlled by spark.scheduler.minRegisteredResourcesRatio.
reset | FIXME
doRequestTotalExecutors | FIXME
doKillExecutors | FIXME
sufficientResourcesRegistered | Always true by default, which means that sufficient resources are available. Used when CoarseGrainedSchedulerBackend checks if sufficient compute resources are available.

numExistingExecutors Method

Caution
FIXME

killExecutors Methods

Caution
FIXME

getDriverLogUrls Method

Caution
FIXME

applicationAttemptId Method

Caution
FIXME

Requesting Additional Executors — requestExecutors Method

requestExecutors(numAdditionalExecutors: Int): Boolean

requestExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).

Note
requestExecutors method is part of ExecutorAllocationClient Contract that SparkContext uses for requesting additional executors (as a part of a developer API for dynamic allocation of executors).

When called, you should see the following INFO message followed by DEBUG message in the logs:

INFO Requesting [numAdditionalExecutors] additional executor(s) from the cluster manager
DEBUG Number of pending executors is now [numPendingExecutors]

numPendingExecutors is increased by the input numAdditionalExecutors.

requestExecutors requests executors from a cluster manager (that reflects the current computation needs). The "new executor total" is a sum of the internal numExistingExecutors and numPendingExecutors decreased by the number of executors pending to be removed.

If numAdditionalExecutors is negative, an IllegalArgumentException is thrown:

Attempted to request a negative number of additional executor(s) [numAdditionalExecutors] from the cluster manager. Please specify a positive number!
Note
It is a final method that no other scheduler backends could customize further.
Note
The method is a synchronized block that makes multiple concurrent requests be handled in a serial fashion, i.e. one by one.
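
The bookkeeping described in this section could look roughly like the sketch below. RequestExecutorsSketch is a hypothetical class, and the function passed as doRequestTotalExecutors stands in for the cluster-specific method (simplified here to return a plain Boolean).

```scala
// Hypothetical, simplified model of requestExecutors (not Spark's class).
class RequestExecutorsSketch(
    numExistingExecutors: Int,
    executorsPendingToRemove: Set[String],
    // Stand-in for the cluster-specific doRequestTotalExecutors
    doRequestTotalExecutors: Int => Boolean) {

  private var numPendingExecutors = 0

  def pendingExecutors: Int = synchronized(numPendingExecutors)

  // synchronized so that concurrent requests are handled one by one
  def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
    if (numAdditionalExecutors < 0) {
      throw new IllegalArgumentException(
        "Attempted to request a negative number of additional executor(s) " +
          s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
    }
    numPendingExecutors += numAdditionalExecutors
    // The "new executor total": existing + pending - pending removal
    val newTotal =
      numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
    doRequestTotalExecutors(newTotal)
  }
}
```

With 3 existing executors, 1 pending removal and a request for 2 more, the sketch asks the cluster manager for a total of 4.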

Requesting Exact Number of Executors — requestTotalExecutors Method

requestTotalExecutors(
  numExecutors: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int]): Boolean

requestTotalExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).

It sets the internal localityAwareTasks and hostToLocalTaskCount registries. It then calculates the number of pending executors: the input numExecutors plus the number of executors pending removal, decreased by the number of already-assigned executors.

If numExecutors is negative, an IllegalArgumentException is thrown:

Attempted to request a negative number of executor(s) [numExecutors] from the cluster manager. Please specify a positive number!
Note
It is a final method that no other scheduler backends could customize further.
Note
The method is a synchronized block that makes multiple concurrent requests be handled in a serial fashion, i.e. one by one.
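
The calculation can be sketched as a pure function. RequestTotalExecutorsSketch and its parameter names are hypothetical, and clamping the result at 0 is an assumption of this sketch rather than something stated on this page.

```scala
// Hypothetical sketch of the executor arithmetic in requestTotalExecutors.
object RequestTotalExecutorsSketch {
  def pendingExecutors(
      numExecutors: Int,
      numExistingExecutors: Int,
      numPendingToRemove: Int): Int = {
    if (numExecutors < 0) {
      throw new IllegalArgumentException(
        "Attempted to request a negative number of executor(s) " +
          s"$numExecutors from the cluster manager. Please specify a positive number!")
    }
    // requested total + executors pending removal - already-assigned executors;
    // never negative (assumption of this sketch)
    math.max(numExecutors + numPendingToRemove - numExistingExecutors, 0)
  }
}
```

For example, requesting 10 executors with 6 already assigned and 1 pending removal leaves 5 executors still to be acquired.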

Finding Default Level of Parallelism — defaultParallelism Method

defaultParallelism(): Int
Note
defaultParallelism is part of the SchedulerBackend Contract.

defaultParallelism is spark.default.parallelism Spark property if set.

Otherwise, defaultParallelism is the maximum of totalCoreCount and 2.
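
A minimal sketch of this rule, with a plain Map standing in for SparkConf (DefaultParallelismSketch is a hypothetical name):

```scala
// Hypothetical sketch of the defaultParallelism rule described above.
object DefaultParallelismSketch {
  def defaultParallelism(conf: Map[String, String], totalCoreCount: Int): Int =
    conf.get("spark.default.parallelism")
      .map(_.toInt)                            // the property wins if set
      .getOrElse(math.max(totalCoreCount, 2))  // otherwise at least 2
}
```

The lower bound of 2 means that even a single-core cluster gets a default parallelism of 2 unless the property says otherwise.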

Killing Task — killTask Method

killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit
Note
killTask is part of the SchedulerBackend contract.

killTask simply sends a KillTask message to driverEndpoint.

Caution
FIXME Image

Stopping All Executors — stopExecutors Method

stopExecutors sends a blocking StopExecutors message to driverEndpoint (if already initialized).

Note
It is called exclusively while CoarseGrainedSchedulerBackend is being stopped.

You should see the following INFO message in the logs:

INFO CoarseGrainedSchedulerBackend: Shutting down all executors

Reset State — reset Method

reset resets the internal state:

  1. Sets numPendingExecutors to 0

  2. Clears executorsPendingToRemove

  3. Sends a blocking RemoveExecutor message to driverEndpoint for every executor (in the internal executorDataMap) to inform it about SlaveLost with the message:

    Stale executor after cluster manager re-registered.

reset is a method that is defined in CoarseGrainedSchedulerBackend, but used and overridden exclusively by YarnSchedulerBackend.
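
The three steps of reset can be sketched as below. ResetSketch is a hypothetical class: the removeExecutor callback stands in for sending a blocking RemoveExecutor message to driverEndpoint, and the loss reason is reduced to its message text.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the state that reset touches (not Spark's class).
class ResetSketch(removeExecutor: (String, String) => Unit) {
  var numPendingExecutors: Int = 0
  val executorsPendingToRemove: mutable.Set[String] = mutable.Set.empty
  // executor id -> host; a stand-in for the ExecutorData registry
  val executorDataMap: mutable.Map[String, String] = mutable.Map.empty

  def reset(): Unit = {
    // Steps 1 and 2: clear the counters and snapshot the known executor ids
    val executorIds = synchronized {
      numPendingExecutors = 0
      executorsPendingToRemove.clear()
      executorDataMap.keys.toList
    }
    // Step 3: report every known executor as lost
    executorIds.foreach { id =>
      removeExecutor(id, "Stale executor after cluster manager re-registered.")
    }
  }
}
```

The sketch does not remove entries from executorDataMap itself, since this page attributes that to DriverEndpoint handling the RemoveExecutor message.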

Remove Executor — removeExecutor Method

removeExecutor(executorId: String, reason: ExecutorLossReason)

removeExecutor sends a blocking RemoveExecutor message to driverEndpoint.

CoarseGrainedScheduler RPC Endpoint — driverEndpoint

When CoarseGrainedSchedulerBackend starts, it registers CoarseGrainedScheduler RPC endpoint to be the driver’s communication endpoint.

driverEndpoint is a DriverEndpoint.

Note
CoarseGrainedSchedulerBackend is created while SparkContext is being created that in turn lives inside a Spark driver. That explains the name driverEndpoint (at least partially).

It is called standalone scheduler’s driver endpoint internally.

It tracks:

It uses driver-revive-thread daemon single-thread thread pool for …​FIXME

Caution
FIXME A potential issue with driverEndpoint.asInstanceOf[NettyRpcEndpointRef].toURI - doubles spark:// prefix.

Starting CoarseGrainedSchedulerBackend (and Registering CoarseGrainedScheduler RPC Endpoint) — start Method

start(): Unit
Note
start is part of the SchedulerBackend contract.

start takes all spark.-prefixed properties and registers the CoarseGrainedScheduler RPC endpoint (backed by DriverEndpoint ThreadSafeRpcEndpoint).

Figure 1. CoarseGrainedScheduler Endpoint
Note
start uses TaskSchedulerImpl to access the current SparkContext and in turn SparkConf.
Note
start uses RpcEnv that was given when CoarseGrainedSchedulerBackend was created.
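
The property selection that start performs can be illustrated with a small sketch. StartPropertiesSketch is a hypothetical name, and a plain sequence of key-value pairs stands in for the entries of SparkConf.

```scala
// Hypothetical sketch: keep only the spark.-prefixed properties, as start is described to do.
object StartPropertiesSketch {
  def sparkProperties(all: Seq[(String, String)]): Seq[(String, String)] =
    all.filter { case (key, _) => key.startsWith("spark.") }
}
```

The filtered properties are what this page describes as being passed on when the CoarseGrainedScheduler RPC endpoint is created.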

Checking If Sufficient Compute Resources Available Or Waiting Time Passed — isReady Method

isReady(): Boolean
Note
isReady is part of the SchedulerBackend contract.

isReady allows delaying task launching until sufficient resources are available or spark.scheduler.maxRegisteredResourcesWaitingTime passes.

Note
sufficientResourcesRegistered by default responds that sufficient resources are available.

If the resources are available, you should see the following INFO message in the logs and isReady returns true.

INFO SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: [minRegisteredRatio]
Note
minRegisteredRatio is in the range 0 to 1 (uses spark.scheduler.minRegisteredResourcesRatio) to denote the minimum ratio of registered resources to total expected resources before submitting tasks.

If sufficient resources are not available yet (the above requirement does not hold), isReady checks whether the time since startup has reached spark.scheduler.maxRegisteredResourcesWaitingTime, which gives a way to launch tasks even when minRegisteredRatio has not been reached yet.

If so, you should see the following INFO message in the logs and isReady returns true.

INFO SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: [maxRegisteredWaitingTimeMs](ms)

Otherwise, when sufficient resources are not available and spark.scheduler.maxRegisteredResourcesWaitingTime has not elapsed, isReady returns false.
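
The decision described in this section can be summarized in a sketch. IsReadySketch is a hypothetical class; the current time is passed in explicitly to keep the example deterministic, and logging is omitted.

```scala
// Hypothetical, simplified model of the isReady decision (not Spark's class).
class IsReadySketch(
    createTime: Long,
    maxRegisteredWaitingTimeMs: Long,
    // Stand-in for the sufficientResourcesRegistered contract method
    sufficientResourcesRegistered: () => Boolean) {

  // Ready when resources are sufficient, or when the waiting time has elapsed
  def isReady(now: Long): Boolean =
    sufficientResourcesRegistered() ||
      (now - createTime) >= maxRegisteredWaitingTimeMs
}
```

With a 30-second waiting time, a backend that never reaches the required ratio becomes ready 30 seconds after it was created.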

Reviving Resource Offers (by Posting ReviveOffers to CoarseGrainedSchedulerBackend RPC Endpoint) — reviveOffers Method

reviveOffers(): Unit
Note
reviveOffers is part of the SchedulerBackend contract.

reviveOffers simply sends a ReviveOffers message to the CoarseGrainedScheduler RPC endpoint (driverEndpoint).

Figure 2. CoarseGrainedExecutorBackend Revives Offers

Stopping CoarseGrainedSchedulerBackend (and Stopping Executors) — stop Method

stop(): Unit
Note
stop is part of the SchedulerBackend contract.

stop stops all executors and CoarseGrainedScheduler RPC endpoint (by sending a blocking StopDriver message).

In case of any Exception, stop reports a SparkException with the message:

Error stopping standalone scheduler's driver endpoint

createDriverEndpointRef Method

createDriverEndpointRef(properties: ArrayBuffer[(String, String)]): RpcEndpointRef

createDriverEndpointRef creates DriverEndpoint and registers it as CoarseGrainedScheduler.

Note
createDriverEndpointRef is used when CoarseGrainedSchedulerBackend starts.

Creating DriverEndpoint — createDriverEndpoint Method

createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint

createDriverEndpoint simply creates a DriverEndpoint.

Note
The purpose of createDriverEndpoint is to allow YARN to use the custom YarnDriverEndpoint.
Note
createDriverEndpoint is used when CoarseGrainedSchedulerBackend executes createDriverEndpointRef.

Settings

Table 4. Spark Properties
Property | Default Value | Description
spark.scheduler.revive.interval | 1s | Time between resource offer revives (in milliseconds when the unit is not given).
spark.rpc.message.maxSize | 128 | Maximum message size to allow in RPC communication. In MB when the unit is not given. Generally only applies to map output size (serialized) information sent between executors and the driver. Increase this if you are running jobs with many thousands of map and reduce tasks and see messages about the RPC message size.
spark.scheduler.minRegisteredResourcesRatio | 0 | Double number between 0 and 1 (inclusive) that controls the minimum ratio of (registered resources / total expected resources) before submitting tasks. See isReady in this document.
spark.scheduler.maxRegisteredResourcesWaitingTime | 30s | Time to wait for sufficient resources to become available. See isReady in this document.
