ControllerChannelManager manages connections to all brokers for KafkaController.

ControllerChannelManager is created when KafkaController is created. KafkaController uses it to add or remove brokers as they "announce" themselves in ZooKeeper. It is also used by the ControllerBrokerRequestBatches of the KafkaController itself, the ZkReplicaStateMachine and the ZkPartitionStateMachine, so that they can send broker and partition changes out to all brokers in a Kafka cluster.

When started, ControllerChannelManager establishes a connection to every broker and starts a corresponding RequestSendThread that keeps sending queued controller requests.

ControllerChannelManager uses [Channel manager on controller [brokerId]] as the logging prefix (aka logIdent).


Enable the ALL logging level for the kafka.controller.ControllerChannelManager logger to see what happens inside.

Add the following line to config/


Refer to Logging.

Connection Metadata of All Brokers — brokerStateInfo Internal Registry

brokerStateInfo: HashMap[Int, ControllerBrokerStateInfo]

brokerStateInfo holds the connection metadata (ControllerBrokerStateInfo) of every broker, keyed by broker ID.

Request threads for brokers are all started when ControllerChannelManager is requested to start up.

A new broker is added when ControllerChannelManager is requested to add a broker (addBroker).

A broker is removed when ControllerChannelManager is requested to remove a broker (removeBroker) or to shut down.

Use the TotalQueueSize gauge metric for the queue depth (i.e. how many controller requests are waiting to be sent out to all brokers).
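The registry behaviour described above can be sketched as follows. This is a minimal illustration, not the Kafka implementation: BrokerQueueSketch and BrokerRegistrySketch are hypothetical names, requests are modelled as plain strings, and the "skip an already-registered broker" check in addBroker is an assumption.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical stand-in for ControllerBrokerStateInfo: only the request queue is modelled.
final case class BrokerQueueSketch(queue: LinkedBlockingQueue[String])

// Hypothetical registry that mimics how brokerStateInfo is described above.
final class BrokerRegistrySketch {
  private val brokerStateInfo = mutable.HashMap.empty[Int, BrokerQueueSketch]

  // Register connection metadata for a broker unless it is already known (assumption).
  def addBroker(brokerId: Int): Unit =
    if (!brokerStateInfo.contains(brokerId))
      brokerStateInfo(brokerId) = BrokerQueueSketch(new LinkedBlockingQueue[String]())

  // Drop the metadata of a broker; a no-op for an unknown broker ID.
  def removeBroker(brokerId: Int): Unit = {
    brokerStateInfo.remove(brokerId)
    ()
  }

  // Queue a request for a registered broker; returns false when the broker is unknown.
  def enqueue(brokerId: Int, request: String): Boolean =
    brokerStateInfo.get(brokerId).exists(_.queue.offer(request))

  // Queue depth across all brokers, i.e. the quantity a TotalQueueSize-style gauge reports.
  def totalQueueSize: Int = brokerStateInfo.values.iterator.map(_.queue.size).sum
}
```

Removing a broker also removes its pending requests from the total, which is why the gauge can drop without anything being sent.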

Performance Metrics

ControllerChannelManager is a KafkaMetricsGroup with the following performance metrics.

Table 1. ControllerChannelManager’s Performance Metrics
Metric Name                  Description
QueueSize                    Controller requests (AbstractControlRequests) queue size (per broker)
RequestRateAndQueueTimeMs    For every broker
TotalQueueSize               Total number of controller requests (AbstractControlRequests) to be sent out to brokers

The performance metrics are registered in kafka.controller:type=ControllerChannelManager group.
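To look the metrics up over JMX (for example from a custom monitoring tool rather than jconsole), the MBean names can be built as in the sketch below. The object name ControllerChannelMetricNames is hypothetical, and both the name= key and the broker-id tag are assumptions about the MBean naming scheme; verify them against what jconsole shows for your broker version.

```scala
import javax.management.ObjectName

// Hypothetical helper that only builds JMX object names; it does not connect to a broker.
object ControllerChannelMetricNames {
  // Group and type as stated above; the "name=<metric>" key is an assumption.
  private val prefix = "kafka.controller:type=ControllerChannelManager"

  // Name of the cluster-wide queue depth gauge.
  val totalQueueSize: ObjectName = new ObjectName(s"$prefix,name=TotalQueueSize")

  // Assumption: per-broker metrics are distinguished by a broker-id tag.
  def queueSize(brokerId: Int): ObjectName =
    new ObjectName(s"$prefix,name=QueueSize,broker-id=$brokerId")
}
```

The resulting ObjectName values can be passed to an MBeanServerConnection (for example, its getAttribute method) once a JMX connection to the controller broker is available.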

Figure 1. ControllerChannelManager in jconsole

Creating ControllerChannelManager Instance

ControllerChannelManager takes the following to be created:

Registering New Broker — addNewBroker Internal Method

addNewBroker(
  broker: Broker): Unit

addNewBroker prints out the following DEBUG message to the logs:

Controller [brokerId] trying to connect to broker [id]

addNewBroker finds the name of the listener to use for communication with the broker based on configuration property (if defined) or

addNewBroker finds the security protocol to use for communication with the broker based on and configuration properties (if defined) or

addNewBroker requests the Broker for the node for the listener name.

addNewBroker creates a new LogContext to use the prefix:

[Controller id=[brokerId], targetBrokerId=[brokerNode]]

addNewBroker creates a NetworkClient. First, it creates a ChannelBuilder (for the security protocol, the listener name, the SERVER JAAS context type and SASL-related properties) and, if the ChannelBuilder is a Reconfigurable, adds it to the KafkaConfig as reconfigurable. It then creates a Selector with the controller-channel metric group (and the broker-id of the broker node).

addNewBroker builds a thread name per the optional threadNamePrefix:


addNewBroker creates a new RequestRateAndQueueTimeMs timer metric with the id of the broker to connect to.

addNewBroker creates a daemon RequestSendThread for the broker ID (of the controller broker), the ControllerContext, the NetworkClient, the RequestRateAndQueueTimeMs metric, the StateChangeLogger, and the thread name.

addNewBroker creates a QueueSize gauge metric (with the id of the broker to connect to) that reports the number of AbstractControlRequest messages in the queue.

In the end, addNewBroker registers (adds) the id of the broker to connect with a new ControllerBrokerStateInfo to the brokerStateInfo internal registry.

addNewBroker is used when ControllerChannelManager is requested to start up (and connect to brokers) and to add a broker.
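The overall shape of addNewBroker (a per-broker queue, a daemon send thread that is created but not yet started, and a registry entry) can be sketched as below. This is a heavily simplified illustration: BrokerStateSketch and ChannelManagerSketch are hypothetical, requests are modelled as Runnables, the thread name is made up, and the NetworkClient, listener, security and metrics steps are left out entirely.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical, simplified broker metadata: a request queue plus the thread that drains it.
final case class BrokerStateSketch(queue: LinkedBlockingQueue[Runnable], sendThread: Thread)

final class ChannelManagerSketch(controllerId: Int) {
  val brokerStateInfo = mutable.HashMap.empty[Int, BrokerStateSketch]

  // Creates the queue and a daemon thread (not started here), then registers both.
  def addNewBroker(brokerId: Int): Unit = {
    val queue = new LinkedBlockingQueue[Runnable]()
    val drain = new Runnable {
      def run(): Unit =
        try {
          while (true) queue.take().run() // keep "sending" queued requests
        } catch {
          case _: InterruptedException => () // interrupted, e.g. when shutting down
        }
    }
    // Hypothetical thread name; the real naming scheme is not reproduced here.
    val thread = new Thread(drain, s"controller-$controllerId-to-broker-$brokerId-sketch")
    thread.setDaemon(true)
    brokerStateInfo(brokerId) = BrokerStateSketch(queue, thread)
  }
}
```

Keeping thread creation separate from thread start matches the text: registration happens in addNewBroker, while starting is a separate step (startRequestSendThread).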

Registering Newly-Added Broker — addBroker Method

addBroker(broker: Broker): Unit


addBroker is used when KafkaController is requested to process a BrokerChange controller event.

Deregistering Broker — removeBroker Method

removeBroker(brokerId: Int): Unit

removeBroker finds the broker metadata in the brokerStateInfo internal registry and passes it to removeExistingBroker.

removeBroker is used exclusively when KafkaController is requested to process a BrokerChange controller event.

Starting Up — startup Method

startup(): Unit


startup is used when KafkaController is requested to initializeControllerContext.

Shutting Down — shutdown Method

shutdown(): Unit


shutdown is used when…​FIXME

Sending AbstractControlRequest Out to Broker — sendRequest Method

sendRequest(
  brokerId: Int,
  request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
  callback: AbstractResponse => Unit = null)


sendRequest is used exclusively when ControllerBrokerRequestBatch is requested to send a controller request to a broker.

removeExistingBroker Internal Method

removeExistingBroker(
  brokerState: ControllerBrokerStateInfo): Unit


removeExistingBroker is used when…​FIXME

Starting RequestSendThread — startRequestSendThread Internal Method

startRequestSendThread(
  brokerId: Int): Unit

startRequestSendThread finds the RequestSendThread in the broker metadata in the brokerStateInfo internal registry and, if the thread has not started yet, startRequestSendThread starts it.

startRequestSendThread is used when ControllerChannelManager is requested to start up and to add a broker (addBroker).
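The "start only if not started yet" check can be expressed with the standard Thread.State API. The sketch below assumes a hypothetical helper, SendThreadStarter, and works on any java.lang.Thread; it is an illustration of the idea rather than the Kafka code.

```scala
// Hypothetical helper illustrating an idempotent thread start.
object SendThreadStarter {
  // Starts the thread only when it has never been started (state NEW).
  // Returns true if this call started the thread, false otherwise.
  def startIfNew(thread: Thread): Boolean =
    if (thread.getState == Thread.State.NEW) {
      thread.start()
      true
    } else false
}
```

The guard matters because calling start() on a thread that has already been started throws IllegalThreadStateException, and start up and addBroker may both reach this step.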


ControllerBrokerStateInfo is broker metadata that holds the following:
