void pollAndUpdate()
StateConsumer
StateConsumer is…FIXME
StateConsumer is created exclusively when GlobalStreamThread is requested to initialize.
StateConsumer uses the global Kafka consumer, which is the restore Kafka consumer obtained from KafkaClientSupplier (DefaultKafkaClientSupplier by default).
StateConsumer uses the Kafka Consumer’s manual topic assignment feature when initializing itself, i.e. it uses Consumer.assign to assign topic partitions and Consumer.seek to seek to offsets.
StateConsumer uses the poll.ms configuration property as the polling interval when polling the assigned topics for records.
StateConsumer uses the commit.interval.ms configuration property as the flush interval when polling the assigned topics for records.
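As a rough illustration, the two properties could be set and read back as shown below. This is a minimal sketch that uses only java.util.Properties; the class name StateConsumerConfigSketch and the values 100 and 30000 are assumptions chosen for the example and do not come from the text above.

```java
import java.util.Properties;

// Hypothetical helper class; only the two property names come from the text.
class StateConsumerConfigSketch {

    // Builds a Properties object with the two settings discussed above.
    // The values are illustrative assumptions, not recommendations.
    static Properties exampleProperties() {
        Properties props = new Properties();
        props.setProperty("poll.ms", "100");              // polling interval
        props.setProperty("commit.interval.ms", "30000"); // flush interval
        return props;
    }

    public static void main(String[] args) {
        Properties props = exampleProperties();
        long pollMs = Long.parseLong(props.getProperty("poll.ms"));
        long flushIntervalMs = Long.parseLong(props.getProperty("commit.interval.ms"));
        System.out.println("polling interval (ms): " + pollMs);
        System.out.println("flush interval (ms): " + flushIntervalMs);
    }
}
```

In a real application these entries would be part of the properties handed to Kafka Streams at startup.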
Polling Topics Assigned for Records and Flushing State: pollAndUpdate Method
pollAndUpdate requests the global Kafka Consumer to fetch records from the topic partitions assigned upon initialization.
pollAndUpdate polls for records for up to the polling interval configured by the poll.ms configuration property.
For every record received, pollAndUpdate requests GlobalStateMaintainer to update.
If the time since the last flush is longer than the flush interval, pollAndUpdate requests GlobalStateMaintainer to flushState and records the current time in the lastFlush internal registry.
In case of InvalidOffsetException, pollAndUpdate prints out the following ERROR message and reports a StreamsException:
Updating global state failed. You can restart KafkaStreams to recover from this error.
Note: pollAndUpdate is used exclusively when GlobalStreamThread is up and running.
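The flow of pollAndUpdate described above can be sketched roughly as follows. This is not the Kafka Streams source code: RecordSource, Maintainer, InvalidOffset and StreamsFailure are hypothetical stand-ins for the global Kafka Consumer, GlobalStateMaintainer, InvalidOffsetException and StreamsException, and the current time is passed in as a parameter so that the example runs on the JDK alone. The flush check follows the wording above (strictly longer than the flush interval), which is an assumption about the exact comparison.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// All types below are hypothetical stand-ins, not the real Kafka classes.
class PollAndUpdateSketch {

    // Stand-in for the global Kafka Consumer.
    interface RecordSource {
        List<String> poll(Duration timeout);
    }

    // Stand-in for GlobalStateMaintainer.
    interface Maintainer {
        void update(String record);
        void flushState();
    }

    // Stand-ins for InvalidOffsetException and StreamsException.
    static class InvalidOffset extends RuntimeException { }

    static class StreamsFailure extends RuntimeException {
        StreamsFailure(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private final RecordSource source;
    private final Maintainer maintainer;
    private final Duration pollTime;     // polling interval (poll.ms)
    private final long flushIntervalMs;  // flush interval (commit.interval.ms)
    private long lastFlush;              // time of the last flush, in ms

    PollAndUpdateSketch(RecordSource source, Maintainer maintainer,
                        Duration pollTime, long flushIntervalMs, long nowMs) {
        this.source = source;
        this.maintainer = maintainer;
        this.pollTime = pollTime;
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlush = nowMs;
    }

    void pollAndUpdate(long nowMs) {
        try {
            // Fetch records and hand every one of them to the maintainer.
            for (String record : source.poll(pollTime)) {
                maintainer.update(record);
            }
            // Flush once more than the flush interval has passed since the last flush.
            if (nowMs - lastFlush > flushIntervalMs) {
                maintainer.flushState();
                lastFlush = nowMs;
            }
        } catch (InvalidOffset e) {
            String message = "Updating global state failed. "
                + "You can restart KafkaStreams to recover from this error.";
            System.err.println("ERROR " + message);
            throw new StreamsFailure(message, e);
        }
    }

    long lastFlush() {
        return lastFlush;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Maintainer maintainer = new Maintainer() {
            public void update(String record) { events.add("update:" + record); }
            public void flushState() { events.add("flush"); }
        };
        PollAndUpdateSketch consumer = new PollAndUpdateSketch(
            timeout -> Arrays.asList("a", "b"), maintainer,
            Duration.ofMillis(100), 1000L, 0L);
        consumer.pollAndUpdate(500L);   // within the flush interval: updates only
        consumer.pollAndUpdate(1500L);  // past the flush interval: updates, then a flush
        System.out.println(events);     // [update:a, update:b, update:a, update:b, flush]
    }
}
```

Passing the clock value in explicitly is a choice made for the example only; it keeps the flush decision deterministic and easy to check.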
Initializing StateConsumer (Initializing GlobalStateMaintainer and Setting Offsets): initialize Method
void initialize()
initialize requests GlobalStateMaintainer to initialize (which gives the topic partitions and their offsets).
initialize then assigns the topic partitions to the global Kafka Consumer and sets the fetch offsets that the consumer will use (on the next poll).
In the end, initialize records the current time in lastFlush.
Note: initialize is used exclusively when GlobalStreamThread is requested to initialize.
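The three steps of initialize can be sketched as follows. Again this is only an illustration under stated assumptions: Maintainer and AssignableConsumer are hypothetical stand-ins whose assign and seek methods mirror the Consumer.assign and Consumer.seek calls named earlier, topic partitions are represented as plain strings, and the partition names and offsets are made up for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// All types below are hypothetical stand-ins, not the real Kafka classes.
class InitializeSketch {

    // Stand-in for GlobalStateMaintainer: returns partition -> offset.
    interface Maintainer {
        Map<String, Long> initialize();
    }

    // Stand-in for the global Kafka Consumer (manual assignment only).
    interface AssignableConsumer {
        void assign(Set<String> partitions);
        void seek(String partition, long offset);
    }

    private final AssignableConsumer consumer;
    private final Maintainer maintainer;
    private long lastFlush;

    InitializeSketch(AssignableConsumer consumer, Maintainer maintainer) {
        this.consumer = consumer;
        this.maintainer = maintainer;
    }

    void initialize(long nowMs) {
        // 1. Ask the maintainer for the partitions and their offsets.
        Map<String, Long> offsets = maintainer.initialize();
        // 2. Assign the partitions and set the fetch offset for each of them.
        consumer.assign(offsets.keySet());
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            consumer.seek(entry.getKey(), entry.getValue());
        }
        // 3. Record the current time as the time of the last flush.
        lastFlush = nowMs;
    }

    long lastFlush() {
        return lastFlush;
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("global-topic-0", 42L);  // made-up partition and offset
        offsets.put("global-topic-1", 7L);
        AssignableConsumer consumer = new AssignableConsumer() {
            public void assign(Set<String> partitions) {
                calls.add("assign:" + partitions);
            }
            public void seek(String partition, long offset) {
                calls.add("seek:" + partition + "@" + offset);
            }
        };
        InitializeSketch sketch = new InitializeSketch(consumer, () -> offsets);
        sketch.initialize(1000L);
        // [assign:[global-topic-0, global-topic-1], seek:global-topic-0@42, seek:global-topic-1@7]
        System.out.println(calls);
    }
}
```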
Creating StateConsumer Instance
StateConsumer takes the following when created:
- (Global) Kafka Consumer (Consumer<byte[], byte[]>)