MemoryStream — Streaming Reader for Micro-Batch Stream Processing

MemoryStream is a concrete streaming source (of the memory data source) that supports reading in Micro-Batch Stream Processing.

Tip

Enable ALL logging level for org.apache.spark.sql.execution.streaming.MemoryStream logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.MemoryStream=ALL

Refer to Logging.

Creating MemoryStream Instance

MemoryStream takes the following to be created:

  • ID

  • SQLContext

MemoryStream initializes the internal properties.
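
A minimal sketch of creating a MemoryStream directly, assuming a spark-shell session where spark is the SparkSession (user code typically goes through the apply object factory described below):

import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._ // implicit Encoder[Int]

// The explicit id (0) is for illustration only
val stream = new MemoryStream[Int](0, spark.sqlContext)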

Creating MemoryStream Instance — apply Object Factory

apply[A : Encoder](
  implicit sqlContext: SQLContext): MemoryStream[A]

apply uses a memoryStreamId internal counter to create a new MemoryStream with a unique ID and the implicit SQLContext.
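
A sketch of the factory in use, assuming a spark-shell session (the implicit SQLContext is declared explicitly here for clarity):

import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._

implicit val sqlCtx = spark.sqlContext
val ints = MemoryStream[Int] // a new MemoryStream with the next unique ID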

Adding Data to Source — addData Method

addData(
  data: TraversableOnce[A]): Offset

addData adds the given data to the source and returns the offset of the batch the data is part of.

Internally, addData prints out the following DEBUG message to the logs:

Adding: [data]

In the end, addData increments the current offset and adds the data to the batches internal registry.
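
A sketch of adding data, continuing the ints stream above (MemoryStreamBase also offers a varargs addData(data: A*) variant that delegates to this method):

val offset = ints.addData(Seq(1, 2, 3)) // one batch; returns the new offset
ints.addData(4, 5)                      // varargs variant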

Generating Next Streaming Batch — getBatch Method

Note
getBatch is part of the Streaming Source contract.

When executed, getBatch takes the rows of the internal batches registry that are within the requested start and end offsets and creates a streaming DataFrame over them.

You should see the following DEBUG message in the logs:

DEBUG MemoryStream: MemoryBatch [[startOrdinal], [endOrdinal]]: [newBlocks]
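
getBatch is requested by the stream execution engine for every micro-batch. A minimal sketch that makes it happen, with a memory sink and the memsink query name used for demonstration only:

val query = ints.toDF
  .writeStream
  .format("memory")
  .queryName("memsink")
  .start()
ints.addData(1, 2, 3)
query.processAllAvailable() // runs a micro-batch over the added data
spark.table("memsink").show()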

Logical Plan — logicalPlan Internal Property

logicalPlan: LogicalPlan
Note
logicalPlan is part of the MemoryStreamBase Contract for the logical query plan of the memory stream.

logicalPlan is simply a StreamingExecutionRelation (for this memory source and the attributes).

MemoryStream uses the StreamingExecutionRelation logical plan to build Datasets and DataFrames when requested.

scala> val ints = MemoryStream[Int]
ints: org.apache.spark.sql.execution.streaming.MemoryStream[Int] = MemoryStream[value#13]

scala> ints.toDS.queryExecution.logical.isStreaming
res14: Boolean = true

scala> ints.toDS.queryExecution.logical
res15: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = MemoryStream[value#13]

Schema (schema method)

MemoryStream uses the schema of the data as described by the Encoder (of the Dataset).
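
For example, a MemoryStream[Int] takes its schema from the implicit Encoder[Int], so the Dataset has a single value column (continuing the ints example above):

ints.toDS.printSchema()
// root
//  |-- value: integer (nullable = false)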

Textual Representation — toString Method

toString: String
Note
toString is part of the java.lang.Object contract for the string representation of the object.

toString uses the output schema to return the following textual representation:

MemoryStream[[output]]
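
Continuing the ints example above (the expression ID, e.g. #13, varies per session):

println(ints) // MemoryStream[value#13]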

Plan Input Partitions — planInputPartitions Method

planInputPartitions(): java.util.List[InputPartition[InternalRow]]
Note
planInputPartitions is part of the DataSourceReader contract in Spark SQL for the number of InputPartitions to use as RDD partitions (when DataSourceV2ScanExec physical operator is requested for the partitions of the input RDD).

planInputPartitions…​FIXME

planInputPartitions prints out a DEBUG message to the logs with the result of generateDebugString (for the batches added after the last committed offset).

planInputPartitions…​FIXME

generateDebugString Internal Method

generateDebugString(
  rows: Seq[UnsafeRow],
  startOrdinal: Int,
  endOrdinal: Int): String

generateDebugString resolves and binds the encoder for the data.

In the end, generateDebugString returns the following string:

MemoryBatch [[startOrdinal], [endOrdinal]]: [rows]
Note
generateDebugString is used exclusively when MemoryStream is requested to planInputPartitions.

Internal Properties

  • batches: Batch data (ListBuffer[Array[UnsafeRow]])

  • currentOffset: Current offset (as LongOffset)

  • lastOffsetCommitted: Last committed offset (as LongOffset)

  • output: Output schema (Seq[Attribute]) of the logical query plan. Used exclusively for toString.
