class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
RecurringTimer
RecurringTimer
(aka timer) is a private[streaming]
class that uses a single daemon thread prefixed RecurringTimer - [name]
that, once started, executes callback
in a loop every period
time (until it is stopped).
The wait time is achieved by Clock.waitTillTime
(that makes testing easier).
Tip
|
Enable Add the following line to
Refer to Logging. |
When RecurringTimer
triggers an action for a period
, you should see the following DEBUG message in the logs:
DEBUG RecurringTimer: Callback for [name] called at time [prevTime]
Start and Restart Times
getStartTime(): Long
getRestartTime(originalStartTime: Long): Long
getStartTime
and getRestartTime
are helper methods that calculate time.
getStartTime
calculates a time that is a multiple of the timer’s period
and is right after the current system time.
Note
|
getStartTime is used when JobGenerator is started.
|
getRestartTime
is similar to getStartTime
but includes originalStartTime
input parameter, i.e. it calculates a time as getStartTime
but shifts the result to accommodate the time gap since originalStartTime
.
Note
|
getRestartTime is used when JobGenerator is restarted.
|
Starting Timer
start(startTime: Long): Long
start(): Long (1)
-
Uses the internal getStartTime method to calculate
startTime
and callsstart(startTime: Long)
.
You can start a RecurringTimer
using start
methods.
Note
|
start() method uses the internal getStartTime method to calculate startTime and calls start(startTime: Long) .
|
When start
is called, it sets the internal nextTime
to the given input parameter startTime
and starts the internal daemon thread. This is the moment when the clock starts ticking…
You should see the following INFO message in the logs:
INFO RecurringTimer: Started timer for [name] at time [nextTime]
Stopping Timer
stop(interruptTimer: Boolean): Long
A timer is stopped using stop
method.
Note
|
It is called when JobGenerator stops. |
When called, you should see the following INFO message in the logs:
INFO RecurringTimer: Stopped timer for [name] after time [prevTime]
stop
method uses the internal stopped
flag to mark the stopped state and returns the last period
for which it was successfully executed (tracked as prevTime
internally).
Note
|
Before it fully terminates, it triggers callback one more/last time, i.e. callback is executed for a period after RecurringTimer has been (marked) stopped.
|
Fun Fact
You can execute org.apache.spark.streaming.util.RecurringTimer
as a command-line standalone application.
$ ./bin/spark-class org.apache.spark.streaming.util.RecurringTimer
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
INFO RecurringTimer: Started timer for Test at time 1453787444000
INFO RecurringTimer: 1453787444000: 1453787444000
DEBUG RecurringTimer: Callback for Test called at time 1453787444000
INFO RecurringTimer: 1453787445005: 1005
DEBUG RecurringTimer: Callback for Test called at time 1453787445000
INFO RecurringTimer: 1453787446004: 999
DEBUG RecurringTimer: Callback for Test called at time 1453787446000
INFO RecurringTimer: 1453787447005: 1001
DEBUG RecurringTimer: Callback for Test called at time 1453787447000
INFO RecurringTimer: 1453787448000: 995
DEBUG RecurringTimer: Callback for Test called at time 1453787448000
^C
INFO ShutdownHookManager: Shutdown hook called
INFO ShutdownHookManager: Deleting directory /private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/spark-71dbd43d-2db3-4527-adb8-f1174d799b0d/repl-a6b9bf12-fec2-4004-9236-3b0ab772cc94
INFO ShutdownHookManager: Deleting directory /private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/spark-71dbd43d-2db3-4527-adb8-f1174d799b0d