log4j.logger.org.apache.spark.sql.kafka010.KafkaOffsetReader=ALL
KafkaOffsetReader
KafkaOffsetReader relies on the ConsumerStrategy to create a Kafka Consumer.
KafkaOffsetReader creates a Kafka Consumer with group.id (ConsumerConfig.GROUP_ID_CONFIG) configuration explicitly set to nextGroupId (i.e. the given driverGroupIdPrefix followed by nextId).
KafkaOffsetReader is created when:
- KafkaRelation is requested to build a distributed data scan with column pruning
- KafkaSourceProvider is requested to create a KafkaSource, createMicroBatchReader, and createContinuousReader
| Name | Description |
|---|---|
| | Default: |

KafkaOffsetReader defines the predefined fixed schema.
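Tip: Enable the ALL logging level for the org.apache.spark.sql.kafka010.KafkaOffsetReader logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.kafka010.KafkaOffsetReader=ALL
Refer to Logging.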
Creating KafkaOffsetReader Instance
KafkaOffsetReader takes the following to be created:
- Kafka parameters (as name-value pairs that are used exclusively to create a Kafka consumer)
KafkaOffsetReader initializes the internal properties.
nextGroupId Internal Method
nextGroupId(): String
nextGroupId sets the groupId to be the driverGroupIdPrefix followed by a hyphen (-) and the nextId (i.e. [driverGroupIdPrefix]-[nextId]).
Note: nextGroupId is used exclusively when KafkaOffsetReader is requested for a Kafka Consumer.
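The naming scheme described above can be illustrated with a minimal, dependency-free sketch. GroupIdGenerator and withNextGroupId are hypothetical names introduced only for illustration, and the counter starting at 0 and advancing after every call is an assumption, not something this page states. The "group.id" key is the value of ConsumerConfig.GROUP_ID_CONFIG.

```scala
// Hypothetical helper (not part of Spark): models how a unique group.id
// could be derived from a prefix and a counter.
final class GroupIdGenerator(driverGroupIdPrefix: String) {
  // Assumption: the counter starts at 0 and grows by one per generated id
  private var nextId: Int = 0

  // Builds [driverGroupIdPrefix]-[nextId], then advances the counter
  def nextGroupId(): String = {
    val groupId = s"$driverGroupIdPrefix-$nextId"
    nextId += 1
    groupId
  }

  // Returns a copy of the given Kafka parameters with group.id overridden
  def withNextGroupId(kafkaParams: Map[String, String]): Map[String, String] =
    kafkaParams + ("group.id" -> nextGroupId())
}
```

Overriding group.id on every call means that each newly created consumer would join its own consumer group, which is what the explicit group.id setting described earlier suggests.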
resetConsumer Internal Method
resetConsumer(): Unit
resetConsumer…FIXME
Note: resetConsumer is used when…FIXME
fetchTopicPartitions Method
fetchTopicPartitions(): Set[TopicPartition]
Caution: FIXME
Note: fetchTopicPartitions is used when KafkaRelation is requested to getPartitionOffsets.
Fetching Earliest Offsets — fetchEarliestOffsets Method
fetchEarliestOffsets(): Map[TopicPartition, Long]
fetchEarliestOffsets(newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
Caution: FIXME
Note: fetchEarliestOffsets is used when KafkaSource is requested to rateLimit and to generate a DataFrame for a batch (when new partitions have been assigned).
Fetching Latest Offsets — fetchLatestOffsets Method
fetchLatestOffsets(): Map[TopicPartition, Long]
|
Caution
|
FIXME |
|
Note
|
fetchLatestOffsets is used when KafkaSource gets offsets or initialPartitionOffsets is initialized.
|
withRetriesWithoutInterrupt Internal Method
withRetriesWithoutInterrupt(
body: => Map[TopicPartition, Long]): Map[TopicPartition, Long]
withRetriesWithoutInterrupt…FIXME
Note: withRetriesWithoutInterrupt is used when…FIXME
Fetching Offsets for Selected TopicPartitions — fetchSpecificOffsets Method
fetchSpecificOffsets(
partitionOffsets: Map[TopicPartition, Long],
reportDataLoss: String => Unit): KafkaSourceOffset
fetchSpecificOffsets requests the Kafka Consumer to poll(0).
fetchSpecificOffsets requests the Kafka Consumer for assigned partitions (using Consumer.assignment()).
fetchSpecificOffsets requests the Kafka Consumer to pause(partitions).
You should see the following DEBUG message in the logs:
DEBUG KafkaOffsetReader: Partitions assigned to consumer: [partitions]. Seeking to [partitionOffsets]
For every partition offset in the input partitionOffsets, fetchSpecificOffsets requests the Kafka Consumer to:
- seekToEnd for the latest (aka -1)
- seekToBeginning for the earliest (aka -2)
- seek for other offsets
In the end, fetchSpecificOffsets creates a collection of Kafka’s TopicPartition and position (using the Kafka Consumer).
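The per-partition choice between seekToEnd, seekToBeginning and seek can be sketched without a Kafka dependency. The SeekAction type and the SeekDispatch object below are hypothetical stand-ins that only name the consumer call that would be made. They are not Kafka's or Spark's API. The -1 and -2 markers are the ones given above.

```scala
// Hypothetical stand-ins for the three consumer calls described above
sealed trait SeekAction
case object SeekToEnd extends SeekAction
case object SeekToBeginning extends SeekAction
final case class Seek(offset: Long) extends SeekAction

object SeekDispatch {
  val Latest: Long = -1L   // marker for the latest offset
  val Earliest: Long = -2L // marker for the earliest offset

  // Picks the seek operation for a single requested offset
  def seekActionFor(offset: Long): SeekAction = offset match {
    case Latest   => SeekToEnd
    case Earliest => SeekToBeginning
    case other    => Seek(other)
  }

  // Applies the choice to every partition offset in the input map;
  // P stands in for Kafka's TopicPartition
  def plan[P](partitionOffsets: Map[P, Long]): Map[P, SeekAction] =
    partitionOffsets.map { case (tp, offset) => tp -> seekActionFor(offset) }
}
```

For example, SeekDispatch.plan(Map("topic-0" -> -1L, "topic-1" -> 7L)) maps topic-0 to SeekToEnd and topic-1 to Seek(7). In the real method the resolved position would then be read back from the Kafka Consumer for every partition.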
Note: fetchSpecificOffsets is used when KafkaSource fetches and verifies initial partition offsets.
Creating Kafka Consumer — createConsumer Internal Method
createConsumer(): Consumer[Array[Byte], Array[Byte]]
createConsumer requests the ConsumerStrategy to create a Kafka Consumer with driverKafkaParams and a newly generated group.id Kafka property.
Note: createConsumer is used when KafkaOffsetReader is created (and initializes consumer) and when it is requested to resetConsumer.
Creating Kafka Consumer (Unless Already Available) — consumer Method
consumer: Consumer[Array[Byte], Array[Byte]]
consumer gives the cached Kafka Consumer or creates one itself.
Note: The fetch methods use the consumer method to access the internal Kafka Consumer, so a new Kafka Consumer is created whenever the internal Kafka Consumer reference becomes null, e.g. as in resetConsumer.
consumer…FIXME
Note: consumer is used when KafkaOffsetReader is requested to fetchTopicPartitions, fetchSpecificOffsets, fetchEarliestOffsets, and fetchLatestOffsets.
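The "cached or created on demand" behaviour of consumer can be sketched as follows. ConsumerHolder and invalidate are hypothetical names, the type parameter C stands in for the Kafka Consumer, and an empty Option models the null reference mentioned above. What actually clears the reference in KafkaOffsetReader is not specified here.

```scala
// Hypothetical holder (not Spark's code): returns the cached instance or
// creates a new one when nothing is cached.
final class ConsumerHolder[C](create: () => C) {
  // None plays the role of the null internal reference
  private var cached: Option[C] = None

  // Gives the cached instance, creating and caching one first if needed
  def consumer: C = cached match {
    case Some(c) => c
    case None =>
      val c = create()
      cached = Some(c)
      c
  }

  // Drops the cached instance so that the next access creates a new one
  def invalidate(): Unit = {
    cached = None
  }
}
```

Because every fetch method would go through consumer rather than through the field directly, callers never observe the missing reference. They simply receive a fresh instance after an invalidation.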
Closing — close Method
close(): Unit
close stops the Kafka Consumer (if the Kafka Consumer is available).
close requests the ExecutorService to shut down.
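The two steps of close can be sketched with the standard java.util.concurrent API. ClosableReader is a hypothetical class, and AutoCloseable stands in for the Kafka Consumer. Only the order of the steps (stop the consumer if present, then shut down the ExecutorService) is taken from the description above.

```scala
import java.util.concurrent.ExecutorService

// Hypothetical model of the shutdown sequence (not Spark's code)
final class ClosableReader(initial: Option[AutoCloseable], executor: ExecutorService) {
  // The consumer may be absent, hence the Option
  private var consumer: Option[AutoCloseable] = initial

  def close(): Unit = {
    // Stop the consumer only if one is available, then forget it
    consumer.foreach(_.close())
    consumer = None
    // Ask the executor to shut down; already submitted tasks still complete
    executor.shutdown()
  }
}
```

Clearing the reference after stopping it makes a second call to close harmless in this sketch, since ExecutorService.shutdown has no additional effect when the executor is already shut down.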
runUninterruptibly Internal Method
runUninterruptibly[T](body: => T): T
runUninterruptibly…FIXME
Note: runUninterruptibly is used when…FIXME
stopConsumer Internal Method
stopConsumer(): Unit
stopConsumer…FIXME
Note: stopConsumer is used when…FIXME
Textual Representation — toString Method
toString: String
Note: toString is part of the java.lang.Object contract for the string representation of the object.
toString…FIXME
Internal Properties
| Name | Description |
|---|---|
| | Kafka’s Consumer ( Initialized when Used when |
| | Initially |