StreamExecution manages the execution of a streaming query for a SQLContext and a Sink. It requires a LogicalPlan to find the Source objects from which records are periodically pulled.
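To show what the LogicalPlan is used for, here is a minimal sketch that discovers sources by walking a plan tree. The plan classes (`Plan`, `SourceLeaf`, `Project`, `Union`) and `PlanSources.collectSources` are hypothetical stand-ins, not Spark's LogicalPlan API. The sketch assumes only that sources appear as leaves of the plan.

```scala
// Hypothetical, simplified plan tree; these are NOT Spark's LogicalPlan classes.
sealed trait Plan { def children: Seq[Plan] }
final case class SourceLeaf(sourceName: String) extends Plan {
  def children: Seq[Plan] = Nil
}
final case class Project(columns: Seq[String], child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
final case class Union(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
}

object PlanSources {
  // Pre-order traversal that keeps every leaf standing for a streaming source.
  def collectSources(plan: Plan): Seq[String] = plan match {
    case SourceLeaf(name) => Seq(name)
    case other            => other.children.flatMap(collectSources)
  }
}

val plan = Project(Seq("value"), Union(SourceLeaf("FileSource[hello]"), SourceLeaf("MemoryStream[1]")))
println(PlanSources.collectSources(plan)) // prints List(FileSource[hello], MemoryStream[1])
```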

StreamExecution is a StreamingQuery with additional attributes:

StreamExecution starts an internal thread (microBatchThread) that periodically (every 10 milliseconds) polls the sources for new records and creates a batch.

The 10-millisecond interval between batches is fixed, i.e. it is not configurable.
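A minimal sketch of such a fixed-interval micro-batch loop follows. It assumes a hypothetical in-memory source (a `ConcurrentLinkedQueue`) and a hypothetical `MicroBatchLoop` class. Only the thread name and the 10 ms pause come from the text above; the real StreamExecution does more work per batch.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical micro-batch loop; class and method names are illustrative, not Spark's.
final class MicroBatchLoop(source: ConcurrentLinkedQueue[String], pollIntervalMs: Long = 10L) {
  private val running = new AtomicBoolean(true)
  private val batches = ArrayBuffer.empty[Seq[String]]

  private val microBatchThread = new Thread("microBatchThread") {
    override def run(): Unit = {
      while (running.get()) {
        pollOnce()
        Thread.sleep(pollIntervalMs) // fixed pause between polls
      }
      pollOnce() // drain records that arrived just before stop()
    }
  }
  microBatchThread.setDaemon(true)

  // Take every record currently in the source; an empty poll produces no batch.
  private def pollOnce(): Unit = {
    val batch = Iterator.continually(source.poll()).takeWhile(_ != null).toList
    if (batch.nonEmpty) batches.synchronized { batches += batch }
  }

  def start(): Unit = microBatchThread.start()

  // Stop the loop, wait for the thread to finish and return the batches it created.
  def stop(): Seq[Seq[String]] = {
    running.set(false)
    microBatchThread.join()
    batches.synchronized { batches.toList }
  }
}

val source = new ConcurrentLinkedQueue[String]()
val loop = new MicroBatchLoop(source)
loop.start()
Seq("a", "b", "c").foreach(r => source.add(r))
Thread.sleep(50)
val collected = loop.stop()
```

Because the thread sleeps after each poll, the effective spacing is 10 ms plus the time spent building a batch. A `ScheduledExecutorService` with `scheduleAtFixedRate` would be an alternative if a steady rate mattered.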

StreamExecution can be in three states:

  • INITIALIZED when the instance was created.

  • ACTIVE when batches are pulled from the sources.

  • TERMINATED when the batches were successfully processed or the query was stopped.
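The three states and the order a query moves through them can be modelled as a small state machine. This is a hypothetical sketch (`QueryState`, `StateRules` and `StateHolder` are illustrative names). It assumes the only legal moves are INITIALIZED to ACTIVE and ACTIVE to TERMINATED, which the list implies but does not state as a rule.

```scala
// Hypothetical model of the three states; not Spark's internal representation.
sealed trait QueryState
case object INITIALIZED extends QueryState
case object ACTIVE extends QueryState
case object TERMINATED extends QueryState

object StateRules {
  // Assumed legal transitions: INITIALIZED -> ACTIVE -> TERMINATED.
  def canMove(from: QueryState, to: QueryState): Boolean = (from, to) match {
    case (INITIALIZED, ACTIVE) => true
    case (ACTIVE, TERMINATED)  => true
    case _                     => false
  }
}

final class StateHolder {
  @volatile private var current: QueryState = INITIALIZED

  def state: QueryState = current

  // Reject any transition the rules above do not allow.
  def moveTo(next: QueryState): Unit = synchronized {
    require(StateRules.canMove(current, next), s"illegal transition $current -> $next")
    current = next
  }
}
```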


Enable DEBUG logging level for org.apache.spark.sql.execution.streaming.StreamExecution to see what happens inside.

Add the following line to conf/

Refer to Logging.

runBatches Internal Method

runBatches(): Unit
scala> val out = in.write
out: org.apache.spark.sql.StreamingQuery = Continuous Query - memStream [state = ACTIVE]
16/04/16 00:48:47 INFO StreamExecution: Starting new continuous query.

scala> 16/04/16 00:48:47 INFO StreamExecution: Committed offsets for batch 1.
16/04/16 00:48:47 DEBUG StreamExecution: Stream running from {} to {FileSource[hello]: #0}
16/04/16 00:48:47 DEBUG StreamExecution: Retrieving data from FileSource[hello]: None -> #0
16/04/16 00:48:47 DEBUG StreamExecution: Optimized batch in 163.940239ms
16/04/16 00:48:47 INFO StreamExecution: Completed up to {FileSource[hello]: #0} in 703.573981ms
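The DEBUG lines above suggest that each batch covers, per source, the range from the last processed offset (None for a fresh source) to the newly available one. Below is a hypothetical sketch of that bookkeeping, with offsets simplified to `Long`s and sources identified by name. `BatchPlanner` and `OffsetRange` are illustrative names, not part of Spark.

```scala
// Hypothetical offset bookkeeping; offsets are plain Longs, sources are named by strings.
object BatchPlanner {
  final case class OffsetRange(source: String, start: Option[Long], end: Long) {
    // Mirrors the shape of the DEBUG line "Retrieving data from <source>: <start> -> #<end>".
    def describe: String = s"$source: ${start.map(o => s"#$o").getOrElse("None")} -> #$end"
  }

  // Every source whose available offset differs from the committed one gets a range to read.
  def newData(committed: Map[String, Long], available: Map[String, Long]): Seq[OffsetRange] =
    available.toSeq.sortBy(_._1).collect {
      case (src, end) if !committed.get(src).contains(end) =>
        OffsetRange(src, committed.get(src), end)
    }
}

val ranges = BatchPlanner.newData(committed = Map.empty, available = Map("FileSource[hello]" -> 0L))
ranges.foreach(r => println(s"Retrieving data from ${r.describe}"))
// prints: Retrieving data from FileSource[hello]: None -> #0
```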

toDebugString Method

You can call toDebugString on StreamExecution to learn about the internals.

scala> out.asInstanceOf[StreamExecution].toDebugString
res3: String =
=== Continuous Query ===
Name: memStream
Current Offsets: {FileSource[hello]: #0}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:

