StateConsumer

StateConsumer is…​FIXME

StateConsumer is created exclusively when GlobalStreamThread is requested to initialize.

StateConsumer uses the global Kafka consumer, which is the restore Kafka consumer obtained from KafkaClientSupplier (DefaultKafkaClientSupplier by default).

StateConsumer uses Kafka Consumer’s manual partition assignment when initializing itself, i.e. it uses Consumer.assign to assign topic partitions and Consumer.seek to set the offsets to fetch from.

StateConsumer uses the poll.ms configuration property as the polling interval while polling the assigned topic partitions for records.

StateConsumer uses the commit.interval.ms configuration property as the flush interval while polling the assigned topic partitions for records.
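
As a minimal sketch of how the two properties map to the two intervals, the example below reads them from a plain java.util.Properties object. The class name StateConsumerIntervals and the fallback values are assumptions made for illustration only; they are not part of Kafka Streams.

```java
import java.util.Properties;

// Hypothetical helper (not a Kafka Streams class): holds the two intervals
// that StateConsumer is described as using.
class StateConsumerIntervals {
    final long pollMs;          // polling interval, from poll.ms
    final long flushIntervalMs; // flush interval, from commit.interval.ms

    StateConsumerIntervals(Properties props) {
        // The property names come from the text above.
        // The fallback values are illustrative assumptions for this sketch.
        this.pollMs = Long.parseLong(props.getProperty("poll.ms", "100"));
        this.flushIntervalMs = Long.parseLong(props.getProperty("commit.interval.ms", "30000"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("poll.ms", "50");
        props.setProperty("commit.interval.ms", "10000");

        StateConsumerIntervals intervals = new StateConsumerIntervals(props);
        System.out.println("poll interval (ms): " + intervals.pollMs);
        System.out.println("flush interval (ms): " + intervals.flushIntervalMs);
    }
}
```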

Polling Topics Assigned for Records and Flushing State — pollAndUpdate Method

void pollAndUpdate()

pollAndUpdate requests the global Kafka Consumer to fetch records from the topics and partitions assigned (upon initialization).

pollAndUpdate polls for records for the polling interval configured by the poll.ms configuration property.

For every record received, pollAndUpdate requests GlobalStateMaintainer to update.

If the flush interval has elapsed since the last flush, pollAndUpdate requests GlobalStateMaintainer to flushState and records the current time in the lastFlush internal registry.

In case of an InvalidOffsetException, pollAndUpdate prints out the following ERROR message and throws a StreamsException.

Updating global state failed. You can restart KafkaStreams to recover from this error.

Note
pollAndUpdate is used exclusively when GlobalStreamThread is up and running.
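
The poll, update and flush cycle described above can be sketched with simplified stand-ins. Everything in the example is hypothetical: MaintainerSketch stands in for GlobalStateMaintainer, a Supplier stands in for polling the global consumer, a LongSupplier stands in for Time, and IllegalStateException and RuntimeException stand in for InvalidOffsetException and StreamsException. The sketch also assumes that the flush interval counts as elapsed once the time since the last flush reaches it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical stand-in for GlobalStateMaintainer (only the two calls named in the text).
interface MaintainerSketch {
    void update(String record);
    void flushState();
}

// Simplified model of the poll-update-flush cycle; not the Kafka Streams class itself.
class PollAndUpdateSketch {
    private final Supplier<List<String>> poller; // stands in for polling the global consumer
    private final MaintainerSketch maintainer;
    private final LongSupplier clock;            // stands in for Time (milliseconds)
    private final long flushIntervalMs;
    private long lastFlush;

    PollAndUpdateSketch(Supplier<List<String>> poller, MaintainerSketch maintainer,
                        LongSupplier clock, long flushIntervalMs) {
        this.poller = poller;
        this.maintainer = maintainer;
        this.clock = clock;
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlush = clock.getAsLong();
    }

    void pollAndUpdate() {
        try {
            // Hand every received record to the maintainer.
            for (String record : poller.get()) {
                maintainer.update(record);
            }
            long now = clock.getAsLong();
            // Assumption: the interval has elapsed once now - lastFlush reaches it.
            if (now - lastFlush >= flushIntervalMs) {
                maintainer.flushState();
                lastFlush = now;
            }
        } catch (IllegalStateException e) {
            // Stand-in for the InvalidOffsetException handling described in the text.
            throw new RuntimeException(
                "Updating global state failed. You can restart KafkaStreams to recover from this error.", e);
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        long[] now = {0L};
        MaintainerSketch recorder = new MaintainerSketch() {
            @Override public void update(String record) { events.add("update:" + record); }
            @Override public void flushState() { events.add("flush"); }
        };
        PollAndUpdateSketch consumer = new PollAndUpdateSketch(
            () -> Arrays.asList("a", "b"), recorder, () -> now[0], 100L);

        consumer.pollAndUpdate(); // 0 ms elapsed: records are applied, no flush
        now[0] = 100L;
        consumer.pollAndUpdate(); // 100 ms elapsed: records are applied, then a flush
        System.out.println(events);
    }
}
```

Passing the clock in as a parameter keeps the flush decision easy to exercise without waiting for real time to pass.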

Initializing StateConsumer (Initializing GlobalStateMaintainer and Setting Offsets) — initialize Method

void initialize()

initialize requests GlobalStateMaintainer to initialize (which gives the topic partitions and their offsets).

initialize then assigns the topic partitions to the global Kafka Consumer and sets the fetch offsets that the consumer will use (on the next poll).

In the end, initialize records the current time in lastFlush.

Note
initialize is used exclusively when GlobalStreamThread is requested to initialize.
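
The three steps of initialize can be sketched as follows. The types are hypothetical stand-ins, not the Kafka API: ConsumerSketch mimics only the assign and seek calls, partitions are plain strings, the map of offsets stands in for what GlobalStateMaintainer gives back, and the partition names in the usage are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the two Consumer calls named in the text (assign and seek).
class ConsumerSketch {
    final List<String> assigned = new ArrayList<>();
    final Map<String, Long> positions = new LinkedHashMap<>();

    void assign(Set<String> partitions) {
        // Manual assignment replaces any previous assignment.
        assigned.clear();
        assigned.addAll(partitions);
    }

    void seek(String partition, long offset) {
        // In this sketch, seeking is only allowed on an assigned partition.
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("Partition not assigned: " + partition);
        }
        positions.put(partition, offset);
    }
}

// Simplified model of the initialize steps; not the Kafka Streams class itself.
class InitializeSketch {
    long lastFlush;

    // partitionOffsets stands in for the result of initializing GlobalStateMaintainer.
    void initialize(ConsumerSketch consumer, Map<String, Long> partitionOffsets, long nowMs) {
        // 1. Assign the topic partitions to the consumer.
        consumer.assign(partitionOffsets.keySet());
        // 2. Set the fetch offset for every partition (used on the next poll).
        for (Map.Entry<String, Long> entry : partitionOffsets.entrySet()) {
            consumer.seek(entry.getKey(), entry.getValue());
        }
        // 3. Record the current time as the last flush time.
        lastFlush = nowMs;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("global-topic-0", 42L); // made-up partition names and offsets
        offsets.put("global-topic-1", 7L);

        ConsumerSketch consumer = new ConsumerSketch();
        InitializeSketch sketch = new InitializeSketch();
        sketch.initialize(consumer, offsets, System.currentTimeMillis());

        System.out.println("assigned: " + consumer.assigned);
        System.out.println("positions: " + consumer.positions);
    }
}
```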

Creating StateConsumer Instance

StateConsumer takes the following when created:

  • LogContext

  • (Global) Kafka Consumer (Consumer<byte[], byte[]>)

  • GlobalStateMaintainer

  • Time

  • Polling interval (in milliseconds)

  • Flush interval (in milliseconds)
