KafkaController

KafkaController is a Kafka service responsible for:

KafkaController uses listeners as a notification system to monitor znodes in Zookeeper and react accordingly.

In a Kafka cluster, one of the brokers serves as the controller, which is responsible for managing the states of partitions and replicas and for performing administrative tasks like reassigning partitions.

KafkaController is created and immediately started when KafkaServer starts up.

KafkaController.png
Figure 1. KafkaController

KafkaController is part of every Kafka broker, but only one KafkaController is active at all times.

KafkaController emulates a state machine using controller events.

Table 1. KafkaController’s Controller Events
Event ControllerState process Handler

BrokerChange

BrokerChange

ControllerChange

  • newControllerId: Int

ControllerChange

  1. Assigns the current controller ID as the input newControllerId

  2. (only when the broker is no longer an active controller) Resigns as the active controller

Note
Similar to Reelect event with the only difference that it does not trigger election

ControlledShutdown

  • ID

  • controlledShutdownCallback: Try[Set[TopicAndPartition]] ⇒ Unit

ControlledShutdown

Reelect

ControllerChange

Startup

ControllerChange

logIdent is [Controller id=[brokerId]].

Table 2. KafkaController’s Internal Properties (e.g. Registries and Counters)
Name Description

activeControllerId

The ID of the active KafkaController

  • Initialized to -1

controllerContext

eventManager

ControllerEventManager for controllerContext.stats.rateAndTimeMetrics and updateMetrics listener

kafkaScheduler

KafkaScheduler with 1 daemon thread with kafka-scheduler prefix

partitionStateMachine

PartitionStateMachine

replicaStateMachine

ReplicaStateMachine

stateChangeLogger

StateChangeLogger with the broker ID and inControllerContext flag enabled

tokenCleanScheduler

KafkaScheduler with 1 daemon thread with delegation-token-cleaner prefix

topicDeletionManager

TopicDeletionManager

Table 3. KafkaController’s Listeners
Listener Description

brokerChangeListener

BrokerChangeListener for this KafkaController and eventManager

isrChangeNotificationListener

IsrChangeNotificationListener for this KafkaController and eventManager

Registered in registerIsrChangeNotificationListener when KafkaController does onControllerFailover.

De-registered in deregisterIsrChangeNotificationListener when KafkaController resigns as the active controller.

logDirEventNotificationListener

LogDirEventNotificationListener

partitionModificationsListeners

PartitionModificationsListener by name

partitionReassignmentListener

PartitionReassignmentListener for this KafkaController and ControllerEventManager

preferredReplicaElectionListener

PreferredReplicaElectionListener for this KafkaController and ControllerEventManager

topicDeletionListener

TopicDeletionListener (for this KafkaController and ControllerEventManager)

Registered in registerTopicDeletionListener when KafkaController does onControllerFailover.

De-registered in deregisterTopicDeletionListener when KafkaController resigns as the active controller.

Tip

Enable WARN, INFO or DEBUG logging levels for kafka.controller.KafkaController logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.controller.KafkaController=DEBUG

Refer to Logging.

initiateReassignReplicasForTopicPartition Method

initiateReassignReplicasForTopicPartition

initiateReassignReplicasForTopicPartition…​FIXME

Note
initiateReassignReplicasForTopicPartition is used when…​FIXME

deregisterPartitionReassignmentIsrChangeListeners Method

deregisterPartitionReassignmentIsrChangeListeners

deregisterPartitionReassignmentIsrChangeListeners…​FIXME

Note
deregisterPartitionReassignmentIsrChangeListeners is used when…​FIXME

resetControllerContext Method

resetControllerContext

resetControllerContext…​FIXME

Note
resetControllerContext is used when…​FIXME

deregisterBrokerChangeListener Method

deregisterBrokerChangeListener

deregisterBrokerChangeListener…​FIXME

Note
deregisterBrokerChangeListener is used when…​FIXME

deregisterTopicChangeListener Method

deregisterTopicChangeListener

deregisterTopicChangeListener…​FIXME

Note
deregisterTopicChangeListener is used when…​FIXME

Resigning As Active Controller — onControllerResignation Method

onControllerResignation(): Unit

onControllerResignation starts by printing out the following DEBUG message to the logs:

Resigning

onControllerResignation unsubscribes from intercepting Zookeeper events for the following znodes in order:

onControllerResignation requests TopicDeletionManager to reset.

onControllerResignation requests KafkaScheduler to shutdown.

onControllerResignation resets the following internal counters:

onControllerResignation requests PartitionStateMachine to shutdown.

onControllerResignation deregisterTopicChangeListener.

onControllerResignation deregisterTopicDeletionListener.

onControllerResignation requests ReplicaStateMachine to shutdown.

