log4j.logger.kafka.utils.KafkaScheduler=TRACE
KafkaScheduler
KafkaScheduler
is a concrete task scheduler that allows for scheduling tasks using Java’s ScheduledThreadPoolExecutor.
KafkaScheduler
uses the requested number of threads that is usually 1
except for KafkaServer
that uses background.threads configuration property (default: 10
).
KafkaScheduler
is created when:
-
KafkaController
is created (and initializes kafkaScheduler and tokenCleanScheduler) -
GroupMetadataManager
is created -
KafkaServer
is requested to start up -
ZooKeeperClient
is created
KafkaScheduler
is used to create a LogManager and a TransactionCoordinator.
Name | Description |
---|---|
|
Java’s ScheduledThreadPoolExecutor that schedules tasks. Initialized when |
|
Java’s AtomicInteger with initial value 0. |
Tip
|
Enable Add the following line to Refer to Logging. |
Starting Up — startup
Method
startup(): Unit
Note
|
startup is part of the Scheduler Contract to initialize the scheduler so it is ready to schedule tasks.
|
When executed, startup
prints out the following DEBUG message to the logs:
Initializing task scheduler.
startup
creates a ScheduledThreadPoolExecutor with the requested number of threads.
startup
requests the ScheduledThreadPoolExecutor to disable executing existing periodic and delayed tasks after the executor has been shutdown.
startup
requests the ScheduledThreadPoolExecutor to use a custom thread factory that creates a new KafkaThread
with the threadNamePrefix followed by the schedulerThreadId whenever requested for a new thread (e.g. kafka-scheduler-0
).
startup
throws an IllegalStateException
when the KafkaScheduler
has already been started:
This scheduler has already been started!
shutdown
Method
shutdown(): Unit
Note
|
shutdown is part of the Scheduler Contract to…FIXME.
|
shutdown
…FIXME
ensureRunning
Internal Method
ensureRunning(): Unit
ensureRunning
…FIXME
Note
|
ensureRunning is used when…FIXME
|
Scheduling Tasks — schedule
Method
def schedule(
name: String,
fun: () => Unit,
delay: Long = 0,
period: Long = -1,
unit: TimeUnit = TimeUnit.MILLISECONDS)
Note
|
schedule is part of the Scheduler Contract to schedule a task.
|
When schedule
is executed, you should see the following DEBUG message in the logs:
DEBUG Scheduling task [name] with initial delay [delay] ms and period [period] ms. (kafka.utils.KafkaScheduler)
Note
|
schedule uses Java’s java.util.concurrent.TimeUnit to convert delay and period to milliseconds.
|
schedule
first makes sure that KafkaScheduler
is running (which simply means that the internal executor has been initialized).
schedule
creates an execution thread for the input fun
.
For positive period
, schedule
schedules the thread every period
after the initial delay
. Otherwise, schedule
schedules the thread once.
Note
|
schedule uses the internal executor to schedule fun using ScheduledThreadPoolExecutor.scheduleAtFixedRate and ScheduledThreadPoolExecutor.schedule for periodic and one-off executions, respectively.
|
Whenever the thread is executed, and before fun
gets triggerred, you should see the following TRACE message in the logs:
Beginning execution of scheduled task '[name]'.
After the execution thread is finished, you should see the following TRACE message in the logs:
Completed execution of scheduled task '[name]'.
In case of any exceptions, the execution thread catches them and you should see the following ERROR message in the logs:
Uncaught exception in scheduled task '[name]'
Creating KafkaScheduler Instance
KafkaScheduler
takes the following when created:
KafkaScheduler
initializes the internal registries and counters.
scheduleOnce
Method
scheduleOnce(name: String, fun: () => Unit): Unit
scheduleOnce
…FIXME
Note
|
scheduleOnce is used when…FIXME
|
resizeThreadPool
Method
resizeThreadPool(newSize: Int): Unit
resizeThreadPool
…FIXME
Note
|
resizeThreadPool is used exclusively when DynamicThreadPool is requested to reconfigure.
|