void pollAndUpdate()
StateConsumer
StateConsumer is…FIXME
StateConsumer is created exclusively when GlobalStreamThread is requested to initialize.
StateConsumer uses global Kafka consumer that is the restore Kafka consumer from KafkaClientSupplier (which by default is DefaultKafkaClientSupplier).
StateConsumer uses Kafka Consumer’s manual topic assignment feature when initializing itself, i.e. uses Consumer.assign and Consumer.seek methods to assign topics and seek to offsets, respectively.
StateConsumer uses poll.ms configuration property as the polling interval while polling the topics assigned for records.
StateConsumer uses commit.interval.ms configuration property as the flush interval while polling the topics assigned for records.
Polling Topics Assigned for Records and Flushing State — pollAndUpdate Method
pollAndUpdate requests the global Kafka Consumer to fetch records from the topics and partitions assigned (upon initialization).
pollAndUpdate polls for records for the polling interval that is configured by poll.ms configuration property.
For every record received, pollAndUpdate requests GlobalStateMaintainer to update.
If the time between the last flush is longer than the flush interval pollAndUpdate requests GlobalStateMaintainer to flushState and records the current time in last flush internal registry.
In case of InvalidOffsetException pollAndUpdate prints out the following ERROR message and reports a StreamsException.
Updating global state failed. You can restart KafkaStreams to recover from this error.
|
Note
|
pollAndUpdate is used exclusively when GlobalStreamThread is up and running.
|
Initializing StateConsumer (Initializing GlobalStateMaintainer and Setting Offsets) — initialize Method
void initialize()
initialize requests GlobalStateMaintainer to initialize (and give topic partitions and offsets).
initialize then assigns the topic partitions to the global Kafka Consumer and sets the fetch offsets that the consumer will use (on the next poll).
In the end, initialize records the current time in lastFlush.
|
Note
|
initialize is used exclusively when GlobalStreamThread is requested to initialize.
|
Creating StateConsumer Instance
StateConsumer takes the following when created:
-
(Global) Kafka Consumer (
Consumer<byte[], byte[]>)