Topology — Directed Acyclic Graph of Stream Processing Nodes

Topology is a directed acyclic graph of stream processing nodes that represents the stream processing logic of a Kafka Streams application.

Topology can be created directly (using the low-level Processor API) or indirectly using the Streams DSL (the high-level stream processing DSL).

Topology provides the fluent API to add local and global state stores, sources, processors and sinks to build advanced stream processing graphs.

Topology takes no arguments when created.

// Created directly
import org.apache.kafka.streams.Topology
val topology = new Topology

// Created using Streams DSL (StreamsBuilder API)
// Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._

val builder = new StreamsBuilder
val topology = builder.build
scala> :type topology
org.apache.kafka.streams.Topology
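The empty builder above yields an empty topology. For comparison, here is a minimal sketch of the DSL route that actually adds nodes. It assumes kafka-streams is on the classpath and uses the Java StreamsBuilder (not the Scala wrapper above), so no implicit serdes are involved. The topic names dsl-input and dsl-output are hypothetical.

```scala
// Assumes kafka-streams on the classpath; topic names are hypothetical
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{StreamsBuilder, Topology}
import org.apache.kafka.streams.kstream.{Consumed, Produced}

val dslBuilder = new StreamsBuilder()
// Read String keys/values from one topic and write them unchanged to another
dslBuilder
  .stream("dsl-input", Consumed.`with`(Serdes.String(), Serdes.String()))
  .to("dsl-output", Produced.`with`(Serdes.String(), Serdes.String()))

// build() turns the DSL calls into a Topology of source and sink nodes
val dslTopology: Topology = dslBuilder.build()
println(dslTopology.describe())
```

The DSL generates node names itself (KSTREAM-SOURCE-0000000000, KSTREAM-SINK-0000000001), while the Processor API requires you to name every node explicitly.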

Once created, Topology can be extended with sources, processors (optionally connected to one or more state stores), sinks, and local and global state stores.

scala> :type topology
org.apache.kafka.streams.Topology

import org.apache.kafka.streams.state.Stores
val storeBuilder = Stores
  .keyValueStoreBuilder[String, String](
    Stores.inMemoryKeyValueStore("in-memory-key-value-store"),
    Serdes.String,
    Serdes.String)
  .withLoggingDisabled  // global stores must not have changelogging enabled
val sourceName = "demo-source-processor"
val timestampExtractor = null
val keyDeserializer = Serdes.String.deserializer
val valueDeserializer = Serdes.String.deserializer
val topic = "demo-topic"
val processorName = "demo-processor-supplier"
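// KTableSource is an internal class (not public API) that puts every record into the store with the given name
import org.apache.kafka.streams.kstream.internals.KTableSource
val stateUpdateSupplier = new KTableSource[String, String]("in-memory-key-value-store")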
topology.addGlobalStore(
  storeBuilder,
  sourceName,
  timestampExtractor,
  keyDeserializer,
  valueDeserializer,
  topic,
  processorName,
  stateUpdateSupplier)

Topology can be described.

scala> :type topology
org.apache.kafka.streams.Topology

scala> println(topology.describe)
Topologies:
   Sub-topology: 0 for global store (will not generate tasks)
    Source: demo-source-processor (topics: [demo-topic])
      --> demo-processor-supplier
    Processor: demo-processor-supplier (stores: [in-memory-key-value-store])
      --> none
      <-- demo-source-processor

Topology is a logical representation of a ProcessorTopology.

Table 1. Topology API / Methods
Method Description

addGlobalStore

Topology addGlobalStore(
  StoreBuilder storeBuilder,
  String sourceName,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer,
  String topic,
  String processorName,
  ProcessorSupplier stateUpdateSupplier)
Topology addGlobalStore(
  StoreBuilder storeBuilder,
  String sourceName,
  TimestampExtractor timestampExtractor,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer,
  String topic,
  String processorName,
  ProcessorSupplier stateUpdateSupplier)

Adds a global StateStore (with the StoreBuilder, ProcessorSupplier and optional TimestampExtractor) to the topology.

Internally, addGlobalStore simply requests the InternalTopologyBuilder to add a global store.
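The example earlier on this page relies on the internal KTableSource. Below is a sketch of the same idea using only public API. It assumes Kafka Streams 3.x or later, where the typed org.apache.kafka.streams.processor.api package offers an addGlobalStore overload taking a ProcessorSupplier[K, V, Void, Void], and Scala 2.13. All store, node and topic names are hypothetical. The state-update processor simply copies every record of the global topic into the store.

```scala
// Assumes Kafka Streams 3.x+ (typed processor.api) and Scala 2.13; all names are hypothetical
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.api.{Processor, ProcessorContext, ProcessorSupplier, Record}
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

val globalStoreName = "demo-global-store"

// Global stores must not have changelogging enabled (the source topic serves as the changelog)
val globalStoreBuilder = Stores
  .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(globalStoreName), Serdes.String(), Serdes.String())
  .withLoggingDisabled()

// The "state update" processor: writes each record of the global topic into the store
val globalUpdater: ProcessorSupplier[String, String, Void, Void] =
  new ProcessorSupplier[String, String, Void, Void] {
    override def get(): Processor[String, String, Void, Void] =
      new Processor[String, String, Void, Void] {
        private var store: KeyValueStore[String, String] = _
        override def init(context: ProcessorContext[Void, Void]): Unit =
          store = context.getStateStore[KeyValueStore[String, String]](globalStoreName)
        override def process(record: Record[String, String]): Unit =
          store.put(record.key, record.value)
      }
  }

val globalTopology = new Topology()
globalTopology.addGlobalStore(
  globalStoreBuilder,
  "demo-global-source",
  Serdes.String().deserializer(),
  Serdes.String().deserializer(),
  "demo-global-topic",
  "demo-global-updater",
  globalUpdater)
println(globalTopology.describe())
```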

addProcessor

Topology addProcessor(
  String name,
  ProcessorSupplier supplier,
  String... parentNames)

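Adds a new processor node (backed by the ProcessorSupplier) to the topology, as a child of the nodes given by parentNames (sources or processors that have already been added).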

Internally, addProcessor simply requests the InternalTopologyBuilder to add a processor.
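A minimal source, processor and sink pipeline shows how parentNames wires the graph together. This sketch assumes Kafka Streams 3.x or later (typed org.apache.kafka.streams.processor.api) and Scala 2.13. The topics words and upper-words, the node names and the upper-casing logic are hypothetical.

```scala
// Assumes Kafka Streams 3.x+ (typed processor.api) and Scala 2.13; all names are hypothetical
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.api.{Processor, ProcessorContext, ProcessorSupplier, Record}

// Upper-cases values and forwards them to the downstream (child) nodes
val upperCaser: ProcessorSupplier[String, String, String, String] =
  new ProcessorSupplier[String, String, String, String] {
    override def get(): Processor[String, String, String, String] =
      new Processor[String, String, String, String] {
        private var context: ProcessorContext[String, String] = _
        override def init(ctx: ProcessorContext[String, String]): Unit = context = ctx
        override def process(record: Record[String, String]): Unit =
          // null values (tombstones) are forwarded unchanged
          context.forward(record.withValue(Option(record.value).map(_.toUpperCase).orNull))
      }
  }

val pipeline = new Topology()
  .addSource("words-source", Serdes.String().deserializer(), Serdes.String().deserializer(), "words")
  .addProcessor("upper-caser", upperCaser, "words-source") // parent: the source node
  .addSink("upper-sink", "upper-words", Serdes.String().serializer(), Serdes.String().serializer(), "upper-caser")
println(pipeline.describe())
```

Every add method returns the Topology itself, which is what makes the fluent chaining possible. A parent must be added before any node that names it.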

addSink

Topology addSink(
  String name,
  String topic,
  Serializer<K> keySerializer,
  Serializer<V> valueSerializer,
  StreamPartitioner<? super K, ? super V> partitioner,
  String... parentNames)
Topology addSink(
  String name,
  String topic,
  Serializer<K> keySerializer,
  Serializer<V> valueSerializer,
  String... parentNames)
Topology addSink(
  String name,
  String topic,
  StreamPartitioner<? super K, ? super V> partitioner,
  String... parentNames)
Topology addSink(
  String name,
  String topic,
  String... parentNames)
Topology addSink(
  String name,
  TopicNameExtractor<K, V> topicExtractor,
  Serializer<K> keySerializer,
  Serializer<V> valueSerializer,
  StreamPartitioner<? super K, ? super V> partitioner,
  String... parentNames)
Topology addSink(
  String name,
  TopicNameExtractor<K, V> topicExtractor,
  Serializer<K> keySerializer,
  Serializer<V> valueSerializer,
  String... parentNames)
Topology addSink(
  String name,
  TopicNameExtractor<K, V> topicExtractor,
  StreamPartitioner<? super K, ? super V> partitioner,
  String... parentNames)
Topology addSink(
  String name,
  TopicNameExtractor<K, V> topicExtractor,
  String... parentNames)

Adds a new sink node (with the optional TopicNameExtractor and StreamPartitioner) to the topology.

Internally, addSink simply requests the InternalTopologyBuilder to add a sink.
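The TopicNameExtractor variants choose the output topic per record. Here is a minimal sketch, assuming Kafka Streams 2.0 or later (where org.apache.kafka.streams.processor.TopicNameExtractor exists) and Scala 2.13. The routing rule and the topics orders, numeric-keys and other-keys are hypothetical.

```scala
// Assumes Kafka Streams 2.0+ and Scala 2.13; routing rule and topic names are hypothetical
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.{RecordContext, TopicNameExtractor}

// Records whose key starts with a digit go to one topic, everything else to another
val routeByKey: TopicNameExtractor[String, String] = new TopicNameExtractor[String, String] {
  override def extract(key: String, value: String, recordContext: RecordContext): String =
    if (key != null && key.nonEmpty && key.head.isDigit) "numeric-keys" else "other-keys"
}

val routing = new Topology()
  .addSource("orders-source", Serdes.String().deserializer(), Serdes.String().deserializer(), "orders")
  .addSink("routed-sink", routeByKey, Serdes.String().serializer(), Serdes.String().serializer(), "orders-source")
println(routing.describe())
```

The topics returned by the extractor have to exist already, because Kafka Streams does not create them for dynamically routed sinks.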

addSource

Topology addSource(
  AutoOffsetReset offsetReset,
  String name,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer,
  Pattern topicPattern)
Topology addSource(
  AutoOffsetReset offsetReset,
  String name,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer,
  String... topics)
Topology addSource(
  AutoOffsetReset offsetReset,
  String name,
  Pattern topicPattern)
Topology addSource(
  AutoOffsetReset offsetReset,
  String name,
  String... topics)
Topology addSource(
  AutoOffsetReset offsetReset,
  String name,
  TimestampExtractor timestampExtractor,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer,
  Pattern topicPattern)
Topology addSource(
  AutoOffsetReset offsetReset,
  String name,
  TimestampExtractor timestampExtractor,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer,
  String... topics)
Topology addSource(
  AutoOffsetReset offsetReset,
  TimestampExtractor timestampExtractor,
  String name,
  Pattern topicPattern)
Topology addSource(
  AutoOffsetReset offsetReset,
  TimestampExtractor timestampExtractor,
  String name,
  String... topics)
Topology addSource(
  String name,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer,
  Pattern topicPattern)
Topology addSource(
  String name,
  Deserializer keyDeserializer,
  Deserializer valueDeserializer,
  String... topics)
Topology addSource(
  String name,
  Pattern topicPattern)
Topology addSource(
  String name,
  String... topics)
Topology addSource(
  TimestampExtractor timestampExtractor,
  String name,
  Pattern topicPattern)
Topology addSource(
  TimestampExtractor timestampExtractor,
  String name,
  String... topics)

Adds a new source node (with the optional AutoOffsetReset and TimestampExtractor) to the topology.

Internally, addSource simply requests the InternalTopologyBuilder to add a source.
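The following sketch exercises one of the overloads listed above, addSource(AutoOffsetReset, TimestampExtractor, String, Pattern). It subscribes to every topic matching a hypothetical pattern and uses WallclockTimestampExtractor, which stamps records with processing (wall-clock) time instead of their embedded timestamps. It assumes Kafka Streams 2.x or 3.x and Scala 2.13.

```scala
// Assumes Kafka Streams 2.x/3.x and Scala 2.13; the node name and topic pattern are hypothetical
import java.util.regex.Pattern
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.WallclockTimestampExtractor

val patternTopology = new Topology()
  .addSource(
    Topology.AutoOffsetReset.EARLIEST, // start from the beginning when no committed offset exists
    new WallclockTimestampExtractor(), // use processing time as the record timestamp
    "clicks-source",
    Pattern.compile("clicks-.*"))      // e.g. clicks-eu, clicks-us, ...
println(patternTopology.describe())
```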

addStateStore

Topology addStateStore(
  StoreBuilder storeBuilder,
  String... processorNames)

Adds a new state store (as a StoreBuilder) to the topology and associates it with the given processors (if any).

Internally, addStateStore simply requests the InternalTopologyBuilder to add a state store.
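As a sketch of a processor with a local state store, the following counts records per key. It assumes Kafka Streams 3.x or later (typed org.apache.kafka.streams.processor.api) and Scala 2.13, and all names are hypothetical. Because processorNames are passed to addStateStore, the processor has to be added first.

```scala
// Assumes Kafka Streams 3.x+ (typed processor.api) and Scala 2.13; all names are hypothetical
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.api.{Processor, ProcessorContext, ProcessorSupplier, Record}
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

val countsStoreName = "word-counts"

// Counts records per key in a local (per-task) key-value store
val counter: ProcessorSupplier[String, String, Void, Void] =
  new ProcessorSupplier[String, String, Void, Void] {
    override def get(): Processor[String, String, Void, Void] =
      new Processor[String, String, Void, Void] {
        private var counts: KeyValueStore[String, java.lang.Long] = _
        override def init(context: ProcessorContext[Void, Void]): Unit =
          counts = context.getStateStore[KeyValueStore[String, java.lang.Long]](countsStoreName)
        override def process(record: Record[String, String]): Unit =
          if (record.key != null) { // the store rejects null keys
            val previous = Option(counts.get(record.key)).map(_.longValue).getOrElse(0L)
            counts.put(record.key, previous + 1)
          }
      }
  }

val countingTopology = new Topology()
  .addSource("words-source", Serdes.String().deserializer(), Serdes.String().deserializer(), "words")
  .addProcessor("counter", counter, "words-source")
  // Creates the store and connects it to the (already added) "counter" processor
  .addStateStore(
    Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(countsStoreName), Serdes.String(), Serdes.Long()),
    "counter")
println(countingTopology.describe())
```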

connectProcessorAndStateStores

Topology connectProcessorAndStateStores(
  String processorName,
  String... stateStoreNames)

Connects the processor node with state stores (by name).

Internally, connectProcessorAndStateStores simply requests the InternalTopologyBuilder to connect a processor with state stores.
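connectProcessorAndStateStores is useful when a store is registered before the processors that use it exist. In this sketch, a store is added with no processor names and later connected to one of two processors by name. It assumes Kafka Streams 3.x or later and Scala 2.13. The no-op processor and all names are hypothetical.

```scala
// Assumes Kafka Streams 3.x+ (typed processor.api) and Scala 2.13; all names are hypothetical
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.api.{Processor, ProcessorSupplier, Record}
import org.apache.kafka.streams.state.Stores

// A processor that does nothing; it only gives us nodes to attach the store to
val noop: ProcessorSupplier[String, String, Void, Void] =
  new ProcessorSupplier[String, String, Void, Void] {
    override def get(): Processor[String, String, Void, Void] =
      new Processor[String, String, Void, Void] {
        override def process(record: Record[String, String]): Unit = ()
      }
  }

val lateWiring = new Topology()
  // 1. Register the store without attaching it to any processor
  .addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("shared-store"), Serdes.String(), Serdes.String()))
  .addSource("events-source", Serdes.String().deserializer(), Serdes.String().deserializer(), "events")
  .addProcessor("first", noop, "events-source")
  .addProcessor("second", noop, "first")
  // 2. Connect the store (by name) to a processor added afterwards
  .connectProcessorAndStateStores("second", "shared-store")
println(lateWiring.describe())
```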

describe

TopologyDescription describe()

Describes the topology as a TopologyDescription (a meta representation of its sub-topologies and global stores, with their nodes).

Internally, describe simply requests the InternalTopologyBuilder to describe a topology.
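Besides printing it, a TopologyDescription can be walked programmatically through its sub-topologies and their nodes. A minimal sketch, assuming Scala 2.13 (for scala.jdk.CollectionConverters) and hypothetical node and topic names:

```scala
// Assumes kafka-streams on the classpath and Scala 2.13; node and topic names are hypothetical
import scala.jdk.CollectionConverters._
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{Topology, TopologyDescription}

val described = new Topology()
  .addSource("in-source", Serdes.String().deserializer(), Serdes.String().deserializer(), "in")
  .addSink("out-sink", "out", Serdes.String().serializer(), Serdes.String().serializer(), "in-source")

val description: TopologyDescription = described.describe()
// Print every node of every sub-topology together with its successors
for (sub <- description.subtopologies().asScala; node <- sub.nodes().asScala) {
  val next = node.successors().asScala.map(_.name).mkString(", ")
  println(s"sub-topology ${sub.id}: ${node.name} --> ${if (next.isEmpty) "none" else next}")
}
```

The nodes come back as (unordered) Java sets, so the print order is not guaranteed.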

Internally, Topology delegates all its methods to an InternalTopologyBuilder and is simply a thin layer atop it (that aims at making Kafka Streams developers' lives simpler).

Figure 1. Topology and InternalTopologyBuilder

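Topology defines the offset reset policy (AutoOffsetReset) that a source node uses when there is no valid committed offset for a partition. It can be one of the following values:

  • EARLIEST (start from the earliest available offset)

  • LATEST (start from the latest offset, i.e. only new records)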
