KafkaStreams — Streams Client

KafkaStreams is the interface for managing and inspecting the execution environment of the processing topology of a Kafka Streams application.

A KafkaStreams instance is also referred to as a streams client (especially in logs).

A KafkaStreams instance (process) can be started or closed (shut down). The current state is available using state.

KafkaStreams is running when the state is either RUNNING or REBALANCING.

Many KafkaStreams instances can run simultaneously (e.g. as separate JVM processes, each with its own StreamThreads). While a KafkaStreams instance is running, its streams metadata can be inspected using the allMetadata, allMetadataForStore, and metadataForKey methods.

A KafkaStreams instance exposes monitoring metrics (incl. the Kafka Producer, Consumers, and AdminClient).

StateRestoreListeners, StateListeners, and UncaughtExceptionHandlers can be registered with a KafkaStreams instance only while it is in CREATED state.

Table 1. KafkaStreams API
Method Description


Collection<StreamsMetadata> allMetadata()


Collection<StreamsMetadata> allMetadataForStore(
  String storeName)


void cleanUp()

Cleans up the local directory with state stores


void close() (1)
boolean close(Duration timeout)
  1. Uses Long.MAX_VALUE for the timeout

Closes the KafkaStreams instance


Set<ThreadMetadata> localThreadsMetadata()


StreamsMetadata metadataForKey(
  String storeName,
  K key,
  Serializer<K> keySerializer)
StreamsMetadata metadataForKey(
  String storeName,
  K key,
  StreamPartitioner<? super K, ?> partitioner)


Map<MetricName, ? extends Metric> metrics()

Kafka monitoring metrics of the KafkaStreams instance (incl. the Kafka Producer, Consumers, and AdminClient used by the StreamThreads)


void setGlobalStateRestoreListener(
  StateRestoreListener globalStateRestoreListener)

Registers a global StateRestoreListener


void setStateListener(
  KafkaStreams.StateListener listener)


void setUncaughtExceptionHandler(
  Thread.UncaughtExceptionHandler eh)


void start()

Starts the KafkaStreams instance (and the Topology)


State state()

The current state of the KafkaStreams instance


T store(
  String storeName,
  QueryableStoreType<T> queryableStoreType)

KafkaStreams is simply a Kafka client that consumes messages from and produces the processing results to Kafka topics (abstracted as SourceNodes and SinkNodes, respectively).

Figure 1. KafkaStreams
A Kafka Streams developer describes the processing logic either directly, using a Topology (a graph of processors), or indirectly, through a StreamsBuilder that provides the high-level DSL to define transformations and build a stream processing topology.

import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}

// Placeholders: supply your own topology and configuration
val topology: Topology = ???
val config: StreamsConfig = ???

val ks = new KafkaStreams(topology, config)

Once created, KafkaStreams has to be started to begin consuming, processing, and producing records (as described by the Topology).


KafkaStreams can be given a StateRestoreListener (only when in CREATED state) to be informed about the state-related events: onRestoreStart, onBatchRestored, and onRestoreEnd (through DelegatingStateRestoreListener).

import org.apache.kafka.streams.processor.StateRestoreListener
val userRestoreListener: StateRestoreListener = ???

KafkaStreams uses an InternalTopologyBuilder for the following:

KafkaStreams uses stream-client [client.id] for the log prefix (with the clientId).


Enable ALL logging level for org.apache.kafka.streams.KafkaStreams logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.KafkaStreams=ALL


Cleaning Up Local Directory for State Stores — cleanUp Method

void cleanUp()

cleanUp simply requests StateDirectory to clean when KafkaStreams is not running.

cleanUp can only be executed before KafkaStreams is started or after it has been closed.

cleanUp throws an IllegalStateException with the following message when KafkaStreams is running:

Cannot clean up while running.

Closing KafkaStreams — close Method

void close()  (1)
synchronized boolean close(final long timeout, final TimeUnit timeUnit)
  1. Calls close(final long timeout, final TimeUnit timeUnit) with 0 timeout


Always execute close on a KafkaStreams instance even if you never call start to avoid resource leaks.

Creating KafkaStreams Instance

// public API
KafkaStreams(
  final Topology topology,
  final Properties props) (1)

// public API (mostly for testing)
KafkaStreams(
  final Topology topology,
  final Properties props,
  final KafkaClientSupplier clientSupplier) (3)
KafkaStreams(
  final Topology topology,
  final Properties props,
  final Time time)  (4)

// private/internal API
KafkaStreams(
  final InternalTopologyBuilder internalTopologyBuilder,
  final StreamsConfig config,
  final KafkaClientSupplier clientSupplier) (5)
KafkaStreams(
  final InternalTopologyBuilder internalTopologyBuilder,
  final StreamsConfig config,
  final KafkaClientSupplier clientSupplier,
  final Time time)  (6)
  1. Calls the internal KafkaStreams (5) with a new DefaultKafkaClientSupplier

  2. Calls the internal KafkaStreams (6) with SystemTime

KafkaStreams takes the following to be created:

KafkaStreams initializes the internal properties.

While being created, KafkaStreams…​FIXME

KafkaStreams requests the input KafkaClientSupplier for a Kafka AdminClient (for the AdminClient configuration for the clientId).

setRunningFromCreated Internal Method

boolean setRunningFromCreated()


setRunningFromCreated is used exclusively when KafkaStreams is started.
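
The transition that start relies on (from CREATED to RUNNING, with listeners notified) can be illustrated with a stand-alone Java sketch that uses only the JDK. It is a simplified model, not the KafkaStreams source: the class StartTransitionSketch, the Listener interface (a stand-in for KafkaStreams.StateListener), the NOT_RUNNING state, and the markStopped helper are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-alone model (assumption); NOT the KafkaStreams source.
class StartTransitionSketch {
    // CREATED and RUNNING come from the text; NOT_RUNNING is an assumed "stopped" state.
    enum State { CREATED, RUNNING, NOT_RUNNING }

    // Hypothetical stand-in for KafkaStreams.StateListener.
    interface Listener {
        void onChange(State newState, State oldState);
    }

    private final Object stateLock = new Object();
    private final List<Listener> listeners = new ArrayList<>();
    private State state = State.CREATED;

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    State state() {
        synchronized (stateLock) {
            return state;
        }
    }

    // Hypothetical helper that simulates an instance that was already closed.
    void markStopped() {
        synchronized (stateLock) {
            state = State.NOT_RUNNING;
        }
    }

    // Returns true only if the CREATED -> RUNNING transition actually happened.
    boolean setRunningFromCreated() {
        synchronized (stateLock) {
            if (state != State.CREATED) {
                // The caller would log "Already stopped, cannot re-start".
                return false;
            }
            State oldState = state;
            state = State.RUNNING;
            for (Listener listener : listeners) {
                listener.onChange(state, oldState);
            }
            return true;
        }
    }
}
```

Returning a boolean instead of throwing keeps the decision about how to report a failed start with the caller, which matches the description of start printing an ERROR message.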

Starting KafkaStreams — start Method

synchronized void start()
throws IllegalStateException, StreamsException

start starts the Topology (that in turn starts consuming, processing, and producing records).

Internally, start prints out the following DEBUG message to the logs:

Starting Streams client

start marks KafkaStreams as running (i.e. transitions from CREATED to RUNNING state and notifies StateListeners).

start starts global stream thread if defined (which is when…​FIXME)

start starts stream threads.

start schedules a thread that requests StateDirectory to cleanRemovedTasks every state.cleanup.delay.ms milliseconds.

You should see the following DEBUG message in the logs:

Started Streams client

If the transition to the running state fails, start merely prints out the following ERROR message to the logs:

Already stopped, cannot re-start
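
The periodic cleanup that start schedules can be pictured with the following JDK-only sketch of a single-threaded scheduler backed by one daemon thread named after the client ID. It is an illustration under assumptions, not the KafkaStreams implementation: the class CleanupSchedulerSketch, its method names, and the isRunning and cleanTask parameters (standing in for the state check and the StateDirectory request) are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Illustrative model (assumption); NOT the KafkaStreams source.
class CleanupSchedulerSketch {

    // Single-threaded scheduler whose only thread is a daemon
    // named "<clientId>-CleanupThread".
    static ScheduledExecutorService newCleaner(String clientId) {
        return Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, clientId + "-CleanupThread");
            thread.setDaemon(true);
            return thread;
        });
    }

    // Runs cleanTask first after delayMs and then every delayMs,
    // but only while isRunning reports true.
    static void schedule(ScheduledExecutorService cleaner,
                         long delayMs,
                         BooleanSupplier isRunning,
                         Runnable cleanTask) {
        cleaner.scheduleAtFixedRate(() -> {
            if (isRunning.getAsBoolean()) {
                cleanTask.run();
            }
        }, delayMs, delayMs, TimeUnit.MILLISECONDS);
    }
}
```

A daemon thread does not keep the JVM alive on its own, so a scheduler built this way never prevents the application from exiting; it should still be shut down explicitly when the client is closed.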

Registering Global StateRestoreListener — setGlobalStateRestoreListener Method

void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener)

setGlobalStateRestoreListener registers a StateRestoreListener (in a Kafka Streams application).

Internally, setGlobalStateRestoreListener simply sets the globalStateRestoreListener internal property to be the input StateRestoreListener (only when in CREATED state).

setGlobalStateRestoreListener throws an IllegalStateException when not in CREATED state:

Can only set GlobalStateRestoreListener in CREATED state. Current state is: [state]
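
The CREATED-only rule can be modelled in a few lines of JDK-only Java. The sketch below is an assumption-laden illustration rather than the KafkaStreams code: the class CreatedOnlySketch, the empty RestoreListener interface (standing in for StateRestoreListener), and the start and registeredListener helpers are hypothetical; only the error message is taken from the text above.

```java
// Illustrative model (assumption); NOT the KafkaStreams source.
class CreatedOnlySketch {
    enum State { CREATED, RUNNING }

    // Hypothetical stand-in for
    // org.apache.kafka.streams.processor.StateRestoreListener.
    interface RestoreListener { }

    private State state = State.CREATED;
    private RestoreListener globalStateRestoreListener;

    // Hypothetical helper that moves the model out of CREATED.
    void start() {
        state = State.RUNNING;
    }

    // Hypothetical accessor used to observe the registered listener.
    RestoreListener registeredListener() {
        return globalStateRestoreListener;
    }

    // Accepts the listener only while the instance is still in CREATED state.
    void setGlobalStateRestoreListener(RestoreListener listener) {
        if (state != State.CREATED) {
            throw new IllegalStateException(
                "Can only set GlobalStateRestoreListener in CREATED state. "
                    + "Current state is: " + state);
        }
        globalStateRestoreListener = listener;
    }
}
```

In practice this means that listeners have to be registered between creating the instance and calling start.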

allMetadata Method

Collection<StreamsMetadata> allMetadata()

Making Sure That KafkaStreams Is Running — validateIsRunning Internal Method

void validateIsRunning()

validateIsRunning throws an IllegalStateException with the following message when KafkaStreams is not running. Otherwise, validateIsRunning does nothing.

KafkaStreams is not running. State is [state].

validateIsRunning is used when KafkaStreams is requested for allMetadata, allMetadataForStore, metadataForKey (both variants), store, and localThreadsMetadata.
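
A guard of this kind can be sketched with the JDK alone. The model below treats RUNNING and REBALANCING as "running", as described at the top of this page, and is only an approximation: the class RunningGuardSketch, the setState hook, the NOT_RUNNING state, and the placeholder return value of allMetadata are assumptions, not the KafkaStreams implementation.

```java
import java.util.Collections;
import java.util.List;

// Illustrative model (assumption); NOT the KafkaStreams source.
class RunningGuardSketch {
    enum State {
        CREATED, REBALANCING, RUNNING, NOT_RUNNING;

        // "Running" covers both RUNNING and REBALANCING.
        boolean isRunning() {
            return this == RUNNING || this == REBALANCING;
        }
    }

    private volatile State state = State.CREATED;

    // Hypothetical hook for driving the model from the outside.
    void setState(State newState) {
        state = newState;
    }

    // Throws unless the instance is running; otherwise does nothing.
    void validateIsRunning() {
        State current = state;
        if (!current.isRunning()) {
            throw new IllegalStateException(
                "KafkaStreams is not running. State is " + current + ".");
        }
    }

    // Stand-in for a guarded query; returns an empty placeholder result.
    List<String> allMetadata() {
        validateIsRunning();
        return Collections.emptyList();
    }
}
```

Placing the check in one method and calling it at the start of every query method keeps the error message and the definition of "running" in a single place.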

Internal Properties

Name Description


Client ID

Used for the following:


A user-defined global StateRestoreListener to be notified about the state-related events: onRestoreStart, onBatchRestored and onRestoreEnd (through DelegatingStateRestoreListener)



A single-threaded executor (java.util.concurrent.ScheduledExecutorService) that uses a single daemon thread named after the clientId followed by -CleanupThread

Used to schedule a periodic action that requests the StateDirectory to cleanRemovedTasks, first after state.cleanup.delay.ms milliseconds and then every state.cleanup.delay.ms milliseconds (and only when the state is RUNNING)

Initialized when KafkaStreams is created and shut down when requested to close


Kafka AdminClient (that allows for managing and inspecting topics, brokers, configurations and ACLs)


Client ID that is initialized when KafkaStreams is created as follows:






Object lock for…​FIXME


StreamsMetadataState (with the InternalTopologyBuilder and application.server configuration property)

KafkaStreams is simply a public facade to expose the StreamsMetadataState using the following methods:

Initialized when KafkaStreams is created to create StreamThreads


The number of stream processor threads per KafkaStreams instance is controlled by num.stream.threads configuration property (default: 1 processing thread).
  • Created when KafkaStreams is created

  • Started when KafkaStreams is started

  • Shut down when KafkaStreams is closed
