ZkUtils is…​FIXME

ZkUtils is created when…​FIXME


Quoting Nodes and ephemeral nodes from the ZooKeeper documentation:

Unlike standard file systems, each node in a ZooKeeper namespace can have data associated with it as well as children. It is like having a file-system that allows a file to also be a directory. (ZooKeeper was designed to store coordination data: status information, configuration, location information, etc., so the data stored at each node is usually small, in the byte to kilobyte range.) We use the term znode to make it clear that we are talking about ZooKeeper data nodes.

Znodes maintain a stat structure that includes version numbers for data changes, ACL changes, and timestamps, to allow cache validations and coordinated updates. Each time a znode’s data changes, the version number increases. For instance, whenever a client retrieves data it also receives the version of the data.

The data stored at each znode in a namespace is read and written atomically. Reads get all the data bytes associated with a znode and a write replaces all the data. Each node has an Access Control List (ACL) that restricts who can do what.
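The znode behaviour quoted above (data plus children on the same node, a whole-value write, and a data version that grows on each change) can be sketched with a small in-memory model. This is only an illustration: `Znode`, `write` and `withChild` are hypothetical names, not part of ZooKeeper or Kafka, and ACLs and timestamps are left out.

```scala
object ZnodeSketch {
  // Hypothetical model of a znode: it holds data AND named children,
  // like a file that is also a directory.
  final case class Znode(
      data: String = "",
      version: Int = 0,
      children: Map[String, Znode] = Map.empty) {

    // A write replaces all the data and increases the data version.
    def write(newData: String): Znode =
      copy(data = newData, version = version + 1)

    // Adding a child leaves this node's own data and data version untouched.
    def withChild(name: String, child: Znode): Znode =
      copy(children = children + (name -> child))
  }
}
```

A client that remembers the version it read can later compare it with the current one to detect that the data has changed in the meantime.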

Table 1. ZkUtils’s ZNodes in Zookeeper
ZNode Description



Table 2. ZkUtils’s Internal Properties (e.g. Registries and Counters) (in alphabetical order)
Name Description



getCluster Method

getCluster(): Cluster

getCluster gets the child znodes of the /brokers/ids znode and reads their data (as a JSON blob).

getCluster then creates a Broker from each znode id and its JSON blob (with a host, a port and endpoints).
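The two steps of getCluster can be sketched without a running ZooKeeper. The sketch below is not Kafka's code: it assumes an in-memory `Map` of full znode path to data as a stand-in for ZooKeeper, a hypothetical `BrokerSketch` case class in place of Kafka's Broker (endpoints are omitted), and naive regular expressions where real code would use a JSON parser.

```scala
object GetClusterSketch {
  // Hypothetical stand-in for Kafka's Broker (the real one also carries endpoints).
  final case class BrokerSketch(id: Int, host: String, port: Int)

  val BrokerIdsPath = "/brokers/ids"

  // Naive field extractors (assumption: good enough for a flat JSON blob).
  private val HostField = "\"host\"\\s*:\\s*\"([^\"]*)\"".r.unanchored
  private val PortField = "\"port\"\\s*:\\s*(\\d+)".r.unanchored

  // `znodes` is an in-memory stand-in for ZooKeeper: full path -> data.
  def getCluster(znodes: Map[String, String]): Seq[BrokerSketch] = {
    val prefix = BrokerIdsPath + "/"
    znodes.toSeq.collect {
      // Keep only the direct children of /brokers/ids.
      case (path, json)
          if path.startsWith(prefix) && !path.drop(prefix.length).contains("/") =>
        val id = path.drop(prefix.length).toInt // the znode name is the broker id
        val host = json match { case HostField(h) => h; case _ => "" }
        val port = json match { case PortField(p) => p.toInt; case _ => -1 }
        BrokerSketch(id, host, port)
    }.sortBy(_.id)
  }
}
```

Znodes outside /brokers/ids (for example /controller) are ignored, and the result is sorted by broker id only to make the output deterministic.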

getCluster is used exclusively when ZKRebalancerListener does syncedRebalance (that happens for the currently-deprecated ZookeeperConsumerConnector).

deletePathRecursive Method


deletePath Method


Creating ZkUtils Instance — apply Factory Method

apply(
  zkUrl: String,
  sessionTimeout: Int,
  connectionTimeout: Int,
  isZkSecurityEnabled: Boolean): ZkUtils



apply is used when:

  1. KafkaServer connects to Zookeeper

  2. FIXME

Registering Listener for State Changes — subscribeStateChanges Method

subscribeStateChanges(listener: IZkStateListener): Unit

subscribeStateChanges requests ZkClient to subscribeStateChanges with the listener.


subscribeStateChanges is used when:

  1. KafkaController is requested to register a SessionExpirationListener

  2. FIXME

Registering Listener for Child Changes — subscribeChildChanges Method

subscribeChildChanges(path: String, listener: IZkChildListener): Option[Seq[String]]


subscribeChildChanges is used…​FIXME

De-Registering Listener for Child Changes — unsubscribeChildChanges Method

unsubscribeChildChanges(path: String, childListener: IZkChildListener): Unit

unsubscribeChildChanges requests ZkClient to unsubscribeChildChanges for the input path and childListener.

unsubscribeChildChanges is used when…​FIXME

De-Registering Listener for Data Changes — unsubscribeDataChanges Method

unsubscribeDataChanges(path: String, dataListener: IZkDataListener): Unit

unsubscribeDataChanges requests ZkClient to unsubscribeDataChanges for the input path and dataListener.

unsubscribeDataChanges is used when…​FIXME

registerBrokerInZk Method

registerBrokerInZk(
  id: Int,
  host: String,
  port: Int,
  advertisedEndpoints: Seq[EndPoint],
  jmxPort: Int,
  rack: Option[String],
  apiVersion: ApiVersion): Unit


registerBrokerInZk is used exclusively when KafkaHealthcheck is requested to register.

getTopicPartitionCount Method

getTopicPartitionCount(topic: String): Option[Int]



getTopicPartitionCount is used when:

  1. GroupMetadataManager is requested for getGroupMetadataTopicPartitionCount of __consumer_offsets topic

  2. TransactionStateManager is requested for getTransactionTopicPartitionCount of __transaction_state topic

Creating JSON with Broker ID — controllerZkData Method

controllerZkData(brokerId: Int, timestamp: Long): String

controllerZkData creates a JSON with the following fields:

  • "version":1

  • "brokerid":[brokerId]

  • "timestamp":[timestamp]

import kafka.utils._
scala> ZkUtils.controllerZkData(1, System.currentTimeMillis())
res0: String = {"version":1,"brokerid":1,"timestamp":"1506161225262"}

controllerZkData is used exclusively when KafkaController is requested to elect.
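The JSON shape listed above can be reproduced with plain string interpolation. This is a minimal sketch, not Kafka's implementation: the enclosing `ControllerZkDataSketch` object is a hypothetical name, and the timestamp is written as a quoted string only because the sample REPL output shows it that way.

```scala
object ControllerZkDataSketch {
  // Builds the same three fields as in the sample output:
  // version is fixed to 1, brokerid is numeric, timestamp is a quoted string.
  def controllerZkData(brokerId: Int, timestamp: Long): String =
    s"""{"version":1,"brokerid":$brokerId,"timestamp":"$timestamp"}"""
}
```

Calling `ControllerZkDataSketch.controllerZkData(1, 1506161225262L)` gives the same string as the `res0` value in the REPL session.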

Creating ZkUtils Instance

ZkUtils takes the following when created:

  • ZkClient

  • ZkConnection

  • isSecure flag

ZkUtils initializes the internal registries and counters.

Reading Data Associated with ZNode — readDataMaybeNull Method

readDataMaybeNull(path: String): (Option[String], Stat)

readDataMaybeNull requests ZkClient to readData from path znode.

readDataMaybeNull returns None (for Option[String]) when path znode is not available.
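The return shape of readDataMaybeNull can be illustrated with an in-memory stand-in. Everything here is an assumption made for the example: `StatSketch` is a hypothetical one-field replacement for ZooKeeper's Stat, the `Map` replaces ZkClient, and returning a default stat for a missing znode is a choice of this sketch rather than something the text above states.

```scala
object ReadDataMaybeNullSketch {
  // Hypothetical stand-in for ZooKeeper's Stat; only the data version is modelled.
  final case class StatSketch(version: Int)

  // `znodes` is an in-memory stand-in for ZooKeeper: path -> (data, data version).
  def readDataMaybeNull(
      znodes: Map[String, (String, Int)],
      path: String): (Option[String], StatSketch) =
    znodes.get(path) match {
      // The znode exists: return its data together with its stat.
      case Some((data, version)) => (Some(data), StatSketch(version))
      // The znode is not available: no data, plus a default stat (assumption).
      case None => (None, StatSketch(0))
    }
}
```

Callers can then pattern match on the `Option` instead of handling a missing-node exception themselves.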
