MemorySink

MemorySink is a streaming Sink that stores records in memory. It is particularly useful for testing.

MemorySink is used for memory format and requires a query name (by queryName method or queryName option).

val spark: SparkSession = ???
val logs = spark.readStream.textFile("logs/*.out")

scala> val outStream = logs.writeStream
  .format("memory")
  .queryName("logs")
  .start()
outStream: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@690337df

scala> sql("select * from logs").show(truncate = false)
Note
MemorySink was introduced in the pull request for [SPARK-14288][SQL] Memory Sink for streaming.

Use toDebugString to see the batches.

Its aim is to allow users to test streaming applications in the Spark shell or other local tests.

You can set checkpointLocation using option method or it will be set to spark.sql.streaming.checkpointLocation property.

If spark.sql.streaming.checkpointLocation is set, the code uses $location/$queryName directory.

Finally, when no spark.sql.streaming.checkpointLocation is set, a temporary directory memory.stream under java.io.tmpdir is used with offsets subdirectory inside.

Note
The directory is cleaned up at shutdown using ShutdownHookManager.registerShutdownDeleteDir.

It creates MemorySink instance based on the schema of the DataFrame it operates on.

It creates a new DataFrame using MemoryPlan with MemorySink instance created earlier and registers it as a temporary table (using DataFrame.registerTempTable method).

Note
At this point you can query the table as if it were a regular non-streaming table using sql method.

A new StreamingQuery is started (using StreamingQueryManager.startQuery) and returned.

Table 1. MemorySink’s Internal Registries and Counters
Name Description

batches

FIXME

Used when…​FIXME

Tip

Enable DEBUG logging level for org.apache.spark.sql.execution.streaming.MemorySink logger to see what happens in MemorySink.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.MemorySink=DEBUG

Refer to Logging.

addBatch Method

addBatch(batchId: Long, data: DataFrame): Unit

addBatch checks if batchId has already been committed (i.e. added to batches internal registry).

If batchId was already committed, you should see the following DEBUG message in the logs:

DEBUG Skipping already committed batch: [batchId]

Otherwise, if the batchId is not already committed, you should see the following DEBUG message in the logs:

DEBUG Committing batch [batchId] to [this]

For Append and Update output modes, addBatch collects records from data and registers batchId (i.e. adds to batches internal registry).

Note
addBatch uses collect operator to collect records. It is when the records are "downloaded" to the driver’s memory.

For Complete output mode, addBatch collects records (as for the other output modes), but before registering batchId clears batches internal registry.

When the output mode is invalid, addBatch reports a IllegalArgumentException with the following error message.

Output mode [outputMode] is not supported by MemorySink
Note
addBatch is a part of Sink Contract to "add" a batch of data to the sink.

results matching ""

    No results matching ""