DataStreamWriter — Writing Datasets To Streaming Sink
DataStreamWriter is the interface for describing when and what rows of a streaming query are sent out to the streaming sink.
DataStreamWriter is available using the Dataset.writeStream method (on a streaming query).
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.{Dataset, Row}
val streamingQuery: Dataset[Long] = ...
assert(streamingQuery.isStreaming)
val writer: DataStreamWriter[Row] = streamingQuery.toDF.writeStream
| Method | Description |
|---|---|
| foreach | Sets a ForeachWriter to be in full control of streaming writes |
| foreachBatch | (New in 2.4.0) Sets the source to foreachBatch. As per SPARK-24565 (Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame), the purpose of the method is to expose the micro-batch output as a DataFrame (a short sketch follows this table) |
| format | Specifies the format of the data sink (aka output format). The format is used internally as the name (alias) of the streaming sink to use to write the data to |
| option | Specifies a single write option |
| options | Specifies the configuration options of a data sink |
| outputMode | Specifies the output mode |
| partitionBy | Partitions the output by the given columns |
| queryName | Assigns the name of a query |
| start | Creates and immediately starts a StreamingQuery |
| trigger | Sets the Trigger for how often a streaming query should be executed and the result saved |
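For illustration, here is a minimal sketch of foreachBatch (assuming Spark 2.4 or later; the function name and output path are arbitrary example values), where each micro-batch is written out with the regular batch writer:
import org.apache.spark.sql.{Dataset, Row}
// Example batch-writer function: persists every micro-batch as Parquet files
// (the target directory is an arbitrary example value)
val writeBatch: (Dataset[Row], Long) => Unit = (batchDF, batchId) =>
  batchDF.write.mode("append").parquet(s"/tmp/output/batch-$batchId")
writer.foreachBatch(writeBatch)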
Note: A streaming query is a Dataset with a streaming logical plan.
Like the batch DataFrameWriter, DataStreamWriter has direct support for many file formats and an extension point to plug in new formats.
// see above for the writer definition
// Write the streaming Dataset out in JSON format
writer.format("json")
In the end, you start the actual continuous writing of the result of executing a Dataset to a sink using the start operator.
writer.start
Note: hive is not supported for streaming writing (and leads to an AnalysisException).
Note: DataFrameWriter is responsible for writing in a batch fashion.
Specifying Write Option — option Method
option(key: String, value: String): DataStreamWriter[T]
option(key: String, value: Boolean): DataStreamWriter[T]
option(key: String, value: Long): DataStreamWriter[T]
option(key: String, value: Double): DataStreamWriter[T]
Internally, option adds the key and value to the extraOptions internal option registry.
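For example (a minimal sketch; the path below is an example value only):
// Adds the checkpointLocation option to the extraOptions registry
writer.option("checkpointLocation", "/tmp/checkpoints/my-query")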
Specifying Output Mode — outputMode Method
outputMode(outputMode: String): DataStreamWriter[T]
outputMode(outputMode: OutputMode): DataStreamWriter[T]
outputMode specifies the output mode of a streaming query, i.e. what data is sent out to a streaming sink when there is new data available in streaming data sources.
Note: When not defined explicitly, outputMode defaults to Append output mode.
outputMode can be specified by name or one of the OutputMode values.
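For example (a minimal sketch):
import org.apache.spark.sql.streaming.OutputMode
// By name
writer.outputMode("append")
// Or with an OutputMode value
writer.outputMode(OutputMode.Complete())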
Setting Query Name — queryName method
queryName(queryName: String): DataStreamWriter[T]
queryName sets the name of a streaming query.
Internally, it is just an additional option with the key queryName.
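For example (the query name is an arbitrary example value):
// Stored internally as the queryName option
writer.queryName("pageview-counts")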
Setting How Often to Execute Streaming Query — trigger method
trigger(trigger: Trigger): DataStreamWriter[T]
trigger method sets the time interval of the trigger (that executes a batch runner) for a streaming query.
Note: Trigger specifies how often results should be produced by a StreamingQuery. See Trigger.
The default trigger is ProcessingTime(0L) that runs a streaming query as often as possible.
Tip: Consult Trigger to learn about Trigger and ProcessingTime types.
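For example (the interval below is an example value):
import org.apache.spark.sql.streaming.Trigger
// Executes the streaming query every 10 seconds
writer.trigger(Trigger.ProcessingTime("10 seconds"))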
Creating and Starting Execution of Streaming Query — start Method
start(): StreamingQuery
start(path: String): StreamingQuery  (1)
1. Sets the path option to path and passes the call on to start()
start starts a streaming query.
start gives a StreamingQuery to control the execution of the continuous query.
Note: Whether or not you have to specify the path option depends on the streaming sink in use.
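For example, a file-based sink requires a path (the paths below are example values only):
import org.apache.spark.sql.streaming.StreamingQuery
// Starts a streaming query writing JSON files; checkpointLocation is required by file sinks
// (unless a default checkpoint location is configured)
val query: StreamingQuery = writer
  .format("json")
  .option("checkpointLocation", "/tmp/checkpoints/json-sink")
  .start("/tmp/output/json-sink")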
Internally, start branches off per source.
- memory
- foreach
- other formats
…FIXME
| Option | Description |
|---|---|
| queryName | Name of the active streaming query |
| checkpointLocation | Directory for checkpointing (and to store query metadata like offsets before and after being processed, the query id, etc.) |
start reports an AnalysisException when the source is hive.
val q = spark.
  readStream.
  text("server-logs/*").
  writeStream.
  format("hive") // <-- hive format used as a streaming sink
scala> q.start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:234)
  ... 48 elided
Making ForeachWriter in Charge of Streaming Writes — foreach method
foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
foreach sets the input ForeachWriter to be in control of streaming writes.
Internally, foreach sets the streaming output format as foreach and foreachWriter as the input writer.
Note: foreach uses SparkSession to access SparkContext to clean the ForeachWriter.
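As a quick sketch (for illustration only, not a production-grade sink), a custom ForeachWriter that simply prints every row can be plugged in as follows:
import org.apache.spark.sql.{ForeachWriter, Row}
// Illustrative ForeachWriter that prints each row to standard output
val printlnWriter = new ForeachWriter[Row] {
  override def open(partitionId: Long, epochId: Long): Boolean = true
  override def process(row: Row): Unit = println(row)
  override def close(errorOrNull: Throwable): Unit = ()
}
writer.foreach(printlnWriter)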
Internal Properties
| Name | Initial Value | Description |
|---|---|---|
| foreachBatchWriter | | The function that is used as the batch writer in the ForeachBatchSink for foreachBatch |
| outputMode | OutputMode.Append | OutputMode of the streaming sink. Set using the outputMode method. |