Demo: Creating Topology with State Store with Logging Enabled

The following demo shows the internals of state stores with logging enabled.

Logging

The following logging configuration may help you understand the internals. Use it in the log4j configuration file:

log4j.rootLogger=OFF, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
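With the root logger OFF, nothing is printed unless you enable specific loggers. As an example (the logger below is one of Kafka Streams' internal classes involved in changelog restoration; pick whichever internals you want to watch), you could add:

```properties
# Example only: enable a single internal logger while the root logger stays OFF
log4j.logger.org.apache.kafka.streams.processor.internals.StoreChangelogReader=ALL, stdout
```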

Kafka Setup

Before you run the demo application, use ./bin/ --create to create the source topic.

./bin/ \
  --bootstrap-server :9092 \
  --create \
  --topic StateStoreLoggingEnabledDemo-input \
  --partitions 1 \
  --replication-factor 1

Use --partitions 1 as that helps keep the log messages to a minimum, which is all you need to understand the internals. More partitions would simply create more tasks (and add nothing but more log messages).

After the demo application is up and running, use ./bin/ --list to list the topics used in the topology.

$ ./bin/ --bootstrap-server :9092 --list

After the demo, you may also want to reset the Kafka Streams demo application (in order to reprocess its data from scratch) using the ./bin/ script.

./bin/ \
  --application-id StateStoreLoggingEnabledDemo \
  --input-topics StateStoreLoggingEnabledDemo-input

Demo Application

val appId = this.getClass.getSimpleName.replace("$", "")
val storeName = s"$appId-in-memory-key-value-store"
val source = s"$appId-input"
val bootstrapServers = ":9092"
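The appId derivation above relies on how Scala names the class behind a top-level object: getSimpleName returns the object's name with a trailing `$`, which the replace strips. A quick sketch with the class name spelled out as a plain string:

```scala
// For a top-level `object StateStoreLoggingEnabledDemo`, getClass.getSimpleName
// returns "StateStoreLoggingEnabledDemo$" (Scala appends $ to the object's class name);
// replace("$", "") strips it to give the bare object name.
val simpleName = "StateStoreLoggingEnabledDemo$"
val derivedAppId = simpleName.replace("$", "")
// derivedAppId == "StateStoreLoggingEnabledDemo"
```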

println(s"Application ID: $appId")
println(s"Source topics: $source")
println("Make sure that the source topics are available and press ENTER")
scala.io.StdIn.readLine()

// Using Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._

import org.apache.kafka.streams.state.Stores
val storeSupplier = Stores.inMemoryKeyValueStore(storeName)

import org.apache.kafka.streams.scala.kstream.Materialized
import scala.collection.JavaConverters._
val materialized: Materialized[String, Long, ByteArrayKeyValueStore] = Materialized
  .as[String, Long](storeSupplier)
  .withLoggingEnabled(Map.empty[String, String].asJava)
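With logging enabled, Kafka Streams backs the store with a changelog topic named after the convention `<application.id>-<store name>-changelog`. For this demo, the name works out as:

```scala
// Changelog topic naming convention: <application.id>-<store name>-changelog
val appId = "StateStoreLoggingEnabledDemo"
val storeName = s"$appId-in-memory-key-value-store"
val changelogTopic = s"$appId-$storeName-changelog"
// changelogTopic == "StateStoreLoggingEnabledDemo-StateStoreLoggingEnabledDemo-in-memory-key-value-store-changelog"
```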

import org.apache.kafka.streams.kstream.Printed
val sysout = Printed.toSysOut[String, Long].withLabel("sysout")

val builder = new StreamsBuilder
builder
  .stream[String, String](source)
  .selectKey { case (_, v) => v.split(",").head }
  .groupByKey
  .count()(materialized)
  .toStream
  .print(sysout)

val topology = builder.build()
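The key selector takes the first comma-separated field of the record value as the new key. The same function in plain Scala, applied to a sample record:

```scala
// Same logic as the selectKey above: ignore the old key,
// use the first comma-separated field of the value as the new key
val selectKeyFn: (String, String) => String = (_, v) => v.split(",").head
val newKey = selectKeyFn("ignored-key", "alice,1,open")
// newKey == "alice"
```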

import org.apache.kafka.streams.StreamsConfig
val config = new java.util.Properties()
// APPLICATION_ID_CONFIG is mandatory and is also used as the prefix of the default client ID
config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
// CLIENT_ID_CONFIG makes the client ID explicit (and shorter than the generated one)
config.put(StreamsConfig.CLIENT_ID_CONFIG, appId)
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)

import org.apache.kafka.streams.KafkaStreams
val ks = new KafkaStreams(topology, config)
ks.start()

// TIP: Paste the above code using :paste mode in sbt console / Scala REPL
// so the REPL's INFO messages do not interleave with the output
// and you can focus on the log messages once the KafkaStreams instance starts

