StreamThread — Stream Processor Thread

StreamThread is a stream processor thread (a Java Thread) that runs the main record processing loop when started.

Figure 1. StreamThread and Stream Processing

StreamThread is created exclusively when KafkaStreams is created (KafkaStreams is one of the main entities that a Kafka Streams developer uses in a Kafka Streams application).

Note
KafkaStreams uses num.stream.threads configuration property for the number of StreamThreads to create (default: 1).
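For example, the following minimal sketch builds a configuration that asks KafkaStreams for a given number of StreamThreads. The property keys are the documented Kafka Streams keys; the class name is hypothetical and the application.id and bootstrap.servers values are placeholders.

```java
import java.util.Properties;

// Hypothetical helper building a Kafka Streams configuration with several StreamThreads.
// The application.id and bootstrap.servers values are placeholders.
class StreamThreadsConfigExample {
    static Properties withStreamThreads(final int numStreamThreads) {
        final Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // placeholder application ID
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        // KafkaStreams creates this many StreamThreads (default: 1)
        props.put("num.stream.threads", Integer.toString(numStreamThreads));
        return props;
    }
}
```

With the Kafka Streams library on the classpath, StreamsConfig.NUM_STREAM_THREADS_CONFIG holds the same key, and the Properties are given to the KafkaStreams constructor together with a Topology.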

StreamThread uses a Kafka Consumer and a custom ConsumerRebalanceListener (with the TaskManager) to subscribe to the source topics (when requested to run the main record processing loop and to enforce a rebalance). The source topics to subscribe to come from the InternalTopologyBuilder.

StreamThread uses a Kafka Consumer and a TaskManager that are both created when StreamThread is requested to create an instance of itself. That is when StreamThread sets the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property (to the TaskManager) and so indirectly associates the TaskManager with the StreamsPartitionAssignor. StreamThread then requests the given KafkaClientSupplier to create a KafkaConsumer with this configuration. When the KafkaConsumer picks up the partition.assignment.strategy configuration property, it creates the StreamsPartitionAssignor and eventually configures it, and the assignor uses the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property to get at the TaskManager.
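The hand-off relies on a Kafka client passing its configuration map to the configurable plugins it instantiates, such as a partition assignor. The following minimal sketch models that pattern in plain Java. TASK_MANAGER_KEY stands in for the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property, and TaskManagerStub, AssignorStub and HandoffDemo are hypothetical stand-ins, not Kafka classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the TaskManager
class TaskManagerStub { }

// Hypothetical stand-in for a configurable partition assignor
class AssignorStub {
    // Hypothetical key; plays the role of TASK_MANAGER_FOR_PARTITION_ASSIGNOR
    static final String TASK_MANAGER_KEY = "__task.manager.instance__";
    TaskManagerStub taskManager;

    // Mirrors the shape of a configure(Map<String, ?>) callback
    void configure(final Map<String, ?> configs) {
        final Object o = configs.get(TASK_MANAGER_KEY);
        if (!(o instanceof TaskManagerStub)) {
            throw new IllegalArgumentException("TaskManager missing from the configuration");
        }
        this.taskManager = (TaskManagerStub) o;
    }
}

class HandoffDemo {
    static AssignorStub createConsumerSide(final TaskManagerStub taskManager) {
        final Map<String, Object> consumerConfigs = new HashMap<>();
        consumerConfigs.put(AssignorStub.TASK_MANAGER_KEY, taskManager); // set by the thread
        final AssignorStub assignor = new AssignorStub();                // created by the consumer
        assignor.configure(consumerConfigs);                             // assignor finds the TaskManager
        return assignor;
    }
}
```

Passing a live object through a map of configuration values keeps the assignor's construction under the consumer's control while still letting it reach thread-local state.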

Figure 2. StreamThread and Registering TaskManager under TASK_MANAGER_FOR_PARTITION_ASSIGNOR

StreamThread uses the poll.ms configuration property (default: 100 ms) as the polling interval when requested to poll records once and process them using active stream tasks.

StreamThread uses the commit.interval.ms configuration property as the flush interval for persisting the position of a processor (when polling records once and processing them using active stream tasks and maybeCommit).

StreamThread uses the Kafka Consumer to:

StreamThread uses stream-thread [[clientId]-StreamThread-[STREAM_THREAD_ID]] for the logging prefix.
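The naming formats quoted in this section can be reproduced with two small helpers. This is a sketch; the class and method names are hypothetical, and only the string formats come from the text.

```java
// Hypothetical helpers reproducing the naming formats quoted in this section.
class StreamThreadNaming {
    // [clientId]-StreamThread-[STREAM_THREAD_ID]
    static String threadClientId(final String clientId, final int streamThreadId) {
        return clientId + "-StreamThread-" + streamThreadId;
    }

    // stream-thread [[clientId]-StreamThread-[STREAM_THREAD_ID]]
    static String logPrefix(final String threadClientId) {
        return "stream-thread [" + threadClientId + "]";
    }
}
```

The same threadClientId is also used for the restore consumer and consumer configurations and for the thread-level metrics tags.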

StreamThread requires an InternalTopologyBuilder to be created and uses it for the following:

StreamThread uses the TaskManager for the following:

  • FIXME

Tip

Enable ALL logging level for the org.apache.kafka.streams.processor.internals.StreamThread logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=ALL

Creating StreamThread Instance

StreamThread takes the following to be created:

StreamThread initializes the internal properties.

StreamThread and Kafka Consumer

When created, StreamThread is given a KafkaClientSupplier.

StreamThread uses the KafkaClientSupplier to get a Kafka consumer with the consumer-specific configuration.

The Kafka Consumer is then assigned to the TaskManager and used to create the StreamThread.

The Kafka Consumer is used in the following:

StreamThread and AdminClient

When created, StreamThread is given an AdminClient that is only used to create the TaskManager.

Creating StreamThread Instance — create Factory Method

StreamThread create(
  InternalTopologyBuilder builder,
  StreamsConfig config,
  KafkaClientSupplier clientSupplier,
  AdminClient adminClient,
  UUID processId,
  String clientId,
  Metrics metrics,
  Time time,
  StreamsMetadataState streamsMetadataState,
  long cacheSizeBytes,
  StateDirectory stateDirectory,
  StateRestoreListener userStateRestoreListener,
  int threadIdx)
Figure 3. Creating StreamThread

create prints out the following INFO message to the logs:

Creating restore consumer client

create requests the input StreamsConfig for getRestoreConsumerConfigs for a new threadClientId (of the format [clientId]-StreamThread-[STREAM_THREAD_ID]).

create requests the given KafkaClientSupplier for getRestoreConsumer for the restoreConsumerConfigs.

create creates a StoreChangelogReader (with the restoreConsumer, the given StateRestoreListener and the configured poll.ms).

(Only with eos enabled) create…​FIXME

create creates a StreamsMetricsThreadImpl with the following:

  • the input Metrics

  • stream-metrics group name

  • thread.[clientId]-StreamThread-[STREAM_THREAD_ID] prefix

  • Tags with a single client-id entry whose value is [clientId]-StreamThread-[STREAM_THREAD_ID].

create creates a ThreadCache (with cacheSizeBytes for the maxCacheSizeBytes and the StreamsMetricsThreadImpl).

create creates a TaskCreator and a StandbyTaskCreator that are used exclusively to create a TaskManager (with a new AssignedStreamsTasks and AssignedStandbyTasks as well as the given StreamsMetadataState).

create prints out the following INFO message to the logs:

Creating consumer client

create requests the input StreamsConfig for application.id configuration property.

create requests the input StreamsConfig for the configuration of a Kafka Consumer for the application ID and the threadClientId (of the format [clientId]-StreamThread-[STREAM_THREAD_ID]) and adds the following internal properties:

(Only with non-empty latestResetTopicsPattern and earliestResetTopicsPattern patterns) create…​FIXME

create requests the given KafkaClientSupplier for a Kafka Consumer (with the consumerConfigs) and associates it with the TaskManager.

In the end, create creates a StreamThread.

Note
create is used exclusively when KafkaStreams is created.

Starting Stream Thread — run Method

void run()
Note
run is part of the java.lang.Runnable contract (implemented by java.lang.Thread) and is executed on the new JVM thread once the thread is started.

run prints out the following INFO message to the logs:

Starting

In the end, run calls completeShutdown with the cleanRun flag that says whether the main record processing loop stopped cleanly or not.

run re-throws any KafkaException.

run prints out the following ERROR message to the logs for any other Exception:

Encountered the following error during processing: [exception]
Note
run is used exclusively when KafkaStreams is requested to start.
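The control flow of run can be sketched in plain Java as follows. KafkaExceptionStub stands in for Kafka's KafkaException, the main loop is passed in as a Runnable, and completeShutdown only records its argument; all of these names are hypothetical. Other exceptions are only logged here, as described above; the sketch does not model anything further for them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.KafkaException
class KafkaExceptionStub extends RuntimeException {
    KafkaExceptionStub(final String message) { super(message); }
}

// Minimal sketch of the run control flow described above
class RunSketch {
    final List<String> log = new ArrayList<>();
    Boolean shutdownCleanRun; // the flag completeShutdown was given

    void run(final Runnable runLoop) {
        log.add("INFO Starting");
        boolean cleanRun = false;
        try {
            runLoop.run();
            cleanRun = true;            // the main loop ended without an exception
        } catch (final KafkaExceptionStub e) {
            throw e;                    // re-thrown as is
        } catch (final RuntimeException e) {
            log.add("ERROR Encountered the following error during processing: " + e);
        } finally {
            completeShutdown(cleanRun); // always called, clean run or not
        }
    }

    void completeShutdown(final boolean cleanRun) { shutdownCleanRun = cleanRun; }
}
```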

Life Cycle of StreamThread — StreamThread’s States

StreamThread can be in exactly one of the following states at any given point in time:

  1. CREATED - The initial state of StreamThread right after it was created

  2. RUNNING - StreamThread was requested for the following:

  3. STARTING

  4. PARTITIONS_REVOKED - RebalanceListener was requested to handle partition revocation

  5. PARTITIONS_ASSIGNED - RebalanceListener was requested to handle partition assignment

  6. PENDING_SHUTDOWN - StreamThread was requested to shutdown or completeShutdown

  7. DEAD - StreamThread was requested to completeShutdown

Figure 4. StreamThread’s Life Cycle

StreamThread can be transitioned to another state by executing setState.

Note
StreamThread defines a Java enumeration State with the states above ordered by ordinal. Every state is created with the ordinals of the states it can transition to. You can check whether a transition is valid or not using State.isValidTransition.
import org.apache.kafka.streams.processor.internals.StreamThread.State._

// CREATED is the 0th state
assert(CREATED.ordinal == 0)

// RUNNING is the next possible state after CREATED
assert(CREATED.isValidTransition(RUNNING))

// DEAD cannot be the next possible state after CREATED
assert(CREATED.isValidTransition(DEAD) == false)
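The ordinal-based design can be sketched with a reduced, hypothetical enum in plain Java. The states and the transition table below are illustrative only and are not the table of any Kafka release. A likely reason for listing ordinals rather than the constants themselves is that an enum constant cannot refer to a constant declared after it in its constructor arguments (Java rejects that as an illegal forward reference).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, reduced state enum following the ordinal-based design described above.
// The states and transitions are illustrative only.
enum ToyThreadState {
    CREATED(1, 2),       // may move to RUNNING (1) or PENDING_SHUTDOWN (2)
    RUNNING(2),          // may move to PENDING_SHUTDOWN (2)
    PENDING_SHUTDOWN(3), // may move to DEAD (3)
    DEAD;                // terminal: no outgoing transitions

    // Ordinals of the states this state may transition to
    private final Set<Integer> validTransitions = new HashSet<>();

    ToyThreadState(final Integer... validTransitions) {
        this.validTransitions.addAll(Arrays.asList(validTransitions));
    }

    boolean isValidTransition(final ToyThreadState newState) {
        return validTransitions.contains(newState.ordinal());
    }
}
```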

Shutting Down Stream Thread — shutdown Method

void shutdown()

shutdown prints out the following INFO message to the logs:

Informed to shut down

Only when transitioning from the CREATED state, shutdown calls completeShutdown (with the cleanRun flag enabled).

Note

shutdown is used when:

Polling Records Once And Processing Them Using Active Stream Tasks — runOnce Method

void runOnce()

In essence, runOnce requests the Consumer to poll records, adds the records to active stream tasks and requests the TaskManager to process the records by running stream tasks.

Figure 5. StreamThread and Polling Records Once And Processing Them Using Active Stream Tasks
Note
runOnce uses the StreamsMetricsThreadImpl to access sensors and record metrics.

Internally, runOnce calls pollRequests with a different poll time per state as follows:

Note

In any other state (when calling pollRequests above), runOnce prints out the following ERROR message to the logs and throws a StreamsException:

Unexpected state [state] during normal iteration

With the records polled, runOnce requests the StreamsMetricsThreadImpl for the pollTimeSensor to record the poll latency and then adds the polled records to active stream tasks.

If in PARTITIONS_ASSIGNED state, runOnce requests the TaskManager to updateNewAndRestoringTasks and (when all stream tasks are running) changes to the RUNNING state.

runOnce requests the TaskManager to check out if hasActiveRunningTasks and if so…​FIXME

In the end, runOnce calls maybeUpdateStandbyTasks followed by maybeCommit.
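The order of the steps described above can be captured in a hypothetical skeleton where every step only records its name. The poll time selection per state and the processing of active tasks are left out, and the allTasksRunningAfterUpdate flag is an assumed stand-in for what the TaskManager reports after updateNewAndRestoringTasks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical skeleton of the runOnce sequence; each step only records its name.
class RunOnceSketch {
    enum Phase { PARTITIONS_ASSIGNED, RUNNING }

    final List<String> steps = new ArrayList<>();
    Phase state;
    private final boolean allTasksRunningAfterUpdate; // stand-in for the TaskManager's answer

    RunOnceSketch(final Phase state, final boolean allTasksRunningAfterUpdate) {
        this.state = state;
        this.allTasksRunningAfterUpdate = allTasksRunningAfterUpdate;
    }

    void runOnce() {
        steps.add("pollRequests");
        steps.add("record poll latency");
        steps.add("addRecordsToTasks");
        if (state == Phase.PARTITIONS_ASSIGNED) {
            steps.add("updateNewAndRestoringTasks");
            if (allTasksRunningAfterUpdate) {
                state = Phase.RUNNING; // all stream tasks are running
            }
        }
        steps.add("maybeUpdateStandbyTasks");
        steps.add("maybeCommit");
    }
}
```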

Note
runOnce is used exclusively when StreamThread is requested to run the main record processing loop.

Polling Records — pollRequests Internal Method

ConsumerRecords<byte[], byte[]> pollRequests(
  Duration pollTime)

pollRequests simply requests the Kafka Consumer to poll records with the given pollTime.

In case of an InvalidOffsetException, pollRequests calls resetInvalidOffsets.

If a rebalanceException has been recorded, pollRequests re-throws it as a TaskMigratedException or a StreamsException.

Note
pollRequests is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.

resetInvalidOffsets Internal Method

void resetInvalidOffsets(
  InvalidOffsetException e)

resetInvalidOffsets…​FIXME

Note
resetInvalidOffsets is used exclusively when StreamThread is requested to pollRequests (and an InvalidOffsetException is reported).

Attempting to Update Running StandbyTasks — maybeUpdateStandbyTasks Internal Method

void maybeUpdateStandbyTasks()

maybeUpdateStandbyTasks…​FIXME

maybeUpdateStandbyTasks does nothing and simply returns when StreamThread is not in RUNNING state or the TaskManager has no running standby tasks (hasStandbyRunningTasks).

Note
maybeUpdateStandbyTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.

Running Main Record Processing Loop — runLoop Internal Method

void runLoop()

runLoop simply requests the Consumer to subscribe to the source topics (with the custom ConsumerRebalanceListener) and keeps polling records and processing them using active stream tasks until the isRunning flag is off.

Figure 6. StreamThread and Running Main Record Processing Loop

runLoop requests the Consumer to subscribe to the source topics (from the InternalTopologyBuilder) with the custom ConsumerRebalanceListener.

When the assignmentErrorCode is VERSION_PROBING, runLoop prints out the following INFO message to the logs and enforces a rebalance:

Version probing detected. Triggering new rebalance.

When a TaskMigratedException is thrown, runLoop prints out the following WARN message to the logs and enforces a rebalance:

Detected task [taskId] that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Will try to rejoin the consumer group. Below is the detailed description of the task:
[migratedTask]
Note
runLoop is used exclusively when StreamThread is requested to start.
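The structure of runLoop can be sketched in plain Java as follows. TaskMigratedExceptionStub stands in for TaskMigratedException, the VERSION_PROBING value is hypothetical (the real code comes from the partition assignor), isRunning and runOnce are passed in, and the subscription step is only marked by a comment.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for TaskMigratedException
class TaskMigratedExceptionStub extends RuntimeException { }

// Minimal sketch of the runLoop structure described above
class RunLoopSketch {
    static final int VERSION_PROBING = 2; // hypothetical error code value
    int assignmentErrorCode = 0;          // set by the assignor in the real code
    int iterations = 0;
    int rebalances = 0;

    void runLoop(final BooleanSupplier isRunning, final Runnable runOnce) {
        // consumer.subscribe(sourceTopicPattern, rebalanceListener) would happen here
        while (isRunning.getAsBoolean()) {
            try {
                runOnce.run();
                iterations++;
                if (assignmentErrorCode == VERSION_PROBING) {
                    // INFO "Version probing detected. Triggering new rebalance."
                    enforceRebalance();
                }
            } catch (final TaskMigratedExceptionStub e) {
                // WARN "Detected task [taskId] that got migrated to another thread..."
                enforceRebalance();
            }
        }
    }

    void enforceRebalance() { rebalances++; }
}
```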

Setting New State — setState Method

State setState(
  State newState)

setState…​FIXME

Note
setState is used when…​FIXME

setRebalanceException Internal Method

void setRebalanceException(
  Throwable rebalanceException)

setRebalanceException…​FIXME

Note
setRebalanceException is used when…​FIXME

Describing Itself (Textual Representation) — toString Method

String toString()
String toString(
  String indent)

The no-argument toString uses an empty indent.

toString gives a text representation with "StreamsThread threadId:" and the thread name followed by the text representation of the TaskManager.

FIXME toString in action

Checking If StreamThread Is Running — isRunning Method

boolean isRunning()

isRunning is true when StreamThread is in one of the following states:

Otherwise, isRunning is false.

Note
isRunning is simply a pass-through variant of State.isRunning.
Note

isRunning is used when:

adminClientMetrics Method

Map<MetricName, Metric> adminClientMetrics()

adminClientMetrics…​FIXME

Note
adminClientMetrics is used exclusively when KafkaStreams is requested for the metrics.

consumerMetrics Method

Map<MetricName, Metric> consumerMetrics()

consumerMetrics…​FIXME

Note
consumerMetrics is used when…​FIXME

producerMetrics Method

Map<MetricName, Metric> producerMetrics()

producerMetrics…​FIXME

Note
producerMetrics is used when…​FIXME

getConsumerClientId Static Method

String getConsumerClientId(
  String threadClientId)

getConsumerClientId…​FIXME

Note
getConsumerClientId is used when…​FIXME

getRestoreConsumerClientId Static Method

String getRestoreConsumerClientId(
  String threadClientId)

getRestoreConsumerClientId…​FIXME

Note
getRestoreConsumerClientId is used when…​FIXME

getSharedAdminClientId Static Method

String getSharedAdminClientId(
  String clientId)

getSharedAdminClientId…​FIXME

Note
getSharedAdminClientId is used when…​FIXME

tasks Method

Map<TaskId, StreamTask> tasks()

tasks…​FIXME

Note
tasks is used when…​FIXME

getTaskProducerClientId Internal Static Method

String getTaskProducerClientId(
  String threadClientId,
  TaskId taskId)

getTaskProducerClientId…​FIXME

Note
getTaskProducerClientId is used when…​FIXME

getThreadProducerClientId Internal Static Method

String getThreadProducerClientId(
  String threadClientId)

getThreadProducerClientId…​FIXME

Note
getThreadProducerClientId is used when…​FIXME

Adding Records to Active Stream Tasks — addRecordsToTasks Internal Method

void addRecordsToTasks(
  ConsumerRecords<byte[], byte[]> records)

For every partition of the input records, addRecordsToTasks requests the TaskManager for the active stream processor task responsible for the partition.

Note
The input records may (and often will) be from different partitions or even topics. Unless you use as many StreamThread instances as there are partitions (among the source topics), addRecordsToTasks will be given records from many partitions.

With the StreamTask, addRecordsToTasks requests the input mixed-partition ConsumerRecords for the records for the given partition only and then requests the StreamTask to buffer the new records (for the partition).

Figure 7. StreamThread and Adding Records to Active Stream Tasks
Note
ConsumerRecords is a container that holds the list of ConsumerRecord per partition for a particular topic. There is one ConsumerRecord list for every topic partition returned by a Consumer.poll(long) operation.
Note
addRecordsToTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.
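The routing done by addRecordsToTasks can be sketched in plain Java. PolledRecord and TaskBuffer are hypothetical stand-ins for ConsumerRecord and StreamTask, and partitions are plain "topic-partition" strings. The real method works on ConsumerRecords, whose partitions() and records(TopicPartition) already give the per-partition view, so the explicit grouping step below only makes that view visible. The sketch fails fast when no task owns a partition; how the real method handles that case is not covered here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a ConsumerRecord
final class PolledRecord {
    final String partition; // e.g. "orders-0"
    final String value;
    PolledRecord(final String partition, final String value) {
        this.partition = partition;
        this.value = value;
    }
}

// Hypothetical stand-in for a StreamTask that buffers records per partition
final class TaskBuffer {
    final Map<String, List<String>> buffered = new LinkedHashMap<>();
    void addRecords(final String partition, final List<String> values) {
        buffered.computeIfAbsent(partition, p -> new ArrayList<>()).addAll(values);
    }
}

class AddRecordsSketch {
    static void addRecordsToTasks(final List<PolledRecord> polled,
                                  final Map<String, TaskBuffer> activeTaskByPartition) {
        // Split the mixed-partition poll result per partition (keeping poll order)
        final Map<String, List<String>> byPartition = new LinkedHashMap<>();
        for (final PolledRecord r : polled) {
            byPartition.computeIfAbsent(r.partition, p -> new ArrayList<>()).add(r.value);
        }
        // Hand every partition's records to the active task responsible for it
        for (final Map.Entry<String, List<String>> e : byPartition.entrySet()) {
            final TaskBuffer task = activeTaskByPartition.get(e.getKey());
            if (task == null) {
                throw new IllegalStateException("No active task for partition " + e.getKey());
            }
            task.addRecords(e.getKey(), e.getValue());
        }
    }
}
```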

Enforcing Rebalance — enforceRebalance Internal Method

void enforceRebalance()

enforceRebalance…​FIXME

Note
enforceRebalance is used when…​FIXME

Committing All Active and Standby Tasks (When Commit Interval Elapsed) — maybeCommit Method

boolean maybeCommit()

maybeCommit commits all tasks (owned by the TaskManager) if the commit interval has elapsed, i.e. the commit interval (commit.interval.ms) is non-negative and the commit interval has passed since the last commit.

Internally, maybeCommit prints out the following TRACE message to the logs:

Committing all active tasks [activeTaskIds] and standby tasks [standbyTaskIds] since [time]ms has elapsed (commit interval is [commitTimeMs]ms)

maybeCommit requests the TaskManager to commitAll.

Only if there are still running active and standby tasks, maybeCommit does the following:

  1. Requests the StreamsMetricsThreadImpl for the commitTimeSensor and records the commit time (the latency of committing all the tasks divided by their number)

  2. Requests the TaskManager to maybePurgeCommitedRecords

maybeCommit prints out the following DEBUG message to the logs:

Committed all active tasks [activeTaskIds] and standby tasks [standbyTaskIds] in [duration]ms

maybeCommit updates the lastCommitMs internal counter with the "now" timestamp.

maybeCommit turns the processStandbyRecords flag on.

Note
maybeCommit is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.
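The commit-interval check and the bookkeeping that follows a commit can be sketched as below. The class is hypothetical, the TaskManager's commitAll is reduced to a counter, now is passed in explicitly for testability, and the sketch treats the interval as elapsed only once strictly more than commitTimeMs has passed since lastCommitMs.

```java
// Minimal, hypothetical sketch of the commit-interval check of maybeCommit.
class CommitIntervalSketch {
    final long commitTimeMs;      // commit.interval.ms (negative disables committing here)
    long lastCommitMs;            // time of the last commit
    boolean processStandbyRecords;
    int commits;                  // stands in for TaskManager.commitAll calls

    CommitIntervalSketch(final long commitTimeMs, final long startMs) {
        this.commitTimeMs = commitTimeMs;
        this.lastCommitMs = startMs;
    }

    boolean maybeCommit(final long now) {
        if (commitTimeMs >= 0 && now - lastCommitMs > commitTimeMs) {
            commits++;                    // commit all active and standby tasks
            lastCommitMs = now;           // remember when we committed
            processStandbyRecords = true; // let standby tasks catch up next
            return true;
        }
        return false;
    }
}
```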

Attempting to Punctuate (Running Stream Tasks) — maybePunctuate Internal Method

boolean maybePunctuate()

maybePunctuate requests the TaskManager to punctuate stream tasks.

If punctuate returned a positive number (greater than 0), maybePunctuate calls advanceNowAndComputeLatency and requests the StreamsMetricsThreadImpl for the punctuateTimeSensor to record the punctuate time.

In the end, maybePunctuate returns whether the punctuate returned a positive number (true) or not (false).

Note
maybePunctuate is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.

addToResetList Internal Method

void addToResetList(
  TopicPartition partition,
  Set<TopicPartition> partitions,
  String logMessage,
  String resetPolicy,
  Set<String> loggedTopics)

addToResetList…​FIXME

Note
addToResetList is used when StreamThread…​FIXME

Computing Latency — advanceNowAndComputeLatency Internal Method

long advanceNowAndComputeLatency()

advanceNowAndComputeLatency updates (advances) the "now" timestamp to be the current timestamp and returns the timestamp difference (latency).

Note
advanceNowAndComputeLatency is used when StreamThread is requested to poll records once and process them using active stream tasks, maybePunctuate, maybeCommit, and maybeUpdateStandbyTasks.
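A minimal sketch of advanceNowAndComputeLatency follows. The class name is hypothetical, and a LongSupplier replaces the Time abstraction that StreamThread is created with so the clock can be controlled. The sketch also clamps the result at 0 in case the clock moves backwards, which is a choice of this sketch.

```java
import java.util.function.LongSupplier;

// Hypothetical tracker of the "now" timestamp.
class NowTracker {
    private final LongSupplier clock; // stands in for Time.milliseconds()
    private long now;

    NowTracker(final LongSupplier clock) {
        this.clock = clock;
        this.now = clock.getAsLong();
    }

    long now() { return now; }

    long advanceNowAndComputeLatency() {
        final long previous = now;
        now = clock.getAsLong();           // advance "now" to the current time
        return Math.max(now - previous, 0); // latency since the previous "now"
    }
}
```

Each caller brackets a step (polling, punctuating, committing, updating standby tasks) with such a call and records the returned latency in the matching sensor.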

clearStandbyRecords Internal Method

void clearStandbyRecords()

clearStandbyRecords…​FIXME

Note
clearStandbyRecords is used when StreamThread…​FIXME

completeShutdown Internal Method

void completeShutdown(
  boolean cleanRun)

completeShutdown…​FIXME

Note
completeShutdown is used when StreamThread is requested to run and shutdown.

updateThreadMetadata Internal Method

void updateThreadMetadata(
  Map<TaskId, StreamTask> activeTasks,
  Map<TaskId, StandbyTask> standbyTasks)
StreamThread updateThreadMetadata(
  String adminClientId)

updateThreadMetadata…​FIXME

Note
updateThreadMetadata is used when StreamThread…​FIXME

Internal Properties

builder

lastCommitMs
  Time of the last commit

numIterations

processStandbyRecords
  Flag to control whether to maybeUpdateStandbyTasks after maybeCommit
  Default: false
  Turned off (false) in maybeUpdateStandbyTasks (after requesting the StandbyTasks to update)
  Turned on (true) when attempting to commit (and the time to commit has come per commit.interval.ms configuration property)

now
  "now" timestamp

rebalanceListener
  Note: StreamThread requests InternalTopologyBuilder for the source topic pattern to subscribe to.

standbyRecords

stateListener
  Used when StreamThread is requested to change a state
  Set when KafkaStreams is created
  Reset (null) when:

timerStartedMs
  The timestamp when the timer has started
