
InternalStreamsBuilder is an internal entity that StreamsBuilder (of the Streams DSL, the high-level stream processing DSL) uses to build a topology by adding nodes to the stream graph as the high-level stream operators are applied.

InternalStreamsBuilder is created exclusively when StreamsBuilder is created.

Figure 1. Creating InternalStreamsBuilder

InternalStreamsBuilder takes an InternalTopologyBuilder when created.

import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val intTopologyBuilder = new InternalTopologyBuilder

import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
val intStreamsBuilder = new InternalStreamsBuilder(intTopologyBuilder)

InternalStreamsBuilder manages a StreamsGraphNode as the root node (of the topology to build). Every time InternalStreamsBuilder is requested to add a stream, a table, a global table, a state store or a global state store, a corresponding child node is added to the root node. Once the graph has the intended structure, InternalStreamsBuilder can be requested to build the topology (with optional optimizations applied).
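The root-plus-children structure described above can be modeled in a few lines. This is a minimal sketch, not the Kafka Streams code: GraphNode and RootedGraph are hypothetical stand-ins for StreamsGraphNode and InternalStreamsBuilder, and every add operation simply hangs a new child node off the root.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for StreamsGraphNode: a named node with parent and child links.
final class GraphNode(val nodeName: String) {
  val parents: ListBuffer[GraphNode] = ListBuffer.empty
  val children: ListBuffer[GraphNode] = ListBuffer.empty

  // Links both directions so a node knows its parents as well as its children.
  def addChild(child: GraphNode): Unit = {
    children += child
    child.parents += this
  }

  override def toString: String = s"GraphNode($nodeName)"
}

// Hypothetical stand-in for InternalStreamsBuilder: it owns the root node,
// and each "add" operation attaches a new child node to that root.
final class RootedGraph {
  val root: GraphNode = new GraphNode("root")

  private def addToRoot(name: String): GraphNode = {
    val node = new GraphNode(name)
    root.addChild(node)
    node
  }

  def addStream(name: String): GraphNode = addToRoot(name)
  def addTable(name: String): GraphNode = addToRoot(name)
  def addStateStore(name: String): GraphNode = addToRoot(name)
}

val graph = new RootedGraph
graph.addStream("KSTREAM-SOURCE-0000000000")
graph.addStateStore("my-store") // illustrative store name
println(graph.root.children.map(_.nodeName).mkString(", "))
// prints: KSTREAM-SOURCE-0000000000, my-store
```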

import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder

val topics = Seq("input")
import org.apache.kafka.streams.kstream.internals.ConsumedInternal
val intConsumed = new ConsumedInternal[String, String]()
import scala.collection.JavaConverters._
// Add a new graph node (stream) to the root node
intStreamsBuilder.stream(topics.asJava, intConsumed)

// The root node has got a new child node
val root = intStreamsBuilder.root
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode
assert(root.children.size == 1)

// Build the topology (no Properties, so no optimizations)
intStreamsBuilder.buildAndOptimizeTopology()

// Let's explore what we built
val streamSourceNode = root.children.asScala.head
scala> println(streamSourceNode)
StreamSourceNode{topicNames=[input], topicPattern=null, consumedInternal=org.apache.kafka.streams.kstream.internals.ConsumedInternal@e1781} StreamsGraphNode{nodeName='KSTREAM-SOURCE-0000000000', buildPriority=0, hasWrittenToTopology=true, keyChangingOperation=false, valueChangingOperation=false, mergeNode=false, parentNodes=[root]}

scala> println(intTopologyBuilder.describe)
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> none

InternalStreamsBuilder is an InternalNameProvider and can be requested for unique names for new processors and state stores.

InternalStreamsBuilder uses an index counter that starts at 0 and is incremented every time a new name for a processor or a store is requested. That makes every generated name unique, so processors (including sources) and stores are uniquely identified by name.
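The shared counter can be sketched as follows. NameProvider is a hypothetical class, not the Kafka Streams API; it assumes the 10-digit zero padding visible in the KSTREAM-SOURCE-0000000000 node name above and the STATE-STORE- infix that newStoreName adds. The prefixes in the usage lines are illustrative.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model of the naming scheme: one counter per builder instance,
// shared by processor names and store names.
final class NameProvider {
  private val index = new AtomicInteger(0)

  // Prefix followed by the current index, zero-padded to 10 digits.
  def newProcessorName(prefix: String): String =
    prefix + f"${index.getAndIncrement()}%010d"

  // Prefix, then "STATE-STORE-", then the zero-padded index.
  def newStoreName(prefix: String): String =
    prefix + "STATE-STORE-" + f"${index.getAndIncrement()}%010d"
}

val names = new NameProvider
val source = names.newProcessorName("KSTREAM-SOURCE-") // KSTREAM-SOURCE-0000000000
val store = names.newStoreName("KSTREAM-AGGREGATE-")   // KSTREAM-AGGREGATE-STATE-STORE-0000000001
val next = names.newProcessorName("KSTREAM-MAP-")      // KSTREAM-MAP-0000000002
```

Because both methods draw from the same counter, a store name and a processor name never share an index within one builder. An AtomicInteger keeps getAndIncrement safe even if names were requested from several threads.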


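Enable ALL logging level for the org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder logger to see what happens inside.

Add the following line to your log4j configuration (e.g. a log4j.properties file):

log4j.logger.org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder=ALL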

Adding State Store to Topology — addStateStore Method

void addStateStore(
  StoreBuilder builder)

addStateStore creates a StateStoreNode (with the given StoreBuilder) and adds it to the root node.

addStateStore is used exclusively when StreamsBuilder is requested to add a state store to a topology.

Adding Stream to Topology (and Creating KStream) — stream Method

KStream<K, V> stream(
  Collection<String> topics,
  ConsumedInternal<K, V> consumed)
KStream<K, V> stream(
  Pattern topicPattern,
  ConsumedInternal<K, V> consumed)

stream creates a new StreamSourceNode and adds it to the root node.

In the end, stream returns a new instance of KStreamImpl (with the new processor name that is also used as the name of the single source node and the repartitionRequired flag off).
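The steps above can be modeled without Kafka on the classpath. This is a sketch only: SourceNode, StreamHandle and MiniBuilder are hypothetical stand-ins for StreamSourceNode, KStreamImpl and InternalStreamsBuilder, and the 10-digit zero padding is assumed from the KSTREAM-SOURCE-0000000000 name shown earlier.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins; not the Kafka Streams classes.
final case class SourceNode(nodeName: String, topicNames: Seq[String])

// Mirrors what stream hands back: the processor name, the single source node name
// and the repartitionRequired flag (off for a fresh source stream).
final case class StreamHandle(name: String, sourceNodes: Set[String], repartitionRequired: Boolean)

final class MiniBuilder {
  private val index = new AtomicInteger(0)
  val rootChildren: ListBuffer[SourceNode] = ListBuffer.empty

  private def newProcessorName(prefix: String): String =
    prefix + f"${index.getAndIncrement()}%010d"

  def stream(topics: Seq[String]): StreamHandle = {
    val name = newProcessorName("KSTREAM-SOURCE-")
    rootChildren += SourceNode(name, topics) // the new source node becomes a child of the root
    StreamHandle(name, Set(name), repartitionRequired = false)
  }
}
```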

import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val internalTopologyBuilder = new InternalTopologyBuilder()

import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
val internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder)

val topics = Seq("input")
import org.apache.kafka.streams.kstream.internals.ConsumedInternal
val consumed = new ConsumedInternal[String, String]()
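import scala.collection.JavaConverters._
val kstream = internalStreamsBuilder.stream(topics.asJava, consumed)
scala> :type kstream
org.apache.kafka.streams.kstream.KStream[String,String]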

import org.apache.kafka.streams.kstream.internals.KStreamImpl
assert(kstream.isInstanceOf[KStreamImpl[String, String]])

stream is used when:

  • StreamsBuilder is requested to stream

  • KStreamImpl is requested to through

Adding Table to Topology (and Creating KTable) — table Method

KTable<K, V> table(
  String topic,
  ConsumedInternal<K, V> consumed,
  MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized)


table is used exclusively when StreamsBuilder is requested to add a table to a topology.

Adding Global State Store to Topology — addGlobalStore Method

void addGlobalStore(
  StoreBuilder<KeyValueStore> storeBuilder,
  String topic,
  ConsumedInternal consumed,
  ProcessorSupplier stateUpdateSupplier)
void addGlobalStore(
  StoreBuilder<KeyValueStore> storeBuilder,
  String sourceName,
  String topic,
  ConsumedInternal consumed,
  String processorName,
  ProcessorSupplier stateUpdateSupplier)


addGlobalStore is used exclusively when StreamsBuilder is requested to add a global state store to a topology.

Adding Global Table to Topology (and Creating GlobalKTable) — globalTable Method

GlobalKTable<K, V> globalTable(
  String topic,
  ConsumedInternal<K, V> consumed,
  MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized)

globalTable creates a new GlobalKTableImpl.

Internally, globalTable requests the MaterializedInternal to disable logging (regardless of whether it was enabled initially or not).

GlobalKTables use state stores with logging disabled.

globalTable then creates a KeyValueStoreMaterializer (with the input MaterializedInternal with logging disabled) and requests it to materialize (and create a StoreBuilder).

globalTable creates a TableSourceNode (with the StoreBuilder, the source processor name with the KSTREAM-SOURCE- prefix, and the isGlobalKTable flag on).

globalTable adds the TableSourceNode to the root node.

In the end, globalTable creates a GlobalKTableImpl (with a new KTableSourceValueGetterSupplier and the queryable flag of the MaterializedInternal).
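The globalTable steps can be walked through with simplified stand-ins. This is a hypothetical sketch: MaterializedSpec, StoreSpec, TableSource and GlobalTableHandle are made-up case classes standing in for MaterializedInternal, the StoreBuilder, TableSourceNode and GlobalKTableImpl, and the 10-digit zero padding of the source name is assumed from the node names shown earlier.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified stand-ins; not the Kafka Streams classes.
final case class MaterializedSpec(storeName: String, loggingEnabled: Boolean, queryable: Boolean)
final case class StoreSpec(storeName: String, loggingEnabled: Boolean)
final case class TableSource(sourceName: String, topic: String, store: StoreSpec, isGlobalKTable: Boolean)
final case class GlobalTableHandle(source: TableSource, queryable: Boolean)

def globalTable(
    topic: String,
    materialized: MaterializedSpec,
    index: AtomicInteger,
    rootChildren: ListBuffer[TableSource]): GlobalTableHandle = {
  // Step 1: disable logging, whatever the caller configured.
  val noLogging = materialized.copy(loggingEnabled = false)
  // Step 2: "materialize" the config into a store description (stand-in for the StoreBuilder).
  val store = StoreSpec(noLogging.storeName, noLogging.loggingEnabled)
  // Step 3: a source node named with the KSTREAM-SOURCE- prefix and the global flag on.
  val sourceName = "KSTREAM-SOURCE-" + f"${index.getAndIncrement()}%010d"
  val node = TableSource(sourceName, topic, store, isGlobalKTable = true)
  // Step 4: attach the node to the root node.
  rootChildren += node
  // Step 5: the global table keeps the queryable flag of the materialized config.
  GlobalTableHandle(node, noLogging.queryable)
}
```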

globalTable is used exclusively when StreamsBuilder is requested to add a global table to a topology.

New Unique Processor Name — newProcessorName Method

String newProcessorName(
  String prefix)

newProcessorName is part of the InternalNameProvider contract to create a new unique name for a processor.

newProcessorName simply returns the input prefix followed by the index, zero-padded to 10 digits (e.g. KSTREAM-SOURCE-0000000000).

The index counter belongs to the InternalStreamsBuilder, so names are unique within a single builder only (a new InternalStreamsBuilder starts counting at 0 again).

import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder)

val name = newBuilder.newProcessorName("PREFIX")
scala> println(name)
PREFIX0000000000

New Unique Store Name — newStoreName Method

String newStoreName(
  String prefix)

newStoreName is part of the InternalNameProvider contract to create a new unique name for a state store.

newStoreName simply concatenates the input prefix, STATE-STORE- and the index (zero-padded to 10 digits).

The index counter belongs to the InternalStreamsBuilder and is shared with newProcessorName, so names are unique within a single builder only.

import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder)

val name = newBuilder.newStoreName("PREFIX")
scala> println(name)
PREFIXSTATE-STORE-0000000000

Adding Child Node — addGraphNode Method

void addGraphNode(
  StreamsGraphNode parent,
  StreamsGraphNode child)
void addGraphNode(
  Collection<StreamsGraphNode> parents,
  StreamsGraphNode child)

addGraphNode simply requests the given parent StreamsGraphNode (or every parent node in the collection) to add the given child node.

In the end, addGraphNode calls maybeAddNodeForOptimizationMetadata (with the child node).
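A sketch of the two variants, assuming the collection variant simply delegates to the single-parent one for every parent (so the metadata hook sees the child once per parent). Node and GraphBuilder are hypothetical names, and onNodeAdded stands in for maybeAddNodeForOptimizationMetadata.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical minimal graph node.
final class Node(val name: String) {
  val parents = ListBuffer.empty[Node]
  val children = ListBuffer.empty[Node]
  def addChild(child: Node): Unit = { children += child; child.parents += this }
}

// onNodeAdded is a hypothetical hook in place of maybeAddNodeForOptimizationMetadata.
final class GraphBuilder(onNodeAdded: Node => Unit) {
  def addGraphNode(parent: Node, child: Node): Unit = {
    parent.addChild(child) // the parent node adds the child
    onNodeAdded(child)     // then the optimization metadata hook runs
  }

  // Assumed delegation: one single-parent call per parent.
  def addGraphNode(parents: Seq[Node], child: Node): Unit =
    parents.foreach(p => addGraphNode(p, child))
}
```

A merge-like node with two parents is the typical case for the collection variant.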

Building Topology (with Optional Optimizations) — buildAndOptimizeTopology Method

void buildAndOptimizeTopology() (1)
void buildAndOptimizeTopology(
  Properties props)
  1. For testing only; uses null for the Properties

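buildAndOptimizeTopology first calls maybePerformOptimizations (with the given Properties) and then writes the graph nodes, starting from the root node, to the InternalTopologyBuilder.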
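The buildPriority and hasWrittenToTopology fields in the node dump earlier hint at how nodes get written. The following is a sketch under that assumption, not the Kafka Streams implementation: starting from the root, nodes are taken in buildPriority order, and a node is written once all its parents are written and it has not been written yet. PNode and buildTopology are hypothetical names; "writing" a node just records its name.

```scala
import scala.collection.mutable

// Hypothetical node model: buildPriority orders the writes and
// hasWrittenToTopology guards against writing a node twice.
final class PNode(val name: String, val buildPriority: Int) {
  val parents = mutable.ListBuffer.empty[PNode]
  val children = mutable.ListBuffer.empty[PNode]
  var hasWrittenToTopology = false
  def addChild(c: PNode): Unit = { children += c; c.parents += this }
  def allParentsWritten: Boolean = parents.forall(_.hasWrittenToTopology)
}

// Walks the graph from the root, always taking the lowest buildPriority next.
def buildTopology(root: PNode): Seq[String] = {
  val written = mutable.ListBuffer.empty[String]
  // Scala's PriorityQueue dequeues the largest element, so reverse the ordering.
  val queue = mutable.PriorityQueue.empty[PNode](Ordering.by[PNode, Int](_.buildPriority).reverse)
  queue.enqueue(root)
  while (queue.nonEmpty) {
    val node = queue.dequeue()
    if (node.allParentsWritten && !node.hasWrittenToTopology) {
      written += node.name // stand-in for writing the node to the topology builder
      node.hasWrittenToTopology = true
    }
    node.children.foreach(queue.enqueue(_))
  }
  written.toList
}
```

A node with several parents may be dequeued more than once; the hasWrittenToTopology check keeps it from being written twice.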


buildAndOptimizeTopology is used exclusively when StreamsBuilder is requested to build a topology.

maybeAddNodeForOptimizationMetadata Internal Method

void maybeAddNodeForOptimizationMetadata(
  StreamsGraphNode node)


maybeAddNodeForOptimizationMetadata is used exclusively when InternalStreamsBuilder is requested to add a child node.
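The section only says that this method runs for every added child node and relies on getKeyChangingParentNode. A hypothetical sketch of that kind of bookkeeping, assuming it remembers key-changing nodes and associates every repartition node with its nearest key-changing ancestor: MNode, OptimizationMetadata and the keyChanging/repartition flags are made-up names (the keyChangingOperation flag in the node dump earlier is the real counterpart of keyChanging).

```scala
import scala.collection.mutable

// Hypothetical node with just the flags this sketch needs.
final class MNode(val name: String, val keyChanging: Boolean = false, val repartition: Boolean = false) {
  val parents = mutable.ListBuffer.empty[MNode]
}

final class OptimizationMetadata {
  // key-changing node -> repartition nodes downstream of it (insertion-ordered)
  val keyChangingToRepartitions = mutable.LinkedHashMap.empty[MNode, mutable.LinkedHashSet[MNode]]

  // The node itself if it changes keys, else the first key-changing ancestor found (depth-first).
  private def keyChangingParent(node: MNode): Option[MNode] =
    if (node.keyChanging) Some(node)
    else node.parents.iterator.map(keyChangingParent).collectFirst { case Some(p) => p }

  def maybeAdd(node: MNode): Unit = {
    if (node.keyChanging) {
      keyChangingToRepartitions.getOrElseUpdate(node, mutable.LinkedHashSet.empty[MNode])
      ()
    } else if (node.repartition) {
      keyChangingParent(node).foreach { p =>
        keyChangingToRepartitions.getOrElseUpdate(p, mutable.LinkedHashSet.empty[MNode]) += node
      }
    }
  }
}
```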

maybePerformOptimizations Internal Method

void maybePerformOptimizations(
  Properties props)


maybePerformOptimizations is used exclusively when InternalStreamsBuilder is requested to build a topology (with optional optimizations).
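Kafka Streams turns topology optimizations on with the topology.optimization setting set to all (the other value being none). A sketch of such a gate, assuming optimizations run only for that value and never for null Properties (the test-only overload above): shouldOptimize and the optimizations parameter are hypothetical, with the callbacks standing in for steps such as optimizeKTableSourceTopics and maybeOptimizeRepartitionOperations.

```scala
import java.util.Properties

val TopologyOptimization = "topology.optimization"
val OptimizeAll = "all"

// Assumed gate: optimize only when topology.optimization=all.
def shouldOptimize(props: Properties): Boolean =
  props != null && OptimizeAll == props.getProperty(TopologyOptimization)

// Runs the (hypothetical) optimization callbacks if enabled; reports whether it did.
def maybePerformOptimizations(props: Properties, optimizations: Seq[() => Unit]): Boolean = {
  val enabled = shouldOptimize(props)
  if (enabled) optimizations.foreach(run => run())
  enabled
}
```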

getKeyChangingParentNode Internal Method

StreamsGraphNode getKeyChangingParentNode(
  StreamsGraphNode repartitionNode)


getKeyChangingParentNode is used exclusively when InternalStreamsBuilder is requested to maybeAddNodeForOptimizationMetadata.

maybeOptimizeRepartitionOperations Internal Method

void maybeOptimizeRepartitionOperations()


maybeOptimizeRepartitionOperations is used exclusively when InternalStreamsBuilder is requested to maybePerformOptimizations.

createRepartitionNode Internal Method

OptimizableRepartitionNode createRepartitionNode(
  String repartitionTopicName,
  Serde keySerde,
  Serde valueSerde)


createRepartitionNode is used when…FIXME

findParentNodeMatching Internal Method

StreamsGraphNode findParentNodeMatching(
  StreamsGraphNode startSeekingNode,
  Predicate<StreamsGraphNode> parentNodePredicate)


findParentNodeMatching is used when…FIXME
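The signature suggests a search up the parent links for the first node matching a predicate. A sketch under that assumption, checking the start node first and then each parent's ancestry in turn: SNode is a hypothetical node type, the node names are illustrative, and Option replaces a nullable result as a Scala design choice.

```scala
// Hypothetical minimal node type: only the parent links matter here.
final class SNode(val name: String, val parents: List[SNode] = Nil)

// The start node if it matches; otherwise the first match found by
// searching each parent (and, recursively, its ancestors) in order.
def findParentNodeMatching(start: SNode, predicate: SNode => Boolean): Option[SNode] =
  if (predicate(start)) Some(start)
  else start.parents.iterator
    .map(p => findParentNodeMatching(p, predicate))
    .collectFirst { case Some(found) => found }
```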

getFirstRepartitionTopicName Internal Method

String getFirstRepartitionTopicName(
  Collection<OptimizableRepartitionNode> repartitionNodes)


getFirstRepartitionTopicName is used when…FIXME

getRepartitionSerdes Internal Method

GroupedInternal getRepartitionSerdes(
  Collection<OptimizableRepartitionNode> repartitionNodes)


getRepartitionSerdes is used when…FIXME

maybeUpdateKeyChangingRepartitionNodeMap Internal Method

void maybeUpdateKeyChangingRepartitionNodeMap()


maybeUpdateKeyChangingRepartitionNodeMap is used when…FIXME

optimizeKTableSourceTopics Internal Method

void optimizeKTableSourceTopics()


optimizeKTableSourceTopics is used when…FIXME

