ControllerEventManager manages ControllerEvents (in a controller event queue).

ControllerEventManager is created exclusively for the KafkaController (when KafkaServer is requested to start up).

When started, ControllerEventManager simply requests the ControllerEventThread to start processing ControllerEvents.

ControllerEventManager is started when KafkaController is started up.

Figure 1. ControllerEventManager is Created and Started With KafkaController

Table 1. ControllerEventManager’s Internal Properties (e.g. Registries and Counters)

  • queue: the controller event queue, a java.util.concurrent.LinkedBlockingQueue (i.e. an optionally-bounded blocking queue based on linked nodes that orders elements in first-in-first-out fashion) of ControllerEvents

  • The current ControllerState

  • ControllerEventThread with the controller-event-thread thread name

Creating ControllerEventManager Instance

ControllerEventManager takes the following when created:

  • rateAndTimeMetrics: a collection of ControllerState and KafkaTimer pairs

  • eventProcessedListener: a procedure that takes a ControllerEvent (and returns nothing)

ControllerEventManager initializes the internal registries and counters.

Emitting (Enqueuing) Controller Event — put Method

put(event: ControllerEvent): Unit

put simply inserts the ControllerEvent at the end of the controller event queue.
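
The first-in-first-out behaviour of put can be illustrated with a minimal Java sketch. It is not Kafka's implementation: PutSketch, SketchEvent and drainNames are hypothetical names introduced here, and the event names are only examples. The one assumption carried over from the text is that the queue is a LinkedBlockingQueue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, not Kafka code: an event manager that only knows how to enqueue.
class PutSketch {
    // Hypothetical stand-in for ControllerEvent; only a name is modelled.
    static final class SketchEvent {
        final String name;
        SketchEvent(String name) { this.name = name; }
    }

    // Unbounded FIFO queue, as in the description of the controller event queue.
    private final LinkedBlockingQueue<SketchEvent> queue = new LinkedBlockingQueue<>();

    // Inserts the event at the end (tail) of the queue.
    void put(SketchEvent event) {
        try {
            queue.put(event); // does not block here because the queue is unbounded
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status
        }
    }

    // Helper for the demo: removes all queued events and returns their names, head first.
    List<String> drainNames() {
        List<SketchEvent> drained = new ArrayList<>();
        queue.drainTo(drained);
        List<String> names = new ArrayList<>();
        for (SketchEvent e : drained) {
            names.add(e.name);
        }
        return names;
    }

    public static void main(String[] args) {
        PutSketch manager = new PutSketch();
        manager.put(new SketchEvent("Startup"));
        manager.put(new SketchEvent("BrokerChange"));
        System.out.println(manager.drainNames()); // prints [Startup, BrokerChange]
    }
}
```

Events come out in the order they were put, which is what lets the event thread process them in arrival order.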


put is used when:

  • ControllerEventManager is requested to clearAndPut

  • KafkaController emits events as part of its operations

Starting ControllerEventManager (and ControllerEventThread) — start Method

start(): Unit

start simply requests the ControllerEventThread to start (processing ControllerEvents).

ControllerEventThread is a ShutdownableThread that calls its doWork() method (repeatedly) once started.

start is used exclusively when KafkaController is requested to start up (when KafkaServer is requested to start).
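
The relationship between start and the event thread can be sketched in Java as follows. This is an illustration under stated assumptions, not Kafka's ControllerEventThread: StartSketch, its stop method and the use of plain strings as events are hypothetical, and the body of doWork (take one event, hand it to the listener) is an assumed reading of "processing ControllerEvents".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch, not Kafka code: start() only starts the thread that runs doWork() in a loop.
class StartSketch {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Consumer<String> eventProcessedListener;
    private final Thread thread;
    private volatile boolean running = true;

    StartSketch(Consumer<String> eventProcessedListener) {
        this.eventProcessedListener = eventProcessedListener;
        // The thread is created here but does nothing until start() is called.
        this.thread = new Thread(() -> {
            while (running) {
                doWork();
            }
        }, "controller-event-thread");
    }

    // One unit of work (assumed): wait for the next event, then notify the listener.
    private void doWork() {
        try {
            String event = queue.take(); // blocks while the queue is empty
            eventProcessedListener.accept(event);
        } catch (InterruptedException e) {
            running = false; // interrupted while waiting: leave the loop
        }
    }

    void start() { thread.start(); }

    void put(String event) { queue.add(event); }

    // Hypothetical helper so the demo can end; it is not the close method described in this page.
    void stop() {
        running = false;
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isAlive() { return thread.isAlive(); }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch processed = new CountDownLatch(1);
        StartSketch manager = new StartSketch(e -> {
            System.out.println("processed " + e);
            processed.countDown();
        });
        manager.start();
        manager.put("Startup");
        processed.await();
        manager.stop(); // prints "processed Startup" before stopping
    }
}
```

Events put before start simply wait in the queue until the thread is started.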

clearAndPut Method

clearAndPut(event: ControllerEvent): Unit

clearAndPut simply clears the controller event queue and enqueues the controller event.
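
A minimal Java sketch of the clear-then-enqueue step is shown below. ClearAndPutSketch and pending are hypothetical names, events are plain strings, and guarding both methods with a single lock (so that no other put slips in between the clear and the enqueue) is an assumption of this sketch rather than something stated above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch, not Kafka code: drop everything pending, then enqueue one event.
class ClearAndPutSketch {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Assumed design choice: one lock shared by put and clearAndPut.
    private final ReentrantLock putLock = new ReentrantLock();

    void put(String event) {
        putLock.lock();
        try {
            queue.add(event);
        } finally {
            putLock.unlock();
        }
    }

    void clearAndPut(String event) {
        putLock.lock();
        try {
            queue.clear(); // discard all pending events
            put(event);    // the lock is reentrant, so calling put here is safe
        } finally {
            putLock.unlock();
        }
    }

    // Helper for the demo: a snapshot of the pending events, head first.
    List<String> pending() {
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        ClearAndPutSketch manager = new ClearAndPutSketch();
        manager.put("BrokerChange");
        manager.put("TopicChange");
        manager.clearAndPut("Expire");
        System.out.println(manager.pending()); // prints [Expire]
    }
}
```

After clearAndPut the given event is the only one pending, so it is the next one the event thread sees.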


clearAndPut is used when:

  • ControllerEventManager is requested to close

  • KafkaController is requested to start up (and registers a StateChangeHandler that emits an Expire event in beforeInitializingSession)

Closing Up — close Method

close(): Unit

close simply clears the event queue and emits a ShutdownEventThread event.

In the end, close waits until the shutdown is complete. You should see the following INFO message in the logs:

Shutdown completed

close is used exclusively when KafkaController is requested to shut down.
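
The close sequence (clear the queue, emit a shutdown event, wait for the thread) can be sketched in Java as follows. This is a sketch under assumptions, not Kafka's code: CloseSketch and its helper methods are hypothetical, events are plain strings, and the event thread is assumed to stop when it takes the event that stands in for ShutdownEventThread.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, not Kafka code: close() = clearAndPut(shutdown event) + wait for the thread.
class CloseSketch {
    // Stand-in for the ShutdownEventThread event.
    static final String SHUTDOWN_EVENT_THREAD = "ShutdownEventThread";

    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Thread thread = new Thread(this::processEvents, "controller-event-thread");

    // Event loop (assumed): take events until the shutdown event arrives.
    private void processEvents() {
        try {
            while (true) {
                String event = queue.take();
                if (SHUTDOWN_EVENT_THREAD.equals(event)) {
                    return; // stop the thread
                }
                // A real event would be processed here.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void start() { thread.start(); }

    synchronized void put(String event) { queue.add(event); }

    synchronized void clearAndPut(String event) {
        queue.clear();
        queue.add(event);
    }

    void close() {
        // Pending events are dropped so that the shutdown event is the next one taken.
        clearAndPut(SHUTDOWN_EVENT_THREAD);
        try {
            thread.join(); // wait until the event thread has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isRunning() { return thread.isAlive(); }

    int pendingCount() { return queue.size(); }

    public static void main(String[] args) {
        CloseSketch manager = new CloseSketch();
        manager.start();
        manager.put("BrokerChange");
        manager.close();
        System.out.println(manager.isRunning()); // prints false
    }
}
```

Clearing first means close does not wait for a backlog of events to be processed before the thread stops.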
