InternalStreamsBuilder

InternalStreamsBuilder is an internal entity that StreamsBuilder (of Streams DSL — High-Level Stream Processing DSL) uses to build a topology by adding nodes to the stream graph using the high-level stream operators.

InternalStreamsBuilder is created exclusively for StreamsBuilder.

Figure 1. Creating InternalStreamsBuilder

InternalStreamsBuilder takes an InternalTopologyBuilder when created.

import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val intTopologyBuilder = new InternalTopologyBuilder

import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
val intStreamsBuilder = new InternalStreamsBuilder(intTopologyBuilder)

InternalStreamsBuilder manages a StreamsGraphNode as the root node (of the topology to build). Every time InternalStreamsBuilder is requested to add a stream, a table, a global table, a state store or a global state store, the root node gets a corresponding child node. Once the graph has the desired structure, InternalStreamsBuilder can be requested to build the topology (with optional optimizations applied).

import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
assert(intStreamsBuilder.isInstanceOf[InternalStreamsBuilder])

val topics = Seq("input")
import org.apache.kafka.streams.kstream.internals.ConsumedInternal
val intConsumed = new ConsumedInternal[String, String]()
// JavaConversions (implicit conversions) is deprecated; convert explicitly with asJava
import scala.collection.JavaConverters._
// Add a new graph node (stream) to the root node
intStreamsBuilder.stream(topics.asJava, intConsumed)

intStreamsBuilder.buildAndOptimizeTopology

val root = intStreamsBuilder.root
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode
assert(root.isInstanceOf[StreamsGraphNode])
assert(root.children.size == 1)

// Let's explore what we built

val streamSourceNode = root.children.head
println(streamSourceNode)
/**
StreamSourceNode{topicNames=[input], topicPattern=null, consumedInternal=org.apache.kafka.streams.kstream.internals.ConsumedInternal@e1781} StreamsGraphNode{nodeName='KSTREAM-SOURCE-0000000000', buildPriority=0, hasWrittenToTopology=true, keyChangingOperation=false, valueChangingOperation=false, mergeNode=false, parentNodes=[root]}
*/

println(intTopologyBuilder.describe)
/**
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> none
*/

InternalStreamsBuilder is an InternalNameProvider and can be requested for a name for new processors and stores.

InternalStreamsBuilder uses an index counter that starts at 0 and is incremented every time a new name for a processor or a store is requested. That makes all names unique, so sources and stores are uniquely identified by name.

Tip

Enable the ALL logging level for the org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder=ALL

Adding State Store to Topology — addStateStore Method

void addStateStore(
  StoreBuilder builder)

addStateStore creates a StateStoreNode (with the given StoreBuilder) and adds it to the root node.

Note
addStateStore is used exclusively when StreamsBuilder is requested to add a state store to a topology.

Adding Stream to Topology (and Creating KStream) — stream Method

KStream<K, V> stream(
  Collection<String> topics,
  ConsumedInternal<K, V> consumed)
KStream<K, V> stream(
  Pattern topicPattern,
  ConsumedInternal<K, V> consumed)

stream creates a new StreamSourceNode and adds it to the root node.

In the end, stream returns a new instance of KStreamImpl (with the new processor name that is also used as the name of the single source node and the repartitionRequired flag off).

import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val internalTopologyBuilder = new InternalTopologyBuilder()

import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
val internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder)

val topics = Seq("input")
import org.apache.kafka.streams.kstream.internals.ConsumedInternal
val consumed = new ConsumedInternal[String, String]()
import collection.JavaConverters._
val kstream = internalStreamsBuilder.stream(topics.asJava, consumed)
scala> :type kstream
org.apache.kafka.streams.kstream.KStream[String,String]

import org.apache.kafka.streams.kstream.internals.KStreamImpl
assert(kstream.isInstanceOf[KStreamImpl[String, String]])

Note

stream is used when:

  • StreamsBuilder is requested to stream

  • KStreamImpl is requested to through

Adding Table to Topology (and Creating KTable) — table Method

KTable<K, V> table(
  String topic,
  ConsumedInternal<K, V> consumed,
  MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized)

table…​FIXME

Note
table is used exclusively when StreamsBuilder is requested to add a table to a topology.

Adding Global State Store to Topology — addGlobalStore Method

void addGlobalStore(
  StoreBuilder<KeyValueStore> storeBuilder,
  String topic,
  ConsumedInternal consumed,
  ProcessorSupplier stateUpdateSupplier)
void addGlobalStore(
  StoreBuilder<KeyValueStore> storeBuilder,
  String sourceName,
  String topic,
  ConsumedInternal consumed,
  String processorName,
  ProcessorSupplier stateUpdateSupplier)

addGlobalStore…​FIXME

Note
addGlobalStore is used exclusively when StreamsBuilder is requested to addGlobalStore.

Adding Global Table to Topology (and Creating GlobalKTable) — globalTable Method

GlobalKTable<K, V> globalTable(
  String topic,
  ConsumedInternal<K, V> consumed,
  MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized)

globalTable creates a new GlobalKTableImpl.

Internally, globalTable requests the MaterializedInternal to disable logging (regardless of whether it was enabled initially or not).

Note
GlobalKTables use state stores with logging disabled.

globalTable then creates a KeyValueStoreMaterializer (with the input MaterializedInternal with logging disabled) and requests it to materialize (and create a StoreBuilder).

globalTable creates a TableSourceNode (with the StoreBuilder, the source processor name with KSTREAM-SOURCE- prefix, and isGlobalKTable flag on).

globalTable adds the TableSourceNode to the root node.

In the end, globalTable creates a GlobalKTableImpl (with a new KTableSourceValueGetterSupplier and the queryable flag of the MaterializedInternal).

Note
globalTable is used exclusively when StreamsBuilder is requested to add a global table to a topology.

New Unique Processor Name — newProcessorName Method

String newProcessorName(
  String prefix)

Note
newProcessorName is part of InternalNameProvider Contract to create a new unique name for a processor.

newProcessorName simply returns the input prefix followed by the index (zero-padded to 10 digits).

Note
The index counter is what makes it bound to an InternalStreamsBuilder.

import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder)

val name = newBuilder.newProcessorName("PREFIX")
scala> println(name)
PREFIX0000000000

New Unique Store Name — newStoreName Method

String newStoreName(
  String prefix)

Note
newStoreName is part of InternalNameProvider Contract to create a new unique name for a state store.

newStoreName simply concatenates the input prefix, STATE-STORE- and the index.

Note
The index counter is what makes it bound to an InternalStreamsBuilder.

import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder)

val name = newBuilder.newStoreName("PREFIX")
scala> println(name)
PREFIXSTATE-STORE-0000000000
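
The naming scheme of newProcessorName and newStoreName can be modelled without Kafka on the classpath. The following is a minimal sketch only: NameProviderSketch is a hypothetical stand-in (not the real InternalStreamsBuilder), and it assumes a single counter shared by both methods and a 10-digit zero-padded index, as the names shown on this page suggest.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the naming part of InternalStreamsBuilder (not the real class).
public class NameProviderSketch {
    // One counter per builder instance, starting at 0 (assumption taken from the text above).
    private final AtomicInteger index = new AtomicInteger(0);

    // Input prefix followed by the zero-padded index.
    public String newProcessorName(String prefix) {
        return prefix + String.format("%010d", index.getAndIncrement());
    }

    // Input prefix, then STATE-STORE-, then the zero-padded index.
    public String newStoreName(String prefix) {
        return prefix + "STATE-STORE-" + String.format("%010d", index.getAndIncrement());
    }

    public static void main(String[] args) {
        NameProviderSketch names = new NameProviderSketch();
        // Processor and store names draw from the same counter.
        System.out.println(names.newProcessorName("KSTREAM-SOURCE-")); // KSTREAM-SOURCE-0000000000
        System.out.println(names.newStoreName("PREFIX"));              // PREFIXSTATE-STORE-0000000001
        // A separate instance has its own counter and starts over.
        System.out.println(new NameProviderSketch().newProcessorName("PREFIX")); // PREFIX0000000000
    }
}
```

Because the counter belongs to the instance, names are unique only within one builder; two builders can produce the same name.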

Adding Child Node — addGraphNode Method

void addGraphNode(
  StreamsGraphNode parent,
  StreamsGraphNode child)
void addGraphNode(
  Collection<StreamsGraphNode> parents,
  StreamsGraphNode child)

addGraphNode simply requests the input parent StreamsGraphNode (or every parent in the given collection) to add the given child node.

In the end, addGraphNode calls maybeAddNodeForOptimizationMetadata (with the child node).
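
The parent-child wiring can be illustrated with a small self-contained model. This is a sketch under assumptions: GraphSketch and its Node class are hypothetical (they are not Kafka's StreamsGraphNode), the collection variant is assumed to delegate to the single-parent variant, and the optimization metadata step is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified model of the stream graph (not Kafka's classes).
public class GraphSketch {
    static class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        final List<Node> parents = new ArrayList<>();

        Node(String name) {
            this.name = name;
        }

        // Links both directions so a child can be traced back to its parents.
        void addChild(Node child) {
            children.add(child);
            child.parents.add(this);
        }
    }

    // Single-parent variant: the parent is asked to add the child.
    static void addGraphNode(Node parent, Node child) {
        parent.addChild(child);
    }

    // Multi-parent variant: every parent is asked to add the same child.
    static void addGraphNode(Collection<Node> parents, Node child) {
        for (Node parent : parents) {
            addGraphNode(parent, child);
        }
    }

    public static void main(String[] args) {
        Node root = new Node("root");
        Node left = new Node("left-source");
        Node right = new Node("right-source");
        addGraphNode(root, left);
        addGraphNode(root, right);

        Node merged = new Node("merge");
        addGraphNode(Arrays.asList(left, right), merged);

        System.out.println(root.children.size());  // prints 2
        System.out.println(merged.parents.size()); // prints 2
    }
}
```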

Building Topology (with Optional Optimizations) — buildAndOptimizeTopology Method

void buildAndOptimizeTopology() (1)
void buildAndOptimizeTopology(
  Properties props)
  1. For testing only; Uses null for the Properties

buildAndOptimizeTopology first calls maybePerformOptimizations (with the given Properties).
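
As a rough illustration of how the Properties could drive the "optional" part, the sketch below checks for an opt-in flag. It rests on assumptions not stated on this page: that the relevant Kafka Streams setting is topology.optimization with the value all, and that a null Properties (the testing-only variant) means no optimizations. OptimizationPropsSketch and optimizationsRequested are hypothetical names, not part of Kafka.

```java
import java.util.Properties;

// Hypothetical helper showing an opt-in check on the Properties (not Kafka code).
public class OptimizationPropsSketch {
    // Assumed Kafka Streams configuration key and opt-in value.
    static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
    static final String OPTIMIZE = "all";

    // True only when props is non-null and explicitly opts in.
    static boolean optimizationsRequested(Properties props) {
        return props != null && OPTIMIZE.equals(props.getProperty(TOPOLOGY_OPTIMIZATION));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(optimizationsRequested(null));  // prints false
        System.out.println(optimizationsRequested(props)); // prints false

        props.setProperty(TOPOLOGY_OPTIMIZATION, OPTIMIZE);
        System.out.println(optimizationsRequested(props)); // prints true
    }
}
```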

buildAndOptimizeTopology…​FIXME

Note
buildAndOptimizeTopology is used exclusively when StreamsBuilder is requested to build a topology.

maybeAddNodeForOptimizationMetadata Internal Method

void maybeAddNodeForOptimizationMetadata(
  StreamsGraphNode node)

maybeAddNodeForOptimizationMetadata…​FIXME

Note
maybeAddNodeForOptimizationMetadata is used exclusively when InternalStreamsBuilder is requested to add a child node.

maybePerformOptimizations Internal Method

void maybePerformOptimizations(
  Properties props)

maybePerformOptimizations…​FIXME

Note
maybePerformOptimizations is used exclusively when InternalStreamsBuilder is requested to build a topology (with optional optimizations).

getKeyChangingParentNode Internal Method

StreamsGraphNode getKeyChangingParentNode(
  StreamsGraphNode repartitionNode)

getKeyChangingParentNode…​FIXME

Note
getKeyChangingParentNode is used exclusively when InternalStreamsBuilder is requested to maybeAddNodeForOptimizationMetadata.

maybeOptimizeRepartitionOperations Internal Method

void maybeOptimizeRepartitionOperations()

maybeOptimizeRepartitionOperations…​FIXME

Note
maybeOptimizeRepartitionOperations is used exclusively when InternalStreamsBuilder is requested to maybePerformOptimizations.

createRepartitionNode Internal Method

OptimizableRepartitionNode createRepartitionNode(
  String repartitionTopicName,
  Serde keySerde,
  Serde valueSerde)

createRepartitionNode…​FIXME

Note
createRepartitionNode is used when…​FIXME

findParentNodeMatching Internal Method

StreamsGraphNode findParentNodeMatching(
  StreamsGraphNode startSeekingNode,
  Predicate<StreamsGraphNode> parentNodePredicate)

findParentNodeMatching…​FIXME

Note
findParentNodeMatching is used when…​FIXME

getFirstRepartitionTopicName Internal Method

String getFirstRepartitionTopicName(
  Collection<OptimizableRepartitionNode> repartitionNodes)

getFirstRepartitionTopicName…​FIXME

Note
getFirstRepartitionTopicName is used when…​FIXME

getRepartitionSerdes Internal Method

GroupedInternal getRepartitionSerdes(
  Collection<OptimizableRepartitionNode> repartitionNodes)

getRepartitionSerdes…​FIXME

Note
getRepartitionSerdes is used when…​FIXME

maybeUpdateKeyChangingRepartitionNodeMap Internal Method

void maybeUpdateKeyChangingRepartitionNodeMap()

maybeUpdateKeyChangingRepartitionNodeMap…​FIXME

Note
maybeUpdateKeyChangingRepartitionNodeMap is used when…​FIXME

optimizeKTableSourceTopics Internal Method

void optimizeKTableSourceTopics()

optimizeKTableSourceTopics…​FIXME

Note
optimizeKTableSourceTopics is used when…​FIXME
