KafkaOffsetReader

KafkaOffsetReader relies on the ConsumerStrategy to create a Kafka Consumer.

KafkaOffsetReader creates a Kafka Consumer with group.id (ConsumerConfig.GROUP_ID_CONFIG) configuration explicitly set to nextGroupId (i.e. the given driverGroupIdPrefix followed by nextId).

KafkaOffsetReader is created when:

Table 1. KafkaOffsetReader’s Options
Name Description

fetchOffset.numRetries

Default: 3

fetchOffset.retryIntervalMs

How long to wait before retries

Default: 1000
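
As a rough illustration of how the two options and their defaults could be read from an option map, here is a minimal sketch. RetrySettings and fromOptions are hypothetical names introduced for this example only; they are not part of Spark.

```scala
// Hypothetical holder for the two retry-related settings (not a Spark class).
final case class RetrySettings(numRetries: Int, retryIntervalMs: Long)

object RetrySettings {
  // Reads both options from a plain option map, falling back to the
  // defaults listed in the table (3 retries, 1000 ms) when a key is absent.
  def fromOptions(options: Map[String, String]): RetrySettings =
    RetrySettings(
      numRetries = options.getOrElse("fetchOffset.numRetries", "3").toInt,
      retryIntervalMs = options.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
    )
}
```

With an empty map the sketch yields the defaults; supplying only one key overrides just that setting.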

KafkaOffsetReader defines the predefined fixed schema.

Tip

Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaOffsetReader to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaOffsetReader=ALL

Refer to Logging.

Creating KafkaOffsetReader Instance

KafkaOffsetReader takes the following to be created:

KafkaOffsetReader initializes the internal properties.

nextGroupId Internal Method

nextGroupId(): String

nextGroupId sets the groupId to the driverGroupIdPrefix followed by - and the nextId (i.e. [driverGroupIdPrefix]-[nextId]).

In the end, nextGroupId increments the nextId and returns the groupId.
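
The two steps above can be sketched as follows. GroupIdGenerator is a hypothetical stand-in written for this example (not a Spark class), and the prefix in the usage note is an assumed value.

```scala
// Hypothetical stand-in for the group-id bookkeeping described above.
final class GroupIdGenerator(driverGroupIdPrefix: String) {
  private var nextId: Int = 0           // initially 0
  private var groupId: String = null    // last generated id (null until the first call)

  // Builds "[driverGroupIdPrefix]-[nextId]", then increments nextId
  // and returns the group id that was just built.
  def nextGroupId(): String = {
    groupId = s"$driverGroupIdPrefix-$nextId"
    nextId += 1
    groupId
  }
}
```

Given an assumed prefix such as my-prefix, successive calls would produce my-prefix-0, my-prefix-1, and so on, so every newly created consumer would get a distinct group.id.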

Note
nextGroupId is used exclusively when KafkaOffsetReader is requested for a Kafka Consumer.

resetConsumer Internal Method

resetConsumer(): Unit

resetConsumer…​FIXME

Note
resetConsumer is used when…​FIXME

fetchTopicPartitions Method

fetchTopicPartitions(): Set[TopicPartition]
Caution
FIXME
Note
fetchTopicPartitions is used when KafkaRelation is requested to getPartitionOffsets.

Fetching Earliest Offsets — fetchEarliestOffsets Method

fetchEarliestOffsets(): Map[TopicPartition, Long]
fetchEarliestOffsets(newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
Caution
FIXME
Note
fetchEarliestOffsets is used when KafkaSource is requested to rateLimit and to generate a DataFrame for a batch (when new partitions have been assigned).

Fetching Latest Offsets — fetchLatestOffsets Method

fetchLatestOffsets(): Map[TopicPartition, Long]
Caution
FIXME
Note
fetchLatestOffsets is used when KafkaSource gets offsets or initialPartitionOffsets is initialized.

withRetriesWithoutInterrupt Internal Method

withRetriesWithoutInterrupt(
  body: => Map[TopicPartition, Long]): Map[TopicPartition, Long]

withRetriesWithoutInterrupt…​FIXME

Note
withRetriesWithoutInterrupt is used when…​FIXME

Fetching Offsets for Selected TopicPartitions — fetchSpecificOffsets Method

fetchSpecificOffsets(
  partitionOffsets: Map[TopicPartition, Long],
  reportDataLoss: String => Unit): KafkaSourceOffset
Figure 1. KafkaOffsetReader’s fetchSpecificOffsets

fetchSpecificOffsets requests the Kafka Consumer to poll(0).

fetchSpecificOffsets requests the Kafka Consumer for assigned partitions (using Consumer.assignment()).

fetchSpecificOffsets requests the Kafka Consumer to pause(partitions).

You should see the following DEBUG message in the logs:

DEBUG KafkaOffsetReader: Partitions assigned to consumer: [partitions]. Seeking to [partitionOffsets]

For every partition offset in the input partitionOffsets, fetchSpecificOffsets requests the Kafka Consumer to:

  • seekToEnd for the latest (aka -1)

  • seekToBeginning for the earliest (aka -2)

  • seek for other offsets

In the end, fetchSpecificOffsets creates a collection of Kafka’s TopicPartition and position (using the Kafka Consumer).
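
The seek-then-read-position part of the steps above can be sketched without the kafka-clients library. Everything in the block is a simplified, hypothetical stand-in: TopicPartition mirrors Kafka's class of the same name, SeekableConsumer models only the four calls used here (the real Consumer takes collections for seekToEnd and seekToBeginning), and InMemoryConsumer exists purely so the example runs. The poll, assignment, and pause steps are left out.

```scala
// Simplified stand-in for Kafka's TopicPartition (assumption for this sketch).
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical, reduced view of the consumer calls used by the seek step.
trait SeekableConsumer {
  def seekToEnd(tp: TopicPartition): Unit
  def seekToBeginning(tp: TopicPartition): Unit
  def seek(tp: TopicPartition, offset: Long): Unit
  def position(tp: TopicPartition): Long
}

object SpecificOffsets {
  val Latest: Long = -1L    // marker for "latest"
  val Earliest: Long = -2L  // marker for "earliest"

  // Seeks every requested partition, then reads back the resolved positions.
  def resolve(
      consumer: SeekableConsumer,
      partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
    partitionOffsets.foreach {
      case (tp, Latest)   => consumer.seekToEnd(tp)
      case (tp, Earliest) => consumer.seekToBeginning(tp)
      case (tp, offset)   => consumer.seek(tp, offset)
    }
    partitionOffsets.keys.map(tp => tp -> consumer.position(tp)).toMap
  }
}

// In-memory consumer for illustration: every partition has a fixed
// (beginning, end) offset range.
final class InMemoryConsumer(ranges: Map[TopicPartition, (Long, Long)])
    extends SeekableConsumer {
  private val positions = scala.collection.mutable.Map.empty[TopicPartition, Long]
  def seekToEnd(tp: TopicPartition): Unit = positions.update(tp, ranges(tp)._2)
  def seekToBeginning(tp: TopicPartition): Unit = positions.update(tp, ranges(tp)._1)
  def seek(tp: TopicPartition, offset: Long): Unit = positions.update(tp, offset)
  def position(tp: TopicPartition): Long = positions(tp)
}
```

In this model the -1 and -2 markers are replaced by concrete offsets once the positions are read back, while any other offset is returned unchanged.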

Note
fetchSpecificOffsets is used when KafkaSource fetches and verifies initial partition offsets.

Creating Kafka Consumer — createConsumer Internal Method

createConsumer(): Consumer[Array[Byte], Array[Byte]]
Note
createConsumer is used when KafkaOffsetReader is created (and initializes consumer) and when requested to resetConsumer.

Creating Kafka Consumer (Unless Already Available) — consumer Method

consumer: Consumer[Array[Byte], Array[Byte]]

consumer gives the cached Kafka Consumer or creates one itself.

Note
Because the fetch methods access the internal Kafka Consumer through the consumer method, a new Kafka Consumer is created whenever the internal Kafka Consumer reference becomes null, e.g. after resetConsumer.
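
The "cached or create" behaviour can be sketched as below. ConsumerHolder is a hypothetical class written for this example; it uses an Option where the text speaks of a null reference, and the type parameter C stands in for Consumer[Array[Byte], Array[Byte]].

```scala
// Hypothetical holder that mimics a lazily (re)created consumer reference.
final class ConsumerHolder[C](create: () => C) {
  private var cached: Option[C] = None

  // Returns the cached consumer, creating (and caching) one first
  // if none is currently available.
  def consumer: C = cached.getOrElse {
    val c = create()
    cached = Some(c)
    c
  }

  // Drops the reference so that the next call to `consumer` creates a new one.
  def reset(): Unit = { cached = None }
}
```

Routing every access through consumer means callers never have to check whether a reset has happened in between.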

consumer…​FIXME

Note
consumer is used when KafkaOffsetReader is requested to fetchTopicPartitions, fetchSpecificOffsets, fetchEarliestOffsets, and fetchLatestOffsets.

Closing — close Method

close(): Unit

close stops the Kafka Consumer (if the Kafka Consumer is available).

close requests the ExecutorService to shut down.
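
A minimal sketch of this two-step shutdown, assuming a generic AutoCloseable in place of the Kafka Consumer. ReaderResources is a hypothetical class for illustration only, and the order of the two steps simply follows the text above.

```scala
import java.util.concurrent.{ExecutorService, Executors}

// Hypothetical resource owner (not a Spark class): closes an optional
// consumer-like resource, then shuts down its executor.
final class ReaderResources(
    private var consumer: Option[AutoCloseable],
    executor: ExecutorService) {

  def close(): Unit = {
    consumer.foreach(_.close())  // stop the consumer only if one is available
    consumer = None
    executor.shutdown()          // no new tasks are accepted after this call
  }
}
```

ExecutorService.shutdown lets already submitted tasks finish; it does not wait for them to complete.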

Note

close is used when:

runUninterruptibly Internal Method

runUninterruptibly[T](body: => T): T

runUninterruptibly…​FIXME

Note
runUninterruptibly is used when…​FIXME

stopConsumer Internal Method

stopConsumer(): Unit

stopConsumer…​FIXME

Note
stopConsumer is used when…​FIXME

Textual Representation — toString Method

toString: String
Note
toString is part of the java.lang.Object contract for the string representation of the object.

toString…​FIXME

Internal Properties

Name Description

_consumer

Kafka’s Consumer (Consumer[Array[Byte], Array[Byte]])

Initialized when KafkaOffsetReader is created.

Used when KafkaOffsetReader:

execContext

groupId

kafkaReaderThread

maxOffsetFetchAttempts

nextId

Initially 0

offsetFetchAttemptIntervalMs
