ProcessorStateManager is a concrete StateManager (as an AbstractStateManager) that…FIXME

ProcessorStateManager is created exclusively when AbstractTask is created (for StandbyTask and StreamTask tasks).

ProcessorStateManager is created with the isStandby flag that indicates whether it is used by a StandbyTask (true) or a StreamTask (false).

ProcessorStateManager uses the -changelog suffix for the name of storeChangelogTopic.

ProcessorStateManager uses…​FIXME…​for the base directory.

ProcessorStateManager manages changelog partitions that are the Kafka partitions of the registered StateStores.


Enable the ALL logging level for the org.apache.kafka.streams.processor.internals.ProcessorStateManager logger to see what happens inside.

Add the following line to

Creating ProcessorStateManager Instance

ProcessorStateManager takes the following when created:

ProcessorStateManager initializes the internal registries and counters.

Registering State Store (and StateRestoreCallback) — register Method

void register(
  StateStore store,
  StateRestoreCallback stateRestoreCallback)

register is part of the StateManager Contract to register a state store (with an associated StateRestoreCallback).

register prints out the following DEBUG message to the logs:

Registering state store [storeName] to its state manager

register looks up the given StateStore (by its name) in the storeToChangelogTopic internal registry.

If no changelog topic is found for the store, register simply adds the StateStore to the registeredStores registry.

If the changelog topic is found, register finds the partition ID by the changelog topic and creates a Kafka TopicPartition. register finds the RecordConverter for the StateStore.

If the ProcessorStateManager is in standby mode, register prints out the following TRACE message to the logs and registers the topic with the input StateRestoreCallback and the RecordConverter in the restoreCallbacks and recordConverters registries, respectively.

Preparing standby replica of persistent state store [storeName] with changelog topic [topic]

If the ProcessorStateManager is not in standby mode, register prints out the following TRACE message to the logs, creates a StateRestorer and requests the ChangelogReader to register it.

Restoring state store [storeName] from changelog topic [topic] at checkpoint [offset]

register then adds the (store) partition to the changelogPartitions internal registry.

In the end, register registers the StateStore (in the registeredStores registry).

register throws an IllegalArgumentException if the name of the StateStore is .checkpoint.

[logPrefix]Illegal store name: .checkpoint

register throws an IllegalArgumentException if the StateStore is already registered (in the registeredStores registry).

[logPrefix]Store [storeName] has already been registered.
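
The register flow described above can be sketched as follows. This is a minimal, self-contained model and not the Kafka Streams source: RegisterFlowSketch, the string-based partitions, the Runnable callbacks and the restorers list (standing in for the ChangelogReader) are all hypothetical. The RecordConverter lookup and logging are left out, and performing both validity checks up front is an assumption about ordering.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of the register flow; not the Kafka Streams source.
final class RegisterFlowSketch {
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    private final boolean isStandby;
    private final int taskPartition; // stands in for the result of getPartition(topic)
    private final Map<String, String> storeToChangelogTopic;

    final Set<String> registeredStores = new HashSet<>();
    final Map<String, Runnable> restoreCallbacks = new HashMap<>();
    final Set<String> changelogPartitions = new HashSet<>();
    final List<String> restorers = new ArrayList<>(); // stands in for ChangelogReader registration

    RegisterFlowSketch(boolean isStandby, int taskPartition,
                       Map<String, String> storeToChangelogTopic) {
        this.isStandby = isStandby;
        this.taskPartition = taskPartition;
        this.storeToChangelogTopic = storeToChangelogTopic;
    }

    void register(String storeName, Runnable restoreCallback) {
        // Assumption: both checks run before anything else happens.
        if (CHECKPOINT_FILE_NAME.equals(storeName)) {
            throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
        }
        if (registeredStores.contains(storeName)) {
            throw new IllegalArgumentException(
                "Store " + storeName + " has already been registered.");
        }

        String topic = storeToChangelogTopic.get(storeName);
        if (topic == null) {
            // No changelog topic: only remember the store.
            registeredStores.add(storeName);
            return;
        }

        // "topic-partition" string stands in for a Kafka TopicPartition.
        String storePartition = topic + "-" + taskPartition;
        if (isStandby) {
            restoreCallbacks.put(topic, restoreCallback);
        } else {
            restorers.add(storePartition);
        }
        changelogPartitions.add(storePartition);
        registeredStores.add(storeName);
    }
}
```

In this model a store without a changelog topic ends up only in registeredStores, while a changelog-backed store also contributes a partition to changelogPartitions and either a restore callback (standby) or a restorer (active).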

Closing ProcessorStateManager — close Method

void close(Map<TopicPartition, Long> ackedOffsets)

close is part of the StateManager Contract to…FIXME.


Flushing All State Stores — flush Method

void flush()

flush is part of the StateManager Contract to flush all state stores registered with the state manager.


Requesting Global StateStore By Name — getGlobalStore Method

StateStore getGlobalStore(String name)

getGlobalStore is part of the StateManager Contract to…FIXME.


Requesting StateStore By Name — getStore Method

StateStore getStore(String name)

getStore is part of the StateManager Contract to…FIXME.


reinitializeStateStoresForPartitions Method

void reinitializeStateStoresForPartitions(
  Collection<TopicPartition> partitions,
  InternalProcessorContext processorContext)

reinitializeStateStoresForPartitions is part of the StateManager Contract to…FIXME.


storeChangelogTopic Static Method

static String storeChangelogTopic(
  String applicationId,
  String storeName)

storeChangelogTopic simply returns the following topic name:
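
The topic name itself is not shown here. As a minimal sketch, assuming the Kafka Streams naming convention that joins the application ID, the store name and the -changelog suffix with hyphens, the method could look as follows. ChangelogTopicNames is a hypothetical holder class used only for illustration.

```java
// Hypothetical stand-in for the static helper; not the Kafka Streams source.
final class ChangelogTopicNames {
    // The suffix ProcessorStateManager uses for changelog topic names.
    static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";

    private ChangelogTopicNames() {}

    // Assumed format: <applicationId>-<storeName>-changelog
    static String storeChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    public static void main(String[] args) {
        // prints "word-count-counts-store-changelog"
        System.out.println(storeChangelogTopic("word-count", "counts-store"));
    }
}
```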


storeChangelogTopic is used when:

Finding Partition ID For Topic (Name) — getPartition Internal Method

int getPartition(String topic)

getPartition tries to find the TopicPartition for the input topic name (in the partitionForTopic internal registry).

If found, getPartition returns the partition of the TopicPartition.

Otherwise, getPartition returns the partition of the TaskId.

getPartition is used when ProcessorStateManager is requested to register a StateStore and in the checkpointed and checkpoint methods.
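
The lookup with its fallback can be sketched as follows. This is an illustrative model only: PartitionLookup and its nested TopicPartition are hypothetical stand-ins (the latter for Kafka's TopicPartition), and taskPartition stands in for the partition of the TaskId.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the partition lookup; not the Kafka Streams source.
final class PartitionLookup {
    // Minimal stand-in for Kafka's TopicPartition (topic name and partition number).
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    private final Map<String, TopicPartition> partitionForTopic = new HashMap<>();
    private final int taskPartition; // stands in for the partition of the TaskId

    PartitionLookup(int taskPartition, Collection<TopicPartition> sources) {
        this.taskPartition = taskPartition;
        for (TopicPartition source : sources) {
            partitionForTopic.put(source.topic, source);
        }
    }

    // Returns the known partition for the topic, or the task's partition otherwise.
    int getPartition(String topic) {
        TopicPartition known = partitionForTopic.get(topic);
        return known == null ? taskPartition : known.partition;
    }
}
```

With a task partition of 3 and a single known source ("orders", 1), getPartition("orders") gives 1 while any unknown topic name falls back to 3.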

checkpointed Method

Map<TopicPartition, Long> checkpointed()

checkpointed is part of the Checkpointable Contract to…FIXME.


updateStandbyStates Method

List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(
  TopicPartition storePartition,
  List<ConsumerRecord<byte[], byte[]>> records)


updateStandbyStates is used exclusively when StandbyTask is requested to update standby replicas of the state store.

Checkpointing Offsets (Writing Offsets to Checkpoint File) — checkpoint Method

void checkpoint(Map<TopicPartition, Long> checkpointableOffsets)

checkpoint is part of the Checkpointable Contract to checkpoint offsets.

checkpoint requests the ChangelogReader for restoredOffsets and adds them to the checkpointableOffsets registry.

For every state store (in the stores internal registry), checkpoint…​FIXME

checkpoint creates a new OffsetCheckpoint (with the .checkpoint file in the base directory) unless it was done already.

checkpoint prints out the following TRACE message to the logs:

Writing checkpoint: [checkpointableOffsets]

In the end, checkpoint requests the OffsetCheckpoint to write the checkpointableOffsets.
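
The documented steps (merge the restored offsets, create the checkpoint file handle once, write the offsets) can be sketched as follows. This is a rough model under stated assumptions: CheckpointSketch is hypothetical, partitions are plain "topic partition" strings, the restoredOffsets map stands in for the ChangelogReader, and the one-line-per-partition file layout is illustrative rather than the real OffsetCheckpoint format. The per-store step is not described in the text and is omitted, so the sketch takes no offsets argument.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the checkpoint steps; not the Kafka Streams source.
final class CheckpointSketch {
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    private final Path baseDir;
    private final Map<String, Long> restoredOffsets; // stands in for the ChangelogReader
    private final Map<String, Long> checkpointableOffsets = new TreeMap<>();
    private Path checkpointFile; // resolved lazily, on the first checkpoint

    CheckpointSketch(Path baseDir, Map<String, Long> restoredOffsets) {
        this.baseDir = baseDir;
        this.restoredOffsets = restoredOffsets;
    }

    Path checkpoint() {
        // Step 1: add the restored offsets to the checkpointable offsets.
        checkpointableOffsets.putAll(restoredOffsets);

        // Step 2: resolve the .checkpoint file in the base directory unless already done.
        if (checkpointFile == null) {
            checkpointFile = baseDir.resolve(CHECKPOINT_FILE_NAME);
        }

        // Step 3: write one "topic partition offset" line per entry (illustrative layout).
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> entry : checkpointableOffsets.entrySet()) {
            lines.add(entry.getKey() + " " + entry.getValue());
        }
        try {
            Files.write(checkpointFile, lines);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return checkpointFile;
    }
}
```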

registerGlobalStateStores Method

void registerGlobalStateStores(List<StateStore> stateStores)


registerGlobalStateStores is used exclusively when StreamTask is created.

Internal Properties

Name Description

restoreCallbacks

StateRestoreCallback by changelog topic name (Map<String, StateRestoreCallback>)

Used when…FIXME

partitionForTopic

Kafka TopicPartitions by changelog topic name (Map<String, TopicPartition>)

Initialized and filled in when ProcessorStateManager is created

Used exclusively when ProcessorStateManager is requested to find the partition ID for a topic (name).


