StreamThread — Stream Processor Thread
StreamThread is a stream processor thread (a Java Thread) that runs the main record processing loop when started.
StreamThread is created exclusively alongside KafkaStreams (which is one of the main entities that a Kafka Streams developer uses in a Kafka Streams application).
Note
|
KafkaStreams uses the num.stream.threads configuration property for the number of StreamThreads to create (default: 1).
|
StreamThread uses a Kafka Consumer and a custom ConsumerRebalanceListener (with the TaskManager) when subscribing to source topics (when requested to run the main record processing loop and enforce a rebalance). StreamThread uses an InternalTopologyBuilder for the source topics to subscribe to.
Note
|
When partitions get assigned, the custom ConsumerRebalanceListener requests the TaskManager to create tasks for the assigned partitions.
|
StreamThread uses a Kafka Consumer and a TaskManager that are both created when StreamThread is requested to create an instance of itself (the create factory method). That is when StreamThread sets the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property and so indirectly associates the TaskManager with StreamsPartitionAssignor. StreamThread also requests the given KafkaClientSupplier to create a KafkaConsumer (with the TaskManager). When the partition.assignment.strategy configuration property is picked up, StreamsPartitionAssignor is created and eventually configured (using the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property).
StreamThread uses the poll.ms configuration property (default: 100 ms) as the polling interval when requested to poll records once and process them using active stream tasks.
StreamThread uses the commit.interval.ms configuration property as the flush interval for persisting the position of a processor (when polling records once and processing them using active stream tasks and maybeCommit).
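The three configuration properties mentioned so far (num.stream.threads, poll.ms and commit.interval.ms) can be collected in a plain java.util.Properties object. The following is only a minimal sketch: the class name and the values are illustrative assumptions, and a real application would also need at least application.id and bootstrap.servers before handing the properties over to KafkaStreams.

```java
import java.util.Properties;

// Hypothetical helper class, not part of Kafka Streams.
class StreamThreadConfigSketch {

    // Collects the thread-related settings discussed in the text.
    // The values are examples only, not recommendations.
    static Properties threadProperties() {
        Properties props = new Properties();
        props.setProperty("num.stream.threads", "2");      // number of StreamThreads to create
        props.setProperty("poll.ms", "100");               // polling interval
        props.setProperty("commit.interval.ms", "30000");  // flush (commit) interval
        return props;
    }

    public static void main(String[] args) {
        Properties props = threadProperties();
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```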
StreamThread uses the Kafka Consumer to:
- Subscribe to topics (with RebalanceListener) right after StreamThread has been requested to run the main record processing loop
- Poll the topics subscribed (and fetch records if available) right after StreamThread has been requested to get the next batch of records by polling
- resetInvalidOffsets (when an InvalidOffsetException is reported while polling the topics for records)
StreamThread uses stream-thread [[clientId]-StreamThread-[STREAM_THREAD_ID]] for the logging prefix.
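The naming scheme is easier to read with concrete values. The sketch below only illustrates the two formats quoted in this document, [clientId]-StreamThread-[STREAM_THREAD_ID] for the thread client ID and stream-thread [...] for the logging prefix. The class and method names are hypothetical and are not the ones StreamThread uses internally.

```java
// Hypothetical helper class that mirrors the naming formats quoted in the text.
class ThreadNamingSketch {

    // [clientId]-StreamThread-[STREAM_THREAD_ID]
    static String threadClientId(String clientId, int streamThreadId) {
        return clientId + "-StreamThread-" + streamThreadId;
    }

    // stream-thread [[clientId]-StreamThread-[STREAM_THREAD_ID]]
    static String logPrefix(String threadClientId) {
        return String.format("stream-thread [%s]", threadClientId);
    }

    public static void main(String[] args) {
        String threadClientId = threadClientId("my-app", 1);
        System.out.println(threadClientId);
        System.out.println(logPrefix(threadClientId));
    }
}
```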
StreamThread requires an InternalTopologyBuilder to be created and uses it for the following:
- Creating a TaskCreator and a StandbyTaskCreator
- Running the main record processing loop (and subscribing to the source topics)
StreamThread uses the TaskManager for the following:
- FIXME
Tip
|
Enable any of Add the following line to
Refer to Application Logging Using log4j. |
Creating StreamThread Instance
StreamThread takes the following to be created:
StreamThread initializes the internal properties.
StreamThread and Kafka Consumer
When created, StreamThread is given a KafkaClientSupplier.
StreamThread uses the KafkaClientSupplier to get a Kafka consumer with the consumer-specific configuration.
The Kafka Consumer is then assigned to the TaskManager and used to create the StreamThread.
The Kafka Consumer is used in the following:
- runLoop (to subscribe to source topics with the ConsumerRebalanceListener)
- enforceRebalance (by unsubscribing from the source topics and (re)subscribing)
- pollRequests (to poll records)
- resetInvalidOffsets (to seek to the beginning or the end of partitions assigned)
- completeShutdown (to close the consumer)
- consumerMetrics (for the consumer metrics)
StreamThread and AdminClient
When created, StreamThread is given an AdminClient that is only used to create the TaskManager.
Creating StreamThread Instance — create Factory Method
StreamThread create(
InternalTopologyBuilder builder,
StreamsConfig config,
KafkaClientSupplier clientSupplier,
AdminClient adminClient,
UUID processId,
String clientId,
Metrics metrics,
Time time,
StreamsMetadataState streamsMetadataState,
long cacheSizeBytes,
StateDirectory stateDirectory,
StateRestoreListener userStateRestoreListener,
int threadIdx)
create prints out the following INFO message to the logs:
Creating restore consumer client
create requests the input StreamsConfig for getRestoreConsumerConfigs for a new threadClientId (of the format [clientId]-StreamThread-[STREAM_THREAD_ID]).
create requests the given KafkaClientSupplier for getRestoreConsumer for the restoreConsumerConfigs.
create creates a StoreChangelogReader (with the restoreConsumer, the given StateRestoreListener and the configured poll.ms).
Note
|
The input StateRestoreListener is a DelegatingStateRestoreListener actually. |
(Only with eos enabled) create …FIXME
create creates a StreamsMetricsThreadImpl with the following:
- the input Metrics
- stream-metrics group name
- thread.[clientId]-StreamThread-[STREAM_THREAD_ID] prefix
- Tags with one entry with client-id and the [clientId]-StreamThread-[STREAM_THREAD_ID] value.
create creates a ThreadCache (with cacheSizeBytes for the maxCacheSizeBytes and the StreamsMetricsThreadImpl).
create creates a TaskCreator and a StandbyTaskCreator that are used exclusively to create a TaskManager (with a new AssignedStreamsTasks and AssignedStandbyTasks as well as the given StreamsMetadataState).
create prints out the following INFO message to the logs:
Creating consumer client
create requests the input StreamsConfig for application.id configuration property.
create requests the input StreamsConfig for the configuration of a Kafka Consumer for the application ID and the threadClientId (of the format [clientId]-StreamThread-[STREAM_THREAD_ID]) and adds the following internal properties:
- TASK_MANAGER_FOR_PARTITION_ASSIGNOR to be the TaskManager just created
- ASSIGNMENT_ERROR_CODE to be a new AtomicInteger
(Only with non-empty latestResetTopicsPattern and earliestResetTopicsPattern patterns) create …FIXME
create requests the given KafkaClientSupplier for a Kafka Consumer (with the consumerConfigs) and associates it with the TaskManager.
In the end, create creates a StreamThread.
Note
|
create is used exclusively when KafkaStreams is created.
|
Starting Stream Thread — run Method
void run()
Note
|
run is part of Java’s Thread Contract to be executed by a JVM thread.
|
run prints out the following INFO message to the logs:
Starting
run then runs the main record processing loop (runLoop).
At the end, run shuts down (per the cleanRun flag that says whether running the main loop stopped cleanly or not).
run re-throws any KafkaException.
run prints out the following ERROR message to the logs for any other Exception:
Encountered the following error during processing: [exception]
Note
|
run is used exclusively when KafkaStreams is requested to start.
|
Life Cycle of StreamThread — StreamThread’s States
StreamThread can be in exactly one of the following states at any given point in time:
- CREATED - The initial state of StreamThread right after it was created
- RUNNING - StreamThread was requested for the following:
  - Polling records once and processing them using active stream tasks when StreamThread is in PARTITIONS_ASSIGNED state and the TaskManager's updateNewAndRestoringTasks returned true (all stream tasks are running)
  - Polling records once and processing them using active stream tasks when StreamThread polled for records and happened to transition to PARTITIONS_ASSIGNED state, but (again) only when the TaskManager's updateNewAndRestoringTasks returned true
- PARTITIONS_REVOKED - RebalanceListener was requested to handle partition revocation
- PARTITIONS_ASSIGNED - RebalanceListener was requested to handle partition assignment
- PENDING_SHUTDOWN - StreamThread was requested to shutdown or completeShutdown
- DEAD - StreamThread is requested to completeShutdown
StreamThread can be transitioned to another state by executing setState.
Note
|
StreamThread defines a Java enumeration State with the states above ordered by ordinal. When created, each state is assigned the ordinals of the states it can transition to. You can check whether a transition is valid or not using State.isValidTransition.
|
import org.apache.kafka.streams.processor.internals.StreamThread.State._
// CREATED is the 0th state
assert(CREATED.ordinal == 0)
// RUNNING is the next possible state after CREATED
assert(CREATED.isValidTransition(RUNNING))
// DEAD cannot be the next possible state after CREATED
assert(!CREATED.isValidTransition(DEAD))
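The "ordinals of the states it can transition to" technique can be sketched in plain Java as follows. This is a simplified stand-in, not the actual StreamThread.State source: the transition sets are assumptions made for illustration, and only CREATED to RUNNING (valid) and CREATED to DEAD (invalid) are confirmed by this document.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of an enum that knows its valid next states.
class StateTransitionSketch {

    enum State {
        // Each constant lists the ordinals of the states it may move to.
        // These sets are assumptions for the sake of the example.
        CREATED(1, 4),              // -> RUNNING, PENDING_SHUTDOWN
        RUNNING(2, 4),              // -> PARTITIONS_REVOKED, PENDING_SHUTDOWN
        PARTITIONS_REVOKED(3, 4),   // -> PARTITIONS_ASSIGNED, PENDING_SHUTDOWN
        PARTITIONS_ASSIGNED(1, 2, 4),
        PENDING_SHUTDOWN(5),        // -> DEAD
        DEAD;                       // terminal state, no transitions

        private final Set<Integer> validTransitions = new HashSet<>();

        State(Integer... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        boolean isValidTransition(State newState) {
            return validTransitions.contains(newState.ordinal());
        }
    }

    public static void main(String[] args) {
        System.out.println(State.CREATED.isValidTransition(State.RUNNING));
        System.out.println(State.CREATED.isValidTransition(State.DEAD));
    }
}
```

Keeping the allowed transitions next to each constant makes the whole state machine readable in one place, at the cost of having to keep the ordinals in sync whenever a state is added or reordered.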
Shutting Down Stream Thread — shutdown Method
void shutdown()
shutdown prints out the following INFO message to the logs:
Informed to shut down
shutdown tries to transition the current state to PENDING_SHUTDOWN.
(only when transitioning from CREATED state) shutdown executes completeShutdown (with cleanRun flag enabled).
Polling Records Once And Processing Them Using Active Stream Tasks — runOnce Method
void runOnce()
In essence, runOnce requests the Consumer to poll records, adds the records to active stream tasks and requests the TaskManager to process the records by running stream tasks.
Note
|
runOnce uses the StreamsMetricsThreadImpl to access sensors and record metrics.
|
Internally, runOnce executes pollRequests with different poll times as follows:
- 0L when in PARTITIONS_ASSIGNED state
- pollTime when in PARTITIONS_REVOKED, STARTING or RUNNING state
Note
|
When in the other states (when pollRequests above),
|
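The choice of the poll time per state can be sketched as a small function. The enum and the method below are hypothetical stand-ins (the real logic lives inside runOnce), and the sketch deliberately returns an empty Optional for any other state rather than guessing what StreamThread does in that case.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical model of how runOnce picks the poll time based on the thread state.
class PollTimeSketch {

    enum State { CREATED, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN, DEAD }

    static Optional<Duration> pollTimeFor(State state, Duration pollTime) {
        switch (state) {
            case PARTITIONS_ASSIGNED:
                // poll without blocking
                return Optional.of(Duration.ZERO);
            case PARTITIONS_REVOKED:
            case STARTING:
            case RUNNING:
                // poll with the configured poll time (poll.ms)
                return Optional.of(pollTime);
            default:
                // other states are intentionally left undefined in this sketch
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Duration pollTime = Duration.ofMillis(100);
        System.out.println(pollTimeFor(State.PARTITIONS_ASSIGNED, pollTime));
        System.out.println(pollTimeFor(State.RUNNING, pollTime));
    }
}
```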
runOnce executes advanceNowAndComputeLatency (the result is the pollLatency).
With records polled, runOnce requests the StreamsMetricsThreadImpl for the pollTimeSensor and requests it to record the above pollLatency, and then adds the records polled to active stream tasks.
If in PARTITIONS_ASSIGNED state, runOnce requests the TaskManager to updateNewAndRestoringTasks and (when all stream tasks are running) changes to the RUNNING state.
runOnce executes advanceNowAndComputeLatency.
runOnce requests the TaskManager to check whether it hasActiveRunningTasks and if so…FIXME
In the end, runOnce executes maybeUpdateStandbyTasks followed by maybeCommit.
Note
|
runOnce is used exclusively when StreamThread is requested to run the main record processing loop.
|
Polling Records — pollRequests Internal Method
ConsumerRecords<byte[], byte[]> pollRequests(
Duration pollTime)
pollRequests simply requests the Kafka Consumer to poll records with the given pollTime.
In case of an InvalidOffsetException, pollRequests executes resetInvalidOffsets.
In case of a rebalanceException, pollRequests re-throws it as a TaskMigratedException or a StreamsException.
Note
|
pollRequests is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.
|
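The control flow of pollRequests can be sketched without any Kafka dependency. Everything in the following block is a hypothetical stand-in: the nested exception classes only borrow the names used in the text, records are plain strings, and the rule for choosing between TaskMigratedException and StreamsException (re-throw as-is if it already is a TaskMigratedException, wrap otherwise) is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of the pollRequests control flow.
class PollRequestsSketch {

    // Stand-ins for the Kafka exception types named in the text.
    static class InvalidOffsetException extends RuntimeException {}
    static class TaskMigratedException extends RuntimeException {}
    static class StreamsException extends RuntimeException {
        StreamsException(Throwable cause) { super(cause); }
    }

    Throwable rebalanceException;   // would be set by the rebalance listener
    int offsetResets = 0;           // counts resetInvalidOffsets calls

    List<String> pollRequests(Supplier<List<String>> consumerPoll) {
        List<String> records = Collections.emptyList();
        try {
            records = consumerPoll.get();       // stands in for Consumer.poll(pollTime)
        } catch (InvalidOffsetException e) {
            resetInvalidOffsets(e);
        }
        if (rebalanceException != null) {
            // Assumption: keep a TaskMigratedException as-is, wrap anything else.
            if (rebalanceException instanceof TaskMigratedException) {
                throw (TaskMigratedException) rebalanceException;
            }
            throw new StreamsException(rebalanceException);
        }
        return records;
    }

    void resetInvalidOffsets(InvalidOffsetException e) {
        offsetResets++;
    }

    public static void main(String[] args) {
        PollRequestsSketch thread = new PollRequestsSketch();
        System.out.println(thread.pollRequests(() -> Arrays.asList("record-1", "record-2")));
    }
}
```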
resetInvalidOffsets Internal Method
void resetInvalidOffsets(
InvalidOffsetException e)
resetInvalidOffsets …FIXME
Note
|
resetInvalidOffsets is used exclusively when StreamThread is requested to pollRequests (and an InvalidOffsetException is reported).
|
Attempting to Update Running StandbyTasks — maybeUpdateStandbyTasks Internal Method
void maybeUpdateStandbyTasks()
maybeUpdateStandbyTasks …FIXME
maybeUpdateStandbyTasks does nothing and simply returns when StreamThread is not in RUNNING state or the TaskManager has no running standby tasks (hasStandbyRunningTasks).
Note
|
maybeUpdateStandbyTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.
|
Running Main Record Processing Loop — runLoop Internal Method
void runLoop()
runLoop simply requests the Consumer to subscribe to the source topics (with the custom ConsumerRebalanceListener) and keeps polling records and processing them using active stream tasks until the isRunning flag is off.
runLoop requests the Consumer to subscribe to the source topics (from the InternalTopologyBuilder) with the custom ConsumerRebalanceListener.
runLoop then keeps polling records and processing them using active stream tasks until the isRunning flag is off.
In case of the assignmentErrorCode set to VERSION_PROBING, runLoop prints out the following INFO message to the logs followed by enforcing a rebalance.
Version probing detected. Triggering new rebalance.
In case of TaskMigratedException, runLoop prints out the following WARN message to the logs followed by enforcing a rebalance.
Detected task [taskId] that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Will try to rejoin the consumer group. Below is the detailed description of the task:
[migratedTask]
Note
|
runLoop is used exclusively when StreamThread is requested to start.
|
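The shape of the loop (subscribe once, then call runOnce until isRunning is off, and enforce a rebalance when a task turns out to be migrated) can be modelled as follows. The class is a hypothetical stand-in that records the calls it makes instead of talking to a Kafka Consumer, and it leaves out the VERSION_PROBING branch and all logging.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the runLoop control flow; it only records what it would do.
class RunLoopSketch {

    // Stand-in for the Kafka Streams exception of the same name.
    static class TaskMigratedException extends RuntimeException {}

    final List<String> events = new ArrayList<>();
    private int remaining;          // iterations left before isRunning() turns false
    private final int migrateAt;    // value of 'remaining' at which runOnce fails

    RunLoopSketch(int iterations, int migrateAt) {
        this.remaining = iterations;
        this.migrateAt = migrateAt;
    }

    boolean isRunning() { return remaining > 0; }

    void subscribe() { events.add("subscribe"); }

    void runOnce() {
        int current = remaining--;
        if (current == migrateAt) {
            throw new TaskMigratedException();
        }
        events.add("runOnce");
    }

    void enforceRebalance() { events.add("enforceRebalance"); }

    void runLoop() {
        subscribe();                    // subscribe to the source topics first
        while (isRunning()) {
            try {
                runOnce();              // poll and process records
            } catch (TaskMigratedException e) {
                enforceRebalance();     // rejoin the consumer group
            }
        }
    }

    public static void main(String[] args) {
        RunLoopSketch thread = new RunLoopSketch(3, 2);
        thread.runLoop();
        System.out.println(thread.events);
    }
}
```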
Setting New State — setState Method
State setState(
State newState)
setState …FIXME
Note
|
setState is used when…FIXME
|
setRebalanceException Internal Method
void setRebalanceException(
Throwable rebalanceException)
setRebalanceException …FIXME
Note
|
setRebalanceException is used when…FIXME
|
Describing Itself (Textual Representation) — toString Method
String toString() (1)
String toString(
String indent)
(1) Uses an empty indent
toString gives a text representation with "StreamsThread threadId:" and the thread name followed by the text representation of the TaskManager.
FIXME toString in action
Checking If StreamThread Is Running — isRunning Method
boolean isRunning()
isRunning is true when StreamThread is in one of the following states:
Otherwise, isRunning is false.
Note
|
isRunning is simply a pass-through variant of State.isRunning.
|
adminClientMetrics Method
Map<MetricName, Metric> adminClientMetrics()
adminClientMetrics …FIXME
Note
|
adminClientMetrics is used exclusively when KafkaStreams is requested for the metrics.
|
consumerMetrics Method
Map<MetricName, Metric> consumerMetrics()
consumerMetrics …FIXME
Note
|
consumerMetrics is used when…FIXME
|
producerMetrics Method
Map<MetricName, Metric> producerMetrics()
producerMetrics …FIXME
Note
|
producerMetrics is used when…FIXME
|
getConsumerClientId Static Method
String getConsumerClientId(
String threadClientId)
getConsumerClientId …FIXME
Note
|
getConsumerClientId is used when…FIXME
|
getRestoreConsumerClientId Static Method
String getRestoreConsumerClientId(
String threadClientId)
getRestoreConsumerClientId …FIXME
Note
|
getRestoreConsumerClientId is used when…FIXME
|
getSharedAdminClientId Static Method
String getSharedAdminClientId(
String clientId)
getSharedAdminClientId …FIXME
Note
|
getSharedAdminClientId is used when…FIXME
|
getTaskProducerClientId Internal Static Method
String getTaskProducerClientId(
String threadClientId,
TaskId taskId)
getTaskProducerClientId …FIXME
Note
|
getTaskProducerClientId is used when…FIXME
|
getThreadProducerClientId Internal Static Method
String getThreadProducerClientId(
String threadClientId)
getThreadProducerClientId …FIXME
Note
|
getThreadProducerClientId is used when…FIXME
|
Adding Records to Active Stream Tasks — addRecordsToTasks Internal Method
void addRecordsToTasks(
ConsumerRecords<byte[], byte[]> records)
For every partition of the input records, addRecordsToTasks requests the TaskManager for the active stream processor task responsible for the partition.
Note
|
The input records may (and often will) be from different partitions or even topics. Unless you use as many StreamThread instances as there are partitions (among the source topics), addRecordsToTasks will be given records from many partitions.
|
With the StreamTask, addRecordsToTasks requests the input mixed-partition ConsumerRecords for the records for the given partition only and then requests the StreamTask to buffer the new records (for the partition).
Note
|
ConsumerRecords is a container that holds the list of ConsumerRecord per partition for a particular topic. There is one ConsumerRecord list for every topic partition returned by a Consumer.poll(long) operation.
|
Note
|
addRecordsToTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.
|
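The per-partition dispatch can be sketched with plain collections. In the block below a Map from partition name to records plays the role of the mixed-partition ConsumerRecords, another Map plays the role of the TaskManager lookup, and Task is a hypothetical stand-in for StreamTask. Throwing an IllegalStateException for a partition without a task is a choice made for this sketch only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of routing polled records to the task that owns each partition.
class AddRecordsToTasksSketch {

    // Stand-in for a StreamTask: it only buffers records per partition.
    static class Task {
        final Map<String, List<String>> buffered = new HashMap<>();

        void addRecords(String partition, List<String> records) {
            buffered.computeIfAbsent(partition, p -> new ArrayList<>()).addAll(records);
        }
    }

    // Stand-in for the TaskManager lookup: partition -> active task.
    final Map<String, Task> activeTaskByPartition = new HashMap<>();

    void addRecordsToTasks(Map<String, List<String>> recordsByPartition) {
        for (Map.Entry<String, List<String>> entry : recordsByPartition.entrySet()) {
            String partition = entry.getKey();
            Task task = activeTaskByPartition.get(partition);
            if (task == null) {
                // Sketch-only behaviour for an unknown partition.
                throw new IllegalStateException("No active task for partition " + partition);
            }
            // Hand over only the records of this partition.
            task.addRecords(partition, entry.getValue());
        }
    }

    public static void main(String[] args) {
        AddRecordsToTasksSketch thread = new AddRecordsToTasksSketch();
        Task task = new Task();
        thread.activeTaskByPartition.put("orders-0", task);
        thread.activeTaskByPartition.put("orders-1", task);

        Map<String, List<String>> polled = new LinkedHashMap<>();
        polled.put("orders-0", Arrays.asList("a", "b"));
        polled.put("orders-1", Arrays.asList("c"));
        thread.addRecordsToTasks(polled);

        System.out.println(task.buffered);
    }
}
```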
Enforcing Rebalance — enforceRebalance Internal Method
void enforceRebalance()
enforceRebalance …FIXME
Note
|
enforceRebalance is used when…FIXME
|
Committing All Active and Standby Tasks (When Commit Interval Elapsed) — maybeCommit Method
boolean maybeCommit()
maybeCommit commits all tasks (owned by this TaskManager) if the commit interval has elapsed (i.e. the commit interval is non-negative and the time since the last commit is longer than the commit interval).
Internally, maybeCommit prints out the following TRACE message to the logs:
Committing all active tasks [activeTaskIds] and standby tasks [standbyTaskIds] since [time]ms has elapsed (commit interval is [commitTimeMs]ms)
maybeCommit requests the TaskManager to commitAll.
Only if there are still running active and standby tasks, maybeCommit does the following:
- Requests the StreamsMetricsThreadImpl for the commitTimeSensor and records the commit time (as the latency of committing all the tasks divided by their number)
- Requests the TaskManager to maybePurgeCommitedRecords
maybeCommit prints out the following DEBUG message to the logs:
Committed all active tasks [activeTaskIds] and standby tasks [standbyTaskIds] in [duration]ms
maybeCommit updates the lastCommitMs internal counter with the input now time.
maybeCommit turns the processStandbyRecords flag on.
Note
|
maybeCommit is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.
|
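The "commit only when the interval has elapsed" check can be sketched as below. The class is a hypothetical stand-in: a counter replaces the TaskManager's commitAll, metrics and logging are left out, the current time is passed in explicitly, and the meaning of the returned boolean (whether a commit happened) is an assumption of this sketch.

```java
// Hypothetical model of the commit-interval check in maybeCommit.
class MaybeCommitSketch {

    final long commitTimeMs;               // commit.interval.ms
    long lastCommitMs;                     // time of the last commit
    boolean processStandbyRecords = false;
    int commits = 0;                       // stands in for TaskManager.commitAll calls

    MaybeCommitSketch(long commitTimeMs, long startMs) {
        this.commitTimeMs = commitTimeMs;
        this.lastCommitMs = startMs;
    }

    boolean maybeCommit(long now) {
        // Commit only with a non-negative interval that has already elapsed.
        if (commitTimeMs >= 0 && now - lastCommitMs > commitTimeMs) {
            commits++;
            lastCommitMs = now;
            processStandbyRecords = true;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        MaybeCommitSketch thread = new MaybeCommitSketch(100, 0);
        System.out.println(thread.maybeCommit(50));   // interval not elapsed yet
        System.out.println(thread.maybeCommit(150));  // interval elapsed
    }
}
```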
Attempting to Punctuate (Running Stream Tasks) — maybePunctuate Internal Method
boolean maybePunctuate()
maybePunctuate requests the TaskManager to punctuate stream tasks.
If the punctuate returned a positive number (greater than 0), maybePunctuate executes advanceNowAndComputeLatency and requests the StreamsMetricsThreadImpl for the punctuateTimeSensor to record the punctuate time.
In the end, maybePunctuate returns whether the punctuate returned a positive number (true) or not (false).
Note
|
maybePunctuate is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks.
|
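A minimal sketch of the maybePunctuate flow follows. An IntSupplier stands in for the TaskManager's punctuate (which returns the number of punctuations that ran) and a counter stands in for recording on the punctuateTimeSensor. Both are hypothetical simplifications.

```java
import java.util.function.IntSupplier;

// Hypothetical model of maybePunctuate: record a latency only when something was punctuated.
class MaybePunctuateSketch {

    int latenciesRecorded = 0;   // stands in for punctuateTimeSensor recordings

    boolean maybePunctuate(IntSupplier punctuate) {
        int punctuated = punctuate.getAsInt();
        if (punctuated > 0) {
            latenciesRecorded++;
        }
        return punctuated > 0;
    }

    public static void main(String[] args) {
        MaybePunctuateSketch thread = new MaybePunctuateSketch();
        System.out.println(thread.maybePunctuate(() -> 3));
        System.out.println(thread.maybePunctuate(() -> 0));
    }
}
```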
addToResetList Internal Method
void addToResetList(
TopicPartition partition,
Set<TopicPartition> partitions,
String logMessage,
String resetPolicy,
Set<String> loggedTopics)
addToResetList …FIXME
Note
|
addToResetList is used when StreamThread …FIXME
|
Computing Latency — advanceNowAndComputeLatency Internal Method
long advanceNowAndComputeLatency()
advanceNowAndComputeLatency updates (advances) the "now" timestamp to be the current timestamp and returns the timestamp difference (latency).
Note
|
advanceNowAndComputeLatency is used when StreamThread is requested to poll records once and process them using active stream tasks, maybePunctuate, maybeCommit, and maybeUpdateStandbyTasks.
|
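The "advance now and return the difference" idea fits in a few lines. The class below is a hypothetical stand-in with an injectable clock (a LongSupplier) so that it can be exercised deterministically. Clamping the result at zero is a defensive choice of this sketch.

```java
import java.util.function.LongSupplier;

// Hypothetical model of advanceNowAndComputeLatency with an injectable clock.
class LatencyClockSketch {

    private final LongSupplier clock;   // e.g. System::currentTimeMillis
    private long now;

    LatencyClockSketch(LongSupplier clock) {
        this.clock = clock;
        this.now = clock.getAsLong();
    }

    long advanceNowAndComputeLatency() {
        long previous = now;
        now = clock.getAsLong();            // advance "now" to the current timestamp
        return Math.max(now - previous, 0); // time elapsed since the previous call
    }

    public static void main(String[] args) throws InterruptedException {
        LatencyClockSketch latency = new LatencyClockSketch(System::currentTimeMillis);
        Thread.sleep(20);
        System.out.println("latency (ms): " + latency.advanceNowAndComputeLatency());
    }
}
```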
clearStandbyRecords Internal Method
void clearStandbyRecords()
clearStandbyRecords …FIXME
Note
|
clearStandbyRecords is used when StreamThread …FIXME
|
updateThreadMetadata Internal Method
void updateThreadMetadata(
Map<TaskId, StreamTask> activeTasks,
Map<TaskId, StandbyTask> standbyTasks)
StreamThread updateThreadMetadata(
String adminClientId)
updateThreadMetadata …FIXME
Note
|
updateThreadMetadata is used when StreamThread …FIXME
|
Internal Properties
| Name | Description |
|---|---|
| lastCommitMs | Time of the last commit |
| | Number of iterations when the TaskManager is requested to process records by running stream tasks (one record per task) (while Default: Incremented while polling records once and processing them using active stream tasks Decremented by half while polling records once and processing them using active stream tasks |
| processStandbyRecords | Flag to control whether to maybeUpdateStandbyTasks after maybeCommit Default: Turned off ( Turned on ( |
| | Used when Set when Reset ( |