KafkaMicroBatchReader
KafkaMicroBatchReader is the MicroBatchReader for the kafka data source in Micro-Batch Stream Processing.
KafkaMicroBatchReader is created exclusively when KafkaSourceProvider is requested to create a MicroBatchReader.
KafkaMicroBatchReader uses the DataSourceOptions to access the kafkaConsumer.pollTimeoutMs option (default: spark.network.timeout or 120s).
KafkaMicroBatchReader uses the DataSourceOptions to access the maxOffsetsPerTrigger option (default: undefined).
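Both options are set through the DataSourceOptions when defining the streaming query. A minimal sketch (the broker, topic, and option values are illustrative only):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// kafkaConsumer.pollTimeoutMs and maxOffsetsPerTrigger are the options described above
val records = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // illustrative broker
  .option("subscribe", "topic1")                       // illustrative topic
  .option("kafkaConsumer.pollTimeoutMs", "120000")     // poll timeout (ms)
  .option("maxOffsetsPerTrigger", "10000")             // cap on offsets per micro-batch
  .load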
KafkaMicroBatchReader uses the Kafka properties for executors to create KafkaMicroBatchInputPartitions when requested to planInputPartitions.
Tip: Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaMicroBatchReader logger to see what happens inside. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaMicroBatchReader=ALL

Refer to Logging.
Creating KafkaMicroBatchReader Instance
KafkaMicroBatchReader takes the following to be created:

- Desired starting KafkaOffsetRangeLimit
- failOnDataLoss option

KafkaMicroBatchReader initializes the internal registries and counters.
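The two bullets above are only part of the full constructor. As a sketch, the signature in Spark 2.4's kafka-0-10-sql module looks roughly as follows (treat the argument list as an approximation and verify it against the sources of your Spark version):

class KafkaMicroBatchReader(
    kafkaOffsetReader: KafkaOffsetReader,
    executorKafkaParams: java.util.Map[String, Object],
    options: DataSourceOptions,
    metadataPath: String,
    startingOffsets: KafkaOffsetRangeLimit,
    failOnDataLoss: Boolean)
  extends MicroBatchReader with Logging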
readSchema Method
readSchema(): StructType
Note: readSchema is part of the DataSourceReader contract to…FIXME.
readSchema simply returns the predefined fixed schema.
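The fixed schema is the Kafka data source's record schema. Reusing the records streaming Dataset from the earlier snippet, printSchema shows it (output for reference):

records.printSchema
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)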
Stopping Streaming Reader — stop Method
stop(): Unit
Note: stop is part of the BaseStreamingSource Contract to stop a streaming reader.
stop simply requests the KafkaOffsetReader to close.
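Given that description, the method body amounts to a one-liner; a minimal sketch:

override def stop(): Unit = {
  // delegate cleanup to the Kafka offset reader
  kafkaOffsetReader.close()
}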
Plan Input Partitions — planInputPartitions Method
planInputPartitions(): java.util.List[InputPartition[InternalRow]]
Note: planInputPartitions is part of the DataSourceReader contract in Spark SQL for the InputPartitions to use as RDD partitions (when the DataSourceV2ScanExec physical operator is requested for the partitions of the input RDD).
planInputPartitions first finds the new partitions (TopicPartitions that are in the endPartitionOffsets but not in the startPartitionOffsets) and requests the KafkaOffsetReader to fetch their earliest offsets.
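A sketch of that step (the fetchEarliestOffsets method name on KafkaOffsetReader is an assumption here; the offset registries are the ones described on this page):

// TopicPartitions in the end offsets but not in the start offsets
val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
// ask Kafka (via the KafkaOffsetReader) for their earliest offsets
val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)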
planInputPartitions prints out the following INFO message to the logs:
Partitions added: [newPartitionInitialOffsets]
planInputPartitions then prints out the following DEBUG message to the logs:
TopicPartitions: [comma-separated list of TopicPartitions]
planInputPartitions requests the KafkaOffsetRangeCalculator for offset ranges (given the startPartitionOffsets and the newly-calculated newPartitionInitialOffsets as the fromOffsets, the endPartitionOffsets as the untilOffsets, and the available executors, sorted in descending order).
In the end, planInputPartitions creates a KafkaMicroBatchInputPartition for every offset range (with the Kafka properties for executors, the pollTimeoutMs, the failOnDataLoss flag, and whether to reuse a Kafka consumer among Spark tasks).
Note: KafkaMicroBatchInputPartition uses a shared Kafka consumer only when all the offset ranges have distinct TopicPartitions, so concurrent tasks (of a stage in a Spark job) will not interfere and read the same TopicPartitions.
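Putting the steps together, a sketch of the offset-range calculation and partition creation (the getRanges signature and the consumer-reuse check are modeled on Spark 2.4's sources and may differ in other versions):

// offset ranges, with executor locations for locality-aware assignment
val offsetRanges = rangeCalculator.getRanges(
  fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
  untilOffsets = endPartitionOffsets,
  executorLocations = getSortedExecutorList())

// reuse a Kafka consumer only when every range has a distinct TopicPartition
val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size

// one KafkaMicroBatchInputPartition per offset range
val partitions = offsetRanges.map { range =>
  new KafkaMicroBatchInputPartition(
    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
}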
planInputPartitions reports data loss when…FIXME
Available Executors in Spark Cluster (Sorted By Host and Executor ID in Descending Order) — getSortedExecutorList Internal Method
getSortedExecutorList(): Array[String]
getSortedExecutorList requests the BlockManager to request the BlockManagerMaster to get the peers (the other nodes in a Spark cluster), creates an ExecutorCacheTaskLocation for every pair of host and executor ID, and in the end, sorts them in descending order.
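A sketch of the method (closely mirroring the Spark 2.4 sources; imports included for context):

import org.apache.spark.SparkEnv
import org.apache.spark.scheduler.ExecutorCacheTaskLocation

private def getSortedExecutorList(): Array[String] = {
  // order by host first, then by executor ID, both descending
  def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean =
    if (a.host == b.host) a.executorId > b.executorId else a.host > b.host

  val bm = SparkEnv.get.blockManager
  bm.master.getPeers(bm.blockManagerId).toArray
    .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
    .sortWith(compare)
    .map(_.toString)
}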
Note: getSortedExecutorList is used exclusively when KafkaMicroBatchReader is requested to planInputPartitions (and calculates offset ranges).
getOrCreateInitialPartitionOffsets Internal Method
getOrCreateInitialPartitionOffsets(): PartitionOffsetMap
getOrCreateInitialPartitionOffsets…FIXME
Note: getOrCreateInitialPartitionOffsets is used exclusively for the initialPartitionOffsets internal registry.
getStartOffset Method
getStartOffset: Offset
Note: getStartOffset is part of the MicroBatchReader Contract to get the start (beginning) offsets.
getStartOffset…FIXME
getEndOffset Method
getEndOffset: Offset
Note: getEndOffset is part of the MicroBatchReader Contract to get the end offsets.
getEndOffset…FIXME
deserializeOffset Method
deserializeOffset(json: String): Offset
Note: deserializeOffset is part of the MicroBatchReader Contract to deserialize an offset (from JSON format).
deserializeOffset…FIXME
Internal Properties
Name | Description |
---|---|
initialPartitionOffsets | Initial partition offsets (created once with getOrCreateInitialPartitionOffsets) |
rangeCalculator | KafkaOffsetRangeCalculator (for the given DataSourceOptions). Used exclusively when KafkaMicroBatchReader is requested to planInputPartitions (to calculate offset ranges) |