AbstractFetcherThread

AbstractFetcherThread is the extension of the ShutdownableThread contract for fetcher threads that maybeTruncate followed by maybeFetch repeatedly (while doing the work) until shut down.

Note
AbstractFetcherThread is a Java abstract class and cannot be created directly. It is created indirectly for the concrete AbstractFetcherThreads.
Table 1. AbstractFetcherThread Contract (Abstract Methods)
Method Description

buildFetch

buildFetch(
  partitionMap: Map[TopicPartition, PartitionFetchState]
): ResultWithPartitions[Option[FetchRequest.Builder]]

Used exclusively when AbstractFetcherThread is requested to maybeFetch.

endOffsetForEpoch

endOffsetForEpoch(
  topicPartition: TopicPartition,
  epoch: Int): Option[OffsetAndEpoch]

Used exclusively when AbstractFetcherThread is requested to getOffsetTruncationState.

fetchEarliestOffsetFromLeader

fetchEarliestOffsetFromLeader(
  topicPartition: TopicPartition,
  currentLeaderEpoch: Int): Long

Used exclusively when AbstractFetcherThread is requested to fetchOffsetAndTruncate.

fetchEpochEndOffsets

fetchEpochEndOffsets(
  partitions: Map[TopicPartition, EpochData]
): Map[TopicPartition, EpochEndOffset]

Used exclusively when AbstractFetcherThread is requested to truncateToEpochEndOffsets.

fetchFromLeader

fetchFromLeader(
  fetchRequest: FetchRequest.Builder
): Seq[(TopicPartition, FetchData)]

Used exclusively when AbstractFetcherThread is requested to processFetchRequest.

fetchLatestOffsetFromLeader

fetchLatestOffsetFromLeader(
  topicPartition: TopicPartition,
  currentLeaderEpoch: Int): Long

Used exclusively when AbstractFetcherThread is requested to fetchOffsetAndTruncate.

isOffsetForLeaderEpochSupported

isOffsetForLeaderEpochSupported: Boolean

Controls whether an OffsetsForLeaderEpochRequest is supported (that was added in Kafka 0.11.0.0)

Used exclusively when AbstractFetcherThread is requested to fetchTruncatingPartitions.

latestEpoch

latestEpoch(topicPartition: TopicPartition): Option[Int]

Gets the latest (current) leader epoch for the given TopicPartition

Used exclusively when AbstractFetcherThread is requested to fetchTruncatingPartitions.

logEndOffset

logEndOffset(topicPartition: TopicPartition): Long

Used when AbstractFetcherThread is requested to getOffsetTruncationState and fetchOffsetAndTruncate.

processPartitionData

processPartitionData(
  topicPartition: TopicPartition,
  fetchOffset: Long,
  partitionData: FetchData): Option[LogAppendInfo]

Used exclusively when AbstractFetcherThread is requested to processFetchRequest.

truncate

truncate(
  topicPartition: TopicPartition,
  truncationState: OffsetTruncationState): Unit

Used when AbstractFetcherThread is requested to truncateToHighWatermark, maybeTruncateToEpochEndOffsets, and fetchOffsetAndTruncate.

truncateFullyAndStartAt

truncateFullyAndStartAt(
  topicPartition: TopicPartition,
  offset: Long): Unit

Used exclusively when AbstractFetcherThread is requested to fetchOffsetAndTruncate.

Table 2. AbstractFetcherThreads
AbstractFetcherThread Description

ReplicaAlterLogDirsThread

ReplicaFetcherThread

AbstractFetcherThread registers performance metrics.

Table 3. AbstractFetcherThread’s Performance Metrics
Metric Name Description

BytesPerSec

RequestsPerSec

The performance metrics are registered in kafka.server:type=FetcherStats group.

AbstractFetcherThread jconsole.png
Figure 1. AbstractFetcherThread in jconsole
Table 4. AbstractFetcherThread’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

fetcherStats

partitionStates

TopicPartitions with their PartitionFetchState (PartitionStates[PartitionFetchState])

Used when…​FIXME

Creating AbstractFetcherThread Instance

AbstractFetcherThread takes the following to be created:

  • Thread Name

  • Client ID

  • BrokerEndPoint

  • fetchBackOffMs (default: 0)

  • isInterruptible flag (default: true)

AbstractFetcherThread initializes the internal registries and counters.

doWork Method

doWork(): Unit
Note
doWork is part of the ShutdownableThread Contract to do the work.

doWork simply maybeTruncate followed by maybeFetch.

maybeTruncate Internal Method

maybeTruncate(): Unit

maybeTruncate fetchTruncatingPartitions to find the partitions with and without epochs.

For partitions with epochs, maybeTruncate truncateToEpochEndOffsets.

For partitions without epochs, maybeTruncate truncateToHighWatermark.

Note
maybeTruncate is used exclusively when AbstractFetcherThread is requested to do the work.

maybeFetch Internal Method

maybeFetch(): Unit

maybeFetch buildFetch (with the partitionStates).

Note
buildFetch is implementation-specific.

maybeFetch then handlePartitionsWithErrors with partitions that buildFetch could not handle.

In the end, maybeFetch processes the FetchRequest (if created).

maybeFetch blocks the thread (waits) until the fetchBackOffMs elapses when the FetchRequest was not created (when buildFetch). maybeFetch prints out the following TRACE message to the logs:

