Streaming Operators — High-Level Declarative Streaming Dataset API

The Dataset API comes with a set of operators that are of particular use in Spark Structured Streaming and that together constitute the so-called High-Level Declarative Streaming Dataset API.

Table 1. Streaming Operators

Each entry lists the operator, its signature(s), and a short description.

dropDuplicates

dropDuplicates(): Dataset[T]
dropDuplicates(colNames: Seq[String]): Dataset[T]
dropDuplicates(col1: String, cols: String*): Dataset[T]

Drops duplicate records, comparing all columns or only a given subset (see the deduplication sketch after the table)

explain

explain(): Unit
explain(extended: Boolean): Unit

Prints the query plans to the console (with extended enabled, the parsed, analyzed and optimized logical plans as well as the physical plan)

groupBy

groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String, cols: String*): RelationalGroupedDataset

Groups rows by zero, one or more columns for subsequent aggregation (see the windowed example after the template)

groupByKey

groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]

Groups rows by a typed grouping function (and gives a KeyValueGroupedDataset); see the typed example after the template

withWatermark

withWatermark(
  eventTime: String,
  delayThreshold: String): Dataset[T]

Defines a streaming (event-time) watermark on the given eventTime column, tolerating late events up to delayThreshold (used in the deduplication sketch after the table)

writeStream

writeStream: DataStreamWriter[T]

Creates a DataStreamWriter for persisting the result of a streaming query to an external data system
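
What follows is a minimal sketch of withWatermark and dropDuplicates used together for streaming deduplication. It uses the built-in rate source; the id and eventTime columns are just the source's value and timestamp columns renamed for illustration.

// rename the rate source's columns to illustrative names
val events = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load
  .withColumnRenamed("value", "id")
  .withColumnRenamed("timestamp", "eventTime")

// tolerate events up to 10 seconds late and deduplicate by id
// (the watermark lets Spark evict old deduplication state)
val deduped = events
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("id")

The generic template below shows the end-to-end shape of a streaming pipeline with the operator of your choice.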

// a streaming Dataset of (timestamp, value) rows from the rate source
val rates = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load

// stream processing
// replace [operator] with the streaming operator of your choice
val processed = rates.[operator]

// output stream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = processed
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(10.seconds))
  .outputMode(OutputMode.Complete) // note: Complete requires a streaming aggregation
  .queryName("rate-console")
  .start

// eventually...
sq.stop
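
As a concrete replacement for [operator], here is a minimal sketch of a windowed groupBy aggregation over the rates stream (the window size and query name are arbitrary choices). With a streaming aggregation in place, OutputMode.Complete is supported, and explain can be used to inspect the query plans before the query starts.

import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
import spark.implicits._

// count rows per 10-second window over the rate source's timestamp column
val counts = rates
  .groupBy(window($"timestamp", "10 seconds"))
  .count

// print the parsed, analyzed, optimized and physical plans
counts.explain(extended = true)

val windowed = counts
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(10.seconds))
  .outputMode(OutputMode.Complete) // supported thanks to the aggregation
  .queryName("rate-windowed")
  .start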
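
And a minimal sketch of the typed groupByKey, grouping rate rows by an arbitrary, illustrative key (value modulo 2) and counting rows per group:

import spark.implicits._

// a typed view of the rate stream as (timestamp, value) pairs
val typedRates = rates.as[(java.sql.Timestamp, Long)]

// group by a typed function; count gives a Dataset[(Long, Long)]
val perKey = typedRates
  .groupByKey { case (_, value) => value % 2 }
  .count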
