StoreChangelogReader is the default ChangelogReader.
StoreChangelogReader is created for a StreamThread for the sole purpose of creating the TaskCreator, the StandbyTaskCreator and the TaskManager.

StoreChangelogReader is used for the following:
Tasks to create a ProcessorStateManager
TaskManager when suspending all (active and standby) tasks and state (at the beginning of consumer rebalance) and when requested to updateNewAndRestoringTasks (at the end of consumer rebalance)
Enable logging for the StoreChangelogReader logger to see what happens inside.
Refer to Application Logging Using log4j.
Creating StoreChangelogReader Instance
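StoreChangelogReader takes the poll duration and the user-defined StateRestoreListener (among others) to be created.
StoreChangelogReader initializes the internal properties.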
Poll Duration and StreamsConfig.POLL_MS_CONFIG
When created, StoreChangelogReader is given the poll duration that is configured using the StreamsConfig.POLL_MS_CONFIG configuration property.
The poll duration is used exclusively for the restore Consumer to fetch data for changelog partitions (using Consumer.poll) when restoring active StreamTasks (from the changelog partitions).
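As a minimal sketch of how the poll duration could be derived from configuration: the snippet below reads the `poll.ms` key (the string behind StreamsConfig.POLL_MS_CONFIG) from plain `Properties` and turns it into a `Duration`. The class name, the helper method and the fallback of 100 ms are illustrative assumptions, not the actual Kafka Streams code.

```java
import java.time.Duration;
import java.util.Properties;

class PollDurationSketch {
    // The configuration key behind StreamsConfig.POLL_MS_CONFIG
    static final String POLL_MS = "poll.ms";
    // Fallback used by this sketch when the key is not set (assumption)
    static final long DEFAULT_POLL_MS = 100L;

    // Hypothetical helper: converts the configured milliseconds into a Duration
    static Duration pollDuration(Properties props) {
        String value = props.getProperty(POLL_MS, Long.toString(DEFAULT_POLL_MS));
        return Duration.ofMillis(Long.parseLong(value));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(POLL_MS, "50");
        // This Duration is what the restore Consumer would be polled with
        System.out.println(pollDuration(props).toMillis() + " ms");
    }
}
```

A shorter poll duration makes the restore loop return to the caller sooner, at the cost of more (possibly empty) polls.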
User-Defined StateRestoreListener
When created, StoreChangelogReader is given a user-defined StateRestoreListener.
StoreChangelogReader uses the StateRestoreListener exclusively when registering a StateRestorer for a changelog partition so that the StateRestorer to be registered is associated with the listener.
Registering StateRestorer For Changelog Partition — register
void register(StateRestorer restorer)
register is part of ChangelogReader Contract to register a StateRestorer (for a changelog partition).
register requests the given StateRestorer for the partition.
If the stateRestorers internal registry does not have the partition registered, register requests the StateRestorer to setUserRestoreListener with the userStateRestoreListener and adds it to the stateRestorers registry. register prints out the following TRACE message to the logs:
Added restorer for changelog [partition]
In the end, register adds the partition to the needsInitializing internal registry.
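The registration steps above can be sketched roughly as follows. This is a simplified model, not the Kafka Streams source: `Restorer` is a hypothetical stand-in for StateRestorer, partitions are plain strings instead of TopicPartition, and the TRACE log line is omitted.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class RegisterSketch {
    /** Hypothetical stand-in for StateRestorer with only what register needs. */
    static class Restorer {
        final String partition;
        Object userRestoreListener;

        Restorer(String partition) {
            this.partition = partition;
        }

        void setUserRestoreListener(Object listener) {
            this.userRestoreListener = listener;
        }
    }

    final Object userStateRestoreListener;
    final Map<String, Restorer> stateRestorers = new HashMap<>();
    final Set<String> needsInitializing = new HashSet<>();

    RegisterSketch(Object userStateRestoreListener) {
        this.userStateRestoreListener = userStateRestoreListener;
    }

    void register(Restorer restorer) {
        String partition = restorer.partition;
        // Only the first restorer registered for a partition is kept
        // and associated with the user-defined listener
        if (!stateRestorers.containsKey(partition)) {
            restorer.setUserRestoreListener(userStateRestoreListener);
            stateRestorers.put(partition, restorer);
        }
        // The partition is marked for initialization on every call
        needsInitializing.add(partition);
    }
}
```

Registering a second restorer for an already-registered partition leaves the stateRestorers entry untouched in this model, but the partition is still (re)added to needsInitializing.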
Map<TopicPartition, Long> restoredOffsets()
restoredOffsets is part of ChangelogReader Contract to restore offsets.
restoredOffsets returns pairs of TopicPartition and restored offset (from the associated StateRestorer) for every StateRestorer whose state store is persistent.
Internally, for every pair of TopicPartition and StateRestorer (in the stateRestorers internal registry), restoredOffsets requests the StateRestorer to restoredOffset when the restorer is persistent (i.e. when the associated state store is persistent).
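The filtering described above might look like the following sketch. `Restorer` is again a hypothetical stand-in for StateRestorer (reduced to a persistence flag and a restored offset), and partitions are modeled as strings.

```java
import java.util.HashMap;
import java.util.Map;

class RestoredOffsetsSketch {
    /** Hypothetical stand-in for StateRestorer. */
    static class Restorer {
        final boolean persistent;
        final long restoredOffset;

        Restorer(boolean persistent, long restoredOffset) {
            this.persistent = persistent;
            this.restoredOffset = restoredOffset;
        }
    }

    // Collects restored offsets, skipping restorers of non-persistent stores
    static Map<String, Long> restoredOffsets(Map<String, Restorer> stateRestorers) {
        Map<String, Long> offsets = new HashMap<>();
        for (Map.Entry<String, Restorer> entry : stateRestorers.entrySet()) {
            Restorer restorer = entry.getValue();
            if (restorer.persistent) {
                offsets.put(entry.getKey(), restorer.restoredOffset);
            }
        }
        return offsets;
    }
}
```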
Restoring Active StreamTasks (From Changelog Partitions) — restore
Collection<TopicPartition> restore(
RestoringTasks active)
restore is part of the ChangelogReader Contract to restore logging-enabled state stores using the RestoringTasks.
restore initializes (with the given active RestoringTasks) when the needsInitializing internal registry has at least one changelog partition (that was added when registering a StateRestorer for a changelog partition).
restore simply finishes with the completely-restored changelog partitions when there are no changelog partitions that need restoring. restore also requests the restore Consumer to unsubscribe from changelog topics (KafkaConsumer.unsubscribe).
With at least one changelog partition that needs restoring, restore requests the restore Consumer to fetch data for the changelog partitions (KafkaConsumer.poll) with the poll duration.
restore removes all changelog partitions that are completed from the needsRestoring internal registry.
In the end, restore requests the restore Consumer to unsubscribe from changelog topics (KafkaConsumer.unsubscribe) when the needsRestoring internal registry has no changelog partitions and simply finishes with the completely-restored changelog partitions.
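The control flow of restore described above can be modeled with a small sketch. Everything here is illustrative: `RestoreConsumer` and `FakeConsumer` are hypothetical stand-ins for the restore Consumer, the initialization step is left out, and a partition is assumed to be completed once the polled position reaches its end offset.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class RestoreLoopSketch {
    /** Hypothetical, heavily reduced view of the restore Consumer. */
    interface RestoreConsumer {
        Map<String, Long> poll();   // partition -> position after the fetched records
        void unsubscribe();
    }

    /** Test double that replays canned poll results. */
    static class FakeConsumer implements RestoreConsumer {
        final Deque<Map<String, Long>> batches = new ArrayDeque<>();
        int unsubscribeCalls = 0;

        public Map<String, Long> poll() {
            Map<String, Long> next = batches.poll();
            return next == null ? Collections.<String, Long>emptyMap() : next;
        }

        public void unsubscribe() {
            unsubscribeCalls++;
        }
    }

    final Set<String> needsRestoring = new HashSet<>();
    final Set<String> completed = new HashSet<>();
    final Map<String, Long> endOffsets = new HashMap<>();

    Set<String> restore(RestoreConsumer consumer) {
        // Nothing left to restore: unsubscribe and report what is completed
        if (needsRestoring.isEmpty()) {
            consumer.unsubscribe();
            return completed;
        }
        // Fetch data for the changelog partitions
        for (Map.Entry<String, Long> entry : consumer.poll().entrySet()) {
            Long endOffset = endOffsets.get(entry.getKey());
            // Assumption: completed once the position reaches the end offset
            if (endOffset != null && entry.getValue() >= endOffset) {
                completed.add(entry.getKey());
            }
        }
        needsRestoring.removeAll(completed);
        if (needsRestoring.isEmpty()) {
            consumer.unsubscribe();
        }
        return completed;
    }
}
```

In this model restore does a single poll per call, so the caller is expected to invoke it repeatedly until every partition it cares about shows up in the returned set.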
In case of InvalidOffsetException, restore
Internal Method
void initialize(RestoringTasks active)
initialize is used exclusively when StoreChangelogReader is requested to restore (and there are needsInitializing changelog partitions).
Restoring State Stores From Changelog Topics — startRestoration
Internal Method
void startRestoration(
Set<TopicPartition> initialized,
RestoringTasks active)
startRestoration prints out the following DEBUG message to the logs:
Start restoring state stores from changelog topics [initialized]
startRestoration requests the restore Consumer for partition assignment, adds the initialized partitions and finally requests the restore Consumer to use the partitions only (aka manual partition assignment).
For every initialized partition, startRestoration uses the stateRestorers internal registry to find the associated StateRestorer that is then requested for the checkpoint offset.
There are two possible cases for the checkpoint offset.
When the checkpoint offset is known, startRestoration prints out the following TRACE message to the logs:
Found checkpoint [checkpoint] from changelog [partition] for store [storeName].
startRestoration requests the restore Consumer to seek (the fetch offsets) for the partition to the checkpoint.
startRestoration looks up the partition in the endOffsets internal registry and prints out the following DEBUG message to the logs:
Restoring partition [partition] from offset [startingOffset] to endOffset [endOffset]
startRestoration requests the StateRestorer to set the starting offset (with the offset of the next record to be fetched for the partition using the restore Consumer).
startRestoration requests the StateRestorer to restoreStarted.
When the checkpoint offset is unknown, startRestoration prints out the following TRACE message to the logs:
Did not find checkpoint from changelog [partition] for store [storeName], rewinding to beginning.
startRestoration requests the restore Consumer to seek to the beginning (KafkaConsumer.seekToBeginning) for the partition.
startRestoration adds the partition to the needsPositionUpdate local registry.
For every StateRestorer in the needsPositionUpdate local registry (for which the checkpoint offset was unknown), startRestoration requests the StateRestorer for the partition.
startRestoration requests the given active RestoringTasks for the restoring StreamTask of the changelog partition.
There are two possible cases for the restoring StreamTask.
With Exactly-Once Support enabled, startRestoration prints out the following INFO message to the logs:
No checkpoint found for task [id] state store [storeName] changelog [partition] with EOS turned on. Reinitializing the task and restore its state from the beginning.
startRestoration removes the partition from the needsInitializing internal registry (and the initialized local registry).
startRestoration requests the StateRestorer to set the checkpoint offset (with the offset of the next record to be fetched for the partition using the restore Consumer).
startRestoration requests the StreamTask to reinitializeStateStoresForPartitions with the partition.
With Exactly-Once Support disabled, startRestoration prints out the following INFO message to the logs:
Restoring task [id]'s state store [storeName] from beginning of the changelog [partition]
startRestoration requests the restore Consumer for the offset of the next record to be fetched (position) for the partition of the StateRestorer.
startRestoration looks up the partition of the StateRestorer in the endOffsets internal registry and prints out the following DEBUG message to the logs:
Restoring partition [partition] from offset [position] to endOffset [endOffset]
startRestoration requests the StateRestorer to set the starting offset to the position (of the restore Consumer).
startRestoration requests the StateRestorer to restoreStarted.
In the end, startRestoration adds all initialized partitions to the needsRestoring internal registry.
startRestoration is used exclusively when StoreChangelogReader is requested to initialize (when requested to restore).
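The first decision in startRestoration (a known versus an unknown checkpoint offset) can be sketched as below. This is a simplified model under stated assumptions: `FakeRestoreConsumer` is a hypothetical stand-in for the restore Consumer, the sentinel `NO_CHECKPOINT = -1` marks an unknown checkpoint, the beginning of every partition is taken to be offset 0, and logging as well as the Exactly-Once handling are left out.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class StartRestorationSketch {
    // Sentinel for "checkpoint offset unknown" (assumption of this sketch)
    static final long NO_CHECKPOINT = -1L;

    /** Hypothetical stand-in for the restore Consumer that only tracks positions. */
    static class FakeRestoreConsumer {
        final Map<String, Long> positions = new HashMap<>();

        void seek(String partition, long offset) {
            positions.put(partition, offset);
        }

        void seekToBeginning(String partition) {
            // Assumption: the log of every partition starts at offset 0
            positions.put(partition, 0L);
        }
    }

    // Seeks every partition to its starting point and returns the partitions
    // whose position still has to be looked up (no checkpoint was found)
    static Set<String> seekToStartingOffsets(Map<String, Long> checkpoints,
                                             FakeRestoreConsumer consumer) {
        Set<String> needsPositionUpdate = new LinkedHashSet<>();
        for (Map.Entry<String, Long> entry : checkpoints.entrySet()) {
            String partition = entry.getKey();
            long checkpoint = entry.getValue();
            if (checkpoint != NO_CHECKPOINT) {
                // Known checkpoint: resume right where the store left off
                consumer.seek(partition, checkpoint);
            } else {
                // Unknown checkpoint: rewind and remember the partition
                consumer.seekToBeginning(partition);
                needsPositionUpdate.add(partition);
            }
        }
        return needsPositionUpdate;
    }
}
```

Partitions returned by `seekToStartingOffsets` correspond to the needsPositionUpdate local registry, i.e. the ones that go through the Exactly-Once check described above.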
Internal Method
long processNext(
List<ConsumerRecord<byte[], byte[]>> records,
StateRestorer restorer,
Long endOffset)
processNext is used exclusively when StoreChangelogReader is requested to restore active StreamTasks (from changelog partitions).
Internal Properties
Name | Description |
needsInitializing | Changelog partitions (of StateRestorers) that need initializing. Used in restore |
needsRestoring | Changelog partitions (of StateRestorers) that need restoring. Used in restore |
stateRestorers | StateRestorers per partition of changelog topic of a state store. Used in restore, initialize, and restoredOffsets |