KafkaMicroBatchReader
KafkaMicroBatchReader is the MicroBatchReader for the kafka data source for Micro-Batch Stream Processing.
KafkaMicroBatchReader is created exclusively when KafkaSourceProvider is requested to create a MicroBatchReader.
KafkaMicroBatchReader uses the DataSourceOptions to access the kafkaConsumer.pollTimeoutMs option (default: spark.network.timeout or 120s).
KafkaMicroBatchReader uses the DataSourceOptions to access the maxOffsetsPerTrigger option (default: (undefined)).
KafkaMicroBatchReader uses the Kafka properties for executors to create KafkaMicroBatchInputPartitions when requested to planInputPartitions.
|
Tip
|
Enable ALL logging level for the org.apache.spark.sql.kafka010.KafkaMicroBatchReader logger to see what happens inside.
Add the following line to the log4j configuration:
log4j.logger.org.apache.spark.sql.kafka010.KafkaMicroBatchReader=ALL
Refer to Logging.
|
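The defaults of the kafkaConsumer.pollTimeoutMs and maxOffsetsPerTrigger options can be illustrated with a minimal sketch. It uses a plain Scala Map in place of DataSourceOptions, and the KafkaReaderOptions object and its method names are hypothetical (they are not part of Spark). The 120s fallback is expressed in milliseconds.

```scala
// Hypothetical helper for illustration only; not Spark's API.
// A plain Map stands in for DataSourceOptions.
object KafkaReaderOptions {
  // Fallback when neither the option nor spark.network.timeout is given: 120s in ms.
  val DefaultNetworkTimeoutMs: Long = 120L * 1000L

  // kafkaConsumer.pollTimeoutMs: the explicit option wins, else the network timeout.
  def pollTimeoutMs(
      options: Map[String, String],
      networkTimeoutMs: Long = DefaultNetworkTimeoutMs): Long =
    options.get("kafkaConsumer.pollTimeoutMs").map(_.toLong).getOrElse(networkTimeoutMs)

  // maxOffsetsPerTrigger: undefined (None) unless the option is set.
  def maxOffsetsPerTrigger(options: Map[String, String]): Option[Long] =
    options.get("maxOffsetsPerTrigger").map(_.toLong)
}
```

The sketch looks up option names exactly as written, whereas DataSourceOptions treats keys case-insensitively.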
Creating KafkaMicroBatchReader Instance
KafkaMicroBatchReader takes the following to be created:
- Desired starting KafkaOffsetRangeLimit
- failOnDataLoss option
KafkaMicroBatchReader initializes the internal registries and counters.
readSchema Method
readSchema(): StructType
|
Note
|
readSchema is part of the DataSourceReader contract to…FIXME.
|
readSchema simply returns the predefined fixed schema.
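For orientation, the fixed schema can be pictured as the following list of columns. This is a sketch under the assumption that the reader targets the Spark 2.4 Kafka source. The pairs of column name and type name stand in for a Spark SQL StructType, and the KafkaSchemaSketch object is hypothetical.

```scala
// Illustration only: (column name, Spark SQL type name) pairs standing in
// for the StructType of the Kafka source (assumed Spark 2.4 layout).
object KafkaSchemaSketch {
  val fields: Seq[(String, String)] = Seq(
    "key"           -> "binary",
    "value"         -> "binary",
    "topic"         -> "string",
    "partition"     -> "int",
    "offset"        -> "long",
    "timestamp"     -> "timestamp",
    "timestampType" -> "int")

  // Mirrors the idea of readSchema: always the same predefined schema.
  def readSchema(): Seq[(String, String)] = fields
}
```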
Stopping Streaming Reader — stop Method
stop(): Unit
|
Note
|
stop is part of the BaseStreamingSource Contract to stop a streaming reader.
|
stop simply requests the KafkaOffsetReader to close.
Plan Input Partitions — planInputPartitions Method
planInputPartitions(): java.util.List[InputPartition[InternalRow]]
|
Note
|
planInputPartitions is part of the DataSourceReader contract in Spark SQL for the number of InputPartitions to use as RDD partitions (when DataSourceV2ScanExec physical operator is requested for the partitions of the input RDD).
|
planInputPartitions first finds the new partitions (TopicPartitions that are in the endPartitionOffsets but not in the startPartitionOffsets) and requests the KafkaOffsetReader to fetch their earliest offsets.
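Finding the new partitions amounts to a set difference over the keys of the two offset maps. The following is a minimal sketch, with a local TopicPartition case class standing in for Kafka's class and a hypothetical NewPartitionsSketch object.

```scala
object NewPartitionsSketch {
  // Stand-in for org.apache.kafka.common.TopicPartition (illustration only).
  final case class TopicPartition(topic: String, partition: Int)

  // New partitions: present in the end offsets but absent from the start offsets.
  def newPartitions(
      startPartitionOffsets: Map[TopicPartition, Long],
      endPartitionOffsets: Map[TopicPartition, Long]): Set[TopicPartition] =
    endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
}
```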
planInputPartitions prints out the following INFO message to the logs:
Partitions added: [newPartitionInitialOffsets]
planInputPartitions then prints out the following DEBUG message to the logs:
TopicPartitions: [comma-separated list of TopicPartitions]
planInputPartitions requests the KafkaOffsetRangeCalculator for offset ranges (given the startPartitionOffsets and the newly-calculated newPartitionInitialOffsets as the fromOffsets, the endPartitionOffsets as the untilOffsets, and the available executors (sorted in descending order)).
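How the fromOffsets and untilOffsets come together can be sketched as follows. This is a simplified model, not the KafkaOffsetRangeCalculator itself: it produces one range per TopicPartition, ignores the executors, and (as an assumption of this sketch) drops empty ranges. The OffsetRangeSketch object and its case classes are hypothetical stand-ins.

```scala
object OffsetRangeSketch {
  // Stand-ins for Kafka's TopicPartition and an offset range (illustration only).
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long)

  def ranges(
      startPartitionOffsets: Map[TopicPartition, Long],
      newPartitionInitialOffsets: Map[TopicPartition, Long],
      endPartitionOffsets: Map[TopicPartition, Long]): Seq[OffsetRange] = {
    // fromOffsets: start offsets plus the earliest offsets of the new partitions.
    val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
    endPartitionOffsets.toSeq
      .sortBy { case (tp, _) => (tp.topic, tp.partition) } // deterministic order
      .flatMap { case (tp, until) =>
        // Keep only partitions with a known from offset and a non-empty range.
        fromOffsets.get(tp).filter(_ < until).map(from => OffsetRange(tp, from, until))
      }
  }
}
```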
In the end, planInputPartitions creates a KafkaMicroBatchInputPartition for every offset range (with the Kafka properties for executors, the pollTimeoutMs, the failOnDataLoss flag and whether to reuse a Kafka consumer among Spark tasks).
|
Note
|
KafkaMicroBatchInputPartition uses a shared Kafka consumer only when all the offset ranges have distinct TopicPartitions, so that concurrent tasks (of a stage in a Spark job) do not interfere with one another by reading the same TopicPartitions.
|
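The "distinct TopicPartitions" condition from the note above can be expressed as a comparison of sizes: the number of distinct TopicPartitions must equal the number of offset ranges. The following is a minimal sketch with hypothetical stand-in types.

```scala
object ConsumerReuseSketch {
  // Stand-ins for Kafka's TopicPartition and an offset range (illustration only).
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long)

  // A Kafka consumer can be reused only if no TopicPartition occurs in two ranges.
  def canReuseConsumer(ranges: Seq[OffsetRange]): Boolean =
    ranges.map(_.tp).toSet.size == ranges.size
}
```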
planInputPartitions reports data loss when…FIXME
Available Executors in Spark Cluster (Sorted By Host and Executor ID in Descending Order) — getSortedExecutorList Internal Method
getSortedExecutorList(): Array[String]
getSortedExecutorList requests the BlockManager to request the BlockManagerMaster to get the peers (the other nodes in a Spark cluster), creates an ExecutorCacheTaskLocation for every pair of host and executor ID, and in the end sorts them in descending order.
|
Note
|
getSortedExecutorList is used exclusively when KafkaMicroBatchReader is requested to planInputPartitions (and calculates offset ranges).
|
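The ordering of executors by host and executor ID in descending order can be sketched without a running cluster. In the sketch below the peers are passed in as plain (host, executor ID) pairs instead of being fetched from the BlockManagerMaster. ExecutorLocation is a hypothetical stand-in for ExecutorCacheTaskLocation, and its string form is an assumption made for illustration.

```scala
object SortedExecutorsSketch {
  // Hypothetical stand-in for ExecutorCacheTaskLocation.
  final case class ExecutorLocation(host: String, executorId: String) {
    // Assumed string form, for illustration only.
    override def toString: String = s"executor_${host}_$executorId"
  }

  // Sorts by host, then by executor ID, both in descending (string) order.
  def sortedExecutors(peers: Seq[(String, String)]): Array[String] =
    peers
      .map { case (host, id) => ExecutorLocation(host, id) }
      .sortWith { (a, b) =>
        if (a.host == b.host) a.executorId > b.executorId else a.host > b.host
      }
      .map(_.toString)
      .toArray
}
```

Executor IDs are compared as strings in this sketch, so "10" sorts before "2" in ascending order; what matters for planning is only that the order is stable across calls.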
getOrCreateInitialPartitionOffsets Internal Method
getOrCreateInitialPartitionOffsets(): PartitionOffsetMap
getOrCreateInitialPartitionOffsets…FIXME
|
Note
|
getOrCreateInitialPartitionOffsets is used exclusively for the initialPartitionOffsets internal registry.
|
getStartOffset Method
getStartOffset: Offset
|
Note
|
getStartOffset is part of the MicroBatchReader Contract to get the start (beginning) offsets.
|
getStartOffset…FIXME
getEndOffset Method
getEndOffset: Offset
|
Note
|
getEndOffset is part of the MicroBatchReader Contract to get the end offsets.
|
getEndOffset…FIXME
deserializeOffset Method
deserializeOffset(json: String): Offset
|
Note
|
deserializeOffset is part of the MicroBatchReader Contract to deserialize an offset (from JSON format).
|
deserializeOffset…FIXME
Internal Properties
| Name | Description |
|---|---|
| | KafkaOffsetRangeCalculator (for the given DataSourceOptions). Used exclusively when |