RecurringTimer

class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)

RecurringTimer (aka timer) is a private[streaming] class that uses a single daemon thread prefixed RecurringTimer - [name] that, once started, executes callback in a loop every period time (until it is stopped).

The wait time is achieved by Clock.waitTillTime (that makes testing easier).

Tip

Enable INFO or DEBUG logging level for org.apache.spark.streaming.util.RecurringTimer logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.util.RecurringTimer=DEBUG

Refer to Logging.

When RecurringTimer triggers an action for a period, you should see the following DEBUG message in the logs:

DEBUG RecurringTimer: Callback for [name] called at time [prevTime]

Start and Restart Times

getStartTime(): Long
getRestartTime(originalStartTime: Long): Long

getStartTime and getRestartTime are helper methods that calculate time.

getStartTime calculates a time that is a multiple of the timer’s period and is right after the current system time.

Note
getStartTime is used when JobGenerator is started.

getRestartTime is similar to getStartTime but includes originalStartTime input parameter, i.e. it calculates a time as getStartTime but shifts the result to accommodate the time gap since originalStartTime.

Note
getRestartTime is used when JobGenerator is restarted.

Starting Timer

start(startTime: Long): Long
start(): Long (1)
  1. Uses the internal getStartTime method to calculate startTime and calls start(startTime: Long).

You can start a RecurringTimer using start methods.

Note
start() method uses the internal getStartTime method to calculate startTime and calls start(startTime: Long).

When start is called, it sets the internal nextTime to the given input parameter startTime and starts the internal daemon thread. This is the moment when the clock starts ticking…​

You should see the following INFO message in the logs:

INFO RecurringTimer: Started timer for [name] at time [nextTime]

Stopping Timer

stop(interruptTimer: Boolean): Long

A timer is stopped using stop method.

Note
It is called when JobGenerator stops.

When called, you should see the following INFO message in the logs:

INFO RecurringTimer: Stopped timer for [name] after time [prevTime]

stop method uses the internal stopped flag to mark the stopped state and returns the last period for which it was successfully executed (tracked as prevTime internally).

Note
Before it fully terminates, it triggers callback one more/last time, i.e. callback is executed for a period after RecurringTimer has been (marked) stopped.

Fun Fact

You can execute org.apache.spark.streaming.util.RecurringTimer as a command-line standalone application.

$ ./bin/spark-class org.apache.spark.streaming.util.RecurringTimer
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
INFO RecurringTimer: Started timer for Test at time 1453787444000
INFO RecurringTimer: 1453787444000: 1453787444000
DEBUG RecurringTimer: Callback for Test called at time 1453787444000
INFO RecurringTimer: 1453787445005: 1005
DEBUG RecurringTimer: Callback for Test called at time 1453787445000
INFO RecurringTimer: 1453787446004: 999
DEBUG RecurringTimer: Callback for Test called at time 1453787446000
INFO RecurringTimer: 1453787447005: 1001
DEBUG RecurringTimer: Callback for Test called at time 1453787447000
INFO RecurringTimer: 1453787448000: 995
DEBUG RecurringTimer: Callback for Test called at time 1453787448000
^C
INFO ShutdownHookManager: Shutdown hook called
INFO ShutdownHookManager: Deleting directory /private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/spark-71dbd43d-2db3-4527-adb8-f1174d799b0d/repl-a6b9bf12-fec2-4004-9236-3b0ab772cc94
INFO ShutdownHookManager: Deleting directory /private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/spark-71dbd43d-2db3-4527-adb8-f1174d799b0d

results matching ""

    No results matching ""