IncrementalExecution — QueryExecution of Streaming Datasets

IncrementalExecution is a QueryExecution of a streaming Dataset that StreamExecution creates when incrementally executing the logical query plan (every trigger).

Figure 1. StreamExecution creates IncrementalExecution (every trigger / streaming batch)
Tip
Details on the QueryExecution contract can be found in the Mastering Apache Spark 2 gitbook.

IncrementalExecution registers the state physical preparation rule with the parent QueryExecution's preparations so that the streaming physical plan is prepared for execution using batch-specific execution properties.
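The registration effectively prepends the state rule to the parent's preparation rules. A minimal sketch of the override, assuming Spark 2.x internals (the state rule itself is described in Table 1 below):

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Inside IncrementalExecution: run the state rule before the parent
// QueryExecution's own preparation rules
override def preparations: Seq[Rule[SparkPlan]] = state +: super.preparations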

IncrementalExecution is created when:

  • StreamExecution runs a streaming batch (and plans a streaming query for execution)

  • ExplainCommand is executed for a streaming Dataset (which is why explain on a not-yet-started streaming query shows checkpoint = <unknown> in the demo below)

IncrementalExecution uses the state checkpoint directory (that is given when IncrementalExecution is created) that is one of the following:

  • <unknown> (when a streaming query is only explained and not started yet)

  • The state directory under the checkpoint root directory (once a streaming query is started)

Demo: State Checkpoint Directory
// START: Only for easier debugging
// The state is then only for one partition
// which should make monitoring easier
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)

assert(spark.sessionState.conf.numShufflePartitions == 1)
// END: Only for easier debugging

val counts = spark
  .readStream
  .format("rate")
  .load
  .groupBy(window($"timestamp", "5 seconds") as "group")
  .agg(count("value") as "value_count") // <-- creates an Aggregate logical operator
  .orderBy("group")  // <-- makes for easier checking

assert(counts.isStreaming, "This should be a streaming query")

// Search for "checkpoint = <unknown>" in the following output
// Look for the StateStoreSave and StateStoreRestore operators
scala> counts.explain
== Physical Plan ==
*(5) Sort [group#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#5 ASC NULLS FIRST, 1)
   +- *(4) HashAggregate(keys=[window#11], functions=[count(value#1L)])
      +- StateStoreSave [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], Append, 0, 2
         +- *(3) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#11], state info [ checkpoint = <unknown>, runId = 558bf725-accb-487d-97eb-f790fa4a6138, opId = 0, ver = 0, numPartitions = 1], 2
               +- *(2) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#11, 1)
                     +- *(1) HashAggregate(keys=[window#11], functions=[partial_count(value#1L)])
                        +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#11, value#1L]
                           +- *(1) Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]

// Start the query to access lastExecution that has the checkpoint resolved
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val t = Trigger.ProcessingTime(1.hour) // should be enough time for exploration
val sq = counts
  .writeStream
  .format("console")
  .option("truncate", false)
  .option("checkpointLocation", "/tmp/spark-streams-state-checkpoint-root")
  .trigger(t)
  .outputMode(OutputMode.Complete)
  .start

// Wait for the first streaming batch, which should happen right after start

import org.apache.spark.sql.execution.streaming._
val lastExecution = sq.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
scala> println(lastExecution.checkpointLocation)
file:/tmp/spark-streams-state-checkpoint-root/state
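Once started, the state checkpoint directory is the state subdirectory of the checkpoint root directory. A quick check, assuming Hadoop's Path API and the demo's checkpoint root:

import org.apache.hadoop.fs.Path
val stateDir = new Path("/tmp/spark-streams-state-checkpoint-root", "state").toString
assert(lastExecution.checkpointLocation endsWith stateDir)

// Stop the query once done exploring
sq.stop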
Table 1. IncrementalExecution’s Internal Registries and Counters (in alphabetical order)
Name Description

planner

SparkPlanner with the following extra planning strategies (in the order of execution):

  • StatefulAggregationStrategy

  • FlatMapGroupsWithStateStrategy

  • StreamingRelationStrategy

  • StreamingDeduplicationStrategy

Note

planner is used to plan (i.e. convert) an optimized logical plan into a physical plan (that is later available as sparkPlan).

The sparkPlan physical plan is then prepared for execution using the preparations physical optimization rules. The result is later available as the executedPlan physical plan.

state

State preparation rule (i.e. Rule[SparkPlan]) that transforms a streaming physical plan (i.e. a SparkPlan with StateStoreSaveExec, StreamingDeduplicateExec and FlatMapGroupsWithStateExec physical operators) and fills in the missing batch-specific properties, e.g. the state info (per nextStatefulOperationStateInfo), the output mode, the current batch timestamp and the event-time watermark. See the sketch below.

Used when IncrementalExecution prepares a physical plan (i.e. SparkPlan) for execution (which is when StreamExecution runs a streaming batch and plans a streaming query).
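The following is a simplified sketch of the state rule for one of the stateful operators, assuming Spark 2.x internals (the real rule also handles StateStoreSaveExec paired with its child StateStoreRestoreExec, and FlatMapGroupsWithStateExec with the batch timestamp):

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.StreamingDeduplicateExec

object state extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan transform {
    // Fill in the batch-specific properties (state info and event-time watermark)
    case dedupe: StreamingDeduplicateExec =>
      dedupe.copy(
        stateInfo = Some(nextStatefulOperationStateInfo),
        eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))
  }
}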

statefulOperatorId

Java’s AtomicInteger

  • 0 when IncrementalExecution is created

  • Incremented whenever IncrementalExecution is requested for the next StatefulOperatorStateInfo (so every stateful physical operator in a streaming physical plan gets a unique id)

nextStatefulOperationStateInfo Internal Method

nextStatefulOperationStateInfo(): StatefulOperatorStateInfo

nextStatefulOperationStateInfo creates a new StatefulOperatorStateInfo with checkpointLocation, runId, the next statefulOperatorId and currentBatchId.

Note
All the properties of StatefulOperatorStateInfo are specified when IncrementalExecution is created.
Note
nextStatefulOperationStateInfo is used exclusively when IncrementalExecution is requested to transform a streaming physical plan using state preparation rule.
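A minimal sketch of nextStatefulOperationStateInfo, assuming StatefulOperatorStateInfo carries the properties shown in the state info of the explain output above (checkpoint, runId, opId, ver, numPartitions):

def nextStatefulOperationStateInfo(): StatefulOperatorStateInfo =
  StatefulOperatorStateInfo(
    checkpointLocation,                   // checkpoint
    runId,                                // runId
    statefulOperatorId.getAndIncrement(), // opId (increments the counter)
    currentBatchId,                       // ver
    numStateStores)                       // numPartitions (an assumption; per spark.sql.shuffle.partitions)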

Creating IncrementalExecution Instance

IncrementalExecution takes the following when created:

  • SparkSession

  • LogicalPlan (of a streaming query)

  • OutputMode

  • State checkpoint directory

  • Run id of a streaming query

  • Current batch id

  • OffsetSeqMetadata

IncrementalExecution initializes the internal registries and counters.
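For illustration, this is how an IncrementalExecution could be created for a not-yet-started streaming Dataset, i.e. roughly what ExplainCommand does in the demo above (the exact arguments are an assumption):

import java.util.UUID
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
import org.apache.spark.sql.streaming.OutputMode

val incrementalExecution = new IncrementalExecution(
  spark,
  counts.queryExecution.logical, // logical plan of the streaming query
  OutputMode.Append,
  "<unknown>",     // state checkpoint directory (unknown until the query is started)
  UUID.randomUUID, // run id
  0,               // current batch id
  OffsetSeqMetadata(0, 0))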

shouldRunAnotherBatch Method

shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean

shouldRunAnotherBatch is positive (true) when at least one of the stateful physical operators (StateStoreWriters) in the executed physical plan requires another (non-data) batch for the new OffsetSeqMetadata (e.g. an updated event-time watermark or batch timestamp).
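A minimal sketch, assuming shouldRunAnotherBatch simply delegates to the StateStoreWriter stateful physical operators of the executed physical plan:

def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean =
  executedPlan.collect {
    // Ask every state-writing physical operator (e.g. StateStoreSaveExec)
    case s: StateStoreWriter => s.shouldRunAnotherBatch(newMetadata)
  }.exists(_ == true)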

Note
shouldRunAnotherBatch is used exclusively when MicroBatchExecution is requested to construct the next streaming batch.
