MemoryManager — Memory Management System

MemoryManager is the base abstraction of all memory managers, which manage the memory shared between task execution and block storage.

package org.apache.spark.memory

abstract class MemoryManager(...) {
  // Only the abstract methods (those without an implementation) are listed here;
  // the remaining methods are described in the sections that follow
  def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long
  def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
  def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
  def maxOffHeapStorageMemory: Long
  def maxOnHeapStorageMemory: Long
}
Note
MemoryManager is a private[spark] contract.
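
The return types in the contract hint at two different request styles. The following is a minimal, self-contained sketch of that difference, not Spark's implementation: ToyMemoryManager, its single shared budget, and the executionMemoryUsedBy helper are hypothetical, and BlockId and MemoryMode are left out so that the code runs without Spark. It assumes an execution request may be granted partially (the Long result is the number of bytes granted), while a storage request either succeeds in full or fails (the Boolean result).

```scala
import scala.collection.mutable

// Toy model only: names and policy are hypothetical, not Spark's code.
class ToyMemoryManager(maxBytes: Long) {
  private var used = 0L
  // Bytes of execution memory currently granted to each task attempt.
  private val executionByTask =
    mutable.Map.empty[Long, Long].withDefaultValue(0L)

  // Execution request: grants whatever is still free, up to numBytes,
  // and returns the number of bytes actually granted (possibly 0).
  def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long): Long =
    synchronized {
      val granted = math.max(0L, math.min(numBytes, maxBytes - used))
      used += granted
      executionByTask(taskAttemptId) += granted
      granted
    }

  // Storage request: all-or-nothing, returns whether it was granted.
  def acquireStorageMemory(numBytes: Long): Boolean = synchronized {
    if (numBytes <= maxBytes - used) { used += numBytes; true } else false
  }

  // Hypothetical helper for inspecting the per-task bookkeeping.
  def executionMemoryUsedBy(taskAttemptId: Long): Long =
    synchronized { executionByTask(taskAttemptId) }
}
```

With a budget of 100 bytes, a task that acquires 60 bytes of execution memory leaves 40 bytes free, so a 50-byte storage request is refused while a 40-byte one is granted.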
Table 1. (Subset of) MemoryManager Contract

Method                    Description
acquireExecutionMemory    Used exclusively when TaskMemoryManager is requested to acquireExecutionMemory
acquireStorageMemory      Used when:
acquireUnrollMemory       Used exclusively when MemoryStore is requested to reserveUnrollMemoryForThisTask
maxOffHeapStorageMemory   Used when:
maxOnHeapStorageMemory    Used when:

MemoryManager is created along with SparkEnv.

Note
MemoryManager is a Scala abstract class and cannot be instantiated directly; only one of its implementations can be.

Execution memory is used for computation in shuffles, joins, sorts and aggregations.

Storage memory is used for caching and propagating internal data across the nodes in a cluster.

Table 2. MemoryManagers

MemoryManager          Description
StaticMemoryManager    (legacy)
UnifiedMemoryManager   The default memory manager

Table 3. MemoryManager's Internal Properties (e.g. Registries, Counters and Flags)

Name                       Description
onHeapStorageMemoryPool    FIXME. Used when…FIXME
offHeapStorageMemoryPool   FIXME. Used when…FIXME
pageSizeBytes              FIXME. Used when…FIXME
tungstenMemoryAllocator    Used when…FIXME

Note
tungstenMemoryAllocator is a Scala final val and cannot be changed by custom MemoryManagers.

Creating MemoryManager Instance

MemoryManager takes the following when created:

  • SparkConf

  • Number of CPU cores

  • onHeapStorageMemory

  • onHeapExecutionMemory

MemoryManager initializes the internal registries and counters.
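
The shape of such a constructor can be sketched as follows. This is an illustration under stated assumptions rather than Spark's code: ToyPool, ToyManager and FixedToyManager are hypothetical names, SparkConf is omitted so the sketch has no Spark dependency, and sizing the two on-heap pools from the constructor arguments is an assumed way of "initializing the internal registries".

```scala
// Hypothetical stand-in for a memory pool; not Spark's class.
final class ToyPool(val name: String) {
  private var size = 0L
  def incrementPoolSize(delta: Long): Unit = {
    require(delta >= 0, "delta must be non-negative")
    size += delta
  }
  def poolSize: Long = size
}

// Abstract base: takes the sizes at creation time and sets up its pools.
abstract class ToyManager(
    val numCores: Int,
    onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long) {
  val onHeapStorageMemoryPool = new ToyPool("on-heap storage")
  val onHeapExecutionMemoryPool = new ToyPool("on-heap execution")
  onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
  onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)

  // Left abstract, so implementations must provide it.
  def maxOnHeapStorageMemory: Long
}

// `new ToyManager(...)` on its own does not compile;
// only a concrete subclass such as this one can be created.
class FixedToyManager(numCores: Int, storage: Long, execution: Long)
    extends ToyManager(numCores, storage, execution) {
  override def maxOnHeapStorageMemory: Long =
    onHeapStorageMemoryPool.poolSize
}
```

For example, new FixedToyManager(4, 300L, 700L) yields a manager whose storage pool holds 300 bytes and whose execution pool holds 700 bytes.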

releaseExecutionMemory Method

releaseExecutionMemory(
  numBytes: Long,
  taskAttemptId: Long,
  memoryMode: MemoryMode): Unit

releaseExecutionMemory…​FIXME

Note
releaseExecutionMemory is used when TaskMemoryManager is requested to releaseExecutionMemory and cleanUpAllAllocatedMemory.

releaseAllExecutionMemoryForTask Method

releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long

releaseAllExecutionMemoryForTask…​FIXME

Note
releaseAllExecutionMemoryForTask is used exclusively when TaskRunner is requested to run (and cleans up after itself).

tungstenMemoryMode Flag

tungstenMemoryMode: MemoryMode

tungstenMemoryMode returns OFF_HEAP only when the following are all met:

  • spark.memory.offHeap.enabled configuration property is enabled (it is not by default)

  • spark.memory.offHeap.size configuration property is greater than 0 (it is 0 by default)

  • JVM supports unaligned memory access (aka unaligned Unsafe, i.e. the sun.misc.Unsafe class is available and the underlying system has unaligned-access capability)

Otherwise, tungstenMemoryMode returns ON_HEAP.
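
The three conditions above can be expressed as a single predicate. The sketch below follows them exactly as listed and is not taken from Spark's source: TungstenModeSketch, its local MemoryMode type, and the parameter names are hypothetical, and the three inputs are passed in explicitly instead of being read from SparkConf or probed from the JVM. How Spark reacts to an inconsistent configuration (e.g. off-heap enabled with a size of 0) is not modeled here.

```scala
object TungstenModeSketch {
  // Local stand-in for Spark's MemoryMode, to keep the sketch self-contained.
  sealed trait MemoryMode
  case object ON_HEAP extends MemoryMode
  case object OFF_HEAP extends MemoryMode

  // offHeapEnabled     ~ spark.memory.offHeap.enabled
  // offHeapSizeBytes   ~ spark.memory.offHeap.size
  // unalignedSupported ~ whether the JVM supports unaligned memory access
  def tungstenMemoryMode(
      offHeapEnabled: Boolean,
      offHeapSizeBytes: Long,
      unalignedSupported: Boolean): MemoryMode =
    if (offHeapEnabled && offHeapSizeBytes > 0 && unalignedSupported) OFF_HEAP
    else ON_HEAP
}
```

With the defaults described above (off-heap disabled, size 0), the function returns ON_HEAP regardless of the third argument.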

Note
Because the spark.memory.offHeap.enabled configuration property is disabled (false) by default and the spark.memory.offHeap.size configuration property is 0 by default, Tungsten memory is allocated on the JVM heap (ON_HEAP) unless off-heap memory is configured explicitly.
Note
tungstenMemoryMode is a Scala final val and cannot be changed by custom MemoryManagers.
Note

tungstenMemoryMode is used when:

freePage Method

void freePage(MemoryBlock page)

freePage…​FIXME

Note
freePage is used when…​FIXME
