StreamThread — Stream Processor Thread
StreamThread is a stream processor thread (a Java Thread) that runs the main record processing loop when started.
StreamThread is created exclusively when KafkaStreams (one of the main entities that a Kafka Streams developer uses in a Kafka Streams application) is created.
Note: KafkaStreams uses the num.stream.threads configuration property for the number of StreamThreads to create (default: 1).
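As an illustration, a hypothetical helper could read num.stream.threads (falling back to the default of 1) and derive one thread client id per StreamThread. The sketch below assumes a 1-based thread index and the [clientId]-StreamThread-[STREAM_THREAD_ID] format described for the create factory method. StreamThreadNaming and its methods are made up and are not part of KafkaStreams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch: derive StreamThread client ids from num.stream.threads.
final class StreamThreadNaming {

    // Reads num.stream.threads, falling back to the documented default of 1.
    static int numStreamThreads(Properties props) {
        return Integer.parseInt(props.getProperty("num.stream.threads", "1"));
    }

    // One thread client id per stream thread, using a 1-based index
    // (an assumption mirroring [clientId]-StreamThread-[STREAM_THREAD_ID]).
    static List<String> threadClientIds(String clientId, Properties props) {
        List<String> ids = new ArrayList<>();
        int n = numStreamThreads(props);
        for (int i = 1; i <= n; i++) {
            ids.add(clientId + "-StreamThread-" + i);
        }
        return ids;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("num.stream.threads", "3");
        // prints [app-1-StreamThread-1, app-1-StreamThread-2, app-1-StreamThread-3]
        System.out.println(threadClientIds("app-1", props));
    }
}
```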
StreamThread uses a Kafka Consumer and a custom ConsumerRebalanceListener (backed by the TaskManager) to subscribe to the source topics (when requested to run the main record processing loop or enforce a rebalance). StreamThread uses an InternalTopologyBuilder for the source topics to subscribe to.
Note: When partitions get assigned, the custom ConsumerRebalanceListener requests the TaskManager to create tasks for the assigned partitions.
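The delegation described in the note can be modelled in a few lines. This is a minimal sketch, not the Kafka code: PartitionListener, TaskManagerModel and RebalanceListenerModel are hypothetical stand-ins for ConsumerRebalanceListener and the TaskManager, partitions are plain "topic-partition" strings, and the model creates one task per partition (Kafka Streams actually groups partitions into tasks).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for ConsumerRebalanceListener (partitions as strings).
interface PartitionListener {
    void onPartitionsRevoked(Collection<String> partitions);

    void onPartitionsAssigned(Collection<String> partitions);
}

// Hypothetical stand-in for the TaskManager; simplification: one task per partition.
final class TaskManagerModel {
    final Map<String, String> tasksByPartition = new LinkedHashMap<>();

    void createTasks(Collection<String> assigned) {
        for (String partition : assigned) {
            tasksByPartition.put(partition, "task-for-" + partition);
        }
    }

    void forgetTasks(Collection<String> revoked) {
        for (String partition : revoked) {
            tasksByPartition.remove(partition);
        }
    }
}

// The listener only delegates to the task manager.
final class RebalanceListenerModel implements PartitionListener {
    private final TaskManagerModel taskManager;

    RebalanceListenerModel(TaskManagerModel taskManager) {
        this.taskManager = taskManager;
    }

    @Override
    public void onPartitionsRevoked(Collection<String> partitions) {
        taskManager.forgetTasks(partitions); // this model simply drops the tasks
    }

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        taskManager.createTasks(partitions);
    }

    public static void main(String[] args) {
        TaskManagerModel taskManager = new TaskManagerModel();
        PartitionListener listener = new RebalanceListenerModel(taskManager);
        listener.onPartitionsAssigned(Arrays.asList("input-0", "input-1"));
        System.out.println(taskManager.tasksByPartition.keySet()); // [input-0, input-1]
    }
}
```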
StreamThread uses a Kafka Consumer and a TaskManager that are both created when StreamThread is requested to create an instance of itself (the create factory method). That is when StreamThread sets the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property to the TaskManager and so indirectly associates the TaskManager with StreamsPartitionAssignor. StreamThread then requests the given KafkaClientSupplier to create a KafkaConsumer with these consumer configs. When the consumer picks up the partition.assignment.strategy configuration property, it creates and configures a StreamsPartitionAssignor, which reads the TaskManager from the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property.
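The hand-off works because the consumer configuration is a map of String keys to arbitrary objects, so a live object placed under an internal key reaches the partition assignor when the assignor is configured with the same map. The sketch below models that hand-off with hypothetical TaskManagerRef and AssignorModel classes and made-up key strings; the real internal keys and the StreamsPartitionAssignor configuration logic differ in detail. It also shares an AtomicInteger the same way, in the spirit of the ASSIGNMENT_ERROR_CODE internal property.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the TaskManager object passed through the config.
final class TaskManagerRef {
    final String name;

    TaskManagerRef(String name) {
        this.name = name;
    }
}

// Hypothetical assignor that, like a Kafka Configurable, receives the config map.
final class AssignorModel {
    // Made-up keys standing in for TASK_MANAGER_FOR_PARTITION_ASSIGNOR and ASSIGNMENT_ERROR_CODE.
    static final String TASK_MANAGER_KEY = "task.manager.for.partition.assignor";
    static final String ERROR_CODE_KEY = "assignment.error.code";

    TaskManagerRef taskManager;
    AtomicInteger assignmentErrorCode;

    void configure(Map<String, ?> configs) {
        Object tm = configs.get(TASK_MANAGER_KEY);
        if (!(tm instanceof TaskManagerRef)) {
            throw new IllegalArgumentException("missing or wrong type for " + TASK_MANAGER_KEY);
        }
        taskManager = (TaskManagerRef) tm;
        assignmentErrorCode = (AtomicInteger) configs.get(ERROR_CODE_KEY);
    }

    public static void main(String[] args) {
        TaskManagerRef taskManager = new TaskManagerRef("tm-1");
        AtomicInteger errorCode = new AtomicInteger(0);

        // Thread side: regular settings plus live objects under internal keys.
        Map<String, Object> consumerConfigs = new HashMap<>();
        consumerConfigs.put("group.id", "my-app");
        consumerConfigs.put(TASK_MANAGER_KEY, taskManager);
        consumerConfigs.put(ERROR_CODE_KEY, errorCode);

        // Assignor side: configured with the same map, so it shares the same objects.
        AssignorModel assignor = new AssignorModel();
        assignor.configure(consumerConfigs);

        // The assignor can report an error code that the thread later observes.
        assignor.assignmentErrorCode.set(1);
        System.out.println(errorCode.get()); // 1
    }
}
```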
StreamThread uses the poll.ms configuration property (default: 100 ms) as the polling interval when requested to poll records once and process them using active stream tasks.
StreamThread uses the commit.interval.ms configuration property as the interval for committing (persisting) the position of processors, which maybeCommit checks while StreamThread polls records once and processes them using active stream tasks.
StreamThread uses the Kafka Consumer to:
- Subscribe to topics (with the RebalanceListener) right after StreamThread has been requested to run the main record processing loop
- Poll the subscribed topics (and fetch records if available) right after StreamThread has been requested to get the next batch of records by polling
- resetInvalidOffsets (when an InvalidOffsetException is reported while polling the topics for records)
StreamThread uses stream-thread [[clientId]-StreamThread-[STREAM_THREAD_ID]] for the logging prefix.
StreamThread requires an InternalTopologyBuilder to be created and uses it for the following:
- Creating a TaskCreator and a StandbyTaskCreator
- Running the main record processing loop (and subscribing to the source topics)
StreamThread uses the TaskManager for the following:
- FIXME
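Tip: Enable ALL (or DEBUG or TRACE) logging level for the org.apache.kafka.streams.processor.internals.StreamThread logger to see what happens inside. Add the following line to log4j.properties:
log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=ALL
Refer to Application Logging Using log4j.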
Creating StreamThread Instance
StreamThread takes the following to be created:
StreamThread initializes the internal properties.
StreamThread and Kafka Consumer
When created, StreamThread is given a KafkaClientSupplier.
StreamThread uses the KafkaClientSupplier to get a Kafka consumer with the consumer-specific configuration.
The Kafka Consumer is then assigned to the TaskManager and passed on to create the StreamThread.
The Kafka Consumer is used in the following:
- runLoop (to subscribe to source topics with the ConsumerRebalanceListener)
- enforceRebalance (by unsubscribing from the source topics and (re)subscribing)
- pollRequests (to poll records)
- resetInvalidOffsets (to seek to the beginning or the end of partitions assigned)
- completeShutdown (to close the consumer)
- consumerMetrics (for the consumer metrics)
StreamThread and AdminClient
When created, StreamThread is given an AdminClient that is only used to create the TaskManager.
Creating StreamThread Instance — create Factory Method
StreamThread create(
InternalTopologyBuilder builder,
StreamsConfig config,
KafkaClientSupplier clientSupplier,
AdminClient adminClient,
UUID processId,
String clientId,
Metrics metrics,
Time time,
StreamsMetadataState streamsMetadataState,
long cacheSizeBytes,
StateDirectory stateDirectory,
StateRestoreListener userStateRestoreListener,
int threadIdx)
create prints out the following INFO message to the logs:
Creating restore consumer client
create requests the input StreamsConfig for getRestoreConsumerConfigs for a new threadClientId (of the format [clientId]-StreamThread-[STREAM_THREAD_ID]).
create requests the given KafkaClientSupplier for getRestoreConsumer for the restoreConsumerConfigs.
create creates a StoreChangelogReader (with the restoreConsumer, the given StateRestoreListener and the configured poll.ms).
Note: The input StateRestoreListener is actually a DelegatingStateRestoreListener.
(Only with eos enabled) create…FIXME
create creates a StreamsMetricsThreadImpl with the following:
- the input Metrics
- stream-metrics group name
- thread.[clientId]-StreamThread-[STREAM_THREAD_ID] prefix
- Tags with one entry with client-id and the [clientId]-StreamThread-[STREAM_THREAD_ID] value
create creates a ThreadCache (with cacheSizeBytes for the maxCacheSizeBytes and the StreamsMetricsThreadImpl).
create creates a TaskCreator and a StandbyTaskCreator that are used exclusively to create a TaskManager (with a new AssignedStreamsTasks and AssignedStandbyTasks as well as the given StreamsMetadataState).
create prints out the following INFO message to the logs:
Creating consumer client
create requests the input StreamsConfig for application.id configuration property.
create requests the input StreamsConfig for the configuration of a Kafka Consumer for the application ID and the threadClientId (of the format [clientId]-StreamThread-[STREAM_THREAD_ID]) and adds the following internal properties:
- TASK_MANAGER_FOR_PARTITION_ASSIGNOR to be the TaskManager just created
- ASSIGNMENT_ERROR_CODE to be a new AtomicInteger
(Only with non-empty latestResetTopicsPattern and earliestResetTopicsPattern patterns) create…FIXME
create requests the given KafkaClientSupplier for a Kafka Consumer (with the consumerConfigs) and associates it with the TaskManager.
In the end, create creates a StreamThread.
Note: create is used exclusively when KafkaStreams is created.
Starting Stream Thread — run Method
void run()
Note: run is part of Java’s Thread contract and is executed by a JVM thread.
run prints out the following INFO message to the logs.
Starting
run then runs the main record processing loop (runLoop). At the end, run shuts down (completeShutdown) per the cleanRun flag that says whether running the main loop stopped cleanly or not.
run re-throws any KafkaException.
run prints out the following ERROR message to the logs for any other Exception:
Encountered the following error during processing: [exception]
Note: run is used exclusively when KafkaStreams is requested to start.
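The control flow of run (run the loop, remember whether it ended cleanly, always shut down) can be sketched with a try/catch/finally. This is a hypothetical model, not the Kafka code: RunModel, KafkaExceptionModel and the Runnable standing in for runLoop are made up, and this sketch also re-throws the non-Kafka exceptions after logging them, which is our understanding of Kafka 2.x behaviour (verify for your version).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the shape of StreamThread.run.
final class RunModel {
    // Stand-in for Kafka's KafkaException.
    static class KafkaExceptionModel extends RuntimeException {
        KafkaExceptionModel(String msg) {
            super(msg);
        }
    }

    final List<String> log = new ArrayList<>();
    private final Runnable runLoop;   // stands in for the main record processing loop
    Boolean shutdownCleanRun;         // records the flag passed to completeShutdown

    RunModel(Runnable runLoop) {
        this.runLoop = runLoop;
    }

    void completeShutdown(boolean cleanRun) {
        shutdownCleanRun = cleanRun;
    }

    void run() {
        log.add("INFO Starting");
        boolean cleanRun = false;
        try {
            runLoop.run();
            cleanRun = true; // only reached when the loop stops without an exception
        } catch (KafkaExceptionModel e) {
            throw e; // Kafka exceptions are re-thrown as-is
        } catch (RuntimeException e) {
            log.add("ERROR Encountered the following error during processing: " + e);
            throw e;
        } finally {
            completeShutdown(cleanRun); // shut down whether or not the run was clean
        }
    }

    public static void main(String[] args) {
        RunModel clean = new RunModel(() -> { });
        clean.run();
        System.out.println(clean.shutdownCleanRun); // true

        RunModel failing = new RunModel(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            failing.run();
        } catch (IllegalStateException expected) {
            System.out.println(failing.shutdownCleanRun); // false
        }
    }
}
```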
Life Cycle of StreamThread — StreamThread’s States
StreamThread can be in exactly one of the following states at any given point in time:
- CREATED: the initial state of StreamThread right after it was created
- RUNNING: StreamThread was requested for the following:
  - Polling records once and processing them using active stream tasks when StreamThread is in PARTITIONS_ASSIGNED state and the TaskManager's updateNewAndRestoringTasks returned true
  - Polling records once and processing them using active stream tasks when StreamThread polled for records and happened to transition to PARTITIONS_ASSIGNED state, but (again) only when the TaskManager's updateNewAndRestoringTasks returned true
- PARTITIONS_REVOKED: RebalanceListener was requested to handle partition revocation
- PARTITIONS_ASSIGNED: RebalanceListener was requested to handle partition assignment
- PENDING_SHUTDOWN: StreamThread was requested to shutdown or completeShutdown
- DEAD: StreamThread was requested to completeShutdown
StreamThread can be transitioned to another state by executing setState.
Note: StreamThread defines a Java enumeration State with the states above ordered by ordinal. Every state is created with the ordinals of the states it can transition to. You can check whether a transition is valid or not using State.isValidTransition.
import org.apache.kafka.streams.processor.internals.StreamThread.State._
// CREATED is the 0th state
assert(CREATED.ordinal == 0)
// RUNNING is a valid next state after CREATED
assert(CREATED.isValidTransition(RUNNING))
// DEAD cannot be the next state after CREATED
assert(CREATED.isValidTransition(DEAD) == false)
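The "valid next states by ordinal" technique from the note can be reproduced in plain Java. The ThreadState enum below is hypothetical and its transition table is illustrative only: it is derived from the life-cycle descriptions on this page, omits STARTING, and may not match StreamThread.State in your Kafka version.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical enum showing ordinal-based transition validation.
enum ThreadState {
    CREATED(1, 4),                // -> RUNNING, PENDING_SHUTDOWN
    RUNNING(2, 4),                // -> PARTITIONS_REVOKED, PENDING_SHUTDOWN
    PARTITIONS_REVOKED(3, 4),     // -> PARTITIONS_ASSIGNED, PENDING_SHUTDOWN
    PARTITIONS_ASSIGNED(1, 2, 4), // -> RUNNING, PARTITIONS_REVOKED, PENDING_SHUTDOWN
    PENDING_SHUTDOWN(5),          // -> DEAD
    DEAD;                         // terminal state, no valid transitions

    // Ordinals of the states this state may move to.
    private final Set<Integer> validTransitions = new HashSet<>();

    ThreadState(Integer... validTransitions) {
        this.validTransitions.addAll(Arrays.asList(validTransitions));
    }

    boolean isValidTransition(ThreadState newState) {
        return validTransitions.contains(newState.ordinal());
    }

    public static void main(String[] args) {
        System.out.println(CREATED.isValidTransition(RUNNING)); // true
        System.out.println(CREATED.isValidTransition(DEAD));    // false
    }
}
```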
Shutting Down Stream Thread — shutdown Method
void shutdown()
shutdown prints out the following INFO message to the logs:
Informed to shut down
shutdown tries to transition the current state to PENDING_SHUTDOWN.
Only when transitioning from the CREATED state (i.e. the thread may not have been started yet), shutdown calls completeShutdown (with the cleanRun flag enabled).
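A minimal sketch of the shutdown logic above, assuming a hypothetical ShutdownModel with a reduced state set and a lock-guarded setState that returns the previous state (or null when the transition is rejected). The allowed transitions are simplified for the example and are not Kafka's full table.

```java
// Hypothetical model of shutdown(): move to PENDING_SHUTDOWN and, if the
// thread was never started (CREATED), complete the shutdown right away.
final class ShutdownModel {
    enum State { CREATED, RUNNING, PENDING_SHUTDOWN, DEAD }

    private final Object stateLock = new Object();
    private State state = State.CREATED;
    boolean completedCleanly;

    // Returns the previous state, or null when the transition is not allowed
    // (simplified rules for this sketch only).
    State setState(State newState) {
        synchronized (stateLock) {
            State old = state;
            boolean allowed =
                (newState == State.PENDING_SHUTDOWN && old != State.PENDING_SHUTDOWN && old != State.DEAD)
                || (newState == State.DEAD && old == State.PENDING_SHUTDOWN)
                || (newState == State.RUNNING && old == State.CREATED);
            if (!allowed) {
                return null;
            }
            state = newState;
            return old;
        }
    }

    State state() {
        synchronized (stateLock) {
            return state;
        }
    }

    void completeShutdown(boolean cleanRun) {
        completedCleanly = cleanRun;
        setState(State.DEAD);
    }

    void shutdown() {
        State oldState = setState(State.PENDING_SHUTDOWN);
        if (oldState == State.CREATED) {
            // Never started, so no run loop will finish the shutdown.
            completeShutdown(true);
        }
    }

    public static void main(String[] args) {
        ShutdownModel neverStarted = new ShutdownModel();
        neverStarted.shutdown();
        System.out.println(neverStarted.state()); // DEAD

        ShutdownModel running = new ShutdownModel();
        running.setState(State.RUNNING);
        running.shutdown();
        System.out.println(running.state()); // PENDING_SHUTDOWN
    }
}
```

In the second case the thread stays in PENDING_SHUTDOWN because, in this model, finishing the shutdown is left to the running loop.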
Polling Records Once And Processing Them Using Active Stream Tasks — runOnce Method
void runOnce()
In essence, runOnce requests the Consumer to poll records, adds the records to active stream tasks and requests the TaskManager to process the records by running stream tasks.
Note: runOnce uses the StreamsMetricsThreadImpl to access sensors and record metrics.
Internally, runOnce pollRequests with different poll times as follows:
- 0L when in PARTITIONS_ASSIGNED state
- pollTime when in PARTITIONS_REVOKED, STARTING or RUNNING state
Note: runOnce does not expect any other state when polling; in any other state it throws a StreamsException.
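The state-dependent poll time can be expressed as a small pure function. This is a hypothetical sketch (PollTimeModel and its State enum are made up): it uses java.time.Duration, returns a zero timeout in PARTITIONS_ASSIGNED so the poll returns without blocking, uses the configured poll time in PARTITIONS_REVOKED, STARTING and RUNNING, and throws an IllegalStateException in any other state as a stand-in for Kafka's own error handling.

```java
import java.time.Duration;

// Hypothetical sketch of how runOnce picks the poll timeout per state.
final class PollTimeModel {
    enum State { CREATED, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN, DEAD }

    static Duration pollTimeFor(State state, Duration pollTime) {
        switch (state) {
            case PARTITIONS_ASSIGNED:
                // Zero timeout: return what is already fetched without blocking,
                // presumably so that restoring tasks are not held up.
                return Duration.ZERO;
            case PARTITIONS_REVOKED:
            case STARTING:
            case RUNNING:
                // Regular (long) polling with the configured poll time.
                return pollTime;
            default:
                throw new IllegalStateException("Unexpected state " + state + " during normal iteration");
        }
    }

    public static void main(String[] args) {
        Duration pollMs = Duration.ofMillis(100); // poll.ms default
        System.out.println(pollTimeFor(State.PARTITIONS_ASSIGNED, pollMs)); // PT0S
        System.out.println(pollTimeFor(State.RUNNING, pollMs));             // PT0.1S
    }
}
```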
runOnce advanceNowAndComputeLatency.
With records polled, runOnce requests the StreamsMetricsThreadImpl for the pollTimeSensor and requests it to record the above pollLatency followed by adding the records polled to active stream tasks.
If in PARTITIONS_ASSIGNED state, runOnce requests the TaskManager to updateNewAndRestoringTasks and (when all stream tasks are running) changes to the RUNNING state.
runOnce advanceNowAndComputeLatency.
runOnce requests the TaskManager to check out if hasActiveRunningTasks and if so…FIXME
In the end, runOnce maybeUpdateStandbyTasks followed by maybeCommit.
Note: runOnce is used exclusively when StreamThread is requested to run the main record processing loop.
Polling Records — pollRequests Internal Method
ConsumerRecords<byte[], byte[]> pollRequests(
Duration pollTime)
pollRequests simply requests the Kafka Consumer to poll records with the given pollTime.
In case of an InvalidOffsetException, pollRequests resetInvalidOffsets.
In case of a rebalance exception (recorded using setRebalanceException), pollRequests re-throws it as-is when it is a TaskMigratedException or wraps it in a StreamsException otherwise.
Note: pollRequests is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.
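The error handling of pollRequests can be modelled without a real consumer. In this hypothetical sketch a Supplier plays the role of the consumer poll, and InvalidOffsetModel, TaskMigratedModel and StreamsModel stand in for InvalidOffsetException, TaskMigratedException and StreamsException. The rebalanceException field plays the role of the exception recorded by the rebalance listener.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of pollRequests: poll, recover from invalid offsets,
// and surface any exception recorded during a rebalance.
final class PollRequestsModel {
    static class InvalidOffsetModel extends RuntimeException { }

    static class TaskMigratedModel extends RuntimeException { }

    static class StreamsModel extends RuntimeException {
        StreamsModel(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    private final Supplier<List<String>> poll; // plays the role of the consumer poll
    Throwable rebalanceException;              // recorded by the rebalance listener
    int offsetResets;

    PollRequestsModel(Supplier<List<String>> poll) {
        this.poll = poll;
    }

    List<String> pollRequests() {
        List<String> records = Collections.emptyList();
        try {
            records = poll.get();
        } catch (InvalidOffsetModel e) {
            resetInvalidOffsets(e);
        }
        if (rebalanceException != null) {
            if (rebalanceException instanceof TaskMigratedModel) {
                throw (TaskMigratedModel) rebalanceException; // re-thrown as-is
            }
            throw new StreamsModel("Failed to rebalance.", rebalanceException); // wrapped
        }
        return records;
    }

    void resetInvalidOffsets(InvalidOffsetModel e) {
        offsetResets++; // the real method seeks the affected partitions
    }

    public static void main(String[] args) {
        PollRequestsModel healthy = new PollRequestsModel(() -> Arrays.asList("r1", "r2"));
        System.out.println(healthy.pollRequests()); // [r1, r2]

        PollRequestsModel outOfRange = new PollRequestsModel(() -> {
            throw new InvalidOffsetModel();
        });
        List<String> none = outOfRange.pollRequests();
        System.out.println(none + " after " + outOfRange.offsetResets + " reset(s)"); // [] after 1 reset(s)
    }
}
```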
resetInvalidOffsets Internal Method
void resetInvalidOffsets(
InvalidOffsetException e)
resetInvalidOffsets…FIXME
Note: resetInvalidOffsets is used exclusively when StreamThread is requested to pollRequests (and an InvalidOffsetException is reported).
Attempting to Update Running StandbyTasks — maybeUpdateStandbyTasks Internal Method
void maybeUpdateStandbyTasks()
maybeUpdateStandbyTasks…FIXME
maybeUpdateStandbyTasks does nothing and simply returns when StreamThread is not in RUNNING state or the TaskManager has no running standby tasks (hasStandbyRunningTasks).
Note: maybeUpdateStandbyTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.
Running Main Record Processing Loop — runLoop Internal Method
void runLoop()
runLoop requests the Consumer to subscribe to the source topics (from the InternalTopologyBuilder) with the custom ConsumerRebalanceListener and then keeps polling records and processing them using active stream tasks until the isRunning flag is off.
When the assignmentErrorCode is set to VERSION_PROBING, runLoop prints out the following INFO message to the logs followed by enforcing a rebalance.
Version probing detected. Triggering new rebalance.
In case of TaskMigratedException, runLoop prints out the following WARN message to the logs followed by enforcing a rebalance.
Detected task [taskId] that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Will try to rejoin the consumer group. Below is the detailed description of the task:
[migratedTask]
Note: runLoop is used exclusively when StreamThread is requested to start.
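The loop structure (iterate while running, re-join the group on version probing or task migration) is sketched below. Everything here is hypothetical: RunLoopModel replaces the isRunning check with a bounded iteration budget, VERSION_PROBING is a made-up code, TaskMigratedModel stands in for TaskMigratedException, and clearing the error code inside enforceRebalance is a model-only assumption (in Kafka the assignor manages that code).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of runLoop.
final class RunLoopModel {
    static final int VERSION_PROBING = 1; // made-up error code

    // Stand-in for TaskMigratedException; the message carries the task id.
    static class TaskMigratedModel extends RuntimeException {
        TaskMigratedModel(String taskId) {
            super(taskId);
        }
    }

    final AtomicInteger assignmentErrorCode = new AtomicInteger(0);
    final List<String> log = new ArrayList<>();
    int remainingIterations; // bounded budget replacing the real isRunning() check
    int rebalancesEnforced;
    private final Consumer<RunLoopModel> runOnce;

    RunLoopModel(int iterations, Consumer<RunLoopModel> runOnce) {
        this.remainingIterations = iterations;
        this.runOnce = runOnce;
    }

    boolean isRunning() {
        return remainingIterations > 0;
    }

    void enforceRebalance() {
        // The real method unsubscribes and re-subscribes the consumer.
        rebalancesEnforced++;
        assignmentErrorCode.set(0); // model-only: the new rebalance clears the code
    }

    void runLoop() {
        // (subscribing to the source topics would happen here)
        while (isRunning()) {
            remainingIterations--;
            try {
                runOnce.accept(this);
                if (assignmentErrorCode.get() == VERSION_PROBING) {
                    log.add("INFO Version probing detected. Triggering new rebalance.");
                    enforceRebalance();
                }
            } catch (TaskMigratedModel e) {
                log.add("WARN Detected task " + e.getMessage() + " that got migrated to another thread.");
                enforceRebalance();
            }
        }
    }

    public static void main(String[] args) {
        RunLoopModel loop = new RunLoopModel(3, self -> {
            if (self.remainingIterations == 2) {
                self.assignmentErrorCode.set(VERSION_PROBING); // first iteration
            } else if (self.remainingIterations == 1) {
                throw new TaskMigratedModel("0_1");            // second iteration
            }
        });
        loop.runLoop();
        loop.log.forEach(System.out::println);
        System.out.println(loop.rebalancesEnforced); // 2
    }
}
```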
Setting New State — setState Method
State setState(
State newState)
setState…FIXME
Note: setState is used when…FIXME
setRebalanceException Internal Method
void setRebalanceException(
Throwable rebalanceException)
setRebalanceException…FIXME
Note: setRebalanceException is used when…FIXME
Describing Itself (Textual Representation) — toString Method
String toString() (1)
String toString(
  String indent)
(1) Uses an empty indent
toString gives a text representation with "StreamsThread threadId:" and the thread name followed by the text representation of the TaskManager.
FIXME toString in action
Checking If StreamThread Is Running — isRunning Method
boolean isRunning()
isRunning is true when StreamThread is in one of the following states:
Otherwise, isRunning is false.
Note: isRunning is simply a pass-through to State.isRunning.
adminClientMetrics Method
Map<MetricName, Metric> adminClientMetrics()
adminClientMetrics…FIXME
Note: adminClientMetrics is used exclusively when KafkaStreams is requested for the metrics.
consumerMetrics Method
Map<MetricName, Metric> consumerMetrics()
consumerMetrics…FIXME
Note: consumerMetrics is used when…FIXME
producerMetrics Method
Map<MetricName, Metric> producerMetrics()
producerMetrics…FIXME
Note: producerMetrics is used when…FIXME
getConsumerClientId Static Method
String getConsumerClientId(
String threadClientId)
getConsumerClientId…FIXME
Note: getConsumerClientId is used when…FIXME
getRestoreConsumerClientId Static Method
String getRestoreConsumerClientId(
String threadClientId)
getRestoreConsumerClientId…FIXME
Note: getRestoreConsumerClientId is used when…FIXME
getSharedAdminClientId Static Method
String getSharedAdminClientId(
String clientId)
getSharedAdminClientId…FIXME
Note: getSharedAdminClientId is used when…FIXME
getTaskProducerClientId Internal Static Method
String getTaskProducerClientId(
String threadClientId,
TaskId taskId)
getTaskProducerClientId…FIXME
Note: getTaskProducerClientId is used when…FIXME
getThreadProducerClientId Internal Static Method
String getThreadProducerClientId(
String threadClientId)
getThreadProducerClientId…FIXME
Note: getThreadProducerClientId is used when…FIXME
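The client-id helpers above derive names from the thread client id. The sketch below assumes the suffixes "-consumer", "-restore-consumer", "-admin", "-[taskId]-producer" and "-producer", which match Kafka 2.x as far as we know but should be verified against the version you use. ClientIds is a hypothetical class, and the task id is passed as its text form (e.g. "0_1").

```java
// Hypothetical sketch of the client-id naming scheme (assumed suffixes).
final class ClientIds {
    static String threadClientId(String clientId, int threadIdx) {
        return clientId + "-StreamThread-" + threadIdx;
    }

    static String consumerClientId(String threadClientId) {
        return threadClientId + "-consumer";
    }

    static String restoreConsumerClientId(String threadClientId) {
        return threadClientId + "-restore-consumer";
    }

    // Shared by all threads, hence derived from the client id, not the thread client id.
    static String sharedAdminClientId(String clientId) {
        return clientId + "-admin";
    }

    static String taskProducerClientId(String threadClientId, String taskId) {
        return threadClientId + "-" + taskId + "-producer";
    }

    static String threadProducerClientId(String threadClientId) {
        return threadClientId + "-producer";
    }

    // Logging prefix: stream-thread [[clientId]-StreamThread-[STREAM_THREAD_ID]]
    static String logPrefix(String threadClientId) {
        return String.format("stream-thread [%s] ", threadClientId);
    }

    public static void main(String[] args) {
        String tid = threadClientId("my-client", 1);
        System.out.println(consumerClientId(tid));            // my-client-StreamThread-1-consumer
        System.out.println(taskProducerClientId(tid, "0_1")); // my-client-StreamThread-1-0_1-producer
        System.out.println(sharedAdminClientId("my-client")); // my-client-admin
    }
}
```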
Adding Records to Active Stream Tasks — addRecordsToTasks Internal Method
void addRecordsToTasks(
ConsumerRecords<byte[], byte[]> records)
For every partition of the input records, addRecordsToTasks requests the TaskManager for the active stream task (StreamTask) responsible for the partition.
Note: The input records may (and often will) come from different partitions or even topics. Unless you use as many StreamThread instances as there are partitions (among the source topics), addRecordsToTasks will be given records from many partitions.
With the StreamTask, addRecordsToTasks requests the input mixed-partition ConsumerRecords for the records for the given partition only and then requests the StreamTask to buffer the new records (for the partition).
Note: ConsumerRecords is a container that holds a list of ConsumerRecord per topic partition (possibly across many topics). There is one ConsumerRecord list for every topic partition with records returned by a single Consumer.poll operation.
Note: addRecordsToTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.
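The routing performed by addRecordsToTasks (split a mixed-partition poll result by partition, then hand each slice to the owning task) is sketched below. RecordRouting and TaskModel are hypothetical; a map from partition to records plays the role of ConsumerRecords, and a map from partition to task plays the role of asking the TaskManager for the active task. The example assumes one task owns partition 0 of two co-partitioned topics, and the model fails fast when no task owns a partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of addRecordsToTasks.
final class RecordRouting {
    // Stand-in for a StreamTask that buffers records per partition.
    static final class TaskModel {
        final Map<String, List<String>> buffered = new LinkedHashMap<>();

        void addRecords(String partition, List<String> records) {
            buffered.computeIfAbsent(partition, p -> new ArrayList<>()).addAll(records);
        }
    }

    static void addRecordsToTasks(Map<String, List<String>> polled, Map<String, TaskModel> tasksByPartition) {
        for (Map.Entry<String, List<String>> entry : polled.entrySet()) {
            TaskModel task = tasksByPartition.get(entry.getKey());
            if (task == null) {
                throw new IllegalStateException("No active task for partition " + entry.getKey());
            }
            // Only this partition's slice of the poll result goes to the task.
            task.addRecords(entry.getKey(), entry.getValue());
        }
    }

    public static void main(String[] args) {
        TaskModel task00 = new TaskModel(); // owns orders-0 and payments-0
        TaskModel task01 = new TaskModel(); // owns orders-1
        Map<String, TaskModel> tasks = new LinkedHashMap<>();
        tasks.put("orders-0", task00);
        tasks.put("payments-0", task00);
        tasks.put("orders-1", task01);

        Map<String, List<String>> polled = new LinkedHashMap<>();
        polled.put("orders-0", Arrays.asList("o1", "o2"));
        polled.put("orders-1", Arrays.asList("o3"));
        polled.put("payments-0", Arrays.asList("p1"));

        addRecordsToTasks(polled, tasks);
        System.out.println(task00.buffered); // {orders-0=[o1, o2], payments-0=[p1]}
        System.out.println(task01.buffered); // {orders-1=[o3]}
    }
}
```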
Enforcing Rebalance — enforceRebalance Internal Method
void enforceRebalance()
enforceRebalance…FIXME
Note: enforceRebalance is used when…FIXME
Committing All Active and Standby Tasks (When Commit Interval Elapsed) — maybeCommit Method
boolean maybeCommit()
maybeCommit commits all tasks (owned by this TaskManager) if the commit interval has elapsed (i.e. the commit interval is non-negative and more time than the commit interval has passed since the last commit).
Internally, maybeCommit prints out the following TRACE message to the logs:
Committing all active tasks [activeTaskIds] and standby tasks [standbyTaskIds] since [time]ms has elapsed (commit interval is [commitTimeMs]ms)
maybeCommit requests the TaskManager to commitAll.
Only if there are still running active and standby tasks, maybeCommit does the following:
- Requests the StreamsMetricsThreadImpl for the commitTimeSensor and records the commit time (the latency of committing all the tasks divided by their number)
- Requests the TaskManager to maybePurgeCommitedRecords
maybeCommit prints out the following DEBUG message to the logs:
Committed all active tasks [activeTaskIds] and standby tasks [standbyTaskIds] in [duration]ms
maybeCommit updates the lastCommitMs internal counter with the current now timestamp.
maybeCommit turns the processStandbyRecords flag on.
Note: maybeCommit is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.
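The commit-interval check can be sketched as a tiny timer. CommitTimer is hypothetical: the Runnable stands in for requesting the TaskManager to commitAll, now is passed in explicitly instead of being read from the thread's internal "now" timestamp, and a negative commit interval disables interval-based commits in this model.

```java
// Hypothetical model of the maybeCommit interval check.
final class CommitTimer {
    private final long commitTimeMs;  // commit.interval.ms
    private final Runnable commitAll; // stands in for TaskManager commitAll
    private long lastCommitMs;

    CommitTimer(long commitTimeMs, long startMs, Runnable commitAll) {
        this.commitTimeMs = commitTimeMs;
        this.lastCommitMs = startMs;
        this.commitAll = commitAll;
    }

    // Commits only when the interval is non-negative and strictly exceeded.
    boolean maybeCommit(long now) {
        if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
            commitAll.run();
            lastCommitMs = now;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        int[] commits = {0};
        CommitTimer timer = new CommitTimer(100, 0, () -> commits[0]++);
        for (long now = 0; now <= 350; now += 50) {
            timer.maybeCommit(now); // commits at now = 150 and now = 300
        }
        System.out.println(commits[0]); // 2
    }
}
```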
Attempting to Punctuate (Running Stream Tasks) — maybePunctuate Internal Method
boolean maybePunctuate()
maybePunctuate requests the TaskManager to punctuate stream tasks.
If the punctuate returned a positive number (greater than 0), maybePunctuate advanceNowAndComputeLatency and requests the StreamsMetricsThreadImpl for the punctuateTimeSensor to record the punctuate time.
In the end, maybePunctuate returns whether the punctuate returned a positive number (true) or not (false).
Note: maybePunctuate is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.
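A minimal model of maybePunctuate follows. PunctuateModel is hypothetical: an IntSupplier plays the role of the TaskManager punctuate call (returning how many tasks punctuated), a LongSupplier plays the role of advanceNowAndComputeLatency, and a list stands in for the punctuateTimeSensor. Latency is only measured and recorded when something actually punctuated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;

// Hypothetical model of maybePunctuate.
final class PunctuateModel {
    private final IntSupplier punctuate;                    // number of tasks that punctuated
    private final LongSupplier advanceNowAndComputeLatency; // latency since the last "now"
    final List<Long> punctuateTimes = new ArrayList<>();    // stands in for punctuateTimeSensor

    PunctuateModel(IntSupplier punctuate, LongSupplier advanceNowAndComputeLatency) {
        this.punctuate = punctuate;
        this.advanceNowAndComputeLatency = advanceNowAndComputeLatency;
    }

    boolean maybePunctuate() {
        int punctuated = punctuate.getAsInt();
        if (punctuated > 0) {
            punctuateTimes.add(advanceNowAndComputeLatency.getAsLong());
        }
        return punctuated > 0;
    }

    public static void main(String[] args) {
        int[] due = {0};
        PunctuateModel model = new PunctuateModel(() -> due[0], () -> 7L);
        System.out.println(model.maybePunctuate()); // false
        due[0] = 2;
        System.out.println(model.maybePunctuate()); // true
        System.out.println(model.punctuateTimes);   // [7]
    }
}
```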
addToResetList Internal Method
void addToResetList(
TopicPartition partition,
Set<TopicPartition> partitions,
String logMessage,
String resetPolicy,
Set<String> loggedTopics)
addToResetList…FIXME
Note: addToResetList is used when StreamThread…FIXME
Computing Latency — advanceNowAndComputeLatency Internal Method
long advanceNowAndComputeLatency()
advanceNowAndComputeLatency updates (advances) the "now" timestamp to the current time and returns the difference between the new and the previous "now" timestamps (latency).
Note: advanceNowAndComputeLatency is used when StreamThread is requested to poll records once and process them using active stream tasks, maybePunctuate, maybeCommit, and maybeUpdateStandbyTasks.
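The "advance now, return the delta" pattern can be sketched with an injectable clock so it is testable. LatencyClock is hypothetical: a LongSupplier stands in for Kafka's Time, and clamping a negative delta to zero (should the clock go backwards) is an assumption of this sketch.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of advanceNowAndComputeLatency with an injectable clock.
final class LatencyClock {
    private final LongSupplier time; // stands in for the thread's Time source (milliseconds)
    private long now;

    LatencyClock(LongSupplier time) {
        this.time = time;
        this.now = time.getAsLong();
    }

    long now() {
        return now;
    }

    long advanceNowAndComputeLatency() {
        long previous = now;
        now = time.getAsLong();
        // Clamp at zero in case the clock goes backwards (assumption of this sketch).
        return Math.max(now - previous, 0);
    }

    public static void main(String[] args) {
        long[] fakeTime = {1_000};
        LatencyClock clock = new LatencyClock(() -> fakeTime[0]);
        fakeTime[0] = 1_250;
        System.out.println(clock.advanceNowAndComputeLatency()); // 250
        System.out.println(clock.now());                         // 1250
    }
}
```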
clearStandbyRecords Internal Method
void clearStandbyRecords()
clearStandbyRecords…FIXME
Note: clearStandbyRecords is used when StreamThread…FIXME
updateThreadMetadata Internal Method
void updateThreadMetadata(
Map<TaskId, StreamTask> activeTasks,
Map<TaskId, StandbyTask> standbyTasks)
StreamThread updateThreadMetadata(
String adminClientId)
updateThreadMetadata…FIXME
Note: updateThreadMetadata is used when StreamThread…FIXME
Internal Properties
| Name | Description |
|---|---|
| lastCommitMs | Time of the last commit |
| | Number of iterations when the TaskManager is requested to process records by running stream tasks (one record per task). Incremented while polling records once and processing them using active stream tasks. Decremented by half while polling records once and processing them using active stream tasks. |
| processStandbyRecords | Flag to control whether to maybeUpdateStandbyTasks after maybeCommit. Turned on in maybeCommit. |