KafkaMicroBatchReader

KafkaMicroBatchReader is created exclusively when KafkaSourceProvider is requested to create a MicroBatchReader.

KafkaMicroBatchReader uses the DataSourceOptions to access the kafkaConsumer.pollTimeoutMs option (default: spark.network.timeout or 120s).

KafkaMicroBatchReader uses the DataSourceOptions to access the maxOffsetsPerTrigger option (undefined by default).
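The option lookups above can be illustrated with a minimal sketch. It is plain Scala rather than the Spark source: a Map[String, String] stands in for DataSourceOptions, and the object name, the method names and the networkTimeoutMs parameter are all hypothetical. Key matching is exact here, whereas the real DataSourceOptions may be more lenient.

```scala
object ReaderOptionsSketch {
  // Fallback used when no network timeout is configured (120s, as stated above).
  val DefaultNetworkTimeoutMs: Long = 120L * 1000L

  // Poll timeout in milliseconds: the explicit option wins,
  // then the configured network timeout, then the 120s fallback.
  def pollTimeoutMs(options: Map[String, String], networkTimeoutMs: Option[Long]): Long =
    options.get("kafkaConsumer.pollTimeoutMs").map(_.trim.toLong)
      .orElse(networkTimeoutMs)
      .getOrElse(DefaultNetworkTimeoutMs)

  // maxOffsetsPerTrigger has no default, so a missing option is modeled as None.
  def maxOffsetsPerTrigger(options: Map[String, String]): Option[Long] =
    options.get("maxOffsetsPerTrigger").map(_.trim.toLong)
}
```

Modeling maxOffsetsPerTrigger as an Option keeps "not set" distinct from any numeric value.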

KafkaMicroBatchReader uses the Kafka properties for executors to create KafkaMicroBatchInputPartitions when requested to planInputPartitions.

Table 1. KafkaMicroBatchReader’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

endPartitionOffsets

Ending offsets for the assigned partitions (Map[TopicPartition, Long])

Used when…​FIXME

rangeCalculator

Used exclusively when KafkaMicroBatchReader is requested to planInputPartitions (to calculate offset ranges)

startPartitionOffsets

Starting offsets for the assigned partitions (Map[TopicPartition, Long])

Used when…​FIXME

Tip

Enable WARN, INFO or DEBUG logging levels for org.apache.spark.sql.kafka010.KafkaMicroBatchReader to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaMicroBatchReader=DEBUG

Refer to Logging.

Creating KafkaMicroBatchReader Instance

KafkaMicroBatchReader takes the following to be created:

KafkaMicroBatchReader initializes the internal registries and counters.

readSchema Method

readSchema(): StructType
Note
readSchema is part of the DataSourceReader contract to…​FIXME.

readSchema simply returns the predefined fixed schema.

Stopping Streaming Reader — stop Method

stop(): Unit
Note
stop is part of the BaseStreamingSource Contract to stop a streaming reader.

stop simply requests the KafkaOffsetReader to close.

planInputPartitions Method

planInputPartitions(): java.util.List[InputPartition[InternalRow]]
Note
planInputPartitions is part of the DataSourceReader contract to…​FIXME.

planInputPartitions first finds the new partitions (TopicPartitions that are in the endPartitionOffsets but not in the startPartitionOffsets) and requests the KafkaOffsetReader to fetch their earliest offsets.
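Finding the new partitions amounts to a set difference over the keys of the two offset maps. The following is a minimal sketch in plain Scala, not the Spark source: the TopicPartition case class is a hypothetical stand-in for the Kafka class of the same name, and the object and method names are made up for illustration.

```scala
object NewPartitionsSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // TopicPartitions that appear in the end offsets but not in the start offsets.
  def newPartitions(
      startPartitionOffsets: Map[TopicPartition, Long],
      endPartitionOffsets: Map[TopicPartition, Long]): Set[TopicPartition] =
    endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
}
```

In the reader itself, the earliest offsets of these partitions would then be fetched through the KafkaOffsetReader, which this sketch leaves out.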

planInputPartitions prints out the following INFO message to the logs:

Partitions added: [newPartitionInitialOffsets]

planInputPartitions then prints out the following DEBUG message to the logs:

TopicPartitions: [comma-separated list of TopicPartitions]

planInputPartitions requests the KafkaOffsetRangeCalculator for offset ranges (given the startPartitionOffsets and the newly-calculated newPartitionInitialOffsets as the fromOffsets, the endPartitionOffsets as the untilOffsets, and the available executors (sorted in descending order)).
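How the fromOffsets and untilOffsets combine into offset ranges can be sketched as below. This is a simplified illustration in plain Scala and not the KafkaOffsetRangeCalculator itself: it assumes one range per TopicPartition, skips empty ranges, and ignores executor placement entirely. The case classes and names are hypothetical.

```scala
object OffsetRangesSketch {
  // Hypothetical stand-ins for the Kafka and Spark types.
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long)

  def ranges(
      startPartitionOffsets: Map[TopicPartition, Long],
      newPartitionInitialOffsets: Map[TopicPartition, Long],
      endPartitionOffsets: Map[TopicPartition, Long]): Seq[OffsetRange] = {
    // fromOffsets: the start offsets plus the earliest offsets of the new partitions.
    val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
    endPartitionOffsets.toSeq
      .sortBy { case (tp, _) => (tp.topic, tp.partition) } // deterministic order
      .collect {
        // Keep only partitions with a known start and at least one record to read.
        case (tp, until) if fromOffsets.get(tp).exists(_ < until) =>
          OffsetRange(tp, fromOffsets(tp), until)
      }
  }
}
```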

In the end, planInputPartitions creates a KafkaMicroBatchInputPartition for every offset range (with the Kafka properties for executors, the pollTimeoutMs, the failOnDataLoss flag and whether to reuse a Kafka consumer among Spark tasks).

Note
A KafkaMicroBatchInputPartition uses a shared Kafka consumer only when all the offset ranges have distinct TopicPartitions, so that concurrent tasks (of a stage in a Spark job) do not interfere with one another by reading the same TopicPartitions.
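The distinctness condition in the note above can be expressed as a simple check. The sketch below is plain Scala with hypothetical types and names, and only illustrates the condition; it is not a copy of the reader's code.

```scala
object ConsumerReuseSketch {
  // Hypothetical stand-ins for the Kafka and Spark types.
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long)

  // A consumer may be shared only if no TopicPartition occurs in more than one range.
  def canReuseConsumer(offsetRanges: Seq[OffsetRange]): Boolean =
    offsetRanges.map(_.tp).distinct.size == offsetRanges.size
}
```

Two ranges over the same TopicPartition (for example, one partition split into two ranges) make the check fail.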

planInputPartitions reports data loss when…​FIXME

Available Executors in Spark Cluster (Sorted By Host and Executor ID in Descending Order) — getSortedExecutorList Internal Method

getSortedExecutorList(): Array[String]

getSortedExecutorList requests the BlockManager to request the BlockManagerMaster to get the peers (the other nodes in a Spark cluster), creates an ExecutorCacheTaskLocation for every pair of host and executor ID, and in the end sorts them in descending order.

Note
getSortedExecutorList is used exclusively when KafkaMicroBatchReader is requested to planInputPartitions (and calculates offset ranges).
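The descending sort by host and executor ID could look roughly like the following sketch. It is plain Scala: ExecutorLocation is a hypothetical stand-in for ExecutorCacheTaskLocation, the peers are passed in rather than fetched from the BlockManagerMaster, and the assumption that both fields compare as strings is mine.

```scala
object SortedExecutorsSketch {
  // Hypothetical stand-in for ExecutorCacheTaskLocation (host and executor ID).
  final case class ExecutorLocation(host: String, executorId: String)

  // Descending by host, with ties broken by executor ID (also descending).
  def sortDescending(peers: Seq[ExecutorLocation]): Seq[ExecutorLocation] =
    peers.sortWith { (a, b) =>
      if (a.host == b.host) a.executorId > b.executorId else a.host > b.host
    }
}
```

Because the IDs are compared as strings in this sketch, "10" sorts before "2" in ascending terms. A stable, deterministic order is what matters here, not numeric order.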
