StreamsBuilder addGlobalStore(
StoreBuilder storeBuilder,
String topic,
Consumed consumed,
ProcessorSupplier stateUpdateSupplier)
StreamsBuilder — The Entry Point to High-Level Streams DSL
StreamsBuilder
is the entry point to the Streams DSL — High-Level Stream Processing DSL.
StreamsBuilder
provides the operators to build a processor topology of local and global state stores, global tables, streams, and tables.
Tip
|
Use Scala API for Kafka Streams to make your Kafka Streams development more pleasant if Scala is the programming language of your choice. |
Operator | Description |
---|---|
Adds a global StateStore (given StoreBuilder, Consumed and ProcessorSupplier) to the topology. |
|
Adds a StateStore to the topology (given StoreBuilder). |
|
Builds the topology. |
|
Creates a GlobalKTable for the given topic (and Consumed, Materialized or both). |
|
|
|
Creates a KTable for the given topic (and Consumed, Materialized or both). |
StreamsBuilder
takes no arguments when created.
import org.apache.kafka.streams.StreamsBuilder
val builder = new StreamsBuilder
A typical Kafka Streams application (that uses Streams DSL and Scala API for Kafka Streams) looks as follows:
// Using Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._
val builder = new StreamsBuilder
// Add a KStream if needed
// K and V are the types of keys and values, accordingly
builder.stream[K, V](...)
// Add a KTable if needed
builder.table[K, V](...)
// Add a global store if needed
builder.addGlobalStore(...)
// Add a global store if needed
builder.globalTable[K, V](...)
// In the end, build a topology
val topology = builder.build
When created, StreamsBuilder
creates an empty Topology (that you enrich using the operators). The topology is immediately requested for the InternalTopologyBuilder that is in turn used to create an InternalStreamsBuilder.
All operators use the InternalStreamsBuilder behind the scenes. In other words, StreamsBuilder
offers a more developer-friendly high-level API for developing Kafka Streams applications than using the InternalStreamsBuilder API directly (and is a façade of InternalStreamsBuilder
).
Creating KTable for Topic — table
Method
KTable<K, V> table(
String topic)
KTable<K, V> table(
String topic,
Consumed<K, V> consumed)
KTable<K, V> table(
String topic,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
KTable<K, V> table(
String topic,
Consumed<K, V> consumed,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
table
…FIXME
Adding GlobalKTable to Topology — globalTable
Method
GlobalKTable<K, V> globalTable(
String topic)
GlobalKTable<K, V> globalTable(
String topic,
Consumed<K, V> consumed)
GlobalKTable<K, V> globalTable(
String topic,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
GlobalKTable<K, V> globalTable(
String topic,
Consumed<K, V> consumed,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
globalTable
creates an ConsumedInternal for the given Consumed.
globalTable
creates a new MaterializedInternal (with a new Materialized with the keySerde and the valueSerde of the ConsumedInternal
).
Note
|
The new MaterializedInternal uses KeyValueStore<Bytes, byte[]> StateStore.
|
globalTable
requests the MaterializedInternal
to generateStoreNameIfNeeded (with the InternalStreamsBuilder and the input topic
name).
In the end, globalTable
requests the InternalStreamsBuilder to add a GlobalKTable to the topology (with the topic
name, the ConsumedInternal
and the MaterializedInternal
).
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
val globalTable = builder.globalTable[String, String](topic = "global-table")
scala> :type globalTable
org.apache.kafka.streams.kstream.GlobalKTable[String,String]
assert(globalTable.queryableStoreName == null)
val topology = builder.build
scala> println(topology.describe)
Topologies:
Sub-topology: 0 for global store (will not generate tasks)
Source: KTABLE-SOURCE-0000000001 (topics: [global-table])
--> KTABLE-SOURCE-0000000002
Processor: KTABLE-SOURCE-0000000002 (stores: [global-table-STATE-STORE-0000000000])
--> none
<-- KTABLE-SOURCE-0000000001
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
import org.apache.kafka.streams.state.Stores
val supplier = Stores.inMemoryKeyValueStore("queryable-store-name")
import org.apache.kafka.streams.scala.kstream.Materialized
val materialized = Materialized.as[String, String](supplier)
val zipCodes = builder.globalTable[String, String](topic = "zip-codes", materialized)
scala> :type zipCodes
org.apache.kafka.streams.kstream.GlobalKTable[String,String]
assert(zipCodes.queryableStoreName == "queryable-store-name")
val topology = builder.build
scala> println(topology.describe)
Topologies:
Sub-topology: 0 for global store (will not generate tasks)
Source: KTABLE-SOURCE-0000000000 (topics: [zip-codes])
--> KTABLE-SOURCE-0000000001
Processor: KTABLE-SOURCE-0000000001 (stores: [queryable-store-name])
--> none
<-- KTABLE-SOURCE-0000000000
Registering Global State Store (in Topology) — addGlobalStore
Method
StreamsBuilder addGlobalStore(
StoreBuilder storeBuilder,
String topic,
Consumed consumed,
ProcessorSupplier stateUpdateSupplier)
addGlobalStore
…FIXME
Creating KStream (of Records from One or Many Topics) — stream
Method
KStream<K, V> stream(
Collection<String> topics)
KStream<K, V> stream(
Collection<String> topics,
Consumed<K, V> consumed)
KStream<K, V> stream(
Pattern topicPattern)
KStream<K, V> stream(
Pattern topicPattern,
Consumed<K, V> consumed)
KStream<K, V> stream(
String topic)
KStream<K, V> stream(
String topic,
Consumed<K, V> consumed)
stream
creates a KStream (of keys of type K
and values of type V
) for the defined topic(s) and the parameters in the input Consumed.
scala> :type builder
org.apache.kafka.streams.StreamsBuilder
// Create a KStream to read records from the input topic
// Keys and values of the records are of String type
val input = builder.stream[String, String]("input")
scala> :type input
org.apache.kafka.streams.kstream.KStream[String,String]
Internally, stream
creates a ConsumedInternal (for the input Consumed) and requests the InternalStreamsBuilder to create a KStream (for the input topics
and the ConsumedInternal
).
Building Topology — build
Method
Topology build()
Topology build(Properties props)
build
requests the InternalStreamsBuilder to buildAndOptimizeTopology (with the given Properties
) and returns the underlying topology.