KafkaZkClient — Higher-Level Kafka-Specific ZooKeeper Client

KafkaZkClient is a higher-level Kafka-specific ZooKeeper client.

KafkaZkClient is created (using the apply factory method) when:

  • KafkaServer is requested to initZkClient (right after kafka.Kafka command-line application is executed)

  • AclCommand, ConfigCommand, PreferredReplicaLeaderElectionCommand, ReassignPartitionsCommand, TopicCommand commands are executed

KafkaZkClient is a KafkaMetricsGroup and registers the metrics in kafka.server:type=ZooKeeperClientMetrics metric namespace.

Table 1. KafkaZkClient’s Metrics
Name Description

ZooKeeperRequestLatencyMs

Histogram that is updated when KafkaZkClient is requested to retryRequestsUntilConnected

KafkaZkClient metrics JConsole.png
Figure 1. KafkaZkClient Metrics in JConsole JMX Tool
Tip

Enable DEBUG logging level for kafka.zk.KafkaZkClient logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.zk.KafkaZkClient=DEBUG

Refer to Logging.

getTopicPartitionCount Method

getTopicPartitionCount(topic: String): Option[Int]

getTopicPartitionCount…​FIXME

Note
getTopicPartitionCount is used when…​FIXME

Registering StateChangeHandler — registerStateChangeHandler Method

registerStateChangeHandler(stateChangeHandler: StateChangeHandler): Unit

registerStateChangeHandler…​FIXME

Note
registerStateChangeHandler is used when…​FIXME

Creating KafkaZkClient Instance

KafkaZkClient takes the following when created:

KafkaZkClient initializes the internal registries and counters.

Creating KafkaZkClient Instance — apply Factory Method

apply(
  connectString: String,
  isSecure: Boolean,
  sessionTimeoutMs: Int,
  connectionTimeoutMs: Int,
  maxInFlightRequests: Int,
  time: Time,
  metricGroup: String = "kafka.server",
  metricType: String = "SessionExpireListener"): KafkaZkClient

apply creates a ZooKeeperClient that is then used to create a KafkaZkClient.

Note

apply is used when:

  • KafkaServer is requested to initZkClient (right after kafka.Kafka command-line application is executed)

  • The following commands are executed (main):

    • AclCommand (when SimpleAclAuthorizer is used and requested to configure)

    • ConfigCommand

    • PreferredReplicaLeaderElectionCommand

    • ReassignPartitionsCommand

    • TopicCommand

createTopLevelPaths Method

createTopLevelPaths(): Unit

createTopLevelPaths…​FIXME

Note
createTopLevelPaths is used when…​FIXME

updateBrokerInfoInZk Method

updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit

updateBrokerInfoInZk…​FIXME

Note
updateBrokerInfoInZk is used when…​FIXME

registerZNodeChildChangeHandler Method

registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit

registerZNodeChildChangeHandler…​FIXME

Note
registerZNodeChildChangeHandler is used when…​FIXME

registerZNodeChangeHandlerAndCheckExistence Method

registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler: ZNodeChangeHandler): Boolean

registerZNodeChangeHandlerAndCheckExistence…​FIXME

Note
registerZNodeChangeHandlerAndCheckExistence is used when…​FIXME

deleteLogDirEventNotifications Method

deleteLogDirEventNotifications(): Unit

deleteLogDirEventNotifications…​FIXME

Note
deleteLogDirEventNotifications is used when…​FIXME

deleteIsrChangeNotifications Method

deleteIsrChangeNotifications(): Unit

deleteIsrChangeNotifications…​FIXME

Note
deleteIsrChangeNotifications is used when…​FIXME

unregisterZNodeChildChangeHandler Method

unregisterZNodeChildChangeHandler(path: String): Unit

unregisterZNodeChildChangeHandler…​FIXME

Note
unregisterZNodeChildChangeHandler is used when…​FIXME

unregisterZNodeChangeHandler Method

unregisterZNodeChangeHandler(path: String): Unit

unregisterZNodeChangeHandler…​FIXME

Note
unregisterZNodeChangeHandler is used when…​FIXME

setControllerEpochRaw Method

setControllerEpochRaw(epoch: Int, epochZkVersion: Int): SetDataResponse

setControllerEpochRaw…​FIXME

Note
setControllerEpochRaw is used when…​FIXME

createControllerEpochRaw Method

createControllerEpochRaw(epoch: Int): CreateResponse

createControllerEpochRaw…​FIXME

Note
createControllerEpochRaw is used when…​FIXME

Fetching Metadata of Brokers in Cluster — getAllBrokersInCluster Method

getAllBrokersInCluster: Seq[Broker]

getAllBrokersInCluster fetches broker IDs followed by fetching the metadata of every broker (which is the data associated with a /brokers/ids/[brokerId] znode).

Note

getAllBrokersInCluster is used when:

getAllTopicsInCluster Method

getAllTopicsInCluster: Seq[String]

getAllTopicsInCluster…​FIXME

