ProcessorStateManager

ProcessorStateManager is a concrete StateManager (as an AbstractStateManager) that…​FIXME

ProcessorStateManager is created exclusively when AbstractTask is created (for StandbyTask and StreamTask tasks).

Note
ProcessorStateManager is created with the isStandby flag that indicates whether it is used by a StandbyTask (true) or a StreamTask (false).

ProcessorStateManager uses the -changelog suffix (STATE_CHANGELOG_TOPIC_SUFFIX) for the changelog topic names that storeChangelogTopic returns.

ProcessorStateManager uses…​FIXME…​for the base directory.

ProcessorStateManager manages changelog partitions that are the Kafka partitions of the registered StateStores.

Tip

Enable the ALL logging level for the org.apache.kafka.streams.processor.internals.ProcessorStateManager logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.ProcessorStateManager=ALL

Creating ProcessorStateManager Instance

ProcessorStateManager takes the following when created:

ProcessorStateManager initializes the internal registries and counters.

Registering State Store (and StateRestoreCallback) — register Method

void register(
  StateStore store,
  StateRestoreCallback stateRestoreCallback)
Note
register is part of the StateManager Contract to register a state store (with an associated StateRestoreCallback).

register prints out the following DEBUG message to the logs:

Registering state store [storeName] to its state manager

register looks up the given StateStore (by its name) in the storeToChangelogTopic internal registry.

If no changelog topic is found, register simply adds the StateStore to the registeredStores registry.

If the changelog topic is found, register finds the partition ID by the changelog topic and creates a Kafka TopicPartition. register finds the RecordConverter for the StateStore.

If the ProcessorStateManager is in standby mode, register prints out the following TRACE message to the logs and registers the topic with the input StateRestoreCallback and the RecordConverter in the restoreCallbacks and recordConverters registries, respectively.

Preparing standby replica of persistent state store [storeName] with changelog topic [topic]

If the ProcessorStateManager is not in standby mode, register prints out the following TRACE message to the logs, creates a StateRestorer and requests the ChangelogReader to register it.

Restoring state store [storeName] from changelog topic [topic] at checkpoint [offset]

register then adds the (store) partition to the changelogPartitions internal registry.

In the end, register registers the StateStore (in the registeredStores registry).

register throws an IllegalArgumentException if the name of the StateStore is .checkpoint.

[logPrefix]Illegal store name: .checkpoint

register throws an IllegalArgumentException if the StateStore is already registered (in the registeredStores registry).

[logPrefix]Store [storeName] has already been registered.
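
The register flow described above can be sketched as a small, self-contained model. This is not the Kafka Streams source: the class RegisterFlowSketch and the restorersHandedToReader set are hypothetical, stores are reduced to their names, a Runnable stands in for StateRestoreCallback, a plain string stands in for TopicPartition, and record converters are left out. Performing the two validity checks before anything else is also an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class RegisterFlowSketch {
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    public final boolean isStandby;
    public final int taskPartition; // stand-in for the partition ID looked up by changelog topic
    public final Map<String, String> storeToChangelogTopic; // store name -> changelog topic
    public final Set<String> registeredStores = new HashSet<>();
    public final Map<String, Runnable> restoreCallbacks = new HashMap<>(); // by changelog topic
    public final Set<String> changelogPartitions = new HashSet<>(); // "topic-partition" strings
    public final Set<String> restorersHandedToReader = new HashSet<>(); // stand-in for ChangelogReader

    public RegisterFlowSketch(boolean isStandby, int taskPartition,
                              Map<String, String> storeToChangelogTopic) {
        this.isStandby = isStandby;
        this.taskPartition = taskPartition;
        this.storeToChangelogTopic = storeToChangelogTopic;
    }

    public void register(String storeName, Runnable restoreCallback) {
        // Reject the reserved checkpoint file name and duplicate registrations.
        if (CHECKPOINT_FILE_NAME.equals(storeName)) {
            throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
        }
        if (registeredStores.contains(storeName)) {
            throw new IllegalArgumentException("Store " + storeName + " has already been registered.");
        }

        // A store without a changelog topic is only recorded as registered.
        String topic = storeToChangelogTopic.get(storeName);
        if (topic == null) {
            registeredStores.add(storeName);
            return;
        }

        String storePartition = topic + "-" + taskPartition;
        if (isStandby) {
            // Standby mode: remember the callback by changelog topic.
            restoreCallbacks.put(topic, restoreCallback);
        } else {
            // Active mode: hand a restorer over to the changelog reader.
            restorersHandedToReader.add(storePartition);
        }
        changelogPartitions.add(storePartition);
        registeredStores.add(storeName);
    }

    public static void main(String[] args) {
        Map<String, String> topics = new HashMap<>();
        topics.put("counts", "my-app-counts-changelog");

        RegisterFlowSketch active = new RegisterFlowSketch(false, 0, topics);
        active.register("counts", () -> { });   // has a changelog topic
        active.register("scratch", () -> { });  // no changelog topic
        System.out.println(active.registeredStores.size() + " stores, changelog partitions: "
            + active.changelogPartitions);
    }
}
```

In this model only the store with a changelog topic ends up in changelogPartitions, while both stores are recorded in registeredStores.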

Closing ProcessorStateManager — close Method

void close(Map<TopicPartition, Long> ackedOffsets)
Note
close is part of the StateManager Contract to…​FIXME.

close…​FIXME

Flushing All State Stores — flush Method

void flush()
Note
flush is part of the StateManager Contract to flush all state stores registered with the state manager.

flush…​FIXME

Requesting Global StateStore By Name — getGlobalStore Method

StateStore getGlobalStore(String name)
Note
getGlobalStore is part of the StateManager Contract to…​FIXME.

getGlobalStore…​FIXME

Requesting StateStore By Name — getStore Method

StateStore getStore(String name)
Note
getStore is part of the StateManager Contract to…​FIXME.

getStore…​FIXME

reinitializeStateStoresForPartitions Method

void reinitializeStateStoresForPartitions(
  Collection<TopicPartition> partitions,
  InternalProcessorContext processorContext)
Note
reinitializeStateStoresForPartitions is part of the StateManager Contract to…​FIXME.

reinitializeStateStoresForPartitions…​FIXME

storeChangelogTopic Static Method

static String storeChangelogTopic(
  String applicationId,
  String storeName)

storeChangelogTopic simply returns the following topic name:

[applicationId]-[storeName][STATE_CHANGELOG_TOPIC_SUFFIX]
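
As a minimal sketch of the naming scheme above (not the Kafka Streams source), the following stand-alone class rebuilds the topic name. The class name ChangelogTopicNames and the sample arguments are hypothetical; the suffix value is the -changelog suffix mentioned on this page.

```java
public class ChangelogTopicNames {
    // Suffix appended to every changelog topic name.
    static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";

    // Follows the format [applicationId]-[storeName][STATE_CHANGELOG_TOPIC_SUFFIX].
    public static String storeChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    public static void main(String[] args) {
        // Prints "my-app-counts-changelog"
        System.out.println(storeChangelogTopic("my-app", "counts"));
    }
}
```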
Note

storeChangelogTopic is used when:

Finding Partition ID For Topic (Name) — getPartition Internal Method

int getPartition(String topic)

getPartition tries to find the TopicPartition for the input topic name (in the partitionForTopic internal registry).

If found, getPartition returns the partition of the TopicPartition.

Otherwise, getPartition returns the partition of the TaskId.
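
The lookup-with-fallback behaviour can be illustrated with the sketch below. It is a simplified model rather than the actual implementation: PartitionLookupSketch is a hypothetical class, the nested TopicPartition is a stand-in for Kafka's class of the same name, and taskPartition stands in for the partition of the TaskId.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionLookupSketch {
    // Hypothetical stand-in for Kafka's TopicPartition (topic name plus partition number).
    public static final class TopicPartition {
        final String topic;
        final int partition;

        public TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // TopicPartitions by topic name, filled in once at construction time.
    private final Map<String, TopicPartition> partitionForTopic = new HashMap<>();
    private final int taskPartition;

    public PartitionLookupSketch(int taskPartition, List<TopicPartition> sources) {
        this.taskPartition = taskPartition;
        for (TopicPartition tp : sources) {
            partitionForTopic.put(tp.topic, tp);
        }
    }

    public int getPartition(String topic) {
        TopicPartition tp = partitionForTopic.get(topic);
        // Fall back to the task's own partition when the topic is unknown.
        return tp == null ? taskPartition : tp.partition;
    }

    public static void main(String[] args) {
        PartitionLookupSketch manager = new PartitionLookupSketch(
            3, Arrays.asList(new TopicPartition("orders", 1)));
        System.out.println(manager.getPartition("orders"));                  // known topic
        System.out.println(manager.getPartition("my-app-counts-changelog")); // falls back to the task partition
    }
}
```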

Note
getPartition is used when ProcessorStateManager is requested to register a StateStore, and by the checkpointed and checkpoint methods.

checkpointed Method

Map<TopicPartition, Long> checkpointed()
Note
checkpointed is part of the Checkpointable Contract to…​FIXME.

checkpointed…​FIXME

updateStandbyStates Method

List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(
  TopicPartition storePartition,
  List<ConsumerRecord<byte[], byte[]>> records)

updateStandbyStates…​FIXME

Note
updateStandbyStates is used exclusively when StandbyTask is requested to update standby replicas of the state store.

Checkpointing Offsets (Writing Offsets to Checkpoint File) — checkpoint Method

void checkpoint(Map<TopicPartition, Long> checkpointableOffsets)
Note
checkpoint is part of the Checkpointable Contract to checkpoint offsets.

checkpoint requests the ChangelogReader for restoredOffsets and adds them to the checkpointableOffsets registry.

For every state store (in the stores internal registry), checkpoint…​FIXME

checkpoint creates a new OffsetCheckpoint (with the .checkpoint file in the base directory) unless it was done already.

checkpoint prints out the following TRACE message to the logs:

Writing checkpoint: [checkpointableOffsets]

In the end, checkpoint requests the OffsetCheckpoint to write the checkpointableOffsets.
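
The steps that are documented above (merging the restored offsets, creating the checkpoint lazily, and writing) might look roughly like the sketch below. It rests on several assumptions: CheckpointSketch and InMemoryCheckpoint are hypothetical names, the in-memory class stands in for OffsetCheckpoint and writes no file, partitions are plain strings, the restored offsets are passed in directly instead of being requested from a ChangelogReader, and the per-store step is omitted.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CheckpointSketch {
    // Hypothetical stand-in for OffsetCheckpoint: keeps the last written offsets in memory.
    static final class InMemoryCheckpoint {
        private Map<String, Long> written = new HashMap<>();

        void write(Map<String, Long> offsets) {
            written = new HashMap<>(offsets);
        }
    }

    private final Map<String, Long> checkpointableOffsets = new HashMap<>();
    private InMemoryCheckpoint offsetCheckpoint; // null until the first checkpoint call

    public void checkpoint(Map<String, Long> restoredOffsets) {
        // Add the restored offsets to the checkpointable offsets.
        checkpointableOffsets.putAll(restoredOffsets);

        // Create the checkpoint only once, on first use.
        if (offsetCheckpoint == null) {
            offsetCheckpoint = new InMemoryCheckpoint();
        }

        System.out.println("Writing checkpoint: " + checkpointableOffsets);
        offsetCheckpoint.write(checkpointableOffsets);
    }

    // Returns what the last checkpoint call wrote (empty if nothing was written yet).
    public Map<String, Long> lastWritten() {
        return offsetCheckpoint == null
            ? Collections.<String, Long>emptyMap()
            : offsetCheckpoint.written;
    }

    public static void main(String[] args) {
        CheckpointSketch manager = new CheckpointSketch();
        Map<String, Long> restored = new HashMap<>();
        restored.put("my-app-counts-changelog-0", 42L);
        manager.checkpoint(restored);
    }
}
```

Because the offsets accumulate in one registry, every call in this model writes the full set collected so far and not only the latest additions.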

registerGlobalStateStores Method

void registerGlobalStateStores(List<StateStore> stateStores)

registerGlobalStateStores…​FIXME

Note
registerGlobalStateStores is used exclusively when StreamTask is created.

Internal Properties

Name Description

restoreCallbacks

StateRestoreCallback by changelog topic name (Map<String, StateRestoreCallback>)

Used when…​FIXME

partitionForTopic

Kafka TopicPartitions by changelog topic name (Map<String, TopicPartition>)

Initialized and filled in when ProcessorStateManager is created

Used exclusively when ProcessorStateManager is requested to find the partition ID for a topic (name).

recordConverters

registeredStores
