Trigger — How Frequently to Check Sources For New Data

Trigger defines how often a streaming query should be executed (triggered) and emit a new data (which StreamExecution uses to resolve a TriggerExecutor).

Table 1. Trigger’s Factory Methods
Trigger Creating Instance

ContinuousTrigger

Trigger Continuous(long intervalMs)
Trigger Continuous(long interval, TimeUnit timeUnit)
Trigger Continuous(Duration interval)
Trigger Continuous(String interval)

OneTimeTrigger

Trigger Once()

ProcessingTime

Trigger ProcessingTime(Duration interval)
Trigger ProcessingTime(long intervalMs)
Trigger ProcessingTime(long interval, TimeUnit timeUnit)
Trigger ProcessingTime(String interval)
Note
You specify the trigger for a streaming query using DataStreamWriter's trigger method.
import org.apache.spark.sql.streaming.Trigger
val query = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.Once). // <-- execute once and stop
  queryName("rate-once").
  start

assert(query.isActive == false)

scala> println(query.lastProgress)
{
  "id" : "2ae4b0a4-434f-4ca7-a523-4e859c07175b",
  "runId" : "24039ce5-906c-4f90-b6e7-bbb3ec38a1f5",
  "name" : "rate-once",
  "timestamp" : "2017-07-04T18:39:35.998Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1365,
    "getBatch" : 29,
    "getOffset" : 0,
    "queryPlanning" : 285,
    "triggerExecution" : 1742,
    "walCommit" : 40
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
    "startOffset" : null,
    "endOffset" : 0,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "[email protected]"
  }
}
Note
Although Trigger allows for custom implementations, StreamExecution refuses such attempts and reports an IllegalStateException.
import org.apache.spark.sql.streaming.Trigger
case object MyTrigger extends Trigger
scala> val sq = spark
  .readStream
  .format("rate")
  .load
  .writeStream
  .format("console")
  .trigger(MyTrigger) // <-- use custom trigger
  .queryName("rate-custom-trigger")
  .start
java.lang.IllegalStateException: Unknown type of trigger: MyTrigger
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:60)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:275)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
  ... 57 elided

Examples of ProcessingTime

ProcessingTime is a Trigger that assumes that milliseconds is the minimum time unit.

You can create an instance of ProcessingTime using the following constructors:

  • ProcessingTime(Long) that accepts non-negative values that represent milliseconds.

    ProcessingTime(10)
  • ProcessingTime(interval: String) or ProcessingTime.create(interval: String) that accept CalendarInterval instances with or without leading interval string.

    ProcessingTime("10 milliseconds")
    ProcessingTime("interval 10 milliseconds")
  • ProcessingTime(Duration) that accepts scala.concurrent.duration.Duration instances.

    ProcessingTime(10.seconds)
  • ProcessingTime.create(interval: Long, unit: TimeUnit) for Long and java.util.concurrent.TimeUnit instances.

    ProcessingTime.create(10, TimeUnit.SECONDS)

results matching ""

    No results matching ""