There are no active partitions. Back off for [fetchBackOffMs] ms before sending a fetch request
Note
maybeFetch is used exclusively when AbstractFetcherThread is requested to do the work.

fetchTruncatingPartitions Internal Method

fetchTruncatingPartitions(): (Map[TopicPartition, EpochData], Set[TopicPartition])

fetchTruncatingPartitions finds the TopicPartitions (in the partitionStates registry) that are in Truncating state (and are not delayed) and splits them into two groups: with and without epochs.

Internally, for every truncating TopicPartition, fetchTruncatingPartitions gets the latest epoch.

If the latest epoch is available and isOffsetForLeaderEpochSupported, fetchTruncatingPartitions registers the TopicPartition as "with epoch" whereas the others as "without epoch".

Note
fetchTruncatingPartitions is used exclusively when AbstractFetcherThread is requested to maybeTruncate.

onPartitionFenced Internal Method

onPartitionFenced(tp: TopicPartition): Unit

onPartitionFenced…​FIXME

Note
onPartitionFenced is used when AbstractFetcherThread is requested to maybeTruncateToEpochEndOffsets, processFetchRequest, and handleOutOfRangeError.

maybeTruncateToEpochEndOffsets Internal Method

maybeTruncateToEpochEndOffsets(
  fetchedEpochs: Map[TopicPartition, EpochEndOffset]
): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]]

maybeTruncateToEpochEndOffsets…​FIXME

Note
maybeTruncateToEpochEndOffsets is used exclusively when AbstractFetcherThread is requested to truncateToEpochEndOffsets.

processFetchRequest Internal Method

processFetchRequest(
  fetchStates: Map[TopicPartition, PartitionFetchState],
  fetchRequest: FetchRequest.Builder): Unit

processFetchRequest prints out the following TRACE message to the logs:

Sending fetch request [fetchRequest]

processFetchRequest then fetchFromLeader.

Note
fetchFromLeader is implementation-specific.

processFetchRequest requests the FetcherStats to…​FIXME

Note
processFetchRequest is used exclusively when AbstractFetcherThread is requested to maybeFetch.

handleOutOfRangeError Internal Method

handleOutOfRangeError(
  topicPartition: TopicPartition,
  fetchState: PartitionFetchState): Boolean

handleOutOfRangeError…​FIXME

Note
handleOutOfRangeError is used exclusively when AbstractFetcherThread is requested to processFetchRequest.

markPartitionsForTruncation Method

markPartitionsForTruncation(
  topicPartition: TopicPartition,
  truncationOffset: Long): Unit
markPartitionsForTruncation(
  brokerId: Int,
  topicPartition: TopicPartition,
  truncationOffset: Long): Unit

markPartitionsForTruncation…​FIXME

Note
markPartitionsForTruncation is used when…​FIXME

addPartitions Method

addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch])

addPartitions…​FIXME

Note
addPartitions is used exclusively when AbstractFetcherManager is requested to addFetcherForPartitions.

updateFetchOffsetAndMaybeMarkTruncationComplete Internal Method

updateFetchOffsetAndMaybeMarkTruncationComplete(
  fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit

updateFetchOffsetAndMaybeMarkTruncationComplete…​FIXME

Note
updateFetchOffsetAndMaybeMarkTruncationComplete is used when AbstractFetcherThread is requested to truncateToEpochEndOffsets and truncateToHighWatermark.

truncateToEpochEndOffsets Internal Method

truncateToEpochEndOffsets(
  latestEpochsForPartitions: Map[TopicPartition, EpochData]): Unit

truncateToEpochEndOffsets…​FIXME

Note
truncateToEpochEndOffsets is used exclusively when AbstractFetcherThread is requested to maybeTruncate.

truncateToHighWatermark Internal Method

truncateToHighWatermark(partitions: Set[TopicPartition]): Unit

truncateToHighWatermark…​FIXME

Note
truncateToHighWatermark is used exclusively when AbstractFetcherThread is requested to maybeTruncate.

partitionsAndOffsets Method

partitionsAndOffsets: Map[TopicPartition, InitialFetchState]

partitionsAndOffsets…​FIXME

Note
partitionsAndOffsets is used exclusively when AbstractFetcherManager is requested to resizeThreadPool.

getOffsetTruncationState Internal Method

getOffsetTruncationState(
  tp: TopicPartition,
  leaderEpochOffset: EpochEndOffset): OffsetTruncationState

getOffsetTruncationState…​FIXME

Note
getOffsetTruncationState is used exclusively when AbstractFetcherThread is requested to maybeTruncateToEpochEndOffsets.

fetchOffsetAndTruncate Method

fetchOffsetAndTruncate(
  topicPartition: TopicPartition,
  currentLeaderEpoch: Int): Long

fetchOffsetAndTruncate…​FIXME

Note
fetchOffsetAndTruncate is used exclusively when AbstractFetcherThread is requested to addPartitions and handleOutOfRangeError.

handlePartitionsWithErrors Method

handlePartitionsWithErrors(partitions: Iterable[TopicPartition]): Unit

handlePartitionsWithErrors…​FIXME

Note
handlePartitionsWithErrors is used when AbstractFetcherThread is requested to maybeFetch, truncateToEpochEndOffsets, truncateToHighWatermark, and processFetchRequest.

results matching ""

    No results matching ""