Demo: Creating Topology with State Store with Logging Enabled
The following demo shows the internals of state stores with logging enabled.
Logging / log4j.properties
The following logging configuration may help you understand the internals. Use the following in log4j.properties:
log4j.rootLogger=OFF, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.org.apache.kafka.streams=INFO
log4j.logger.org.apache.kafka.streams.processor.internals.StreamTask=ALL
log4j.logger.org.apache.kafka.streams.processor.internals.StandbyTask=ALL
Refer to Application Logging Using log4j.
Kafka Setup
Before you run the demo application, use ./bin/kafka-topics.sh --create
to create the source topic.
./bin/kafka-topics.sh \
--bootstrap-server :9092 \
--create \
--topic StateStoreLoggingEnabledDemo-input \
--partitions 1 \
--replication-factor 1
Use --partitions 1 as that helps keep the log messages to a minimum, which is just enough to understand the internals. More partitions would simply create more tasks (and add nothing but more log messages).
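You may want to double-check the topic (and its single partition) afterwards using ./bin/kafka-topics.sh --describe (assuming the same broker address):
./bin/kafka-topics.sh \
--bootstrap-server :9092 \
--describe \
--topic StateStoreLoggingEnabledDemo-input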
After the demo application is up and running, use ./bin/kafka-topics.sh --list
to list the topics used in the topology.
$ ./bin/kafka-topics.sh --bootstrap-server :9092 --list
StateStoreLoggingEnabledDemo-StateStoreLoggingEnabledDemo-in-memory-key-value-store-changelog
StateStoreLoggingEnabledDemo-StateStoreLoggingEnabledDemo-in-memory-key-value-store-repartition
StateStoreLoggingEnabledDemo-input
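The internal topic names follow Kafka Streams' naming convention: the changelog topic of a state store is named <application.id>-<storeName>-changelog and the repartition topic <application.id>-<storeName>-repartition. Since the demo's store name itself starts with the application ID, the ID shows up twice in the names above.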
After the demo, you may also want to reset the Kafka Streams demo application (in order to reprocess its data from scratch) using the ./bin/kafka-streams-application-reset.sh script. It resets the offsets of the input topics to the beginning and deletes the application's internal (changelog and repartition) topics. Note that it does not remove the local state directory; use KafkaStreams#cleanUp for that.
./bin/kafka-streams-application-reset.sh \
--application-id StateStoreLoggingEnabledDemo \
--input-topics StateStoreLoggingEnabledDemo-input \
--execute
Demo Application
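// The application ID is derived from the class name of the demo,
// i.e. StateStoreLoggingEnabledDemo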
val appId = this.getClass.getSimpleName.replace("$", "")
val storeName = s"$appId-in-memory-key-value-store"
val source = s"$appId-input"
val bootstrapServers = ":9092"
println(s"Application ID: $appId")
println(s"Source topics: $source")
println("Make sure that the source topics are available and press ENTER")
/**
* ./bin/kafka-topics.sh \
* --bootstrap-server :9092 \
* --create \
* --topic StateStoreLoggingEnabledDemo-input \
* --partitions 1 \
* --replication-factor 1
*/
System.in.read()
// Using Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._
import org.apache.kafka.streams.state.Stores
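// inMemoryKeyValueStore gives a supplier of a non-persistent key-value store
// (unlike the default RocksDB-backed store, nothing is written to disk)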
val storeSupplier = Stores.inMemoryKeyValueStore(storeName)
import org.apache.kafka.streams.scala.kstream.Materialized
import scala.collection.JavaConverters._
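// withLoggingEnabled backs the store with the changelog topic listed above;
// the map can carry extra configs for that topic (empty means broker defaults)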
val materialized: Materialized[String, Long, ByteArrayKeyValueStore] = Materialized
.as[String, Long](storeSupplier)
.withLoggingEnabled(Map.empty[String, String].asJava)
import org.apache.kafka.streams.kstream.Printed
val sysout = Printed.toSysOut[String, Long].withLabel("sysout")
val builder = new StreamsBuilder
builder
.stream[String, String](source)
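// Changing the key marks the stream for repartitioning
// (hence the ...-repartition internal topic)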
.selectKey { case (_, v) => v.split(",").head }
.groupByKey
.count()(materialized)
.toStream
.print(sysout)
val topology = builder.build
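// TopologyDescription (printed below) shows the sub-topologies
// with their sources, processors, sinks and state stores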
println(topology.describe)
import org.apache.kafka.streams.StreamsConfig
val config = new java.util.Properties()
// APPLICATION_ID_CONFIG is mandatory and is used to name the internal topics
config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
// CLIENT_ID_CONFIG makes the client ID in the logs short and predictable
// (by default it is the application ID followed by a random UUID)
config.put(StreamsConfig.CLIENT_ID_CONFIG, appId)
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
import org.apache.kafka.streams.KafkaStreams
val ks = new KafkaStreams(topology, config)
// TIP: Copy and paste the above code using the :paste mode of sbt console / Scala REPL
// so all the INFO messages are printed at once (before the instance starts)
// and you can focus on the log messages that follow ks.start().
ks.start()
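Once the application is up and running, send a few records to the source topic and watch the StreamTask (and state store) log messages. A minimal sketch, assuming a recent Kafka distribution where the console producer accepts --bootstrap-server (older ones use --broker-list):
./bin/kafka-console-producer.sh \
--bootstrap-server :9092 \
--topic StateStoreLoggingEnabledDemo-input
Every line you type is a record, e.g. k1,hello. The demo uses the text before the first comma as the key (see selectKey above).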