StreamsMetadataState manages the Kafka cluster metadata for a KafkaStreams instance.

Figure 1. StreamsMetadataState and Cluster Metadata Changes (from StreamsPartitionAssignor)

StreamsMetadataState provides the getAllMetadata method, which returns the StreamsMetadata of every streams client instance in a multi-instance Kafka Streams application.

StreamsMetadataState is created exclusively for the TaskManager of the stream processor threads of a KafkaStreams instance.

StreamsMetadata represents the metadata of a KafkaStreams stream processing node (process) in a Kafka Streams application:

  • A user-defined endpoint (as a pair of a host and a port)

  • Names of state stores

  • Kafka TopicPartitions (that this instance is assigned to)
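To make the three parts of the list above concrete, here is a minimal sketch of such a value object. StreamsMetadataSketch and all of its members are hypothetical names chosen for illustration; this is not the real StreamsMetadata class, and partitions are simplified to plain strings.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the information listed above.
// It is NOT the real org.apache.kafka.streams.state.StreamsMetadata class.
final class StreamsMetadataSketch {
    private final String host;                 // user-defined endpoint: host
    private final int port;                    // user-defined endpoint: port
    private final Set<String> stateStoreNames; // names of state stores
    private final Set<String> topicPartitions; // assigned partitions, simplified to "topic-partition" strings

    StreamsMetadataSketch(String host, int port,
                          Set<String> stateStoreNames, Set<String> topicPartitions) {
        this.host = host;
        this.port = port;
        // Defensive, read-only copies so the metadata cannot change after construction
        this.stateStoreNames = Collections.unmodifiableSet(new LinkedHashSet<>(stateStoreNames));
        this.topicPartitions = Collections.unmodifiableSet(new LinkedHashSet<>(topicPartitions));
    }

    String host() { return host; }
    int port() { return port; }
    Set<String> stateStoreNames() { return stateStoreNames; }
    Set<String> topicPartitions() { return topicPartitions; }

    @Override
    public String toString() {
        return "StreamsMetadataSketch{" + host + ":" + port
            + ", stores=" + stateStoreNames
            + ", partitions=" + topicPartitions + "}";
    }

    public static void main(String[] args) {
        StreamsMetadataSketch metadata = new StreamsMetadataSketch(
            "localhost", 8080,
            new LinkedHashSet<>(Arrays.asList("counts")),
            new LinkedHashSet<>(Arrays.asList("orders-0", "orders-1")));
        System.out.println(metadata);
    }
}
```

The sets are copied and wrapped as read-only because the metadata describes a snapshot of an assignment; that immutability is a design choice of this sketch, not something stated in the text.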

Table 1. StreamsMetadataState’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description


StreamsMetadata entries are added to allMetadata only when StreamsMetadataState is requested to rebuildMetadata (after all existing entries are removed).

allMetadata is read when StreamsMetadataState is requested for a textual representation and by getAllMetadataForStore, getMetadataWithKey, and getStreamsMetadataForKey.


Cluster metadata, i.e. a subset of the nodes, topics, and partitions in the Kafka cluster (as a Kafka Cluster) that is given when StreamsMetadataState is requested to handle cluster metadata changes

Used when:


Names of the global StateStores

Used when…​FIXME


Creating StreamsMetadataState Instance

StreamsMetadataState takes the following when created:

StreamsMetadataState initializes the internal registries and counters.

getAllMetadataForStore Method

Collection<StreamsMetadata> getAllMetadataForStore(final String storeName)


getAllMetadataForStore is used exclusively when KafkaStreams is requested to allMetadataForStore.

Handling Cluster Metadata Changes — onChange Method

void onChange(
  final Map<HostInfo, Set<TopicPartition>> currentState,
  final Cluster clusterMetadata)

onChange records the given Cluster as the cluster metadata and then calls rebuildMetadata (with the given current state of Kafka TopicPartitions per HostInfo).

onChange is used exclusively when TaskManager is requested to notify the StreamsMetadataState about cluster metadata changes.

Rebuilding Internal StreamsMetadata — rebuildMetadata Internal Method

void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> currentState)

rebuildMetadata first clears the allMetadata internal registry.

rebuildMetadata returns immediately if the given currentState is empty.

rebuildMetadata requests the InternalTopologyBuilder for stateStoreNameToSourceTopics.

For every entry in the input currentState, rebuildMetadata…​FIXME

rebuildMetadata is used exclusively when StreamsMetadataState is requested to handle cluster metadata changes.
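The flow from onChange into rebuildMetadata can be sketched as follows. This is a simplified model, not the Kafka Streams source: MetadataStateSketch, its TopicPartition stand-in, the String host keys, and the Object used for the cluster are all hypothetical. The per-entry step is marked FIXME in the text, so the loop body below is only an assumption, inferred from the fact that rebuildMetadata requests stateStoreNameToSourceTopics and is the only caller of hasPartitionsForAnyTopics.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the onChange -> rebuildMetadata flow.
final class MetadataStateSketch {

    // Minimal stand-in for Kafka's TopicPartition
    static final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Stand-in for stateStoreNameToSourceTopics: store name -> source topics
    private final Map<String, List<String>> storeToSourceTopics;
    // Stand-in for the allMetadata registry: host -> store names on that host
    private final Map<String, Set<String>> allMetadata = new LinkedHashMap<>();
    // Stand-in for the recorded Cluster
    private Object clusterMetadata;

    MetadataStateSketch(Map<String, List<String>> storeToSourceTopics) {
        this.storeToSourceTopics = storeToSourceTopics;
    }

    // Records the cluster metadata, then rebuilds the registry
    void onChange(Map<String, Set<TopicPartition>> currentState, Object clusterMetadata) {
        this.clusterMetadata = clusterMetadata;
        rebuildMetadata(currentState);
    }

    private void rebuildMetadata(Map<String, Set<TopicPartition>> currentState) {
        allMetadata.clear();              // always start from an empty registry
        if (currentState.isEmpty()) {
            return;                       // nothing assigned, nothing to rebuild
        }
        // ASSUMPTION: the text leaves this step as FIXME. Here a store is
        // attributed to a host when the host owns a partition of one of the
        // store's source topics.
        for (Map.Entry<String, Set<TopicPartition>> entry : currentState.entrySet()) {
            Set<String> storesOnHost = new HashSet<>();
            for (Map.Entry<String, List<String>> store : storeToSourceTopics.entrySet()) {
                if (hasPartitionsForAnyTopics(store.getValue(), entry.getValue())) {
                    storesOnHost.add(store.getKey());
                }
            }
            allMetadata.put(entry.getKey(), storesOnHost);
        }
    }

    private static boolean hasPartitionsForAnyTopics(List<String> topicNames,
                                                     Set<TopicPartition> partitionForHost) {
        for (TopicPartition tp : partitionForHost) {
            if (topicNames.contains(tp.topic)) {
                return true;
            }
        }
        return false;
    }

    // Read-only view of the registry
    Map<String, Set<String>> allMetadata() {
        return Collections.unmodifiableMap(allMetadata);
    }
}
```

Clearing the registry before the emptiness check means that an empty currentState leaves the registry empty rather than stale, which matches the order of the two steps described above.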

Describing Itself (Textual Representation) — toString Method

String toString()  // uses an empty indent
String toString(final String indent)


toString is used when TaskManager is requested to describe itself.

isInitialized Internal Method

boolean isInitialized()


isInitialized is used when…​FIXME

getMetadataWithKey Method

<K> StreamsMetadata getMetadataWithKey(
  final String storeName,
  final K key,
  final Serializer<K> keySerializer)
<K> StreamsMetadata getMetadataWithKey(
  final String storeName,
  final K key,
  final StreamPartitioner<? super K, ?> partitioner)


getMetadataWithKey is used exclusively when KafkaStreams is requested for metadataForKey.

hasPartitionsForAnyTopics Internal Method

boolean hasPartitionsForAnyTopics(
  final List<String> topicNames,
  final Set<TopicPartition> partitionForHost)

hasPartitionsForAnyTopics returns true when at least one TopicPartition in the input partitionForHost belongs to a topic listed in the input topicNames. Otherwise, hasPartitionsForAnyTopics returns false.

hasPartitionsForAnyTopics is used exclusively when StreamsMetadataState is requested to rebuildMetadata.
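The check described above can be sketched in a few lines. HasPartitionsSketch and its nested TopicPartition are hypothetical stand-ins so that the example runs without the Kafka libraries; only the method name and parameter names come from the signature above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

final class HasPartitionsSketch {

    // Hypothetical minimal stand-in for Kafka's TopicPartition (topic name plus partition number)
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // True as soon as one of the host's partitions belongs to a listed topic
    static boolean hasPartitionsForAnyTopics(List<String> topicNames,
                                             Set<TopicPartition> partitionForHost) {
        for (TopicPartition tp : partitionForHost) {
            if (topicNames.contains(tp.topic)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<TopicPartition> hostPartitions = new HashSet<>(Arrays.asList(
            new TopicPartition("orders", 0),
            new TopicPartition("orders", 1)));
        System.out.println(hasPartitionsForAnyTopics(Arrays.asList("orders", "payments"), hostPartitions));
        System.out.println(hasPartitionsForAnyTopics(Arrays.asList("payments"), hostPartitions));
    }
}
```

Only the topic name of each partition matters for the result; the partition number is ignored, and an empty partitionForHost always yields false.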

getAllMetadata Method

Collection<StreamsMetadata> getAllMetadata()

getAllMetadata simply returns the allMetadata internal registry.

getAllMetadata is used exclusively when KafkaStreams is requested for StreamsMetadatas for all KafkaStreams instances (in a multi-instance Kafka Streams application).
