ExternalShuffleService is an external shuffle service that serves shuffle blocks from outside an Executor process. It runs as a standalone application and manages shuffle output files so that they are available to executors at all times. Because the shuffle output files are managed outside the executors, access to them is uninterrupted even when executors are killed or down.

You start ExternalShuffleService using the start-shuffle-service.sh shell script and enable its use by the driver and executors with the spark.shuffle.service.enabled property.

Spark on YARN has its own custom external shuffle service, YarnShuffleService.

Enable the INFO logging level for the org.apache.spark.deploy.ExternalShuffleService logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.deploy.ExternalShuffleService=INFO

Refer to Logging.

start-shuffle-service.sh Shell Script


The start-shuffle-service.sh shell script launches ExternalShuffleService. The script is in the sbin directory.

When executed, it runs the sbin/spark-config.sh and bin/load-spark-env.sh shell scripts. It then executes sbin/spark-daemon.sh with the start command and two parameters: org.apache.spark.deploy.ExternalShuffleService and 1.

$ ./sbin/start-shuffle-service.sh
starting org.apache.spark.deploy.ExternalShuffleService, logging to ...logs/spark-jacek-org.apache.spark.deploy.ExternalShuffleService-1-japila.local.out

$ tail -f ...logs/spark-jacek-org.apache.spark.deploy.ExternalShuffleService-1-japila.local.out
Spark Command: /Library/Java/JavaVirtualMachines/Current/Contents/Home/bin/java -cp /Users/jacek/dev/oss/spark/conf/:/Users/jacek/dev/oss/spark/assembly/target/scala-2.11/jars/* -Xmx1g org.apache.spark.deploy.ExternalShuffleService
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/07 08:02:02 INFO ExternalShuffleService: Started daemon with process name: 42918@japila.local
16/06/07 08:02:03 INFO ExternalShuffleService: Starting shuffle service on port 7337 with useSasl = false

You can also use spark-class to launch ExternalShuffleService.

spark-class org.apache.spark.deploy.ExternalShuffleService

Launching ExternalShuffleService — main Method

When started, it executes Utils.initDaemon(log).

FIXME Utils.initDaemon(log)? See spark-submit.

It loads default Spark properties and creates a SecurityManager.

An ExternalShuffleService is created and started.

A shutdown hook is registered so that, when ExternalShuffleService is shut down, it prints the following INFO message to the logs and executes the stop method.

INFO ExternalShuffleService: Shutting down shuffle service.
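
The shutdown-hook step can be sketched in plain Java as follows. This is only an illustration of the pattern, not Spark's code: ShutdownHookSketch and its methods are hypothetical names, and Spark may register its hook through its own utilities rather than through Runtime directly.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the service; only the stop-on-shutdown wiring is shown.
final class ShutdownHookSketch {
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    void stop() {
        stopped.set(true);
    }

    boolean isStopped() {
        return stopped.get();
    }

    // Builds the hook thread: print the INFO message first, then stop the service.
    Thread newShutdownHook() {
        return new Thread(() -> {
            System.out.println("INFO ExternalShuffleService: Shutting down shuffle service.");
            stop();
        }, "shuffle-service-shutdown-hook");
    }

    public static void main(String[] args) {
        ShutdownHookSketch service = new ShutdownHookSketch();
        // The JVM runs registered hooks when it exits or is interrupted (e.g. Ctrl-C).
        Runtime.getRuntime().addShutdownHook(service.newShutdownHook());
    }
}
```

Building the hook in a separate method keeps the log-then-stop order easy to exercise without terminating the JVM.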

Enable DEBUG logging level for org.apache.spark.network.shuffle.ExternalShuffleBlockResolver logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockResolver=DEBUG

Refer to Logging.

You should see the following INFO message in the logs:

INFO ExternalShuffleBlockResolver: Registered executor [AppExecId] with [executorInfo]

You should also see the following messages when a SparkContext is closed:

INFO ExternalShuffleBlockResolver: Application [appId] removed, cleanupLocalDirs = [cleanupLocalDirs]
INFO ExternalShuffleBlockResolver: Cleaning up executor [AppExecId]'s [executor.localDirs.length] local dirs
DEBUG ExternalShuffleBlockResolver: Successfully cleaned up directory: [localDir]

Creating ExternalShuffleService Instance

ExternalShuffleService requires a SparkConf and SecurityManager.

When created, it reads spark.shuffle.service.enabled (disabled by default) and spark.shuffle.service.port (defaults to 7337) configuration settings. It also checks whether authentication is enabled.
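
As a rough sketch of how the two settings and their defaults could be read, the following uses a plain Map in place of SparkConf. ShuffleServiceSettings is a hypothetical class introduced for illustration; only the property names and default values come from the text above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the two settings; a Map stands in for SparkConf.
final class ShuffleServiceSettings {
    final boolean enabled;
    final int port;

    ShuffleServiceSettings(Map<String, String> conf) {
        // Disabled unless explicitly set to true.
        this.enabled = Boolean.parseBoolean(
            conf.getOrDefault("spark.shuffle.service.enabled", "false"));
        // Falls back to 7337 when no port is configured.
        this.port = Integer.parseInt(
            conf.getOrDefault("spark.shuffle.service.port", "7337"));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.shuffle.service.enabled", "true");
        ShuffleServiceSettings settings = new ShuffleServiceSettings(conf);
        System.out.println("enabled=" + settings.enabled + ", port=" + settings.port);
    }
}
```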

FIXME Review securityManager.isAuthenticationEnabled()

It then creates a TransportConf (as transportConf).

It creates an ExternalShuffleBlockHandler (as blockHandler) and a TransportContext (as transportContext).

FIXME TransportContext?

The internal TransportServer (as server) is not created at this point. It is created only when start is executed.

Starting ExternalShuffleService — start Method

start(): Unit

start starts an ExternalShuffleService.

When start is executed, you should see the following INFO message in the logs:

INFO ExternalShuffleService: Starting shuffle service on port [port] with useSasl = [useSasl]

If useSasl is enabled, a SaslServerBootstrap is created.

FIXME SaslServerBootstrap?

The internal server reference (a TransportServer) is created (which will attempt to bind to port).

Stopping ExternalShuffleService — stop Method

stop(): Unit

stop closes the internal server reference and clears it (i.e. sets it to null).
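
The start and stop lifecycle described above (a server reference that is null until start, created on start, then closed and set back to null on stop) can be sketched like this. It is an illustration under assumptions: java.net.ServerSocket stands in for TransportServer, LifecycleSketch is a hypothetical class, and the already-started guard is this sketch's own choice.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical lifecycle model; ServerSocket stands in for TransportServer.
final class LifecycleSketch {
    private final int port;
    private ServerSocket server; // stays null until start() is called

    LifecycleSketch(int port) {
        this.port = port;
    }

    synchronized void start() {
        if (server != null) {
            throw new IllegalStateException("already started"); // guard added by this sketch
        }
        try {
            server = new ServerSocket(port); // the bind attempt happens here
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    synchronized void stop() {
        if (server != null) {
            try {
                server.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            server = null; // clear the reference so the service can be started again
        }
    }

    synchronized boolean isRunning() {
        return server != null;
    }

    public static void main(String[] args) {
        LifecycleSketch service = new LifecycleSketch(0); // 0 asks the OS for any free port
        service.start();
        System.out.println("running: " + service.isRunning());
        service.stop();
    }
}
```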


ExternalShuffleBlockHandler is a RpcHandler (i.e. a handler for sendRPC() messages sent by TransportClients).

When created, ExternalShuffleBlockHandler requires a OneForOneStreamManager and a TransportConf with a registeredExecutorFile to create an ExternalShuffleBlockResolver.


Enable TRACE logging level for org.apache.spark.network.shuffle.ExternalShuffleBlockHandler logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockHandler=TRACE

Refer to Logging.

handleMessage Method

void handleMessage(
  BlockTransferMessage msgObj,
  TransportClient client,
  RpcResponseCallback callback)

handleMessage handles two types of BlockTransferMessage messages: OpenBlocks and RegisterExecutor.

For any other BlockTransferMessage message, it throws an UnsupportedOperationException:

Unexpected message: [msgObj]
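
The dispatch can be pictured with the following self-contained sketch. The message classes here are simplified, hypothetical stand-ins (the real ones carry more state and the real method reports results through a callback rather than a return value); only the two handled types and the exception message follow the text.

```java
// Hypothetical, simplified model of the dispatch in handleMessage.
final class DispatchSketch {
    interface Message {}

    static final class OpenBlocks implements Message {
        final String appId;
        final String execId;
        final String[] blockIds;

        OpenBlocks(String appId, String execId, String[] blockIds) {
            this.appId = appId;
            this.execId = execId;
            this.blockIds = blockIds;
        }
    }

    static final class RegisterExecutor implements Message {
        final String appId;
        final String execId;

        RegisterExecutor(String appId, String execId) {
            this.appId = appId;
            this.execId = execId;
        }
    }

    // Any message type the handler does not know about.
    static final class OtherMessage implements Message {
        @Override
        public String toString() {
            return "OtherMessage";
        }
    }

    // Returns a short description of the action taken instead of invoking a callback.
    static String handleMessage(Message msgObj) {
        if (msgObj instanceof OpenBlocks) {
            OpenBlocks msg = (OpenBlocks) msgObj;
            return "open:" + msg.appId + "/" + msg.execId + ":" + msg.blockIds.length;
        } else if (msgObj instanceof RegisterExecutor) {
            RegisterExecutor msg = (RegisterExecutor) msgObj;
            return "register:" + msg.appId + "/" + msg.execId;
        } else {
            throw new UnsupportedOperationException("Unexpected message: " + msgObj);
        }
    }

    public static void main(String[] args) {
        System.out.println(handleMessage(new RegisterExecutor("app-1", "exec-1")));
    }
}
```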


OpenBlocks(String appId, String execId, String[] blockIds)

When OpenBlocks is received, handleMessage authorizes the client.

FIXME checkAuth?

It then gets block data for each block id in blockIds (using ExternalShuffleBlockResolver).

Finally, it registers a stream and does callback.onSuccess with a serialized byte buffer (for the streamId and the number of blocks in msg).

FIXME callback.onSuccess?

You should see the following TRACE message in the logs:

TRACE Registered streamId [streamId] with [length] buffers for client [clientId] from host [remoteAddress]
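
The OpenBlocks flow (resolve each block id, register the resulting buffers as one stream, reply with the stream id and the number of blocks) might look roughly like the sketch below. Everything in it is an assumption made for illustration: OpenBlocksSketch, the resolver function, and the 12-byte reply layout are hypothetical and simpler than whatever Spark serializes on the wire.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical model of handling OpenBlocks; not Spark's stream manager.
final class OpenBlocksSketch {
    private final AtomicLong nextStreamId = new AtomicLong(0);
    private final Map<Long, List<byte[]>> streams = new ConcurrentHashMap<>();

    // Registers the buffers as one stream and returns the new stream id.
    long registerStream(List<byte[]> buffers) {
        long streamId = nextStreamId.getAndIncrement();
        streams.put(streamId, buffers);
        return streamId;
    }

    // Resolves every block id, registers the stream, and builds the reply:
    // an 8-byte stream id followed by a 4-byte block count (layout assumed here).
    ByteBuffer openBlocks(String[] blockIds, Function<String, byte[]> resolver) {
        List<byte[]> buffers = new ArrayList<>();
        for (String blockId : blockIds) {
            buffers.add(resolver.apply(blockId));
        }
        long streamId = registerStream(buffers);
        ByteBuffer reply = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
        reply.putLong(streamId).putInt(blockIds.length);
        reply.flip(); // switch the buffer from writing to reading
        return reply;
    }

    public static void main(String[] args) {
        OpenBlocksSketch handler = new OpenBlocksSketch();
        ByteBuffer reply = handler.openBlocks(
            new String[] {"shuffle_0_0_0", "shuffle_0_0_1"}, id -> id.getBytes());
        System.out.println("streamId=" + reply.getLong() + ", blocks=" + reply.getInt());
    }
}
```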


RegisterExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo)




getBlockData Method

ManagedBuffer getBlockData(String appId, String execId, String blockId)

getBlockData parses blockId (in the format of shuffle_[shuffleId]_[mapId]_[reduceId]) and returns the FileSegmentManagedBuffer that corresponds to shuffle_[shuffleId]_[mapId]_0.data.

getBlockData splits blockId into four parts using _ (underscore). It works exclusively with shuffle block ids: the first part has to be shuffle, and the other three parts are shuffleId, mapId, and reduceId.

It looks up an executor (i.e. an ExecutorShuffleInfo in the executors private registry) for appId and execId to search for a ManagedBuffer.

The ManagedBuffer is indexed using a binary file shuffle_[shuffleId]_[mapId]_0.index (that contains offset and length of the buffer) with a data file being shuffle_[shuffleId]_[mapId]_0.data (that is returned as FileSegmentManagedBuffer).
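
To make the index lookup concrete, here is a sketch under a stated assumption: the index is taken to be a sequence of 8-byte offsets, one per reduce partition plus a final end offset, so that a segment's length is the difference between two neighbouring offsets. IndexLookupSketch and both of its methods are hypothetical, and an in-memory byte array replaces the index file to keep the example self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical index lookup over in-memory bytes (layout is an assumption, see above).
final class IndexLookupSketch {

    // Returns {offset, length} of the data-file segment for the given reduceId.
    static long[] segmentFor(byte[] indexBytes, int reduceId) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(indexBytes))) {
            long toSkip = (long) reduceId * Long.BYTES;
            if (in.skip(toSkip) != toSkip) {
                throw new IllegalArgumentException("No index entry for reduceId " + reduceId);
            }
            long offset = in.readLong();     // where the segment starts
            long nextOffset = in.readLong(); // where the next segment starts
            return new long[] {offset, nextOffset - offset};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds index bytes from per-partition lengths as cumulative offsets.
    static byte[] indexOf(long... partitionLengths) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            long offset = 0;
            out.writeLong(offset);
            for (long length : partitionLengths) {
                offset += length;
                out.writeLong(offset);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        long[] segment = segmentFor(indexOf(10L, 0L, 25L), 2);
        System.out.println("offset=" + segment[0] + ", length=" + segment[1]);
    }
}
```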

It throws an IllegalArgumentException for block ids with fewer than four parts:

Unexpected block id format: [blockId]

or for non-shuffle block ids:

Expected shuffle block id, got: [blockId]

It throws a RuntimeException when no ExecutorShuffleInfo could be found:

Executor is not registered (appId=[appId], execId=[execId])
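
The parsing and validation rules for block ids described in this section can be sketched as follows. BlockIdSketch is a hypothetical class written for illustration; the two error messages and the file-name patterns are the ones quoted above.

```java
// Hypothetical parser for shuffle block ids of the form shuffle_[shuffleId]_[mapId]_[reduceId].
final class BlockIdSketch {
    final int shuffleId;
    final int mapId;
    final int reduceId;

    private BlockIdSketch(int shuffleId, int mapId, int reduceId) {
        this.shuffleId = shuffleId;
        this.mapId = mapId;
        this.reduceId = reduceId;
    }

    static BlockIdSketch parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length < 4) {
            throw new IllegalArgumentException("Unexpected block id format: " + blockId);
        }
        if (!parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);
        }
        return new BlockIdSketch(
            Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Integer.parseInt(parts[3]));
    }

    // The data and index files are named after the map output, with reduce id 0.
    String dataFileName() {
        return "shuffle_" + shuffleId + "_" + mapId + "_0.data";
    }

    String indexFileName() {
        return "shuffle_" + shuffleId + "_" + mapId + "_0.index";
    }

    public static void main(String[] args) {
        BlockIdSketch id = parse("shuffle_1_2_3");
        System.out.println(id.dataFileName() + " / " + id.indexFileName());
    }
}
```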


Table 1. Spark Properties
Spark Property | Default Value | Description
spark.shuffle.service.enabled | false | Enables External Shuffle Service. When true, the driver registers itself with the shuffle service. Used to enable dynamic allocation of executors and in CoarseMesosSchedulerBackend to instantiate MesosExternalShuffleClient. Explicitly disabled for LocalSparkCluster (any attempt to set it is ignored).
