// Input data from a streaming data source
// It happens to be the rate data source, but that does not really matter
// What matters is that we get a streaming Dataset
val input = spark
.readStream
.format("rate")
.load
// Streaming aggregation with groupBy
val counts = input
.groupBy($"value" % 2)
.count
counts.explain(extended = true)
/**
== Parsed Logical Plan ==
'Aggregate [('value % 2)], [('value % 2) AS (value % 2)#23, count(1) AS count#22L]
+- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@7879348, rate, [timestamp#15, value#16L]
== Analyzed Logical Plan ==
(value % 2): bigint, count: bigint
Aggregate [(value#16L % cast(2 as bigint))], [(value#16L % cast(2 as bigint)) AS (value % 2)#23L, count(1) AS count#22L]
+- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@7879348, rate, [timestamp#15, value#16L]
== Optimized Logical Plan ==
Aggregate [(value#16L % 2)], [(value#16L % 2) AS (value % 2)#23L, count(1) AS count#22L]
+- Project [value#16L]
+- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@7879348, rate, [timestamp#15, value#16L]
== Physical Plan ==
*(4) HashAggregate(keys=[(value#16L % 2)#27L], functions=[count(1)], output=[(value % 2)#23L, count#22L])
+- StateStoreSave [(value#16L % 2)#27L], state info [ checkpoint = <unknown>, runId = 8c0ae2be-5eaa-4038-bc29-a176abfaf885, opId = 0, ver = 0, numPartitions = 200], Append, 0, 2
+- *(3) HashAggregate(keys=[(value#16L % 2)#27L], functions=[merge_count(1)], output=[(value#16L % 2)#27L, count#29L])
+- StateStoreRestore [(value#16L % 2)#27L], state info [ checkpoint = <unknown>, runId = 8c0ae2be-5eaa-4038-bc29-a176abfaf885, opId = 0, ver = 0, numPartitions = 200], 2
+- *(2) HashAggregate(keys=[(value#16L % 2)#27L], functions=[merge_count(1)], output=[(value#16L % 2)#27L, count#29L])
+- Exchange hashpartitioning((value#16L % 2)#27L, 200)
+- *(1) HashAggregate(keys=[(value#16L % 2) AS (value#16L % 2)#27L], functions=[partial_count(1)], output=[(value#16L % 2)#27L, count#29L])
+- *(1) Project [value#16L]
+- StreamingRelation rate, [timestamp#15, value#16L]
*/
Streaming Aggregation
In Spark Structured Streaming, a streaming aggregation is a streaming query that is described (built) using the following high-level streaming operators (the alternatives are sketched right after this list):

- Dataset.groupBy, Dataset.rollup and Dataset.cube (that simply create a RelationalGroupedDataset)
- Dataset.groupByKey (that simply creates a KeyValueGroupedDataset)
- SQL's GROUP BY clause (including WITH CUBE and WITH ROLLUP)
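The example at the top uses Dataset.groupBy. As a minimal sketch, the same aggregation could also be expressed with the other operators (assuming the input streaming Dataset and the spark session from that example; rate_input is a view name chosen for this sketch):

import spark.implicits._

// Dataset.groupByKey creates a KeyValueGroupedDataset
val countsByKey = input
  .select($"value").as[Long]
  .groupByKey(_ % 2)
  .count

// SQL's GROUP BY clause over a temporary view of the streaming Dataset
input.createOrReplaceTempView("rate_input")
val countsSql = spark.sql(
  "SELECT value % 2 AS grp, count(1) AS count FROM rate_input GROUP BY value % 2")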
Streaming aggregation belongs to the category of Stateful Stream Processing.
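Being stateful, a streaming aggregation with no watermark defined can only be started in Complete or Update output mode (Append is not supported then). A minimal sketch of starting the counts query above (the console sink and the 10-second trigger are illustrative choices):

import org.apache.spark.sql.streaming.Trigger
val query = counts
  .writeStream
  .format("console")
  .outputMode("complete")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start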
Under the covers, the high-level operators create a logical query plan with one or more Aggregate logical operators.
Tip: Read up on the Aggregate logical operator in The Internals of Spark SQL book.
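To see the Aggregate operator in the example above, inspect the analyzed logical plan directly (a sketch; queryExecution is part of Spark SQL's developer API):

import org.apache.spark.sql.catalyst.plans.logical.Aggregate
val aggregates = counts.queryExecution.analyzed.collect { case a: Aggregate => a }
assert(aggregates.nonEmpty)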
In Spark Structured Streaming, IncrementalExecution is responsible for planning streaming queries for execution.

At query planning, IncrementalExecution uses the StatefulAggregationStrategy execution planning strategy to plan streaming aggregations (Aggregate unary logical operators) as pairs of StateStoreRestoreExec and StateStoreSaveExec physical operators.
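The StateStoreRestoreExec and StateStoreSaveExec pair can also be observed at runtime. A minimal sketch, assuming the query handle from the start-up sketch above:

// Prints the physical plan of the most recent micro-batch execution,
// including the StateStoreRestoreExec / StateStoreSaveExec pair
query.explain()

// State store metrics of the stateful operators
// (lastProgress is null until the first micro-batch completes)
for (p <- Option(query.lastProgress); s <- p.stateOperators) {
  println(s"state rows: total=${s.numRowsTotal} updated=${s.numRowsUpdated}")
}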