buildFetch(
partitionMap: Map[TopicPartition, PartitionFetchState]
): ResultWithPartitions[Option[FetchRequest.Builder]]
AbstractFetcherThread
AbstractFetcherThread is an extension of the ShutdownableThread contract for fetcher threads that repeatedly run maybeTruncate followed by maybeFetch (as their unit of work) until shut down.
Note
|
AbstractFetcherThread is an abstract class and cannot be created directly. Instances are created indirectly, through the concrete AbstractFetcherThreads.
|
Method | Description |
---|---|
|
Used when |
|
Used when |
|
Used when |
|
Used when |
|
Used when |
|
Used when |
|
Controls whether an Used when |
|
Gets the latest (current) leader epoch for the given Used when |
|
Used when |
|
Used when |
|
Used when |
|
Used when |
AbstractFetcherThread | Description |
---|---|
Performance Metrics
AbstractFetcherThreads are KafkaMetricsGroups with the following performance metrics.
Metric Name | Description |
---|---|
|
|
|
The performance metrics are registered in kafka.server:type=FetcherStats group.
Creating AbstractFetcherThread Instance
AbstractFetcherThread takes the following to be created:
- replica.fetch.backoff.ms (default: 0)
AbstractFetcherThread initializes the internal registries and counters.
Truncating And Fetching — doWork
Method
doWork(): Unit
Note
|
doWork is part of the ShutdownableThread Contract to do the work.
|
doWork simply calls maybeTruncate followed by maybeFetch.
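The loop described above can be sketched as follows. This is a minimal illustration, not Kafka's code: LoopingThread is a hypothetical stand-in for the ShutdownableThread contract, and SketchFetcherThread only records the order in which the two steps run.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the ShutdownableThread contract (not Kafka's class).
abstract class LoopingThread(name: String) extends Thread(name) {
  @volatile private var running = true

  // One unit of work; called repeatedly until shutdown is requested.
  def doWork(): Unit

  def initiateShutdown(): Unit = { running = false }

  override def run(): Unit = {
    while (running) doWork()
  }
}

// Hypothetical fetcher thread that only records the order of the two steps.
class SketchFetcherThread extends LoopingThread("sketch-fetcher") {
  val calls: ArrayBuffer[String] = ArrayBuffer.empty[String]
  private var rounds = 0

  private def maybeTruncate(): Unit = { calls += "maybeTruncate" }
  private def maybeFetch(): Unit = { calls += "maybeFetch" }

  override def doWork(): Unit = {
    maybeTruncate()
    maybeFetch()
    rounds += 1
    // Assumption for the demo only: stop after two rounds so the loop is finite.
    if (rounds == 2) initiateShutdown()
  }
}
```

Calling run() directly (instead of start()) executes the loop on the current thread, which is convenient for inspecting calls afterwards.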
maybeTruncate
Internal Method
maybeTruncate(): Unit
maybeTruncate calls fetchTruncatingPartitions to find the partitions with and without epochs.
For partitions with epochs, maybeTruncate calls truncateToEpochEndOffsets.
For partitions without epochs, maybeTruncate calls truncateToHighWatermark.
Note
|
maybeTruncate is used exclusively when AbstractFetcherThread is requested to do the work.
|
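The dispatch in maybeTruncate can be sketched as below. TopicPartition and EpochData are simplified, hypothetical stand-ins for Kafka's types, the two truncate methods only record that they were called, and skipping an empty group is an assumption of this sketch rather than something the text above states.

```scala
import scala.collection.mutable.ArrayBuffer

object MaybeTruncateSketch {
  // Simplified, hypothetical stand-ins for Kafka's types.
  final case class TopicPartition(topic: String, partition: Int)
  final case class EpochData(leaderEpoch: Int)

  class Truncator(
      fetchTruncatingPartitions: () => (Map[TopicPartition, EpochData], Set[TopicPartition])) {

    // Records which truncation path was taken and for how many partitions.
    val actions: ArrayBuffer[String] = ArrayBuffer.empty[String]

    private def truncateToEpochEndOffsets(withEpochs: Map[TopicPartition, EpochData]): Unit = {
      actions += s"epoch-end-offsets:${withEpochs.size}"
    }

    private def truncateToHighWatermark(withoutEpochs: Set[TopicPartition]): Unit = {
      actions += s"high-watermark:${withoutEpochs.size}"
    }

    def maybeTruncate(): Unit = {
      val (withEpochs, withoutEpochs) = fetchTruncatingPartitions()
      // Assumption: a group is only processed when it is non-empty.
      if (withEpochs.nonEmpty) truncateToEpochEndOffsets(withEpochs)
      if (withoutEpochs.nonEmpty) truncateToHighWatermark(withoutEpochs)
    }
  }
}
```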
maybeFetch
Internal Method
maybeFetch(): Unit
maybeFetch calls buildFetch (with the partitionStates).
Note
|
buildFetch is implementation-specific. |
maybeFetch then calls handlePartitionsWithErrors with the partitions that buildFetch could not handle.
In the end, maybeFetch processes the FetchRequest (if created).
When the FetchRequest was not created (that is, when buildFetch returned no request), maybeFetch blocks the thread (waits) until the fetchBackOffMs elapses. In that case, maybeFetch prints out the following TRACE message to the logs:
There are no active partitions. Back off for [fetchBackOffMs] ms before sending a fetch request
Note
|
maybeFetch is used when AbstractFetcherThread is requested to do the work.
|
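The control flow of maybeFetch can be sketched as below. All types are simplified, hypothetical stand-ins for Kafka's classes, the three collaborators are passed in as functions so the example stays self-contained, and Thread.sleep is used as a simple substitute for whatever waiting mechanism the real thread uses. The Outcome return value exists only to make the sketch observable.

```scala
object MaybeFetchSketch {
  // Simplified, hypothetical stand-ins for Kafka's types.
  final case class TopicPartition(topic: String, partition: Int)
  final case class FetchRequest(partitions: Set[TopicPartition])
  final case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])

  // Describes what maybeFetch ended up doing (for illustration only).
  sealed trait Outcome
  final case class Sent(request: FetchRequest) extends Outcome
  final case class BackedOff(ms: Long) extends Outcome

  def maybeFetch(
      buildFetch: () => ResultWithPartitions[Option[FetchRequest]],
      handlePartitionsWithErrors: Set[TopicPartition] => Unit,
      processFetchRequest: FetchRequest => Unit,
      fetchBackOffMs: Long): Outcome = {
    val built = buildFetch()
    // Partitions that buildFetch could not handle are reported first.
    handlePartitionsWithErrors(built.partitionsWithError)
    built.result match {
      case Some(request) =>
        processFetchRequest(request)
        Sent(request)
      case None =>
        // No request was built: wait for the back-off period (assumption: plain sleep).
        Thread.sleep(fetchBackOffMs)
        BackedOff(fetchBackOffMs)
    }
  }
}
```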
fetchTruncatingPartitions
Internal Method
fetchTruncatingPartitions(): (Map[TopicPartition, EpochData], Set[TopicPartition])
fetchTruncatingPartitions finds the TopicPartitions (in the partitionStates registry) that are in Truncating state (and are not delayed) and splits them into two groups: with and without epochs.
Internally, for every truncating TopicPartition, fetchTruncatingPartitions gets the latest epoch.
If the latest epoch is available and isOffsetForLeaderEpochSupported is enabled, fetchTruncatingPartitions registers the TopicPartition as "with epoch"; all other truncating partitions are registered as "without epoch".
Note
|
Both actions (getting the latest epoch and isOffsetForLeaderEpochSupported) are implementation-specific. |
Note
|
fetchTruncatingPartitions is used exclusively when AbstractFetcherThread is requested to maybeTruncate.
|
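The split described above can be sketched as a pure function. The types are simplified, hypothetical stand-ins for Kafka's classes, and the two implementation-specific pieces (getting the latest epoch and isOffsetForLeaderEpochSupported) are passed in as parameters, which is a choice of this sketch only.

```scala
object FetchTruncatingSketch {
  // Simplified, hypothetical stand-ins for Kafka's types.
  final case class TopicPartition(topic: String, partition: Int)
  final case class EpochData(leaderEpoch: Int)

  sealed trait State
  case object Truncating extends State
  case object Fetching extends State
  final case class PartitionFetchState(state: State, isDelayed: Boolean)

  def fetchTruncatingPartitions(
      partitionStates: Map[TopicPartition, PartitionFetchState],
      latestEpoch: TopicPartition => Option[Int],
      isOffsetForLeaderEpochSupported: Boolean
  ): (Map[TopicPartition, EpochData], Set[TopicPartition]) = {
    val withEpochs = Map.newBuilder[TopicPartition, EpochData]
    val withoutEpochs = Set.newBuilder[TopicPartition]

    partitionStates.foreach { case (tp, fetchState) =>
      // Only partitions that are truncating and not delayed are considered.
      if (fetchState.state == Truncating && !fetchState.isDelayed) {
        latestEpoch(tp) match {
          case Some(epoch) if isOffsetForLeaderEpochSupported =>
            withEpochs += (tp -> EpochData(epoch))
          case _ =>
            withoutEpochs += tp
        }
      }
    }
    (withEpochs.result(), withoutEpochs.result())
  }
}
```

Partitions in any other state, or delayed ones, end up in neither group.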
onPartitionFenced
Internal Method
onPartitionFenced(tp: TopicPartition): Unit
onPartitionFenced …FIXME
Note
|
onPartitionFenced is used when AbstractFetcherThread is requested to maybeTruncateToEpochEndOffsets, processFetchRequest, and handleOutOfRangeError.
|
maybeTruncateToEpochEndOffsets
Internal Method
maybeTruncateToEpochEndOffsets(
fetchedEpochs: Map[TopicPartition, EpochEndOffset]
): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]]
maybeTruncateToEpochEndOffsets …FIXME
Note
|
maybeTruncateToEpochEndOffsets is used when AbstractFetcherThread is requested to truncateToEpochEndOffsets.
|
processFetchRequest
Internal Method
processFetchRequest(
fetchStates: Map[TopicPartition, PartitionFetchState],
fetchRequest: FetchRequest.Builder): Unit
processFetchRequest prints out the following TRACE message to the logs:
Sending fetch request [fetchRequest]
processFetchRequest then calls fetchFromLeader.
Note
|
fetchFromLeader is implementation-specific. |
processFetchRequest requests the FetcherStats to…FIXME
Note
|
processFetchRequest is used when AbstractFetcherThread is requested to maybeFetch.
|
handleOutOfRangeError
Internal Method
handleOutOfRangeError(
topicPartition: TopicPartition,
fetchState: PartitionFetchState): Boolean
handleOutOfRangeError …FIXME
Note
|
handleOutOfRangeError is used when AbstractFetcherThread is requested to truncate and fetch continuously (and in turn processFetchRequest).
|
markPartitionsForTruncation
Method
markPartitionsForTruncation(
topicPartition: TopicPartition,
truncationOffset: Long): Unit
markPartitionsForTruncation(
brokerId: Int,
topicPartition: TopicPartition,
truncationOffset: Long): Unit
markPartitionsForTruncation …FIXME
Note
|
markPartitionsForTruncation is used when…FIXME
|
addPartitions
Method
addPartitions(
initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Unit
addPartitions …FIXME
Note
|
addPartitions is used when AbstractFetcherManager is requested to addFetcherForPartitions.
|
updateFetchOffsetAndMaybeMarkTruncationComplete
Internal Method
updateFetchOffsetAndMaybeMarkTruncationComplete(
fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit
updateFetchOffsetAndMaybeMarkTruncationComplete …FIXME
Note
|
updateFetchOffsetAndMaybeMarkTruncationComplete is used when AbstractFetcherThread is requested to truncateToEpochEndOffsets and truncateToHighWatermark.
|
truncateToEpochEndOffsets
Internal Method
truncateToEpochEndOffsets(
latestEpochsForPartitions: Map[TopicPartition, EpochData]): Unit
truncateToEpochEndOffsets …FIXME
Note
|
truncateToEpochEndOffsets is used exclusively when AbstractFetcherThread is requested to maybeTruncate.
|
truncateToHighWatermark
Internal Method
truncateToHighWatermark(
partitions: Set[TopicPartition]): Unit
truncateToHighWatermark …FIXME
Note
|
truncateToHighWatermark is used exclusively when AbstractFetcherThread is requested to maybeTruncate.
|
partitionsAndOffsets
Method
partitionsAndOffsets: Map[TopicPartition, InitialFetchState]
partitionsAndOffsets …FIXME
Note
|
partitionsAndOffsets is used exclusively when AbstractFetcherManager is requested to resizeThreadPool.
|
getOffsetTruncationState
Internal Method
getOffsetTruncationState(
tp: TopicPartition,
leaderEpochOffset: EpochEndOffset): OffsetTruncationState
getOffsetTruncationState …FIXME
Note
|
getOffsetTruncationState is used exclusively when AbstractFetcherThread is requested to maybeTruncateToEpochEndOffsets.
|
fetchOffsetAndTruncate
Method
fetchOffsetAndTruncate(
topicPartition: TopicPartition,
currentLeaderEpoch: Int): Long
fetchOffsetAndTruncate …FIXME
Note
|
fetchOffsetAndTruncate is used when AbstractFetcherThread is requested to addPartitions and handleOutOfRangeError.
|
handlePartitionsWithErrors
Method
handlePartitionsWithErrors(
partitions: Iterable[TopicPartition]): Unit
handlePartitionsWithErrors …FIXME
Note
|
handlePartitionsWithErrors is used when AbstractFetcherThread is requested to maybeFetch, truncateToEpochEndOffsets, truncateToHighWatermark, and processFetchRequest.
|
doTruncate
Internal Method
doTruncate(
topicPartition: TopicPartition,
truncationState: OffsetTruncationState): Boolean
doTruncate …FIXME
Note
|
doTruncate is used when AbstractFetcherThread is requested to maybeTruncate (and in turn truncateToHighWatermark and maybeTruncateToEpochEndOffsets).
|
toMemoryRecords
Method
toMemoryRecords(
records: Records): MemoryRecords
toMemoryRecords converts the given records to MemoryRecords.
If the given records are of type MemoryRecords already, toMemoryRecords simply returns them as-is.
If the given records are of type FileRecords, toMemoryRecords allocates a ByteBuffer of the size of the records, reads the records into it (readInto), and creates a writable MemoryRecords.
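The conversion can be sketched with simplified, hypothetical stand-ins for Kafka's Records, MemoryRecords and FileRecords. Here FileRecords is backed by an in-memory byte array rather than a file, and readInto is a guess at the shape of the real method, so the snippet only illustrates the two branches.

```scala
import java.nio.ByteBuffer

object ToMemoryRecordsSketch {
  // Simplified, hypothetical stand-ins for Kafka's record types.
  sealed trait Records { def sizeInBytes: Int }

  final case class MemoryRecords(buffer: ByteBuffer) extends Records {
    def sizeInBytes: Int = buffer.remaining()
  }

  final case class FileRecords(bytes: Array[Byte]) extends Records {
    def sizeInBytes: Int = bytes.length

    // Copies the records into the buffer and prepares the buffer for reading.
    def readInto(buffer: ByteBuffer): Unit = {
      buffer.put(bytes)
      buffer.flip()
    }
  }

  def toMemoryRecords(records: Records): MemoryRecords = records match {
    case memory: MemoryRecords =>
      memory // already in memory: returned as-is
    case file: FileRecords =>
      val buffer = ByteBuffer.allocate(file.sizeInBytes)
      file.readInto(buffer)
      MemoryRecords(buffer)
  }
}
```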
shutdown
Method
shutdown(): Unit
Note
|
shutdown is part of the ShutdownableThread contract to…FIXME.
|
shutdown …FIXME
Internal Properties
Name | Description |
---|---|
|
KafkaMetricsGroup (with two performance metrics) |
|
Used when…FIXME |