WriteToContinuousDataSourceExec Unary Physical Operator

WriteToContinuousDataSourceExec is a unary physical operator that creates a ContinuousWriteRDD for continuous write.


A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator.
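The "single child" relationship can be pictured with a small, self-contained sketch. The types below (PhysicalOperator, UnaryOperator, Scan, Write) are hypothetical stand-ins made up for illustration and are not Spark's SparkPlan or UnaryExecNode classes.

```scala
// Toy model only: these are NOT Spark's SparkPlan / UnaryExecNode classes.
trait PhysicalOperator {
  def children: Seq[PhysicalOperator]
}

// A unary operator has exactly one child, so `children` is derived from `child`.
trait UnaryOperator extends PhysicalOperator {
  def child: PhysicalOperator
  final override def children: Seq[PhysicalOperator] = Seq(child)
}

// A leaf operator (e.g. a scan) has no children.
case class Scan(name: String) extends PhysicalOperator {
  override def children: Seq[PhysicalOperator] = Nil
}

// A hypothetical write operator that wraps a single child.
case class Write(child: PhysicalOperator) extends UnaryOperator
```

With these definitions, Write(Scan("source")).children is a one-element sequence that holds the scan.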

Read up on UnaryExecNode (and physical operators in general) in The Internals of Spark SQL book.

WriteToContinuousDataSourceExec is created exclusively when the DataSourceV2Strategy execution planning strategy is requested to plan a WriteToContinuousDataSource unary logical operator.
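The planning step can be sketched as a pattern match from logical to physical operators. Everything below is a toy model with hypothetical Toy-prefixed names (and a plain String in place of the writer); it only mirrors the shape of the mapping and is not the actual DataSourceV2Strategy code.

```scala
// Toy logical operators (hypothetical; the writer is modelled as a plain name).
sealed trait ToyLogicalPlan
case class ToyRelation(name: String) extends ToyLogicalPlan
case class ToyWriteToContinuousDataSource(writer: String, query: ToyLogicalPlan)
  extends ToyLogicalPlan

// Toy physical operators (hypothetical).
sealed trait ToyPhysicalPlan
case class ToyScanExec(name: String) extends ToyPhysicalPlan
case class ToyWriteToContinuousDataSourceExec(writer: String, child: ToyPhysicalPlan)
  extends ToyPhysicalPlan

object ToyDataSourceV2Strategy {
  // Maps every logical operator to a physical one. The write operator keeps
  // its writer and gets the planned query as its single child.
  def plan(logical: ToyLogicalPlan): ToyPhysicalPlan = logical match {
    case ToyRelation(name) => ToyScanExec(name)
    case ToyWriteToContinuousDataSource(writer, query) =>
      ToyWriteToContinuousDataSourceExec(writer, plan(query))
  }
}
```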

WriteToContinuousDataSourceExec takes the following to be created: a StreamWriter and a child physical operator (the two inputs that doExecute works with).

WriteToContinuousDataSourceExec uses an empty output schema (that is, the operator produces no output).


Enable ALL logging level for org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec to see what happens inside.

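Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec=ALL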

Refer to Logging.

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]

doExecute is part of the SparkPlan contract to generate the runtime representation of a physical operator as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

doExecute requests the StreamWriter to create a DataWriterFactory.

doExecute then requests the child physical operator to execute (which gives an RDD[InternalRow]) and uses the RDD[InternalRow] and the DataWriterFactory to create a ContinuousWriteRDD.

doExecute prints out the following INFO message to the logs:

Start processing data source writer: [writer]. The input RDD has [partitions] partitions.
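The first steps of doExecute (factory, write RDD, log message) can be sketched without Spark on the classpath. All Toy-prefixed types and the DoExecuteSketch object are assumptions made up for this illustration; only the message text comes from the description above.

```scala
// Toy stand-ins; the real types are Spark's StreamWriter, DataWriterFactory
// and ContinuousWriteRDD.
trait ToyWriterFactory { def write(partitionId: Int, row: String): String }
trait ToyStreamWriter { def createWriterFactory(): ToyWriterFactory }

// Hypothetical model of an RDD: a vector of partitions, each a vector of rows.
final case class ToyRDD(partitions: Vector[Vector[String]])

// Pairs the child's output with the factory, as ContinuousWriteRDD is
// described to do with the RDD[InternalRow] and the DataWriterFactory.
final case class ToyWriteRDD(prev: ToyRDD, factory: ToyWriterFactory) {
  def numPartitions: Int = prev.partitions.length
}

object DoExecuteSketch {
  // Returns the write RDD together with the INFO message to log.
  def plan(writer: ToyStreamWriter, childOutput: ToyRDD): (ToyWriteRDD, String) = {
    val factory = writer.createWriterFactory() // step 1: ask the writer for a factory
    val rdd = ToyWriteRDD(childOutput, factory) // step 2: wrap the child's output
    val message = s"Start processing data source writer: $writer. " +
      s"The input RDD has ${rdd.numPartitions} partitions."
    (rdd, message)
  }
}
```

The [writer] placeholder in the message is the writer's string representation, and [partitions] is the number of partitions of the write RDD.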

doExecute requests the EpochCoordinatorRef helper for a remote reference to the EpochCoordinator RPC endpoint (using the __epoch_coordinator_id local property).

The EpochCoordinator RPC endpoint runs on the driver as the single point to coordinate epochs across partition tasks.

doExecute requests the EpochCoordinator RPC endpoint reference to send out a SetWriterPartitions message synchronously.
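The lookup-then-notify exchange can be modelled in a few lines. This is a sketch under loud assumptions: ToyEpochCoordinator, ToyEpochCoordinatorRef and ToySetWriterPartitions are hypothetical in-process stand-ins, and a plain method call replaces the RPC round trip of the real endpoint.

```scala
import scala.collection.mutable

// Toy message; it stands in for the SetWriterPartitions message named above.
sealed trait ToyEpochMessage
final case class ToySetWriterPartitions(numPartitions: Int) extends ToyEpochMessage

// Toy coordinator; the real EpochCoordinator is an RPC endpoint on the driver.
final class ToyEpochCoordinator {
  private var writerPartitions: Option[Int] = None

  // Synchronous "ask": the caller gets control back only after the message
  // has been handled.
  def askSync(message: ToyEpochMessage): Unit = synchronized {
    message match {
      case ToySetWriterPartitions(n) => writerPartitions = Some(n)
    }
  }

  def numWriterPartitions: Option[Int] = synchronized(writerPartitions)
}

// Hypothetical registry keyed by coordinator id, standing in for the lookup
// that uses the __epoch_coordinator_id local property.
object ToyEpochCoordinatorRef {
  private val endpoints = mutable.Map.empty[String, ToyEpochCoordinator]

  def create(id: String): ToyEpochCoordinator = synchronized {
    endpoints.getOrElseUpdate(id, new ToyEpochCoordinator)
  }

  def get(id: String): ToyEpochCoordinator = synchronized(endpoints(id))
}
```

Because the call is synchronous, the coordinator already knows the number of writer partitions by the time the write job starts.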

In the end, doExecute requests the ContinuousWriteRDD to collect (which simply runs a Spark job on all partitions in an RDD and returns the results in an array).

Requesting the ContinuousWriteRDD to collect is how a Spark job is run, which in turn runs tasks (one per partition) as described by the ContinuousWriteRDD.compute method. Since collect runs a Spark job (with tasks on executors), it is at the discretion of the tasks themselves when to finish, so they can run indefinitely if they choose to. What a clever trick!
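The trick can be reproduced in miniature with plain Scala futures. The sketch below is an analogy only: CollectTrickSketch and its collect method are hypothetical, threads in a local pool play the role of tasks on executors, and a shared flag plays the role of whatever eventually ends a continuous query.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object CollectTrickSketch {
  // Toy "job": runs one task per partition and returns only after every
  // task has returned, much like collect blocks until the job is done.
  def collect[T](numPartitions: Int)(task: Int => T)(implicit ec: ExecutionContext): Seq[T] = {
    val tasks = (0 until numPartitions).map(partition => Future(task(partition)))
    Await.result(Future.sequence(tasks), Duration.Inf)
  }

  // Runs two long-lived tasks and returns the number of loop iterations of each.
  def demo(): Seq[Int] = {
    val pool = Executors.newFixedThreadPool(2) // one thread per toy partition
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val stop = new AtomicBoolean(false)
    try {
      collect(numPartitions = 2) { partition =>
        var epochs = 0
        // Each task keeps looping until the shared flag is set; nothing in
        // collect itself limits how long a task may run.
        while (!stop.get()) {
          epochs += 1
          if (partition == 0 && epochs == 3) stop.set(true)
        }
        epochs
      }
    } finally pool.shutdown()
  }
}
```

The caller of collect stays blocked for as long as the tasks keep looping, which is the behaviour a continuous query relies on.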
