StreamsGraphNode Contract — Graph Nodes that Write to Topology

StreamsGraphNode is the abstraction of graph nodes that can write to a topology.

Table 1. StreamsGraphNode Contract (Abstract Methods Only)
Method Description

writeToTopology

void writeToTopology(
  InternalTopologyBuilder topologyBuilder)

Used exclusively when InternalStreamsBuilder is requested to build a topology (which happens when StreamsBuilder is requested to build a topology).

InternalStreamsBuilder uses StreamsGraphNodes to build a topology. It keeps a root node internally, to which all child StreamsGraphNodes can be added.

import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
val intTopologyBuilder = new InternalTopologyBuilder

import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder
val intStreamsBuilder = new InternalStreamsBuilder(intTopologyBuilder)

import org.apache.kafka.streams.state.Stores
val storeName = "input-stream"
val lruMapSupplier = Stores.lruMap(storeName, 5)
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.{KeyValueStore, StoreBuilder}
val storeBuilder = Stores.keyValueStoreBuilder(
  lruMapSupplier,
  Serdes.Long(),
  Serdes.Long())

intStreamsBuilder.addStateStore(storeBuilder)

val root = intStreamsBuilder.root
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode
assert(root.isInstanceOf[StreamsGraphNode])
scala> println(root.toString)
StreamsGraphNode{nodeName='root', buildPriority=null, hasWrittenToTopology=true, keyChangingOperation=false, valueChangingOperation=false, mergeNode=false, parentNodes=[]}

val children = root.children
assert(children.isInstanceOf[java.util.Collection[StreamsGraphNode]])

import scala.collection.JavaConverters._
val stateStoreNode = children.asScala.head
import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode
assert(stateStoreNode.isInstanceOf[StateStoreNode])
scala> println(stateStoreNode)
StateStoreNode{ name='input-stream', logConfig={}, loggingEnabled='true'}

// Triggers StreamsGraphNode.writeToTopology of all nodes
intStreamsBuilder.buildAndOptimizeTopology

intTopologyBuilder.setApplicationId("required to complete optimization")

assert(intTopologyBuilder.allStateStoreName.asScala.head == storeName)
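
The demo above relies on Kafka Streams internals. As a rough, self-contained illustration of the contract itself, the following toy model shows a root node with one child and a builder that asks every node to write itself exactly once. All names (ToyGraphNode, ToyStateStoreNode, ToyTopologyBuilder, buildTopology) are hypothetical, and the breadth-first walk is an assumption of this sketch, not a description of how InternalStreamsBuilder orders nodes.

```scala
import scala.collection.mutable

// Hypothetical stand-in for InternalTopologyBuilder: it only records what was written.
class ToyTopologyBuilder {
  val written = mutable.ListBuffer.empty[String]
}

// Hypothetical stand-in for StreamsGraphNode (not the Kafka Streams class).
abstract class ToyGraphNode(val nodeName: String) {
  val children = mutable.LinkedHashSet.empty[ToyGraphNode]
  var hasWrittenToTopology = false

  // The single abstract method of the contract.
  def writeToTopology(topologyBuilder: ToyTopologyBuilder): Unit
}

// A concrete node, loosely modelled on StateStoreNode.
class ToyStateStoreNode(storeName: String) extends ToyGraphNode(storeName) {
  def writeToTopology(topologyBuilder: ToyTopologyBuilder): Unit =
    topologyBuilder.written += s"state-store:$nodeName"
}

// The root node writes nothing; it only anchors the graph.
val toyRoot = new ToyGraphNode("root") {
  def writeToTopology(topologyBuilder: ToyTopologyBuilder): Unit = ()
}
toyRoot.children += new ToyStateStoreNode("input-stream")

// Breadth-first walk (an assumption of this sketch) that writes every node once.
def buildTopology(start: ToyGraphNode, builder: ToyTopologyBuilder): Unit = {
  val queue = mutable.Queue[ToyGraphNode](start)
  while (queue.nonEmpty) {
    val node = queue.dequeue()
    if (!node.hasWrittenToTopology) {
      node.writeToTopology(builder)
      node.hasWrittenToTopology = true
    }
    queue ++= node.children
  }
}

val toyBuilder = new ToyTopologyBuilder
buildTopology(toyRoot, toyBuilder)
println(toyBuilder.written.mkString(", ")) // prints: state-store:input-stream
```

The hasWrittenToTopology flag in the sketch mirrors the field of the same name visible in the root node's textual representation above; it keeps a node from being written twice if it is reachable more than once.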

Table 2. StreamsGraphNodes (Direct Implementations and Extensions Only)
StreamsGraphNode Description

BaseJoinProcessorNode

Base of KTableKTableJoinNode and StreamStreamJoinNode

BaseRepartitionNode

Base of GroupedTableOperationRepartitionNode and OptimizableRepartitionNode

ProcessorGraphNode

Represents stateless operators in KStreamImpl and KTableImpl

Base of StatefulProcessorNode

StateStoreNode

Represents StreamsBuilder.addStateStore operator

Base of GlobalStoreNode

StreamSinkNode

Represents KStreamImpl.to operator

StreamSourceNode

Represents StreamsBuilder.stream and KStreamImpl.through operators

Base of TableSourceNode

StreamTableJoinNode

Represents KStreamImpl.join and KStreamImpl.leftJoin operators

TableProcessorNode

Represents KTableImpl.filter, KTableImpl.filterNot, KTableImpl.mapValues, and KTableImpl.transformValues operators

StreamsGraphNode takes a single node name when created.

Note
StreamsGraphNode is a Java abstract class and cannot be created directly. It is created indirectly when one of the concrete StreamsGraphNodes is created and as the root node of InternalStreamsBuilder.

isValueChangingOperation Method

boolean isValueChangingOperation()

isValueChangingOperation simply returns the valueChangingOperation internal flag.

Note
isValueChangingOperation is used exclusively when InternalStreamsBuilder is requested to find the key-changing parent node.

setValueChangingOperation Method

void setValueChangingOperation(
  boolean valueChangingOperation)

setValueChangingOperation simply sets the valueChangingOperation internal flag to the given valueChangingOperation value.

Note
setValueChangingOperation is used when KStreamImpl is requested to mapValues, flatMapValues, doTransformValues, and doFlatTransformValues.
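
The getter and setter pair described above can be pictured with a minimal, self-contained sketch. ToyNode and valueChangingNode are hypothetical names introduced only for illustration; the helper imitates what a value-changing operator such as mapValues is described as doing (marking its node), and does not reproduce the KStreamImpl code.

```scala
// Hypothetical stand-in for StreamsGraphNode; only the value-changing flag is modelled.
class ToyNode(val nodeName: String) {
  private var valueChangingOperation: Boolean = false

  // Getter: returns the internal flag as-is.
  def isValueChangingOperation: Boolean = valueChangingOperation

  // Setter: overwrites the internal flag with the given value.
  def setValueChangingOperation(valueChangingOperation: Boolean): Unit =
    this.valueChangingOperation = valueChangingOperation
}

// Hypothetical helper: creates the node for an operator and marks it as value-changing.
def valueChangingNode(name: String): ToyNode = {
  val node = new ToyNode(name)
  node.setValueChangingOperation(true)
  node
}

val filterNode = new ToyNode("toy-filter")               // flag left at its default
val mapValuesNode = valueChangingNode("toy-mapValues")   // flag switched on

println(filterNode.isValueChangingOperation)    // prints: false
println(mapValuesNode.isValueChangingOperation) // prints: true
```

The default of false in the sketch matches the valueChangingOperation=false shown in the root node's textual representation earlier on this page.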

Adding Child Node — addChild Method

void addChild(
  StreamsGraphNode childNode)

addChild…​FIXME

Note
addChild is used when InternalStreamsBuilder is requested to add a child node and maybeOptimizeRepartitionOperations.

Describing Itself (Textual Representation) — toString Method

String toString()

Note
toString is part of the java.lang.Object contract and gives a string representation of the object.

toString…​FIXME

Internal Properties

Name Description

valueChangingOperation

Represents whether the StreamsGraphNode is value-changing and changes record values (true) or not (false)

Used when StreamsGraphNode is requested for the textual representation
