DataStreamWriter — Writing Datasets To Streaming Data Sinks

DataStreamWriter is the interface to describe when and what rows of a streaming query are sent out to the streaming sink.

Table 1. DataStreamWriter’s Methods
Method Description

foreach

foreach(writer: ForeachWriter[T]): DataStreamWriter[T]

Puts the given ForeachWriter in full control of streaming writes

foreachBatch

foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

(New in 2.4.0) Sets the source to foreachBatch and the foreachBatchWriter to the given function.

As per SPARK-24565 Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame, the purpose of the method is to expose the micro-batch output as a dataframe for the following:

  • Pass the output rows of each batch to a library that is designed for the batch jobs only

  • Reuse batch data sources for output whose streaming version does not exist

  • Multi-writes where the output rows are written to multiple outputs by writing twice for every batch

format

format(source: String): DataStreamWriter[T]

Specifies the format of the data sink (aka output format)

The format is used internally as the name (alias) of the streaming sink that the data is written to

option

option(key: String, value: Boolean): DataStreamWriter[T]
option(key: String, value: Double): DataStreamWriter[T]
option(key: String, value: Long): DataStreamWriter[T]
option(key: String, value: String): DataStreamWriter[T]

options

options(options: Map[String, String]): DataStreamWriter[T]

Specifies the configuration options of a data sink

Note
You can use the option method instead if you prefer to specify the options one by one or there is only one option to set.

outputMode

outputMode(outputMode: OutputMode): DataStreamWriter[T]
outputMode(outputMode: String): DataStreamWriter[T]

Specifies the output mode

partitionBy

partitionBy(colNames: String*): DataStreamWriter[T]

queryName

queryName(queryName: String): DataStreamWriter[T]

Assigns the name of a query

start

start(): StreamingQuery
start(path: String): StreamingQuery (1)
  1. Explicit path (that could also be specified as an option)

Creates and immediately starts a StreamingQuery

trigger

trigger(trigger: Trigger): DataStreamWriter[T]

Sets the Trigger for how often a streaming query should be executed and the result saved.

Note

A streaming query is a Dataset with a streaming logical plan.

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
import org.apache.spark.sql.DataFrame
val rates: DataFrame = spark.
  readStream.
  format("rate").
  load

scala> rates.isStreaming
res1: Boolean = true

scala> rates.queryExecution.logical.isStreaming
res2: Boolean = true

DataStreamWriter is available using the writeStream method of a streaming Dataset.

import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.Dataset

val streamingQuery: Dataset[Long] = ...

scala> streamingQuery.isStreaming
res0: Boolean = true

// writeStream keeps the element type of the Dataset
val writer: DataStreamWriter[Long] = streamingQuery.writeStream

Like the batch DataFrameWriter, DataStreamWriter has direct support for many file formats and an extension point to plug in new formats.

// see above for writer definition

// Save dataset in JSON format
writer.format("json")

In the end, you start the actual continuous writing of the result of executing a Dataset to a sink using the start operator.

writer.start

Beside the above operators, there are the following to work with a Dataset as a whole.

Note
hive is not supported for streaming writing (and leads to an AnalysisException).
Note
DataFrameWriter is responsible for writing in a batch fashion.
Table 2. DataStreamWriter’s Internal Properties (e.g. Registries, Counters and Flags)
Name Initial Value Description

extraOptions

foreachBatchWriter

null

foreachBatchWriter: (Dataset[T], Long) => Unit

The function that is used as the batch writer in the ForeachBatchSink for foreachBatch

foreachWriter

partitioningColumns

source

outputMode

OutputMode.Append

OutputMode of the streaming sink

Set using outputMode method.

trigger

Specifying Write Option — option Method

option(key: String, value: String): DataStreamWriter[T]
option(key: String, value: Boolean): DataStreamWriter[T]
option(key: String, value: Long): DataStreamWriter[T]
option(key: String, value: Double): DataStreamWriter[T]

Internally, option adds the key and value to extraOptions internal option registry.
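
The following is a minimal sketch of the typed option overloads together with options. It assumes a local SparkSession, the built-in rate source and the console sink; the checkpoint directory is a hypothetical path chosen for illustration only.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the existing session in spark-shell, creates a local one otherwise
val spark = SparkSession.builder().master("local[*]").appName("option-sketch").getOrCreate()

val rates = spark.readStream.format("rate").load

// Every call returns the same DataStreamWriter so the calls can be chained
val writer = rates.writeStream
  .format("console")
  .option("truncate", false)   // Boolean overload
  .option("numRows", 10L)      // Long overload
  // Hypothetical directory, used here only to show the Map-based variant
  .options(Map("checkpointLocation" -> "/tmp/sketch/checkpoint"))
```

Nothing is written at this point since the query only starts with start.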

Specifying Output Mode — outputMode Method

outputMode(outputMode: String): DataStreamWriter[T]
outputMode(outputMode: OutputMode): DataStreamWriter[T]

outputMode specifies the output mode of a streaming query, i.e. what data is sent out to a streaming sink when there is new data available in streaming data sources.

Note
When not defined explicitly, outputMode defaults to Append output mode.

outputMode can be specified by name or one of the OutputMode values.
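
As a sketch of the two variants, the snippet below sets the Complete output mode first by name and then by OutputMode value. It assumes a local SparkSession and the built-in rate source; the aggregation is there only because Complete mode requires a streaming aggregation.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder().master("local[*]").appName("outputMode-sketch").getOrCreate()

// Streaming aggregation over the rate source
val counts = spark.readStream.format("rate").load.groupBy("value").count

// Output mode given by name
val byName = counts.writeStream.format("console").outputMode("complete")

// Output mode given as an OutputMode value
val byValue = counts.writeStream.format("console").outputMode(OutputMode.Complete())
```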

Setting Query Name — queryName method

queryName(queryName: String): DataStreamWriter[T]

queryName sets the name of a streaming query.

Internally, it is just an additional option with the key queryName.
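
A short sketch of naming a query, assuming a local SparkSession, the rate source and the console sink. The name rates-to-console is made up for this example.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("queryName-sketch").getOrCreate()

val query = spark.readStream.format("rate").load
  .writeStream
  .format("console")
  .queryName("rates-to-console")   // hypothetical query name
  .start()

// The name is available on the running StreamingQuery
println(query.name)

query.stop()
```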

Setting How Often to Execute Streaming Query — trigger method

trigger(trigger: Trigger): DataStreamWriter[T]

trigger method sets the time interval of the trigger (that executes a batch runner) for a streaming query.

Note
Trigger specifies how often results should be produced by a StreamingQuery. See Trigger.

The default trigger is ProcessingTime(0L) that runs a streaming query as often as possible.

Tip
Consult Trigger to learn about Trigger and ProcessingTime types.
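
The sketch below shows two ways of setting a trigger, assuming a local SparkSession, the rate source and the console sink. The 10-second interval is an arbitrary value picked for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

val spark = SparkSession.builder().master("local[*]").appName("trigger-sketch").getOrCreate()

val rates = spark.readStream.format("rate").load

// Execute the query every 10 seconds
val periodic = rates.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime(10.seconds))

// Execute the query exactly once and stop
val once = rates.writeStream
  .format("console")
  .trigger(Trigger.Once())
```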

Creating and Starting Execution of Streaming Query — start Method

start(): StreamingQuery
start(path: String): StreamingQuery  (1)
  1. Sets path option to path and passes the call on to start()

start starts a streaming query.

start gives a StreamingQuery to control the execution of the continuous query.

Note
Whether or not you have to specify path option depends on the streaming sink in use.

Internally, start branches off per source.

  • memory

  • foreach

  • other formats

…​FIXME

Table 3. start’s Options
Option Description

queryName

Name of active streaming query

checkpointLocation

Directory for checkpointing (and to store query metadata like offsets before and after being processed, the query id, etc.)

start reports an AnalysisException when source is hive.

val q =  spark.
  readStream.
  text("server-logs/*").
  writeStream.
  format("hive") // <-- hive format used as a streaming sink
scala> q.start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:234)
  ... 48 elided
Note
Define options using option or options methods.
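
As an illustration of start with an explicit path and the checkpointLocation option, here is a minimal sketch. It assumes a local SparkSession, the rate source and the parquet file sink; both directories are hypothetical paths.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("start-sketch").getOrCreate()

val query = spark.readStream.format("rate").load
  .writeStream
  .format("parquet")
  .queryName("rates-to-parquet")                            // hypothetical query name
  .option("checkpointLocation", "/tmp/sketch/checkpoint")   // hypothetical directory
  .start("/tmp/sketch/output")                              // sets the path option and starts the query

// Let the query run for up to 5 seconds, then stop it
query.awaitTermination(5000)
query.stop()
```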

Making ForeachWriter in Charge of Streaming Writes — foreach method

foreach(writer: ForeachWriter[T]): DataStreamWriter[T]

foreach sets the input ForeachWriter to be in control of streaming writes.

Internally, foreach sets the streaming output format as foreach and foreachWriter as the input writer.

Note
foreach uses SparkSession to access SparkContext to clean the ForeachWriter.
Note

foreach reports an IllegalArgumentException when writer is null.

foreach writer cannot be null
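
A minimal sketch of a ForeachWriter follows, assuming a local SparkSession and the rate source. The writer only prints every row to standard output, which is a placeholder for writing to a real external system.

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("foreach-sketch").getOrCreate()

val printer = new ForeachWriter[Row] {
  // Called once per partition and batch; returning true accepts the partition
  override def open(partitionId: Long, version: Long): Boolean = true

  // Called for every row of an accepted partition
  override def process(value: Row): Unit = println(value)

  // Called at the end; errorOrNull is null when processing succeeded
  override def close(errorOrNull: Throwable): Unit = ()
}

val query = spark.readStream.format("rate").load
  .writeStream
  .foreach(printer)
  .start()

// Let the query run for up to 5 seconds, then stop it
query.awaitTermination(5000)
query.stop()
```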
