ProcessorTopology with(
List<ProcessorNode> processorNodes,
Map<String, SourceNode> sourcesByTopic,
List<StateStore> stateStoresByName,
Map<String, String> storeToChangelogTopic)
ProcessorTopology — Physical Processor Task Topology
ProcessorTopology is the physical topology of a processor task, that is, the runtime form of a logical topology of processor nodes.
A ProcessorTopology is associated with every task.
A ProcessorTopology is used to create AbstractTask, GlobalStateManagerImpl, GlobalStateUpdateTask, GlobalStreamThread, StandbyTask, and StreamTask.
ProcessorTopology is created when InternalTopologyBuilder is requested to build processor task and global state topologies.
Note: Once created, the internal registries of a ProcessorTopology never change.
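One common way to get registries that never change after construction is to copy the inputs and expose them only through unmodifiable views. The sketch below illustrates that idea under stated assumptions; it is not the Kafka Streams implementation, and `FrozenRegistry` with its two fields is a hypothetical stand-in used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical holder class; not part of Kafka Streams.
public class FrozenRegistry {
    private final List<String> processorNames;
    private final Map<String, String> storeToChangelogTopic;

    public FrozenRegistry(List<String> processorNames,
                          Map<String, String> storeToChangelogTopic) {
        // Defensive copy first, then wrap in a read-only view.
        this.processorNames =
            Collections.unmodifiableList(new ArrayList<>(processorNames));
        this.storeToChangelogTopic =
            Collections.unmodifiableMap(new HashMap<>(storeToChangelogTopic));
    }

    public List<String> processorNames() {
        return processorNames;
    }

    public Map<String, String> storeToChangelogTopic() {
        return storeToChangelogTopic;
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        names.add("source");
        FrozenRegistry registry = new FrozenRegistry(names, new HashMap<>());

        // Changing the caller's list afterwards does not affect the registry.
        names.add("sink");
        System.out.println(registry.processorNames().size());

        // Any attempt to modify the exposed view is rejected.
        try {
            registry.processorNames().add("extra");
        } catch (UnsupportedOperationException e) {
            System.out.println("registry is read-only");
        }
    }
}
```

The defensive copy matters as much as the wrapper: an unmodifiable view over the caller's own list would still reflect later changes made through that list.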
ProcessorTopology can also be created using the "with" methods.
// Using Scala with the Java-centric API of Kafka Streams
import collection.JavaConverters._
import org.apache.kafka.streams.processor.internals.ProcessorNode
val processorNodes = Seq.empty[ProcessorNode[_, _]].asJava
import org.apache.kafka.streams.processor.internals.SourceNode
val sourcesByTopic = Map.empty[String, SourceNode[_, _]].asJava
import org.apache.kafka.streams.processor.StateStore
val stateStoresByName = Seq.empty[StateStore].asJava
val storeToChangelogTopic = Map.empty[String, String].asJava
import org.apache.kafka.streams.processor.internals.ProcessorTopology
val topology = ProcessorTopology.`with`(processorNodes, sourcesByTopic, stateStoresByName, storeToChangelogTopic)
// The topology is empty (no children) so nothing is printed out except the header "ProcessorTopology:"
scala> println(topology)
ProcessorTopology:
Creating ProcessorTopology Instance
ProcessorTopology takes the following when created:
- SourceNodes by name
- SinkNodes by name
- Local state stores
- Global state stores
- Names of the state stores and the names of the corresponding changelog topics (from the InternalTopologyBuilder)
hasPersistentLocalStore Method
boolean hasPersistentLocalStore()
hasPersistentLocalStore returns true when at least one of the local StateStores (in the stateStores internal registry) is persistent.
Note: hasPersistentLocalStore is used exclusively when KafkaStreams is created (to create a StateDirectory).
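The check described above amounts to an "any store is persistent" scan over a registry. Here is a minimal, self-contained sketch of that logic. The nested `Store` interface is a hypothetical stand-in that models only the `persistent()` method of Kafka Streams' `StateStore`, and `hasPersistentStore` is an illustrative name rather than the library's own code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PersistentStoreCheck {
    // Hypothetical stand-in for the single StateStore method this check needs.
    public interface Store {
        boolean persistent();
    }

    // Returns true as soon as any store in the registry reports itself persistent.
    public static boolean hasPersistentStore(List<Store> stores) {
        for (Store store : stores) {
            if (store.persistent()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Store inMemory = () -> false;
        Store onDisk = () -> true;

        // An empty registry has no persistent store.
        System.out.println(hasPersistentStore(Collections.emptyList()));         // false
        // Only in-memory stores: still nothing persistent.
        System.out.println(hasPersistentStore(Arrays.asList(inMemory)));         // false
        // One persistent store is enough.
        System.out.println(hasPersistentStore(Arrays.asList(inMemory, onDisk))); // true
    }
}
```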
hasPersistentGlobalStore Method
boolean hasPersistentGlobalStore()
hasPersistentGlobalStore returns true when at least one of the global StateStores (in the globalStateStores internal registry) is persistent.
Note: hasPersistentGlobalStore is used exclusively when KafkaStreams is created (to create a StateDirectory).
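Because both checks are used when deciding whether a StateDirectory is needed, a caller could plausibly combine them into a single flag. The sketch below shows one way to do that. The `TopologyView` interface and the `needsStateDirectory` and `view` helpers are hypothetical. Treating the global topology as optional (possibly null) is also an assumption made for illustration, not something this page confirms.

```java
public class StateDirectoryDecision {
    // Hypothetical view exposing only the two checks described in this section.
    public interface TopologyView {
        boolean hasPersistentLocalStore();
        boolean hasPersistentGlobalStore();
    }

    // Assumption: on-disk state is needed if either check is true.
    // globalTopology may be null when no global stores are defined.
    public static boolean needsStateDirectory(TopologyView taskTopology,
                                              TopologyView globalTopology) {
        return taskTopology.hasPersistentLocalStore()
            || (globalTopology != null && globalTopology.hasPersistentGlobalStore());
    }

    // Small helper that builds a view with fixed answers for the demo below.
    public static TopologyView view(boolean local, boolean global) {
        return new TopologyView() {
            @Override
            public boolean hasPersistentLocalStore() {
                return local;
            }

            @Override
            public boolean hasPersistentGlobalStore() {
                return global;
            }
        };
    }

    public static void main(String[] args) {
        // No persistent stores anywhere.
        System.out.println(needsStateDirectory(view(false, false), null));              // false
        // A persistent local store alone is enough.
        System.out.println(needsStateDirectory(view(true, false), null));               // true
        // A persistent global store alone is enough as well.
        System.out.println(needsStateDirectory(view(false, false), view(false, true))); // true
    }
}
```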