ProcessorTopology — Physical Processor Task Topology

ProcessorTopology is the physical topology of a processor task, derived from a logical topology of processor nodes.

A ProcessorTopology is associated with every task.

ProcessorTopology is created when InternalTopologyBuilder is requested to build a processor task topology or a global state topology.

Note
Once created, the internal registries of a ProcessorTopology never change.
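To illustrate what "never change" can mean in practice, here is a minimal Java sketch of a registry that is frozen at construction time. FrozenRegistry and rejectsMutation are hypothetical names invented for this example, and the copy-then-wrap approach is an assumption about how such a guarantee is commonly implemented, not a statement of how ProcessorTopology itself does it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; not part of Kafka Streams.
final class FrozenRegistry {
    private final List<String> nodeNames;

    FrozenRegistry(final List<String> nodeNames) {
        // Copy first so later changes to the caller's list are not visible,
        // then wrap so that mutating calls throw UnsupportedOperationException.
        this.nodeNames = Collections.unmodifiableList(new ArrayList<>(nodeNames));
    }

    List<String> nodeNames() {
        return nodeNames;
    }

    // Returns true when the registry rejects an attempt to add an entry.
    static boolean rejectsMutation(final FrozenRegistry registry) {
        try {
            registry.nodeNames().add("extra");
            return false;
        } catch (final UnsupportedOperationException e) {
            return true;
        }
    }
}
```

With this design, callers can read the registry freely, but neither they nor the original owner of the input list can alter it after construction.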

ProcessorTopology can also be created using the static with factory methods listed in Table 1.

Table 1. ProcessorTopology’s with Methods
Method Description

with

ProcessorTopology with(
  List<ProcessorNode> processorNodes,
  Map<String, SourceNode> sourcesByTopic,
  List<StateStore> stateStoresByName,
  Map<String, String> storeToChangelogTopic)

withGlobalStores

ProcessorTopology withGlobalStores(
  List<StateStore> stateStores,
  Map<String, String> storeToChangelogTopic)

withLocalStores

ProcessorTopology withLocalStores(
  List<StateStore> stateStores,
  Map<String, String> storeToChangelogTopic)

withRepartitionTopics

ProcessorTopology withRepartitionTopics(
  List<ProcessorNode> processorNodes,
  Map<String, SourceNode> sourcesByTopic,
  Set<String> repartitionTopics)

withSources

ProcessorTopology withSources(
  List<ProcessorNode> processorNodes,
  Map<String, SourceNode> sourcesByTopic)

The following Scala REPL session creates an empty ProcessorTopology using the with method.

// Using Scala with the Java-centric API of Kafka Streams
import collection.JavaConverters._

import org.apache.kafka.streams.processor.internals.ProcessorNode
val processorNodes = Seq.empty[ProcessorNode[_, _]].asJava
import org.apache.kafka.streams.processor.internals.SourceNode
val sourcesByTopic = Map.empty[String, SourceNode[_, _]].asJava
import org.apache.kafka.streams.processor.StateStore
val stateStoresByName = Seq.empty[StateStore].asJava
val storeToChangelogTopic = Map.empty[String, String].asJava

import org.apache.kafka.streams.processor.internals.ProcessorTopology
val topology = ProcessorTopology.`with`(processorNodes, sourcesByTopic, stateStoresByName, storeToChangelogTopic)

// The topology is empty (no children) so nothing is printed out except the header "ProcessorTopology:"
scala> println(topology)
ProcessorTopology:

Creating ProcessorTopology Instance

ProcessorTopology takes the following when created:

hasPersistentLocalStore Method

boolean hasPersistentLocalStore()

hasPersistentLocalStore returns true when at least one of the local StateStores (in the stateStores internal registry) is persistent, and false otherwise.
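The check described above amounts to an "any match" over a store registry. The following is a minimal, self-contained Java sketch of that idea. The nested StateStore interface is a stand-in reduced to name() and persistent(), and PersistentStoreCheck, store and hasPersistentStore are hypothetical names for this example; none of this is the actual ProcessorTopology code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, self-contained sketch; not the Kafka Streams source.
final class PersistentStoreCheck {

    // Stand-in for org.apache.kafka.streams.processor.StateStore,
    // reduced to the two methods this sketch needs.
    interface StateStore {
        String name();
        boolean persistent();
    }

    // Hypothetical helper for building example stores.
    static StateStore store(final String name, final boolean persistent) {
        return new StateStore() {
            @Override public String name() { return name; }
            @Override public boolean persistent() { return persistent; }
        };
    }

    // True as soon as any store in the registry reports itself persistent.
    static boolean hasPersistentStore(final List<StateStore> stores) {
        for (final StateStore store : stores) {
            if (store.persistent()) {
                return true;
            }
        }
        return false;
    }

    public static void main(final String[] args) {
        final List<StateStore> inMemoryOnly = Arrays.asList(store("counts", false));
        final List<StateStore> mixed =
            Arrays.asList(store("counts", false), store("totals", true));
        System.out.println(hasPersistentStore(inMemoryOnly));
        System.out.println(hasPersistentStore(mixed));
        System.out.println(hasPersistentStore(Collections.<StateStore>emptyList()));
    }
}
```

The same loop would serve for a global store check by passing a different registry, which is why the sketch takes the list as a parameter rather than reading a field.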

Note
hasPersistentLocalStore is used exclusively when KafkaStreams is created (to create a StateDirectory).

hasPersistentGlobalStore Method

boolean hasPersistentGlobalStore()

hasPersistentGlobalStore returns true when at least one of the global StateStores (in the globalStateStores internal registry) is persistent, and false otherwise.

Note
hasPersistentGlobalStore is used exclusively when KafkaStreams is created (to create a StateDirectory).
