log4j.logger.kafka.utils.KafkaScheduler=TRACE
KafkaScheduler
KafkaScheduler is a concrete task scheduler that allows for scheduling tasks using Java’s ScheduledThreadPoolExecutor.
KafkaScheduler uses the requested number of threads that is usually 1 except for KafkaServer that uses background.threads configuration property (default: 10).
KafkaScheduler is created when:
-
KafkaControlleris created (and initializes kafkaScheduler and tokenCleanScheduler) -
GroupMetadataManageris created -
KafkaServeris requested to start up -
ZooKeeperClientis created
KafkaScheduler is used to create a LogManager and a TransactionCoordinator.
| Name | Description |
|---|---|
|
Java’s ScheduledThreadPoolExecutor that schedules tasks. Initialized when |
|
Java’s AtomicInteger with initial value 0. |
|
Tip
|
Enable Add the following line to Refer to Logging. |
Starting Up — startup Method
startup(): Unit
|
Note
|
startup is part of the Scheduler Contract to initialize the scheduler so it is ready to schedule tasks.
|
When executed, startup prints out the following DEBUG message to the logs:
Initializing task scheduler.
startup creates a ScheduledThreadPoolExecutor with the requested number of threads.
startup requests the ScheduledThreadPoolExecutor to disable executing existing periodic and delayed tasks after the executor has been shutdown.
startup requests the ScheduledThreadPoolExecutor to use a custom thread factory that creates a new KafkaThread with the threadNamePrefix followed by the schedulerThreadId whenever requested for a new thread (e.g. kafka-scheduler-0).
startup throws an IllegalStateException when the KafkaScheduler has already been started:
This scheduler has already been started!
shutdown Method
shutdown(): Unit
|
Note
|
shutdown is part of the Scheduler Contract to…FIXME.
|
shutdown…FIXME
ensureRunning Internal Method
ensureRunning(): Unit
ensureRunning…FIXME
|
Note
|
ensureRunning is used when…FIXME
|
Scheduling Tasks — schedule Method
def schedule(
name: String,
fun: () => Unit,
delay: Long = 0,
period: Long = -1,
unit: TimeUnit = TimeUnit.MILLISECONDS)
|
Note
|
schedule is part of the Scheduler Contract to schedule a task.
|
When schedule is executed, you should see the following DEBUG message in the logs:
DEBUG Scheduling task [name] with initial delay [delay] ms and period [period] ms. (kafka.utils.KafkaScheduler)
|
Note
|
schedule uses Java’s java.util.concurrent.TimeUnit to convert delay and period to milliseconds.
|
schedule first makes sure that KafkaScheduler is running (which simply means that the internal executor has been initialized).
schedule creates an execution thread for the input fun.
For positive period, schedule schedules the thread every period after the initial delay. Otherwise, schedule schedules the thread once.
|
Note
|
schedule uses the internal executor to schedule fun using ScheduledThreadPoolExecutor.scheduleAtFixedRate and ScheduledThreadPoolExecutor.schedule for periodic and one-off executions, respectively.
|
Whenever the thread is executed, and before fun gets triggerred, you should see the following TRACE message in the logs:
Beginning execution of scheduled task '[name]'.
After the execution thread is finished, you should see the following TRACE message in the logs:
Completed execution of scheduled task '[name]'.
In case of any exceptions, the execution thread catches them and you should see the following ERROR message in the logs:
Uncaught exception in scheduled task '[name]'
Creating KafkaScheduler Instance
KafkaScheduler takes the following when created:
KafkaScheduler initializes the internal registries and counters.
scheduleOnce Method
scheduleOnce(name: String, fun: () => Unit): Unit
scheduleOnce…FIXME
|
Note
|
scheduleOnce is used when…FIXME
|
resizeThreadPool Method
resizeThreadPool(newSize: Int): Unit
resizeThreadPool…FIXME
|
Note
|
resizeThreadPool is used exclusively when DynamicThreadPool is requested to reconfigure.
|