onControllerResignation deregisterBrokerChangeListener.

onControllerResignation resetControllerContext.

In the end, onControllerResignation prints out the following DEBUG message to the logs:

Resigned
Note

onControllerResignation is used when:

  1. ControllerEventThread processes ControllerChange and Reelect controller events

  2. triggerControllerMove

  3. KafkaController shuts down

Unsubscribing from Child Changes to /isr_change_notification ZNode — deregisterIsrChangeNotificationListener Internal Method

deregisterIsrChangeNotificationListener(): Unit

deregisterIsrChangeNotificationListener prints out the following DEBUG message to the logs:

De-registering IsrChangeNotificationListener

deregisterIsrChangeNotificationListener requests ZkUtils to unsubscribe from intercepting changes to /isr_change_notification znode with IsrChangeNotificationListener.

Note
deregisterIsrChangeNotificationListener is used exclusively when KafkaController resigns as the active controller.

Unsubscribing from Child Changes to /log_dir_event_notification ZNode — deregisterLogDirEventNotificationListener Internal Method

deregisterLogDirEventNotificationListener(): Unit

deregisterLogDirEventNotificationListener prints out the following DEBUG message to the logs:

De-registering logDirEventNotificationListener

deregisterLogDirEventNotificationListener requests ZkUtils to unsubscribe from intercepting changes to /log_dir_event_notification znode with LogDirEventNotificationListener.

Note
deregisterLogDirEventNotificationListener is used exclusively when KafkaController resigns as the active controller.

Unsubscribing from Data Changes to /admin/preferred_replica_election ZNode — deregisterPreferredReplicaElectionListener Method

deregisterPreferredReplicaElectionListener(): Unit

deregisterPreferredReplicaElectionListener requests ZkUtils to unsubscribe from intercepting data changes to /admin/preferred_replica_election znode with PreferredReplicaElectionListener.

Note
deregisterPreferredReplicaElectionListener is used exclusively when KafkaController resigns as the active controller.

Unsubscribing from Data Changes to /admin/reassign_partitions ZNode — deregisterPartitionReassignmentListener Method

deregisterPartitionReassignmentListener(): Unit

deregisterPartitionReassignmentListener requests ZkUtils to unsubscribe from intercepting data changes to /admin/reassign_partitions znode with PartitionReassignmentListener.

Note
deregisterPartitionReassignmentListener is used exclusively when KafkaController resigns as the active controller.

triggerControllerMove Internal Method

triggerControllerMove(): Unit

triggerControllerMove…​FIXME

Note

triggerControllerMove is used when:

  1. KafkaController handleIllegalState

  2. KafkaController caught an exception while electing or becoming a controller

handleIllegalState Internal Method

handleIllegalState(e: IllegalStateException): Nothing

handleIllegalState…​FIXME

Note
handleIllegalState is used when KafkaController catches a IllegalStateException in updateLeaderEpochAndSendRequest, sendUpdateMetadataRequest and ControlledShutdown event.

sendUpdateMetadataRequest Method

sendUpdateMetadataRequest(): Unit

sendUpdateMetadataRequest…​FIXME

Note

sendUpdateMetadataRequest is used when:

updateLeaderEpochAndSendRequest Internal Method

updateLeaderEpochAndSendRequest(): Unit

updateLeaderEpochAndSendRequest…​FIXME

Note
updateLeaderEpochAndSendRequest is used when…​FIXME

shutdown Method

shutdown(): Unit

shutdown…​FIXME

Note
shutdown is used when…​FIXME

updateMetrics Internal Method

Caution
FIXME

onBrokerStartup Method

onBrokerStartup(newBrokers: Seq[Int]): Unit

onBrokerStartup…​FIXME

Note
onBrokerStartup is used exclusively when KafkaController processes BrokerChange controller event.

elect Method

elect(): Unit

elect…​FIXME

Note
elect is used when KafkaController enters Startup and Reelect states.

onControllerFailover Method

Caution
FIXME
Note
onControllerFailover is used exclusively when KafkaController is requested to elect.

isActive Method

isActive: Boolean

isActive says whether the activeControllerId equals the broker ID (from KafkaConfig).

Caution
FIXME When could they be different?

registerIsrChangeNotificationListener Internal Method

registerIsrChangeNotificationListener(): Option[Seq[String]]

registerIsrChangeNotificationListener…​FIXME

Note
registerIsrChangeNotificationListener is used when…​FIXME

deregisterIsrChangeNotificationListener Internal Method

deregisterIsrChangeNotificationListener(): Unit

deregisterIsrChangeNotificationListener…​FIXME

Note
deregisterIsrChangeNotificationListener is used when…​FIXME

Creating KafkaController Instance

KafkaController takes the following when created:

KafkaController initializes the internal registries and counters.

