Trigger — How Frequently to Check Sources For New Data
Trigger defines how often a streaming query should be executed (triggered) and emit new data (which StreamExecution uses to resolve a TriggerExecutor).
Trigger | Creating Instance |
---|---|
`ContinuousTrigger` | `Trigger.Continuous(long intervalMs)`, `Trigger.Continuous(long interval, TimeUnit timeUnit)`, `Trigger.Continuous(Duration interval)`, `Trigger.Continuous(String interval)` |
`OneTimeTrigger` | `Trigger.Once` |
`ProcessingTime` | `Trigger.ProcessingTime(long intervalMs)`, `Trigger.ProcessingTime(long interval, TimeUnit timeUnit)`, `Trigger.ProcessingTime(Duration interval)`, `Trigger.ProcessingTime(String interval)` |
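As a quick reference, here is a minimal sketch of equivalent ways to create the triggers from the table (`Trigger.Continuous` assumes Spark 2.3 or later):

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger

// Four equivalent ways to create a ProcessingTime trigger with a 10-second period
Trigger.ProcessingTime(10000L)                // milliseconds
Trigger.ProcessingTime(10, TimeUnit.SECONDS)  // explicit time unit
Trigger.ProcessingTime(10.seconds)            // scala.concurrent.duration.Duration
Trigger.ProcessingTime("10 seconds")          // interval string

// One-time execution
Trigger.Once

// Continuous processing with a 1-second checkpoint interval (Spark 2.3+)
Trigger.Continuous("1 second")
```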
Note: You specify the trigger for a streaming query using DataStreamWriter's `trigger` method.
```scala
import org.apache.spark.sql.streaming.Trigger
val query = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.Once). // <-- execute once and stop
  queryName("rate-once").
  start

query.awaitTermination() // wait for the single micro-batch to complete
assert(query.isActive == false)
```
```text
scala> println(query.lastProgress)
{
  "id" : "2ae4b0a4-434f-4ca7-a523-4e859c07175b",
  "runId" : "24039ce5-906c-4f90-b6e7-bbb3ec38a1f5",
  "name" : "rate-once",
  "timestamp" : "2017-07-04T18:39:35.998Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1365,
    "getBatch" : 29,
    "getOffset" : 0,
    "queryPlanning" : 285,
    "triggerExecution" : 1742,
    "walCommit" : 40
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
    "startOffset" : null,
    "endOffset" : 0,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@7dbf277"
  }
}
```
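For comparison, when no trigger is specified explicitly, DataStreamWriter defaults to `ProcessingTime(0)`, so a new micro-batch starts as soon as the previous one finishes. A minimal sketch (the query name is made up for illustration):

```scala
// No .trigger(...) call: the default is Trigger.ProcessingTime(0),
// so micro-batches run back to back as fast as possible.
val defaultTriggerQuery = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  queryName("rate-default-trigger"). // hypothetical query name
  start
```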
Note: Although Trigger allows for custom implementations, StreamExecution refuses such attempts and reports an IllegalStateException.
```text
import org.apache.spark.sql.streaming.Trigger
case object MyTrigger extends Trigger

scala> val sq = spark
         .readStream
         .format("rate")
         .load
         .writeStream
         .format("console")
         .trigger(MyTrigger) // <-- use custom trigger
         .queryName("rate-custom-trigger")
         .start
java.lang.IllegalStateException: Unknown type of trigger: MyTrigger
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:60)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:275)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
  ... 57 elided
```
Note: Trigger was introduced in the commit for [SPARK-14176][SQL] Add DataFrameWriter.trigger to set the stream batch period.
Examples of ProcessingTime

ProcessingTime is a Trigger that assumes that milliseconds is the minimum time unit.

You can create an instance of ProcessingTime using the following constructors (a usage sketch follows the list):

- `ProcessingTime(Long)` that accepts non-negative values that represent milliseconds, e.g. `ProcessingTime(10)`
- `ProcessingTime(interval: String)` or `ProcessingTime.create(interval: String)` that accept `CalendarInterval` instances with or without the leading `interval` string, e.g. `ProcessingTime("10 milliseconds")` or `ProcessingTime("interval 10 milliseconds")`
- `ProcessingTime(Duration)` that accepts `scala.concurrent.duration.Duration` instances, e.g. `ProcessingTime(10.seconds)`
- `ProcessingTime.create(interval: Long, unit: TimeUnit)` for `Long` and `java.util.concurrent.TimeUnit` instances, e.g. `ProcessingTime.create(10, TimeUnit.SECONDS)`
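Putting the constructors to work, here is a minimal sketch of a query triggered every 10 seconds, using the interval-string form via `Trigger.ProcessingTime` (the query name is made up for illustration):

```scala
import org.apache.spark.sql.streaming.Trigger

val sq = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime("10 seconds")). // <-- micro-batch every 10 seconds
  queryName("rate-every-10-seconds"). // hypothetical query name
  start
```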