UnsafeShuffleWriter — ShuffleWriter for SerializedShuffleHandle

UnsafeShuffleWriter is a concrete ShuffleWriter for SerializedShuffleHandles.

UnsafeShuffleWriter is created exclusively when SortShuffleManager is requested for a ShuffleWriter for a SerializedShuffleHandle.

UnsafeShuffleWriter can use a specialized NIO-based fast merge procedure that avoids extra serialization/deserialization when the spark.file.transferTo configuration property (default: true) is enabled.

UnsafeShuffleWriter uses the initial buffer size for sorting (default: 4096) when creating a ShuffleExternalSorter (when requested to open).

Tip
Use spark.shuffle.sort.initialBufferSize configuration property to change the default buffer size.

UnsafeShuffleWriter uses a fixed buffer size for the output stream of serialized data written into a byte array (default: 1024 * 1024).

UnsafeShuffleWriter uses the spark.shuffle.file.buffer configuration property (default: 32k) for…​FIXME

UnsafeShuffleWriter uses the spark.shuffle.unsafe.file.output.buffer configuration property (default: 32k) for…​FIXME

Tip

Enable ALL logging levels for org.apache.spark.shuffle.sort.UnsafeShuffleWriter logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.shuffle.sort.UnsafeShuffleWriter=ALL

Refer to Logging.

mergeSpillsWithTransferTo Internal Method

long[] mergeSpillsWithTransferTo(
  SpillInfo[] spills,
  File outputFile)

mergeSpillsWithTransferTo…​FIXME

Note
mergeSpillsWithTransferTo is used exclusively when UnsafeShuffleWriter is requested to mergeSpills (with the transferToEnabled flag enabled and no encryption).
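
The idea behind an NIO transferTo-based merge can be illustrated with a minimal sketch. It assumes that every spill file stores its partitions back to back and that the per-spill partition lengths are known. TransferToMergeSketch and its parameters are hypothetical names used for illustration only, not Spark's implementation.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper (not part of Spark): concatenates spill files partition by partition.
final class TransferToMergeSketch {

  /**
   * partitionLengths[i][p] is the number of bytes partition p occupies in spills[i].
   * Returns the length of every partition in the merged output file.
   */
  static long[] merge(Path[] spills, long[][] partitionLengths, Path output, int numPartitions)
      throws IOException {
    long[] mergedLengths = new long[numPartitions];
    long[] positions = new long[spills.length]; // current read position in every spill
    FileChannel[] inputs = new FileChannel[spills.length];
    try (FileChannel out = FileChannel.open(output, StandardOpenOption.CREATE,
        StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
      for (int i = 0; i < spills.length; i++) {
        inputs[i] = FileChannel.open(spills[i], StandardOpenOption.READ);
      }
      for (int p = 0; p < numPartitions; p++) {
        for (int i = 0; i < spills.length; i++) {
          long remaining = partitionLengths[i][p];
          mergedLengths[p] += remaining;
          // transferTo may move fewer bytes than requested, so loop until done.
          while (remaining > 0) {
            long moved = inputs[i].transferTo(positions[i], remaining, out);
            if (moved <= 0) {
              throw new IOException("Unexpected end of spill file " + spills[i]);
            }
            positions[i] += moved;
            remaining -= moved;
          }
        }
      }
    } finally {
      for (FileChannel in : inputs) {
        if (in != null) {
          in.close();
        }
      }
    }
    return mergedLengths;
  }
}
```

Because the bytes are copied channel to channel, the records are never deserialized, which is why this kind of merge only works when the spill contents can be concatenated as-is.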

Forcing ShuffleExternalSorter to Spill To Disk (Freeing Up Execution Memory) — forceSorterToSpill Internal Method

void forceSorterToSpill()

forceSorterToSpill simply requests the ShuffleExternalSorter to spill to disk (and free up execution memory).

Note
forceSorterToSpill is used exclusively for testing.

mergeSpills Internal Method

long[] mergeSpills(
  SpillInfo[] spills,
  File outputFile)

mergeSpills…​FIXME

Note
mergeSpills is used exclusively when UnsafeShuffleWriter is requested to close internal resources and write out merged spill files.

updatePeakMemoryUsed Internal Method

void updatePeakMemoryUsed()

updatePeakMemoryUsed…​FIXME

Note
updatePeakMemoryUsed is used when UnsafeShuffleWriter is requested for the peak memory used and to close internal resources and write out merged spill files.

Writing Records to Shuffle System — write Method

void write(Iterator<Product2<K, V>> records)
Note
write is part of the ShuffleWriter Contract to write records to a shuffle system.

Internally, write traverses the input sequence of records (for an RDD partition) and inserts them into the sorter one by one (using insertRecordIntoSorter). When all the records have been processed, write closes internal resources and writes out the merged spill files.
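
The control flow described above can be sketched as follows. This is only an illustration: WriteLoopSketch and its Sink interface are hypothetical stand-ins for the writer's internal steps, and java.util.Map.Entry is used in place of Product2.

```java
import java.util.Iterator;
import java.util.Map;

// Hypothetical model of the write loop (not Spark code).
final class WriteLoopSketch {

  /** Stand-in for the two internal steps that write delegates to. */
  interface Sink<K, V> {
    void insertRecordIntoSorter(K key, V value);
    void closeAndWriteOutput();
  }

  static <K, V> void write(Iterator<Map.Entry<K, V>> records, Sink<K, V> sink) {
    // Step 1: hand every record of the partition to the sorter, one by one.
    while (records.hasNext()) {
      Map.Entry<K, V> record = records.next();
      sink.insertRecordIntoSorter(record.getKey(), record.getValue());
    }
    // Step 2: only after the last record, close resources and write the merged output.
    sink.closeAndWriteOutput();
  }
}
```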

Caution
FIXME

Stopping ShuffleWriter — stop Method

Option<MapStatus> stop(boolean success)
Note
stop is part of the ShuffleWriter Contract to stop the ShuffleWriter.

stop…​FIXME

Creating UnsafeShuffleWriter Instance

UnsafeShuffleWriter takes the following to be created:

UnsafeShuffleWriter requests the SerializedShuffleHandle for the ShuffleDependency, which is then requested for the Partitioner and, in the end, for the number of partitions. UnsafeShuffleWriter makes sure that the number of shuffle output partitions is below the (1 << 24) partition identifiers that can be encoded, and throws an IllegalArgumentException otherwise:

UnsafeShuffleWriter can only be used for shuffles with at most 16777215 reduce partitions
Note
The number of shuffle output partitions is first enforced when SortShuffleManager checks if SerializedShuffleHandle can be used for ShuffleHandle (that eventually leads to UnsafeShuffleWriter).
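
The bound follows from the partition identifier having to fit into 24 bits of a packed 64-bit record pointer. A minimal sketch of such packing is shown below. It assumes a simplified layout (24 bits for the partition identifier, the remaining 40 bits for an opaque record address), and PartitionIdPackingSketch is a hypothetical name; it is not Spark's own pointer class.

```java
// Hypothetical illustration of storing a partition id in the upper 24 bits of a long.
final class PartitionIdPackingSketch {
  static final int PARTITION_BITS = 24;
  static final int MAX_PARTITION_ID = (1 << PARTITION_BITS) - 1; // 16777215
  static final int ADDRESS_BITS = 64 - PARTITION_BITS;           // 40 (simplifying assumption)
  static final long ADDRESS_MASK = (1L << ADDRESS_BITS) - 1;

  static long pack(int partitionId, long address) {
    if (partitionId < 0 || partitionId > MAX_PARTITION_ID) {
      throw new IllegalArgumentException("Partition id does not fit into 24 bits: " + partitionId);
    }
    if ((address & ~ADDRESS_MASK) != 0) {
      throw new IllegalArgumentException("Address does not fit into 40 bits: " + address);
    }
    return ((long) partitionId << ADDRESS_BITS) | address;
  }

  static int partitionId(long packed) {
    // Unsigned shift so that the highest partition ids are not sign-extended.
    return (int) (packed >>> ADDRESS_BITS);
  }

  static long address(long packed) {
    return packed & ADDRESS_MASK;
  }
}
```

Keeping the partition identifier in the high bits means that sorting the packed values as plain longs also groups records by partition.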

Opening UnsafeShuffleWriter (Creating ShuffleExternalSorter and SerializationStream) — open Internal Method

void open()

open makes sure that the internal reference to ShuffleExternalSorter (as sorter) is not defined and creates one itself.

open creates a new byte array output stream (as serBuffer) with the initial buffer capacity of 1 MB.

open creates a new SerializationStream for the new byte array output stream using SerializerInstance.

Note
SerializerInstance was defined when UnsafeShuffleWriter was created (and is exactly the one used to create the ShuffleDependency).
Note
open is used exclusively when UnsafeShuffleWriter is created.

Inserting Record Into ShuffleExternalSorter — insertRecordIntoSorter Method

void insertRecordIntoSorter(Product2<K, V> record)

insertRecordIntoSorter first requests the Partitioner for the partition of the key of the input record.

Note
Partitioner is defined when UnsafeShuffleWriter is created.

insertRecordIntoSorter then writes the key and the value of the input record to SerializationStream and calculates the size of the serialized buffer.

Note
SerializationStream is created when UnsafeShuffleWriter opens.

In the end, insertRecordIntoSorter inserts the serialized buffer into ShuffleExternalSorter (with Platform.BYTE_ARRAY_OFFSET as the base offset of the byte array).

Note
ShuffleExternalSorter is created when UnsafeShuffleWriter opens.
Note
insertRecordIntoSorter is used exclusively when UnsafeShuffleWriter is requested to write records.
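
The serialize-then-insert pattern can be sketched with plain JDK classes. The sketch makes several assumptions that are not part of the original: DataOutputStream stands in for SerializationStream, keys are Strings and values are ints, partitioning is by key hash, and InsertRecordSketch with its Sorter interface is a hypothetical stand-in for the real sorter.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical model of reusing one byte buffer to serialize each record (not Spark code).
final class InsertRecordSketch {

  /** Exposes the backing array so the bytes can be handed over without copying. */
  static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
    ExposedByteArrayOutputStream(int size) { super(size); }
    byte[] getBuf() { return buf; }
  }

  /** Stand-in for the sorter that receives the serialized bytes. */
  interface Sorter {
    void insertRecord(byte[] buffer, int length, int partitionId);
  }

  private final ExposedByteArrayOutputStream serBuffer =
      new ExposedByteArrayOutputStream(1024 * 1024); // 1 MB initial capacity
  private final DataOutputStream serOutputStream = new DataOutputStream(serBuffer);
  private final int numPartitions;
  private final Sorter sorter;

  InsertRecordSketch(int numPartitions, Sorter sorter) {
    this.numPartitions = numPartitions;
    this.sorter = sorter;
  }

  void insertRecordIntoSorter(String key, int value) throws IOException {
    // Assumption: hash partitioning; floorMod keeps the result non-negative.
    int partitionId = Math.floorMod(key.hashCode(), numPartitions);
    serBuffer.reset();                 // reuse the same buffer for every record
    serOutputStream.writeUTF(key);     // write the key...
    serOutputStream.writeInt(value);   // ...and then the value
    serOutputStream.flush();
    int serializedRecordSize = serBuffer.size();
    sorter.insertRecord(serBuffer.getBuf(), serializedRecordSize, partitionId);
  }
}
```

Resetting the buffer before every record is what makes the reported size the size of a single record rather than of everything written so far.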

Closing Internal Resources and Writing Out Merged Spill Files — closeAndWriteOutput Method

void closeAndWriteOutput()

closeAndWriteOutput first updates peak memory used.

closeAndWriteOutput removes the internal ByteArrayOutputStream and SerializationStream.

closeAndWriteOutput removes the internal ShuffleExternalSorter.

closeAndWriteOutput requests IndexShuffleBlockResolver for the data file for the shuffleId and mapId.

closeAndWriteOutput creates a temporary file to merge the spill files into, deletes the spill files afterwards, and requests IndexShuffleBlockResolver to write the index file and commit.

closeAndWriteOutput creates a MapStatus with the location of the executor’s BlockManager and partition lengths in the merged file.

If there is an issue with deleting spill files, you should see the following ERROR message in the logs:

ERROR Error while deleting spill file [path]

If there is an issue with deleting the temporary file, you should see the following ERROR message in the logs:

ERROR Error while deleting temp file [path]
Note
closeAndWriteOutput is used exclusively when UnsafeShuffleWriter is requested to write records.
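
The merge-into-a-temporary-file-then-commit pattern can be sketched as follows. This is a simplified illustration under stated assumptions: the commit is modeled as a plain file move (no index file is written), errors are printed to standard error instead of a logger, and TempFileCommitSketch and Merger are hypothetical names.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical model of writing merged output through a temporary file (not Spark code).
final class TempFileCommitSketch {

  /** Stand-in for the step that merges all spill files into the given target. */
  interface Merger {
    void mergeInto(File target) throws IOException;
  }

  static void writeOutput(File dataFile, List<File> spills, Merger merger) throws IOException {
    File tmp = File.createTempFile("merged", ".tmp", dataFile.getAbsoluteFile().getParentFile());
    try {
      try {
        merger.mergeInto(tmp);
      } finally {
        // Spill files are no longer needed, whether or not the merge succeeded.
        for (File spill : spills) {
          if (spill.exists() && !spill.delete()) {
            System.err.println("Error while deleting spill file " + spill.getPath());
          }
        }
      }
      // Assumption: "commit" is modeled as replacing the data file with the merged file.
      Files.move(tmp.toPath(), dataFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
    } finally {
      // After a successful move the temporary file is gone; otherwise clean it up.
      if (tmp.exists() && !tmp.delete()) {
        System.err.println("Error while deleting temp file " + tmp.getAbsolutePath());
      }
    }
  }
}
```

Writing to a temporary file first means that a failed merge never leaves a partially written data file behind.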

mergeSpillsWithFileStream Internal Method

long[] mergeSpillsWithFileStream(
  SpillInfo[] spills,
  File outputFile,
  @Nullable CompressionCodec compressionCodec)

mergeSpillsWithFileStream…​FIXME

Note
mergeSpillsWithFileStream is used exclusively when UnsafeShuffleWriter is requested to mergeSpills.

Getting Peak Memory Used — getPeakMemoryUsedBytes Method

long getPeakMemoryUsedBytes()

getPeakMemoryUsedBytes simply calls updatePeakMemoryUsed and returns the internal peakMemoryUsedBytes registry.

Note
getPeakMemoryUsedBytes is used exclusively when UnsafeShuffleWriter is requested to stop.
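
The update-then-read pattern can be sketched as below. It assumes that the sorter may already have been released when the peak is requested, so the last recorded value is kept. PeakMemorySketch, its Sorter interface and releaseSorter are hypothetical names for illustration only.

```java
// Hypothetical model of tracking peak memory reported by a sorter (not Spark code).
final class PeakMemorySketch {

  /** Stand-in for the sorter that reports its own peak memory usage. */
  interface Sorter {
    long getPeakMemoryUsedBytes();
  }

  private Sorter sorter; // null once the sorter has been released
  private long peakMemoryUsedBytes = 0;

  PeakMemorySketch(Sorter sorter) {
    this.sorter = sorter;
  }

  private void updatePeakMemoryUsed() {
    if (sorter != null) {
      long mem = sorter.getPeakMemoryUsedBytes();
      if (mem > peakMemoryUsedBytes) {
        peakMemoryUsedBytes = mem; // the recorded peak never decreases
      }
    }
  }

  long getPeakMemoryUsedBytes() {
    updatePeakMemoryUsed();
    return peakMemoryUsedBytes;
  }

  /** Records the peak one last time before dropping the sorter reference. */
  void releaseSorter() {
    updatePeakMemoryUsed();
    sorter = null;
  }
}
```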

Internal Properties

Table 1. UnsafeShuffleWriter’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

mapStatus

Created when UnsafeShuffleWriter is requested to close internal resources and write out merged spill files (with the BlockManagerId of the BlockManager and partitionLengths)

Returned when UnsafeShuffleWriter is requested to stop

partitioner

Used when UnsafeShuffleWriter is requested for the following:

peakMemoryUsedBytes

Peak memory used (in bytes) that is updated exclusively in updatePeakMemoryUsed (after requesting the ShuffleExternalSorter for getPeakMemoryUsedBytes)

Use getPeakMemoryUsedBytes to access the current value

serBuffer

java.io.ByteArrayOutputStream of serialized data (written into a byte array of 1MB initial size)

Used when UnsafeShuffleWriter is requested for the following:

serializer

SerializerInstance (that is a new instance of the Serializer of the ShuffleDependency of the SerializedShuffleHandle)

Used exclusively when UnsafeShuffleWriter is requested to open (and creates the SerializationStream)

serOutputStream

SerializationStream (that is created when the SerializerInstance is requested to serializeStream with the ByteArrayOutputStream)

Used exclusively when UnsafeShuffleWriter is requested to insertRecordIntoSorter

shuffleId

sorter

writeMetrics

Used when UnsafeShuffleWriter is requested for the following:
