MemoryStream is a streaming Source that produces values stored in memory as they are added by the user.

MemoryStream uses the internal batches collection of datasets.


This source is not for production use due to design constraints, e.g. an unbounded in-memory collection of the lines read and no fault recovery.

MemoryStream is designed primarily for unit tests, tutorials and debugging.

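import org.apache.spark.sql.SparkSession
val spark: SparkSession = ???

// MemoryStream.apply requires an implicit SQLContext
implicit val ctx = spark.sqlContext
// Brings Encoder[Int] and the $"..." column syntax into scope
import spark.implicits._
import org.apache.spark.sql.functions.{count, current_timestamp, window}

import org.apache.spark.sql.execution.streaming.MemoryStream
// It uses two implicits: Encoder[Int] and SQLContext
val intsIn = MemoryStream[Int]

val ints = intsIn.toDF
  .withColumn("t", current_timestamp())
  .withWatermark("t", "5 minutes")
  .groupBy(window($"t", "5 minutes") as "window")
  .agg(count("*") as "total")

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
// The memory sink and the query name match the progress report shown further down.
// The output mode and the trigger interval are assumptions.
val totalsOver5mins = ints.
  writeStream.
  format("memory").
  queryName("totalsOver5mins").
  outputMode(OutputMode.Append()).
  trigger(Trigger.ProcessingTime(10.seconds)).
  start()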

scala> val zeroOffset = intsIn.addData(0, 1, 2)
zeroOffset: org.apache.spark.sql.execution.streaming.Offset = #0


+-----+
|value|
+-----+
|    0|
|    1|
|    2|
+-----+

17/02/28 20:06:01 DEBUG StreamExecution: Starting Trigger Calculation
17/02/28 20:06:01 DEBUG StreamExecution: getOffset took 0 ms
17/02/28 20:06:01 DEBUG StreamExecution: triggerExecution took 0 ms
17/02/28 20:06:01 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map(watermark -> 1970-01-01T00:00:00.000Z))
17/02/28 20:06:01 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec5addda-0e46-4c3c-b2c2-604a854ee19a",
  "runId" : "d850cabc-94d0-4931-8a2d-e054086e39c3",
  "name" : "totalsOver5mins",
  "timestamp" : "2017-02-28T19:06:01.175Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 0
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "MemoryStream[value#1]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}

Enable DEBUG logging level for org.apache.spark.sql.execution.streaming.MemoryStream logger to see what happens inside.

Add the following line to conf/

Refer to Logging.

Creating MemoryStream Instance

apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A]

The MemoryStream object defines an apply method that you can use to create instances of the MemoryStream streaming source.

Adding Data to Source (addData methods)

addData(data: A*): Offset
addData(data: TraversableOnce[A]): Offset

addData methods add the input data to the internal batches collection.

When executed, addData converts the input data to a Dataset (using the toDS implicit method), appends it to batches, and increments the internal currentOffset offset, which it then returns.

You should see the following DEBUG message in the logs:

DEBUG MemoryStream: Adding ds: [ds]
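
The bookkeeping described above can be sketched without Spark. The following is a toy model only: ToyMemoryStream and batchCount are hypothetical names, plain Vectors stand in for Datasets, and a Long stands in for Offset. It is not Spark's implementation, but it shows why the first call to addData yields offset 0.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-in for MemoryStream's bookkeeping (hypothetical, not Spark's class).
final class ToyMemoryStream[A] {
  // One entry per addData call, in arrival order
  private val batches = ArrayBuffer.empty[Vector[A]]
  // -1 means "no data added yet"
  private var currentOffset: Long = -1L

  // Appends the values as one batch and returns the offset of that batch
  def addData(data: A*): Long = synchronized {
    currentOffset += 1
    batches += data.toVector
    currentOffset
  }

  // Number of batches held in memory (grows without bound)
  def batchCount: Int = synchronized(batches.size)
}

val stream = new ToyMemoryStream[Int]
val zeroOffset = stream.addData(0, 1, 2)
val nextOffset = stream.addData(3, 4)
println(s"offsets: $zeroOffset, $nextOffset; batches: ${stream.batchCount}")
```

Every call appends one more batch and nothing is ever removed in this model, which illustrates the unbounded in-memory growth mentioned at the top of this page.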

Generating Next Streaming Batch — getBatch Method

getBatch is part of the streaming Source contract.

When executed, getBatch uses the internal batches collection to return the data for the requested range of offsets.

You should see the following DEBUG message in the logs:

DEBUG MemoryStream: MemoryBatch [[startOrdinal], [endOrdinal]]: [newBlocks]
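
A minimal sketch of how a range of offsets can be mapped onto the batches collection follows. It assumes that the start offset is exclusive and the end offset is inclusive, and that an offset is simply the index of a batch. sliceBatches is a hypothetical helper over plain Vectors, not part of Spark.

```scala
// Hypothetical helper: returns the values of all batches in the range (start, end].
// start = None means "from the very beginning".
def sliceBatches[A](batches: Vector[Vector[A]], start: Option[Long], end: Long): Vector[A] = {
  val startOrdinal = start.getOrElse(-1L).toInt + 1 // first batch index to include
  val endOrdinal = end.toInt + 1                    // one past the last batch index
  batches.slice(startOrdinal, endOrdinal).flatten
}

// Three batches, i.e. offsets 0, 1 and 2
val batches = Vector(Vector(0, 1, 2), Vector(3, 4), Vector(5))

val firstTwo = sliceBatches(batches, None, 1L)     // batches at offsets 0 and 1
val lastOnly = sliceBatches(batches, Some(1L), 2L) // only the batch at offset 2
println(firstTwo)
println(lastOnly)
```

The two ordinals computed here play the role of the startOrdinal and endOrdinal values that appear in the DEBUG message above.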

StreamingExecutionRelation Logical Plan

MemoryStream uses the StreamingExecutionRelation logical plan to build Datasets or DataFrames when requested.

scala> val ints = MemoryStream[Int]
ints: org.apache.spark.sql.execution.streaming.MemoryStream[Int] = MemoryStream[value#13]

scala> ints.toDS.queryExecution.logical.isStreaming
res14: Boolean = true

scala> ints.toDS.queryExecution.logical
res15: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = MemoryStream[value#13]

Schema (schema method)

MemoryStream works with data whose schema is described by the Encoder (of the Dataset).
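
To see what schema an Encoder describes, you can inspect the encoder directly. This is a small sketch that assumes spark-sql is on the classpath; Encoders.scalaInt is the built-in encoder for Int and would be the one behind MemoryStream[Int].

```scala
import org.apache.spark.sql.Encoders

// Schema described by the built-in Int encoder: a single column
val intSchema = Encoders.scalaInt.schema
val columnNames = intSchema.fieldNames
println(columnNames.mkString(", "))
```

The single column is named value, which is why the REPL output above shows MemoryStream[value#13].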
