StreamsBuilder — The Entry Point to High-Level Streams DSL

StreamsBuilder is the entry point to the Streams DSL, the high-level stream processing DSL of Kafka Streams.

StreamsBuilder provides the operators to build a processor topology of local and global state stores, global tables, streams, and tables.

Tip
If Scala is your programming language of choice, use the Scala API for Kafka Streams to make your Kafka Streams development more pleasant.

Table 1. StreamsBuilder API / Operators
Operator Description

addGlobalStore

StreamsBuilder addGlobalStore(
  StoreBuilder storeBuilder,
  String topic,
  Consumed consumed,
  ProcessorSupplier stateUpdateSupplier)

Adds a global StateStore (given StoreBuilder, Consumed and ProcessorSupplier) to the topology.

addStateStore

StreamsBuilder addStateStore(StoreBuilder builder)

Adds a StateStore to the topology (given StoreBuilder).

build

Topology build()  (1)
Topology build(Properties props)
  1. Uses null for the Properties

Builds the topology.

globalTable

GlobalKTable<K, V> globalTable(
  String topic)
GlobalKTable<K, V> globalTable(
  String topic,
  Consumed<K, V> consumed)
GlobalKTable<K, V> globalTable(
  String topic,
  Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
GlobalKTable<K, V> globalTable(
  String topic,
  Consumed<K, V> consumed,
  Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)

Creates a GlobalKTable for the given topic (and Consumed, Materialized or both).

stream

KStream<K, V> stream(
  Collection<String> topics)
KStream<K, V> stream(
  Collection<String> topics,
  Consumed<K, V> consumed)
KStream<K, V> stream(
  Pattern topicPattern)
KStream<K, V> stream(
  Pattern topicPattern,
  Consumed<K, V> consumed)
KStream<K, V> stream(
  String topic)
KStream<K, V> stream(
  String topic,
  Consumed<K, V> consumed)

Creates a KStream for the given topic(s) (and Consumed).

table

KTable<K, V> table(
  String topic)
KTable<K, V> table(
  String topic,
  Consumed<K, V> consumed)
KTable<K, V> table(
  String topic,
  Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
KTable<K, V> table(
  String topic,
  Consumed<K, V> consumed,
  Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)

Creates a KTable for the given topic (and Consumed, Materialized or both).

StreamsBuilder takes no arguments when created.

import org.apache.kafka.streams.StreamsBuilder
val builder = new StreamsBuilder

A typical Kafka Streams application (that uses Streams DSL and Scala API for Kafka Streams) looks as follows:

// Using Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._

val builder = new StreamsBuilder

// Add a KStream if needed
// K and V are the types of keys and values, respectively
builder.stream[K, V](...)

// Add a KTable if needed
builder.table[K, V](...)

// Add a global store if needed
builder.addGlobalStore(...)

// Add a GlobalKTable if needed
builder.globalTable[K, V](...)

// In the end, build a topology
val topology = builder.build

When created, StreamsBuilder creates an empty Topology (that you enrich using the operators). StreamsBuilder immediately requests the topology for its InternalTopologyBuilder, which it then uses to create an InternalStreamsBuilder.

Figure 1. StreamsBuilder, Topology and InternalStreamsBuilder

All operators use the InternalStreamsBuilder behind the scenes. In other words, StreamsBuilder offers a more developer-friendly high-level API for developing Kafka Streams applications than using the InternalStreamsBuilder API directly (and is a façade of InternalStreamsBuilder).

Creating KTable for Topic — table Method

KTable<K, V> table(
  String topic)
KTable<K, V> table(
  String topic,
  Consumed<K, V> consumed)
KTable<K, V> table(
  String topic,
  Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
KTable<K, V> table(
  String topic,
  Consumed<K, V> consumed,
  Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)

table creates a KTable for the given topic (and Consumed, Materialized or both).
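
As a rough illustration of two of the table overloads listed above, the following sketch (using the Scala API for Kafka Streams, as in the other demos on this page) creates one KTable from a topic alone and another one backed by a named in-memory store. The topic names users and users-by-id and the store name users-store are made up for the example, and the Serdes import assumes the same version of the Scala API as the rest of this page.

```scala
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._

import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder

// Topic only: the Consumed is derived implicitly from the String serdes in scope
// ("users" is a hypothetical topic name)
val users = builder.table[String, String]("users")

// Topic and Materialized: the table is backed by a named in-memory store
// ("users-by-id" and "users-store" are hypothetical names)
import org.apache.kafka.streams.state.Stores
val supplier = Stores.inMemoryKeyValueStore("users-store")

import org.apache.kafka.streams.scala.kstream.Materialized
val materialized = Materialized.as[String, String](supplier)
val queryableUsers = builder.table[String, String]("users-by-id", materialized)

// The store name given to the supplier becomes the queryable store name
assert(queryableUsers.queryableStoreName == "users-store")
```

Naming the store explicitly, as in the second table, mirrors the queryable GlobalKTable demo: it is one way to get a store name you choose rather than a generated one.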

Adding GlobalKTable to Topology — globalTable Method

GlobalKTable<K, V> globalTable(
  String topic)
GlobalKTable<K, V> globalTable(
  String topic,
  Consumed<K, V> consumed)
GlobalKTable<K, V> globalTable(
  String topic,
  Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
GlobalKTable<K, V> globalTable(
  String topic,
  Consumed<K, V> consumed,
  Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)

globalTable creates a ConsumedInternal for the given Consumed.

globalTable creates a new MaterializedInternal (with a new Materialized with the keySerde and the valueSerde of the ConsumedInternal).

Note
The new MaterializedInternal uses a KeyValueStore<Bytes, byte[]> as the StateStore.

globalTable requests the MaterializedInternal to generateStoreNameIfNeeded (with the InternalStreamsBuilder and the input topic name).

In the end, globalTable requests the InternalStreamsBuilder to add a GlobalKTable to the topology (with the topic name, the ConsumedInternal and the MaterializedInternal).

Demo: Non-queryable GlobalKTable
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._

import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder

val globalTable = builder.globalTable[String, String](topic = "global-table")
scala> :type globalTable
org.apache.kafka.streams.kstream.GlobalKTable[String,String]

assert(globalTable.queryableStoreName == null)

val topology = builder.build
scala> println(topology.describe)
Topologies:
   Sub-topology: 0 for global store (will not generate tasks)
    Source: KTABLE-SOURCE-0000000001 (topics: [global-table])
      --> KTABLE-SOURCE-0000000002
    Processor: KTABLE-SOURCE-0000000002 (stores: [global-table-STATE-STORE-0000000000])
      --> none
      <-- KTABLE-SOURCE-0000000001

Demo: Queryable GlobalKTable
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._

import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder

import org.apache.kafka.streams.state.Stores
val supplier = Stores.inMemoryKeyValueStore("queryable-store-name")

import org.apache.kafka.streams.scala.kstream.Materialized
val materialized = Materialized.as[String, String](supplier)
val zipCodes = builder.globalTable[String, String](topic = "zip-codes", materialized)

scala> :type zipCodes
org.apache.kafka.streams.kstream.GlobalKTable[String,String]

assert(zipCodes.queryableStoreName == "queryable-store-name")

val topology = builder.build
scala> println(topology.describe)
Topologies:
   Sub-topology: 0 for global store (will not generate tasks)
    Source: KTABLE-SOURCE-0000000000 (topics: [zip-codes])
      --> KTABLE-SOURCE-0000000001
    Processor: KTABLE-SOURCE-0000000001 (stores: [queryable-store-name])
      --> none
      <-- KTABLE-SOURCE-0000000000

Registering Global State Store (in Topology) — addGlobalStore Method

StreamsBuilder addGlobalStore(
  StoreBuilder storeBuilder,
  String topic,
  Consumed consumed,
  ProcessorSupplier stateUpdateSupplier)

addGlobalStore adds a global StateStore (given StoreBuilder, Consumed and ProcessorSupplier) to the topology.

addStateStore Method

StreamsBuilder addStateStore(StoreBuilder builder)

addStateStore adds a StateStore to the topology (given StoreBuilder).
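
The following is a minimal sketch of how addStateStore might be used, assuming the Scala API for Kafka Streams is on the classpath. The store name counts and the choice of an in-memory key-value store with String keys and Long values are assumptions made for illustration only.

```scala
// The Java Serdes are imported under an alias to avoid a clash with the Scala API's Serdes
import org.apache.kafka.common.serialization.{Serdes => JSerdes}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.state.Stores

val builder = new StreamsBuilder

// A StoreBuilder for an in-memory key-value store
// ("counts" is a hypothetical store name)
val storeBuilder = Stores.keyValueStoreBuilder(
  Stores.inMemoryKeyValueStore("counts"),
  JSerdes.String(),
  JSerdes.Long())

// Register the store with the topology under construction
builder.addStateStore(storeBuilder)

assert(storeBuilder.name() == "counts")
```

A store registered this way still has to be connected, by name, to the processors that use it (for example through the state store names accepted by KStream.process).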

Creating KStream (of Records from One or Many Topics) — stream Method

KStream<K, V> stream(
  Collection<String> topics)
KStream<K, V> stream(
  Collection<String> topics,
  Consumed<K, V> consumed)
KStream<K, V> stream(
  Pattern topicPattern)
KStream<K, V> stream(
  Pattern topicPattern,
  Consumed<K, V> consumed)
KStream<K, V> stream(
  String topic)
KStream<K, V> stream(
  String topic,
  Consumed<K, V> consumed)

stream creates a KStream (with keys of type K and values of type V) for the given topic(s), using the parameters in the input Consumed, if given.

scala> :type builder
org.apache.kafka.streams.StreamsBuilder

// Create a KStream to read records from the input topic
// Keys and values of the records are of String type
val input = builder.stream[String, String]("input")

scala> :type input
org.apache.kafka.streams.kstream.KStream[String,String]

Internally, stream creates a ConsumedInternal (for the input Consumed) and requests the InternalStreamsBuilder to create a KStream (for the input topics and the ConsumedInternal).
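
The Collection and Pattern overloads can be sketched with the Scala API for Kafka Streams, where the collection of topics is a Scala Set. The topic names and the regular expression below are hypothetical, and the Serdes import assumes the same version of the Scala API as the other demos on this page.

```scala
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._

import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder

// One KStream over a fixed set of topics
// ("orders-eu" and "orders-us" are hypothetical topic names)
val orders = builder.stream[String, String](Set("orders-eu", "orders-us"))

// One KStream over all topics whose names match a regular expression
// ("events-.*" is a hypothetical pattern)
import java.util.regex.Pattern
val events = builder.stream[String, String](Pattern.compile("events-.*"))

// Print the description of the resulting topology
val topology = builder.build()
println(topology.describe())
```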

Building Topology — build Method

Topology build()
Topology build(Properties props)

build requests the InternalStreamsBuilder to buildAndOptimizeTopology (with the given Properties) and returns the underlying topology.
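
A minimal sketch of the build(Properties) variant, assuming the Scala API for Kafka Streams and a version of Kafka Streams that supports topology optimization. The topic name input is reused from the stream example above. The configuration key is written as a string literal because the name of the corresponding StreamsConfig constant may differ between versions.

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._

import org.apache.kafka.streams.scala.StreamsBuilder
val builder = new StreamsBuilder
builder.stream[String, String]("input")

// "topology.optimization" is the configuration key for topology optimization;
// StreamsConfig.OPTIMIZE turns the optimizations on
val props = new Properties
props.setProperty("topology.optimization", StreamsConfig.OPTIMIZE)

// Build the topology with the given Properties
val topology = builder.build(props)
println(topology.describe())
```

For a topology as small as this one, the optimized and non-optimized results are likely to look the same; the Properties matter once the DSL has something to optimize.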
