MemoryStore

MemoryStore stores blocks of data in memory, either as values or as serialized bytes.

MemoryStore is created exclusively when BlockManager is created.

Figure 1. Creating MemoryStore

The "idiom" to access the current MemoryStore is to request SparkEnv for the BlockManager that manages the MemoryStore.

SparkEnv.get.blockManager.memoryStore

MemoryStore uses Java’s java.util.LinkedHashMap with access-order ordering mode. In access-order, the order of iteration is the order in which the entries were last accessed, from least-recently accessed to most-recently. That gives LRU cache behaviour when evicting blocks.
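The access-order behaviour can be tried out without Spark. The following is a minimal sketch that uses a plain java.util.LinkedHashMap with the same constructor arguments as the entries registry; the block names and sizes are made up for illustration.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.JavaConverters._

// accessOrder = true: iteration goes from least- to most-recently accessed
val entries = new JLinkedHashMap[String, Int](32, 0.75f, true)
entries.put("rdd_0_0", 100)
entries.put("rdd_0_1", 200)
entries.put("rdd_0_2", 300)

// Reading an entry moves it to the most-recently-accessed end
entries.get("rdd_0_0")

// The head of the iteration order is the first candidate for eviction
val evictionOrder = entries.keySet.asScala.toList
println(evictionOrder) // List(rdd_0_1, rdd_0_2, rdd_0_0)
```

The block that was read last ends up at the tail, so an eviction loop that walks the map from the head drops the least-recently used blocks first.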

MemoryStore uses the spark.storage.unrollMemoryThreshold configuration property (default: 1024 * 1024 bytes) as the initial memory threshold when requested to putIteratorAsValues and putIteratorAsBytes.

Table 1. MemoryStore’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

entries

Java’s java.util.LinkedHashMap of MemoryEntries per BlockId (with the initial capacity of 32, the load factor of 0.75 and access-order ordering mode, i.e. iteration is in the order in which its entries were last accessed, from least-recently accessed to most-recently).

Caution
FIXME Where are these dependencies used?

Tip

Enable INFO or DEBUG logging level for org.apache.spark.storage.memory.MemoryStore logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.memory.MemoryStore=DEBUG

Refer to Logging.

releaseUnrollMemoryForThisTask Method

releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit

releaseUnrollMemoryForThisTask…​FIXME

Note

releaseUnrollMemoryForThisTask is used when:

  • Task is requested to run (and cleans up after itself)

  • MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes

  • PartiallyUnrolledIterator is requested to releaseUnrollMemory

  • PartiallySerializedBlock is requested to discard and finishWritingToStream

getValues Method

getValues(blockId: BlockId): Option[Iterator[_]]

getValues does…​FIXME

getBytes Method

getBytes(blockId: BlockId): Option[ChunkedByteBuffer]

getBytes does…​FIXME

putIteratorAsBytes Method

putIteratorAsBytes[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]

putIteratorAsBytes tries to put the blockId block in memory store as bytes.

Caution
FIXME

Removing Block

Caution
FIXME

Acquiring Storage Memory for Blocks — putBytes Method

putBytes[T](
  blockId: BlockId,
  size: Long,
  memoryMode: MemoryMode,
  _bytes: () => ChunkedByteBuffer): Boolean

putBytes requests storage memory for blockId from MemoryManager and registers the block in entries internal registry.

Internally, putBytes first makes sure that blockId block has not been registered already in entries internal registry.

Note

memoryMode can be ON_HEAP or OFF_HEAP and is a property of a StorageLevel.

scala> import org.apache.spark.storage.StorageLevel._
import org.apache.spark.storage.StorageLevel._

scala> MEMORY_AND_DISK.useOffHeap
res0: Boolean = false

scala> OFF_HEAP.useOffHeap
res1: Boolean = true

If the storage memory was acquired successfully, putBytes "materializes" the _bytes byte buffer and makes sure that the size of the buffer is exactly size. It then registers a SerializedMemoryEntry (for the bytes and memoryMode) for blockId in the internal entries registry.

You should see the following INFO message in the logs:

INFO Block [blockId] stored as bytes in memory (estimated size [size], free [bytes])

putBytes returns true only after blockId was successfully registered in the internal entries registry.
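The order of the steps in putBytes (check, acquire memory, materialize, register) can be sketched with a toy store. ToyMemoryStore, its simple byte budget and the String block ids are hypothetical stand-ins and not Spark's classes; the sketch only illustrates why the byte buffer is passed in as a function.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Hypothetical stand-in for MemoryStore; not Spark's implementation
class ToyMemoryStore(maxMemory: Long) {
  private val entries = mutable.LinkedHashMap.empty[String, ByteBuffer]
  private var used = 0L

  def putBytes(blockId: String, size: Long, bytes: () => ByteBuffer): Boolean = {
    // 1. The block must not be registered yet
    require(!entries.contains(blockId), s"Block $blockId is already registered")
    // 2. Try to acquire memory first; on failure the bytes are never materialized
    if (used + size > maxMemory) {
      false
    } else {
      used += size
      // 3. Materialize the lazy buffer and verify that its size is exactly `size`
      val buffer = bytes()
      assert(buffer.remaining() == size, s"expected $size bytes, got ${buffer.remaining()}")
      // 4. Register the entry
      entries(blockId) = buffer
      true
    }
  }

  def contains(blockId: String): Boolean = entries.contains(blockId)
}

val store = new ToyMemoryStore(maxMemory = 16)
val stored = store.putBytes("block-a", 8, () => ByteBuffer.allocate(8))
// The buffer function below would throw if it were ever called
val rejected = store.putBytes("block-b", 100, () => sys.error("never materialized"))
```

Because the second call fails at the memory check, its buffer function is never invoked, which is the point of passing _bytes lazily.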

Settings

Table 2. Spark Properties
Spark Property Default Value Description

spark.storage.unrollMemoryThreshold

1024 * 1024

Initial per-task memory size needed to store a block in memory.

spark.storage.unrollMemoryThreshold should be at most the total amount of memory available for storage. If not, you should see the following WARN message in the logs:

Max memory [maxMemory] is less than the initial memory threshold [unrollMemoryThreshold] needed to store a block in memory. Please configure Spark with more memory.

Used when MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes.
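The condition behind the WARN message can be expressed as a small check. This is a sketch only: thresholdWarning is a hypothetical helper, and the sizes are printed as raw byte counts, which may differ from how Spark formats them.

```scala
// Hypothetical helper mirroring the documented check; not part of Spark
def thresholdWarning(maxMemory: Long, unrollMemoryThreshold: Long): Option[String] = {
  if (maxMemory < unrollMemoryThreshold) {
    Some(s"Max memory $maxMemory is less than the initial memory threshold " +
      s"$unrollMemoryThreshold needed to store a block in memory. " +
      "Please configure Spark with more memory.")
  } else {
    None
  }
}

// Default value of spark.storage.unrollMemoryThreshold (in bytes)
val defaultThreshold = 1024L * 1024L

// 512 MB of storage memory is plenty for the default threshold
val ok = thresholdWarning(maxMemory = 512L * 1024 * 1024, unrollMemoryThreshold = defaultThreshold)

// 512 KB is below the default threshold and triggers the warning
val tooSmall = thresholdWarning(maxMemory = 512L * 1024, unrollMemoryThreshold = defaultThreshold)
```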

Evicting Blocks From Memory — evictBlocksToFreeSpace Method

evictBlocksToFreeSpace(
  blockId: Option[BlockId],
  space: Long,
  memoryMode: MemoryMode): Long

evictBlocksToFreeSpace…​FIXME

Note
evictBlocksToFreeSpace is used when StorageMemoryPool is requested to acquireMemory and freeSpaceToShrinkPool.

Checking Whether Block Exists In MemoryStore — contains Method

contains(blockId: BlockId): Boolean

contains returns true when the entries internal registry contains the blockId key.

Note
contains is used when…​FIXME

putIteratorAsValues Method

putIteratorAsValues[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]

putIteratorAsValues makes sure that the BlockId does not exist or throws an IllegalArgumentException:

requirement failed: Block [blockId] is already present in the MemoryStore
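The "requirement failed:" prefix is what Scala's require adds to its message, which suggests (as an assumption) that the check is a require call. A minimal sketch with a made-up set of registered block ids:

```scala
import scala.util.Try

// Hypothetical registry of block ids already in the store
val registered = Set("rdd_0_0")
val blockId = "rdd_0_0"

// require throws IllegalArgumentException when the condition is false
val result = Try {
  require(!registered.contains(blockId), s"Block $blockId is already present in the MemoryStore")
}

val message = result.failed.get.getMessage
println(message)
// requirement failed: Block rdd_0_0 is already present in the MemoryStore
```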

putIteratorAsValues then requests reserveUnrollMemoryForThisTask (with the initial memory threshold and ON_HEAP memory mode).

Caution
FIXME

putIteratorAsValues tries to put the blockId block in memory store as values.

Note
putIteratorAsValues is used when BlockManager stores bytes of a block or iterator of values of a block or when attempting to cache spilled values read from disk.

Creating MemoryStore Instance

MemoryStore takes the following when created:

MemoryStore initializes the internal registries and counters.

reserveUnrollMemoryForThisTask Method

reserveUnrollMemoryForThisTask(
  blockId: BlockId,
  memory: Long,
  memoryMode: MemoryMode): Boolean

reserveUnrollMemoryForThisTask acquires a lock on MemoryManager and requests it to acquireUnrollMemory.

Note
reserveUnrollMemoryForThisTask is used when MemoryStore is requested to putIteratorAsValues and putIteratorAsBytes.
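The lock-then-acquire pattern can be sketched as follows. ToyMemoryManager and ToyUnrollReservations are hypothetical classes, and the per-task bookkeeping and explicit taskId parameter are assumptions of this sketch, not details confirmed by the description above.

```scala
import scala.collection.mutable

// Hypothetical stand-in for MemoryManager's unroll-memory accounting
class ToyMemoryManager(var freeUnrollMemory: Long) {
  def acquireUnrollMemory(memory: Long): Boolean = synchronized {
    if (memory <= freeUnrollMemory) {
      freeUnrollMemory -= memory
      true
    } else {
      false
    }
  }
}

class ToyUnrollReservations(memoryManager: ToyMemoryManager) {
  // Assumption: reserved unroll memory is tracked per task id
  private val unrollMemoryByTask =
    mutable.HashMap.empty[Long, Long].withDefaultValue(0L)

  def reserveUnrollMemoryForThisTask(taskId: Long, memory: Long): Boolean =
    // Hold the manager's lock so the acquisition and the bookkeeping stay consistent
    memoryManager.synchronized {
      val success = memoryManager.acquireUnrollMemory(memory)
      if (success) {
        unrollMemoryByTask(taskId) += memory
      }
      success
    }

  def reservedFor(taskId: Long): Long = unrollMemoryByTask(taskId)
}

val manager = new ToyMemoryManager(freeUnrollMemory = 1024L * 1024L)
val reservations = new ToyUnrollReservations(manager)

// The first task takes the whole budget, so the second request is refused
val first = reservations.reserveUnrollMemoryForThisTask(taskId = 1L, memory = 1024L * 1024L)
val second = reservations.reserveUnrollMemoryForThisTask(taskId = 2L, memory = 1L)
```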

Requesting Total Amount Of Memory Available For Storage (In Bytes) — maxMemory Internal Method

maxMemory: Long

maxMemory requests the MemoryManager for the current maxOnHeapStorageMemory and maxOffHeapStorageMemory, and simply returns their sum.

Tip

Enable INFO logging to find the maxMemory in the logs when MemoryStore is created:

MemoryStore started with capacity [maxMemory] MB

Note
maxMemory is used for logging purposes only.
