InternalStreamsBuilder
InternalStreamsBuilder is an internal entity that StreamsBuilder (of Streams DSL — High-Level Stream Processing DSL) uses to build a topology by adding nodes to the stream graph using the high-level stream operators.
InternalStreamsBuilder is created exclusively for StreamsBuilder.
InternalStreamsBuilder takes an InternalTopologyBuilder when created.
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val intTopologyBuilder = new InternalTopologyBuilder
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
val intStreamsBuilder = new InternalStreamsBuilder(intTopologyBuilder)
InternalStreamsBuilder manages a StreamsGraphNode as the root node (of the topology to build). Every time InternalStreamsBuilder is requested to add a stream, a table, a global table, a state store or a global state store, the root node gets a corresponding child node added. Once the topology has the proper structure, InternalStreamsBuilder can simply be requested to build it (with optional optimizations applied).
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
assert(intStreamsBuilder.isInstanceOf[InternalStreamsBuilder])
val topics = Seq("input")
import org.apache.kafka.streams.kstream.internals.ConsumedInternal
val intConsumed = new ConsumedInternal[String, String]()
import scala.collection.JavaConverters._
// Add a new graph node (stream) to the root node
intStreamsBuilder.stream(topics.asJava, intConsumed)
intStreamsBuilder.buildAndOptimizeTopology
val root = intStreamsBuilder.root
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode
assert(root.isInstanceOf[StreamsGraphNode])
assert(root.children.size == 1)
// Let's explore what we built
val streamSourceNode = root.children.head
println(streamSourceNode)
/**
StreamSourceNode{topicNames=[input], topicPattern=null, consumedInternal=org.apache.kafka.streams.kstream.internals.ConsumedInternal@e1781} StreamsGraphNode{nodeName='KSTREAM-SOURCE-0000000000', buildPriority=0, hasWrittenToTopology=true, keyChangingOperation=false, valueChangingOperation=false, mergeNode=false, parentNodes=[root]}
*/
println(intTopologyBuilder.describe)
/**
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> none
*/
InternalStreamsBuilder is an InternalNameProvider and can be requested for a new name for processors and stores.
InternalStreamsBuilder uses an index counter that starts at 0 and is incremented every time a new name for a processor or a store is requested. That makes all names unique, with sources and stores uniquely identified by name.
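The following Scala snippet (a quick sketch; the PROCESSOR-PREFIX- and STORE-PREFIX- prefixes are made up) shows that newProcessorName and newStoreName (described below) share the single index counter of a builder.
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val builder = new InternalStreamsBuilder(new InternalTopologyBuilder)
// Both methods bump the same index counter, so the zero-padded numeric suffix
// keeps increasing no matter which of the two is called
println(builder.newProcessorName("PROCESSOR-PREFIX-"))
println(builder.newStoreName("STORE-PREFIX-"))
println(builder.newProcessorName("PROCESSOR-PREFIX-"))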
Tip
|
Refer to Application Logging Using log4j for how to enable logging.
|
Adding State Store to Topology — addStateStore Method
void addStateStore(
StoreBuilder builder)
addStateStore creates a StateStoreNode (with the given StoreBuilder) and adds it to the root node.
Note
|
addStateStore is used exclusively when StreamsBuilder is requested to add a state store to a topology.
|
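A quick sketch of addStateStore in action follows (the in-memory store and its name my-store are made up for the example).
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
import org.apache.kafka.streams.state.Stores
val internalTopologyBuilder = new InternalTopologyBuilder
val internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder)
// A StoreBuilder for an in-memory key-value store
val storeBuilder = Stores.keyValueStoreBuilder(
  Stores.inMemoryKeyValueStore("my-store"),
  Serdes.String(),
  Serdes.String())
// Adds a StateStoreNode (for the StoreBuilder) as a child of the root node
internalStreamsBuilder.addStateStore(storeBuilder)
// The store is registered with the InternalTopologyBuilder when the topology is built
internalStreamsBuilder.buildAndOptimizeTopology()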
Adding Stream to Topology (and Creating KStream) — stream Method
KStream<K, V> stream(
Collection<String> topics,
ConsumedInternal<K, V> consumed)
KStream<K, V> stream(
Pattern topicPattern,
ConsumedInternal<K, V> consumed)
stream creates a new processor name with the KSTREAM-SOURCE- prefix.
stream then creates a new StreamSourceNode and adds it to the root node.
In the end, stream returns a new instance of KStreamImpl (with the new processor name that is also used as the name of the single source node, and the repartitionRequired flag off).
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val internalTopologyBuilder = new InternalTopologyBuilder()
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
val internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder)
val topics = Seq("input")
import org.apache.kafka.streams.kstream.internals.ConsumedInternal
val consumed = new ConsumedInternal[String, String]()
import collection.JavaConverters._
val kstream = internalStreamsBuilder.stream(topics.asJava, consumed)
scala> :type kstream
org.apache.kafka.streams.kstream.KStream[String,String]
import org.apache.kafka.streams.kstream.internals.KStreamImpl
assert(kstream.isInstanceOf[KStreamImpl[String, String]])
Adding Table to Topology (and Creating KTable) — table Method
KTable<K, V> table(
String topic,
ConsumedInternal<K, V> consumed,
MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized)
table …FIXME
Note
|
table is used exclusively when StreamsBuilder is requested to add a table to a topology.
|
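Until the internals are described, here is a sketch of calling table directly. It assumes the three-argument MaterializedInternal constructor of Kafka 2.1 (Materialized, InternalNameProvider, store name prefix); the table-input topic and table-store store names are made up.
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.internals.ConsumedInternal
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
import org.apache.kafka.streams.kstream.internals.KTableImpl
import org.apache.kafka.streams.kstream.internals.MaterializedInternal
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
import org.apache.kafka.streams.state.KeyValueStore
val internalStreamsBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder)
val consumed = new ConsumedInternal[String, String]()
// InternalStreamsBuilder itself acts as the InternalNameProvider
val materialized = new MaterializedInternal[String, String, KeyValueStore[Bytes, Array[Byte]]](
  Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("table-store"),
  internalStreamsBuilder,
  "table-input-")
val ktable = internalStreamsBuilder.table("table-input", consumed, materialized)
assert(ktable.isInstanceOf[KTableImpl[_, _, _]])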
Adding Global State Store to Topology — addGlobalStore Method
void addGlobalStore(
StoreBuilder<KeyValueStore> storeBuilder,
String topic,
ConsumedInternal consumed,
ProcessorSupplier stateUpdateSupplier)
void addGlobalStore(
StoreBuilder<KeyValueStore> storeBuilder,
String sourceName,
String topic,
ConsumedInternal consumed,
String processorName,
ProcessorSupplier stateUpdateSupplier)
addGlobalStore …FIXME
Note
|
addGlobalStore is used exclusively when StreamsBuilder is requested to addGlobalStore.
|
Adding Global Table to Topology (and Creating GlobalKTable) — globalTable Method
GlobalKTable<K, V> globalTable(
String topic,
ConsumedInternal<K, V> consumed,
MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized)
globalTable creates a new GlobalKTableImpl.
Internally, globalTable requests the MaterializedInternal to disable logging (regardless of whether it was enabled initially or not).
Note
|
GlobalKTables use state stores with logging disabled.
|
globalTable then creates a KeyValueStoreMaterializer (with the input MaterializedInternal with logging disabled) and requests it to materialize (and create a StoreBuilder).
globalTable creates a TableSourceNode (with the StoreBuilder, the source processor name with the KSTREAM-SOURCE- prefix, and the isGlobalKTable flag on).
In the end, globalTable creates a GlobalKTableImpl (with a new KTableSourceValueGetterSupplier and the queryable flag of the MaterializedInternal).
Note
|
globalTable is used exclusively when StreamsBuilder is requested to add a global table to a topology.
|
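Here is a sketch of globalTable in the Scala REPL. It assumes the three-argument MaterializedInternal constructor of Kafka 2.1 (Materialized, InternalNameProvider, store name prefix); the global-input topic and global-store store names are made up.
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.internals.ConsumedInternal
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
import org.apache.kafka.streams.kstream.internals.MaterializedInternal
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
import org.apache.kafka.streams.state.KeyValueStore
val internalTopologyBuilder = new InternalTopologyBuilder
val internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder)
val consumed = new ConsumedInternal[String, String]()
val materialized = new MaterializedInternal[String, String, KeyValueStore[Bytes, Array[Byte]]](
  Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("global-store"),
  internalStreamsBuilder,
  "global-input-")
val globalTable = internalStreamsBuilder.globalTable("global-input", consumed, materialized)
internalStreamsBuilder.buildAndOptimizeTopology()
// The source and the state-update processor of the GlobalKTable show up in the topology description
println(internalTopologyBuilder.describe)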
New Unique Processor Name — newProcessorName Method
String newProcessorName(
String prefix)
Note
|
newProcessorName is part of InternalNameProvider Contract to create a new unique name for a processor.
|
newProcessorName simply returns the input prefix followed by the index.
Note
|
The index counter is what makes the generated names bound to (and unique within) a single InternalStreamsBuilder.
|
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder)
val name = newBuilder.newProcessorName("PREFIX")
scala> println(name)
PREFIX0000000001
New Unique Store Name — newStoreName Method
String newStoreName(
String prefix)
Note
|
newStoreName is part of InternalNameProvider Contract to create a new unique name for a state store.
|
newStoreName simply concatenates the input prefix, STATE-STORE- and the index.
Note
|
The index counter is what makes the generated names bound to (and unique within) a single InternalStreamsBuilder.
|
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder)
val name = newBuilder.newStoreName("PREFIX")
scala> println(name)
PREFIXSTATE-STORE-0000000001
Adding Child Node — addGraphNode Method
void addGraphNode(
StreamsGraphNode parent,
StreamsGraphNode child)
void addGraphNode(
Collection<StreamsGraphNode> parents,
StreamsGraphNode child)
addGraphNode simply requests the input StreamsGraphNode to add the given child node.
In the end, addGraphNode maybeAddNodeForOptimizationMetadata.
Note
|
addGraphNode is used in many places in GroupedStreamAggregateBuilder, InternalStreamsBuilder, KGroupedTableImpl, KStreamImpl, KStreamImplJoin, and KTableImpl.
|
Building Topology (with Optional Optimizations) — buildAndOptimizeTopology Method
void buildAndOptimizeTopology() (1)
void buildAndOptimizeTopology(
Properties props)
(1) For testing only; uses null for the Properties
buildAndOptimizeTopology does maybePerformOptimizations (with the given Properties).
buildAndOptimizeTopology …FIXME
Note
|
buildAndOptimizeTopology is used exclusively when StreamsBuilder is requested to build a topology.
|
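A sketch of requesting the optimizations explicitly (the input topic name is made up; topology.optimization is set to all via StreamsConfig.OPTIMIZE):
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.internals.ConsumedInternal
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
import scala.collection.JavaConverters._
val internalTopologyBuilder = new InternalTopologyBuilder
val internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder)
internalStreamsBuilder.stream(Seq("input").asJava, new ConsumedInternal[String, String]())
// Enable all topology optimizations (topology.optimization = all)
val props = new Properties()
props.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE)
internalStreamsBuilder.buildAndOptimizeTopology(props)
println(internalTopologyBuilder.describe)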
maybeAddNodeForOptimizationMetadata Internal Method
void maybeAddNodeForOptimizationMetadata(
StreamsGraphNode node)
maybeAddNodeForOptimizationMetadata …FIXME
Note
|
maybeAddNodeForOptimizationMetadata is used exclusively when InternalStreamsBuilder is requested to add a child node.
|
maybePerformOptimizations Internal Method
void maybePerformOptimizations(
Properties props)
maybePerformOptimizations …FIXME
Note
|
maybePerformOptimizations is used exclusively when InternalStreamsBuilder is requested to build a topology (with optional optimizations).
|
getKeyChangingParentNode Internal Method
StreamsGraphNode getKeyChangingParentNode(
StreamsGraphNode repartitionNode)
getKeyChangingParentNode …FIXME
Note
|
getKeyChangingParentNode is used exclusively when InternalStreamsBuilder is requested to maybeAddNodeForOptimizationMetadata.
|
maybeOptimizeRepartitionOperations Internal Method
void maybeOptimizeRepartitionOperations()
maybeOptimizeRepartitionOperations …FIXME
Note
|
maybeOptimizeRepartitionOperations is used exclusively when InternalStreamsBuilder is requested to maybePerformOptimizations.
|
createRepartitionNode Internal Method
OptimizableRepartitionNode createRepartitionNode(
String repartitionTopicName,
Serde keySerde,
Serde valueSerde)
createRepartitionNode …FIXME
Note
|
createRepartitionNode is used when…FIXME
|
findParentNodeMatching Internal Method
StreamsGraphNode findParentNodeMatching(
StreamsGraphNode startSeekingNode,
Predicate<StreamsGraphNode> parentNodePredicate)
findParentNodeMatching …FIXME
Note
|
findParentNodeMatching is used when…FIXME
|
getFirstRepartitionTopicName Internal Method
String getFirstRepartitionTopicName(
Collection<OptimizableRepartitionNode> repartitionNodes)
getFirstRepartitionTopicName …FIXME
Note
|
getFirstRepartitionTopicName is used when…FIXME
|
getRepartitionSerdes Internal Method
GroupedInternal getRepartitionSerdes(
Collection<OptimizableRepartitionNode> repartitionNodes)
getRepartitionSerdes …FIXME
Note
|
getRepartitionSerdes is used when…FIXME
|