YarnSchedulerBackend — Foundation for Coarse-Grained Scheduler Backends for YARN

YarnSchedulerBackend is a CoarseGrainedSchedulerBackend that acts as the foundation for the concrete deploy mode-specific Spark scheduler backends for YARN, i.e. YarnClientSchedulerBackend and YarnClusterSchedulerBackend for client deploy mode and cluster deploy mode, respectively.

YarnSchedulerBackend registers itself as the YarnScheduler RPC endpoint in the RPC Environment.

Figure 1. YarnSchedulerBackend in YARN Container

YarnSchedulerBackend is ready to accept task launch requests once sufficient executors are registered (the threshold depends on whether dynamic allocation is enabled).

Note
With no extra configuration, YarnSchedulerBackend is ready for task launch requests when 80% of all the requested executors are available.
Note
YarnSchedulerBackend is a private[spark] abstract class and is never created directly (only indirectly, through the concrete implementations YarnClientSchedulerBackend and YarnClusterSchedulerBackend).
Table 1. YarnSchedulerBackend’s Internal Properties
Name Initial Value Description

minRegisteredRatio

Minimum ratio of registered executors to expected executors at which YarnSchedulerBackend is considered ready for task launch requests.

Minimum expected number of executors that is used to ensure that sufficient resources are available (and start accepting task launch requests).

yarnSchedulerEndpoint

YarnSchedulerEndpoint object

yarnSchedulerEndpointRef

RPC endpoint reference to the YarnScheduler RPC endpoint

Created when YarnSchedulerBackend is created.

totalExpectedExecutors

0

Total expected number of executors that is used to ensure that sufficient resources are available (and start accepting task launch requests).

Updated to the final value when Spark on YARN starts (in client mode or cluster mode).

askTimeout

FIXME

FIXME

appId

FIXME

FIXME

attemptId

(undefined)

YARN’s ApplicationAttemptId of a Spark application.

Only defined in cluster deploy mode.

Set when YarnClusterSchedulerBackend starts (and bindToYarn is called) using YARN’s ApplicationMaster.getAttemptId.

Used for applicationAttemptId which is part of SchedulerBackend Contract.

shouldResetOnAmRegister

Controls whether to reset YarnSchedulerBackend when another RegisterClusterManager RPC message arrives. This allows resetting the internal state after the initial ApplicationMaster failed and a new one was registered (which can only happen in client deploy mode).

Disabled (i.e. false) when YarnSchedulerBackend is created.
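
The shouldResetOnAmRegister flag can be pictured with a small stand-alone model. This is a sketch only: the Backend class, the onRegisterClusterManager method and the resets counter are hypothetical names, and the assumption that the first RegisterClusterManager message merely enables the flag (so that only later messages trigger a reset) is not stated in the table above.

```scala
object ResetOnAmRegisterSketch {
  // Hypothetical model of the flag; not Spark's actual class.
  final class Backend {
    // Disabled when the backend is created.
    private var shouldResetOnAmRegister = false

    // Counts how many times a reset would have been triggered.
    var resets: Int = 0

    // Called for every RegisterClusterManager message (assumed handler name).
    def onRegisterClusterManager(): Unit = {
      if (!shouldResetOnAmRegister) {
        // Assumption: the first registration only arms the flag.
        shouldResetOnAmRegister = true
      } else {
        // A later registration means a new ApplicationMaster took over.
        resets += 1
      }
    }
  }
}
```

Under these assumptions, the first registration leaves the state untouched and each subsequent one resets it once.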

Resetting YarnSchedulerBackend — reset Method

reset resets the parent CoarseGrainedSchedulerBackend and the ExecutorAllocationManager (accessible through SparkContext.executorAllocationManager).

doRequestTotalExecutors Method

def doRequestTotalExecutors(requestedTotal: Int): Boolean
Note
doRequestTotalExecutors is part of the CoarseGrainedSchedulerBackend Contract.
Figure 2. Requesting Total Executors in YarnSchedulerBackend (doRequestTotalExecutors method)

doRequestTotalExecutors simply sends a blocking RequestExecutors message to YarnScheduler RPC Endpoint with the input requestedTotal and the internal localityAwareTasks and hostToLocalTaskCount attributes.
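
The message exchange described above can be sketched with simplified stand-ins. The EndpointRef trait, its askBlocking method and the Backend class are hypothetical and exist only for illustration; the RequestExecutors case class here carries just the three values named in the text and is not Spark's actual message definition.

```scala
object RequestExecutorsSketch {
  // Simplified stand-in for the RequestExecutors message.
  final case class RequestExecutors(
      requestedTotal: Int,
      localityAwareTasks: Int,
      hostToLocalTaskCount: Map[String, Int])

  // Hypothetical RPC endpoint reference that blocks until a reply arrives.
  trait EndpointRef {
    def askBlocking(message: Any): Boolean
  }

  // Hypothetical backend holding the two internal attributes.
  final class Backend(
      endpoint: EndpointRef,
      localityAwareTasks: Int,
      hostToLocalTaskCount: Map[String, Int]) {

    // Packs the input and the internal attributes into one message
    // and waits for the endpoint's Boolean reply.
    def doRequestTotalExecutors(requestedTotal: Int): Boolean =
      endpoint.askBlocking(
        RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
  }
}
```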

Caution
FIXME The internal attributes are already set. When and how?

Starting the Backend — start Method

start creates a SchedulerExtensionServiceBinding object (using SparkContext, appId, and attemptId) and starts it (using SchedulerExtensionServices.start(binding)).

Note
A SchedulerExtensionServices object is created when YarnSchedulerBackend is initialized and available as services.

Finally, start calls the parent’s CoarseGrainedSchedulerBackend.start.

Note

start throws IllegalArgumentException when the internal appId has not been set yet.

java.lang.IllegalArgumentException: requirement failed: application ID unset

Stopping the Backend — stop Method

stop calls the parent’s CoarseGrainedSchedulerBackend.requestTotalExecutors (using (0, 0, Map.empty) parameters).

Caution
FIXME Explain what 0, 0, Map.empty means after the method’s described for the parent.

stop then calls the parent’s CoarseGrainedSchedulerBackend.stop.

Finally, stop stops the internal SchedulerExtensionServices object (using services.stop()).

Caution
FIXME Link the description of services.stop() here.
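
The order of the three steps in stop can be sketched as follows. The Backend class and its helper methods are hypothetical stand-ins that only record which step ran. Wrapping the first two steps in try/finally is an assumption made here so that the services are stopped even if an earlier step fails.

```scala
object StopOrderSketch {
  import scala.collection.mutable.ListBuffer

  final class Backend {
    // Records each step so the order can be inspected; not part of Spark.
    val calls: ListBuffer[String] = ListBuffer.empty[String]

    // Stand-ins for the parent's methods and for services.stop().
    private def parentRequestTotalExecutors(a: Int, b: Int, c: Map[String, Int]): Unit = {
      calls += "requestTotalExecutors"
    }
    private def parentStop(): Unit = { calls += "parent.stop" }
    private def servicesStop(): Unit = { calls += "services.stop" }

    def stop(): Unit = {
      try {
        parentRequestTotalExecutors(0, 0, Map.empty)
        parentStop()
      } finally {
        // Assumption: always runs, even when a step above throws.
        servicesStop()
      }
    }
  }
}
```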

Recording Application and Attempt Ids — bindToYarn Method

def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit

bindToYarn sets the internal appId and attemptId to the value of the input parameters, appId and attemptId, respectively.

Note
start requires appId.
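
The dependency between bindToYarn and start can be illustrated with a stand-alone model. In this sketch a String and an Int stand in for YARN's ApplicationId and ApplicationAttemptId, the started flag is hypothetical, and "application_0001" in the usage note is a made-up placeholder. The check uses Scala's require, which produces the "requirement failed: application ID unset" message shown earlier.

```scala
object BindToYarnSketch {
  // Hypothetical model; not Spark's actual class.
  final class Backend {
    private var appId: Option[String] = None
    private var attemptId: Option[Int] = None

    // Set once start has passed the appId check.
    var started: Boolean = false

    // Records the application id and the (optional) attempt id.
    def bindToYarn(appId: String, attemptId: Option[Int]): Unit = {
      this.appId = Some(appId)
      this.attemptId = attemptId
    }

    def start(): Unit = {
      // Throws IllegalArgumentException when bindToYarn was not called first.
      require(appId.isDefined, "application ID unset")
      // The extension services and the parent's start would run here.
      started = true
    }
  }
}
```

Calling start on a fresh Backend fails, while bindToYarn("application_0001", None) followed by start succeeds.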

Requesting YARN for Spark Application’s Current Attempt Id — applicationAttemptId Method

def applicationAttemptId(): Option[String]
Note
applicationAttemptId is part of SchedulerBackend Contract.

applicationAttemptId asks the internal attemptId (YARN’s ApplicationAttemptId), when defined, for the Spark application’s current attempt id.
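
Since attemptId is only defined in cluster deploy mode, the lookup is naturally an Option mapping. A minimal sketch, assuming a hypothetical AttemptId case class in place of YARN's ApplicationAttemptId and assuming the result is the attempt number rendered as a string:

```scala
object AttemptIdSketch {
  // Hypothetical stand-in for YARN's ApplicationAttemptId.
  final case class AttemptId(attempt: Int)

  // None when no attempt id was bound (e.g. client deploy mode),
  // otherwise the attempt number as a String.
  def applicationAttemptId(attemptId: Option[AttemptId]): Option[String] =
    attemptId.map(_.attempt.toString)
}
```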

Creating YarnSchedulerBackend Instance

Note
This section is only to take notes about the required components to instantiate the base services.

YarnSchedulerBackend takes the following when created:

YarnSchedulerBackend initializes the internal properties.

Checking if Enough Executors Are Available — sufficientResourcesRegistered Method

def sufficientResourcesRegistered(): Boolean
Note
sufficientResourcesRegistered is part of the CoarseGrainedSchedulerBackend contract that makes sure that sufficient resources are available.

sufficientResourcesRegistered returns true when totalRegisteredExecutors is at least minRegisteredRatio of totalExpectedExecutors, i.e. totalRegisteredExecutors >= totalExpectedExecutors * minRegisteredRatio.
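
The check amounts to a single comparison. The sketch below is a stand-alone function, not the method on the backend itself: it takes the three internal values as plain parameters, which is an assumption made for illustration.

```scala
object SufficientResourcesSketch {
  // True once the registered executors reach the required share
  // of the expected executors.
  def sufficientResourcesRegistered(
      totalRegisteredExecutors: Int,
      totalExpectedExecutors: Int,
      minRegisteredRatio: Double): Boolean =
    totalRegisteredExecutors >= totalExpectedExecutors * minRegisteredRatio
}
```

With 10 expected executors and a ratio of 0.8, 7 registered executors are not enough while 9 are.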
