StoreChangelogReader

StoreChangelogReader is the default ChangelogReader.

StoreChangelogReader is created for a StreamThread for the sole purpose of creating the TaskCreator, the StandbyTaskCreator and the TaskManager.

Figure 1. StoreChangelogReader

StoreChangelogReader is used for the following:

Tip

Enable ALL logging level for org.apache.kafka.streams.processor.internals.StoreChangelogReader logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.StoreChangelogReader=ALL

Creating StoreChangelogReader Instance

StoreChangelogReader takes the following to be created:

StoreChangelogReader initializes the internal properties.

Poll Duration and StreamsConfig.POLL_MS_CONFIG

When created, StoreChangelogReader is given the poll duration that is configured using the StreamsConfig.POLL_MS_CONFIG configuration property.

The poll duration is used exclusively for the restore Consumer to fetch data for changelog partitions (using Consumer.poll) when restoring active StreamTasks (from the changelog partitions).
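The snippet below is a minimal sketch of where the poll duration comes from. It uses the literal key poll.ms (the value of StreamsConfig.POLL_MS_CONFIG) and a plain java.util.Properties so that it runs without Kafka on the classpath. The PollDurationSketch class and its pollDuration helper are hypothetical, and the 100 ms fallback mirrors the documented default of poll.ms.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper; not part of Kafka Streams.
final class PollDurationSketch {
    // The value of StreamsConfig.POLL_MS_CONFIG
    static final String POLL_MS_CONFIG = "poll.ms";
    // Documented default of poll.ms (milliseconds)
    static final long DEFAULT_POLL_MS = 100L;

    // Resolves poll.ms to the Duration a restore poll would block for
    static Duration pollDuration(Properties props) {
        String value = props.getProperty(POLL_MS_CONFIG);
        long millis = (value == null) ? DEFAULT_POLL_MS : Long.parseLong(value.trim());
        return Duration.ofMillis(millis);
    }
}
```

In a Kafka Streams application you would normally set the value through the StreamsConfig.POLL_MS_CONFIG constant rather than the string literal.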

User-Defined StateRestoreListener

When created, StoreChangelogReader is given a user-defined StateRestoreListener.

StoreChangelogReader uses the StateRestoreListener exclusively when registering a StateRestorer for a changelog partition so that the StateRestorer to be registered is associated with the listener.

Registering StateRestorer For Changelog Partition — register Method

void register(StateRestorer restorer)
Note
register is part of the ChangelogReader Contract to register a StateRestorer (for a changelog partition).

register requests the given StateRestorer for the partition.

If the stateRestorers internal registry does not have the partition registered, register requests the StateRestorer to setUserRestoreListener with the userStateRestoreListener and adds it to the stateRestorers registry. register prints out the following TRACE message to the logs:

Added restorer for changelog [partition]

In the end, register adds the partition to the needsInitializing internal registry.
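The register flow can be sketched as follows. This is a simplified model, not the Kafka Streams source: changelog partitions are plain strings, and RegisterSketch, Restorer and RestoreListener are hypothetical stand-ins for StoreChangelogReader, StateRestorer and StateRestoreListener. The println stands in for the TRACE message.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of StoreChangelogReader.register
final class RegisterSketch {
    interface RestoreListener {} // stand-in for StateRestoreListener

    // Trimmed-down stand-in for StateRestorer
    static final class Restorer {
        final String partition;
        RestoreListener userListener;
        Restorer(String partition) { this.partition = partition; }
        String partition() { return partition; }
        void setUserRestoreListener(RestoreListener listener) { this.userListener = listener; }
    }

    final RestoreListener userStateRestoreListener;
    final Map<String, Restorer> stateRestorers = new HashMap<>();
    final Set<String> needsInitializing = new HashSet<>();

    RegisterSketch(RestoreListener userStateRestoreListener) {
        this.userStateRestoreListener = userStateRestoreListener;
    }

    void register(Restorer restorer) {
        String partition = restorer.partition();
        if (!stateRestorers.containsKey(partition)) {
            // First restorer for this changelog partition: attach the user listener and keep it
            restorer.setUserRestoreListener(userStateRestoreListener);
            stateRestorers.put(partition, restorer);
            System.out.println("Added restorer for changelog " + partition);
        }
        // The partition is (re)marked for initialization in every case
        needsInitializing.add(partition);
    }
}
```

A second register for an already-known partition keeps the first StateRestorer, yet still adds the partition to needsInitializing.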

reset Method

void reset()
Note
reset is part of the ChangelogReader Contract to…FIXME.

reset…FIXME

restoredOffsets Method

Map<TopicPartition, Long> restoredOffsets()
Note
restoredOffsets is part of the ChangelogReader Contract to return the restored offsets (of persistent state stores).

restoredOffsets returns pairs of TopicPartition and restored offset (from the associated StateRestorer), but only for persistent state stores.

Internally, for every pair of TopicPartition and StateRestorer (in the stateRestorers internal registry), restoredOffsets requests the StateRestorer for the restoredOffset when the restorer is persistent (i.e. when the associated state store is persistent).
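A minimal sketch of the restoredOffsets filtering, assuming a hypothetical Restorer type that only exposes whether its state store is persistent and its restored offset. Changelog partitions are modeled as plain strings instead of TopicPartition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of StoreChangelogReader.restoredOffsets
final class RestoredOffsetsSketch {
    // Stand-in for StateRestorer: only what restoredOffsets needs
    static final class Restorer {
        final boolean persistent;
        final long restoredOffset;
        Restorer(boolean persistent, long restoredOffset) {
            this.persistent = persistent;
            this.restoredOffset = restoredOffset;
        }
        boolean isPersistent() { return persistent; }
        long restoredOffset() { return restoredOffset; }
    }

    static Map<String, Long> restoredOffsets(Map<String, Restorer> stateRestorers) {
        Map<String, Long> offsets = new HashMap<>();
        for (Map.Entry<String, Restorer> entry : stateRestorers.entrySet()) {
            Restorer restorer = entry.getValue();
            // In-memory (non-persistent) stores are skipped
            if (restorer.isPersistent()) {
                offsets.put(entry.getKey(), restorer.restoredOffset());
            }
        }
        return offsets;
    }
}
```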

Restoring Active StreamTasks (From Changelog Partitions) — restore Method

Collection<TopicPartition> restore(
  RestoringTasks active)
Note
restore is part of the ChangelogReader Contract to restore logging-enabled state stores using the RestoringTasks.

restore first initializes (with the given active RestoringTasks) when the needsInitializing internal registry has at least one changelog partition (added when registering a StateRestorer for a changelog partition).

When there are no changelog partitions that need restoring, restore requests the restore Consumer to unsubscribe from changelog topics (KafkaConsumer.unsubscribe) and simply finishes with the completely-restored changelog partitions.

With at least one changelog partition that needs restoring, restore requests the restore Consumer to fetch data for the changelog partitions (KafkaConsumer.poll) with the poll duration.

restore…FIXME

restore removes all changelog partitions that are completed from the needsRestoring internal registry.

In the end, restore requests the restore Consumer to unsubscribe from changelog topics (KafkaConsumer.unsubscribe) when the needsRestoring internal registry has no changelog partition and simply finishes with the completely-restored changelog partitions.
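The control flow of restore described above can be sketched as one round of a loop. This is a hedged model under several assumptions: RestoreConsumer is a hypothetical interface exposing only poll and unsubscribe, the per-record processing that this page leaves as FIXME is a hypothetical RecordProcessor callback that also decides completion, and initialize is reduced to moving partitions to needsRestoring (it does no seeking). InvalidOffsetException handling is not modeled.

```java
import java.time.Duration;
import java.util.*;

// Hypothetical model of one restore round; not the Kafka Streams source.
final class RestoreLoopSketch {
    // Stand-in for the restore Consumer: only the calls restore needs
    interface RestoreConsumer {
        Map<String, List<byte[]>> poll(Duration timeout); // fetched records per changelog partition
        void unsubscribe();
    }

    // Hook for the per-record processing; returns true once the partition is completely restored
    interface RecordProcessor {
        boolean process(String partition, List<byte[]> records);
    }

    final RestoreConsumer restoreConsumer;
    final Duration pollDuration;
    final RecordProcessor processor;
    final Set<String> needsInitializing = new HashSet<>();
    final Set<String> needsRestoring = new HashSet<>();
    final Set<String> completedRestorers = new HashSet<>();

    RestoreLoopSketch(RestoreConsumer restoreConsumer, Duration pollDuration, RecordProcessor processor) {
        this.restoreConsumer = restoreConsumer;
        this.pollDuration = pollDuration;
        this.processor = processor;
    }

    Collection<String> restore() {
        if (!needsInitializing.isEmpty()) {
            initialize();
        }
        if (needsRestoring.isEmpty()) {
            // Nothing to restore: release the changelog partitions
            restoreConsumer.unsubscribe();
            return completedRestorers;
        }
        Map<String, List<byte[]>> records = restoreConsumer.poll(pollDuration);
        for (String partition : needsRestoring) {
            List<byte[]> batch = records.getOrDefault(partition, List.of());
            if (processor.process(partition, batch)) {
                completedRestorers.add(partition); // assumption: completion is recorded here
            }
        }
        // Drop every completed partition from needsRestoring
        needsRestoring.removeAll(completedRestorers);
        if (needsRestoring.isEmpty()) {
            restoreConsumer.unsubscribe();
        }
        return completedRestorers;
    }

    // Simplified initialize: moves partitions straight to needsRestoring
    private void initialize() {
        needsRestoring.addAll(needsInitializing);
        needsInitializing.clear();
    }
}
```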

In case of InvalidOffsetException, restore…FIXME

initialize Internal Method

void initialize(RestoringTasks active)

initialize…FIXME

Note
initialize is used exclusively when StoreChangelogReader is requested to restore (and there are needsInitializing changelog partitions).

Restoring State Stores From Changelog Topics — startRestoration Internal Method

void startRestoration(
  Set<TopicPartition> initialized,
  RestoringTasks active)

startRestoration prints out the following DEBUG message to the logs:

Start restoring state stores from changelog topics [initialized]

startRestoration requests the restore Consumer for its current partition assignment (KafkaConsumer.assignment), adds the initialized partitions to it and finally requests the restore Consumer to assign exactly these partitions (KafkaConsumer.assign, aka manual partition assignment).

For every initialized partition, startRestoration uses the stateRestorers internal registry to find the associated StateRestorer that is then requested for the checkpoint offset.

There are two possible cases, depending on whether the checkpoint offset is known or not.

When the checkpoint offset is known, startRestoration prints out the following TRACE message to the logs:

Found checkpoint [checkpoint] from changelog [partition] for store [storeName].

startRestoration requests the restore Consumer to seek (i.e. move the fetch offset of) the partition to the checkpoint (KafkaConsumer.seek).

startRestoration looks up the partition in the endOffsets internal registry and prints out the following DEBUG message to the logs:

Restoring partition [partition] from offset [startingOffset] to endOffset [endOffset]

startRestoration requests the StateRestorer to set the starting offset (with the offset of the next record to be fetched for the partition using the restore Consumer).

startRestoration requests the StateRestorer to restoreStarted.

When the checkpoint offset is unknown, startRestoration prints out the following TRACE message to the logs:

Did not find checkpoint from changelog [partition] for store [storeName], rewinding to beginning.

startRestoration requests the restore Consumer to seek to the beginning (KafkaConsumer.seekToBeginning) for the partition.

startRestoration adds the StateRestorer (of the partition) to the needsPositionUpdate local registry.
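The first half of startRestoration (manual assignment plus the checkpoint split) can be sketched with an in-memory stand-in for the restore Consumer. FakeConsumer, Restorer and seekInitialized are hypothetical; an unknown checkpoint is modeled as -1 and seekToBeginning as position 0, both assumptions of this sketch. For brevity the returned list holds partitions rather than StateRestorers.

```java
import java.util.*;

// Hypothetical model of the first half of startRestoration; not the Kafka Streams source.
final class CheckpointSplitSketch {
    static final long NO_CHECKPOINT = -1L; // assumed marker for an unknown checkpoint

    // In-memory stand-in for the restore Consumer's positioning calls
    static final class FakeConsumer {
        final Set<String> assignment = new HashSet<>();
        final Map<String, Long> positions = new HashMap<>();

        void assign(Collection<String> partitions) {
            assignment.clear();
            assignment.addAll(partitions);
        }
        void seek(String partition, long offset) { positions.put(partition, offset); }
        void seekToBeginning(String partition) { positions.put(partition, 0L); } // assumes the log starts at 0
        long position(String partition) { return positions.get(partition); }
    }

    // Stand-in for StateRestorer
    static final class Restorer {
        final long checkpoint;
        long startingOffset = NO_CHECKPOINT;
        boolean started;
        Restorer(long checkpoint) { this.checkpoint = checkpoint; }
    }

    // Returns the partitions whose checkpoint was unknown (needsPositionUpdate)
    static List<String> seekInitialized(FakeConsumer consumer, Set<String> initialized,
                                        Map<String, Restorer> stateRestorers) {
        // Manual assignment: current assignment plus the newly initialized partitions
        Set<String> assignment = new HashSet<>(consumer.assignment);
        assignment.addAll(initialized);
        consumer.assign(assignment);

        List<String> needsPositionUpdate = new ArrayList<>();
        for (String partition : initialized) {
            Restorer restorer = stateRestorers.get(partition);
            if (restorer.checkpoint != NO_CHECKPOINT) {
                // Known checkpoint: resume from it
                consumer.seek(partition, restorer.checkpoint);
                restorer.startingOffset = consumer.position(partition);
                restorer.started = true; // stands in for restoreStarted()
            } else {
                // Unknown checkpoint: rewind now, decide later
                consumer.seekToBeginning(partition);
                needsPositionUpdate.add(partition);
            }
        }
        return needsPositionUpdate;
    }
}
```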

For every StateRestorer in the needsPositionUpdate local registry (i.e. for which the checkpoint offset was unknown), startRestoration requests the StateRestorer for the partition.

startRestoration requests the given active RestoringTasks for the restoring StreamTask of the changelog partition.

There are two possible cases, depending on whether the restoring StreamTask has Exactly-Once Support enabled or not.

With Exactly-Once Support enabled, startRestoration prints out the following INFO message to the logs:

No checkpoint found for task [id] state store [storeName] changelog [partition] with EOS turned on. Reinitializing the task and restore its state from the beginning.

startRestoration removes the partition from the needsInitializing internal registry (and the initialized local registry).

startRestoration requests the StateRestorer to set the checkpoint offset (with the offset of the next record to be fetched for the partition using the restore Consumer).

startRestoration requests the StreamTask to reinitializeStateStoresForPartitions with the partition.

With Exactly-Once Support disabled, startRestoration prints out the following INFO message to the logs:

Restoring task [id]'s state store [storeName] from beginning of the changelog [partition]

startRestoration requests the restore Consumer for the offset of the next record to be fetched (position) for the partition of the StateRestorer.

startRestoration looks up the partition of the StateRestorer in the endOffsets internal registry and prints out the following DEBUG message to the logs:

Restoring partition [partition] from offset [position] to endOffset [endOffset]

startRestoration requests the StateRestorer to set the starting offset to the position (of the restore Consumer).

startRestoration requests the StateRestorer to restoreStarted.

In the end, startRestoration adds all initialized partitions to the needsRestoring internal registry.
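The second half of startRestoration, for the StateRestorers whose checkpoint was unknown, can be sketched as below. Restorer, Task and updatePositions are hypothetical stand-ins, the positions map replaces calls to the restore Consumer's position, and logging is omitted.

```java
import java.util.*;

// Hypothetical model of the second half of startRestoration; not the Kafka Streams source.
final class PositionUpdateSketch {
    // Stand-in for StateRestorer
    static final class Restorer {
        final String partition;
        long checkpoint = -1L;
        long startingOffset = -1L;
        boolean started;
        Restorer(String partition) { this.partition = partition; }
    }

    // Stand-in for the restoring StreamTask
    static final class Task {
        final boolean eosEnabled;
        final List<String> reinitialized = new ArrayList<>();
        Task(boolean eosEnabled) { this.eosEnabled = eosEnabled; }
        void reinitializeStateStoresForPartitions(String partition) { reinitialized.add(partition); }
    }

    static void updatePositions(List<Restorer> needsPositionUpdate, Map<String, Long> positions,
                                Map<String, Task> restoringTasks, Set<String> initialized,
                                Set<String> needsInitializing, Set<String> needsRestoring) {
        for (Restorer restorer : needsPositionUpdate) {
            String partition = restorer.partition;
            Task task = restoringTasks.get(partition);
            long position = positions.get(partition); // stands in for the restore Consumer's position
            if (task.eosEnabled) {
                // EOS: take the partition out of this round and wipe the task's state stores
                needsInitializing.remove(partition);
                initialized.remove(partition);
                restorer.checkpoint = position;
                task.reinitializeStateStoresForPartitions(partition);
            } else {
                // No EOS: restore from the beginning of the changelog
                restorer.startingOffset = position;
                restorer.started = true; // stands in for restoreStarted()
            }
        }
        // Whatever is still initialized now needs restoring
        needsRestoring.addAll(initialized);
    }
}
```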

Note
startRestoration is used exclusively when StoreChangelogReader is requested to initialize (when requested to restore).

processNext Internal Method

long processNext(
  List<ConsumerRecord<byte[], byte[]>> records,
  StateRestorer restorer,
  Long endOffset)

processNext…FIXME

Note
processNext is used exclusively when StoreChangelogReader is requested to restore active StreamTasks (from changelog partitions).

Internal Properties

Name | Description
completedRestorers | Completely-restored changelog partitions (Set<TopicPartition>)
endOffsets |
needsInitializing | Changelog partitions (of StateRestorers) that need initializing (Set<TopicPartition>). Used in restore
needsRestoring | Changelog partitions (of StateRestorers) that need restoring (Set<TopicPartition>). Used in restore
partitionInfo |
stateRestorers | StateRestorers per partition of changelog topic of a state store (Map<TopicPartition, StateRestorer>). A new StateRestorer is added in register; all StateRestorers are removed in reset
