KafkaZkClient — Higher-Level Zookeeper Client

KafkaZkClient is a higher-level Zookeeper client.

KafkaZkClient is created (using the apply factory method) when:

Tip

Enable ALL logging level for kafka.zk.KafkaZkClient logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.zk.KafkaZkClient=ALL

Refer to Logging.

Performance Metrics

KafkaZkClient is a KafkaMetricsGroup with the following performance metrics.

Table 1. KafkaZkClient’s Performance Metrics
Name Description

ZooKeeperRequestLatencyMs

Histogram that is updated when KafkaZkClient is requested to retryRequestsUntilConnected

The performance metrics are registered in kafka.server:type=ZooKeeperClientMetrics group.

KafkaZkClient metrics JConsole.png
Figure 1. KafkaZkClient Metrics in JConsole JMX Tool

getTopicPartitionCount Method

getTopicPartitionCount(topic: String): Option[Int]

getTopicPartitionCount…​FIXME

Note
getTopicPartitionCount is used when…​FIXME

Registering StateChangeHandler — registerStateChangeHandler Method

registerStateChangeHandler(stateChangeHandler: StateChangeHandler): Unit

registerStateChangeHandler…​FIXME

Note
registerStateChangeHandler is used when…​FIXME

Creating KafkaZkClient Instance

KafkaZkClient takes the following when created:

KafkaZkClient initializes the internal registries and counters.

Creating KafkaZkClient Instance — apply Factory Method

apply(
  connectString: String,
  isSecure: Boolean,
  sessionTimeoutMs: Int,
  connectionTimeoutMs: Int,
  maxInFlightRequests: Int,
  time: Time,
  metricGroup: String = "kafka.server",
  metricType: String = "SessionExpireListener"): KafkaZkClient

apply creates a ZooKeeperClient that is then used to create a KafkaZkClient.

Note

apply is used when…​FIXME

createTopLevelPaths Method

createTopLevelPaths(): Unit

createTopLevelPaths…​FIXME

Note
createTopLevelPaths is used when…​FIXME

updateBrokerInfoInZk Method

updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit

updateBrokerInfoInZk…​FIXME

Note
updateBrokerInfoInZk is used when…​FIXME

registerZNodeChildChangeHandler Method

registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit

registerZNodeChildChangeHandler…​FIXME

Note
registerZNodeChildChangeHandler is used when…​FIXME

registerZNodeChangeHandlerAndCheckExistence Method

registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler: ZNodeChangeHandler): Boolean

registerZNodeChangeHandlerAndCheckExistence…​FIXME

Note
registerZNodeChangeHandlerAndCheckExistence is used when…​FIXME

deleteLogDirEventNotifications Method

deleteLogDirEventNotifications(): Unit

deleteLogDirEventNotifications…​FIXME

Note
deleteLogDirEventNotifications is used when…​FIXME

deleteIsrChangeNotifications Method

deleteIsrChangeNotifications(): Unit

deleteIsrChangeNotifications…​FIXME

Note
deleteIsrChangeNotifications is used when…​FIXME

unregisterZNodeChildChangeHandler Method

unregisterZNodeChildChangeHandler(path: String): Unit

unregisterZNodeChildChangeHandler…​FIXME

Note
unregisterZNodeChildChangeHandler is used when…​FIXME

unregisterZNodeChangeHandler Method

unregisterZNodeChangeHandler(path: String): Unit

unregisterZNodeChangeHandler…​FIXME

Note
unregisterZNodeChangeHandler is used when…​FIXME

setControllerEpochRaw Method

setControllerEpochRaw(epoch: Int, epochZkVersion: Int): SetDataResponse

setControllerEpochRaw…​FIXME

Note
setControllerEpochRaw is used when…​FIXME

createControllerEpochRaw Method

createControllerEpochRaw(epoch: Int): CreateResponse

createControllerEpochRaw…​FIXME

Note
createControllerEpochRaw is used when…​FIXME

Fetching Metadata of Brokers in Cluster — getAllBrokersInCluster Method

getAllBrokersInCluster: Seq[Broker]

getAllBrokersInCluster fetches broker IDs followed by fetching the metadata of every broker (which is the data associated with a /brokers/ids/[brokerId] znode).

Note

getAllBrokersInCluster is used when:

getAllTopicsInCluster Method

getAllTopicsInCluster: Seq[String]

getAllTopicsInCluster…​FIXME

Note
getAllTopicsInCluster is used when…​FIXME

getReplicaAssignmentForTopics Method

getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicPartition, Seq[Int]]

getReplicaAssignmentForTopics…​FIXME

Note
getReplicaAssignmentForTopics is used when…​FIXME

getPartitionReassignment Method

getPartitionReassignment: collection.Map[TopicPartition, Seq[Int]]

