KafkaMicroBatchReader is the MicroBatchReader of the kafka data source for Micro-Batch Stream Processing.

KafkaMicroBatchReader is created exclusively when KafkaSourceProvider is requested to create a MicroBatchReader.

KafkaMicroBatchReader uses the DataSourceOptions to access the kafkaConsumer.pollTimeoutMs option (default: spark.network.timeout or 120s).

KafkaMicroBatchReader uses the DataSourceOptions to access the maxOffsetsPerTrigger option (default: undefined, i.e. no limit on the number of offsets per trigger).
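The fallback chain of the two options above can be sketched in plain Scala. This is only an illustration under stated assumptions: the options and the Spark configuration are modelled as ordinary Maps with exact-match keys, and ReaderOptionsSketch and toMillis are hypothetical names, not part of Spark.

```scala
object ReaderOptionsSketch {
  // Hypothetical helper: converts a duration such as "120s" or "500ms" to milliseconds.
  // Only these two suffixes are supported in this sketch.
  def toMillis(duration: String): Long = {
    val d = duration.trim.toLowerCase
    if (d.endsWith("ms")) d.dropRight(2).toLong
    else if (d.endsWith("s")) d.dropRight(1).toLong * 1000L
    else throw new IllegalArgumentException(s"Unsupported duration: $duration")
  }

  // kafkaConsumer.pollTimeoutMs if set, else spark.network.timeout, else 120s
  def pollTimeoutMs(options: Map[String, String], sparkConf: Map[String, String]): Long =
    options.get("kafkaConsumer.pollTimeoutMs").map(_.toLong)
      .getOrElse(toMillis(sparkConf.getOrElse("spark.network.timeout", "120s")))

  // maxOffsetsPerTrigger has no default, hence an Option
  def maxOffsetsPerTrigger(options: Map[String, String]): Option[Long] =
    options.get("maxOffsetsPerTrigger").map(_.toLong)
}
```

With no options and no Spark configuration, pollTimeoutMs falls back to 120 seconds expressed in milliseconds, and maxOffsetsPerTrigger stays empty.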

KafkaMicroBatchReader uses the Kafka properties for executors to create KafkaMicroBatchInputPartitions when requested to planInputPartitions.


Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaMicroBatchReader to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaMicroBatchReader=ALL

Refer to Logging.

Creating KafkaMicroBatchReader Instance

KafkaMicroBatchReader takes the following to be created:

KafkaMicroBatchReader initializes the internal registries and counters.

readSchema Method

readSchema(): StructType
readSchema is part of the DataSourceReader contract to…​FIXME.

readSchema simply returns the predefined fixed schema.

Stopping Streaming Reader — stop Method

stop(): Unit
stop is part of the BaseStreamingSource Contract to stop a streaming reader.

stop simply requests the KafkaOffsetReader to close.

Plan Input Partitions — planInputPartitions Method

planInputPartitions(): java.util.List[InputPartition[InternalRow]]
planInputPartitions is part of the DataSourceReader contract in Spark SQL to provide the InputPartitions that are used as RDD partitions (when the DataSourceV2ScanExec physical operator is requested for the partitions of the input RDD).

planInputPartitions first finds the new partitions (TopicPartitions that are in the endPartitionOffsets but not in the startPartitionOffsets) and requests the KafkaOffsetReader to fetch their earliest offsets.

planInputPartitions prints out the following INFO message to the logs:

Partitions added: [newPartitionInitialOffsets]
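Finding the new partitions amounts to a set difference on the keys of the two offset maps. A minimal sketch, assuming a simplified TopicPartition case class as a stand-in for Kafka's own class (NewPartitionsSketch is a hypothetical name used only here):

```scala
object NewPartitionsSketch {
  // Stand-in for org.apache.kafka.common.TopicPartition (keeps the sketch dependency-free)
  final case class TopicPartition(topic: String, partition: Int)

  // TopicPartitions present in the end offsets but absent from the start offsets
  def newPartitions(
      startPartitionOffsets: Map[TopicPartition, Long],
      endPartitionOffsets: Map[TopicPartition, Long]): Set[TopicPartition] =
    endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
}
```

The earliest offsets of the resulting partitions would then be fetched from Kafka, which the sketch leaves out.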

planInputPartitions then prints out the following DEBUG message to the logs:

TopicPartitions: [comma-separated list of TopicPartitions]

planInputPartitions requests the KafkaOffsetRangeCalculator for offset ranges (given the startPartitionOffsets and the newly-calculated newPartitionInitialOffsets as the fromOffsets, the endPartitionOffsets as the untilOffsets, and the available executors (sorted in descending order)).
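How the fromOffsets and untilOffsets fit together can be illustrated with a deliberately simplified sketch. It assumes one offset range per TopicPartition, drops empty ranges, and ignores the executors entirely; the actual KafkaOffsetRangeCalculator may behave differently (e.g. split ranges). TopicPartition, OffsetRange and OffsetRangeSketch are hypothetical stand-ins.

```scala
object OffsetRangeSketch {
  // Simplified stand-ins, not the Spark or Kafka classes
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetRange(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long)

  def ranges(
      startPartitionOffsets: Map[TopicPartition, Long],
      newPartitionInitialOffsets: Map[TopicPartition, Long],
      endPartitionOffsets: Map[TopicPartition, Long]): Seq[OffsetRange] = {
    // fromOffsets: start offsets plus the earliest offsets of the new partitions
    val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
    endPartitionOffsets.toSeq
      .collect { case (tp, until) if fromOffsets.contains(tp) =>
        OffsetRange(tp, fromOffsets(tp), until)
      }
      .filter(r => r.untilOffset > r.fromOffset) // sketch choice: skip empty ranges
      .sortBy(r => (r.topicPartition.topic, r.topicPartition.partition))
  }
}
```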

In the end, planInputPartitions creates a KafkaMicroBatchInputPartition for every offset range (with the Kafka properties for executors, the pollTimeoutMs, the failOnDataLoss flag and whether to reuse a Kafka consumer among Spark tasks).

KafkaMicroBatchInputPartition uses a shared Kafka consumer only when all the offset ranges have distinct TopicPartitions, so that concurrent tasks (of a stage in a Spark job) do not interfere with one another by reading the same TopicPartitions.
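The "distinct TopicPartitions" condition can be checked by comparing the number of unique TopicPartitions with the number of offset ranges. A minimal sketch, assuming simplified stand-in types (ConsumerReuseSketch and canReuseConsumer are hypothetical names):

```scala
object ConsumerReuseSketch {
  // Simplified stand-ins, not the Spark or Kafka classes
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetRange(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long)

  // true only when no two offset ranges share a TopicPartition
  def canReuseConsumer(offsetRanges: Seq[OffsetRange]): Boolean =
    offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size
}
```

If a TopicPartition were split into two ranges, two tasks could read it concurrently, and the check would return false.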

planInputPartitions reports data loss when…​FIXME

Available Executors in Spark Cluster (Sorted By Host and Executor ID in Descending Order) — getSortedExecutorList Internal Method

getSortedExecutorList(): Array[String]

getSortedExecutorList requests the BlockManager to request the BlockManagerMaster to get the peers (the other nodes in a Spark cluster), creates an ExecutorCacheTaskLocation for every pair of host and executor ID, and in the end, sorts them in descending order.

getSortedExecutorList is used exclusively when KafkaMicroBatchReader is requested to planInputPartitions (and calculates offset ranges).
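The descending order by host and executor ID can be sketched as follows. ExecutorLocation is a hypothetical stand-in for ExecutorCacheTaskLocation, and the peers are passed in directly rather than looked up through the BlockManager. Both fields are compared as strings in this sketch.

```scala
object SortedExecutorsSketch {
  // Stand-in for ExecutorCacheTaskLocation (host and executor ID only)
  final case class ExecutorLocation(host: String, executorId: String)

  // Descending by host; ties broken by executor ID, also descending
  def sortDescending(peers: Seq[ExecutorLocation]): Seq[ExecutorLocation] =
    peers.sortWith { (a, b) =>
      if (a.host == b.host) a.executorId > b.executorId else a.host > b.host
    }
}
```

Because the comparison is lexicographic, an executor ID of "10" sorts after "9" in descending order, i.e. it is treated as smaller.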

getOrCreateInitialPartitionOffsets Internal Method

getOrCreateInitialPartitionOffsets(): PartitionOffsetMap


getOrCreateInitialPartitionOffsets is used exclusively for the initialPartitionOffsets internal registry.

getStartOffset Method

getStartOffset: Offset
getStartOffset is part of the MicroBatchReader Contract to get the start (beginning) offsets.


getEndOffset Method

getEndOffset: Offset
getEndOffset is part of the MicroBatchReader Contract to get the end offsets.


deserializeOffset Method

deserializeOffset(json: String): Offset
deserializeOffset is part of the MicroBatchReader Contract to deserialize an offset (from JSON format).


Internal Properties

Name Description

endPartitionOffsets

Ending offsets for the assigned partitions (Map[TopicPartition, Long])

Used when…​FIXME


initialPartitionOffsets: Map[TopicPartition, Long]


Used exclusively when KafkaMicroBatchReader is requested to planInputPartitions (to calculate offset ranges)


startPartitionOffsets

Starting offsets for the assigned partitions (Map[TopicPartition, Long])

Used when…​FIXME
