import org.apache.spark.sql.Dataset
val q = spark.readStream
  .format("rate")
  .load
  .writeStream
  .foreachBatch { (output: Dataset[_], batchId: Long) => // <-- creates a ForeachBatchSink
    println(s"Batch ID: $batchId")
    output.show
  }
  .start
// q.stop

scala> println(q.lastProgress.sink.description)
ForeachBatchSink
ForeachBatchSink
ForeachBatchSink is a streaming sink that is used for the DataStreamWriter.foreachBatch streaming operator.
ForeachBatchSink is created exclusively when DataStreamWriter is requested to start execution of a streaming query (with the foreachBatch operator as the output).
ForeachBatchSink uses ForeachBatchSink as its name, which is what the sink description of the streaming query progress reports (as in the example above).
Note: ForeachBatchSink was added in Spark 2.4.0 as part of SPARK-24565 Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame.
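The JIRA issue above is about exposing every micro-batch as a regular DataFrame so that existing batch (non-streaming) writers can be reused, which is the typical reason to use foreachBatch. Below is a minimal sketch of that pattern, assuming a spark-shell session like the example above; the output path is made up for illustration.

import org.apache.spark.sql.{Dataset, Row}

val q = spark.readStream
  .format("rate")
  .load
  .writeStream
  .foreachBatch { (batch: Dataset[Row], batchId: Long) =>
    // foreachBatch hands every micro-batch over as a regular Dataset,
    // so any batch writer (Parquet here) can be reused per micro-batch.
    batch.write
      .mode("append")
      .parquet("/tmp/foreachBatch-demo/output") // illustrative path
    println(s"Wrote micro-batch $batchId")
  }
  .start
// q.stop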
Adding Batch — addBatch Method
addBatch(batchId: Long, data: DataFrame): Unit
Note: addBatch is a part of the Sink Contract to "add" a batch of data to the sink.
addBatch…FIXME
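As a rough sketch of the idea (and not the actual Spark sources), addBatch can be thought of as re-interpreting the micro-batch DataFrame as a typed Dataset (using the encoder the sink was created with) and then invoking the user-defined foreachBatch function with that Dataset and the batch ID. The class and parameter names below (IllustrativeForeachBatchSink, batchWriter, encoder) are assumptions for illustration only.

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.Sink

// Illustrative sketch only -- not the real ForeachBatchSink implementation.
class IllustrativeForeachBatchSink[T](
    batchWriter: (Dataset[T], Long) => Unit,
    encoder: ExpressionEncoder[T])
  extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Expose the micro-batch as a typed Dataset...
    val typed: Dataset[T] = data.as[T](encoder)
    // ...and hand it, together with the batch ID, to the user-defined function.
    batchWriter(typed, batchId)
  }

  // The sink description (e.g. q.lastProgress.sink.description) comes from toString.
  override def toString: String = "ForeachBatchSink"
}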