log4j.logger.org.apache.kafka.streams.processor.internals.ProcessorStateManager=ALL
ProcessorStateManager
ProcessorStateManager
is a concrete StateManager (as a AbstractStateManager) that…FIXME
ProcessorStateManager
is created exclusively when AbstractTask
is created (for StandbyTask and StreamTask tasks).
Note
|
ProcessorStateManager is created with the isStandby flag to indicate whether used by StandbyTask (true ) or StreamTask (false ).
|
ProcessorStateManager
uses the -changelog suffix for the name of storeChangelogTopic.
ProcessorStateManager
uses…FIXME…for the base directory.
ProcessorStateManager
manages changelog partitions that are the Kafka partitions of the registered StateStores.
Tip
|
Enable Add the following line to Refer to Application Logging Using log4j. |
Creating ProcessorStateManager Instance
ProcessorStateManager
takes the following when created:
-
Source Kafka TopicPartitions
-
isStandby
flag (to indicate whether used by StandbyTask, i.e.true
or StreamTask, i.e.false
that is used exclusively when registering a state store) -
Names of the state stores and the names of the corresponding changelog topics (from the ProcessorTopology)
-
eosEnabled
Flag (that corresponds to processing.guarantee configuration property)
ProcessorStateManager
initializes the internal registries and counters.
Registering State Store (and StateRestoreCallback) — register
Method
void register(
StateStore store,
StateRestoreCallback stateRestoreCallback)
Note
|
register is part of StateManager Contract to register a state store (with an associated StateRestoreCallback).
|
register
prints out the following DEBUG message to the logs:
Registering state store [storeName] to its state manager
register
looks up the given StateStore (by its name) in the storeToChangelogTopic internal registry.
If not found, register
adds the StateStore
in the registeredStores.
If the changelog topic is found, register
finds the partition ID by the changelog topic and creates a Kafka TopicPartition
. register
finds the RecordConverter for the StateStore.
If the ProcessorStateManager
is in standby mode, register
prints out the following TRACE message to the logs and registers the topic with the input StateRestoreCallback
and the RecordConverter
in the restoreCallbacks and recordConverters registries, respectively.
Preparing standby replica of persistent state store [storeName] with changelog topic [topic]
If the ProcessorStateManager
is not in standby mode, register
prints out the following TRACE message to the logs, creates a StateRestorer and requests the ChangelogReader to register it.
Restoring state store [storeName] from changelog topic [topic] at checkpoint [offset]
register
creates a StateRestorer and requests the ChangelogReader to register it.
register
then adds the (store) partition to the changelogPartitions internal registry.
In the end, register
registers the StateStore
(in the registeredStores registry).
register
throws an IllegalArgumentException
if the name of the StateStore is .checkpoint.
[logPrefix]Illegal store name: .checkpoint
register
throws an IllegalArgumentException
if the StateStore
is already registered (in the registeredStores registry).
[logPrefix]Store [storeName] has already been registered.
Closing ProcessorStateManager — close
Method
void close(Map<TopicPartition, Long> ackedOffsets)
Note
|
close is part of StateManager Contract to…FIXME.
|
close
…FIXME
Flushing All State Stores — flush
Method
void flush()
Note
|
flush is part of the StateManager Contract to flush all state stores registered with the state manager.
|
flush
…FIXME
Requesting Global StateStore By Name — getGlobalStore
Method
StateStore getGlobalStore(String name)
Note
|
getGlobalStore is part of StateManager Contract to…FIXME.
|
getGlobalStore
…FIXME
Requesting StateStore By Name — getStore
Method
StateStore getStore(String name)
Note
|
getStore is part of StateManager Contract to…FIXME.
|
getStore
…FIXME
reinitializeStateStoresForPartitions
Method
void reinitializeStateStoresForPartitions(
Collection<TopicPartition> partitions,
InternalProcessorContext processorContext)
Note
|
reinitializeStateStoresForPartitions is part of StateManager Contract to…FIXME.
|
reinitializeStateStoresForPartitions
…FIXME
storeChangelogTopic
Static Method
static String storeChangelogTopic(
String applicationId,
String storeName)
storeChangelogTopic
simply returns the following topic name:
[applicationId]-[storeName][STATE_CHANGELOG_TOPIC_SUFFIX]
Note
|
|
Finding Partition ID For Topic (Name) — getPartition
Internal Method
int getPartition(String topic)
getPartition
tries to find the TopicPartition for the input topic
name (in the partitionForTopic internal registry).
If found, getPartition
returns the partition of the TopicPartition.
Otherwise, getPartition
returns the partition of the TaskId.
Note
|
getPartition is used when ProcessorStateManager is requested to register a StateStore, checkpointed and checkpoint.
|
checkpointed
Method
Map<TopicPartition, Long> checkpointed()
Note
|
checkpointed is part of the Checkpointable Contract to…FIXME.
|
checkpointed
…FIXME
updateStandbyStates
Method
List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(
TopicPartition storePartition,
List<ConsumerRecord<byte[], byte[]>> records)
updateStandbyStates
…FIXME
Note
|
updateStandbyStates is used exclusively when StandbyTask is requested to update standby replicas of the state store.
|
Checkpointing Offsets (Writing Offsets to Checkpoint File) — checkpoint
Method
void checkpoint(Map<TopicPartition, Long> checkpointableOffsets)
Note
|
checkpoint is part of the Checkpointable Contract to checkpoint offsets.
|
checkpoint
requests the ChangelogReader for restoredOffsets and adds them to the checkpointableOffsets registry.
For every state store (in the stores internal registry), checkpoint
…FIXME
checkpoint
creates a new OffsetCheckpoint (with the .checkpoint file in the base directory) unless it was done already.
checkpoint
prints out the following TRACE message to the logs:
Writing checkpoint: [checkpointableOffsets]
In the end, checkpoint
requests the OffsetCheckpoint to write the checkpointableOffsets.
registerGlobalStateStores
Method
void registerGlobalStateStores(List<StateStore> stateStores)
registerGlobalStateStores
…FIXME
Note
|
registerGlobalStateStores is used exclusively when StreamTask is created.
|
Internal Properties
Name | Description |
---|---|
|
|
|
Kafka TopicPartitions by changelog topic name ( Initialized and filled in when Used exclusively when |
|
|
|