ProcessorTopology with(
List<ProcessorNode> processorNodes,
Map<String, SourceNode> sourcesByTopic,
List<StateStore> stateStoresByName,
Map<String, String> storeToChangelogTopic)
ProcessorTopology — Physical Processor Task Topology
ProcessorTopology
is a physical processor task topology (of logical processor node topology).
A ProcessorTopology
is associated with every task.
A ProcessorTopology
is used to create AbstractTask, GlobalStateManagerImpl, GlobalStateUpdateTask, GlobalStreamThread, StandbyTask, and StreamTask.
ProcessorTopology
is created when InternalTopologyBuilder
is requested to build processor task and global state topologies.
Note
|
Once created, all internal registries in ProcessorTopology will never change.
|
ProcessorTopology
can also be created using the "with" methods.
Method | Description |
---|---|
|
|
|
|
|
|
|
|
|
|
// Using Scala with the Java-centric API of Kafka Streams
import collection.JavaConverters._
import org.apache.kafka.streams.processor.internals.ProcessorNode
val processorNodes = Seq.empty[ProcessorNode[_, _]].asJava
import org.apache.kafka.streams.processor.internals.SourceNode
val sourcesByTopic = Map.empty[String, SourceNode[_, _]].asJava
import org.apache.kafka.streams.processor.StateStore
val stateStoresByName = Seq.empty[StateStore].asJava
val storeToChangelogTopic = Map.empty[String, String].asJava
import org.apache.kafka.streams.processor.internals.ProcessorTopology
val topology = ProcessorTopology.`with`(processorNodes, sourcesByTopic, stateStoresByName, storeToChangelogTopic)
// The topology is empty (no children) so nothing is printed out except the header "ProcessorTopology:"
scala> println(topology)
ProcessorTopology:
Creating ProcessorTopology Instance
ProcessorTopology
takes the following when created:
-
SourceNodes by name
-
SinkNodes by name
-
Local state stores
-
Global state stores
-
Names of the state stores and the names of the corresponding changelog topics (from the InternalTopologyBuilder)
hasPersistentLocalStore
Method
boolean hasPersistentLocalStore()
hasPersistentLocalStore
is positive (true
) when one of the local StateStores (in the stateStores internal registry) is persistent.
Note
|
hasPersistentLocalStore is used exclusively when KafkaStreams is created (to create a StateDirectory).
|
hasPersistentGlobalStore
Method
boolean hasPersistentGlobalStore()
hasPersistentGlobalStore
is positive (true
) when one of the global StateStores (in the globalStateStores internal registry) is persistent.
Note
|
hasPersistentGlobalStore is used exclusively when KafkaStreams is created (to create a StateDirectory).
|