KafkaStreams — Streams Client

KafkaStreams is the interface for managing and inspecting the execution environment of the processing topology of a Kafka Streams application.

Note
A KafkaStreams instance is also referred by the name streams client (esp. in logs).

A KafkaStreams instance (process) can be started or closed (shut down). The current state is available using state.

KafkaStreams is running when the state is either RUNNING or REBALANCING.

There could be many KafkaStreams instances running simultaneously (e.g. as separate JVM processes each with its own StreamThreads). While a KafkaStreams instance is running, it allows for inspecting streams metadata using allMetadata, allMetadataForStore, and metadataForKey methods.

A KafkaStreams instance exposes monitoring metrics (incl. the Kafka Producer, Consumers, and AdminClient).

Only when in CREATED state, a KafkaStreams instance can be registered with StateRestoreListeners, StateListeners, and UncaughtExceptionHandlers.

Table 1. KafkaStreams API
Method Description

allMetadata

Collection<StreamsMetadata> allMetadata()

allMetadataForStore

Collection<StreamsMetadata> allMetadataForStore(
  String storeName)

cleanUp

void cleanUp()

Cleans up the local directory with state stores

close

void close() (1)
boolean close(Duration timeout)
  1. Uses Long.MAX_VALUE for the timeout

Closes the KafkaStreams instance

localThreadsMetadata

Set<ThreadMetadata> localThreadsMetadata()

metadataForKey

StreamsMetadata metadataForKey(
  String storeName,
  K key,
  Serializer<K> keySerializer)
StreamsMetadata metadataForKey(
  String storeName,
  K key,
  StreamPartitioner<? super K, ?> partitioner)

metrics

Map<MetricName, ? extends Metric> metrics()

Kafka monitoring metrics of the KafkaStreams instance (incl. the Kafka Producer, Consumers, and AdminClient used by the StreamThreads)

setGlobalStateRestoreListener

void setGlobalStateRestoreListener(
  StateRestoreListener globalStateRestoreListener)

Registers a global StateRestoreListener

setStateListener

void setStateListener(
  KafkaStreams.StateListener listener)

setUncaughtExceptionHandler

void setUncaughtExceptionHandler(
  Thread.UncaughtExceptionHandler eh)

start

void start()

Starts the KafkaStreams instance (and the Topology)

state

State state()

The current state of the KafkaStreams instance

store

T store(
  String storeName,
  QueryableStoreType<T> queryableStoreType)

KafkaStreams is simply a Kafka client that consumes messages from and produces the processing results to Kafka topics (abstracted as SourceNodes and SinkNodes, respectively).

kafka streams KafkaStreams.png
Figure 1. KafkaStreams
Note
A Kafka Streams developer describes the processing logic using a Topology directly (that is a graph of processors) or indirectly through a StreamsBuilder that provides the high-level DSL to define transformations and build a stream processing topology.
val topology: Topology = ...
val config: StreamsConfig = ...

import org.apache.kafka.streams.KafkaStreams
val ks = new KafkaStreams(topology, config)

Once created, KafkaStreams is started up to start consuming, processing, and producing records (as described by a Topology).

ks.start

(only when in CREATED state) KafkaStreams can be given a StateRestoreListener to be informed about the state-related events: onRestoreStart, onBatchRestored and onRestoreEnd (through DelegatingStateRestoreListener).

import org.apache.kafka.streams.processor.StateRestoreListener
val userRestoreListener: StateRestoreListener = ???
ks.setGlobalStateRestoreListener(userRestoreListener)

KafkaStreams uses a InternalTopologyBuilder for the following:

KafkaStreams uses stream-client [client.id] for the log prefix (with the clientId).

Tip

Enable ALL logging level for org.apache.kafka.streams.KafkaStreams logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.KafkaStreams=ALL

Cleaning Up Local Directory for State Stores — cleanUp Method

void cleanUp()

cleanUp simply requests StateDirectory to clean when KafkaStreams is not running.

Note
cleanUp can only be executed before KafkaStreams will be started or after has been closed.

cleanUp reports a IllegalStateException when KafkaStreams is running.

Cannot clean up while running.

Closing KafkaStreams — close Method

void close()  (1)
synchronized boolean close(final long timeout, final TimeUnit timeUnit)
  1. Calls close(final long timeout, final TimeUnit timeUnit) with 0 timeout

close…​FIXME

Important
Always execute close on a KafkaStreams instance even if you never call start to avoid resource leaks.

Creating KafkaStreams Instance