Note
getAllTopicsInCluster is used when…​FIXME

getReplicaAssignmentForTopics Method

getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicPartition, Seq[Int]]

getReplicaAssignmentForTopics…​FIXME

Note
getReplicaAssignmentForTopics is used when…​FIXME

getPartitionReassignment Method

getPartitionReassignment: collection.Map[TopicPartition, Seq[Int]]

getPartitionReassignment…​FIXME

Note
getPartitionReassignment is used when…​FIXME

getTopicDeletions Method

getTopicDeletions: Seq[String]

getTopicDeletions…​FIXME

Note
getTopicDeletions is used when…​FIXME

Retrieving Partition State — getTopicPartitionStates Method

getTopicPartitionStates(partitions: Seq[TopicPartition]): Map[TopicPartition, LeaderIsrAndControllerEpoch]

getTopicPartitionStates getTopicPartitionStatesRaw for the given TopicPartitions.

For every response, getTopicPartitionStates decodes the JSON-encoded partition state data (for the partitions that were found in ZooKeeper).

Note
getTopicPartitionStates is used when KafkaController is requested to updateLeaderAndIsrCache, areReplicasInIsr, updateLeaderEpoch and process a PartitionReassignmentIsrChange controller event.

registerZNodeChangeHandler Method

registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit

registerZNodeChangeHandler…​FIXME

Note
registerZNodeChangeHandler is used when…​FIXME

getControllerEpoch Method

getControllerEpoch: Option[(Int, Stat)]

getControllerEpoch…​FIXME

Note
getControllerEpoch is used when…​FIXME

deletePartitionReassignment Method

deletePartitionReassignment(): Unit

deletePartitionReassignment…​FIXME

Note
deletePartitionReassignment is used when…​FIXME

setOrCreatePartitionReassignment Method

setOrCreatePartitionReassignment(reassignment: collection.Map[TopicPartition, Seq[Int]]): Unit

setOrCreatePartitionReassignment…​FIXME

Note
setOrCreatePartitionReassignment is used when…​FIXME

Getting Active Controller ID — getControllerId Method

getControllerId: Option[Int]

getControllerId sends a request to Zookeeper for the data of the /controller znode and returns the following:

  • The brokerid field of the JSON data when the response is OK

  • None for a NONODE response

  • Throws a KeeperException with the response code and the /controller path

Note

getControllerId is used when:

Creating Ephemeral Znode (And Throwing Exception When Unsuccessful)-- checkedEphemeralCreate Method

checkedEphemeralCreate(path: String, data: Array[Byte]): Unit

checkedEphemeralCreate…​FIXME

Note
checkedEphemeralCreate is used when…​FIXME

registerControllerAndIncrementControllerEpoch Method

registerControllerAndIncrementControllerEpoch(controllerId: Int): (Int, Int)

registerControllerAndIncrementControllerEpoch…​FIXME

Note
registerControllerAndIncrementControllerEpoch is used exclusively when KafkaController is requested to elect.

retryRequestsUntilConnected Internal Method

retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response]

retryRequestsUntilConnected…​FIXME

createSequentialPersistentPath Method

createSequentialPersistentPath(path: String, data: Array[Byte]): String

createSequentialPersistentPath…​FIXME

Note
createSequentialPersistentPath is used when KafkaZkClient is requested to propagateLogDirEvent and propagateIsrChanges.

propagateLogDirEvent Method

propagateLogDirEvent(brokerId: Int): Unit

propagateLogDirEvent…​FIXME

Note
propagateLogDirEvent is used exclusively when ReplicaManager is requested to handleLogDirFailure.

propagateIsrChanges Method

propagateIsrChanges(isrChangeSet: collection.Set[TopicPartition]): Unit

propagateIsrChanges…​FIXME

Note
propagateIsrChanges is used exclusively when ReplicaManager is requested to maybePropagateIsrChanges.

getTopicPartitionStatesRaw Method

getTopicPartitionStatesRaw(partitions: Seq[TopicPartition]): Seq[GetDataResponse]

getTopicPartitionStatesRaw creates a ZooKeeper GetDataRequest for the path /brokers/topics/[topic]/partitions/[partition]/state for every partition in the given partitions.

In the end, getTopicPartitionStatesRaw retryRequestsUntilConnected the GetDataRequests.

Note

getTopicPartitionStatesRaw is used when:

getTopicPartitionState Method

getTopicPartitionState(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch]

getTopicPartitionState…​FIXME

Note
getTopicPartitionState is used when…​FIXME

Fetching Broker IDs — getSortedBrokerList Method

getSortedBrokerList(): Seq[Int]

getSortedBrokerList gets the child znodes at /brokers/ids path and sorts it by broker ID (according to the natural ordering).

Note

getSortedBrokerList is used when:

Fetching Child ZNodes — getChildren Method

getChildren(path : String): Seq[String]

getChildren…​FIXME

Note
getChildren is used when…​FIXME

results matching ""

    No results matching ""