Demo: groupBy Streaming Aggregation with Append Output Mode

The following example code shows a groupBy streaming aggregation with Append output mode.

Append output mode requires that a streaming aggregation defines a watermark (using the withWatermark operator) on at least one of the grouping expressions (either directly or through the window function).

Note
The withWatermark operator has to be applied before the aggregation operator; otherwise the aggregation does not use the watermark.

In Append output mode the current watermark level is used to:

  1. Output saved state rows that have expired (shown as Expired State in the events table below)

  2. Drop late events, i.e. don’t save them to a state store or include them in the aggregation (shown as Late Events in the events table below)
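
The two uses of the watermark can be sketched in plain Scala. This is a minimal model and not Spark's implementation: Event and isAtOrBelowWatermark are hypothetical names, and treating a row as expired or late when its event time is at or below the watermark is an assumption that agrees with the second batch of the events table (watermark level 5000 ms).

```scala
// Hypothetical model of a record: event time in seconds, id and batch number
final case class Event(eventTimeSec: Long, id: String, batch: Int)

// Assumption: a row is "behind" the watermark when its event time is at or below it
def isAtOrBelowWatermark(e: Event, watermarkMs: Long): Boolean =
  e.eventTimeSec * 1000 <= watermarkMs

val watermarkMs = 5000L

// State saved after the first batch and the events of the second batch
val savedState = Seq(Event(1, "1", 1), Event(15, "2", 1))
val incoming   = Seq(Event(1, "1", 2), Event(15, "2", 2), Event(35, "3", 2))

// 1. Saved state rows behind the watermark expire (and are output)
val (expired, keptState) = savedState.partition(isAtOrBelowWatermark(_, watermarkMs))

// 2. Incoming events behind the watermark are late (and are dropped)
val (late, accepted) = incoming.partition(isAtOrBelowWatermark(_, watermarkMs))

// What remains in the state after the batch
val newState = keptState ++ accepted
```

With these inputs, expired holds the row (1, 1, 1), late holds (1, 1, 2), and newState holds the three remaining rows.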

Note
Sorting is only supported on streaming aggregated Datasets with Complete output mode.

Table 1. Streaming Batches, Events, Watermark and State Rows

Columns: Batch / Events | Current Watermark Level [ms] | Expired State, Late Events and Saved State Rows
Every row is written as (event_time, id, batch).

Batch 1
  Events: (1, 1, 1), (15, 2, 1)
  Current Watermark Level [ms]: 0
  Saved State Rows: (1, 1, 1), (15, 2, 1)

Batch 2
  Events: (1, 1, 2), (15, 2, 2), (35, 3, 2)
  Current Watermark Level [ms]: 5000 (Maximum event time 15 minus the delayThreshold as defined using withWatermark operator, i.e. 10)
  Expired State: (1, 1, 1)
  Late Events: (1, 1, 2)
  Saved State Rows: (15, 2, 1), (15, 2, 2), (35, 3, 2)

Batch 3
  Events: (15, 1, 3), (15, 2, 3), (20, 3, 3), (26, 4, 3)
  Current Watermark Level [ms]: 25000 (Maximum event time from the previous batch is 35 and 10 seconds of delayThreshold)
  Expired State: (15, 2, 1), (15, 2, 2)
  Late Events: (15, 1, 3), (15, 2, 3), (20, 3, 3)
  Saved State Rows: (35, 3, 2), (26, 4, 3)

Batch 4
  Events: (36, 1, 4)
  Current Watermark Level [ms]: 25000 (Maximum event time from the previous batch is 26)
  Saved State Rows: (35, 3, 2), (26, 4, 3), (36, 1, 4)

Batch 5
  Events: (50, 1, 5)
  Current Watermark Level [ms]: 26000 (Maximum event time from the previous batch is 36)
  Expired State: (26, 4, 3)
  Saved State Rows: (35, 3, 2), (36, 1, 4), (50, 1, 5)

Note
The event time watermark may advance based on the maximum event time of the previous batch. Only the previous batch matters because the level is re-evaluated every trigger, so the maximum event times of earlier batches are already accounted for in the current level.
Note
The event time watermark can only change when the maximum event time minus the delayThreshold (as defined using the withWatermark operator) is greater than the current watermark. The watermark never moves backwards.
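
The way the watermark level moves from batch to batch can be replayed with a few lines of plain Scala. This is a sketch under stated assumptions rather than Spark's code: nextWatermarkMs is a hypothetical helper, and the maximum event times are the ones from batches 1 to 4 of the events table.

```scala
// Hypothetical helper: the watermark only moves forward, and only when
// (maximum event time - delayThreshold) exceeds the current level
def nextWatermarkMs(currentMs: Long, maxEventTimeMs: Long, delayMs: Long): Long =
  math.max(currentMs, maxEventTimeMs - delayMs)

// delayThreshold of 10 seconds, as in the demo
val delayMs = 10000L

// Maximum event time (in ms) of batches 1 to 4 in the events table
val maxEventTimesMs = Seq(15000L, 35000L, 26000L, 36000L)

// The level starts at 0 and every batch uses the maximum event time
// of the batch before it, so scanLeft yields the levels for batches 1 to 5
val watermarksMs = maxEventTimesMs.scanLeft(0L) { (currentMs, maxMs) =>
  nextWatermarkMs(currentMs, maxMs, delayMs)
}
```

The fourth level stays at 25000 because 26000 minus 10000 is below the current level, which is the case the second note describes.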
Tip

Use the following to publish events to Kafka.

// 1st streaming batch
$ cat /tmp/1
1,1,1
15,2,1

$ kafkacat -P -b localhost:9092 -t topic1 -l /tmp/1

// Alternatively (and slower due to JVM bootup)
$ cat /tmp/1 | ./bin/kafka-console-producer.sh --topic topic1 --broker-list localhost:9092
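
Every published line is a comma-separated event_time (in seconds), id and batch, which is the order the streaming query uses when it splits the Kafka value. The following plain-Scala sketch shows that parsing outside Spark; Record and parse are hypothetical names, and java.time.Instant stands in for the timestamp column.

```scala
import java.time.Instant

// Hypothetical record type mirroring the event_time, id and batch columns
final case class Record(eventTime: Instant, id: String, batch: Int)

// Split a line such as "15,2,1" into its three fields
def parse(line: String): Record = {
  val tokens = line.split(",")
  Record(
    eventTime = Instant.ofEpochSecond(tokens(0).trim.toLong), // seconds since the epoch
    id = tokens(1).trim,
    batch = tokens(2).trim.toInt)
}

// The two records of the first streaming batch
val records = Seq("1,1,1", "15,2,1").map(parse)
```

The sketch does no validation, so a malformed line fails with an exception.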
/**
 * Reading datasets with records from a Kafka topic
 */
/**
TIP (only when working with SNAPSHOT version)
Remove the SNAPSHOT package from the local cache
rm -rf \
  ~/.ivy2/cache/org.apache.spark \
  ~/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.0-SNAPSHOT.jar
*/

/**
TIP: Start spark-shell with spark-sql-kafka-0-10 package
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0-SNAPSHOT
*/

/**
TIP: Copy the following code to append.txt and use :load command in spark-shell to load it
:load append.txt
*/

// START: Only for easier debugging
// The state is then only for one partition
// which should make monitoring it easier
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1)
scala> spark.sessionState.conf.numShufflePartitions
res1: Int = 1
// END: Only for easier debugging

// Use streaming aggregation with groupBy operator to have StateStoreSaveExec operator
// Since the demo uses Append output mode
// it has to define a streaming event time watermark using withWatermark operator
// UnsupportedOperationChecker makes sure that the requirement holds
val idsPerBatch = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  withColumn("tokens", split('value, ",")).
  withColumn("seconds", 'tokens(0) cast "long").
  withColumn("event_time", to_timestamp(from_unixtime('seconds))). // <-- Event time has to be a timestamp
  withColumn("id", 'tokens(1)).
  withColumn("batch", 'tokens(2) cast "int").
  withWatermark(eventTime = "event_time", delayThreshold = "10 seconds"). // <-- define watermark (before groupBy!)
  groupBy($"event_time"). // <-- use event_time for grouping
  agg(collect_list("batch") as "batches", collect_list("id") as "ids").
  withColumn("event_time", to_timestamp($"event_time")) // <-- convert to human-readable date

// idsPerBatch is a streaming Dataset with just one Kafka source
// so it knows nothing about output mode or the current streaming watermark yet
// - Output mode is defined on writing side
// - streaming watermark is read from rows at runtime
// That's why StatefulOperatorStateInfo is generic (and uses the default Append for output mode)
// and no batch-specific values are printed out
// They will be available right after the first streaming batch
// Use explain on a streaming query to know the trigger-specific values
scala> idsPerBatch.explain
== Physical Plan ==
*Project [event_time#36-T10000ms AS event_time#97, batches#90, ids#92]
+- ObjectHashAggregate(keys=[event_time#36-T10000ms], functions=[collect_list(batch#61, 0, 0), collect_list(id#48, 0, 0)])
   +- Exchange hashpartitioning(event_time#36-T10000ms, 1)
      +- StateStoreSave [event_time#36-T10000ms], StatefulOperatorStateInfo(<unknown>,7c5641eb-8ff9-447b-b9ba-b347c057d08f,0,0), Append, 0
         +- ObjectHashAggregate(keys=[event_time#36-T10000ms], functions=[merge_collect_list(batch#61, 0, 0), merge_collect_list(id#48, 0, 0)])
            +- Exchange hashpartitioning(event_time#36-T10000ms, 1)
               +- StateStoreRestore [event_time#36-T10000ms], StatefulOperatorStateInfo(<unknown>,7c5641eb-8ff9-447b-b9ba-b347c057d08f,0,0)
                  +- ObjectHashAggregate(keys=[event_time#36-T10000ms], functions=[merge_collect_list(batch#61, 0, 0), merge_collect_list(id#48, 0, 0)])
                     +- Exchange hashpartitioning(event_time#36-T10000ms, 1)
                        +- ObjectHashAggregate(keys=[event_time#36-T10000ms], functions=[partial_collect_list(batch#61, 0, 0), partial_collect_list(id#48, 0, 0)])
                           +- EventTimeWatermark event_time#36: timestamp, interval 10 seconds
                              +- *Project [cast(from_unixtime(cast(split(cast(value#1 as string), ,)[0] as bigint), yyyy-MM-dd HH:mm:ss, Some(Europe/Berlin)) as timestamp) AS event_time#36, split(cast(value#1 as string), ,)[1] AS id#48, cast(split(cast(value#1 as string), ,)[2] as int) AS batch#61]
                                 +- StreamingRelation kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

// Start the query and hence StateStoreSaveExec
// Note Append output mode
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = idsPerBatch.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(5.seconds)).
  outputMode(OutputMode.Append). // <-- Append output mode
  start

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+-------+---+
|event_time|batches|ids|
+----------+-------+---+
+----------+-------+---+

// there's only 1 stateful operator and hence 0 for the index in stateOperators
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 0,
  "numRowsUpdated" : 0,
  "memoryUsedBytes" : 77
}

// Current watermark
// We've just started so it's the default start time
scala> println(sq.lastProgress.eventTime.get("watermark"))
1970-01-01T00:00:00.000Z

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------+---+
|event_time|batches|ids|
+----------+-------+---+
+----------+-------+---+

// numRowsTotal is the number of keys (event_time groups) in the state store after the batch, i.e. 2
// no keys were available earlier (it's just started!) and so both keys are new and numRowsUpdated is 2
scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 2,
  "numRowsUpdated" : 2,
  "memoryUsedBytes" : 669
}

// Current watermark
// One streaming batch has passed so it's still the default start time
// that will get changed the next streaming batch
// watermark is always one batch behind
scala> println(sq.lastProgress.eventTime.get("watermark"))
1970-01-01T00:00:00.000Z

// Could be 0 if the time to update the lastProgress is short
// FIXME Explain it in detail
scala> println(sq.lastProgress.numInputRows)
2

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+-------+---+
|event_time         |batches|ids|
+-------------------+-------+---+
|1970-01-01 01:00:01|[1]    |[1]|
+-------------------+-------+---+

scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 2,
  "numRowsUpdated" : 2,
  "memoryUsedBytes" : 701
}

// Current watermark
// The watermark advanced (to 5 seconds), hence the output above with the final aggregation of the expired state
scala> println(sq.lastProgress.eventTime.get("watermark"))
1970-01-01T00:00:05.000Z

scala> println(sq.lastProgress.numInputRows)
3

-------------------------------------------
Batch: 3
-------------------------------------------
+-------------------+-------+------+
|event_time         |batches|ids   |
+-------------------+-------+------+
|1970-01-01 01:00:15|[2, 1] |[2, 2]|
+-------------------+-------+------+

scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 2,
  "numRowsUpdated" : 1,
  "memoryUsedBytes" : 685
}

// Current watermark
// The watermark advanced (to 25 seconds), hence the output above with the final aggregation of the expired state
scala> println(sq.lastProgress.eventTime.get("watermark"))
1970-01-01T00:00:25.000Z

scala> println(sq.lastProgress.numInputRows)
4

-------------------------------------------
Batch: 4
-------------------------------------------
+----------+-------+---+
|event_time|batches|ids|
+----------+-------+---+
+----------+-------+---+

scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 3,
  "numRowsUpdated" : 1,
  "memoryUsedBytes" : 965
}

scala> println(sq.lastProgress.eventTime.get("watermark"))
1970-01-01T00:00:25.000Z

scala> println(sq.lastProgress.numInputRows)
1

// publish new records
// See the events table above

-------------------------------------------
Batch: 5
-------------------------------------------
+-------------------+-------+---+
|event_time         |batches|ids|
+-------------------+-------+---+
|1970-01-01 01:00:26|[3]    |[4]|
+-------------------+-------+---+

scala> println(sq.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 3,
  "numRowsUpdated" : 1,
  "memoryUsedBytes" : 997
}

// Current watermark
// The watermark advanced (to 26 seconds), hence the output above with the final aggregation of the expired state
scala> println(sq.lastProgress.eventTime.get("watermark"))
1970-01-01T00:00:26.000Z

scala> println(sq.lastProgress.numInputRows)
1

// In the end...
sq.stop
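
The whole demo can be replayed without Spark to see where the reported watermark, the emitted event_time keys and the numRowsTotal and numRowsUpdated metrics come from. The following is a simplified model under assumptions, not Spark's implementation: Event, BatchResult and runBatch are hypothetical names, the state is a map keyed by event_time (the grouping column), and a key or an event is treated as expired or late when it is at or below the watermark.

```scala
// Hypothetical model, not Spark's API
final case class Event(eventTimeSec: Long, id: String, batch: Int)
final case class BatchResult(
  watermarkMs: Long,     // watermark in effect for the batch
  emitted: Seq[Long],    // event_time keys (in seconds) written to the sink
  numRowsUpdated: Int,   // keys touched by the accepted events
  numRowsTotal: Int)     // keys left in the state after the batch

val delayMs = 10000L

// The five batches of the events table
val batches: Seq[Seq[Event]] = Seq(
  Seq(Event(1, "1", 1), Event(15, "2", 1)),
  Seq(Event(1, "1", 2), Event(15, "2", 2), Event(35, "3", 2)),
  Seq(Event(15, "1", 3), Event(15, "2", 3), Event(20, "3", 3), Event(26, "4", 3)),
  Seq(Event(36, "1", 4)),
  Seq(Event(50, "1", 5)))

// State: event_time key -> events aggregated under that key
type State = Map[Long, Seq[Event]]

def runBatch(state: State, watermarkMs: Long, events: Seq[Event]): (State, BatchResult) = {
  def behind(sec: Long): Boolean = sec * 1000 <= watermarkMs
  // Late events are dropped before they reach the state
  val accepted = events.filterNot(e => behind(e.eventTimeSec))
  val merged = accepted.foldLeft(state) { (s, e) =>
    s.updated(e.eventTimeSec, s.getOrElse(e.eventTimeSec, Seq.empty[Event]) :+ e)
  }
  // Expired keys are emitted and removed from the state
  val (evicted, kept) = merged.partition { case (key, _) => behind(key) }
  val result = BatchResult(
    watermarkMs,
    evicted.keys.toSeq.sorted,
    accepted.map(_.eventTimeSec).distinct.size,
    kept.size)
  (kept, result)
}

// Thread the state and the watermark through the batches;
// the watermark for the next batch uses the maximum event time of this one
val (_, _, results) =
  batches.foldLeft((Map.empty[Long, Seq[Event]], 0L, Vector.empty[BatchResult])) {
    case ((state, watermarkMs, acc), events) =>
      val (nextState, result) = runBatch(state, watermarkMs, events)
      val maxEventTimeMs = events.map(_.eventTimeSec).max * 1000
      val nextWatermarkMs = math.max(watermarkMs, maxEventTimeMs - delayMs)
      (nextState, nextWatermarkMs, acc :+ result)
  }

results.foreach(println)
```

Under these assumptions the model reproduces the values printed for batches 1 to 5 of the demo: keys 1, 15 and 26 are emitted in batches 2, 3 and 5, and numRowsTotal goes 2, 2, 2, 3, 3.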