Starting ControllerEventManager (and Putting Startup Event in Event Queue) — startup Method

startup(): Unit

startup puts Startup event at the end of the event queue of ControllerEventManager and requests it to start.

Note
startup is used exclusively when KafkaServer is started up.

Registering SessionExpirationListener To Control Session Recreation — registerSessionExpirationListener Internal Method

registerSessionExpirationListener(): Unit

registerSessionExpirationListener requests ZkUtils to subscribe to state changes with a SessionExpirationListener (with the KafkaController and ControllerEventManager).

Note
SessionExpirationListener puts Reelect event on the event queue of ControllerEventManager every time the Zookeeper session has expired and a new session has been created.
Note
registerSessionExpirationListener is used exclusively when Startup event is processed (after ControllerEventThread is started).

Registering ControllerChangeListener for /controller ZNode Changes — registerControllerChangeListener Internal Method

registerControllerChangeListener(): Unit

registerControllerChangeListener requests ZkUtils to subscribe to data changes for /controller znode with a ControllerChangeListener (with the KafkaController and ControllerEventManager).

Note

ControllerChangeListener emits:

  1. ControllerChange event with the current controller ID (on the event queue of ControllerEventManager) every time the data of a znode changes

  2. Reelect event when the data associated with a znode has been deleted

Note
registerControllerChangeListener is used exclusively when Startup event is processed (after ControllerEventThread is started).

registerBrokerChangeListener Internal Method

registerBrokerChangeListener(): Option[Seq[String]]

registerBrokerChangeListener requests ZkUtils to subscribeChildChanges for /brokers/ids path with BrokerChangeListener.

Note
registerBrokerChangeListener is used exclusively when KafkaController does onControllerFailover.

Getting Active Controller ID (from JSON under /controller znode) — getControllerID Method

getControllerID(): Int

getControllerID returns the ID of the active Kafka controller that is associated with /controller znode in JSON format or -1 otherwise.

Internally, getControllerID requests ZkUtils for data associated with /controller znode.

If available, getControllerID parses the data (being the current controller info in JSON format) to extract brokerid field.

$ ./bin/zookeeper-shell.sh 0.0.0.0:2181
Connecting to 0.0.0.0:2181
Welcome to ZooKeeper!
...
get /controller
{"version":1,"brokerid":100,"timestamp":"1506197069724"}
cZxid = 0xf9
ctime = Sat Sep 23 22:04:29 CEST 2017
mZxid = 0xf9
mtime = Sat Sep 23 22:04:29 CEST 2017
pZxid = 0xf9
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15eaa3a4fdd000d
dataLength = 56
numChildren = 0

Otherwise, when no /controller znode is available, getControllerID returns -1.

Note

getControllerID is used when:

  1. Processing Reelect controller event

  2. elect

Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode — registerTopicDeletionListener Internal Method

registerTopicDeletionListener(): Option[Seq[String]]

registerTopicDeletionListener requests ZkUtils to subscribeChildChanges to /admin/delete_topics znode with TopicDeletionListener.

Note
registerTopicDeletionListener is used exclusively when KafkaController does onControllerFailover.

De-Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode — deregisterTopicDeletionListener Internal Method

deregisterTopicDeletionListener(): Unit

deregisterTopicDeletionListener requests ZkUtils to unsubscribeChildChanges to /admin/delete_topics znode with TopicDeletionListener.

Note
deregisterTopicDeletionListener is used exclusively when KafkaController resigns as the active controller.

processUpdateNotifications Internal Method

processUpdateNotifications(partitions: Seq[TopicPartition]): Unit

processUpdateNotifications…​FIXME

Note
processUpdateNotifications is used when…​FIXME

onReplicasBecomeOffline Internal Method

onReplicasBecomeOffline(newOfflineReplicas: Set[PartitionAndReplica]): Unit

onReplicasBecomeOffline…​FIXME

Note
onReplicasBecomeOffline is used when…​FIXME

onPartitionReassignment Internal Method

onPartitionReassignment(
  topicPartition: TopicPartition,
  reassignedPartitionContext: ReassignedPartitionsContext): Unit

onPartitionReassignment…​FIXME

Note
onPartitionReassignment is used when…​FIXME

onBrokerUpdate Internal Method

onBrokerUpdate(updatedBrokerId: Int): Unit

onBrokerUpdate…​FIXME

Note
onBrokerUpdate is used when…​FIXME

scheduleAutoLeaderRebalanceTask Internal Method

scheduleAutoLeaderRebalanceTask(delay: Long, unit: TimeUnit): Unit

scheduleAutoLeaderRebalanceTask…​FIXME

Note
scheduleAutoLeaderRebalanceTask is used when…​FIXME

results matching ""

    No results matching ""