ContinuousExecution — StreamExecution in Continuous Stream Processing

ContinuousExecution is the StreamExecution in Continuous Stream Processing.

ContinuousExecution is created when StreamingQueryManager is requested to create a streaming query with a StreamWriteSupport sink and a ContinuousTrigger (i.e. when DataStreamWriter is requested to start the execution of the streaming query).

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.Continuous(1.minute)) // <-- Gives ContinuousExecution
  .queryName("rate2console")
  .start

import org.apache.spark.sql.streaming.StreamingQuery
assert(sq.isInstanceOf[StreamingQuery])

// The following gives access to the internals
// And to ContinuousExecution
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val engine = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery
import org.apache.spark.sql.execution.streaming.StreamExecution
assert(engine.isInstanceOf[StreamExecution])

import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
val continuousEngine = engine.asInstanceOf[ContinuousExecution]
assert(continuousEngine.trigger == Trigger.Continuous(1.minute))
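Trigger.Continuous accepts the checkpoint interval in milliseconds, as a scala.concurrent.duration.Duration (as in the example above), or as an interval string; all of the following create the same ContinuousTrigger:

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
// All of the variants resolve to an interval in milliseconds internally
assert(Trigger.Continuous(60000L) == Trigger.Continuous(1.minute))
assert(Trigger.Continuous("1 minute") == Trigger.Continuous(1.minute))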

ContinuousExecution can only run streaming queries that use StreamingRelationV2 leaf logical operators with a ContinuousReadSupport data source.

When created (for a streaming query), ContinuousExecution is given the analyzed logical plan of the query. The analyzed logical plan is immediately transformed to use a ContinuousExecutionRelation leaf operator for every StreamingRelationV2 with a ContinuousReadSupport data source, and that transformed plan is what ContinuousExecution uses internally as the logical plan.
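Continuing the rate2console example above, the substitution can be observed on the logical plan of the running query (one ContinuousExecutionRelation for the rate source):

// Continuing the example above: the logical plan of the continuous query
// uses ContinuousExecutionRelation leaf operators (here, one for the rate source)
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecutionRelation
val continuousRelations = continuousEngine.logicalPlan.collect {
  case r: ContinuousExecutionRelation => r
}
assert(continuousRelations.size == 1)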

Note
ContinuousExecution uses the same ContinuousExecutionRelation instance for all occurrences of the same StreamingRelationV2 (with a ContinuousReadSupport data source) in the logical plan.

ContinuousExecution allows for exactly one ContinuousReader in a streaming query (and asserts that requirement in addOffset and commit).

When requested to run the streaming query, ContinuousExecution collects the ContinuousReadSupport data sources (from the ContinuousExecutionRelations) in the analyzed logical plan and requests every ContinuousReadSupport to create a ContinuousReader (which are stored in the continuousSources internal registry).
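A rough sketch of that step follows (illustrative only; createContinuousReaders and checkpointLocation are made-up names and this is not ContinuousExecution's actual source, merely the DataSourceV2 API of this Spark version used per the description above):

// Illustrative sketch: collect every ContinuousReadSupport from the analyzed plan
// and ask each one for a ContinuousReader
import java.util.Optional
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecutionRelation
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
import org.apache.spark.sql.types.StructType

def createContinuousReaders(
    plan: LogicalPlan,
    checkpointLocation: String): Seq[ContinuousReader] =
  plan.collect {
    case ContinuousExecutionRelation(readSupport, options, _) =>
      readSupport.createContinuousReader(
        Optional.empty[StructType](),               // no user-specified schema
        checkpointLocation,                         // hypothetical metadata path
        new DataSourceOptions(options.asJava))
  }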

Table 1. ContinuousExecution’s Internal Properties (e.g. Registries, Counters and Flags)
continuousSources

  continuousSources: Seq[ContinuousReader]

  Used when ContinuousExecution is requested to commit, getStartOffsets, and runContinuous

  Use sources to access the current value

currentEpochCoordinatorId

  FIXME

  Used when…FIXME

triggerExecutor

  ProcessingTimeExecutor for ContinuousTrigger

  Used when…FIXME

Note
ContinuousExecution throws an IllegalStateException when initializing the triggerExecutor for a Trigger that is not a ContinuousTrigger.
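A minimal sketch of that check, mirroring the Note above (requireContinuousTrigger is a made-up helper name; ContinuousTrigger is assumed to be the internal trigger class created by Trigger.Continuous):

// Illustrative sketch: only ContinuousTrigger is supported
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger

def requireContinuousTrigger(trigger: Trigger): Long = trigger match {
  case ContinuousTrigger(intervalMs) => intervalMs // used to build the ProcessingTimeExecutor
  case other =>
    throw new IllegalStateException(s"Unsupported trigger: $other")
}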
Tip

Enable INFO or DEBUG logging levels for org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution=DEBUG

Refer to Logging.

getStartOffsets Internal Method

getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq

getStartOffsets…FIXME

Note
getStartOffsets is used when…FIXME

Committing Epoch — commit Method

commit(epoch: Long): Unit

commit…FIXME

Note
commit is used exclusively when EpochCoordinator is requested to commitEpoch.

awaitEpoch Internal Method

awaitEpoch(epoch: Long): Unit

awaitEpoch…FIXME

Note
awaitEpoch is used when…FIXME

addOffset Method

addOffset(
  epoch: Long,
  reader: ContinuousReader,
  partitionOffsets: Seq[PartitionOffset]): Unit

addOffset…FIXME

Note
addOffset is used when…FIXME

sources Method

sources: Seq[BaseStreamingSource]
Note
sources is part of the ProgressReporter Contract to…FIXME.

sources…FIXME

Analyzed Logical Plan of Streaming Query — logicalPlan Property

logicalPlan: LogicalPlan
Note
logicalPlan is part of the StreamExecution Contract and is the analyzed logical plan of the streaming query.

logicalPlan resolves StreamingRelationV2 leaf logical operators (with a ContinuousReadSupport source) to ContinuousExecutionRelation leaf logical operators.

Internally, logicalPlan transforms the analyzed logical plan as follows (a sketch of the transformation follows the list):

  1. For every StreamingRelationV2 leaf logical operator with a ContinuousReadSupport source, logicalPlan looks up the corresponding ContinuousExecutionRelation in the internal lookup registry (if available) or creates a new ContinuousExecutionRelation (with the ContinuousReadSupport source, the options and the output attributes of the StreamingRelationV2 operator)

  2. For any other StreamingRelationV2, logicalPlan throws an UnsupportedOperationException:

    Data source [name] does not support continuous processing.
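The following is a sketch of the transformation in the spirit of the two rules above (substituteContinuousRelations is a made-up name and this is not ContinuousExecution's actual source; the operators are the DataSourceV2 logical operators of this Spark version):

// Illustrative sketch of the StreamingRelationV2 => ContinuousExecutionRelation substitution
import scala.collection.mutable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.streaming.StreamingRelationV2
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecutionRelation
import org.apache.spark.sql.sources.v2.ContinuousReadSupport

def substituteContinuousRelations(
    analyzedPlan: LogicalPlan,
    spark: SparkSession): LogicalPlan = {
  // reuse the same ContinuousExecutionRelation for the same StreamingRelationV2
  val known = mutable.Map[StreamingRelationV2, ContinuousExecutionRelation]()
  analyzedPlan.transform {
    case r @ StreamingRelationV2(source: ContinuousReadSupport, _, options, output, _) =>
      known.getOrElseUpdate(r, ContinuousExecutionRelation(source, options, output)(spark))
    case StreamingRelationV2(_, sourceName, _, _, _) =>
      throw new UnsupportedOperationException(
        s"Data source $sourceName does not support continuous processing.")
  }
}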

Running Activated Streaming Query — runActivatedStream Method

runActivatedStream(sparkSessionForStream: SparkSession): Unit
Note
runActivatedStream is part of the StreamExecution Contract to run an activated streaming query.

runActivatedStream…FIXME

Running Streaming Query in Continuous Mode — runContinuous Internal Method

runContinuous(sparkSessionForQuery: SparkSession): Unit

runContinuous…FIXME

Note
runContinuous is used exclusively when ContinuousExecution is requested to execute an activated streaming query.

Creating ContinuousExecution Instance

ContinuousExecution takes the following when created:

  • SparkSession

  • The name of the structured query

  • Path to the checkpoint directory (aka metadata directory)

  • Analyzed logical query plan (LogicalPlan)

  • StreamWriteSupport

  • Trigger

  • Clock

  • Output mode

  • Options (Map[String, String])

  • deleteCheckpointOnStop flag to control whether to delete the checkpoint directory on stop

ContinuousExecution initializes the internal registries and counters.

Stopping Streaming Query — stop Method

stop(): Unit
Note
stop is part of the StreamingQuery Contract to stop the streaming query.

stop transitions the streaming query to TERMINATED state.

If the queryExecutionThread is alive (i.e. it has been started and has not yet died), stop interrupts it and waits for this thread to die.

In the end, stop prints out the following INFO message to the logs:

Query [prettyIdString] was stopped
Note
prettyIdString is in the format of queryName [id = [id], runId = [runId]].
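
For example, the rate2console query from the first example can be stopped as follows:

// Stop the continuous query started in the first example
sq.stop
assert(sq.isActive == false)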
