// Created directly
import org.apache.kafka.streams.Topology
val topology = new Topology
// Created using Streams DSL (StreamsBuilder API)
// Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._
val builder = new StreamsBuilder
val topology = builder.build
scala> :type topology
org.apache.kafka.streams.Topology
Topology — Directed Acyclic Graph of Stream Processing Nodes
Topology
is a directed acyclic graph of stream processing nodes that represents the stream processing logic of a Kafka Streams application.
Topology
can be created directly (as part of Low-Level Processor API) or indirectly using Streams DSL — High-Level Stream Processing DSL.
Topology
provides the fluent API to add local and global state stores, sources, processors and sinks to build advanced stream processing graphs.
Topology
takes no arguments when created.
Once created, Topology
can be extended with sources, processors (optionally connected to one or more state stores), sinks, with local and global state stores.
scala> :type topology
org.apache.kafka.streams.Topology
import org.apache.kafka.streams.state.Stores
val storeBuilder = Stores
.keyValueStoreBuilder[String, String](
Stores.inMemoryKeyValueStore("in-memory-key-value-store"),
Serdes.String,
Serdes.String)
.withLoggingDisabled // this is for a global table
val sourceName = "demo-source-processor"
val timestampExtractor = null
val keyDeserializer = Serdes.String.deserializer
val valueDeserializer = Serdes.String.deserializer
val topic = "demo-topic"
val processorName = "demo-processor-supplier"
import org.apache.kafka.streams.kstream.internals.KTableSource
val stateUpdateSupplier = new KTableSource[String, String]("store-name")
topology.addGlobalStore(
storeBuilder,
sourceName,
timestampExtractor,
keyDeserializer,
valueDeserializer,
topic,
processorName,
stateUpdateSupplier)
Topology
can be described.
scala> :type topology
org.apache.kafka.streams.Topology
scala> println(topology.describe)
Topologies:
Sub-topology: 0 for global store (will not generate tasks)
Source: demo-source-processor (topics: [demo-topic])
--> demo-processor-supplier
Processor: demo-processor-supplier (stores: [in-memory-key-value-store])
--> none
<-- demo-source-processor
Topology
is a logical representation of a ProcessorTopology.
Method | Description |
---|---|
|
Adds a global StateStore (with the StoreBuilder, ProcessorSupplier and optional TimestampExtractor) to the topology. Internally, |
|
Adds a new processor node (with the ProcessorSupplier) to the topology Internally, |
|
Adds a new sink node (with the optional TopicNameExtractor and StreamPartitioner) to the topology. Internally, |
|
Adds a new source node (with the optional AutoOffsetReset and TimestampExtractor) to the topology. Internally, |
|
Adds a new state store (as a StoreBuilder) to the topology and associates it with processors Internally, |
|
Connects the processor node with state stores (by name). Internally, |
|
Describes the topology via TopologyDescription (meta representation) Internally, |
Internally, Topology
uses an InternalTopologyBuilder for all the methods and is simply a thin layer atop (that aims at making Kafka Streams developers' life simpler).
Topology
defines offset reset policy (AutoOffsetReset
) that can be one of the following values: