MemorySink
MemorySink is a streaming sink that stores batches (records) in memory.
MemorySink is intended only for testing or demos.
MemorySink is used for the memory format and requires a query name (set using the queryName method or the queryName option).
Note
|
MemorySink was introduced in the pull request for [SPARK-14288][SQL] Memory Sink for streaming.
|
Use toDebugString to see the batches.
Its aim is to allow users to test streaming applications in the Spark shell or other local tests.
You can set the checkpoint location using the checkpointLocation option (with the option method); otherwise, it is derived from the spark.sql.streaming.checkpointLocation property.
If spark.sql.streaming.checkpointLocation is set, the $location/$queryName directory is used.
Finally, when spark.sql.streaming.checkpointLocation is not set, a temporary directory memory.stream under java.io.tmpdir is used, with an offsets subdirectory inside.
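The fallback order for the checkpoint location can be summarized as a small function. This is a minimal sketch, not Spark code: resolveCheckpointLocation is a hypothetical helper, and it uses the literal name memory.stream, which may differ from the exact temporary directory name Spark creates.

```scala
import java.nio.file.Paths

// Hypothetical helper (not a Spark API) that mirrors the fallback order above.
def resolveCheckpointLocation(
    checkpointOption: Option[String], // set with option("checkpointLocation", ...)
    confLocation: Option[String],     // value of spark.sql.streaming.checkpointLocation
    queryName: String): String =
  checkpointOption.getOrElse {
    confLocation match {
      // The property is set: use $location/$queryName
      case Some(location) => Paths.get(location, queryName).toString
      // Nothing set: memory.stream under java.io.tmpdir
      // (with an offsets subdirectory inside, per the text above)
      case None => Paths.get(System.getProperty("java.io.tmpdir"), "memory.stream").toString
    }
  }

// On Unix-like systems:
println(resolveCheckpointLocation(None, Some("/data/ckpt"), "demo")) // /data/ckpt/demo
```

An explicit option always wins; the property is consulted only when no option was given.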
Note
|
The directory is cleaned up at shutdown using ShutdownHookManager.registerShutdownDeleteDir .
|
When a query is started with the memory format, a MemorySink instance is created based on the schema of the DataFrame being written.
A new DataFrame is then created using MemoryPlan with that MemorySink instance and registered as a temporary table under the query name (using the DataFrame.registerTempTable method).
Note
|
At this point you can query the table as if it were a regular non-streaming table using the sql method.
|
A new StreamingQuery is started (using StreamingQueryManager.startQuery) and returned.
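Tip
|
Enable ALL logging level for the org.apache.spark.sql.execution.streaming.MemorySink logger.
Add the following line to the log4j configuration:
log4j.logger.org.apache.spark.sql.execution.streaming.MemorySink=ALL
Refer to Logging.
|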
Creating MemorySink Instance
MemorySink takes the following to be created:
MemorySink initializes the batches internal property.
In-Memory Buffer of Streaming Batches: batches Internal Property
batches: ArrayBuffer[AddedData]
batches holds data from streaming batches that have been added (written) to this sink.
For Complete output mode, batches holds rows from the last batch only.
batches can be cleared (emptied) using clear.
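The two ways batches is updated, and how clear empties it, can be modeled with a plain ArrayBuffer. This is a simplified sketch rather than Spark's code: BatchBuffer, append and replace are hypothetical names, and rows are plain Strings instead of Spark Rows.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for a buffered batch: rows are Strings, not Spark Rows.
case class AddedData(batchId: Long, rows: Seq[String])

// Hypothetical model of the batches internal property (not Spark code).
class BatchBuffer {
  val batches: ArrayBuffer[AddedData] = ArrayBuffer.empty[AddedData]

  // Append and Update style: every added batch is kept.
  def append(batch: AddedData): Unit = synchronized { batches += batch }

  // Complete style: only the rows of the last batch are kept.
  def replace(batch: AddedData): Unit = synchronized {
    batches.clear()
    batches += batch
  }

  // Same effect as clear: the buffer is emptied.
  def clear(): Unit = synchronized { batches.clear() }
}

val buffer = new BatchBuffer
buffer.append(AddedData(0, Seq("a")))
buffer.append(AddedData(1, Seq("b")))
println(buffer.batches.map(_.batchId)) // ArrayBuffer(0, 1)
buffer.replace(AddedData(2, Seq("a", "b")))
println(buffer.batches.map(_.batchId)) // ArrayBuffer(2)
```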
Adding Batch of Data to Sink: addBatch Method
addBatch(
batchId: Long,
data: DataFrame): Unit
Note
|
addBatch is part of the Sink Contract to "add" a batch of data to the sink.
|
A batch is considered not committed (and is therefore added to the sink) when there is no latest batch ID yet or the given batch ID is greater than the latest batch ID.
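The commit check reduces to a one-line predicate on an optional latest batch ID. isNotCommitted is a hypothetical name used only for illustration.

```scala
// Hypothetical predicate (not a Spark API): true when the batch still has to be added.
def isNotCommitted(latestBatchId: Option[Long], batchId: Long): Boolean =
  latestBatchId.forall(latest => batchId > latest) // None means nothing added yet

println(isNotCommitted(None, 0))    // true: nothing added yet
println(isNotCommitted(Some(3), 4)) // true: newer batch
println(isNotCommitted(Some(3), 3)) // false: replayed batch, already committed
```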
Batch Not Committed
With the batchId not committed, addBatch prints out the following DEBUG message to the logs:
Committing batch [batchId] to [this]
addBatch collects records from the given data.
Note
|
addBatch uses Dataset.collect operator to collect records.
|
For Append and Update output modes, addBatch adds the data (as an AddedData) to the batches internal registry.
For Complete output mode, addBatch first clears the batches internal registry and then adds the data (as an AddedData).
For any other output mode, addBatch throws an IllegalArgumentException:
Output mode [outputMode] is not supported by MemorySink
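Putting the steps together, the following sketch models addBatch with the commit check and the per-mode handling. It is a simplified model under stated assumptions, not Spark's implementation: MemorySinkModel and the Mode objects are hypothetical stand-ins for MemorySink and OutputMode, the rows argument stands in for the records Dataset.collect would return, the latest batch ID is assumed to come from the last entry in batches, and the DEBUG logging is omitted.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for Spark's OutputMode values.
sealed trait Mode
case object Append extends Mode
case object Update extends Mode
case object Complete extends Mode
case object Other extends Mode // any mode MemorySink does not support

// Rows are Strings here instead of Spark Rows.
case class AddedData(batchId: Long, rows: Seq[String])

class MemorySinkModel(outputMode: Mode) {
  val batches: ArrayBuffer[AddedData] = ArrayBuffer.empty[AddedData]

  // Assumption: the latest batch ID is taken from the last buffered batch.
  def latestBatchId: Option[Long] = synchronized(batches.lastOption.map(_.batchId))

  // rows plays the role of the records collected from the batch's DataFrame.
  def addBatch(batchId: Long, rows: Seq[String]): Unit = {
    val notCommitted = latestBatchId.forall(latest => batchId > latest)
    if (notCommitted) {
      outputMode match {
        case Append | Update =>
          synchronized { batches += AddedData(batchId, rows) }
        case Complete =>
          synchronized {
            batches.clear()
            batches += AddedData(batchId, rows)
          }
        case _ =>
          throw new IllegalArgumentException(
            s"Output mode $outputMode is not supported by MemorySink")
      }
    }
    // Otherwise the batch was already committed and is skipped.
  }
}

val sink = new MemorySinkModel(Complete)
sink.addBatch(0, Seq("a"))
sink.addBatch(1, Seq("a", "b"))
sink.addBatch(1, Seq("replayed")) // already committed: skipped
println(sink.batches.map(_.batchId)) // ArrayBuffer(1)
```

Because Complete mode replaces the buffer on every new batch, only batch 1 survives, and the replayed batch 1 is ignored by the commit check.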
Clearing Up Internal Batch Buffer: clear Method
clear(): Unit
clear simply removes (clears) all data from the batches internal registry.
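One consequence worth keeping in mind for tests: if the latest batch ID is derived from batches (an assumption in this sketch, with latestBatchId as a hypothetical helper), clearing the buffer also forgets which batches were committed.

```scala
import scala.collection.mutable.ArrayBuffer

// Rows simplified to Strings (assumption).
case class AddedData(batchId: Long, rows: Seq[String])

val batches = ArrayBuffer(AddedData(0, Seq("a")), AddedData(1, Seq("b")))

// Assumption: the latest batch ID is read from the last buffered entry.
def latestBatchId: Option[Long] = batches.lastOption.map(_.batchId)

println(latestBatchId) // Some(1)
batches.clear()        // what clear does to the batches registry
println(latestBatchId) // None: any batch ID would now count as not committed
```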
Note
|
clear is used exclusively in tests.
|