log4j.logger.kafka.controller.ControllerChannelManager=ALL
ControllerChannelManager
ControllerChannelManager manages connections to all brokers for KafkaController.
ControllerChannelManager is created when KafkaController is created, so that the controller can add or remove brokers as they "announce" themselves in ZooKeeper. It is used by the ControllerBrokerRequestBatches of the KafkaController itself, the ZkReplicaStateMachine and the ZkPartitionStateMachine (so they can send broker and partition changes out to all brokers in a Kafka cluster).
When started, ControllerChannelManager establishes a connection to every broker and starts a corresponding RequestSendThread to keep sending queued controller requests.
ControllerChannelManager uses [Channel manager on controller [brokerId]] as the logging prefix (aka logIdent).
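The pairing of a per-broker queue with a dedicated sender thread can be illustrated with a minimal sketch. It is not Kafka's RequestSendThread: QueueItem, SendThread and drain are hypothetical names, and the "send" step is just a callback.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, CountDownLatch, LinkedBlockingQueue, TimeUnit}

object SendThreadSketch {
  // Hypothetical stand-in for a queued controller request.
  final case class QueueItem(apiName: String)

  // Hypothetical stand-in for RequestSendThread: takes items off its queue until interrupted.
  final class SendThread(name: String,
                         queue: LinkedBlockingQueue[QueueItem],
                         send: QueueItem => Unit) extends Thread(name) {
    setDaemon(true)

    override def run(): Unit =
      try {
        while (true) send(queue.take()) // blocks until a request is queued
      } catch {
        case _: InterruptedException => () // shutdown requested
      }
  }

  // Queues the given requests, starts a sender and returns what it sent, in order.
  def drain(requests: Seq[String]): List[String] = {
    val queue = new LinkedBlockingQueue[QueueItem]()
    val sent = new CopyOnWriteArrayList[String]()
    val done = new CountDownLatch(requests.size)
    val thread = new SendThread("sketch-send-thread", queue, item => {
      sent.add(item.apiName)
      done.countDown()
    })
    requests.foreach(r => queue.put(QueueItem(r)))
    thread.start()
    done.await(5, TimeUnit.SECONDS)
    thread.interrupt()
    thread.join(1000)
    sent.toArray(new Array[String](0)).toList
  }
}
```

With a single consumer per queue, requests for one broker leave in the order they were queued.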
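Tip
|
Enable the ALL logging level for the kafka.controller.ControllerChannelManager logger. Add the following line to the log4j configuration: log4j.logger.kafka.controller.ControllerChannelManager=ALL
Refer to Logging.
|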
Connection Metadata of All Brokers — brokerStateInfo
Internal Registry
brokerStateInfo: HashMap[Int, ControllerBrokerStateInfo]
brokerStateInfo is connection metadata by broker ID.
A new broker is added when ControllerChannelManager is requested to add a broker (when addBroker).
A broker is removed when ControllerChannelManager is requested to remove a broker (when removeBroker or shutting down).
Use the TotalQueueSize gauge metric for the queue depth (i.e. how many controller requests are waiting to be sent out to all brokers).
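As a rough illustration of the registry and of the total queue depth, the sketch below keeps a map from broker ID to a per-broker queue and sums the queue sizes. BrokerState, Registry and enqueue are hypothetical names, and the real ControllerBrokerStateInfo carries more than a queue.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

object BrokerRegistrySketch {
  // Hypothetical, trimmed-down stand-in for ControllerBrokerStateInfo.
  final case class BrokerState(messageQueue: LinkedBlockingQueue[String])

  final class Registry {
    private val brokerStateInfo = mutable.HashMap.empty[Int, BrokerState]

    // Registers a broker once; a repeated add keeps the existing state.
    def addBroker(brokerId: Int): Unit =
      if (!brokerStateInfo.contains(brokerId))
        brokerStateInfo(brokerId) = BrokerState(new LinkedBlockingQueue[String]())

    def removeBroker(brokerId: Int): Unit = {
      brokerStateInfo.remove(brokerId)
      ()
    }

    // Queues a request for a known broker; returns false for an unknown broker.
    def enqueue(brokerId: Int, request: String): Boolean =
      brokerStateInfo.get(brokerId) match {
        case Some(state) =>
          state.messageQueue.put(request)
          true
        case None => false
      }

    // Per-broker queue depth (0 for an unknown broker).
    def queueSize(brokerId: Int): Int =
      brokerStateInfo.get(brokerId).map(_.messageQueue.size).getOrElse(0)

    // Queue depth across all registered brokers.
    def totalQueueSize: Int =
      brokerStateInfo.values.iterator.map(_.messageQueue.size).sum
  }
}
```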
Performance Metrics
ControllerChannelManager is a KafkaMetricsGroup with the following performance metrics.
Metric Name | Description |
---|---|
QueueSize | Controller requests (AbstractControlRequests) queue size (per broker) |
RequestRateAndQueueTimeMs | Timer metric (per broker) |
TotalQueueSize | Total number of controller requests (AbstractControlRequests) to be sent out to brokers |
The performance metrics are registered in the kafka.controller:type=ControllerChannelManager group.
Creating ControllerChannelManager Instance
ControllerChannelManager takes the following to be created:
Registering New Broker — addNewBroker
Internal Method
addNewBroker(
broker: Broker): Unit
addNewBroker prints out the following DEBUG message to the logs:
Controller [brokerId] trying to connect to broker [id]
addNewBroker finds the name of the listener to use for communication with the broker based on the control.plane.listener.name configuration property (if defined) or inter.broker.listener.name.
addNewBroker finds the security protocol to use for communication with the broker based on the control.plane.listener.name and listener.security.protocol.map configuration properties (if defined) or security.inter.broker.protocol.
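The two lookups follow the same "control plane first, inter-broker otherwise" pattern, which can be sketched as follows. Config is a hypothetical, trimmed-down stand-in for the broker configuration, and falling back to the inter-broker protocol when the control-plane listener has no entry in the map is an assumption of this sketch.

```scala
object ListenerSelectionSketch {
  // Hypothetical view of the four configuration properties involved.
  final case class Config(
    controlPlaneListenerName: Option[String],          // control.plane.listener.name
    interBrokerListenerName: String,                   // inter.broker.listener.name
    listenerSecurityProtocolMap: Map[String, String],  // listener.security.protocol.map
    interBrokerSecurityProtocol: String)               // security.inter.broker.protocol

  // The control-plane listener wins when defined.
  def listenerName(c: Config): String =
    c.controlPlaneListenerName.getOrElse(c.interBrokerListenerName)

  // The protocol mapped to the control-plane listener wins when both are defined.
  def securityProtocol(c: Config): String =
    c.controlPlaneListenerName
      .flatMap(name => c.listenerSecurityProtocolMap.get(name))
      .getOrElse(c.interBrokerSecurityProtocol)
}
```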
addNewBroker requests the Broker for the node for the listener name.
addNewBroker creates a new LogContext to use the prefix:
[Controller id=[brokerId], targetBrokerId=[brokerNode]]
addNewBroker creates a NetworkClient. First, addNewBroker creates a ChannelBuilder (for the security protocol, the listener name, the SERVER JAAS context type and SASL-related properties) and, if it is a Reconfigurable, adds it to the KafkaConfig as reconfigurable. addNewBroker then creates a Selector with the controller-channel metric group (and the broker-id of the broker node).
addNewBroker builds a thread name per the optional threadNamePrefix:
[threadNamePrefix]:Controller-[brokerId]-to-broker-[id]-send-thread
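The log prefix and thread name formats shown above can be reproduced with plain string interpolation. This is a sketch only: NamingSketch and its method names are hypothetical, and dropping the prefix together with the colon when threadNamePrefix is undefined is an assumption.

```scala
object NamingSketch {
  // Log prefix in the format shown for the LogContext.
  def logPrefix(controllerId: Int, targetBrokerId: String): String =
    s"[Controller id=$controllerId, targetBrokerId=$targetBrokerId]"

  // Thread name, with the optional prefix prepended when it is defined.
  def sendThreadName(threadNamePrefix: Option[String], controllerId: Int, brokerId: Int): String = {
    val base = s"Controller-$controllerId-to-broker-$brokerId-send-thread"
    threadNamePrefix.map(prefix => s"$prefix:$base").getOrElse(base)
  }
}
```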
addNewBroker creates a new RequestRateAndQueueTimeMs timer metric with the id of the broker to connect to.
addNewBroker creates a daemon RequestSendThread for the broker ID (of the controller broker), the ControllerContext, the NetworkClient, the RequestRateAndQueueTimeMs metric, the StateChangeLogger, and the thread name.
addNewBroker creates a QueueSize gauge metric (with the id of the broker to connect to) that is the number of AbstractControlRequest messages in the queue.
In the end, addNewBroker registers (adds) the id of the broker to connect to with a new ControllerBrokerStateInfo in the brokerStateInfo internal registry.
Note
|
addNewBroker is used when ControllerChannelManager is requested to start up (and connect to brokers) and add a broker.
|
Registering Newly-Added Broker — addBroker
Method
addBroker(broker: Broker): Unit
addBroker
…FIXME
Note
|
addBroker is used when KafkaController is requested to process a BrokerChange controller event.
|
Deregistering Broker — removeBroker
Method
removeBroker(brokerId: Int): Unit
removeBroker finds the broker metadata in the brokerStateInfo internal registry and uses it to removeExistingBroker.
Note
|
removeBroker is used exclusively when KafkaController is requested to process a BrokerChange controller event.
|
Starting Up — startup
Method
startup(): Unit
startup
…FIXME
Note
|
startup is used when KafkaController is requested to initializeControllerContext.
|
Sending AbstractControlRequest Out to Broker — sendRequest
Method
sendRequest(
brokerId: Int,
request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
callback: AbstractResponse => Unit = null)
sendRequest
…FIXME
Note
|
sendRequest is used exclusively when ControllerBrokerRequestBatch is requested to send a controller request to a broker.
|
removeExistingBroker
Internal Method
removeExistingBroker(
brokerState: ControllerBrokerStateInfo): Unit
removeExistingBroker
…FIXME
Note
|
removeExistingBroker is used when…FIXME
|
Starting RequestSendThread — startRequestSendThread
Internal Method
startRequestSendThread(
brokerId: Int): Unit
startRequestSendThread finds the RequestSendThread in the broker metadata in the brokerStateInfo internal registry and, if the thread has not started yet, startRequestSendThread starts it.
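A "start only if not started yet" check can be sketched with the standard java.lang.Thread state. The registry here maps broker IDs straight to threads and the method returns a Boolean for illustration, both of which are simplifications of this sketch rather than the actual signature; register is a hypothetical helper.

```scala
import scala.collection.mutable

object StartThreadSketch {
  // Hypothetical registry: broker ID to its sender thread.
  private val threads = mutable.HashMap.empty[Int, Thread]

  def register(brokerId: Int, thread: Thread): Unit = {
    threads(brokerId) = thread
  }

  // Starts the broker's thread only if it has never been started.
  // Returns true when this call started it, false otherwise.
  def startRequestSendThread(brokerId: Int): Boolean =
    threads.get(brokerId) match {
      case Some(t) if t.getState == Thread.State.NEW =>
        t.start()
        true
      case _ => false
    }
}
```

A thread that has already been started never returns to the NEW state, so a repeated call for the same broker is a no-op.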