EpochCoordinator RPC Endpoint — Coordinating Epochs and Offsets Across Partition Tasks

EpochCoordinator is a ThreadSafeRpcEndpoint that coordinates epochs and tracks partition offsets by handling messages (in fire-and-forget one-way and request-response two-way modes) from…​FIXME

EpochCoordinator is created (using create factory method) when ContinuousExecution is requested to run a streaming query in continuous mode.

Table 1. EpochCoordinator RPC Endpoint’s Messages
Message Description

CommitPartitionEpoch

  • Partition ID

  • Epoch

  • DataSource API V2’s WriterCommitMessage

Sent out (in one-way asynchronous mode) exclusively when ContinuousWriteRDD is requested to compute a partition (after all rows have been written to the streaming sink)

GetCurrentEpoch

Sent out (in request-response synchronous mode) exclusively when EpochMarkerGenerator thread is requested to run

IncrementAndGetEpoch

Sent out (in request-response synchronous mode) exclusively when ContinuousExecution is requested to run a streaming query in continuous mode (and start a separate epoch update thread)

ReportPartitionOffset

Sent out (in one-way asynchronous mode) exclusively when ContinuousQueuedDataReader is requested for the next row to be read in the current epoch (and the epoch is done)

SetReaderPartitions

  • Number of partitions

Sent out (in request-response synchronous mode) exclusively when DataSourceV2ScanExec leaf physical operator is requested for the input RDDs (for a ContinuousReader and is about to create a ContinuousDataSourceRDD)

The number of partitions is exactly the number of InputPartitions from the ContinuousReader.

SetWriterPartitions

  • Number of partitions

Sent out (in request-response synchronous mode) exclusively when WriteToContinuousDataSourceExec unary physical operator is requested to execute and generate a recipe for a distributed computation (as an RDD[InternalRow]). The operator then requests a ContinuousWriteRDD to collect, an action that never finishes, and that is the trick that keeps a query running in continuous mode.

StopContinuousExecutionWrites

Sent out (in request-response synchronous mode) exclusively when ContinuousExecution is requested to run a streaming query in continuous mode (and it finishes successfully or not)
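
The messages in Table 1 can be modeled as a small Scala algebraic data type. What follows is a minimal, self-contained sketch and not Spark's own definitions: the trait name EpochCoordinatorMessage, the stand-in WriterCommitMessage and PartitionOffset types, all field names, the fields of ReportPartitionOffset (which the table does not list), and the isOneWay helper are assumptions made for illustration.

```scala
object EpochMessagesSketch {
  // Stand-ins (assumptions) for DataSource API V2 types, so the sketch compiles without Spark.
  final case class WriterCommitMessage(description: String)
  final case class PartitionOffset(value: Long)

  // Hypothetical common parent of all messages.
  sealed trait EpochCoordinatorMessage extends Serializable

  // Messages sent in one-way (fire-and-forget) mode.
  final case class CommitPartitionEpoch(
      partitionId: Int,
      epoch: Long,
      message: WriterCommitMessage) extends EpochCoordinatorMessage

  // Fields assumed by analogy with CommitPartitionEpoch; Table 1 does not list them.
  final case class ReportPartitionOffset(
      partitionId: Int,
      epoch: Long,
      offset: PartitionOffset) extends EpochCoordinatorMessage

  // Messages sent in request-response (two-way) mode.
  case object GetCurrentEpoch extends EpochCoordinatorMessage
  case object IncrementAndGetEpoch extends EpochCoordinatorMessage
  final case class SetReaderPartitions(numPartitions: Int) extends EpochCoordinatorMessage
  final case class SetWriterPartitions(numPartitions: Int) extends EpochCoordinatorMessage
  case object StopContinuousExecutionWrites extends EpochCoordinatorMessage

  // Classifies a message by the delivery mode that Table 1 gives for it.
  def isOneWay(message: EpochCoordinatorMessage): Boolean = message match {
    case _: CommitPartitionEpoch | _: ReportPartitionOffset => true
    case _ => false
  }
}
```

Sealing the parent trait lets the compiler check pattern matches over the messages for exhaustiveness, which is convenient when a handler has to cover every message.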

Tip

Enable ALL logging level for org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef* logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef*=ALL

Refer to Logging.

Receiving Messages (Fire-And-Forget One-Way Mode) — receive Method

receive: PartialFunction[Any, Unit]
Note
receive is part of the RpcEndpoint Contract in Apache Spark to receive messages in fire-and-forget one-way mode.

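receive handles the following messages (the ones that Table 1 lists as sent in one-way asynchronous mode):

  • CommitPartitionEpoch

  • ReportPartitionOffset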

With the queryWritesStopped turned on, receive simply swallows messages and does nothing.
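
The swallowing behavior can be illustrated with a guard placed first in a partial function. This is a sketch under stated assumptions, not the actual implementation: the Coordinator class, the simplified message payloads, and the accepted buffer are hypothetical and exist only to make the effect of the queryWritesStopped flag observable.

```scala
import scala.collection.mutable.ListBuffer

object ReceiveSketch {
  // Simplified stand-ins for the one-way messages (payloads reduced to plain values).
  final case class CommitPartitionEpoch(partitionId: Int, epoch: Long, message: String)
  final case class ReportPartitionOffset(partitionId: Int, epoch: Long, offset: Long)

  final class Coordinator {
    // Mirrors the queryWritesStopped internal flag (default: false).
    var queryWritesStopped: Boolean = false

    // Hypothetical record of accepted messages, only to make the behavior visible.
    val accepted: ListBuffer[Any] = ListBuffer.empty[Any]

    val receive: PartialFunction[Any, Unit] = {
      // The guard comes first, so every message is swallowed once writes are stopped.
      case _ if queryWritesStopped => ()
      case m: CommitPartitionEpoch => accepted += m; ()
      case m: ReportPartitionOffset => accepted += m; ()
    }
  }
}
```

Because the guarded wildcard case is listed first, no later case is evaluated once the flag is on, so messages from tasks that are still running are dropped without an error.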

Receiving Messages (Request-Response Two-Way Mode) — receiveAndReply Method

receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]
Note
receiveAndReply is part of the RpcEndpoint Contract in Apache Spark to receive and reply to messages in request-response two-way mode.

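receiveAndReply handles the following messages (the ones that Table 1 lists as sent in request-response synchronous mode):

  • GetCurrentEpoch

  • IncrementAndGetEpoch

  • SetReaderPartitions

  • SetWriterPartitions

  • StopContinuousExecutionWrites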
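
A request-response handler has the same partial-function shape, with the difference that every case answers through the call context. The sketch below assumes that each message does what its name suggests; ReplyContext is a hypothetical stand-in for Spark's RpcCallContext reduced to a single reply method, and the Coordinator class and its fields (other than queryWritesStopped) are illustrative names.

```scala
object ReceiveAndReplySketch {
  // Simplified stand-ins for the two-way messages.
  case object GetCurrentEpoch
  case object IncrementAndGetEpoch
  final case class SetReaderPartitions(numPartitions: Int)
  final case class SetWriterPartitions(numPartitions: Int)
  case object StopContinuousExecutionWrites

  // Hypothetical stand-in for RpcCallContext, reduced to its reply method.
  trait ReplyContext { def reply(response: Any): Unit }

  final class Coordinator(startEpoch: Long) {
    private var currentEpoch: Long = startEpoch
    var numReaderPartitions: Int = 0
    var numWriterPartitions: Int = 0
    var queryWritesStopped: Boolean = false

    def receiveAndReply(context: ReplyContext): PartialFunction[Any, Unit] = {
      case GetCurrentEpoch =>
        context.reply(currentEpoch)
      case IncrementAndGetEpoch =>
        currentEpoch += 1
        context.reply(currentEpoch)
      case SetReaderPartitions(n) =>
        numReaderPartitions = n
        context.reply(())
      case SetWriterPartitions(n) =>
        numWriterPartitions = n
        context.reply(())
      case StopContinuousExecutionWrites =>
        // Turns the flag on so that one-way messages are dropped from now on.
        queryWritesStopped = true
        context.reply(())
    }
  }
}
```

Replying with the unit value for the setter-like messages gives a synchronous caller a way to wait until the coordinator has applied the change.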

resolveCommitsAtEpoch Internal Method

resolveCommitsAtEpoch(epoch: Long): Unit

resolveCommitsAtEpoch…​FIXME

Note
resolveCommitsAtEpoch is used exclusively when EpochCoordinator is requested to handle CommitPartitionEpoch and ReportPartitionOffset messages.

commitEpoch Internal Method

commitEpoch(
  epoch: Long,
  messages: Iterable[WriterCommitMessage]): Unit

commitEpoch…​FIXME

Note
commitEpoch is used exclusively when EpochCoordinator is requested to resolveCommitsAtEpoch.

Creating EpochCoordinator Instance

EpochCoordinator takes the following to be created:

EpochCoordinator initializes the internal properties.

Registering EpochCoordinator RPC Endpoint — create Factory Method

create(
  writer: StreamWriter,
  reader: ContinuousReader,
  query: ContinuousExecution,
  epochCoordinatorId: String,
  startEpoch: Long,
  session: SparkSession,
  env: SparkEnv): RpcEndpointRef

create simply creates a new EpochCoordinator and requests the RpcEnv to register an RPC endpoint as EpochCoordinator-[id] (where id is the given epochCoordinatorId).

create prints out the following INFO message to the logs:

Registered EpochCoordinator endpoint
Note
create is used exclusively when ContinuousExecution is requested to run a streaming query in continuous mode.
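
The registration flow of create can be sketched without Spark on the classpath. Everything below is a toy model: ToyRpcEnv, Endpoint, and the string used as an endpoint reference are hypothetical stand-ins for RpcEnv, EpochCoordinator, and RpcEndpointRef, the parameter list is reduced to the values the sketch needs, and println stands in for the INFO log message.

```scala
import scala.collection.mutable

object CreateSketch {
  // Hypothetical stand-in for the EpochCoordinator endpoint.
  final class Endpoint(val startEpoch: Long)

  // Hypothetical stand-in for RpcEnv: a registry of endpoints by name.
  final class ToyRpcEnv {
    private val endpoints = mutable.Map.empty[String, Endpoint]

    // Registers the endpoint and returns its name as a stand-in for an endpoint reference.
    def setupEndpoint(name: String, endpoint: Endpoint): String = {
      require(!endpoints.contains(name), s"Endpoint $name is already registered")
      endpoints(name) = endpoint
      name
    }

    def isRegistered(name: String): Boolean = endpoints.contains(name)
  }

  // Builds the endpoint name in the EpochCoordinator-[id] format.
  def endpointName(epochCoordinatorId: String): String =
    s"EpochCoordinator-$epochCoordinatorId"

  def create(epochCoordinatorId: String, startEpoch: Long, env: ToyRpcEnv): String = {
    val coordinator = new Endpoint(startEpoch)
    val ref = env.setupEndpoint(endpointName(epochCoordinatorId), coordinator)
    println("Registered EpochCoordinator endpoint") // stand-in for the INFO message
    ref
  }
}
```

Deriving the endpoint name from epochCoordinatorId means that any component that knows the identifier can compute the same name and look the endpoint up.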

Internal Properties

Name Description

queryWritesStopped

Flag that indicates whether to drop messages (true) or not (false) when requested to handle one asynchronously (in fire-and-forget one-way mode)

Default: false

Turned on (true) when requested to handle a synchronous StopContinuousExecutionWrites message