getPartitionReassignment…​FIXME

Note
getPartitionReassignment is used when…​FIXME

getTopicDeletions Method

getTopicDeletions: Seq[String]

getTopicDeletions…​FIXME

Note
getTopicDeletions is used when…​FIXME

Retrieving Partition State — getTopicPartitionStates Method

getTopicPartitionStates(
  partitions: Seq[TopicPartition]): Map[TopicPartition, LeaderIsrAndControllerEpoch]

getTopicPartitionStates getTopicPartitionStatesRaw for the given TopicPartitions.

For every response, getTopicPartitionStates decodes the JSON-encoded partition state data (for the partitions that were found in ZooKeeper).

Note
getTopicPartitionStates is used when KafkaController is requested to updateLeaderAndIsrCache, areReplicasInIsr, updateLeaderEpoch and process a PartitionReassignmentIsrChange controller event.

registerZNodeChangeHandler Method

registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit

registerZNodeChangeHandler…​FIXME

Note
registerZNodeChangeHandler is used when…​FIXME

getControllerEpoch Method

getControllerEpoch: Option[(Int, Stat)]

getControllerEpoch…​FIXME

Note
getControllerEpoch is used when…​FIXME

deletePartitionReassignment Method

deletePartitionReassignment(): Unit

deletePartitionReassignment…​FIXME

Note
deletePartitionReassignment is used when…​FIXME

setOrCreatePartitionReassignment Method

setOrCreatePartitionReassignment(reassignment: collection.Map[TopicPartition, Seq[Int]]): Unit

setOrCreatePartitionReassignment…​FIXME

Note
setOrCreatePartitionReassignment is used when…​FIXME

Getting Active Controller ID — getControllerId Method

getControllerId: Option[Int]

getControllerId sends a request to Zookeeper for the data of the /controller znode and returns the following:

  • The brokerid field of the JSON data when the response is OK

  • None for a NONODE response

  • Throws a KeeperException with the response code and the /controller path

Note

getControllerId is used when:

Creating Ephemeral Znode (And Throwing Exception When Unsuccessful)-- checkedEphemeralCreate Method

checkedEphemeralCreate(path: String, data: Array[Byte]): Unit

checkedEphemeralCreate…​FIXME

Note
checkedEphemeralCreate is used when…​FIXME

registerControllerAndIncrementControllerEpoch Method

registerControllerAndIncrementControllerEpoch(controllerId: Int): (Int, Int)

registerControllerAndIncrementControllerEpoch…​FIXME

Note
registerControllerAndIncrementControllerEpoch is used exclusively when KafkaController is requested to elect.

retryRequestsUntilConnected Internal Method

retryRequestsUntilConnected[Req <: AsyncRequest](
  requests: Seq[Req]): Seq[Req#Response]

retryRequestsUntilConnected…​FIXME

createSequentialPersistentPath Method

createSequentialPersistentPath(path: String, data: Array[Byte]): String

createSequentialPersistentPath…​FIXME

Note
createSequentialPersistentPath is used when KafkaZkClient is requested to propagateLogDirEvent and propagateIsrChanges.

propagateLogDirEvent Method

propagateLogDirEvent(brokerId: Int): Unit

propagateLogDirEvent…​FIXME

Note
propagateLogDirEvent is used exclusively when ReplicaManager is requested to handleLogDirFailure.

propagateIsrChanges Method

propagateIsrChanges(isrChangeSet: collection.Set[TopicPartition]): Unit

propagateIsrChanges…​FIXME

Note
propagateIsrChanges is used exclusively when ReplicaManager is requested to maybePropagateIsrChanges.

getTopicPartitionStatesRaw Method

getTopicPartitionStatesRaw(
  partitions: Seq[TopicPartition]): Seq[GetDataResponse]

getTopicPartitionStatesRaw gets the topic partition states (from the path /brokers/topics/[topic]/partitions/[partition]/state in Zookeeper) for the given partitions.

Internally, getTopicPartitionStatesRaw creates a ZooKeeper GetDataRequest for the path /brokers/topics/[topic]/partitions/[partition]/state for every partition (in the given partitions).

In the end, getTopicPartitionStatesRaw retryRequestsUntilConnected the GetDataRequests.

Note

getTopicPartitionStatesRaw is used when:

getTopicPartitionState Method

getTopicPartitionState(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch]

getTopicPartitionState…​FIXME

Note
getTopicPartitionState is used when…​FIXME

Fetching Broker IDs — getSortedBrokerList Method

getSortedBrokerList(): Seq[Int]

getSortedBrokerList gets the child znodes at /brokers/ids path and sorts it by broker ID (according to the natural ordering).

Note

getSortedBrokerList is used when:

Fetching Child ZNodes — getChildren Method

getChildren(path : String): Seq[String]

getChildren…​FIXME

Note
getChildren is used when…​FIXME

getClusterId Method

getClusterId: Option[String]

getClusterId…​FIXME

Note
getClusterId is used when…​FIXME

createOrGetClusterId Method

createOrGetClusterId(proposedClusterId: String): String

createOrGetClusterId…​FIXME

Note
createOrGetClusterId is used when…​FIXME

All Brokers In Kafka Cluster — getAllBrokerAndEpochsInCluster Method

getAllBrokerAndEpochsInCluster: Map[Broker, Long]

getAllBrokerAndEpochsInCluster…​FIXME

Note
getAllBrokerAndEpochsInCluster is used when…​FIXME

getBroker Method

getBroker(
  brokerId: Int): Option[Broker]

getBroker…​FIXME

Note
getBroker is used when…​FIXME

LogDirEvent Notifications (from Zookeeper) — getAllLogDirEventNotifications Method

getAllLogDirEventNotifications: Seq[String]

getAllLogDirEventNotifications…​FIXME

Note
getAllLogDirEventNotifications is used when…​FIXME

Converting LogDirEvent Notifications to Broker IDs — getBrokerIdsFromLogDirEvents Method

getBrokerIdsFromLogDirEvents(
  sequenceNumbers: Seq[String]): Seq[Int]

getBrokerIdsFromLogDirEvents…​FIXME

Note
getBrokerIdsFromLogDirEvents is used when…​FIXME

Creating State Znodes for Selected Partitions — createTopicPartitionStatesRaw Method

createTopicPartitionStatesRaw(
  leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch],
  expectedControllerEpochZkVersion: Int
): Seq[CreateResponse]

