Streaming Sink — Adding Batches of Data to Storage

Sink is the contract for streaming writes, i.e. adding batches to an output every trigger.

Note
Sink is part of the so-called Structured Streaming V1 that is currently being rewritten to StreamWriteSupport in V2.

Sink is a single-method interface with addBatch method.

package org.apache.spark.sql.execution.streaming

trait Sink {
  def addBatch(batchId: Long, data: DataFrame): Unit
}

addBatch is used to "add" a batch of data to the sink (for batchId batch).

addBatch is used when StreamExecution runs a batch.

Table 1. Sinks
Format / Operator Sink

console

Any FileFormat

  • csv

  • hive

  • json

  • libsvm

  • orc

  • parquet

  • text

FileStreamSink

foreach operator

ForeachSink

kafka

KafkaSink

memory

MemorySink

Tip
You can create your own streaming format implementing StreamSinkProvider.

When creating a custom Sink it is recommended to accept the options (e.g. Map[String, String]) that the DataStreamWriter was configured with. You can then use the options to fine-tune the write path.

class HighPerfSink(options: Map[String, String]) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val bucketName = options.get("bucket").orNull
    ...
  }
}

results matching ""

    No results matching ""