Client is a handle to a YARN cluster that submits an ApplicationMaster (which represents a Spark application submitted to a YARN cluster).

Figure 1. Client and Hadoop’s YarnClient Interactions

Depending on the deploy mode, it uses ApplicationMaster or ApplicationMaster’s wrapper ExecutorLauncher (referenced by their class names) in a ContainerLaunchContext (that represents all of the information needed by the YARN NodeManager to launch a container).

Client was initially used as a standalone application to submit Spark applications to a YARN cluster, but is currently considered obsolete.
Table 1. Client’s Internal Properties
Name Initial Value Description


spark.yarn.executor.memoryOverhead and falls back to 10% of the spark.executor.memory or 384, whichever is larger.


NOTE: 10% and 384 are constants and cannot be changed.
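The fallback rule above can be sketched as follows. This is a minimal illustration, not Spark's actual code: the object and method names are hypothetical, and the values are assumed to be in megabytes.

```scala
object MemoryOverheadSketch {
  // Constants mirroring the 10% factor and the 384 floor described above
  // (assumed to be megabytes for this sketch).
  val OverheadFactor = 0.10
  val OverheadMinMb = 384

  /** Returns the explicit overhead if given, else max(10% of executor memory, 384). */
  def executorMemoryOverhead(
      executorMemoryMb: Int,
      explicitOverheadMb: Option[Int] = None): Int =
    explicitOverheadMb.getOrElse(
      math.max((OverheadFactor * executorMemoryMb).toInt, OverheadMinMb))
}
```

With a 1024 MB executor the 384 floor wins, while with 8192 MB the 10% share is larger.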


Enable INFO or DEBUG logging level for org.apache.spark.deploy.yarn.Client logger to see what happens inside.

Add the following line to conf/

Refer to Logging.

isUserClassPathFirst Method


getUserClasspath Method




Setting Up Environment to Launch ApplicationMaster Container — setupLaunchEnv Method


launcherBackend Property


loginFromKeytab Method


Creating Client Instance

Creating an instance of Client does the following:

  • Creates an internal instance of YarnClient (using YarnClient.createYarnClient) that becomes yarnClient.

  • Creates an internal instance of YarnConfiguration (using YarnConfiguration and the input hadoopConf) that becomes yarnConf.

  • Sets the internal isClusterMode that says whether spark.submit.deployMode is cluster deploy mode.

Submitting Spark Application to YARN — submitApplication Method

submitApplication(): ApplicationId

submitApplication submits a Spark application (represented by ApplicationMaster) to a YARN cluster (i.e. to the YARN ResourceManager) and returns the application’s ApplicationId.

submitApplication is also used in the currently-deprecated

Internally, it executes LauncherBackend.connect first and then executes Client.setupCredentials to set up credentials for future calls.

It then initializes the internal yarnClient (with the internal yarnConf) and starts it. All this happens using the Hadoop API.

FIXME How to configure YarnClient? What is YARN’s YarnClient.getYarnClusterMetrics?

You should see the following INFO in the logs:

INFO Client: Requesting a new application from cluster with [count] NodeManagers

It then calls YarnClient.createApplication() to create a new application in YARN and obtains the application id.

The LauncherBackend instance changes state to SUBMITTED with the application id.

FIXME Why is this important?

submitApplication verifies whether the cluster has resources for the ApplicationMaster (using verifyClusterResources).

You should see the following INFO message in the logs:

INFO Client: Submitting application [appId] to ResourceManager

submitApplication submits the new YARN ApplicationSubmissionContext for ApplicationMaster to YARN (using Hadoop’s YarnClient.submitApplication).

It returns the YARN ApplicationId for the Spark application (represented by ApplicationMaster).

submitApplication is used when Client runs or YarnClientSchedulerBackend is started.

Creating YARN ApplicationSubmissionContext — createApplicationSubmissionContext Method

createApplicationSubmissionContext(
  newApp: YarnClientApplication,
  containerContext: ContainerLaunchContext): ApplicationSubmissionContext

createApplicationSubmissionContext creates YARN’s ApplicationSubmissionContext.

YARN’s ApplicationSubmissionContext represents all of the information needed by the YARN ResourceManager to launch the ApplicationMaster for a Spark application.

createApplicationSubmissionContext uses YARN’s YarnClientApplication (as the input newApp) to create an ApplicationSubmissionContext.

createApplicationSubmissionContext sets the following information in the ApplicationSubmissionContext:

The name of the Spark application configuration setting or Spark if not set

Queue (to which the Spark application is submitted)

spark.yarn.queue configuration setting

ContainerLaunchContext (that describes the Container with which the ApplicationMaster for the Spark application is launched)

the input containerContext

Type of the Spark application


Tags for the Spark application

spark.yarn.tags configuration setting

Number of max attempts of the Spark application to be submitted.

spark.yarn.maxAppAttempts configuration setting

The attemptFailuresValidityInterval in milliseconds for the Spark application configuration setting

Resource Capabilities for ApplicationMaster for the Spark application

See Resource Capabilities for ApplicationMaster — Memory and Virtual CPU Cores section below

Rolled Log Aggregation for the Spark application

See Rolled Log Aggregation Configuration for Spark Application section below

You will see the DEBUG message in the logs when the setting is not set:

DEBUG spark.yarn.maxAppAttempts is not set. Cluster's default value will be used.

Resource Capabilities for ApplicationMaster — Memory and Virtual CPU Cores

YARN’s Resource models a set of computer resources in the cluster. Currently, YARN supports resources with memory and virtual CPU cores capabilities only.

The requested YARN’s Resource for the ApplicationMaster for a Spark application is the sum of amMemory and amMemoryOverhead for the memory and amCores for the virtual CPU cores.
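As a rough sketch of the calculation above, the requested capability could be modeled like this. The Resource case class is a hypothetical stand-in for YARN's Resource record, and the parameter names only echo amMemory, amMemoryOverhead and amCores from the text.

```scala
object AmResourceSketch {
  // Hypothetical stand-in for YARN's Resource record (memory and virtual cores only).
  final case class Resource(memoryMb: Int, vCores: Int)

  /** Memory is the sum of the AM memory and its overhead; cores are passed through. */
  def amResource(amMemoryMb: Int, amMemoryOverheadMb: Int, amCores: Int): Resource =
    Resource(amMemoryMb + amMemoryOverheadMb, amCores)
}
```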

Besides, if is set, a new YARN ResourceRequest is created (for the ApplicationMaster container) that includes:

Resource Name

* (star) that represents no locality.




The resource capabilities as defined above.

Number of containers


Node label expression configuration setting

ResourceRequest of AM container configuration setting

It sets the resource request to this new YARN ResourceRequest detailed in the table above.

Rolled Log Aggregation for Spark Application

YARN’s LogAggregationContext represents all of the information needed by the YARN NodeManager to handle the logs for an application.

If spark.yarn.rolledLog.includePattern is defined, it creates a YARN LogAggregationContext with the following patterns:

Include Pattern

spark.yarn.rolledLog.includePattern configuration setting

Exclude Pattern

spark.yarn.rolledLog.excludePattern configuration setting

Verifying Maximum Memory Capability of YARN Cluster — verifyClusterResources Internal Method

verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit

verifyClusterResources is a private helper method that submitApplication uses to ensure that the Spark application (as a set of ApplicationMaster and executors) is not going to request more than the maximum memory capability of the YARN cluster. If so, it throws an IllegalArgumentException.

verifyClusterResources queries the input GetNewApplicationResponse (as newAppResponse) for the maximum memory.

INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster ([maximumMemory] MB per container)

If the maximum memory capability is above the required executor or ApplicationMaster memory, you should see the following INFO message in the logs:

INFO Client: Will allocate AM container, with [amMem] MB memory including [amMemoryOverhead] MB overhead

If however the executor memory (as a sum of spark.executor.memory and spark.yarn.executor.memoryOverhead settings) is more than the maximum memory capability, verifyClusterResources throws an IllegalArgumentException with the following message:

Required executor memory ([executorMemory]+[executorMemoryOverhead] MB) is above the max threshold ([maximumMemory] MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.

If the required memory for ApplicationMaster is more than the maximum memory capability, verifyClusterResources throws an IllegalArgumentException with the following message:

Required AM memory ([amMemory]+[amMemoryOverhead] MB) is above the max threshold ([maximumMemory] MB) of this cluster! Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.
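The two checks can be sketched as below. This is an illustration under assumptions: the object and parameter names are hypothetical, all values are taken to be megabytes, and the messages are shortened versions of the ones quoted above.

```scala
object VerifyClusterResourcesSketch {
  /** Throws IllegalArgumentException when either request exceeds the cluster maximum. */
  def verify(
      maxMemMb: Int,
      executorMemMb: Int,
      executorOverheadMb: Int,
      amMemMb: Int,
      amOverheadMb: Int): Unit = {
    // Executor check: memory plus overhead must fit into a single container.
    if (executorMemMb + executorOverheadMb > maxMemMb) {
      throw new IllegalArgumentException(
        s"Required executor memory ($executorMemMb+$executorOverheadMb MB) " +
          s"is above the max threshold ($maxMemMb MB) of this cluster!")
    }
    // ApplicationMaster check: same rule for the AM container.
    if (amMemMb + amOverheadMb > maxMemMb) {
      throw new IllegalArgumentException(
        s"Required AM memory ($amMemMb+$amOverheadMb MB) " +
          s"is above the max threshold ($maxMemMb MB) of this cluster!")
    }
  }
}
```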

Creating YARN ContainerLaunchContext to Launch ApplicationMaster — createContainerLaunchContext Internal Method

createContainerLaunchContext(newAppResponse: GetNewApplicationResponse): ContainerLaunchContext
The input GetNewApplicationResponse is Hadoop YARN’s GetNewApplicationResponse.

When a Spark application is submitted to YARN, it calls the private helper method createContainerLaunchContext that creates a YARN ContainerLaunchContext request for YARN NodeManager to launch ApplicationMaster (in a container).

When called, you should see the following INFO message in the logs:

INFO Setting up container launch context for our AM

It gets the application id (from the input newAppResponse).

It calculates the path of the application’s staging directory.

FIXME What’s appStagingBaseDir?

It does a custom step for a Python application.

It sets up an environment to launch ApplicationMaster container and prepareLocalResources. A ContainerLaunchContext record is created with the environment and the local resources.

The JVM options are calculated as follows:

--class is set if in cluster mode based on --class command-line argument.


If --jar command-line argument was specified, it is set as --jar.

In cluster deploy mode, org.apache.spark.deploy.yarn.ApplicationMaster is created while in client deploy mode it is org.apache.spark.deploy.yarn.ExecutorLauncher.

If --arg command-line argument was specified, it is set as --arg.

The entire ApplicationMaster argument line (as amArgs) is of the form:

[amClassName] --class [userClass] --jar [userJar] --arg [userArgs] --properties-file [propFile]

The entire command line is of the form:

FIXME prefixEnv? How is path calculated? ApplicationConstants.LOG_DIR_EXPANSION_VAR?
[JAVA_HOME]/bin/java -server [javaOpts] [amArgs] 1> [LOG_DIR]/stdout 2> [LOG_DIR]/stderr

The command line to launch an ApplicationMaster is set on the ContainerLaunchContext record (using setCommands).
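To make the two forms above concrete, here is a minimal sketch of how such an argument line and command line could be assembled. The object and method names are hypothetical, and optional parts are modeled as Option values; only the class names, flags and overall shape come from the text.

```scala
object AmCommandSketch {
  /** Picks the class to launch per deploy mode, as described above. */
  def amClassName(isClusterMode: Boolean): String =
    if (isClusterMode) "org.apache.spark.deploy.yarn.ApplicationMaster"
    else "org.apache.spark.deploy.yarn.ExecutorLauncher"

  /** Builds [amClassName] --class .. --jar .. --arg .. --properties-file .. */
  def amArgs(
      amClass: String,
      userClass: Option[String],
      userJar: Option[String],
      userArgs: Seq[String],
      propFile: String): Seq[String] =
    Seq(amClass) ++
      userClass.toSeq.flatMap(c => Seq("--class", c)) ++
      userJar.toSeq.flatMap(j => Seq("--jar", j)) ++
      userArgs.flatMap(a => Seq("--arg", a)) ++
      Seq("--properties-file", propFile)

  /** Builds the full java command with stdout and stderr redirected to the log directory. */
  def command(javaHome: String, javaOpts: Seq[String], args: Seq[String], logDir: String): String =
    (Seq(s"$javaHome/bin/java", "-server") ++ javaOpts ++ args ++
      Seq("1>", s"$logDir/stdout", "2>", s"$logDir/stderr")).mkString(" ")
}
```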

You should see the following DEBUG messages in the logs:

DEBUG Client: ===============================================================================
DEBUG Client: YARN AM launch context:
DEBUG Client:     user class: N/A
DEBUG Client:     env:
DEBUG Client:         [launchEnv]
DEBUG Client:     resources:
DEBUG Client:         [localResources]
DEBUG Client:     command:
DEBUG Client:         [commands]
DEBUG Client: ===============================================================================

A SecurityManager is created and set as the application’s ACLs.

FIXME setApplicationACLs? Set up security tokens?
createContainerLaunchContext is used when Client submits a Spark application to a YARN cluster.

prepareLocalResources Method

prepareLocalResources(
  destDir: Path,
  pySparkArchives: Seq[String]): HashMap[String, LocalResource]

prepareLocalResources is…​FIXME

FIXME Describe credentialManager

When called, prepareLocalResources prints out the following INFO message to the logs:

INFO Client: Preparing resources for our AM container
FIXME What’s a delegation token?

After all the security delegation tokens are obtained, and only when there are any, you should see the following DEBUG messages in the logs:

DEBUG Client: [token1]
DEBUG Client: [token2]
DEBUG Client: [tokenN]
FIXME Where is credentials assigned?

If a keytab is used to log in and the nearest time of the next renewal is in the future, prepareLocalResources sets the internal spark.yarn.credentials.renewalTime and spark.yarn.credentials.updateTime times for renewing and updating security tokens.

It gets the replication factor (using spark.yarn.submit.file.replication setting) or falls back to the default value for the input destDir.

The replication factor is only used for copyFileToRemote later. Perhaps it should not be mentioned here (?)

It creates the input destDir (on an HDFS-compatible file system) with 0700 permission (rwx------), i.e. only the owner (and the superuser) can read, write and execute it. It uses Hadoop’s Path.getFileSystem to access Hadoop’s FileSystem that owns destDir (using the constructor’s hadoopConf, i.e. Hadoop’s Configuration).

See org.apache.hadoop.fs.FileSystem to know a list of HDFS-compatible file systems, e.g. Amazon S3 or Windows Azure.
FIXME if (loginFromKeytab)
FIXME Describe case Some(jars)

If neither spark.yarn.archive nor spark.yarn.jars is set, you should see the following WARN message in the logs:

WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.

It then finds the directory with jar files under SPARK_HOME (using YarnCommandBuilderUtils.findJarsDir).

FIXME YarnCommandBuilderUtils.findJarsDir

All the jars are then zipped to a temporary archive, e.g. that is distributed as ARCHIVE to spark_libs (only when they differ).

If a user jar (--jar) was specified on the command line, the jar is distributed as FILE to app.jar.

It then distributes additional resources specified in SparkConf for the application, i.e. jars (under spark.yarn.dist.jars), files (under spark.yarn.dist.files), and archives (under spark.yarn.dist.archives).

The additional files to distribute can be defined using spark-submit using command-line options --jars, --files, and --archives.
FIXME Describe distribute

It sets spark.yarn.secondary.jars for the jars that have localized path (non-local paths) or their path (for local paths).

It updates Spark configuration (with internal configuration settings using the internal distCacheMgr reference).

FIXME Where are they used? It appears they are required for ApplicationMaster when it prepares local resources, but what is the sequence of calls to lead to ApplicationMaster?

It uploads to the input destDir and sets spark.yarn.cache.confArchive

It creates configuration archive and copyFileToRemote(destDir, localConfArchive, replication, force = true, destName = Some(LOCALIZED_CONF_ARCHIVE)).

FIXME copyFileToRemote(destDir, localConfArchive, replication, force = true, destName = Some(LOCALIZED_CONF_ARCHIVE))?

It adds a cache-related resource (using the internal distCacheMgr).

FIXME What resources? Where? Why is this needed?

Ultimately, it clears the cache-related internal configuration settings — spark.yarn.cache.filenames, spark.yarn.cache.sizes, spark.yarn.cache.timestamps, spark.yarn.cache.visibilities, spark.yarn.cache.types, spark.yarn.cache.confArchive — from the SparkConf configuration since they are internal and should not "pollute" the web UI’s environment page.

The localResources are returned.

FIXME How is localResources calculated?

Creating Archive With Configuration Files and Spark Configuration — createConfArchive Internal Method

createConfArchive(): File

createConfArchive is a private helper method that prepareLocalResources uses to create an archive with the local config files — and (before distributing it and the other files for ApplicationMaster and executors to use on a YARN cluster).

The archive will also contain all the files under HADOOP_CONF_DIR and YARN_CONF_DIR environment variables (if defined).

Additionally, the archive contains a with the current Spark configuration.

The archive is a temporary file with the spark_conf prefix and .zip extension with the files above.

Copying File to Remote File System — copyFileToRemote Method

copyFileToRemote(
  destDir: Path,
  srcPath: Path,
  replication: Short,
  force: Boolean = false,
  destName: Option[String] = None): Path

copyFileToRemote is a private[yarn] method to copy srcPath to the remote file system destDir (if needed) and return the destination path resolved following symlinks and mount points.

It is exclusively used in prepareLocalResources.

Unless force is enabled (it is disabled by default), copyFileToRemote will only copy srcPath when the source (of srcPath) and target (of destDir) file systems are different.

You should see the following INFO message in the logs:

INFO Client: Uploading resource [srcPath] -> [destPath]

copyFileToRemote copies srcPath to destDir and sets 644 permissions, i.e. world-wide readable and owner writable.

If force is disabled and the file systems are the same, copyFileToRemote will only print out the following INFO message to the logs:

INFO Client: Source and destination file systems are the same. Not copying [srcPath]

Ultimately, copyFileToRemote returns the destination path resolved following symlinks and mount points.
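The copy decision can be sketched as follows, going by the INFO message above that reports no copy when the file systems are the same. This is a simplified illustration: file systems are compared here by plain URI strings, which is an assumption of this sketch, and the names are hypothetical.

```scala
object CopyDecisionSketch {
  /** A copy happens when it is forced or when the two file systems differ. */
  def shouldCopy(force: Boolean, srcFsUri: String, destFsUri: String): Boolean =
    force || srcFsUri != destFsUri

  /** Returns the INFO message that would be logged for the given inputs. */
  def logMessage(
      force: Boolean,
      srcFsUri: String,
      destFsUri: String,
      srcPath: String,
      destDir: String): String =
    if (shouldCopy(force, srcFsUri, destFsUri)) {
      val fileName = srcPath.split('/').last
      s"Uploading resource $srcPath -> $destDir/$fileName"
    } else {
      s"Source and destination file systems are the same. Not copying $srcPath"
    }
}
```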

Populating CLASSPATH for ApplicationMaster and Executors — populateClasspath Method

populateClasspath(
  args: ClientArguments,
  conf: Configuration,
  sparkConf: SparkConf,
  env: HashMap[String, String],
  extraClassPath: Option[String] = None): Unit

populateClasspath is a private[yarn] helper method that populates the CLASSPATH (for ApplicationMaster and executors).

The input args is null when preparing the environment for ExecutorRunnable, and the constructor’s args for Client.

The following entries are added to CLASSPATH, in order:
  1. The optional extraClassPath (which is first changed to include paths on YARN cluster machines).

    extraClassPath corresponds to spark.driver.extraClassPath for the driver and spark.executor.extraClassPath for executors.
  2. YARN’s own Environment.PWD

  3. __spark_conf__ directory under YARN’s Environment.PWD

  4. If the deprecated spark.yarn.user.classpath.first is set, …​FIXME

  5. __spark_libs__/* under YARN’s Environment.PWD

  6. (unless the optional spark.yarn.archive is defined) All the local jars in spark.yarn.jars (which are first changed to be paths on YARN cluster machines).

  7. All the entries from YARN’s yarn.application.classpath or YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH (if yarn.application.classpath is not set)

  8. All the entries from YARN’s mapreduce.application.classpath or MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH (if mapreduce.application.classpath is not set).

  9. SPARK_DIST_CLASSPATH (which is first changed to include paths on YARN cluster machines).
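The ordering of the entries listed above can be sketched as a simple concatenation. This is only an illustration: the object and parameter names are hypothetical, the step for the deprecated spark.yarn.user.classpath.first is left out (it is not described above), and inputs are assumed to be already converted to cluster paths.

```scala
object ClasspathOrderSketch {
  /** Returns classpath entries in the order given in the list above (step 4 omitted). */
  def entries(
      pwd: String,
      extraClassPath: Option[String],
      sparkArchiveSet: Boolean,
      localSparkJars: Seq[String],
      yarnAppClasspath: Seq[String],
      mrAppClasspath: Seq[String],
      sparkDistClasspath: Option[String]): Seq[String] =
    extraClassPath.toSeq ++
      Seq(pwd, s"$pwd/__spark_conf__", s"$pwd/__spark_libs__/*") ++
      // Local jars from spark.yarn.jars are only added when spark.yarn.archive is not set.
      (if (sparkArchiveSet) Nil else localSparkJars) ++
      yarnAppClasspath ++
      mrAppClasspath ++
      sparkDistClasspath.toSeq
}
```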


You should see the result of executing populateClasspath when you enable DEBUG logging level for the org.apache.spark.deploy.yarn.Client logger, i.e.

DEBUG Client:     env:
DEBUG Client:         CLASSPATH -> <CPS>/__spark_conf__<CPS>/__spark_libs__/*<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/*<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/lib/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*

Changing Path to be YARN NodeManager-aware — getClusterPath Method

getClusterPath(conf: SparkConf, path: String): String

getClusterPath replaces any occurrences of spark.yarn.config.gatewayPath in path with the value of spark.yarn.config.replacementPath.
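A minimal sketch of that replacement follows. It uses a plain Map in place of SparkConf to stay self-contained, and returning the path unchanged when either setting is missing is an assumption of this sketch.

```scala
object ClusterPathSketch {
  /** Rewrites gateway-side path prefixes to their cluster-side counterparts. */
  def getClusterPath(conf: Map[String, String], path: String): String =
    (conf.get("spark.yarn.config.gatewayPath"),
      conf.get("spark.yarn.config.replacementPath")) match {
      case (Some(gateway), Some(replacement)) => path.replace(gateway, replacement)
      case _ => path // assumption: nothing to replace when a setting is missing
    }
}
```

For example, with gatewayPath set to /opt/gateway and replacementPath set to /opt/cluster, the path /opt/gateway/jars/a.jar becomes /opt/cluster/jars/a.jar.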

Adding CLASSPATH Entry to Environment — addClasspathEntry Method

addClasspathEntry(path: String, env: HashMap[String, String]): Unit

addClasspathEntry is a private helper method to add the input path to CLASSPATH key in the input env.
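A sketch of what adding an entry could look like is below. The <CPS> separator is taken from the DEBUG output shown earlier in this document; appending to an existing value (rather than replacing it) is an assumption of this sketch.

```scala
import scala.collection.mutable.HashMap

object AddClasspathEntrySketch {
  // Separator as it appears in the CLASSPATH value of the DEBUG output.
  val Separator = "<CPS>"

  /** Appends path to the CLASSPATH key of env, creating the key when absent. */
  def addClasspathEntry(path: String, env: HashMap[String, String]): Unit = {
    val updated = env.get("CLASSPATH").map(_ + Separator + path).getOrElse(path)
    env.put("CLASSPATH", updated)
  }
}
```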

Distributing Files to Remote File System — distribute Internal Method

distribute(
  path: String,
  resType: LocalResourceType = LocalResourceType.FILE,
  destName: Option[String] = None,
  targetDir: Option[String] = None,
  appMasterOnly: Boolean = false): (Boolean, String)

distribute is an internal helper method that prepareLocalResources uses to find out whether the input path is of local: URI scheme and return a localized path for a non-local path, or simply the input path for a local one.

distribute returns a pair with the first element being a flag for the input path being local or non-local, and the other element for the local or localized path.

For a non-local path (i.e. one that is not of the local: URI scheme) that was not distributed already, distribute copies the input path to the remote file system (if needed) and adds path to the application’s distributed cache.
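The shape of the returned pair can be sketched as follows. This is a simplified model with hypothetical names: the copy to the remote file system is replaced by recording the path in a buffer, and the localized name is assumed to be destName or the last path segment.

```scala
import java.net.URI
import scala.collection.mutable

class DistributeSketch {
  private val seen = mutable.Set[String]()
  // Stand-in for the actual upload: records every path that would be copied.
  val copied = mutable.Buffer[String]()

  /** Returns (isLocalScheme, path): the input for local: URIs, else a localized name. */
  def distribute(path: String, destName: Option[String] = None): (Boolean, String) = {
    val trimmed = path.trim
    if (new URI(trimmed).getScheme == "local") {
      (true, trimmed)
    } else {
      // Only paths not distributed before are "copied".
      if (seen.add(trimmed)) copied += trimmed
      (false, destName.getOrElse(trimmed.split('/').last))
    }
  }
}
```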

Joining Path Components using Path.SEPARATOR — buildPath Method

buildPath(components: String*): String

buildPath is a helper method to join all the path components using the directory separator, i.e. org.apache.hadoop.fs.Path.SEPARATOR.
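A self-contained sketch of the helper is below. The separator is hard-coded as "/", which is the value of org.apache.hadoop.fs.Path.SEPARATOR, so that the example runs without Hadoop on the classpath.

```scala
object BuildPathSketch {
  // Same value as org.apache.hadoop.fs.Path.SEPARATOR.
  val Separator = "/"

  /** Joins all the path components using the directory separator. */
  def buildPath(components: String*): String = components.mkString(Separator)
}
```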

isClusterMode Internal Flag

isClusterMode is an internal flag that says whether the Spark application runs in cluster or client deploy mode. The flag is enabled for cluster deploy mode, i.e. true.

Since a Spark application requires different settings per deploy mode, the isClusterMode flag effectively "splits" Client into two parts: one responsible for client and the other for cluster deploy mode.
FIXME Replace the internal fields used below with their true meanings.
Table 2. Internal Attributes of Client per Deploy Mode (isClusterMode flag)
Internal attribute cluster deploy mode client deploy mode










spark.driver.extraLibraryPath and spark.driver.libraryPath

--class command-line argument for ApplicationMaster


Application master class



When the isClusterMode flag is enabled, the internal reference to YARN’s YarnClient is used to stop application.

When the isClusterMode flag is enabled (and spark.yarn.submit.waitAppCompletion is disabled), so is fireAndForget internal flag.


SPARK_YARN_MODE flag controls…​FIXME

Any environment variable with the SPARK_ prefix is propagated to all (remote) processes.
FIXME Where is SPARK_ prefix rule enforced?
SPARK_YARN_MODE is a system property (i.e. available using System.getProperty) and an environment variable (i.e. available using System.getenv or sys.env). See YarnSparkHadoopUtil.
FIXME Why is this needed? git blame it.

SPARK_YARN_MODE flag is checked when YarnSparkHadoopUtil or SparkHadoopUtil are accessed.

It is cleared later when Client is requested to stop.

Internal Hadoop’s YarnClient — yarnClient Property

val yarnClient = YarnClient.createYarnClient

yarnClient is a private internal reference to Hadoop’s YarnClient that Client uses to create and submit a YARN application (for your Spark application) and to kill it (using killApplication).

yarnClient is initialized and started when Client submits a Spark application to a YARN cluster.

yarnClient is stopped when Client stops.

Launching Client Standalone Application — main Method

main method is invoked while a Spark application is being deployed to a YARN cluster.

It is executed by spark-submit with --master yarn command-line argument.

When you run the Client standalone application directly, say using ./bin/spark-class org.apache.spark.deploy.yarn.Client, main prints out the following WARN message to the logs unless the SPARK_SUBMIT system property is set.

WARN Client: WARNING: This client is deprecated and will be removed in a future version of Spark. Use ./bin/spark-submit with "--master yarn"

main turns SPARK_YARN_MODE flag on.

It then instantiates SparkConf, parses command-line arguments (using ClientArguments) and passes the call on to method.

Stopping Client (with LauncherBackend and YarnClient) — stop Method

stop(): Unit

stop closes the internal LauncherBackend and stops the internal YarnClient.

It also clears SPARK_YARN_MODE flag (to allow switching between cluster types).

Running Client — run Method

If LauncherBackend is not connected to a RM, i.e. LauncherBackend.isConnected returns false, and fireAndForget is enabled, …​FIXME

FIXME When could LauncherBackend lose the connection since it was connected in submitApplication?
FIXME What is fireAndForget?

Otherwise, when LauncherBackend is connected or fireAndForget is disabled, monitorApplication is called. It returns a pair of yarnApplicationState and finalApplicationStatus that is checked against three different state pairs, each of which throws a SparkException:

  • YarnApplicationState.KILLED or FinalApplicationStatus.KILLED lead to SparkException with the message "Application [appId] is killed".

  • YarnApplicationState.FAILED or FinalApplicationStatus.FAILED lead to SparkException with the message "Application [appId] finished with failed status".

  • FinalApplicationStatus.UNDEFINED leads to SparkException with the message "The final status of application [appId] is undefined".
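The three checks above can be sketched as follows. The two Enumeration objects are hypothetical stand-ins for Hadoop's YarnApplicationState and FinalApplicationStatus, the checks are applied in the order of the list above, and the message is returned as an Option instead of being thrown to keep the example easy to try out.

```scala
object RunOutcomeSketch {
  // Hypothetical stand-ins for Hadoop's enums (only the values needed here).
  object State extends Enumeration { val RUNNING, FINISHED, FAILED, KILLED = Value }
  object FinalStatus extends Enumeration { val UNDEFINED, SUCCEEDED, FAILED, KILLED = Value }

  /** Returns the exception message for a state pair, or None when no error applies. */
  def failureMessage(
      appId: String,
      state: State.Value,
      status: FinalStatus.Value): Option[String] =
    if (state == State.KILLED || status == FinalStatus.KILLED)
      Some(s"Application $appId is killed")
    else if (state == State.FAILED || status == FinalStatus.FAILED)
      Some(s"Application $appId finished with failed status")
    else if (status == FinalStatus.UNDEFINED)
      Some(s"The final status of application $appId is undefined")
    else None
}
```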

FIXME What are YarnApplicationState and FinalApplicationStatus statuses?

monitorApplication Method

monitorApplication(
  appId: ApplicationId,
  returnOnRunning: Boolean = false,
  logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus)

monitorApplication continuously reports the status of a Spark application appId every until the application state is one of the following YarnApplicationState:

  • RUNNING (when returnOnRunning is enabled)




It is used in run, YarnClientSchedulerBackend.waitForApplication and

It gets the application’s report from the YARN ResourceManager to obtain YarnApplicationState of the ApplicationMaster.

It uses Hadoop’s YarnClient.getApplicationReport(appId).

Unless logApplicationReport is disabled, it prints the following INFO message to the logs:

INFO Client: Application report for [appId] (state: [state])

If logApplicationReport and DEBUG log level are enabled, it prints report details every time interval to the logs:

16/04/23 13:21:36 INFO Client:
	 client token: N/A
	 diagnostics: N/A
	 ApplicationMaster host: N/A
	 ApplicationMaster RPC port: -1
	 queue: default
	 start time: 1461410495109
	 final status: UNDEFINED
	 tracking URL: http://japila.local:8088/proxy/application_1461410200840_0001/
	 user: jacek

For INFO log level it prints report details only when the application state changes.

When the application state changes, LauncherBackend is notified (using LauncherBackend.setState).

The application state is an instance of Hadoop’s YarnApplicationState.

For states FINISHED, FAILED or KILLED, cleanupStagingDir is called and the method finishes by returning a pair of the current state and the final application status.

If returnOnRunning is enabled (it is disabled by default) and the application state turns RUNNING, the method returns a pair of the current state RUNNING and the final application status.

cleanupStagingDir won’t be called when returnOnRunning is enabled and an application turns RUNNING. I guess it is likely a left-over since the Client is deprecated now.

The current state is recorded for future checks (in the loop).
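The polling loop described in this section can be sketched as below. It is a simplified model under assumptions: State is a hypothetical stand-in for Hadoop's YarnApplicationState, poll stands in for fetching an application report, onChange stands in for notifying LauncherBackend, and the sleep between reports and the staging directory cleanup are left out.

```scala
object MonitorLoopSketch {
  // Hypothetical stand-in for Hadoop's YarnApplicationState.
  object State extends Enumeration { val ACCEPTED, RUNNING, FINISHED, FAILED, KILLED = Value }

  /** Polls until a terminal state (or RUNNING, if returnOnRunning) and returns it. */
  def monitor(
      poll: () => State.Value,
      returnOnRunning: Boolean = false,
      onChange: State.Value => Unit = _ => ()): State.Value = {
    @scala.annotation.tailrec
    def loop(last: Option[State.Value]): State.Value = {
      val state = poll()
      // Notify only when the state differs from the previously recorded one.
      if (!last.contains(state)) onChange(state)
      val terminal =
        state == State.FINISHED || state == State.FAILED || state == State.KILLED
      if (terminal || (returnOnRunning && state == State.RUNNING)) state
      else loop(Some(state)) // record the current state for the next check
    }
    loop(None)
  }
}
```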

cleanupStagingDir Method

cleanupStagingDir clears the staging directory of an application.

It is used in submitApplication when there is an exception and monitorApplication when an application finishes and the method quits.

It uses spark.yarn.stagingDir setting or falls back to a user’s home directory for the staging directory. If cleanup is enabled, it deletes the entire staging directory for the application.

You should see the following INFO message in the logs:

INFO Deleting staging directory [stagingDirPath]

reportLauncherState Method

reportLauncherState(state: SparkAppHandle.State): Unit

reportLauncherState merely passes the call on to LauncherBackend.setState.

What does setState do?
