KafkaScheduler is a concrete task scheduler that allows for scheduling tasks using Java’s ScheduledThreadPoolExecutor.

KafkaScheduler uses the requested number of threads that is usually 1 except for KafkaServer that uses background.threads configuration property (default: 10).

KafkaScheduler is created when:

KafkaScheduler is used to create a LogManager and a TransactionCoordinator.

Table 1. KafkaScheduler’s Internal Properties (e.g. Registries and Counters)
Name Description


Java’s ScheduledThreadPoolExecutor that schedules tasks.

Initialized when KafkaScheduler starts up and shut down when KafkaScheduler shuts down.


Java’s AtomicInteger with initial value 0.


Enable INFO, DEBUG or TRACE logging level for kafka.utils.KafkaScheduler logger to see what happens in KafkaScheduler.

Add the following line to config/log4j.properties:


Refer to Logging.

Starting Up — startup Method

startup(): Unit
startup is part of the Scheduler Contract to initialize the scheduler so it is ready to schedule tasks.

When executed, startup prints out the following DEBUG message to the logs:

Initializing task scheduler.

startup requests the ScheduledThreadPoolExecutor to disable executing existing periodic and delayed tasks after the executor has been shutdown.

startup requests the ScheduledThreadPoolExecutor to use a custom thread factory that creates a new KafkaThread with the threadNamePrefix followed by the schedulerThreadId whenever requested for a new thread (e.g. kafka-scheduler-0).

startup throws an IllegalStateException when the KafkaScheduler has already been started:

This scheduler has already been started!

shutdown Method

shutdown(): Unit
shutdown is part of the Scheduler Contract to…​FIXME.


ensureRunning Internal Method

ensureRunning(): Unit


ensureRunning is used when…​FIXME

Scheduling Tasks — schedule Method

def schedule(
  name: String,
  fun: () => Unit,
  delay: Long = 0,
  period: Long = -1,
  unit: TimeUnit = TimeUnit.MILLISECONDS)
schedule is part of the Scheduler Contract to schedule a task.

When schedule is executed, you should see the following DEBUG message in the logs:

DEBUG Scheduling task [name] with initial delay [delay] ms and period [period] ms. (kafka.utils.KafkaScheduler)
schedule uses Java’s java.util.concurrent.TimeUnit to convert delay and period to milliseconds.

schedule first makes sure that KafkaScheduler is running (which simply means that the internal executor has been initialized).

schedule creates an execution thread for the input fun.

For positive period, schedule schedules the thread every period after the initial delay. Otherwise, schedule schedules the thread once.

schedule uses the internal executor to schedule fun using ScheduledThreadPoolExecutor.scheduleAtFixedRate and ScheduledThreadPoolExecutor.schedule for periodic and one-off executions, respectively.

Whenever the thread is executed, and before fun gets triggerred, you should see the following TRACE message in the logs:

Beginning execution of scheduled task '[name]'.

After the execution thread is finished, you should see the following TRACE message in the logs:

Completed execution of scheduled task '[name]'.

In case of any exceptions, the execution thread catches them and you should see the following ERROR message in the logs:

Uncaught exception in scheduled task '[name]'

Creating KafkaScheduler Instance

KafkaScheduler takes the following when created:

  • Number of threads

  • Thread name prefix (default: kafka-scheduler-)

  • daemon flag (default: true)

KafkaScheduler initializes the internal registries and counters.

scheduleOnce Method

scheduleOnce(name: String, fun: () => Unit): Unit


scheduleOnce is used when…​FIXME

resizeThreadPool Method

resizeThreadPool(newSize: Int): Unit


resizeThreadPool is used exclusively when DynamicThreadPool is requested to reconfigure.

results matching ""

    No results matching ""