log4j.logger.kafka.zk.KafkaZkClient=ALL
KafkaZkClient — Higher-Level Zookeeper Client
KafkaZkClient is a higher-level Zookeeper client.
KafkaZkClient is created when:
- KafkaServer is requested to start up (and initZkClient)
- AclAuthorizer is requested to configure (and createAclPaths)
- ZkSecurityMigrator, TopicCommand (with the deprecated --zookeeper option), ReassignPartitionsCommand, PreferredReplicaLeaderElectionCommand (with the deprecated --zookeeper option), ConfigCommand tools are executed
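Tip
|
Enable ALL logging level for the kafka.zk.KafkaZkClient logger. Add the following line to config/log4j.properties: log4j.logger.kafka.zk.KafkaZkClient=ALL. Refer to Logging.
|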
Performance Metrics
KafkaZkClient is a KafkaMetricsGroup with the following performance metrics.
Name | Description |
---|---|
|
Histogram that is updated when |
The performance metrics are registered in kafka.server:type=ZooKeeperClientMetrics group.
getTopicPartitionCount
Method
getTopicPartitionCount(topic: String): Option[Int]
getTopicPartitionCount
…FIXME
Note
|
getTopicPartitionCount is used when…FIXME
|
Registering StateChangeHandler — registerStateChangeHandler
Method
registerStateChangeHandler(stateChangeHandler: StateChangeHandler): Unit
registerStateChangeHandler
…FIXME
Note
|
registerStateChangeHandler is used when…FIXME
|
Creating KafkaZkClient Instance
KafkaZkClient takes the following when created:
KafkaZkClient initializes the internal registries and counters.
Creating KafkaZkClient Instance — apply
Factory Method
apply(
connectString: String,
isSecure: Boolean,
sessionTimeoutMs: Int,
connectionTimeoutMs: Int,
maxInFlightRequests: Int,
time: Time,
metricGroup: String = "kafka.server",
metricType: String = "SessionExpireListener"): KafkaZkClient
apply creates a ZooKeeperClient that is then used to create a KafkaZkClient.
createTopLevelPaths
Method
createTopLevelPaths(): Unit
createTopLevelPaths
…FIXME
Note
|
createTopLevelPaths is used when…FIXME
|
updateBrokerInfoInZk
Method
updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit
updateBrokerInfoInZk
…FIXME
Note
|
updateBrokerInfoInZk is used when…FIXME
|
registerZNodeChildChangeHandler
Method
registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit
registerZNodeChildChangeHandler
…FIXME
Note
|
registerZNodeChildChangeHandler is used when…FIXME
|
registerZNodeChangeHandlerAndCheckExistence
Method
registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler: ZNodeChangeHandler): Boolean
registerZNodeChangeHandlerAndCheckExistence
…FIXME
Note
|
registerZNodeChangeHandlerAndCheckExistence is used when…FIXME
|
deleteLogDirEventNotifications
Method
deleteLogDirEventNotifications(): Unit
deleteLogDirEventNotifications
…FIXME
Note
|
deleteLogDirEventNotifications is used when…FIXME
|
deleteIsrChangeNotifications
Method
deleteIsrChangeNotifications(): Unit
deleteIsrChangeNotifications
…FIXME
Note
|
deleteIsrChangeNotifications is used when…FIXME
|
unregisterZNodeChildChangeHandler
Method
unregisterZNodeChildChangeHandler(path: String): Unit
unregisterZNodeChildChangeHandler
…FIXME
Note
|
unregisterZNodeChildChangeHandler is used when…FIXME
|
unregisterZNodeChangeHandler
Method
unregisterZNodeChangeHandler(path: String): Unit
unregisterZNodeChangeHandler
…FIXME
Note
|
unregisterZNodeChangeHandler is used when…FIXME
|
setControllerEpochRaw
Method
setControllerEpochRaw(epoch: Int, epochZkVersion: Int): SetDataResponse
setControllerEpochRaw
…FIXME
Note
|
setControllerEpochRaw is used when…FIXME
|
createControllerEpochRaw
Method
createControllerEpochRaw(epoch: Int): CreateResponse
createControllerEpochRaw
…FIXME
Note
|
createControllerEpochRaw is used when…FIXME
|
Fetching Metadata of Brokers in Cluster — getAllBrokersInCluster
Method
getAllBrokersInCluster: Seq[Broker]
getAllBrokersInCluster fetches broker IDs followed by fetching the metadata of every broker (which is the data associated with a /brokers/ids/[brokerId] znode).
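The two-step lookup can be sketched without a ZooKeeper connection. The following is a minimal model, not Kafka's implementation: the simplified Broker case class, the readData function and the opaque metadata strings are assumptions made for illustration. In this sketch, a znode that is missing when its data is read is simply skipped.

```scala
object AllBrokersSketch {
  // Hypothetical, simplified stand-in for Kafka's Broker (metadata kept as an opaque string)
  final case class Broker(id: Int, metadata: String)

  // children: names of the child znodes of /brokers/ids
  // readData: returns the data of a znode, or None when the znode does not exist
  def allBrokers(children: Seq[String], readData: String => Option[String]): Seq[Broker] = {
    // Step 1: the broker IDs, sorted numerically
    val brokerIds = children.map(_.toInt).sorted
    // Step 2: the data stored in every /brokers/ids/[brokerId] znode
    brokerIds.flatMap { id =>
      readData(s"/brokers/ids/$id").map(data => Broker(id, data))
    }
  }
}
```

Passing the znode reader as a function keeps the sketch testable with an in-memory Map in place of a live ensemble.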
getAllTopicsInCluster
Method
getAllTopicsInCluster: Seq[String]
getAllTopicsInCluster
…FIXME
Note
|
getAllTopicsInCluster is used when…FIXME
|
getReplicaAssignmentForTopics
Method
getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicPartition, Seq[Int]]
getReplicaAssignmentForTopics
…FIXME
Note
|
getReplicaAssignmentForTopics is used when…FIXME
|
getPartitionReassignment
Method
getPartitionReassignment: collection.Map[TopicPartition, Seq[Int]]
getPartitionReassignment
…FIXME
Note
|
getPartitionReassignment is used when…FIXME
|
getTopicDeletions
Method
getTopicDeletions: Seq[String]
getTopicDeletions
…FIXME
Note
|
getTopicDeletions is used when…FIXME
|
Retrieving Partition State — getTopicPartitionStates
Method
getTopicPartitionStates(
partitions: Seq[TopicPartition]): Map[TopicPartition, LeaderIsrAndControllerEpoch]
getTopicPartitionStates calls getTopicPartitionStatesRaw for the given TopicPartitions.
For every response, getTopicPartitionStates decodes the JSON-encoded partition state data (for the partitions that were found in ZooKeeper).
Note
|
getTopicPartitionStates is used when KafkaController is requested to updateLeaderAndIsrCache, areReplicasInIsr, updateLeaderEpoch and process a PartitionReassignmentIsrChange controller event.
|
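The decoding step of getTopicPartitionStates can be illustrated with a small sketch. It is a sketch under stated assumptions, not Kafka's decoder: PartitionState is a hypothetical flattened stand-in for LeaderIsrAndControllerEpoch, and the fields are pulled out with regular expressions rather than a JSON library so that the example has no dependencies. Responses without data (partitions not found in ZooKeeper) are dropped.

```scala
object PartitionStateDecodeSketch {
  // Hypothetical flattened stand-in for LeaderIsrAndControllerEpoch
  final case class PartitionState(leader: Int, leaderEpoch: Int, isr: List[Int], controllerEpoch: Int)

  // Extracts an integer field such as "leader":0
  private def intField(json: String, name: String): Option[Int] =
    ("\"" + name + "\"\\s*:\\s*(-?\\d+)").r
      .findFirstMatchIn(json)
      .map(_.group(1).toInt)

  // Extracts an integer array field such as "isr":[0,1]
  private def intListField(json: String, name: String): Option[List[Int]] =
    ("\"" + name + "\"\\s*:\\s*\\[([^\\]]*)\\]").r
      .findFirstMatchIn(json)
      .map(_.group(1).split(",").map(_.trim).filter(_.nonEmpty).map(_.toInt).toList)

  // Decodes one state znode payload; None when a required field is missing
  def decode(json: String): Option[PartitionState] =
    for {
      leader          <- intField(json, "leader")
      leaderEpoch     <- intField(json, "leader_epoch")
      isr             <- intListField(json, "isr")
      controllerEpoch <- intField(json, "controller_epoch")
    } yield PartitionState(leader, leaderEpoch, isr, controllerEpoch)

  // Keeps only the entries whose state znode was found (data is Some) and decodable
  def decodeFound[K](responses: Seq[(K, Option[String])]): Map[K, PartitionState] =
    responses.flatMap { case (key, data) =>
      data.flatMap(decode).map(state => key -> state)
    }.toMap
}
```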
registerZNodeChangeHandler
Method
registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit
registerZNodeChangeHandler
…FIXME
Note
|
registerZNodeChangeHandler is used when…FIXME
|
getControllerEpoch
Method
getControllerEpoch: Option[(Int, Stat)]
getControllerEpoch
…FIXME
Note
|
getControllerEpoch is used when…FIXME
|
deletePartitionReassignment
Method
deletePartitionReassignment(): Unit
deletePartitionReassignment
…FIXME
Note
|
deletePartitionReassignment is used when…FIXME
|
setOrCreatePartitionReassignment
Method
setOrCreatePartitionReassignment(reassignment: collection.Map[TopicPartition, Seq[Int]]): Unit
setOrCreatePartitionReassignment
…FIXME
Note
|
setOrCreatePartitionReassignment is used when…FIXME
|
Getting Active Controller ID — getControllerId
Method
getControllerId: Option[Int]
getControllerId sends a request to Zookeeper for the data of the /controller znode and does one of the following:
- Returns the brokerid field of the JSON data when the response is OK
- Returns None for a NONODE response
- Otherwise, throws a KeeperException with the response code and the /controller path
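The three outcomes listed above can be modelled as a pattern match on the response code. This is a minimal sketch under stated assumptions: the Code hierarchy, GetDataResponse and ZkException below are hypothetical stand-ins defined only for this example (they are not the ZooKeeper or Kafka classes of similar names), and the brokerid field is extracted with a regular expression instead of a JSON parser.

```scala
object ControllerIdSketch {
  // Hypothetical response codes (stand-ins for ZooKeeper's result codes)
  sealed trait Code
  case object Ok extends Code
  case object NoNode extends Code
  case object ConnectionLoss extends Code

  // Hypothetical response to a "get data" request
  final case class GetDataResponse(code: Code, path: String, data: Option[String])

  // Hypothetical stand-in for KeeperException: carries the code and the path
  final class ZkException(val code: Code, val path: String)
    extends RuntimeException(s"$code for $path")

  private val BrokerIdField = "\"brokerid\"\\s*:\\s*(\\d+)".r

  def controllerId(response: GetDataResponse): Option[Int] = response.code match {
    // OK: the brokerid field of the JSON data
    case Ok =>
      response.data.flatMap(json => BrokerIdField.findFirstMatchIn(json).map(_.group(1).toInt))
    // NONODE: no active controller is registered
    case NoNode => None
    // Anything else: fail with the response code and the path
    case other => throw new ZkException(other, response.path)
  }
}
```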
Creating Ephemeral Znode (And Throwing Exception When Unsuccessful) — checkedEphemeralCreate
Method
checkedEphemeralCreate(path: String, data: Array[Byte]): Unit
checkedEphemeralCreate
…FIXME
Note
|
checkedEphemeralCreate is used when…FIXME
|
registerControllerAndIncrementControllerEpoch
Method
registerControllerAndIncrementControllerEpoch(controllerId: Int): (Int, Int)
registerControllerAndIncrementControllerEpoch
…FIXME
Note
|
registerControllerAndIncrementControllerEpoch is used exclusively when KafkaController is requested to elect.
|
retryRequestsUntilConnected
Internal Method
retryRequestsUntilConnected[Req <: AsyncRequest](
requests: Seq[Req]): Seq[Req#Response]
retryRequestsUntilConnected
…FIXME
createSequentialPersistentPath
Method
createSequentialPersistentPath(path: String, data: Array[Byte]): String
createSequentialPersistentPath
…FIXME
Note
|
createSequentialPersistentPath is used when KafkaZkClient is requested to propagateLogDirEvent and propagateIsrChanges.
|
propagateLogDirEvent
Method
propagateLogDirEvent(brokerId: Int): Unit
propagateLogDirEvent
…FIXME
Note
|
propagateLogDirEvent is used exclusively when ReplicaManager is requested to handleLogDirFailure.
|
propagateIsrChanges
Method
propagateIsrChanges(isrChangeSet: collection.Set[TopicPartition]): Unit
propagateIsrChanges
…FIXME
Note
|
propagateIsrChanges is used exclusively when ReplicaManager is requested to maybePropagateIsrChanges.
|
getTopicPartitionStatesRaw
Method
getTopicPartitionStatesRaw(
partitions: Seq[TopicPartition]): Seq[GetDataResponse]
getTopicPartitionStatesRaw gets the topic partition states (from the path /brokers/topics/[topic]/partitions/[partition]/state in Zookeeper) for the given partitions.
Internally, getTopicPartitionStatesRaw creates a ZooKeeper GetDataRequest for the path /brokers/topics/[topic]/partitions/[partition]/state for every partition (in the given partitions).
In the end, getTopicPartitionStatesRaw passes the GetDataRequests to retryRequestsUntilConnected.
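As an illustration of the one-request-per-partition step, the sketch below builds the state path for every partition. TopicPartition and GetDataRequest are hypothetical simplified case classes defined here for the example, and attaching the partition as a context value (so a response can be matched back to its partition) is an assumption of this sketch.

```scala
object PartitionStateRequestsSketch {
  // Hypothetical simplified stand-ins for the Kafka classes of the same names
  final case class TopicPartition(topic: String, partition: Int)
  final case class GetDataRequest(path: String, ctx: Option[Any] = None)

  // The state znode path of a single partition
  def statePath(tp: TopicPartition): String =
    s"/brokers/topics/${tp.topic}/partitions/${tp.partition}/state"

  // One request per partition, in the order of the given partitions
  def stateRequests(partitions: Seq[TopicPartition]): Seq[GetDataRequest] =
    partitions.map(tp => GetDataRequest(statePath(tp), ctx = Some(tp)))
}
```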
getTopicPartitionState
Method
getTopicPartitionState(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch]
getTopicPartitionState
…FIXME
Note
|
getTopicPartitionState is used when…FIXME
|
Fetching Broker IDs — getSortedBrokerList
Method
getSortedBrokerList(): Seq[Int]
getSortedBrokerList gets the child znodes at the /brokers/ids path and sorts them by broker ID (according to the natural ordering).
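Child znode names are strings, so the sort is only meaningful after they are converted to numbers. A minimal sketch, assuming the children have already been fetched as a Seq[String] (the helper name sortedBrokerIds is made up for this example):

```scala
object SortedBrokerListSketch {
  // Convert the child znode names to broker IDs first, then sort numerically
  def sortedBrokerIds(children: Seq[String]): Seq[Int] =
    children.map(_.toInt).sorted
}
```

Sorting the raw strings instead would place "10" before "2", which is why the conversion comes first.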
Fetching Child ZNodes — getChildren
Method
getChildren(path : String): Seq[String]
getChildren
…FIXME
Note
|
getChildren is used when…FIXME
|
getClusterId
Method
getClusterId: Option[String]
getClusterId
…FIXME
Note
|
getClusterId is used when…FIXME
|
createOrGetClusterId
Method
createOrGetClusterId(proposedClusterId: String): String
createOrGetClusterId
…FIXME
Note
|
createOrGetClusterId is used when…FIXME
|
All Brokers In Kafka Cluster — getAllBrokerAndEpochsInCluster
Method
getAllBrokerAndEpochsInCluster: Map[Broker, Long]
getAllBrokerAndEpochsInCluster
…FIXME
Note
|
getAllBrokerAndEpochsInCluster is used when…FIXME
|
getBroker
Method
getBroker(
brokerId: Int): Option[Broker]
getBroker
…FIXME
Note
|
getBroker is used when…FIXME
|
LogDirEvent Notifications (from Zookeeper) — getAllLogDirEventNotifications
Method
getAllLogDirEventNotifications: Seq[String]
getAllLogDirEventNotifications
…FIXME
Note
|
getAllLogDirEventNotifications is used when…FIXME
|
Converting LogDirEvent Notifications to Broker IDs — getBrokerIdsFromLogDirEvents
Method
getBrokerIdsFromLogDirEvents(
sequenceNumbers: Seq[String]): Seq[Int]
getBrokerIdsFromLogDirEvents
…FIXME
Note
|
getBrokerIdsFromLogDirEvents is used when…FIXME
|
Creating State Znodes for Selected Partitions — createTopicPartitionStatesRaw
Method
createTopicPartitionStatesRaw(
leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch],
expectedControllerEpochZkVersion: Int
): Seq[CreateResponse]
createTopicPartitionStatesRaw calls createTopicPartitions with the topics (of the given partitions).
createTopicPartitionStatesRaw calls createTopicPartition with the partitions.
For every partition (in the given leaderIsrAndControllerEpochs), createTopicPartitionStatesRaw creates a new CreateRequest to create the /brokers/topics/[topic]/partitions/[partition]/state persistent znode with the associated LeaderIsrAndControllerEpoch encoded to JSON format.
// zkCli :2181
$ ./bin/zookeeper-shell.sh :2181 ls /brokers/topics
[t1]
$ ./bin/zookeeper-shell.sh :2181 get /brokers/topics/t1/partitions/0/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
In the end, createTopicPartitionStatesRaw passes all the CreateRequests to retryRequestsUntilConnected.
Note
|
createTopicPartitionStatesRaw is used exclusively when ZkPartitionStateMachine is requested to initializeLeaderAndIsrForPartitions.
|
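To make the JSON encoding concrete, the sketch below produces the payload shown in the zookeeper-shell output above. It is a sketch only: PartitionState is a hypothetical flattened stand-in for LeaderIsrAndControllerEpoch, and the field order and the hard-coded "version":1 are copied from that sample output rather than taken from Kafka's encoder.

```scala
object PartitionStateEncodeSketch {
  // Hypothetical flattened stand-in for LeaderIsrAndControllerEpoch
  final case class PartitionState(leader: Int, leaderEpoch: Int, isr: List[Int], controllerEpoch: Int)

  // Encodes the state as a compact JSON object (field order follows the sample output)
  def encode(state: PartitionState): String = {
    val isr = state.isr.mkString(",")
    s"""{"controller_epoch":${state.controllerEpoch},"leader":${state.leader},"version":1,"leader_epoch":${state.leaderEpoch},"isr":[$isr]}"""
  }
}
```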
createTopicPartitions
Internal Method
createTopicPartitions(
topics: Seq[String],
expectedControllerEpochZkVersion: Int): Seq[CreateResponse]
createTopicPartitions
…FIXME
Note
|
createTopicPartitions is used when…FIXME
|
createTopicPartition
Internal Method
createTopicPartition(
partitions: Seq[TopicPartition],
expectedControllerEpochZkVersion: Int): Seq[CreateResponse]
createTopicPartition
…FIXME
Note
|
createTopicPartition is used when…FIXME
|
updateLeaderAndIsr
Method
updateLeaderAndIsr(
leaderAndIsrs: Map[TopicPartition, LeaderAndIsr],
controllerEpoch: Int,
expectedControllerEpochZkVersion: Int): UpdateLeaderAndIsrResult
updateLeaderAndIsr
…FIXME
Note
|
updateLeaderAndIsr is used when…FIXME
|
Partition Numbers of Given Topics — getPartitionsForTopics
Method
getPartitionsForTopics(
topics: Set[String]): Map[String, Seq[Int]]
getPartitionsForTopics
…FIXME
Note
|
getPartitionsForTopics is used when…FIXME
|
Reading Entity Config From Zookeeper (Under /config Node) — getEntityConfigs
Method
getEntityConfigs(
rootEntityType: String,
sanitizedEntityName: String): Properties
getEntityConfigs reads the /config/[rootEntityType]/[sanitizedEntityName] znode.
getEntityConfigs
…FIXME
Note
|
getEntityConfigs is used exclusively when AdminZkClient is requested to fetchEntityConfig.
|
createAclPaths
Method
createAclPaths(): Unit
createAclPaths
…FIXME
Note
|
createAclPaths is used when AclAuthorizer is requested to configure.
|
registerBroker
Method
registerBroker(
brokerInfo: BrokerInfo): Long
registerBroker
…FIXME
Note
|
registerBroker is used when…FIXME
|
updateBrokerInfo
Method
updateBrokerInfo(
brokerInfo: BrokerInfo): Unit
updateBrokerInfo
…FIXME
Note
|
updateBrokerInfo is used when…FIXME
|
close
Method
close(): Unit
Note
|
close is part of Java's AutoCloseable contract to close a resource (and relinquish any underlying resources).
|
close
…FIXME