createTopicPartitionStatesRaw createTopicPartitions with the topics (of the given partitions).

createTopicPartitionStatesRaw createTopicPartition with the partitions.

For every partition (in the given leaderIsrAndControllerEpochs), createTopicPartitionStatesRaw creates a new CreateRequest to create /brokers/topics/[topic]/partitions/[partition]/state persistent znode with the associated LeaderIsrAndControllerEpoch encoded to JSON format.

// zkCli :2181
$ ./bin/zookeeper-shell.sh :2181 ls /brokers/topics
[t1]

$ ./bin/zookeeper-shell.sh :2181 get /brokers/topics/t1/partitions/0/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}

In the end, createTopicPartitionStatesRaw retryRequestsUntilConnected all the CreateRequests.

Note
createTopicPartitionStatesRaw is used exclusively when ZkPartitionStateMachine is requested to initializeLeaderAndIsrForPartitions.

createTopicPartitions Internal Method

createTopicPartitions(
  topics: Seq[String],
  expectedControllerEpochZkVersion: Int):Seq[CreateResponse]

createTopicPartitions…​FIXME

Note
createTopicPartitions is used when…​FIXME

createTopicPartition Internal Method

createTopicPartition(
  partitions: Seq[TopicPartition],
  expectedControllerEpochZkVersion: Int): Seq[CreateResponse]

createTopicPartition…​FIXME

Note
createTopicPartition is used when…​FIXME

updateLeaderAndIsr Method

updateLeaderAndIsr(
  leaderAndIsrs: Map[TopicPartition, LeaderAndIsr],
  controllerEpoch: Int,
  expectedControllerEpochZkVersion: Int): UpdateLeaderAndIsrResult

updateLeaderAndIsr…​FIXME

Note
updateLeaderAndIsr is used when…​FIXME

Partition Numbers of Given Topics — getPartitionsForTopics Method

getPartitionsForTopics(
  topics: Set[String]): Map[String, Seq[Int]]

getPartitionsForTopics…​FIXME

Note
getPartitionsForTopics is used when…​FIXME

Reading Entity Config From Zookeeper (Under /config Node) — getEntityConfigs Method

getEntityConfigs(
  rootEntityType: String,
  sanitizedEntityName: String): Properties

getEntityConfigs reads the /config/[rootEntityType]/[sanitizedEntityName].

getEntityConfigs…​FIXME

Note
getEntityConfigs is used exclusively when AdminZkClient is requested to fetchEntityConfig.

createAclPaths Method

createAclPaths(): Unit

createAclPaths…​FIXME

Note
createAclPaths is used when AclAuthorizer is requested to configure.

registerBroker Method

registerBroker(
  brokerInfo: BrokerInfo): Long

registerBroker…​FIXME

Note
registerBroker is used when…​FIXME

updateBrokerInfo Method

updateBrokerInfo(
  brokerInfo: BrokerInfo): Unit

updateBrokerInfo…​FIXME

Note
updateBrokerInfo is used when…​FIXME

close Method

close(): Unit
Note
close is part of the Java’s AutoCloseable contract to close a resource (and relinquishing any underlying resources).

close…​FIXME

results matching ""

    No results matching ""