Micro-Batch Stream Processing (Structured Streaming V1)

Micro-Batch Stream Processing is a stream processing model in Spark Structured Streaming that is used for streaming queries with Trigger.Once and Trigger.ProcessingTime triggers.

Micro-batch stream processing uses MicroBatchExecution stream execution engine.

Micro-batch stream processing supports MicroBatchReadSupport data sources.

Micro-batch stream processing is often referred to as Structured Streaming V1.

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1.minute)) // <-- Uses MicroBatchExecution for execution
  .queryName("rate2console")
  .start

assert(sq.isActive)

scala> sq.explain
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@678e6267
+- *(1) Project [timestamp#54, value#55L]
   +- *(1) ScanV2 rate[timestamp#54, value#55L]

// sq.stop

Execution Phases (Processing Cycle)

Once MicroBatchExecution stream processing engine is requested to run an activated streaming query, the query execution goes through the following execution phases every trigger:

Execution phases with execution times are available using StreamingQueryProgress under durationMs.

scala> :type sq
org.apache.spark.sql.streaming.StreamingQuery
sq.lastProgress.durationMs.get("walCommit")
Tip
Enable INFO logging level for StreamExecution logger to be notified about durations.
17/08/11 09:04:17 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:17.373Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {          // <-- Durations (in millis)
    "addBatch" : 38,
    "getBatch" : 1,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 62,
    "walCommit" : 19
  },

Monitoring (using StreamingQueryListener and Logs)

MicroBatchExecution posts events to announce when a streaming query is started and stopped as well as after every micro-batch. StreamingQueryListener interface can be used to intercept the events and act accordingly.

After triggerExecution phase MicroBatchExecution is requested to finish up a streaming batch (trigger) and generate a StreamingQueryProgress (with execution statistics).

MicroBatchExecution prints out the following DEBUG message to the logs:

Execution stats: [executionStats]

MicroBatchExecution posts a QueryProgressEvent with the StreamingQueryProgress and prints out the following INFO message to the logs:

Streaming query made progress: [newProgress]

results matching ""

    No results matching ""