YarnAllocator — YARN Resource Container Allocator

YarnAllocator requests resources from a YARN cluster (in the form of containers from the YARN ResourceManager) and manages the container allocations by assigning them to Spark executors and releasing them when a Spark application no longer needs them.

YarnAllocator manages resources using AMRMClient (that YarnRMClient passes in when creating a YarnAllocator).

Figure 1. Creating YarnAllocator

YarnAllocator is part of the internal state of ApplicationMaster (via the internal allocator reference).

Figure 2. ApplicationMaster uses YarnAllocator (via allocator attribute)

Figure 3. YarnAllocator Runs ExecutorRunnables in Allocated YARN Containers

Table 1. YarnAllocator’s Internal Registries and Counters
Name Description

resource

The YARN Resource that sets the capacity requirement (i.e. memory and virtual cores) of a single executor.

NOTE: Resource models a set of computer resources in the cluster. Currently it covers both memory and virtual CPU cores (vcores).

Created when YarnAllocator is created. The amount of memory is the sum of executorMemory and memoryOverhead, and the number of virtual cores is executorCores.

executorIdCounter

Used to set executor id when launching Spark executors in allocated YARN resource containers.

Set to the last allocated executor id (received through an RPC system when YarnAllocator is created).

targetNumExecutors

Current desired total number of executors (as YARN resource containers).

Set to the initial number of executors when YarnAllocator is created.

targetNumExecutors is eventually reached after YarnAllocator updates YARN container allocation requests.

May later be changed when YarnAllocator is requested for total number of executors given locality preferences.

Used when requesting missing resource containers and launching Spark executors in the allocated resource containers.

numExecutorsRunning

Current number of…​FIXME

Used to update YARN container allocation requests and get the current number of executors running.

Incremented when launching Spark executors in allocated YARN resource containers and decremented when releasing a resource container for a Spark executor.

currentNodeBlacklist

List of…​FIXME

releasedContainers

Containers that are no longer needed, tracked by their globally unique identifier ContainerId (for a Container in the cluster).

NOTE: Hadoop YARN’s Container represents an allocated resource in the cluster. The YARN ResourceManager is the sole authority to allocate any Container to applications. The allocated Container is always on a single node and has a unique ContainerId. It has a specific amount of Resource allocated.

allocatedHostToContainersMap

Lookup table

allocatedContainerToHostMap

Lookup table

pendingLossReasonRequests

releasedExecutorLossReasons

executorIdToContainer

numUnexpectedContainerRelease

containerIdToExecutorId

hostToLocalTaskCounts

Lookup table

failedExecutorsTimeStamps

executorMemory

memoryOverhead

executorCores

launchContainers

labelExpression

nodeLabelConstructor

containerPlacementStrategy

launcherPool

ContainerLauncher Thread Pool

numLocalityAwareTasks

Number of locality-aware tasks to be used as container placement hint when YarnAllocator is requested for executors given locality preferences.

Set to 0 when YarnAllocator is created.

Used as an input to containerPlacementStrategy.localityOfRequestedContainers when YarnAllocator updates YARN container allocation requests.

Tip

Enable INFO or DEBUG logging level for org.apache.spark.deploy.yarn.YarnAllocator logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.deploy.yarn.YarnAllocator=DEBUG

Refer to Logging.

Creating YarnAllocator Instance

YarnAllocator takes the following when created:

  1. driverUrl

  2. driverRef — RpcEndpointRef to the driver’s FIXME

  3. YarnConfiguration

  4. sparkConf — SparkConf

  5. amClient AMRMClient for ContainerRequest

  6. ApplicationAttemptId

  7. SecurityManager

  8. localResources — Map[String, LocalResource]

All the input parameters for YarnAllocator (except appAttemptId and amClient) are passed directly from the input parameters of YarnRMClient.

YarnAllocator sets the org.apache.hadoop.yarn.util.RackResolver logger to WARN (unless set to some log level already).

YarnAllocator initializes the internal registries and counters.

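It sets the following internal counters (as described in Table 1): executorIdCounter to the last allocated executor id, targetNumExecutors to the initial number of executors, and numLocalityAwareTasks to 0.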

It creates an empty queue of failed executors.

It sets the internal executorFailuresValidityInterval to spark.yarn.executor.failuresValidityInterval.

It sets the internal executorMemory to spark.executor.memory.

It sets the internal memoryOverhead to spark.yarn.executor.memoryOverhead. If the setting is not defined, memoryOverhead defaults to the larger of 10% of executorMemory and 384 MB.

It sets the internal executorCores to spark.executor.cores.

It creates the internal resource as a Hadoop YARN Resource with executorMemory + memoryOverhead of memory and executorCores CPU cores.
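The sizing rules above can be sketched in plain Scala. This is a minimal illustration that does not depend on Spark or Hadoop: ExecutorResource is a hypothetical stand-in for YARN's Resource, ExecutorSizing and its members are illustrative names, and the 384 minimum is assumed to be in megabytes.

```scala
// Hypothetical stand-in for YARN's Resource (memory in MB plus vcores).
final case class ExecutorResource(memoryMb: Int, vCores: Int)

object ExecutorSizing {
  // Values taken from the text: 10% of executor memory, but at least 384 (assumed MB).
  val OverheadFactor = 0.10
  val OverheadMinMb = 384

  // Uses the explicitly configured overhead if there is one, otherwise the default rule.
  def memoryOverheadMb(executorMemoryMb: Int, configured: Option[Int]): Int =
    configured.getOrElse(math.max((OverheadFactor * executorMemoryMb).toInt, OverheadMinMb))

  // The container capability covers executor memory plus overhead, and the executor cores.
  def resource(
      executorMemoryMb: Int,
      executorCores: Int,
      configured: Option[Int] = None): ExecutorResource =
    ExecutorResource(
      executorMemoryMb + memoryOverheadMb(executorMemoryMb, configured),
      executorCores)
}
```

Under these assumptions a 1024 MB executor falls below the minimum, so its container would be requested with 1024 + 384 = 1408 MB.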

It creates the internal launcherPool called ContainerLauncher with maximum spark.yarn.containerLauncherMaxThreads threads.

It sets the internal launchContainers to spark.yarn.launchContainers.

It sets the internal labelExpression to spark.yarn.executor.nodeLabelExpression.

It sets the internal nodeLabelConstructor to…​FIXME

Caution
FIXME nodeLabelConstructor?

It sets the internal containerPlacementStrategy to…​FIXME

Caution
FIXME LocalityPreferredContainerPlacementStrategy?

getNumExecutorsRunning Method

Caution
FIXME

updateInternalState Method

Caution
FIXME

killExecutor Method

Caution
FIXME

Specifying Current Total Number of Executors with Locality Preferences — requestTotalExecutorsWithPreferredLocalities Method

requestTotalExecutorsWithPreferredLocalities(
  requestedTotal: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int],
  nodeBlacklist: Set[String]): Boolean

requestTotalExecutorsWithPreferredLocalities returns whether the current desired total number of executors is different than the input requestedTotal.

Note
requestTotalExecutorsWithPreferredLocalities should instead have been called shouldRequestTotalExecutorsWithPreferredLocalities since it answers the question whether to request new total executors or not.

requestTotalExecutorsWithPreferredLocalities sets the internal numLocalityAwareTasks and hostToLocalTaskCounts attributes to the input localityAwareTasks and hostToLocalTaskCount arguments, respectively.

If the input requestedTotal is different than the internal targetNumExecutors you should see the following INFO message in the logs:

INFO YarnAllocator: Driver requested a total number of [requestedTotal] executor(s).

requestTotalExecutorsWithPreferredLocalities saves the input requestedTotal to be the current desired total number of executors.

requestTotalExecutorsWithPreferredLocalities sends the updated blacklist information to the YARN ResourceManager for this application in order to avoid allocating new Containers on the problematic nodes.

Caution
FIXME Describe the blacklisting

Note
requestTotalExecutorsWithPreferredLocalities is executed in response to RequestExecutors message to ApplicationMaster.
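The behaviour described in this section can be approximated with the following sketch. TargetState is a hypothetical, simplified model and not Spark's class: the blacklist update is reduced to storing the set, and the sketch assumes that update happens only when the requested total actually changes.

```scala
// Simplified model of the state the method touches (not Spark's YarnAllocator).
class TargetState(initialExecutors: Int) {
  var targetNumExecutors: Int = initialExecutors
  var numLocalityAwareTasks: Int = 0
  var hostToLocalTaskCounts: Map[String, Int] = Map.empty
  var currentNodeBlacklist: Set[String] = Set.empty

  def requestTotalExecutorsWithPreferredLocalities(
      requestedTotal: Int,
      localityAwareTasks: Int,
      hostToLocalTaskCount: Map[String, Int],
      nodeBlacklist: Set[String]): Boolean = {
    // Locality hints are recorded even when the total stays the same.
    numLocalityAwareTasks = localityAwareTasks
    hostToLocalTaskCounts = hostToLocalTaskCount
    if (requestedTotal != targetNumExecutors) {
      println(s"Driver requested a total number of $requestedTotal executor(s).")
      targetNumExecutors = requestedTotal
      // Stand-in for sending blacklist changes to the ResourceManager (assumption).
      currentNodeBlacklist = nodeBlacklist
      true
    } else {
      false
    }
  }
}
```

The Boolean result only says whether the desired total changed; the new containers themselves are requested later, when the container requests are updated.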

Adding or Removing Container Requests to Launch Executors — updateResourceRequests Method

updateResourceRequests(): Unit

updateResourceRequests requests new executor containers from the YARN ResourceManager or cancels outstanding container requests.

Note
In YARN, you have to request containers for resources first (using AMRMClient.addContainerRequest) before calling AMRMClient.allocate.

It gets the list of outstanding YARN’s ContainerRequests (using the constructor’s AMRMClient[ContainerRequest]) and aligns their number to current workload.

updateResourceRequests consists of two main branches:

  1. missing executors, i.e. when the number of executors already allocated or pending falls short of the target number of executors.

  2. executors to cancel, i.e. when the number of pending executor allocations is positive, but the total number of executors (running and pending) is more than Spark needs.

Note
updateResourceRequests is used when YarnAllocator requests new resource containers.
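The choice between the two branches can be sketched as a small pure function. The names RequestPlanner, Plan, RequestMore, CancelPending and NoChange are hypothetical, and the formula missing = target - pending - running is an assumption that is consistent with the description above rather than a quote of the implementation.

```scala
object RequestPlanner {
  sealed trait Plan
  final case class RequestMore(count: Int) extends Plan
  final case class CancelPending(count: Int) extends Plan
  case object NoChange extends Plan

  // target: desired total executors; pending: outstanding container requests;
  // running: executors already launched.
  def plan(target: Int, pending: Int, running: Int): Plan = {
    val missing = target - pending - running
    if (missing > 0) {
      RequestMore(missing)                       // case 1: missing executors
    } else if (pending > 0 && missing < 0) {
      CancelPending(math.min(pending, -missing)) // case 2: only pending requests can be cancelled
    } else {
      NoChange
    }
  }
}
```

In this sketch running executors are never cancelled here: when nothing is pending, an excess of running executors leads to NoChange.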

Case 1. Missing Executors

You should see the following INFO message in the logs:

INFO YarnAllocator: Will request [count] executor containers, each with [vCores] cores and [memory] MB memory including [memoryOverhead] MB overhead

It then splits pending container allocation requests per locality preference of pending tasks (in the internal hostToLocalTaskCounts registry).

Caution
FIXME Review splitPendingAllocationsByLocality

It removes stale container allocation requests (using YARN’s AMRMClient.removeContainerRequest).

Caution
FIXME Stale?

You should see the following INFO message in the logs:

INFO YarnAllocator: Canceled [cancelledContainers] container requests (locality no longer needed)

It computes locality of requested containers (based on the internal numLocalityAwareTasks, hostToLocalTaskCounts and allocatedHostToContainersMap lookup table).

Caution
FIXME Review containerPlacementStrategy.localityOfRequestedContainers + the code that follows.

For any new container needed updateResourceRequests adds a container request (using YARN’s AMRMClient.addContainerRequest).

You should see the following INFO message in the logs:

INFO YarnAllocator: Submitted container request (host: [host], capability: [resource])

Case 2. Cancelling Pending Executor Allocations

When there are executors to cancel (case 2.), you should see the following INFO message in the logs:

INFO Canceling requests for [numToCancel] executor container(s) to have a new desired total [targetNumExecutors] executors.

It checks whether there are pending allocation requests and removes the excess (using YARN’s AMRMClient.removeContainerRequest). If there are no pending allocation requests, you should see the WARN message in the logs:

WARN Expected to find pending requests, but found none.

Handling Allocated Containers for Executors — handleAllocatedContainers Internal Method

handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit

handleAllocatedContainers handles allocated YARN containers, i.e. runs Spark executors on matched containers or releases unneeded containers.

Note
A YARN Container represents an allocated resource in the cluster. The allocated Container is always on a single node and has a unique ContainerId. It has a specific amount of Resource allocated.

If handleAllocatedContainers could not match some of the allocated containers to outstanding requests, you should see the following DEBUG message in the logs:

DEBUG Releasing [size] unneeded containers that were allocated to us

handleAllocatedContainers releases the unneeded containers (if there are any).

handleAllocatedContainers runs the allocated and matched containers.

You should see the following INFO message in the logs:

INFO Received [allocatedContainersSize] containers from YARN, launching executors on [containersToUseSize] of them.

Note
handleAllocatedContainers is used exclusively when YarnAllocator allocates YARN resource containers for Spark executors.

Running ExecutorRunnables (with CoarseGrainedExecutorBackends) in Allocated YARN Resource Containers — runAllocatedContainers Internal Method

runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit

runAllocatedContainers traverses the YARN Container collection (as the input containersToUse) and schedules execution of ExecutorRunnables per YARN container on ContainerLauncher thread pool.

Figure 4. YarnAllocator Runs ExecutorRunnables in Allocated YARN Containers

Note
A Container in YARN represents allocated resources (memory and cores) in the cluster.

Internally, runAllocatedContainers increments executorIdCounter internal counter.

Note
runAllocatedContainers asserts that the amount of memory of a container is not less than the requested memory for executors. Only memory is checked!

You should see the following INFO message in the logs:

INFO YarnAllocator: Launching container [containerId] for on host [executorHostname]

runAllocatedContainers checks if the number of executors running is less than the number of required executors.

If there are executors still missing (and runAllocatedContainers is not in testing mode), runAllocatedContainers schedules execution of an ExecutorRunnable on the ContainerLauncher thread pool and updates internal state. The scheduled task creates an ExecutorRunnable and starts it.

When runAllocatedContainers catches a non-fatal exception, you should see the following ERROR message in the logs, and the container is immediately released (using the internal AMRMClient).

ERROR Failed to launch executor [executorId] on container [containerId]

If YarnAllocator has reached target number of executors, you should see the following INFO message in the logs:

INFO Skip launching executorRunnable as running Executors count: [numExecutorsRunning] reached target Executors count: [targetNumExecutors].

Note
runAllocatedContainers is used exclusively when YarnAllocator handles allocated YARN containers.
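The launch loop can be sketched with a plain Java thread pool. Everything here is a simplified, hypothetical model: containers are plain ids, the launch function stands in for creating and starting an ExecutorRunnable, and the released queue stands in for releasing a container through the AMRMClient. The sketch also assumes that the running counter is updated at scheduling time and rolled back on failure, which keeps it deterministic.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal

class LauncherSketch(targetNumExecutors: Int, launch: (String, String) => Unit) {
  // Stand-in for the ContainerLauncher thread pool.
  private val launcherPool = Executors.newFixedThreadPool(2)
  private var executorIdCounter = 0
  val numExecutorsRunning = new AtomicInteger(0)
  val released = new ConcurrentLinkedQueue[String]()

  def runAllocatedContainers(containersToUse: Seq[String]): Unit =
    for (containerId <- containersToUse) {
      executorIdCounter += 1
      val executorId = executorIdCounter.toString
      if (numExecutorsRunning.get() < targetNumExecutors) {
        numExecutorsRunning.incrementAndGet() // assumption: updated when scheduling
        launcherPool.execute(new Runnable {
          override def run(): Unit =
            try launch(executorId, containerId)
            catch {
              case NonFatal(_) =>
                println(s"Failed to launch executor $executorId on container $containerId")
                released.add(containerId)             // release the container
                numExecutorsRunning.decrementAndGet() // roll the counter back
            }
        })
      } else {
        println(s"Skip launching executorRunnable as running Executors count: " +
          s"${numExecutorsRunning.get()} reached target Executors count: $targetNumExecutors.")
      }
    }

  // Waits for the scheduled launches to finish; the sketch cannot be reused afterwards.
  def awaitLaunches(): Unit = {
    launcherPool.shutdown()
    launcherPool.awaitTermination(10, TimeUnit.SECONDS)
    ()
  }
}
```

With a target of two executors and three containers, the third container is skipped because the target has already been reached.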

Releasing YARN Container — internalReleaseContainer Internal Procedure

All unnecessary YARN containers (that were allocated but are either of no use or no longer needed) are released using the internal internalReleaseContainer procedure.

internalReleaseContainer(container: Container): Unit

internalReleaseContainer records container in the internal releasedContainers registry and releases it to the YARN ResourceManager (calling AMRMClient[ContainerRequest].releaseAssignedContainer using the internal amClient).

Deciding on Use of YARN Container — matchContainerToRequest Internal Method

When handleAllocatedContainers handles allocated containers for executors, it uses matchContainerToRequest to match the containers to ContainerRequests (and hence to workload and location preferences).

matchContainerToRequest(
  allocatedContainer: Container,
  location: String,
  containersToUse: ArrayBuffer[Container],
  remaining: ArrayBuffer[Container]): Unit

matchContainerToRequest puts allocatedContainer in containersToUse or remaining collections per available outstanding ContainerRequests that match the priority of the input allocatedContainer, the input location, and the memory and vcore capabilities for Spark executors.

Note
The input location can be host, rack, or * (star), i.e. any host.

If there are any outstanding ContainerRequests that meet the requirements, it simply takes the first one and puts it in the input containersToUse collection. It also removes the ContainerRequest so it is not submitted again (it uses the internal AMRMClient[ContainerRequest]).

Otherwise, it puts the input allocatedContainer in the input remaining collection.
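The matching step can be sketched as follows. Container and ContainerRequest are hypothetical, simplified shapes defined only for this example (YARN's classes carry more information), the outstanding buffer stands in for the requests tracked by the AMRMClient, and the rule that a request matches when its memory and vcores fit into the container is an assumption.

```scala
import scala.collection.mutable.ArrayBuffer

object MatchSketch {
  final case class Container(id: String, memoryMb: Int, vCores: Int, priority: Int)
  final case class ContainerRequest(location: String, memoryMb: Int, vCores: Int, priority: Int)

  def matchContainerToRequest(
      allocatedContainer: Container,
      location: String,
      outstanding: ArrayBuffer[ContainerRequest],
      containersToUse: ArrayBuffer[Container],
      remaining: ArrayBuffer[Container]): Unit = {
    // Find the first outstanding request with the same priority and location
    // whose capability fits into the allocated container.
    val idx = outstanding.indexWhere { r =>
      r.location == location &&
      r.priority == allocatedContainer.priority &&
      r.memoryMb <= allocatedContainer.memoryMb &&
      r.vCores <= allocatedContainer.vCores
    }
    if (idx >= 0) {
      outstanding.remove(idx) // the request must not be submitted again
      containersToUse += allocatedContainer
    } else {
      remaining += allocatedContainer
    }
  }
}
```

A caller could run this once per location kind (host, rack, then *), passing the remaining containers of one pass as the input of the next.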

processCompletedContainers Method

processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit

processCompletedContainers accepts a collection of YARN’s ContainerStatus objects.

Note

ContainerStatus represents the current status of a YARN Container and provides details such as:

  • Id

  • State

  • Exit status of a completed container.

  • Diagnostic message for a failed container.

For each completed container in the collection, processCompletedContainers removes it from the internal releasedContainers registry.

It looks up the host of the container (in the internal allocatedContainerToHostMap lookup table). The host may or may not exist in the lookup table.

Caution
FIXME The host may or may not exist in the lookup table?

The ExecutorExited exit reason is computed.

When the host of the completed container has been found, the internal numExecutorsRunning counter is decremented.

You should see the following INFO message in the logs:

INFO Completed container [containerId] [host] (state: [containerState], exit status: [containerExitStatus])

For ContainerExitStatus.SUCCESS and ContainerExitStatus.PREEMPTED exit statuses of the container (which are not considered application failures), you should see one of the two possible INFO messages in the logs:

INFO Executor for container [id] exited because of a YARN event (e.g., pre-emption) and not because of an error in the running job.
INFO Container [id] [host] was preempted.

Other exit statuses of the container are considered application failures and reported as a WARN message in the logs:

WARN Container killed by YARN for exceeding memory limits. [diagnostics] Consider boosting spark.yarn.executor.memoryOverhead.

or

WARN Container marked as failed: [id] [host]. Exit status: [containerExitStatus]. Diagnostics: [containerDiagnostics]
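The classification of exit statuses can be sketched as a pattern match. The numeric values are assumed to mirror the constants in Hadoop's ContainerExitStatus; the two memory-limit statuses are not named in the text above and are included as an assumption about which statuses produce the memory-limits warning. ExitStatusSketch and classify are hypothetical names.

```scala
object ExitStatusSketch {
  // Assumed to match Hadoop's ContainerExitStatus constants.
  val Success = 0
  val Preempted = -102
  val KilledExceededVmem = -103
  val KilledExceededPmem = -104

  // Returns whether the exit counts as an application failure, plus the log line.
  def classify(
      containerId: String,
      host: String,
      exitStatus: Int,
      diagnostics: String): (Boolean, String) =
    exitStatus match {
      case Success =>
        (false, s"INFO Executor for container $containerId exited because of a YARN event " +
          "(e.g., pre-emption) and not because of an error in the running job.")
      case Preempted =>
        (false, s"INFO Container $containerId $host was preempted.")
      case KilledExceededVmem | KilledExceededPmem =>
        (true, s"WARN Container killed by YARN for exceeding memory limits. $diagnostics " +
          "Consider boosting spark.yarn.executor.memoryOverhead.")
      case other =>
        (true, s"WARN Container marked as failed: $containerId $host. " +
          s"Exit status: $other. Diagnostics: $diagnostics")
    }
}
```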

The host is looked up in the internal allocatedHostToContainersMap lookup table. If found, the container is removed from the containers registered for the host or the host itself is removed from the lookup table when this container was the last on the host.

The container is removed from the internal allocatedContainerToHostMap lookup table.

The container is removed from the internal containerIdToExecutorId translation table. If an executor is found, it is removed from the internal executorIdToContainer translation table.

If the executor was recorded in the internal pendingLossReasonRequests lookup table, the exit reason (the ExecutorExited computed earlier) is sent back for every pending RPC message recorded.

If no pending requests were recorded for the executor, the executor and the exit reason are recorded in the internal releasedExecutorLossReasons lookup table.

In case the container was not in the internal releasedContainers registry, the internal numUnexpectedContainerRelease counter is increased and a RemoveExecutor RPC message is sent to the driver (as specified when YarnAllocator was created) to notify about the failure of the executor.
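The registry clean-up for a single completed container can be sketched as below. CompletionBookkeeping is a hypothetical model that uses plain strings for container ids, executor ids and hosts, leaves out the loss-reason tables, and records RemoveExecutor notifications in a buffer instead of sending RPC messages. It assumes the notification is only sent when an executor is known for the container.

```scala
import scala.collection.mutable

class CompletionBookkeeping {
  val releasedContainers = mutable.Set.empty[String]
  val allocatedHostToContainersMap = mutable.HashMap.empty[String, mutable.Set[String]]
  val allocatedContainerToHostMap = mutable.HashMap.empty[String, String]
  val containerIdToExecutorId = mutable.HashMap.empty[String, String]
  val executorIdToContainer = mutable.HashMap.empty[String, String]
  var numExecutorsRunning = 0
  var numUnexpectedContainerRelease = 0
  // Stand-in for RemoveExecutor messages sent to the driver.
  val removeExecutorMessages = mutable.ArrayBuffer.empty[String]

  def complete(containerId: String): Unit = {
    // remove returns true if the container had been released on purpose.
    val alreadyReleased = releasedContainers.remove(containerId)
    val hostOpt = allocatedContainerToHostMap.get(containerId)
    if (hostOpt.isDefined) numExecutorsRunning -= 1

    // Drop the container from its host, and the host itself if it is now empty.
    for (host <- hostOpt; containers <- allocatedHostToContainersMap.get(host)) {
      containers -= containerId
      if (containers.isEmpty) allocatedHostToContainersMap.remove(host)
    }
    allocatedContainerToHostMap.remove(containerId)

    containerIdToExecutorId.remove(containerId).foreach { executorId =>
      executorIdToContainer.remove(executorId)
      if (!alreadyReleased) {
        numUnexpectedContainerRelease += 1
        removeExecutorMessages += executorId
      }
    }
  }
}
```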

Requesting and Allocating YARN Resource Containers to Spark Executors (and Cancelling Outstanding Containers) — allocateResources Method

allocateResources(): Unit

allocateResources claims new resource containers from YARN ResourceManager and cancels any outstanding resource container requests.

Note
In YARN, you first have to submit requests for YARN resource containers to YARN ResourceManager (using AMRMClient.addContainerRequest) before claiming them by calling AMRMClient.allocate.

allocateResources then claims the containers (using the internal reference to YARN’s AMRMClient) with progress indicator of 0.1f.

You can see the exact moment in the YARN console for the Spark application with the progress bar at 10%.

Figure 5. YARN Console after Allocating YARN Containers (Progress at 10%)

If the number of allocated containers is greater than 0, you should see the following DEBUG message in the logs (in stderr on YARN):

DEBUG YarnAllocator: Allocated containers: [allocatedContainersSize]. Current executor count: [numExecutorsRunning]. Cluster resources: [availableResources].

If the number of completed containers is greater than 0, you should see the following DEBUG message in the logs (in stderr on YARN):

DEBUG YarnAllocator: Completed [completedContainersSize] containers

allocateResources processes completed containers.

You should see the following DEBUG message in the logs (in stderr on YARN):

DEBUG YarnAllocator: Finished processing [completedContainersSize] completed containers. Current running executor count: [numExecutorsRunning].

Note
allocateResources is used when ApplicationMaster is registered to the YARN ResourceManager and launches progress Reporter thread.
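The overall flow of allocateResources can be sketched with a small hypothetical interface in place of YARN's AMRMClient. ResourceClient and AllocateResponse are illustrative types defined only for this example, containers are plain ids, and the three steps are passed in as functions so that the ordering is the only thing the sketch demonstrates.

```scala
object AllocateSketch {
  // Hypothetical minimal view of an allocation response and client.
  final case class AllocateResponse(allocated: Seq[String], completed: Seq[String])
  trait ResourceClient {
    def allocate(progress: Float): AllocateResponse
  }

  def allocateResources(
      client: ResourceClient,
      updateResourceRequests: () => Unit,
      handleAllocatedContainers: Seq[String] => Unit,
      processCompletedContainers: Seq[String] => Unit): Unit = {
    // Container requests have to be submitted before they can be claimed.
    updateResourceRequests()
    // Claim containers, reporting a fixed progress of 10%.
    val response = client.allocate(0.1f)
    if (response.allocated.nonEmpty) {
      println(s"Allocated containers: ${response.allocated.size}.")
      handleAllocatedContainers(response.allocated)
    }
    if (response.completed.nonEmpty) {
      println(s"Completed ${response.completed.size} containers")
      processCompletedContainers(response.completed)
    }
  }
}
```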
