import org.apache.spark.sql.Dataset
val q = spark.readStream
  .format("rate")
  .load
  .writeStream
  .foreachBatch { (output: Dataset[_], batchId: Long) => // <-- creates a ForeachBatchSink
    println(s"Batch ID: $batchId")
    output.show
  }
  .start
// q.stop
scala> println(q.lastProgress.sink.description)
ForeachBatchSink
ForeachBatchSink
ForeachBatchSink is a streaming sink that is used for the DataStreamWriter.foreachBatch streaming operator.

ForeachBatchSink is created exclusively when DataStreamWriter is requested to start execution of the streaming query (with the foreachBatch source).

ForeachBatchSink uses ForeachBatchSink as the name (which is what the sink description shows in the demo above).
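As a sketch of how the operator is typically used (the output path below is made up for illustration), each micro-batch arrives as a regular batch Dataset, so any batch-only sink, e.g. Parquet, can be used inside the user function:

import org.apache.spark.sql.{Dataset, Row}

val streamingQuery = spark.readStream
  .format("rate")
  .load
  .writeStream
  .foreachBatch { (batch: Dataset[Row], batchId: Long) =>
    // Every micro-batch is a regular (batch) Dataset, so any batch sink works,
    // e.g. Parquet (the output path is hypothetical, for this demo only).
    batch.write
      .mode("append")
      .parquet(s"/tmp/foreachBatch-demo/batch-$batchId")
  }
  .start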
Note: ForeachBatchSink was added in Spark 2.4.0 as part of SPARK-24565 Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame.
Adding Batch — addBatch Method
addBatch(batchId: Long, data: DataFrame): Unit
Note: addBatch is a part of Sink Contract to "add" a batch of data to the sink.
addBatch…FIXME
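As a rough illustration of the idea (a simplified, hypothetical sink, not the actual Spark implementation), addBatch can be thought of as forwarding every micro-batch, together with its batch id, to the user-supplied function registered with DataStreamWriter.foreachBatch:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Simplified, hypothetical sink that mirrors the idea behind ForeachBatchSink:
// addBatch hands every micro-batch (a regular DataFrame) and its batch id over
// to the user function.
class SimpleForeachBatchSink(batchWriter: (DataFrame, Long) => Unit) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit =
    batchWriter(data, batchId)

  override def toString: String = "SimpleForeachBatchSink"
}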