AbstractFetcherThread

AbstractFetcherThread is the extension of the ShutdownableThread contract for fetcher threads that maybeTruncate followed by maybeFetch repeatedly (while doing the work) until shut down.

Note
AbstractFetcherThread is an abstract class and cannot be created directly. It is created indirectly for the concrete AbstractFetcherThreads.
Table 1. AbstractFetcherThread Contract (Abstract Methods)
Method Description

buildFetch

buildFetch(
  partitionMap: Map[TopicPartition, PartitionFetchState]
): ResultWithPartitions[Option[FetchRequest.Builder]]

Used when AbstractFetcherThread is requested to maybeFetch.

endOffsetForEpoch

endOffsetForEpoch(
  topicPartition: TopicPartition,
  epoch: Int): Option[OffsetAndEpoch]

Used when AbstractFetcherThread is requested to getOffsetTruncationState.

fetchEarliestOffsetFromLeader

fetchEarliestOffsetFromLeader(
  topicPartition: TopicPartition,
  currentLeaderEpoch: Int): Long

Used when AbstractFetcherThread is requested to fetchOffsetAndTruncate.

fetchEpochEndOffsets

fetchEpochEndOffsets(
  partitions: Map[TopicPartition, EpochData]
): Map[TopicPartition, EpochEndOffset]

Used when AbstractFetcherThread is requested to truncateToEpochEndOffsets.

fetchFromLeader

fetchFromLeader(
  fetchRequest: FetchRequest.Builder
): Seq[(TopicPartition, FetchData)]

Used when AbstractFetcherThread is requested to processFetchRequest.

fetchLatestOffsetFromLeader

fetchLatestOffsetFromLeader(
  topicPartition: TopicPartition,
  currentLeaderEpoch: Int): Long

Used when AbstractFetcherThread is requested to fetchOffsetAndTruncate.

isOffsetForLeaderEpochSupported

isOffsetForLeaderEpochSupported: Boolean

Controls whether an OffsetsForLeaderEpochRequest is supported (that was added in Kafka 0.11.0.0)

Used when AbstractFetcherThread is requested to fetchTruncatingPartitions.

latestEpoch

latestEpoch(topicPartition: TopicPartition): Option[Int]

Gets the latest (current) leader epoch for the given TopicPartition

Used when AbstractFetcherThread is requested to fetchTruncatingPartitions.

logEndOffset

logEndOffset(topicPartition: TopicPartition): Long

Used when AbstractFetcherThread is requested to getOffsetTruncationState and fetchOffsetAndTruncate.

processPartitionData

processPartitionData(
  topicPartition: TopicPartition,
  fetchOffset: Long,
  partitionData: FetchData): Option[LogAppendInfo]

Used when AbstractFetcherThread is requested to processFetchRequest.

truncate

truncate(
  topicPartition: TopicPartition,
  truncationState: OffsetTruncationState): Unit

Used when AbstractFetcherThread is requested to doTruncate and fetchOffsetAndTruncate

truncateFullyAndStartAt

truncateFullyAndStartAt(
  topicPartition: TopicPartition,
  offset: Long): Unit

Used when AbstractFetcherThread is requested to fetchOffsetAndTruncate.

Table 2. AbstractFetcherThreads
AbstractFetcherThread Description

ReplicaAlterLogDirsThread

ReplicaFetcherThread

Performance Metrics

AbstractFetcherThreads are KafkaMetricsGroups with the following performance metrics.

Table 3. AbstractFetcherThread’s Performance Metrics
Metric Name Description

BytesPerSec

RequestsPerSec

The performance metrics are registered in kafka.server:type=FetcherStats group.

AbstractFetcherThread jconsole.png
Figure 1. AbstractFetcherThread in jconsole

Creating AbstractFetcherThread Instance

AbstractFetcherThread takes the following to be created:

  • Thread Name

  • Client ID

  • BrokerEndPoint

  • FailedPartitions

  • replica.fetch.backoff.ms (default: 0)

  • isInterruptible flag (default: true)

AbstractFetcherThread initializes the internal registries and counters.

Truncating And Fetching — doWork Method

doWork(): Unit
Note
doWork is part of the ShutdownableThread Contract to do the work.

doWork simply maybeTruncate followed by maybeFetch.

maybeTruncate Internal Method

maybeTruncate(): Unit

maybeTruncate fetchTruncatingPartitions to find the partitions with and without epochs.

For partitions with epochs, maybeTruncate truncateToEpochEndOffsets.

For partitions without epochs, maybeTruncate truncateToHighWatermark.

Note
maybeTruncate is used exclusively when AbstractFetcherThread is requested to do the work.

maybeFetch Internal Method

maybeFetch(): Unit

maybeFetch buildFetch (with the partitionStates).

Note
buildFetch is implementation-specific.

maybeFetch then handlePartitionsWithErrors with partitions that buildFetch could not handle.

In the end, maybeFetch processes the FetchRequest (if created).

maybeFetch blocks the thread (waits) until the fetchBackOffMs elapses when the FetchRequest was not created (when buildFetch). maybeFetch prints out the following TRACE message to the logs:

There are no active partitions. Back off for [fetchBackOffMs] ms before sending a fetch request
Note
maybeFetch is used when AbstractFetcherThread is requested to do the work.

fetchTruncatingPartitions Internal Method

fetchTruncatingPartitions(): (Map[TopicPartition, EpochData], Set[TopicPartition])

fetchTruncatingPartitions finds the TopicPartitions (in the partitionStates registry) that are in Truncating state (and are not delayed) and splits them into two groups: with and without epochs.

Internally, for every truncating TopicPartition, fetchTruncatingPartitions gets the latest epoch.

