buildFetch(
partitionMap: Map[TopicPartition, PartitionFetchState]
): ResultWithPartitions[Option[FetchRequest.Builder]]
AbstractFetcherThread
AbstractFetcherThread is the extension of the ShutdownableThread contract for fetcher threads that maybeTruncate followed by maybeFetch repeatedly (while doing the work) until shut down.
|
Note
|
AbstractFetcherThread is an abstract class and cannot be created directly. It is created indirectly for the concrete AbstractFetcherThreads.
|
| Method | Description |
|---|---|
|
Used when |
|
Used when |
|
Used when |
|
Used when |
|
Used when |
|
Used when |
|
Controls whether an Used when |
|
Gets the latest (current) leader epoch for the given Used when |
|
Used when |
|
Used when |
|
Used when |
|
Used when |
| AbstractFetcherThread | Description |
|---|---|
Performance Metrics
AbstractFetcherThreads are KafkaMetricsGroups with the following performance metrics.
| Metric Name | Description |
|---|---|
|
|
|
The performance metrics are registered in kafka.server:type=FetcherStats group.
Creating AbstractFetcherThread Instance
AbstractFetcherThread takes the following to be created:
-
replica.fetch.backoff.ms (default:
0)
AbstractFetcherThread initializes the internal registries and counters.
Truncating And Fetching — doWork Method
doWork(): Unit
|
Note
|
doWork is part of the ShutdownableThread Contract to do the work.
|
doWork simply maybeTruncate followed by maybeFetch.
maybeTruncate Internal Method
maybeTruncate(): Unit
maybeTruncate fetchTruncatingPartitions to find the partitions with and without epochs.
For partitions with epochs, maybeTruncate truncateToEpochEndOffsets.
For partitions without epochs, maybeTruncate truncateToHighWatermark.
|
Note
|
maybeTruncate is used exclusively when AbstractFetcherThread is requested to do the work.
|
maybeFetch Internal Method
maybeFetch(): Unit
maybeFetch buildFetch (with the partitionStates).
|
Note
|
buildFetch is implementation-specific. |
maybeFetch then handlePartitionsWithErrors with partitions that buildFetch could not handle.
In the end, maybeFetch processes the FetchRequest (if created).
maybeFetch blocks the thread (waits) until the fetchBackOffMs elapses when the FetchRequest was not created (when buildFetch). maybeFetch prints out the following TRACE message to the logs:
There are no active partitions. Back off for [fetchBackOffMs] ms before sending a fetch request
|
Note
|
maybeFetch is used when AbstractFetcherThread is requested to do the work.
|
fetchTruncatingPartitions Internal Method
fetchTruncatingPartitions(): (Map[TopicPartition, EpochData], Set[TopicPartition])
fetchTruncatingPartitions finds the TopicPartitions (in the partitionStates registry) that are in Truncating state (and are not delayed) and splits them into two groups: with and without epochs.
Internally, for every truncating TopicPartition, fetchTruncatingPartitions gets the latest epoch.
If the latest epoch is available and isOffsetForLeaderEpochSupported, fetchTruncatingPartitions registers the TopicPartition as "with epoch" whereas the others as "without epoch".
|
Note
|
Both actions (getting the latest epoch and isOffsetForLeaderEpochSupported) are implementation-specific. |
|
Note
|
fetchTruncatingPartitions is used exclusively when AbstractFetcherThread is requested to maybeTruncate.
|
onPartitionFenced Internal Method
onPartitionFenced(tp: TopicPartition): Unit
onPartitionFenced…FIXME
|
Note
|
onPartitionFenced is used when AbstractFetcherThread is requested to maybeTruncateToEpochEndOffsets, processFetchRequest, and handleOutOfRangeError.
|
maybeTruncateToEpochEndOffsets Internal Method
maybeTruncateToEpochEndOffsets(
fetchedEpochs: Map[TopicPartition, EpochEndOffset]
): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]]
maybeTruncateToEpochEndOffsets…FIXME
|
Note
|
maybeTruncateToEpochEndOffsets is used when AbstractFetcherThread is requested to truncateToEpochEndOffsets.
|
processFetchRequest Internal Method
processFetchRequest(
fetchStates: Map[TopicPartition, PartitionFetchState],
fetchRequest: FetchRequest.Builder): Unit
processFetchRequest prints out the following TRACE message to the logs:
Sending fetch request [fetchRequest]
processFetchRequest then fetchFromLeader.
|
Note
|
fetchFromLeader is implementation-specific. |
processFetchRequest requests the FetcherStats to…FIXME
|
Note
|
processFetchRequest is used when AbstractFetcherThread is requested to maybeFetch.
|
handleOutOfRangeError Internal Method
handleOutOfRangeError(
topicPartition: TopicPartition,
fetchState: PartitionFetchState): Boolean
handleOutOfRangeError…FIXME
|
Note
|
handleOutOfRangeError is used when AbstractFetcherThread is requested to truncate and fetch continuously (and in turn processFetchRequest).
|
markPartitionsForTruncation Method
markPartitionsForTruncation(
topicPartition: TopicPartition,
truncationOffset: Long): Unit
markPartitionsForTruncation(
brokerId: Int,
topicPartition: TopicPartition,
truncationOffset: Long): Unit
markPartitionsForTruncation…FIXME
|
Note
|
markPartitionsForTruncation is used when…FIXME
|
addPartitions Method
addPartitions(
initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Unit
addPartitions…FIXME
|
Note
|
addPartitions is used when AbstractFetcherManager is requested to addFetcherForPartitions.
|
updateFetchOffsetAndMaybeMarkTruncationComplete Internal Method
updateFetchOffsetAndMaybeMarkTruncationComplete(
fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit
updateFetchOffsetAndMaybeMarkTruncationComplete…FIXME
|
Note
|
updateFetchOffsetAndMaybeMarkTruncationComplete is used when AbstractFetcherThread is requested to truncateToEpochEndOffsets and truncateToHighWatermark.
|
truncateToEpochEndOffsets Internal Method
truncateToEpochEndOffsets(
latestEpochsForPartitions: Map[TopicPartition, EpochData]): Unit
truncateToEpochEndOffsets…FIXME
|
Note
|
truncateToEpochEndOffsets is used exclusively when AbstractFetcherThread is requested to maybeTruncate.
|
truncateToHighWatermark Internal Method
truncateToHighWatermark(
partitions: Set[TopicPartition]): Unit
truncateToHighWatermark…FIXME
|
Note
|
truncateToHighWatermark is used exclusively when AbstractFetcherThread is requested to maybeTruncate.
|
partitionsAndOffsets Method
partitionsAndOffsets: Map[TopicPartition, InitialFetchState]
partitionsAndOffsets…FIXME
|
Note
|
partitionsAndOffsets is used exclusively when AbstractFetcherManager is requested to resizeThreadPool.
|
getOffsetTruncationState Internal Method
getOffsetTruncationState(
tp: TopicPartition,
leaderEpochOffset: EpochEndOffset): OffsetTruncationState
getOffsetTruncationState…FIXME
|
Note
|
getOffsetTruncationState is used exclusively when AbstractFetcherThread is requested to maybeTruncateToEpochEndOffsets.
|
fetchOffsetAndTruncate Method
fetchOffsetAndTruncate(
topicPartition: TopicPartition,
currentLeaderEpoch: Int): Long
fetchOffsetAndTruncate…FIXME
|
Note
|
fetchOffsetAndTruncate is used exclusively when AbstractFetcherThread is requested to addPartitions and handleOutOfRangeError.
|
handlePartitionsWithErrors Method
handlePartitionsWithErrors(
partitions: Iterable[TopicPartition]): Unit
handlePartitionsWithErrors…FIXME
|
Note
|
handlePartitionsWithErrors is used when AbstractFetcherThread is requested to maybeFetch, truncateToEpochEndOffsets, truncateToHighWatermark, and processFetchRequest.
|
doTruncate Internal Method
doTruncate(
topicPartition: TopicPartition,
truncationState: OffsetTruncationState): Boolean
doTruncate…FIXME
|
Note
|
doTruncate is used when AbstractFetcherThread is requested to maybeTruncate (and in turn truncateToHighWatermark and maybeTruncateToEpochEndOffsets).
|
toMemoryRecords Method
toMemoryRecords(
records: Records): MemoryRecords
toMemoryRecords converts the given records to MemoryRecords.
If the given records are of type MemoryRecords already, toMemoryRecords simply returns it as-is.
If the given records are of type FileRecords, toMemoryRecords allocates a ByteBuffer of the size of the records to readInto and creates a writable MemoryRecords.
|
Note
|
|
shutdown Method
shutdown(): Unit
|
Note
|
shutdown is part of the ShutdownableThread contract to…FIXME.
|
shutdown…FIXME
Internal Properties
| Name | Description |
|---|---|
|
KafkaMetricsGroup (with two performance metrics) |
|
Used when…FIXME |