Streaming Operators — High-Level Declarative Streaming Dataset API

The Dataset API comes with a set of operators of particular use in Spark Structured Streaming that together constitute the so-called High-Level Declarative Streaming Dataset API.

Table 1. Streaming Operators

crossJoin

crossJoin(
  right: Dataset[_]): DataFrame

Explicit cartesian join with another Dataset

dropDuplicates

dropDuplicates(): Dataset[T]
dropDuplicates(colNames: Seq[String]): Dataset[T]
dropDuplicates(col1: String, cols: String*): Dataset[T]

Drops duplicate records (considering all columns or only a given subset); see the deduplication example after the table

explain

explain(): Unit
explain(extended: Boolean): Unit

Explains the logical and physical query plans (demonstrated at the end of the deduplication example after the table)

groupBy

groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String, cols: String*): RelationalGroupedDataset

Aggregates rows by zero, one or more grouping columns (see the windowed aggregation example after the table)

groupByKey

groupByKey(func: T => K): KeyValueGroupedDataset[K, T]

Aggregates rows by a typed grouping function, giving a KeyValueGroupedDataset (see the typed aggregation example after the table)

join

join(
  right: Dataset[_]): DataFrame
join(
  right: Dataset[_],
  joinExprs: Column): DataFrame
join(
  right: Dataset[_],
  joinExprs: Column,
  joinType: String): DataFrame
join(
  right: Dataset[_],
  usingColumns: Seq[String]): DataFrame
join(
  right: Dataset[_],
  usingColumns: Seq[String],
  joinType: String): DataFrame
join(
  right: Dataset[_],
  usingColumn: String): DataFrame

Untyped, Row-based joins with another Dataset (see the stream-stream join example after the table)

joinWith

joinWith[U](
  other: Dataset[U],
  condition: Column): Dataset[(T, U)]
joinWith[U](
  other: Dataset[U],
  condition: Column,
  joinType: String): Dataset[(T, U)]

Typed join with another Dataset that pairs up matching records as tuples, preserving the types of both sides

withWatermark

withWatermark(
  eventTime: String,
  delayThreshold: String): Dataset[T]

Defines a streaming watermark on the given eventTime column with a delay threshold (used in the deduplication and join examples after the table)

writeStream

writeStream: DataStreamWriter[T]

Creates a DataStreamWriter for persisting the result of a streaming query to an external data system (see the template at the end of this section)
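
Below are a few sketches that show these operators in a streaming context. First, streaming deduplication: a minimal sketch, assuming records carry an event-time column and a key; here both (eventTime, id) are derived from the built-in rate source purely for illustration. Pairing withWatermark with dropDuplicates lets Spark bound the state kept for tracking duplicates, and explain prints the resulting query plans.

import org.apache.spark.sql.functions.col

// derive an event-time column and an illustrative key from the rate source
val events = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load
  .withColumnRenamed("timestamp", "eventTime")
  .withColumn("id", col("value") % 10)

// the watermark bounds how long state is kept for detecting duplicates
val deduped = events
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("id", "eventTime")

// print the logical and physical query plans
deduped.explain(extended = true)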
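
groupBy typically pairs with a window over the event-time column for streaming aggregations. A sketch reusing the events stream from above (window comes from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.{col, window}

// count events per key in 5-minute event-time windows; windows older
// than the watermark are dropped from the state store
val counts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"), col("id"))
  .count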
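
groupByKey is the typed counterpart: the grouping key is computed by a Scala function rather than by columns. A sketch, again on the events stream (the even/odd key is purely illustrative):

import org.apache.spark.sql.functions.col
import spark.implicits._

// the grouping function computes a key for every record
val countsByKey = events
  .select(col("id"))
  .as[Long]
  .groupByKey(id => id % 2)
  .count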
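
Stream-stream joins require a watermark on both sides plus a time-range join condition so Spark can eventually discard outdated join state. A sketch with hypothetical impression and click streams (both simulated with the rate source; the column names are assumptions for the example):

import org.apache.spark.sql.functions.expr

val impressions = spark.readStream.format("rate").load
  .selectExpr("value AS impressionAdId", "timestamp AS impressionTime")
val clicks = spark.readStream.format("rate").load
  .selectExpr("value AS clickAdId", "timestamp AS clickTime")

// inner join: a click matches an impression with the same ad id
// if it arrives within one hour of the impression
val joined = impressions
  .withWatermark("impressionTime", "2 hours")
  .join(
    clicks.withWatermark("clickTime", "3 hours"),
    expr("""
      clickAdId = impressionAdId AND
      clickTime >= impressionTime AND
      clickTime <= impressionTime + interval 1 hour"""))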
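
The following template puts it all together: read from the rate source, apply an operator of your choice, and write the result to the console sink.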

val rates = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load

// stream processing
// replace [operator] with the operator of your choice
val processed = rates.[operator]

// output stream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = processed
  .writeStream
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(10.seconds))
  .outputMode(OutputMode.Complete) // Complete requires an aggregation; use Append otherwise
  .queryName("rate-console")
  .start

// eventually...
sq.stop