If the latest epoch is available and isOffsetForLeaderEpochSupported, fetchTruncatingPartitions registers the TopicPartition as "with epoch" whereas the others as "without epoch".

Note
fetchTruncatingPartitions is used exclusively when AbstractFetcherThread is requested to maybeTruncate.

onPartitionFenced Internal Method

onPartitionFenced(tp: TopicPartition): Unit

onPartitionFenced…​FIXME

Note
onPartitionFenced is used when AbstractFetcherThread is requested to maybeTruncateToEpochEndOffsets, processFetchRequest, and handleOutOfRangeError.

maybeTruncateToEpochEndOffsets Internal Method

maybeTruncateToEpochEndOffsets(
  fetchedEpochs: Map[TopicPartition, EpochEndOffset]
): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]]

maybeTruncateToEpochEndOffsets…​FIXME

Note
maybeTruncateToEpochEndOffsets is used when AbstractFetcherThread is requested to truncateToEpochEndOffsets.

processFetchRequest Internal Method

processFetchRequest(
  fetchStates: Map[TopicPartition, PartitionFetchState],
  fetchRequest: FetchRequest.Builder): Unit

processFetchRequest prints out the following TRACE message to the logs:

Sending fetch request [fetchRequest]

processFetchRequest then fetchFromLeader.

Note
fetchFromLeader is implementation-specific.

processFetchRequest requests the FetcherStats to…​FIXME

Note
processFetchRequest is used when AbstractFetcherThread is requested to maybeFetch.

handleOutOfRangeError Internal Method

handleOutOfRangeError(
  topicPartition: TopicPartition,
  fetchState: PartitionFetchState): Boolean

handleOutOfRangeError…​FIXME

Note
handleOutOfRangeError is used when AbstractFetcherThread is requested to truncate and fetch continuously (and in turn processFetchRequest).

markPartitionsForTruncation Method

markPartitionsForTruncation(
  topicPartition: TopicPartition,
  truncationOffset: Long): Unit
markPartitionsForTruncation(
  brokerId: Int,
  topicPartition: TopicPartition,
  truncationOffset: Long): Unit

markPartitionsForTruncation…​FIXME

Note
markPartitionsForTruncation is used when…​FIXME

addPartitions Method

addPartitions(
  initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Unit

addPartitions…​FIXME

Note
addPartitions is used when AbstractFetcherManager is requested to addFetcherForPartitions.

updateFetchOffsetAndMaybeMarkTruncationComplete Internal Method

updateFetchOffsetAndMaybeMarkTruncationComplete(
  fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit

updateFetchOffsetAndMaybeMarkTruncationComplete…​FIXME

Note
updateFetchOffsetAndMaybeMarkTruncationComplete is used when AbstractFetcherThread is requested to truncateToEpochEndOffsets and truncateToHighWatermark.

truncateToEpochEndOffsets Internal Method

truncateToEpochEndOffsets(
  latestEpochsForPartitions: Map[TopicPartition, EpochData]): Unit

truncateToEpochEndOffsets…​FIXME

Note
truncateToEpochEndOffsets is used exclusively when AbstractFetcherThread is requested to maybeTruncate.

truncateToHighWatermark Internal Method

truncateToHighWatermark(
  partitions: Set[TopicPartition]): Unit

truncateToHighWatermark…​FIXME

Note
truncateToHighWatermark is used exclusively when AbstractFetcherThread is requested to maybeTruncate.

partitionsAndOffsets Method

partitionsAndOffsets: Map[TopicPartition, InitialFetchState]

partitionsAndOffsets…​FIXME

Note
partitionsAndOffsets is used exclusively when AbstractFetcherManager is requested to resizeThreadPool.

getOffsetTruncationState Internal Method

getOffsetTruncationState(
  tp: TopicPartition,
  leaderEpochOffset: EpochEndOffset): OffsetTruncationState

getOffsetTruncationState…​FIXME

Note
getOffsetTruncationState is used exclusively when AbstractFetcherThread is requested to maybeTruncateToEpochEndOffsets.

fetchOffsetAndTruncate Method

fetchOffsetAndTruncate(
  topicPartition: TopicPartition,
  currentLeaderEpoch: Int): Long

fetchOffsetAndTruncate…​FIXME

Note
fetchOffsetAndTruncate is used exclusively when AbstractFetcherThread is requested to addPartitions and handleOutOfRangeError.

handlePartitionsWithErrors Method

handlePartitionsWithErrors(
  partitions: Iterable[TopicPartition]): Unit

handlePartitionsWithErrors…​FIXME

Note
handlePartitionsWithErrors is used when AbstractFetcherThread is requested to maybeFetch, truncateToEpochEndOffsets, truncateToHighWatermark, and processFetchRequest.

doTruncate Internal Method

doTruncate(
  topicPartition: TopicPartition,
  truncationState: OffsetTruncationState): Boolean

doTruncate…​FIXME

Note
doTruncate is used when AbstractFetcherThread is requested to maybeTruncate (and in turn truncateToHighWatermark and maybeTruncateToEpochEndOffsets).

toMemoryRecords Method

toMemoryRecords(
  records: Records): MemoryRecords

toMemoryRecords converts the given records to MemoryRecords.

If the given records are of type MemoryRecords already, toMemoryRecords simply returns it as-is.

If the given records are of type FileRecords, toMemoryRecords allocates a ByteBuffer of the size of the records to readInto and creates a writable MemoryRecords.

Note

toMemoryRecords is used when:

shutdown Method

shutdown(): Unit
Note
shutdown is part of the ShutdownableThread contract to…​FIXME.

shutdown…​FIXME

Internal Properties

Name Description

fetcherStats

partitionStates

TopicPartitions with their PartitionFetchState (PartitionStates[PartitionFetchState])

Used when…​FIXME

results matching ""

    No results matching ""