Demo: Creating Topology with State Store with Logging Enabled

The following demo shows the internals of state stores with logging enabled.

Logging / log4j.properties

The logging configuration below may help you understand the internals. Use it in log4j.properties:

log4j.rootLogger=OFF, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.org.apache.kafka.streams=INFO
log4j.logger.org.apache.kafka.streams.processor.internals.StreamTask=ALL
log4j.logger.org.apache.kafka.streams.processor.internals.StandbyTask=ALL
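
One way to have the sbt console / Scala REPL pick this configuration up (a sketch, assuming log4j 1.x on the classpath and log4j.properties in the project root) is to set the log4j.configuration system property when starting sbt:

sbt -Dlog4j.configuration=file:log4j.properties console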

Kafka Setup

Before you run the demo application, use ./bin/kafka-topics.sh --create to create the source topic.

./bin/kafka-topics.sh \
  --bootstrap-server :9092 \
  --create \
  --topic StateStoreLoggingEnabledDemo-input \
  --partitions 1 \
  --replication-factor 1

Use --partitions 1 as that helps keep the log messages to a minimum, which makes the internals easier to follow. More partitions would simply create more tasks (and add nothing but more log messages). You can verify the topic and its single partition as shown below.
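
./bin/kafka-topics.sh \
  --bootstrap-server :9092 \
  --describe \
  --topic StateStoreLoggingEnabledDemo-input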

After the demo application is up and running, use ./bin/kafka-topics.sh --list to list the topics used in the topology.

$ ./bin/kafka-topics.sh --bootstrap-server :9092 --list
StateStoreLoggingEnabledDemo-StateStoreLoggingEnabledDemo-in-memory-key-value-store-changelog
StateStoreLoggingEnabledDemo-StateStoreLoggingEnabledDemo-in-memory-key-value-store-repartition
StateStoreLoggingEnabledDemo-input
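
The -changelog topic records every update to the state store, and the -repartition topic carries the re-keyed records. To watch the change records as they are written (a sketch, assuming the demo's String keys and Long counts), use the console consumer with a LongDeserializer for the values:

./bin/kafka-console-consumer.sh \
  --bootstrap-server :9092 \
  --topic StateStoreLoggingEnabledDemo-StateStoreLoggingEnabledDemo-in-memory-key-value-store-changelog \
  --from-beginning \
  --property print.key=true \
  --value-deserializer org.apache.kafka.common.serialization.LongDeserializer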

After the demo, you may also want to reset the Kafka Streams demo application (to reprocess its data from scratch) using the ./bin/kafka-streams-application-reset.sh script.

./bin/kafka-streams-application-reset.sh \
  --application-id StateStoreLoggingEnabledDemo \
  --input-topics StateStoreLoggingEnabledDemo-input \
  --execute
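
The script also supports --dry-run to display the actions it would perform without executing the reset:

./bin/kafka-streams-application-reset.sh \
  --application-id StateStoreLoggingEnabledDemo \
  --input-topics StateStoreLoggingEnabledDemo-input \
  --dry-run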

Demo Application

// The application ID is derived from the name of the enclosing object,
// e.g. StateStoreLoggingEnabledDemo
val appId = this.getClass.getSimpleName.replace("$", "")
val storeName = s"$appId-in-memory-key-value-store"
val source = s"$appId-input"
val bootstrapServers = ":9092"

println(s"Application ID: $appId")
println(s"Source topics: $source")
println("Make sure that the source topics are available and press ENTER")
/**
 * ./bin/kafka-topics.sh \
 *    --bootstrap-server :9092 \
 *    --create \
 *    --topic StateStoreLoggingEnabledDemo-input \
 *    --partitions 1 \
 *    --replication-factor 1
 */
System.in.read()

// Using Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._

import org.apache.kafka.streams.state.Stores
val storeSupplier = Stores.inMemoryKeyValueStore(storeName)

import org.apache.kafka.streams.scala.kstream.Materialized
import scala.collection.JavaConverters._
val materialized: Materialized[String, Long, ByteArrayKeyValueStore] = Materialized
  .as[String, Long](storeSupplier)
  .withLoggingEnabled(Map.empty[String, String].asJava)
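
// A hypothetical variant (not in the original demo): pass topic configs
// instead of an empty map to override the changelog topic settings,
// e.g. a smaller segment size:
// val changelogConfig = Map("segment.bytes" -> "104857600").asJava
// Materialized.as[String, Long](storeSupplier).withLoggingEnabled(changelogConfig)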

import org.apache.kafka.streams.kstream.Printed
val sysout = Printed.toSysOut[String, Long].withLabel("sysout")

val builder = new StreamsBuilder
builder
  .stream[String, String](source)
  .selectKey { case (_, v) => v.split(",").head }
  .groupByKey
  .count()(materialized)
  .toStream
  .print(sysout)
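
// selectKey changes the key, so groupByKey triggers a repartition (hence the
// -repartition internal topic), while count()(materialized) backs the state
// store with the -changelog topic (since logging is enabled)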

val topology = builder.build

println(topology.describe)

import org.apache.kafka.streams.StreamsConfig
val config = new java.util.Properties()
// APPLICATION_ID_CONFIG identifies the application and is used
// as the prefix of the internal topic names
config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
// CLIENT_ID_CONFIG gives the embedded clients an explicit,
// readable ID in the logs (instead of the generated default)
config.put(StreamsConfig.CLIENT_ID_CONFIG, appId)
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)

import org.apache.kafka.streams.KafkaStreams
val ks = new KafkaStreams(topology, config)

// TIP: Paste the code above into sbt console / Scala REPL using :paste mode
// so it is evaluated in one go and the INFO messages are printed up front,
// letting you focus on the log messages after starting the KafkaStreams instance

ks.start()
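
With the KafkaStreams instance running, send some records to the input topic, e.g. with the console producer (a sketch; older Kafka versions use --broker-list instead of --bootstrap-server):

./bin/kafka-console-producer.sh \
  --bootstrap-server :9092 \
  --topic StateStoreLoggingEnabledDemo-input

Every line you type (e.g. hello,world) becomes a record; the demo re-keys it by the first comma-separated field and prints the updated count under the sysout label.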
