Starting new streaming query.
Offsets and Metadata Checkpointing
A streaming query can be started from scratch or from checkpoint (that gives fault-tolerance as the state is preserved even when a failure happens).
Stream execution engines use checkpoint location to resume stream processing and get start offsets to start query processing from.
StreamExecution
resumes (populates the start offsets) from the latest checkpointed offsets from the Write-Ahead Log (WAL) of Offsets that may have already been processed (and, if so, committed to the Offset Commit Log).
-
StreamProgress and StreamExecutions (committed and available offsets)
Micro-Batch Stream Processing
In Micro-Batch Stream Processing, the available offsets registry is populated with the latest offsets from the Write-Ahead Log (WAL) when MicroBatchExecution
stream processing engine is requested to populate start offsets from checkpoint (if available) when MicroBatchExecution
is requested to run an activated streaming query (before the first "zero" micro-batch).
The available offsets are then added to the committed offsets when the latest batch ID available (as described above) is exactly the latest batch ID committed to the Offset Commit Log when MicroBatchExecution
stream processing engine is requested to populate start offsets from checkpoint.
When a streaming query is started from scratch (with no checkpoint that has offsets in the Offset Write-Ahead Log), MicroBatchExecution
prints out the following INFO message:
When a streaming query is resumed (restarted) from a checkpoint with offsets in the Offset Write-Ahead Log, MicroBatchExecution
prints out the following INFO message:
Resuming at batch [currentBatchId] with committed offsets [committedOffsets] and available offsets [availableOffsets]
Every time MicroBatchExecution
is requested to check whether a new data is available (in any of the streaming sources)…FIXME
When MicroBatchExecution
is requested to construct the next streaming micro-batch (when MicroBatchExecution
requested to run the activated streaming query), every streaming source is requested for the latest offset available that are added to the availableOffsets registry. Streaming sources report some offsets or none at all (if this source has never received any data). Streaming sources with no data are excluded (filtered out).
MicroBatchExecution
prints out the following TRACE message to the logs:
noDataBatchesEnabled = [noDataBatchesEnabled], lastExecutionRequiresAnotherBatch = [lastExecutionRequiresAnotherBatch], isNewDataAvailable = [isNewDataAvailable], shouldConstructNextBatch = [shouldConstructNextBatch]
With shouldConstructNextBatch internal flag enabled, MicroBatchExecution
commits (adds) the available offsets for the batch to the Write-Ahead Log (WAL) and prints out the following INFO message to the logs:
Committed offsets for batch [currentBatchId]. Metadata [offsetSeqMetadata]
When running a single streaming micro-batch, MicroBatchExecution
requests every Source and MicroBatchReader (in the availableOffsets registry) for unprocessed data (that has not been committed yet and so considered unprocessed).
In the end (of running a single streaming micro-batch), MicroBatchExecution
commits (adds) the available offsets (to the committedOffsets registry) so they are considered processed already.
MicroBatchExecution
prints out the following DEBUG message to the logs:
Completed batch [currentBatchId]
Limitations (Assumptions)
It is assumed that the order of streaming sources in a streaming query matches the order of the offsets of OffsetSeq (in offsetLog) and availableOffsets.
In other words, a streaming query can be modified and then restarted from a checkpoint (to maintain stream processing state) only when the number of streaming sources and their order are preserved across restarts.