DataStreamWriter — Writing Datasets To Streaming Sink

DataStreamWriter is the interface to describe when and what rows of a streaming query are sent out to the streaming sink.

DataStreamWriter is available using Dataset.writeStream method (on a streaming query).

import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.Row

val streamingQuery: Dataset[Long] = ...
assert(streamingQuery.isStreaming)

val writer: DataStreamWriter[Row] = streamingQuery.writeStream
Method | Description
---|---
foreach | Sets a ForeachWriter to be in full control of streaming writes
foreachBatch | (New in 2.4.0) Per SPARK-24565 (Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame), exposes the output of every micro-batch as a DataFrame so it can be written with the batch APIs (see the sketch after this table)
format | Specifies the format of the data sink (aka output format). The format is used internally as the name (alias) of the streaming sink to use to write the data to
option | Specifies a single write option (a key-value pair) of a data sink
options | Specifies the configuration options of a data sink
outputMode | Specifies the output mode
partitionBy | Partitions the output by the given columns
queryName | Assigns the name of a query
start | Creates and immediately starts a StreamingQuery
trigger | Sets the Trigger for how often a streaming query should be executed and the result saved
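As a minimal sketch of foreachBatch (the output path and the writeBatch name are illustrative, not part of the API), every micro-batch is handed over as a regular batch Dataset together with its batch id, so any batch writer can be reused; assigning the function to a typed val keeps the call unambiguous between the Scala and Java-friendly overloads.

// see above for writer definition
import org.apache.spark.sql.{Dataset, Row}

// Write every micro-batch with the batch Parquet writer, one directory per batch id
val writeBatch: (Dataset[Row], Long) => Unit = (batch, batchId) =>
  batch.write.mode("append").parquet(s"/tmp/output/batch-$batchId")

writer.foreachBatch(writeBatch)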
Note: A streaming query is a Dataset with a streaming logical plan.
Like the batch DataFrameWriter, DataStreamWriter has direct support for many file formats and an extension point to plug in new formats.
// see above for writer definition
// Save dataset in JSON format
writer.format("json")
In the end, you start the actual continuous writing of the result of executing a Dataset to a sink using the start operator.

writer.start
Besides the above operators, there are the following to work with a Dataset as a whole.
Note: hive is not supported for streaming writes (and leads to an AnalysisException).

Note: DataFrameWriter is responsible for writing in a batch fashion.
Specifying Write Option — option Method
option(key: String, value: String): DataStreamWriter[T]
option(key: String, value: Boolean): DataStreamWriter[T]
option(key: String, value: Long): DataStreamWriter[T]
option(key: String, value: Double): DataStreamWriter[T]
Internally, option adds the key and value to the extraOptions internal option registry.
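For illustration (reusing the writer defined above; the keys and values are examples only, e.g. truncate applies to the console sink), option calls can be chained since each one returns the DataStreamWriter:

// see above for writer definition
writer
  .option("checkpointLocation", "/tmp/checkpoints")  // recognized by start (see below)
  .option("truncate", false)                          // a sink-specific option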
Specifying Output Mode — outputMode Method
outputMode(outputMode: String): DataStreamWriter[T]
outputMode(outputMode: OutputMode): DataStreamWriter[T]
outputMode specifies the output mode of a streaming query, i.e. what data is sent out to a streaming sink when there is new data available in streaming data sources.
Note: When not defined explicitly, outputMode defaults to Append output mode.
outputMode can be specified by name or as one of the OutputMode values.
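As a quick sketch (reusing the writer defined above), the two variants below request the same Complete output mode:

// see above for writer definition
import org.apache.spark.sql.streaming.OutputMode

writer.outputMode("complete")             // by name
writer.outputMode(OutputMode.Complete())  // using an OutputMode value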
Setting Query Name — queryName method
queryName(queryName: String): DataStreamWriter[T]
queryName sets the name of a streaming query.

Internally, it is just an additional option with the key queryName.
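For example (the name itself is arbitrary; it is used for monitoring, e.g. as StreamingQuery.name):

// see above for writer definition
writer.queryName("server-logs-to-console")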
Setting How Often to Execute Streaming Query — trigger method
trigger(trigger: Trigger): DataStreamWriter[T]
trigger method sets the time interval of the trigger (that executes a batch runner) for a streaming query.
Note: Trigger specifies how often results should be produced by a StreamingQuery. See Trigger.
The default trigger is ProcessingTime(0L) that runs a streaming query as often as possible.
Tip: Consult Trigger to learn about Trigger and ProcessingTime types.
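A short sketch of overriding the default (reusing the writer defined above; the 10-second interval is arbitrary):

// see above for writer definition
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// Run a micro-batch every 10 seconds instead of as often as possible
writer.trigger(Trigger.ProcessingTime(10.seconds))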
Creating and Starting Execution of Streaming Query — start Method
start(): StreamingQuery
start(path: String): StreamingQuery (1)
1. Sets the path option to path and passes the call on to start()

start starts a streaming query.

start gives a StreamingQuery to control the execution of the continuous query.
Note: Whether or not you have to specify the path option depends on the streaming sink in use.
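A minimal sketch of starting a file-based streaming query (the format, paths and checkpoint location below are illustrative):

// see above for writer definition
val query = writer.
  format("json").
  option("checkpointLocation", "/tmp/checkpoints/json-sink").
  start("/tmp/output/json")  // same as .option("path", "/tmp/output/json").start()

// query is a StreamingQuery to monitor and manage the execution
query.status
// query.stop()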
Internally, start branches off per source:

- memory
- foreach
- other formats

…FIXME
Option | Description
---|---
queryName | Name of active streaming query
checkpointLocation | Directory for checkpointing (and to store query metadata like offsets before and after being processed, the query id, etc.)
start reports an AnalysisException when the source is hive.
val q = spark.
  readStream.
  text("server-logs/*").
  writeStream.
  format("hive")  // <-- hive format used as a streaming sink

scala> q.start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:234)
  ... 48 elided
Making ForeachWriter in Charge of Streaming Writes — foreach method
foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
foreach sets the input ForeachWriter to be in control of streaming writes.

Internally, foreach sets the streaming output format as foreach and foreachWriter as the input writer.
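A minimal sketch of a custom ForeachWriter (the printing logic is illustrative; a real writer would typically acquire a connection in open and release it in close):

import org.apache.spark.sql.{ForeachWriter, Row}

val printingWriter = new ForeachWriter[Row] {
  override def open(partitionId: Long, epochId: Long): Boolean = true  // accept every partition and epoch
  override def process(row: Row): Unit = println(row)                  // called once per output row
  override def close(errorOrNull: Throwable): Unit = ()                // nothing to release
}

// see above for writer definition
writer.foreach(printingWriter)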
Note: foreach uses SparkSession to access SparkContext to clean the ForeachWriter.
Internal Properties
Name | Initial Value | Description
---|---|---
foreachBatchWriter | | The function that is used as the batch writer in the ForeachBatchSink for foreachBatch
outputMode | Append | OutputMode of the streaming sink. Set using outputMode method.