DataStreamWriter — Writing Datasets To Streaming Data Sinks

DataStreamWriter is the interface to describe when and what rows of a streaming query are sent out to the streaming sink.

Table 1. DataStreamWriter’s Methods
Method Description


foreach(writer: ForeachWriter[T]): DataStreamWriter[T]

Sets the ForeachWriter that takes full control of streaming writes


foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

(New in 2.4.0) Sets the source to foreachBatch and the foreachBatchWriter to the given function.

As per SPARK-24565 (Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame), the method exposes the output of every micro-batch as a DataFrame for the following use cases:

  • Pass the output rows of each batch to a library that is designed for the batch jobs only

  • Reuse batch data sources for output whose streaming version does not exist

  • Multi-writes where the output rows are written to multiple outputs by writing twice for every batch


format(source: String): DataStreamWriter[T]

Specifies the format of the data sink (aka output format)

The format is used internally as the name (alias) of the streaming sink to write the data to


option(key: String, value: Boolean): DataStreamWriter[T]
option(key: String, value: Double): DataStreamWriter[T]
option(key: String, value: Long): DataStreamWriter[T]
option(key: String, value: String): DataStreamWriter[T]


options(options: Map[String, String]): DataStreamWriter[T]

Specifies the configuration options of a data sink

Use the option method instead if you prefer to specify the options one by one or need only one.


outputMode(outputMode: OutputMode): DataStreamWriter[T]
outputMode(outputMode: String): DataStreamWriter[T]

Specifies the output mode


partitionBy(colNames: String*): DataStreamWriter[T]


queryName(queryName: String): DataStreamWriter[T]

Assigns the name of a query


start(): StreamingQuery
start(path: String): StreamingQuery (1)
  1. Explicit path (that could also be specified as an option)

Creates and immediately starts a StreamingQuery


trigger(trigger: Trigger): DataStreamWriter[T]

Sets the Trigger for how often a streaming query should be executed and the result saved.
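
The multi-write use case listed for foreachBatch in the table above can be sketched as follows. This is a minimal sketch for spark-shell on Spark 2.4.0 or later, not a definitive implementation: the rate source and the output directories under /tmp are assumptions made for illustration. The function is declared with an explicit type so that the Scala overload of foreachBatch is selected unambiguously.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder.master("local[*]").appName("foreachBatch-sketch").getOrCreate()

// Assumption: the built-in rate source stands in for any streaming source
val rates = spark.readStream.format("rate").load()

// Hypothetical output directories
val jsonDir = "/tmp/foreachBatch-sketch/json"
val parquetDir = "/tmp/foreachBatch-sketch/parquet"

// Writes every micro-batch twice using the batch DataFrameWriter
val writeTwice: (DataFrame, Long) => Unit = (batch, batchId) => {
  batch.persist() // keep the micro-batch around for the second write
  batch.write.mode("append").json(jsonDir)
  batch.write.mode("append").parquet(parquetDir)
  batch.unpersist()
}

val query = rates.writeStream.foreachBatch(writeTwice).start()
```

Persisting the micro-batch before the two writes avoids computing it twice. Stop the query with query.stop() when done.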


A streaming query is a Dataset with a streaming logical plan.

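import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
import org.apache.spark.sql.DataFrame
// Assumption: the built-in rate source is used for illustration; any streaming source works
val rates: DataFrame = spark.
  readStream.
  format("rate").
  load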

scala> rates.isStreaming
res1: Boolean = true

scala> rates.queryExecution.logical.isStreaming
res2: Boolean = true

DataStreamWriter is available using writeStream method of a streaming Dataset.

import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.Dataset

val streamingQuery: Dataset[Long] = ...

scala> streamingQuery.isStreaming
res0: Boolean = true

// writeStream keeps the element type of the Dataset
val writer: DataStreamWriter[Long] = streamingQuery.writeStream

Like the batch DataFrameWriter, DataStreamWriter has direct support for many file formats and an extension point to plug in new formats.

// see above for writer definition

// Save dataset in JSON format
writer.format("json")

In the end, you start the actual continuous writing of the result of executing a Dataset to a sink using start operator.
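
Putting format and start together, a file-based sink could be started as in the sketch below. It assumes spark-shell, the built-in rate source, and hypothetical directories under /tmp. File sinks require a checkpoint location, so one is given as an option.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("json-sink-sketch").getOrCreate()

// Assumption: the built-in rate source stands in for any streaming source
val rates = spark.readStream.format("rate").load()

val query = rates.writeStream
  .format("json")
  .option("checkpointLocation", "/tmp/json-sink-sketch/checkpoint") // hypothetical directory
  .start("/tmp/json-sink-sketch/output") // hypothetical directory, sets the path option
```

The returned StreamingQuery keeps writing JSON files until query.stop() is called.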

Besides the above operators, there are the following to work with a Dataset as a whole.

hive is not supported for streaming writing (and leads to an AnalysisException).

DataFrameWriter is responsible for writing in a batch fashion.

Table 2. DataStreamWriter’s Internal Properties (e.g. Registries, Counters and Flags)
Name Initial Value Description




foreachBatchWriter: (Dataset[T], Long) => Unit

The function that is used as the batch writer in the ForeachBatchSink for foreachBatch






OutputMode of the streaming sink

Set using outputMode method.


Specifying Write Option — option Method

option(key: String, value: String): DataStreamWriter[T]
option(key: String, value: Boolean): DataStreamWriter[T]
option(key: String, value: Long): DataStreamWriter[T]
option(key: String, value: Double): DataStreamWriter[T]

Internally, option adds the key and value to the extraOptions internal option registry.
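
The typed option overloads and the options method can be mixed on the same writer. The following is a sketch for spark-shell that assumes the rate source and the console sink, whose truncate and numRows options are used for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("option-sketch").getOrCreate()

// Assumption: the built-in rate source stands in for any streaming source
val rates = spark.readStream.format("rate").load()

val writer = rates.writeStream
  .format("console")
  .option("truncate", false)          // Boolean overload
  .option("numRows", 5L)              // Long overload
  .options(Map("numRows" -> "10"))    // several options at once, all as strings

val query = writer.start()
```

Each call returns the same DataStreamWriter, so the calls can be chained in any order before start.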

Specifying Output Mode — outputMode Method

outputMode(outputMode: String): DataStreamWriter[T]
outputMode(outputMode: OutputMode): DataStreamWriter[T]

outputMode specifies the output mode of a streaming query, i.e. what data is sent out to a streaming sink when there is new data available in streaming data sources.

When not defined explicitly, outputMode defaults to Append output mode.

outputMode can be specified by name or one of the OutputMode values.
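
Both variants of outputMode are shown in the sketch below. It assumes spark-shell, the rate source, and the console sink. The bucket column is a hypothetical aggregation added only because the Complete output mode requires a streaming aggregation.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder.master("local[*]").appName("outputMode-sketch").getOrCreate()

// Assumption: the built-in rate source stands in for any streaming source
val rates = spark.readStream.format("rate").load()

// Hypothetical aggregation: count rows per bucket of the value column
val counts = rates.groupBy((col("value") % 10).as("bucket")).count()

// Output mode given by name
val byName = counts.writeStream.format("console").outputMode("complete").start()

// Output mode given as an OutputMode value
val byValue = counts.writeStream.format("console").outputMode(OutputMode.Complete()).start()
```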

Setting Query Name — queryName method

queryName(queryName: String): DataStreamWriter[T]

queryName sets the name of a streaming query.

Internally, it is just an additional option with the key queryName.
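
A named query can be looked up later by its name. The sketch below assumes spark-shell, the rate source, and the console sink; rates_to_console is a hypothetical name.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("queryName-sketch").getOrCreate()

// Assumption: the built-in rate source stands in for any streaming source
val rates = spark.readStream.format("rate").load()

val query = rates.writeStream
  .format("console")
  .queryName("rates_to_console") // hypothetical query name
  .start()

// The name is available on the StreamingQuery and among the active queries
val name = query.name
val found = spark.streams.active.exists(_.name == "rates_to_console")
```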

Setting How Often to Execute Streaming Query — trigger method

trigger(trigger: Trigger): DataStreamWriter[T]

trigger method sets the time interval of the trigger (that executes a batch runner) for a streaming query.

Trigger specifies how often results should be produced by a StreamingQuery.

The default trigger is ProcessingTime(0L), which runs a streaming query as often as possible.

Consult Trigger to learn about Trigger and ProcessingTime types.
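
A processing-time trigger can be given as a Scala Duration or as an interval string. The sketch below assumes spark-shell, the rate source, and the console sink; the intervals are arbitrary.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

val spark = SparkSession.builder.master("local[*]").appName("trigger-sketch").getOrCreate()

// Assumption: the built-in rate source stands in for any streaming source
val rates = spark.readStream.format("rate").load()

// Micro-batch every 10 seconds, interval given as a Duration
val everyTenSeconds = rates.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .start()

// Micro-batch every 5 seconds, interval given as a string
val everyFiveSeconds = rates.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
```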

Creating and Starting Execution of Streaming Query — start Method

start(): StreamingQuery
start(path: String): StreamingQuery  (1)
  1. Sets path option to path and passes the call on to start()

start starts a streaming query.

start gives a StreamingQuery to control the execution of the continuous query.

Whether or not you have to specify path option depends on the streaming sink in use.

Internally, start branches off per source.

  • memory

  • foreach

  • other formats
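
As an illustration of the memory branch, the sketch below starts a query with the memory sink and reads the results back as a table. It assumes spark-shell and the rate source; rates_mem is a hypothetical query name. The memory sink requires a query name because the name is used for the in-memory table.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("memory-sink-sketch").getOrCreate()

// Assumption: the built-in rate source stands in for any streaming source
val rates = spark.readStream.format("rate").load()

val query = rates.writeStream
  .format("memory")
  .queryName("rates_mem") // hypothetical name, becomes the name of the in-memory table
  .start()

// The rows written so far can be queried like any other table
val snapshot = spark.table("rates_mem")
```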


Table 3. start’s Options
Option Description


queryName

Name of active streaming query


checkpointLocation

Directory for checkpointing (and to store query metadata like offsets before and after being processed, the query id, etc.)

start reports an AnalysisException when source is hive.

val q = spark.
  readStream.
  format("rate"). // assumption: any streaming source reproduces the error
  load.
  writeStream.
  format("hive") // <-- hive format used as a streaming sink

scala> q.start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:234)
  ... 48 elided

Define options using option or options methods.

Making ForeachWriter in Charge of Streaming Writes — foreach method

foreach(writer: ForeachWriter[T]): DataStreamWriter[T]

foreach sets the input ForeachWriter to be in control of streaming writes.

Internally, foreach sets the streaming output format as foreach and foreachWriter as the input writer.

foreach uses SparkSession to access SparkContext to clean the ForeachWriter.

foreach reports an IllegalArgumentException when writer is null.

foreach writer cannot be null
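
A ForeachWriter handles rows one at a time through its open, process and close methods. The following is a minimal sketch for spark-shell that assumes the rate source; PrintlnWriter is a hypothetical writer that only prints every row.

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

val spark = SparkSession.builder.master("local[*]").appName("foreach-sketch").getOrCreate()

// Assumption: the built-in rate source stands in for any streaming source
val rates = spark.readStream.format("rate").load()

// Hypothetical writer that prints every row to standard output
class PrintlnWriter extends ForeachWriter[Row] {
  // Returning true tells Spark to process the rows of this partition
  override def open(partitionId: Long, epochId: Long): Boolean = true
  override def process(row: Row): Unit = println(row)
  // Nothing to release in this sketch
  override def close(errorOrNull: Throwable): Unit = ()
}

val query = rates.writeStream.foreach(new PrintlnWriter).start()
```

A real writer would typically open a connection in open, send the row in process, and release the connection in close.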