// public API
KafkaStreams(
  final Topology topology,
  final Properties props) (1)

// public API (mostly for testing)
KafkaStreams(
  final Topology topology,
  final Properties props,
  final KafkaClientSupplier clientSupplier) (3)
KafkaStreams(
  final Topology topology,
  final Properties props,
  final Time time)  (4)

// private/internal API
KafkaStreams(
  final InternalTopologyBuilder internalTopologyBuilder,
  final StreamsConfig config,
  final KafkaClientSupplier clientSupplier) (5)
KafkaStreams(
  final InternalTopologyBuilder internalTopologyBuilder,
  final StreamsConfig config,
  final KafkaClientSupplier clientSupplier,
  final Time time)  (6)
  1. Calls the internal KafkaStreams (5) with a new DefaultKafkaClientSupplier

  2. Calls the internal KafkaStreams (6) with SystemTime

KafkaStreams takes the following to be created:

KafkaStreams initializes the internal properties.

While being created, KafkaStreams…​FIXME

KafkaStreams requests the input KafkaClientSupplier for a Kafka AdminClient (for the AdminClient configuration for the clientId).

setRunningFromCreated Internal Method

boolean setRunningFromCreated()

setRunningFromCreated…​FIXME

Note
setRunningFromCreated is used exclusively when KafkaStreams is started.

Starting KafkaStreams — start Method

synchronized void start()
throws IllegalStateException, StreamsException

start starts the Topology (that in turn starts consuming, processing, and producing records).

Internally, start prints out the following DEBUG message to the logs:

Starting Streams client

start marks KafkaStreams as running (i.e. transitions from CREATED to RUNNING state and notifies StateListeners).

start starts global stream thread if defined (which is when…​FIXME)

start starts stream threads.

start schedules a thread that requests StateDirectory to cleanRemovedTasks every state.cleanup.delay.ms milliseconds.

You should see the following DEBUG message in the logs:

Started Streams client

In case the changing state to running fails, start merely prints out the following ERROR message to the logs:

Already stopped, cannot re-start

Registering Global StateRestoreListener — setGlobalStateRestoreListener Method

void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener)

setGlobalStateRestoreListener registers a StateRestoreListener (in a Kafka Streams application).

Internally, setGlobalStateRestoreListener simply sets the globalStateRestoreListener internal property to be the input StateRestoreListener (only when in CREATED state).

setGlobalStateRestoreListener throws a IllegalStateException when not in CREATED state:

Can only set GlobalStateRestoreListener in CREATED state. Current state is: [state]

allMetadata Method

Collection<StreamsMetadata> allMetadata()

Making Sure That KafkaStreams Is Running — validateIsRunning Internal Method

void validateIsRunning()

validateIsRunning throws a IllegalStateException when KafkaStreams is not running. Otherwise, validateIsRunning does nothing.

KafkaStreams is not running. State is [state].
Note
validateIsRunning is used when KafkaStreams is requested to allMetadata, allMetadataForStore, metadataForKey, metadataForKey, store, and localThreadsMetadata.

Internal Properties

Name Description

clientId

Client ID

Used for the following:

globalStateRestoreListener

A user-defined global StateRestoreListener to be notified about the state-related events: onRestoreStart, onBatchRestored and onRestoreEnd (through DelegatingStateRestoreListener)

globalStreamThread

stateDirCleaner

A single-threaded executor (java.util.concurrent.ScheduledExecutorService) that uses a single daemon thread with the name as clientId followed by -CleanupThread

Used to schedule a periodic action that requests the StateDirectory to cleanRemovedTasks after and every state.cleanup.delay.ms milliseconds (and only when the state is RUNNING)

Initialized when KafkaStreams is created and shut down when requested to close

adminClient

Kafka AdminClient (that allows for managing and inspecting topics, brokers, configurations and ACLs)

clientId

Client ID that is initialized when KafkaStreams is created as follows:

queryableStoreProvider

QueryableStoreProvider

stateDirectory

StateDirectory

stateLock

Object lock for…​FIXME

streamsMetadataState

StreamsMetadataState (with the InternalTopologyBuilder and application.server configuration property)

KafkaStreams is simply a public facade to expose the StreamsMetadataState using the following methods:

Initialized when KafkaStreams is created to create StreamThreads

threads

Note
The number of stream processor threads per KafkaStreams instance is controlled by num.stream.threads configuration property (default: 1 processing thread).
  • Created when KafkaStreams is created

  • Started when KafkaStreams is started

  • Shut down when KafkaStreams is closed

results matching ""

    No results matching ""