log4j.logger.org.apache.kafka.streams.processor.internals.ProcessorStateManager=ALL
ProcessorStateManager
ProcessorStateManager is a concrete StateManager (as a AbstractStateManager) that…FIXME
ProcessorStateManager is created exclusively when AbstractTask is created (for StandbyTask and StreamTask tasks).
|
Note
|
ProcessorStateManager is created with the isStandby flag to indicate whether used by StandbyTask (true) or StreamTask (false).
|
ProcessorStateManager uses the -changelog suffix for the name of storeChangelogTopic.
ProcessorStateManager uses…FIXME…for the base directory.
ProcessorStateManager manages changelog partitions that are the Kafka partitions of the registered StateStores.
|
Tip
|
Enable Add the following line to Refer to Application Logging Using log4j. |
Creating ProcessorStateManager Instance
ProcessorStateManager takes the following when created:
-
Source Kafka TopicPartitions
-
isStandbyflag (to indicate whether used by StandbyTask, i.e.trueor StreamTask, i.e.falsethat is used exclusively when registering a state store) -
Names of the state stores and the names of the corresponding changelog topics (from the ProcessorTopology)
-
eosEnabledFlag (that corresponds to processing.guarantee configuration property)
ProcessorStateManager initializes the internal registries and counters.
Registering State Store (and StateRestoreCallback) — register Method
void register(
StateStore store,
StateRestoreCallback stateRestoreCallback)
|
Note
|
register is part of StateManager Contract to register a state store (with an associated StateRestoreCallback).
|
register prints out the following DEBUG message to the logs:
Registering state store [storeName] to its state manager
register looks up the given StateStore (by its name) in the storeToChangelogTopic internal registry.
If not found, register adds the StateStore in the registeredStores.
If the changelog topic is found, register finds the partition ID by the changelog topic and creates a Kafka TopicPartition. register finds the RecordConverter for the StateStore.
If the ProcessorStateManager is in standby mode, register prints out the following TRACE message to the logs and registers the topic with the input StateRestoreCallback and the RecordConverter in the restoreCallbacks and recordConverters registries, respectively.
Preparing standby replica of persistent state store [storeName] with changelog topic [topic]
If the ProcessorStateManager is not in standby mode, register prints out the following TRACE message to the logs, creates a StateRestorer and requests the ChangelogReader to register it.
Restoring state store [storeName] from changelog topic [topic] at checkpoint [offset]
register creates a StateRestorer and requests the ChangelogReader to register it.
register then adds the (store) partition to the changelogPartitions internal registry.
In the end, register registers the StateStore (in the registeredStores registry).
register throws an IllegalArgumentException if the name of the StateStore is .checkpoint.
[logPrefix]Illegal store name: .checkpoint
register throws an IllegalArgumentException if the StateStore is already registered (in the registeredStores registry).
[logPrefix]Store [storeName] has already been registered.
Closing ProcessorStateManager — close Method
void close(Map<TopicPartition, Long> ackedOffsets)
|
Note
|
close is part of StateManager Contract to…FIXME.
|
close…FIXME
Flushing All State Stores — flush Method
void flush()
|
Note
|
flush is part of the StateManager Contract to flush all state stores registered with the state manager.
|
flush…FIXME
Requesting Global StateStore By Name — getGlobalStore Method
StateStore getGlobalStore(String name)
|
Note
|
getGlobalStore is part of StateManager Contract to…FIXME.
|
getGlobalStore…FIXME
Requesting StateStore By Name — getStore Method
StateStore getStore(String name)
|
Note
|
getStore is part of StateManager Contract to…FIXME.
|
getStore…FIXME
reinitializeStateStoresForPartitions Method
void reinitializeStateStoresForPartitions(
Collection<TopicPartition> partitions,
InternalProcessorContext processorContext)
|
Note
|
reinitializeStateStoresForPartitions is part of StateManager Contract to…FIXME.
|
reinitializeStateStoresForPartitions…FIXME
storeChangelogTopic Static Method
static String storeChangelogTopic(
String applicationId,
String storeName)
storeChangelogTopic simply returns the following topic name:
[applicationId]-[storeName][STATE_CHANGELOG_TOPIC_SUFFIX]
|
Note
|
|
Finding Partition ID For Topic (Name) — getPartition Internal Method
int getPartition(String topic)
getPartition tries to find the TopicPartition for the input topic name (in the partitionForTopic internal registry).
If found, getPartition returns the partition of the TopicPartition.
Otherwise, getPartition returns the partition of the TaskId.
|
Note
|
getPartition is used when ProcessorStateManager is requested to register a StateStore, checkpointed and checkpoint.
|
checkpointed Method
Map<TopicPartition, Long> checkpointed()
|
Note
|
checkpointed is part of the Checkpointable Contract to…FIXME.
|
checkpointed…FIXME
updateStandbyStates Method
List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(
TopicPartition storePartition,
List<ConsumerRecord<byte[], byte[]>> records)
updateStandbyStates…FIXME
|
Note
|
updateStandbyStates is used exclusively when StandbyTask is requested to update standby replicas of the state store.
|
Checkpointing Offsets (Writing Offsets to Checkpoint File) — checkpoint Method
void checkpoint(Map<TopicPartition, Long> checkpointableOffsets)
|
Note
|
checkpoint is part of the Checkpointable Contract to checkpoint offsets.
|
checkpoint requests the ChangelogReader for restoredOffsets and adds them to the checkpointableOffsets registry.
For every state store (in the stores internal registry), checkpoint…FIXME
checkpoint creates a new OffsetCheckpoint (with the .checkpoint file in the base directory) unless it was done already.
checkpoint prints out the following TRACE message to the logs:
Writing checkpoint: [checkpointableOffsets]
In the end, checkpoint requests the OffsetCheckpoint to write the checkpointableOffsets.
registerGlobalStateStores Method
void registerGlobalStateStores(List<StateStore> stateStores)
registerGlobalStateStores…FIXME
|
Note
|
registerGlobalStateStores is used exclusively when StreamTask is created.
|
Internal Properties
| Name | Description |
|---|---|
|
|
|
Kafka TopicPartitions by changelog topic name ( Initialized and filled in when Used exclusively when |
|
|
|