InternalTopologyBuilder

InternalTopologyBuilder is used to build a topology of processor nodes (for StreamThread.StandbyTaskCreator and StreamThread.TaskCreator task factories and hence stream processor tasks themselves).

Figure 1. InternalTopologyBuilder and ProcessorTopology

InternalTopologyBuilder is created exclusively for a Topology (which is simply the frontend to the internals of topology development).

Figure 2. InternalTopologyBuilder and Topology

InternalTopologyBuilder takes no arguments when created.

import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val builder = new InternalTopologyBuilder

InternalTopologyBuilder uses node factories to build a topology. InternalTopologyBuilder supports ProcessorNodeFactory, SourceNodeFactory, and SinkNodeFactory factory types only (and throws a TopologyException for unknown factories).

InternalTopologyBuilder can have one or more sources that are added (registered) using the addSource method (which simply registers a SourceNodeFactory under a name).

scala> :type builder
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder

val sourceNodeName = "sourceNode"

import org.apache.kafka.streams.processor.WallclockTimestampExtractor
val timestampExtractor = new WallclockTimestampExtractor
import org.apache.kafka.common.serialization.StringDeserializer
val keyDeserializer = new StringDeserializer
val valDeserializer = new StringDeserializer
val topics = Seq("input")
import org.apache.kafka.streams.Topology.AutoOffsetReset
builder.addSource(
  AutoOffsetReset.LATEST,
  sourceNodeName,
  timestampExtractor,
  keyDeserializer,
  valDeserializer,
  topics: _*)

InternalTopologyBuilder can have zero or more processor nodes that are added (registered) using the addProcessor method (which simply registers a ProcessorNodeFactory under a name with a ProcessorSupplier and one or more predecessors). Since addProcessor requires the predecessors to be registered already, a topology must have at least one source node before the first processor is added.

scala> :type builder
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder

assert(!builder.getSourceTopicNames.isEmpty)

val sourceTopicName = "input"

import collection.JavaConverters._
assert(builder.getSourceTopicNames.asScala.contains(sourceTopicName))

import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier}
val supplier = new ProcessorSupplier[String, String] {
  def get = new Processor[String, String] {
    def init(context: ProcessorContext): Unit = {}
    def process(key: String, value: String): Unit = {}
    def close(): Unit = {}
  }
}
val processorNodeName = "processorNode"
val sourceNodeName = "sourceNode"
val predecessorNames = Seq(sourceNodeName)
builder.addProcessor(processorNodeName, supplier, predecessorNames: _*)

InternalTopologyBuilder can have zero or more sink nodes that are added (registered) using the addSink method (which simply registers a SinkNodeFactory under a name).

scala> :type builder
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder

// FIXME: Finish the demo

InternalTopologyBuilder requires an application ID. The application ID is used as a namespace for the internal topics (so they do not clash with the topics of other Kafka Streams applications). The application ID is set using setApplicationId (and is the value of the APPLICATION_ID_CONFIG configuration property that KafkaStreams requires when created).

InternalTopologyBuilder setApplicationId(final String applicationId)

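InternalTopologyBuilder uses the application ID, for example, when requested to decorateTopic (i.e. to add the application ID as a prefix to the name of an internal topic).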

InternalTopologyBuilder has to be rewritten using rewriteTopology (which sets the required application ID).

scala> :type builder
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder

import org.apache.kafka.streams.StreamsConfig
import java.util.Properties
val props = new Properties
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
val config = new StreamsConfig(props)
builder.rewriteTopology(config)

InternalTopologyBuilder can be described (using a TopologyDescription).

scala> :type builder
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder

// add nodes and stores

val description = builder.describe
scala> println(description)
Topologies:
   Sub-topology: 0
    Source: sourceNode (topics: [input])
      --> processorNode
    Processor: processorNode (stores: [])
      --> none
      <-- sourceNode

A global source node is a node created by a SourceNodeFactory that has exactly one topic, with that topic registered in globalTopics. You can use the isGlobalSource predicate to check whether a name is the name of a global source node.
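The definition above can be sketched as a small predicate. This is a hypothetical model rather than the Kafka Streams code: the GlobalSourceCheck class and its NodeModel stand-in for NodeFactory are made up for illustration, with a null topic list marking a node that did not come from a SourceNodeFactory.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the "global source node" check; not the Kafka Streams source.
final class GlobalSourceCheck {

    // Stand-in for a NodeFactory: source nodes carry their topic names, other nodes carry null.
    static final class NodeModel {
        final List<String> sourceTopics;

        NodeModel(List<String> sourceTopics) {
            this.sourceTopics = sourceTopics;
        }
    }

    static boolean isGlobalSource(String nodeName,
                                  Map<String, NodeModel> nodeFactories,
                                  Set<String> globalTopics) {
        NodeModel node = nodeFactories.get(nodeName);
        return node != null
            && node.sourceTopics != null                         // a source node
            && node.sourceTopics.size() == 1                     // with exactly one topic
            && globalTopics.contains(node.sourceTopics.get(0));  // that is a global topic
    }
}
```

For example, a node reading the single topic global-store-topic counts as global only if that topic is in globalTopics, while a regular source over input does not.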

Table 1. InternalTopologyBuilder’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

copartitionSourceGroups

earliestResetPatterns

earliestResetTopics

globalStateStores

Global state stores by name

Note
There are two types of StateStores, i.e. global and regular. Use allStateStoreName to access them all.
Note

InternalTopologyBuilder comes with globalStateStores method to access globalStateStores registry as an unmodifiable collection. It is used when:

globalTopics

Names of global topics

internalTopicNames

Names of the internal topics (without the application ID prefix) registered when InternalTopologyBuilder is requested to addInternalTopic

latestResetPatterns

latestResetTopics

nodeFactories

NodeFactories by node name

nodeGrouper

QuickUnion of the names of node groups

Used when…​FIXME

nodeGroups

Node groups by ID, i.e. groups of node names that can be looked up by a group ID (Map<Integer, Set<String>>)

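Initialized when InternalTopologyBuilder is requested for node groups for the first time, and reset (to null) whenever the topology changes, e.g. when InternalTopologyBuilder is requested to addSource, addProcessor or addStateStore (so the node groups are computed again on the next request)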

Note
nodeGroups accessor is used to access nodeGroups registry.

nodeToSinkTopic

nodeToSourcePatterns

nodeToSourceTopics

Topic names by the source processor node name (without the application ID prefix for internal topics)

New entries are added when InternalTopologyBuilder is requested to addSource (with topic names) and addGlobalStore

sourceTopicNames

Collection of registered topic names

Used when…​FIXME

stateFactories

stateStoreNameToSourceRegex

stateStoreNameToSourceTopics

storeToChangelogTopic

Names of the state stores and the names of the corresponding changelog topics (Map<String, String>)

storeToChangelogTopic manages state stores with the StateStoreFactory with change-logging enabled

A new pair is added when InternalTopologyBuilder is requested to buildProcessorNode and to connectSourceStoreAndTopic (associate the names of a state store and a topic)

subscriptionUpdates

topicPattern

Source topics pattern (to subscribe to)

topicToPatterns

Tip

Enable DEBUG logging level for org.apache.kafka.streams.processor.internals.InternalTopologyBuilder logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.InternalTopologyBuilder=DEBUG

Adding Application ID to Topic (As Prefix) — decorateTopic Internal Method

String decorateTopic(final String topic)

decorateTopic…​FIXME
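The section title describes the behavior; a minimal sketch of it follows, assuming internal topics are named <applicationId>-<topic> and that only names registered in internalTopicNames get the prefix (which is what the name maybeDecorateInternalSourceTopics suggests). TopicDecorator is a hypothetical class, and IllegalStateException stands in for whatever the real method throws when no application ID has been set.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of prefixing internal topic names with the application ID.
final class TopicDecorator {
    private final String applicationId;            // null until set
    private final Set<String> internalTopicNames;  // undecorated internal topic names

    TopicDecorator(String applicationId, Set<String> internalTopicNames) {
        this.applicationId = applicationId;
        this.internalTopicNames = internalTopicNames;
    }

    // "<applicationId>-<topic>"
    String decorateTopic(String topic) {
        if (applicationId == null) {
            // Assumption: stands in for the exception the real method uses.
            throw new IllegalStateException("applicationId has not been set");
        }
        return applicationId + "-" + topic;
    }

    // Decorates only internal topics; user topics are returned unchanged.
    List<String> maybeDecorateInternalSourceTopics(Collection<String> sourceTopics) {
        List<String> decorated = new ArrayList<>();
        for (String topic : sourceTopics) {
            decorated.add(internalTopicNames.contains(topic) ? decorateTopic(topic) : topic);
        }
        return decorated;
    }
}
```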

Note

decorateTopic is used when:

buildSinkNode Internal Method

void buildSinkNode(
  Map<String, ProcessorNode> processorMap,
  Map<String, SinkNode> topicSinkMap,
  Set<String> repartitionTopics,
  SinkNodeFactory sinkNodeFactory,
  SinkNode node)

buildSinkNode…​FIXME

Note
buildSinkNode is used exclusively when InternalTopologyBuilder is requested to build a topology of processor nodes.

maybeDecorateInternalSourceTopics Internal Method

List<String> maybeDecorateInternalSourceTopics(final Collection<String> sourceTopics)

maybeDecorateInternalSourceTopics…​FIXME

Note

maybeDecorateInternalSourceTopics is used when:

resetTopicsPattern Internal Method

Pattern resetTopicsPattern(
  final Set<String> resetTopics,
  final Set<Pattern> resetPatterns,
  final Set<String> otherResetTopics,
  final Set<Pattern> otherResetPatterns)

resetTopicsPattern…​FIXME

Note
resetTopicsPattern is used when…​FIXME

copartitionGroups Method

Collection<Set<String>> copartitionGroups()

copartitionGroups…​FIXME

Note
copartitionGroups is used exclusively when StreamsPartitionAssignor is requested to perform group assignment (assign tasks to consumer clients).

copartitionSources Method

void copartitionSources(
  Collection<String> sourceNodes)

copartitionSources…​FIXME

Note
copartitionSources is used exclusively when AbstractStream is requested to ensureJoinableWith.

Adding Processor to Topology — addProcessor Method

void addProcessor(
  String name,
  ProcessorSupplier supplier,
  String... predecessorNames)

addProcessor simply registers a new ProcessorNodeFactory by the given name in the nodeFactories internal registry.

addProcessor also adds the name to the nodeGrouper and unites the processor name with the predecessors.

In the end, addProcessor resets the nodeGroups internal registry (i.e. sets it to null).

Note
A processor has a unique name, a ProcessorSupplier and at least one predecessor (which cannot be the processor itself).

addProcessor requires that the given name, the ProcessorSupplier and the predecessor names are all defined (i.e. not null) or throws a NullPointerException.

addProcessor requires that the given name is unique across all the registered nodeFactories or throws a TopologyException.

addProcessor requires that there is at least one predecessor name given or throws a TopologyException.
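The requirements above, together with the predecessor rules from the Note and the overview at the top of the page, can be modeled as follows. This is a sketch under assumptions: ProcessorRegistry is hypothetical, a map of predecessor lists stands in for nodeFactories, a plain union-find stands in for the nodeGrouper QuickUnion, Object stands in for ProcessorSupplier, and IllegalArgumentException replaces TopologyException.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of addProcessor's checks and bookkeeping; not the Kafka Streams source.
final class ProcessorRegistry {
    final Map<String, List<String>> nodeFactories = new HashMap<>(); // node -> predecessors
    final Map<String, String> parent = new HashMap<>();              // union-find (nodeGrouper)
    Map<Integer, List<String>> nodeGroups = new HashMap<>();         // cache, reset on change

    // Simplified source registration, only so that processors have predecessors.
    void addSource(String name) {
        Objects.requireNonNull(name, "name must not be null");
        if (nodeFactories.containsKey(name)) {
            throw new IllegalArgumentException("Node " + name + " is already added");
        }
        nodeFactories.put(name, List.of());
        parent.put(name, name);
        nodeGroups = null;
    }

    void addProcessor(String name, Object supplier, String... predecessorNames) {
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(supplier, "supplier must not be null");
        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
        if (nodeFactories.containsKey(name)) {
            throw new IllegalArgumentException("Processor " + name + " is already added");
        }
        if (predecessorNames.length == 0) {
            throw new IllegalArgumentException("Processor " + name + " needs a predecessor");
        }
        for (String predecessor : predecessorNames) {
            Objects.requireNonNull(predecessor, "predecessor name must not be null");
            if (predecessor.equals(name)) {
                throw new IllegalArgumentException(name + " cannot be its own predecessor");
            }
            if (!nodeFactories.containsKey(predecessor)) {
                throw new IllegalArgumentException("Predecessor " + predecessor + " is not added yet");
            }
        }
        nodeFactories.put(name, List.of(predecessorNames));
        parent.put(name, name);                      // like nodeGrouper.add(name)
        for (String predecessor : predecessorNames) {
            union(name, predecessor);                // like nodeGrouper.unite(...)
        }
        nodeGroups = null;                           // node groups must be recomputed
    }

    String root(String id) {
        String current = id;
        while (!parent.get(current).equals(current)) {
            current = parent.get(current);
        }
        return current;
    }

    private void union(String a, String b) {
        String rootA = root(a);
        String rootB = root(b);
        if (!rootA.equals(rootB)) {
            parent.put(rootA, rootB);
        }
    }
}
```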

Note

addProcessor is used when:

buildProcessorNode Internal Method

void buildProcessorNode(
  Map<String, ProcessorNode> processorMap,
  Map<String, StateStore> stateStoreMap,
  ProcessorNodeFactory factory,
  ProcessorNode node)

buildProcessorNode…​FIXME

Note
buildProcessorNode is used exclusively when InternalTopologyBuilder is requested to build a topology of processor nodes.

buildSourceNode Internal Method

void buildSourceNode(
  Map<String, SourceNode> topicSourceMap,
  Set<String> repartitionTopics,
  SourceNodeFactory sourceNodeFactory,
  SourceNode node)

buildSourceNode…​FIXME

Note
buildSourceNode is used exclusively when InternalTopologyBuilder is requested to build a topology of processor nodes.

Adding Source Node to Topology — addSource Method

void addSource(
  Topology.AutoOffsetReset offsetReset,
  String name,
  TimestampExtractor timestampExtractor,
  Deserializer keyDeserializer,
  Deserializer valDeserializer,
  Pattern topicPattern)
void addSource(
  Topology.AutoOffsetReset offsetReset,
  String name,
  TimestampExtractor timestampExtractor,
  Deserializer keyDeserializer,
  Deserializer valDeserializer,
  String... topics)

addSource simply registers a new SourceNodeFactory by the given name in the nodeFactories internal registry.

addSource calls maybeAddToResetList for every topic in the given topics.

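addSource also registers the inputs in the following internal registries:

  • sourceTopicNames (every topic)

  • nodeToSourceTopics (the name with the topics) or nodeToSourcePatterns (the name with the topicPattern)

  • nodeGrouper (the name)

In the end, addSource resets the nodeGroups internal registry (i.e. sets it to null).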

Note
A source node has a unique name, a Topology.AutoOffsetReset, a TimestampExtractor, key and value deserializers, and either at least one topic or a topic pattern.

addSource requires that:

  • There is at least one topic or throws a TopologyException

  • Name is specified (not null) and unique across all the registered nodeFactories or throws a TopologyException

  • No topic has been registered earlier
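The requirements above and the maybeAddToResetList step can be sketched for the topic-name variant of addSource. SourceRegistry and its OffsetReset enum are hypothetical stand-ins (for nodeFactories and Topology.AutoOffsetReset), IllegalArgumentException replaces TopologyException, and the behavior of maybeAddToResetList (add the item to the earliest or latest collection, or nowhere when no reset policy is given) is inferred from its signature. The duplicate-topic check only looks at sourceTopicNames, so it is narrower than validateTopicNotAlreadyRegistered.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of addSource (topic-name variant); not the Kafka Streams source.
final class SourceRegistry {
    enum OffsetReset { EARLIEST, LATEST } // stand-in for Topology.AutoOffsetReset

    final Set<String> nodeNames = new HashSet<>();  // stand-in for nodeFactories keys
    final Map<String, List<String>> nodeToSourceTopics = new HashMap<>();
    final Set<String> sourceTopicNames = new HashSet<>();
    final Set<String> earliestResetTopics = new HashSet<>();
    final Set<String> latestResetTopics = new HashSet<>();

    void addSource(OffsetReset offsetReset, String name, String... topics) {
        if (topics.length == 0) {
            throw new IllegalArgumentException("Source " + name + " needs at least one topic");
        }
        if (name == null || nodeNames.contains(name)) {
            throw new IllegalArgumentException("Source name missing or already added: " + name);
        }
        // Validate every topic before registering any (a simplification made here).
        for (String topic : topics) {
            if (sourceTopicNames.contains(topic)) {
                throw new IllegalArgumentException("Topic " + topic + " is already registered");
            }
        }
        for (String topic : topics) {
            maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic);
            sourceTopicNames.add(topic);
        }
        nodeNames.add(name);
        nodeToSourceTopics.put(name, List.of(topics));
    }

    // Adds the item to the collection matching the reset policy; no policy, no entry.
    static <T> void maybeAddToResetList(Collection<T> earliestResets, Collection<T> latestResets,
                                        OffsetReset offsetReset, T item) {
        if (offsetReset == null) {
            return;
        }
        switch (offsetReset) {
            case EARLIEST:
                earliestResets.add(item);
                break;
            case LATEST:
                latestResets.add(item);
                break;
        }
    }
}
```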

Note

addSource is used when:

maybeAddToResetList Internal Method

void maybeAddToResetList(
  final Collection<T> earliestResets,
  final Collection<T> latestResets,
  final Topology.AutoOffsetReset offsetReset,
  final T item)

maybeAddToResetList…​FIXME

Note
maybeAddToResetList is used when…​FIXME

validateTopicNotAlreadyRegistered Internal Method

void validateTopicNotAlreadyRegistered(final String topic)

validateTopicNotAlreadyRegistered…​FIXME

Note
validateTopicNotAlreadyRegistered is used when…​FIXME

Connecting Processor with State Stores — connectProcessorAndStateStores Method

void connectProcessorAndStateStores(
  String processorName,
  String... stateStoreNames)

connectProcessorAndStateStores simply calls connectProcessorAndStateStore with processorName for every state store name in stateStoreNames.

connectProcessorAndStateStores reports a NullPointerException when processorName, stateStoreNames or any state store name is null.

connectProcessorAndStateStores reports a TopologyException when stateStoreNames is an empty collection.

Note
connectProcessorAndStateStores (plural) is a public method that uses the internal connectProcessorAndStateStore (singular) for a "bulk connect".
Note

connectProcessorAndStateStores is used when:

Adding Global State Store to Topology — addGlobalStore Method

void addGlobalStore(
  StoreBuilder storeBuilder,
  String sourceName,
  TimestampExtractor timestampExtractor,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer,
  String topic,
  String processorName,
  ProcessorSupplier stateUpdateSupplier)

addGlobalStore creates a ProcessorNodeFactory with the given processorName, sourceName (as predecessors) and stateUpdateSupplier (as supplier).

addGlobalStore then does the following housekeeping tasks:

  1. Adds the given topic to globalTopics internal registry

  2. Creates a SourceNodeFactory and registers it in nodeFactories internal registry as sourceName

  3. Associates the sourceName with topic to nodeToSourceTopics

  4. Requests QuickUnion of the names of node groups to add the sourceName

  5. Requests the ProcessorNodeFactory to add the name of the state store (from the StoreBuilder)

  6. Associates the processorName with the ProcessorNodeFactory in nodeFactories

  7. Requests QuickUnion of the names of node groups to add the processorName

  8. Requests QuickUnion of the names of node groups to unite the processorName and predecessors

  9. Associates the name of the state store with the store in globalStateStores

In the end, addGlobalStore associates the names of the state store and the topic (using connectSourceStoreAndTopic with the name of the state store and the topic).

Note

addGlobalStore is used when:

Validating Arguments for Creating Global State Store — validateGlobalStoreArguments Internal Method

void validateGlobalStoreArguments(
  final String sourceName,
  final String topic,
  final String processorName,
  final ProcessorSupplier stateUpdateSupplier,
  final String storeName,
  final boolean loggingEnabled)

validateGlobalStoreArguments validates the input parameters (before adding a global state store to a topology).

validateGlobalStoreArguments throws a NullPointerException when sourceName, topic, stateUpdateSupplier or processorName is null.

validateGlobalStoreArguments throws a TopologyException when:

Note
validateGlobalStoreArguments is used exclusively when InternalTopologyBuilder is requested to add a global state store to a topology.

Registering State Store with Topic (Associating Names) — connectSourceStoreAndTopic Method

void connectSourceStoreAndTopic(
  String sourceStoreName,
  String topic)

connectSourceStoreAndTopic adds the given sourceStoreName with the topic to storeToChangelogTopic internal registry.

connectSourceStoreAndTopic reports a TopologyException when storeToChangelogTopic has sourceStoreName already registered.

Source store [sourceStoreName] is already added.
Note

connectSourceStoreAndTopic is used when:

Connecting State Store with Processor Node — connectProcessorAndStateStore Internal Method

void connectProcessorAndStateStore(
  String processorName,
  String stateStoreName)
Note
connectProcessorAndStateStore (singular) is an internal method that is used by the public connectProcessorAndStateStores (plural).

connectProcessorAndStateStore gets the StateStoreFactory for the given stateStoreName (in stateFactories).

connectProcessorAndStateStore then requests the nodeGrouper to unite all users of the StateStoreFactory with the given processorName, and adds the processorName to the users.

connectProcessorAndStateStore gets the NodeFactory for the given processorName (in nodeFactories). Only when the NodeFactory is a ProcessorNodeFactory, connectProcessorAndStateStore registers the stateStoreName with the ProcessorNodeFactory.

In the end, connectProcessorAndStateStore calls connectStateStoreNameToSourceTopicsOrPattern (with the input stateStoreName and the ProcessorNodeFactory).

connectProcessorAndStateStore reports a TopologyException when the input stateStoreName or processorName have not been registered yet or the processorName is the name of a source or sink node.

Note
connectProcessorAndStateStore is used when InternalTopologyBuilder is requested to add a state store and connect a processor with state stores.

connectStateStoreNameToSourceTopicsOrPattern Internal Method

void connectStateStoreNameToSourceTopicsOrPattern(
  String stateStoreName,
  ProcessorNodeFactory processorNodeFactory)

connectStateStoreNameToSourceTopicsOrPattern…​FIXME

Note
connectStateStoreNameToSourceTopicsOrPattern is used exclusively when InternalTopologyBuilder is requested to connect a processor with a state store.

Adding State Store to Topology — addStateStore Method

void addStateStore(
  StoreBuilder<?> storeBuilder,
  String... processorNames) (1)
void addStateStore(
  StoreBuilder<?> storeBuilder,
  boolean allowOverride,
  String... processorNames)
  1. Uses false for the allowOverride flag

addStateStore creates a StateStoreFactory (with the StoreBuilder) and registers it (in the stateFactories internal registry).

addStateStore then connects the state store with processors (if they are given).

In the end, addStateStore resets the nodeGroups internal registry (i.e. sets it to null).

addStateStore throws a TopologyException when the given StoreBuilder has already been registered (in the stateFactories internal registry) and the allowOverride flag is off (false):

StateStore [name] is already added.
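The allowOverride rule comes down to a check before the registration. In this hypothetical sketch, StateStoreRegistry is made up, a String stands in for the StoreBuilder (and the StateStoreFactory created from it), IllegalArgumentException replaces TopologyException, and connecting the store to processors is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of addStateStore's duplicate-name rule; not the Kafka Streams source.
final class StateStoreRegistry {
    // store name -> store builder (a String stands in for StoreBuilder / StateStoreFactory)
    final Map<String, String> stateFactories = new LinkedHashMap<>();

    void addStateStore(String storeName, String storeBuilder, boolean allowOverride) {
        if (!allowOverride && stateFactories.containsKey(storeName)) {
            throw new IllegalArgumentException("StateStore " + storeName + " is already added.");
        }
        stateFactories.put(storeName, storeBuilder);
    }

    // The shorter variant always uses false for allowOverride.
    void addStateStore(String storeName, String storeBuilder) {
        addStateStore(storeName, storeBuilder, false);
    }
}
```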
Note

addStateStore is used (with the allowOverride flag disabled) when:

Topic Groups (TopicsInfos By IDs) — topicGroups Method

Map<Integer, TopicsInfo> topicGroups()

topicGroups…​FIXME

Note
topicGroups is used exclusively when StreamsPartitionAssignor is requested to assign.

Getting Node Groups by ID — nodeGroups Accessor Method

synchronized Map<Integer, Set<String>> nodeGroups()

nodeGroups gives node groups by id.

If the nodeGroups internal registry has not been initialized yet (i.e. is null), nodeGroups creates the node groups (using makeNodeGroups) and saves them in the registry for later requests.
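The lazy initialization, together with the resets that addSource, addProcessor and addStateStore perform, amounts to a small cache. NodeGroupsCache is hypothetical: the Supplier stands in for makeNodeGroups, and the computations counter exists only to make the caching visible.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of the lazily computed, resettable nodeGroups registry.
final class NodeGroupsCache {
    private final Supplier<Map<Integer, Set<String>>> makeNodeGroups; // stand-in for makeNodeGroups()
    private Map<Integer, Set<String>> nodeGroups;                     // null = not computed yet
    int computations = 0;                                            // for illustration only

    NodeGroupsCache(Supplier<Map<Integer, Set<String>>> makeNodeGroups) {
        this.makeNodeGroups = makeNodeGroups;
    }

    synchronized Map<Integer, Set<String>> nodeGroups() {
        if (nodeGroups == null) {
            nodeGroups = makeNodeGroups.get();
            computations++;
        }
        return nodeGroups;
    }

    // What the add* methods do after changing the topology.
    synchronized void reset() {
        nodeGroups = null;
    }
}
```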

Note
nodeGroups is used when InternalTopologyBuilder is requested to build a topology for a topic group ID, globalNodeGroups and topicGroups

Building Global Processor Task Topology — buildGlobalStateTopology Method

ProcessorTopology buildGlobalStateTopology()

buildGlobalStateTopology requests globalNodeGroups and builds a processor task topology with the global node groups.

buildGlobalStateTopology returns null if globalNodeGroups is empty.

Note
buildGlobalStateTopology is used exclusively when KafkaStreams is created.

describeGlobalStore Internal Method

void describeGlobalStore(final TopologyDescription description, final Set<String> nodes, int id)

describeGlobalStore…​FIXME

Note
describeGlobalStore is used exclusively when InternalTopologyBuilder is requested to describe.

nodeGroupContainsGlobalSourceNode Internal Method

void nodeGroupContainsGlobalSourceNode(final TopologyDescription description, final Set<String> nodes, int id)

nodeGroupContainsGlobalSourceNode…​FIXME

Note
nodeGroupContainsGlobalSourceNode is used exclusively when InternalTopologyBuilder is requested to describe.

Checking If Node Name Is Of Global Source Node — isGlobalSource Internal Method

boolean isGlobalSource(final String nodeName)

isGlobalSource looks up a NodeFactory by the input node name (in the nodeFactories internal registry).

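isGlobalSource is positive (i.e. true) when all of the following hold:

  • The NodeFactory is a SourceNodeFactory

  • The SourceNodeFactory has exactly one topic

  • The topic is registered in the globalTopics internal registry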

Otherwise, isGlobalSource is negative (i.e. false).

Note
isGlobalSource is used when InternalTopologyBuilder is requested to describeGlobalStore, globalNodeGroups and nodeGroupContainsGlobalSourceNode.

Global Node Groups — globalNodeGroups Internal Method

Set<String> globalNodeGroups()

globalNodeGroups gives node groups with at least one global source node.
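Since the result is a Set<String>, "node groups" here means the names of all nodes in every group that has a global source node. A minimal sketch, assuming the predicate is isGlobalSource and the map is the nodeGroups registry; GlobalNodeGroups is a hypothetical class.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical model of globalNodeGroups; not the Kafka Streams source.
final class GlobalNodeGroups {

    static Set<String> globalNodeGroups(Map<Integer, Set<String>> nodeGroups,
                                        Predicate<String> isGlobalSource) {
        Set<String> globalGroups = new HashSet<>();
        for (Set<String> nodes : nodeGroups.values()) {
            // keep the whole group as soon as one of its nodes is a global source
            if (nodes.stream().anyMatch(isGlobalSource)) {
                globalGroups.addAll(nodes);
            }
        }
        return globalGroups;
    }
}
```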

Note
globalNodeGroups is used when InternalTopologyBuilder is requested to build a processor task topology and global processor task topology.

Building Node Groups — makeNodeGroups Internal Method

Map<Integer, Set<String>> makeNodeGroups()

makeNodeGroups starts with no node groups and the local counter of node group IDs as 0.

Note
makeNodeGroups uses Java’s java.util.LinkedHashMap, a hash table and linked list implementation of the Map interface with predictable iteration order.

makeNodeGroups takes the names of registered source nodes (from the nodeToSourceTopics and nodeToSourcePatterns internal registries).

makeNodeGroups sorts the names of the source nodes in ascending order (per the natural ordering) and calls putNodeGroupName for every source node name.

Note
While calling putNodeGroupName, makeNodeGroups may open a new node group and get the next node group ID back. After processing all source node names, the node group ID is the next ID to assign (one past the last group ID assigned).

makeNodeGroups takes the non-source node names (from the nodeFactories internal registry that are not in the nodeToSourceTopics internal registry).

makeNodeGroups does the same group ID assignment as for the source node names, i.e. sorts the names in ascending order and calls putNodeGroupName for every node name.

In the end, makeNodeGroups returns the node (names) groups by ID.

Note
makeNodeGroups is used when InternalTopologyBuilder is requested to describe a topology, and get node groups.

putNodeGroupName Internal Method

int putNodeGroupName(
  final String nodeName,
  final int nodeGroupId,
  final Map<Integer, Set<String>> nodeGroups,
  final Map<String, Set<String>> rootToNodeGroup)

putNodeGroupName takes the name of a node, the current node group ID, the current node groups and the rootToNodeGroup.

putNodeGroupName requests QuickUnion of the names of node groups for the root node of the input nodeName.

putNodeGroupName gets the node group for the root node from the input rootToNodeGroup and adds the input nodeName to it.

If the root node was not found in the input rootToNodeGroup, putNodeGroupName registers the root node with a new empty node group in rootToNodeGroup. putNodeGroupName then registers the empty node group under the current node group ID in nodeGroups and increments the node group ID.

In the end, putNodeGroupName gives the input nodeGroupId, or the incremented node group ID if the root node was not found in the input rootToNodeGroup.
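makeNodeGroups and putNodeGroupName can be modeled together in plain Java. NodeGrouping is hypothetical: a minimal union-find (with path halving) stands in for QuickUnion, and the source and non-source node names are passed in rather than read from nodeToSourceTopics, nodeToSourcePatterns and nodeFactories. The counter logic follows the description above: a new group is registered under the current ID, and the next ID is returned.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of makeNodeGroups/putNodeGroupName; not the Kafka Streams source.
final class NodeGrouping {
    // Minimal union-find standing in for QuickUnion.
    final Map<String, String> parent = new HashMap<>();

    void add(String id) {
        parent.putIfAbsent(id, id);
    }

    String root(String id) {
        String current = id;
        while (!parent.get(current).equals(current)) {
            parent.put(current, parent.get(parent.get(current))); // path halving
            current = parent.get(current);
        }
        return current;
    }

    void unite(String id1, String... others) {
        for (String id2 : others) {
            String root1 = root(id1);
            String root2 = root(id2);
            if (!root1.equals(root2)) {
                parent.put(root1, root2);
            }
        }
    }

    // Adds nodeName to its root's group; an unseen root opens a group under the current ID.
    int putNodeGroupName(String nodeName, int nodeGroupId,
                         Map<Integer, Set<String>> nodeGroups,
                         Map<String, Set<String>> rootToNodeGroup) {
        int newNodeGroupId = nodeGroupId;
        String root = root(nodeName);
        Set<String> nodeGroup = rootToNodeGroup.get(root);
        if (nodeGroup == null) {
            nodeGroup = new HashSet<>();
            rootToNodeGroup.put(root, nodeGroup);
            nodeGroups.put(newNodeGroupId++, nodeGroup); // register, then advance the counter
        }
        nodeGroup.add(nodeName);
        return newNodeGroupId;
    }

    // Source nodes first (sorted), then the other nodes (sorted).
    Map<Integer, Set<String>> makeNodeGroups(Set<String> sourceNodes, Set<String> allNodes) {
        Map<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
        Map<String, Set<String>> rootToNodeGroup = new HashMap<>();
        int nodeGroupId = 0;
        for (String name : new TreeSet<>(sourceNodes)) {
            nodeGroupId = putNodeGroupName(name, nodeGroupId, nodeGroups, rootToNodeGroup);
        }
        for (String name : new TreeSet<>(allNodes)) {
            if (!sourceNodes.contains(name)) {
                nodeGroupId = putNodeGroupName(name, nodeGroupId, nodeGroups, rootToNodeGroup);
            }
        }
        return nodeGroups;
    }
}
```

With sourceA, processorA and sinkA united into one group and sourceB with processorB into another, the source nodes are visited first, so the group of sourceA gets ID 0 and the group of sourceB gets ID 1.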

Note
putNodeGroupName is used exclusively when InternalTopologyBuilder is requested to create the node groups.

Describing Topology (as TopologyDescription) — describe Method

TopologyDescription describe()

describe…​FIXME

import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val itb = new InternalTopologyBuilder()

// Create a state store builder
import org.apache.kafka.streams.state.Stores
val lruMapSupplier = Stores.lruMap("input-stream", 5)
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.{KeyValueStore, StoreBuilder}
val storeBuilder = Stores.keyValueStoreBuilder(
  lruMapSupplier,
  Serdes.Long(),
  Serdes.Long()).
  withLoggingDisabled

// Add the state store as a global state store
import org.apache.kafka.streams.processor.TimestampExtractor
val timestampExtractor: TimestampExtractor = null
import org.apache.kafka.common.serialization.LongDeserializer
val keyDeserializer = new LongDeserializer
val valueDeserializer = new LongDeserializer
import org.apache.kafka.streams.kstream.internals.KTableSource
import org.apache.kafka.streams.processor.ProcessorSupplier
import java.lang.{Long => JLong}
val stateUpdateSupplier: ProcessorSupplier[JLong, JLong] = new KTableSource("global-store")
itb.addGlobalStore(
  // Required to make the code compile
  storeBuilder.asInstanceOf[StoreBuilder[KeyValueStore[_, _]]],
  "sourceName",
  timestampExtractor,
  keyDeserializer,
  valueDeserializer,
  "global-store-topic",
  "processorName",
  stateUpdateSupplier)

import org.apache.kafka.streams.TopologyDescription
val td: TopologyDescription = itb.describe
scala> println(td)
Topologies:
   Sub-topology: 0 for global store (will not generate tasks)
    Source: sourceName (topics: global-store-topic)
      --> processorName
    Processor: processorName (stores: [input-stream])
      --> none
      <-- sourceName
Note
describe is used exclusively when Topology is requested to describe.

describeSubtopology Internal Method

void describeSubtopology(
  TopologyDescription description,
  Integer subtopologyId,
  Set<String> nodeNames)

describeSubtopology…​FIXME

Note
describeSubtopology is used exclusively when InternalTopologyBuilder is requested to describe itself.

Adding Sink Node to Topology — addSink Method

void addSink(
  String name,
  String topic,
  Serializer<K> keySerializer,
  Serializer<V> valSerializer,
  StreamPartitioner<? super K, ? super V> partitioner,
  String... predecessorNames) (1)
void addSink(
  String name,
  TopicNameExtractor<K, V> topicExtractor,
  Serializer<K> keySerializer,
  Serializer<V> valSerializer,
  StreamPartitioner<? super K, ? super V> partitioner,
  String... predecessorNames)

addSink creates a SinkNodeFactory (passing all the inputs along) and registers (adds) it in the nodeFactories internal registry (under the input name).

addSink registers the input topic with the input name in the nodeToSinkTopic internal registry.

addSink adds the input name to the nodeGrouper internal registry and requests it to unite the input name with the input predecessorNames.

Note

addSink is used when:

Adding Internal Topic Name — addInternalTopic Method

void addInternalTopic(
  String topicName)

addInternalTopic simply adds the input topicName to the internalTopicNames internal registry.

Note

addInternalTopic is used when:

Building Topology of Processor Nodes — build Method

ProcessorTopology build() (1)
ProcessorTopology build(
  Integer topicGroupId) (2)

// PRIVATE
private ProcessorTopology build(
  Set<String> nodeGroup)
  1. Uses build with an undefined topicGroupId (i.e. null)

  2. Uses build with nodeGroup being the node names for a given topicGroupId

The private build takes the NodeFactories (from the nodeFactories internal registry).

For every NodeFactory, the private build checks whether the node (by its name) is included in the input nodeGroup. Every node is assumed to be included when the nodeGroup is null, which can happen when a group ID could not be found in the nodeGroups internal registry. For every included node, build does the following:

  1. Requests the NodeFactory to build a processor node (and adds it to a local processorMap of processors by their names)

  2. For ProcessorNodeFactories, build calls buildProcessorNode

  3. For SourceNodeFactories, build calls buildSourceNode

  4. For SinkNodeFactories, build calls buildSinkNode

In the end, build creates a ProcessorTopology.

build throws a TopologyException for unknown NodeFactories.

Unknown definition class: [className]
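The loop above, including the "null nodeGroup means every node" rule and the error for unknown factories, can be sketched as below. TopologyBuildModel is hypothetical: each factory is represented only by its type name, the "built" nodes are descriptive strings, and IllegalStateException replaces TopologyException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the private build(nodeGroup) loop; not the Kafka Streams source.
final class TopologyBuildModel {

    // nodeFactories: node name -> factory type name, in registration order
    static List<String> build(Map<String, String> nodeFactories, Set<String> nodeGroup) {
        List<String> built = new ArrayList<>();
        for (Map.Entry<String, String> entry : nodeFactories.entrySet()) {
            String name = entry.getKey();
            if (nodeGroup != null && !nodeGroup.contains(name)) {
                continue; // not part of the requested node group
            }
            switch (entry.getValue()) {
                case "ProcessorNodeFactory": built.add("processor " + name); break; // buildProcessorNode
                case "SourceNodeFactory":    built.add("source " + name);    break; // buildSourceNode
                case "SinkNodeFactory":      built.add("sink " + name);      break; // buildSinkNode
                default:
                    throw new IllegalStateException("Unknown definition class: " + entry.getValue());
            }
        }
        return built;
    }
}
```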
Note
nodeGroup can be the global node groups (aka global state topology), a single node group, or all node groups.
Note
The private build is used when InternalTopologyBuilder is requested to build a processor task topology (for a group ID) and build a global processor task topology.
Note
The no-argument build is used exclusively when KafkaStreams is created (as a sanity check to fail-fast in case a ProcessorTopology could not be built due to some exception).

Building Processor Task Topology For Group ID — build Factory Method

ProcessorTopology build(
  Integer topicGroupId)

This variant of build takes either a group ID or null (see the parameter-less build()).

For the input topicGroupId specified (i.e. non-null), build looks up the group ID in the nodeGroups internal registry and builds the topology (for the node names in the node group).

When the input topicGroupId is undefined (i.e. null), build takes the node names (from the nodeGroups internal registry) and removes globalNodeGroups. In the end, build builds the topology (for the node names).
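How this variant picks the node names for the private build can be sketched as follows. NodeGroupSelection is a hypothetical helper; it returns null for an unknown group ID, which the private build treats as "include every node" per the description of the private build.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of how build(topicGroupId) picks node names; not the Kafka Streams source.
final class NodeGroupSelection {

    // Node names to build, or null when the group ID is not in nodeGroups.
    static Set<String> nodeNamesFor(Integer topicGroupId,
                                    Map<Integer, Set<String>> nodeGroups,
                                    Set<String> globalNodeGroups) {
        if (topicGroupId != null) {
            return nodeGroups.get(topicGroupId);
        }
        Set<String> nodeNames = new HashSet<>();
        for (Set<String> group : nodeGroups.values()) {
            nodeNames.addAll(group);
        }
        nodeNames.removeAll(globalNodeGroups); // the global state topology is built separately
        return nodeNames;
    }
}
```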

Note

build (with a topic group ID) is used when:

allStateStoreName Method

Set<String> allStateStoreName()

allStateStoreName simply returns the state store names (the keys) from the stateFactories and globalStateStores internal registries.

Note
allStateStoreName is used exclusively when TopologyTestDriver is requested to getAllStateStores.

Creating InternalTopicConfig (Given Name and StateStoreFactory) — createChangelogTopicConfig Internal Method

InternalTopicConfig createChangelogTopicConfig(
  StateStoreFactory factory,
  String name)

createChangelogTopicConfig creates an UnwindowedChangelogTopicConfig or a WindowedChangelogTopicConfig per the isWindowStore flag of the input StateStoreFactory.

Internally, createChangelogTopicConfig requests the input StateStoreFactory for isWindowStore flag.

Note
isWindowStore flag is enabled when a StateStoreFactory is created for a WindowStoreBuilder.

If isWindowStore flag is enabled (true), createChangelogTopicConfig does the following:

  1. Requests the input StateStoreFactory for logConfig and uses it to create a WindowedChangelogTopicConfig (for the input name)

  2. Requests the input StateStoreFactory for retentionPeriod and uses it to request the WindowedChangelogTopicConfig to setRetentionMs

If isWindowStore flag is disabled (false), createChangelogTopicConfig requests the input StateStoreFactory for logConfig and uses it to create an UnwindowedChangelogTopicConfig (for the input name).
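The branch above can be modeled with a single stand-in class. ChangelogConfigChoice and TopicConfigModel are hypothetical and replace UnwindowedChangelogTopicConfig and WindowedChangelogTopicConfig; logConfig is a plain map and the retentionMs field plays the role of setRetentionMs.

```java
import java.util.Map;

// Hypothetical model of the branch in createChangelogTopicConfig; not the Kafka Streams classes.
final class ChangelogConfigChoice {

    // Stand-in for UnwindowedChangelogTopicConfig / WindowedChangelogTopicConfig.
    static final class TopicConfigModel {
        final String name;
        final boolean windowed;
        final Map<String, String> logConfig;
        Long retentionMs; // set only for windowed changelog topics

        TopicConfigModel(String name, boolean windowed, Map<String, String> logConfig) {
            this.name = name;
            this.windowed = windowed;
            this.logConfig = logConfig;
        }
    }

    static TopicConfigModel createChangelogTopicConfig(String name, boolean isWindowStore,
                                                       Map<String, String> logConfig,
                                                       long retentionPeriod) {
        if (isWindowStore) {
            TopicConfigModel config = new TopicConfigModel(name, true, logConfig);
            config.retentionMs = retentionPeriod; // like setRetentionMs
            return config;
        }
        return new TopicConfigModel(name, false, logConfig);
    }
}
```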

Note
createChangelogTopicConfig is used exclusively when InternalTopologyBuilder is requested for topic groups.

Source Topics — sourceTopicPattern Method

Pattern sourceTopicPattern()

sourceTopicPattern returns the cached source topics pattern if available.

If not, sourceTopicPattern takes the subscribed topics from the nodeToSourceTopics internal registry and sorts them into ascending order (using natural ordering).

Before returning the source topics pattern, sourceTopicPattern uses buildPatternForOffsetResetTopics to build it (from the sorted topic names and the source topic patterns) and saves the result in the topicPattern internal registry.
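The caching and pattern building described above can be sketched as follows. SourceTopicPatternModel is hypothetical; reading the source patterns from a nodeToSourcePatterns map, joining topic names and patterns into a single regex alternation, and quoting topic names with Pattern.quote (so that a '.' in a name matches literally) are assumptions made here, and decoration of internal topic names is left out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical model of sourceTopicPattern; not the Kafka Streams source.
final class SourceTopicPatternModel {
    private final Map<String, List<String>> nodeToSourceTopics;
    private final Map<String, Pattern> nodeToSourcePatterns;
    private Pattern topicPattern; // cached, like the topicPattern registry

    SourceTopicPatternModel(Map<String, List<String>> nodeToSourceTopics,
                            Map<String, Pattern> nodeToSourcePatterns) {
        this.nodeToSourceTopics = nodeToSourceTopics;
        this.nodeToSourcePatterns = nodeToSourcePatterns;
    }

    synchronized Pattern sourceTopicPattern() {
        if (topicPattern == null) {
            List<String> allSourceTopics = new ArrayList<>();
            for (List<String> topics : nodeToSourceTopics.values()) {
                allSourceTopics.addAll(topics);
            }
            Collections.sort(allSourceTopics); // natural (ascending) order
            topicPattern = buildPatternForOffsetResetTopics(allSourceTopics, nodeToSourcePatterns.values());
        }
        return topicPattern;
    }

    // Joins quoted topic names and raw patterns into one alternation.
    static Pattern buildPatternForOffsetResetTopics(Collection<String> sourceTopics,
                                                    Collection<Pattern> sourcePatterns) {
        List<String> alternatives = new ArrayList<>();
        for (String topic : sourceTopics) {
            alternatives.add(Pattern.quote(topic));
        }
        for (Pattern pattern : sourcePatterns) {
            alternatives.add(pattern.pattern());
        }
        return Pattern.compile(String.join("|", alternatives));
    }
}
```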

Note

sourceTopicPattern is used when:

buildPatternForOffsetResetTopics Internal Method

Pattern buildPatternForOffsetResetTopics(
  Collection<String> sourceTopics,
  Collection<Pattern> sourcePatterns)

buildPatternForOffsetResetTopics…​FIXME

Note
buildPatternForOffsetResetTopics is used when InternalTopologyBuilder is requested to resetTopicsPattern and sourceTopicPattern.

setRegexMatchedTopicsToSourceNodes Internal Method

void setRegexMatchedTopicsToSourceNodes()

setRegexMatchedTopicsToSourceNodes…​FIXME

Note
setRegexMatchedTopicsToSourceNodes is used exclusively when InternalTopologyBuilder is requested to updateSubscriptions.

updateSubscriptions Method

void updateSubscriptions(
  final SubscriptionUpdates subscriptionUpdates,
  final String logPrefix)

updateSubscriptions…​FIXME

Note
updateSubscriptions is used exclusively when InternalTopologyBuilder is requested to updateSubscribedTopics.

updateSubscribedTopics Method

void updateSubscribedTopics(final Set<String> topics, final String logPrefix)

updateSubscribedTopics…​FIXME

Note
updateSubscribedTopics is used when TaskManager is requested to updateSubscriptionsFromAssignment and updateSubscriptionsFromMetadata.

rewriteTopology Method

InternalTopologyBuilder rewriteTopology(final StreamsConfig config)

rewriteTopology…​FIXME

Note
rewriteTopology is used exclusively when KafkaStreams is created.

adjust Internal Method

void adjust(final StreamsConfig config)

adjust…​FIXME

Note
adjust is used exclusively when InternalTopologyBuilder is requested to rewriteTopology.
