DataWritingSparkTask Partition Processing Function

DataWritingSparkTask is the partition processing function that WriteToDataSourceV2Exec physical operator uses to schedule a Spark job when requested to execute.

Note
The DataWritingSparkTask partition processing function is executed on executors.

Tip
Enable the INFO or ERROR logging level for the org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask=INFO

Refer to Logging.

Running Partition Processing Function — run Method

run(
  writeTask: DataWriterFactory[InternalRow],
  context: TaskContext,
  iter: Iterator[InternalRow],
  useCommitCoordinator: Boolean): WriterCommitMessage

run requests the given TaskContext for the IDs of the stage, the stage attempt, the partition, and the task attempt, and for the attempt number (how many times the task has been attempted, 0 for the first attempt).

run also requests the given TaskContext for the epoch ID (the streaming.sql.batchId local property) and defaults to 0 when the property is not set.
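The lookup with a default can be sketched as follows. This is a minimal sketch under stated assumptions, not Spark's own code: a java.util.Properties object stands in for the task's local properties, and EpochIdSketch, BatchIdKey and epochId are hypothetical names used for illustration only.

```scala
import java.util.Properties

object EpochIdSketch {
  // Property key as named in the text above.
  val BatchIdKey = "streaming.sql.batchId"

  // Properties.getProperty returns null for a missing key,
  // so Option(...) turns an unset property into None and the default "0" applies.
  def epochId(localProperties: Properties): Long =
    Option(localProperties.getProperty(BatchIdKey)).getOrElse("0").toLong
}
```

With an empty Properties object the sketch gives 0. Once the property is set to "7", it gives 7.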

run requests the given DataWriterFactory to create a DataWriter (with the partition, task and epoch IDs).

For every row in the partition (in the given Iterator[InternalRow]), run requests the DataWriter to write the row.

Once all the rows have been written successfully, run requests the DataWriter to commit the write task (with or without requesting the OutputCommitCoordinator for authorization) that gives the final WriterCommitMessage.

In the end, run prints out the following INFO message to the logs:

Committed partition [partId] (task [taskId], attempt [attemptId]stage [stageId].[stageAttempt])

In case of any errors, run prints out the following ERROR message to the logs:

Aborting commit for partition [partId] (task [taskId], attempt [attemptId]stage [stageId].[stageAttempt])

run then requests the DataWriter to abort the write task.

Once the write task has been aborted, run prints out the following ERROR message to the logs:

Aborted commit for partition [partId] (task [taskId], attempt [attemptId]stage [stageId].[stageAttempt])

Note
run is used exclusively when WriteToDataSourceV2Exec physical operator is requested to execute (and schedules a Spark job).
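The write, commit and abort steps described in this section can be sketched roughly as below. The sketch does not use the Spark API: SketchWriter is a hypothetical, simplified stand-in for DataWriter, runSketch is a hypothetical name, and the plain try/catch that rethrows the error after the abort is an assumption about the error handling, not a copy of it. Logging is left out.

```scala
import scala.util.control.NonFatal

object RunFlowSketch {
  // Hypothetical stand-in for a DataWriter (not the Spark interface).
  // T is the record type, M the commit message type.
  trait SketchWriter[T, M] {
    def write(record: T): Unit
    def commit(): M
    def abort(): Unit
  }

  // Writes every row, commits on success, and aborts then rethrows on failure.
  def runSketch[T, M](writer: SketchWriter[T, M], rows: Iterator[T]): M =
    try {
      while (rows.hasNext) writer.write(rows.next())
      writer.commit() // the commit message is the result of the whole task
    } catch {
      case NonFatal(e) =>
        writer.abort() // undo partial output of this write task
        throw e        // assumption: the failure is propagated to the caller
    }
}
```

A writer whose write fails on some row never reaches commit in this sketch; abort is called once and the original exception surfaces to the caller.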

useCommitCoordinator Flag Enabled

With the given useCommitCoordinator flag enabled (the default for most DataSourceWriters), run requests the SparkEnv for the OutputCommitCoordinator and asks it whether the write task output can be committed (canCommit).

If authorized, run prints out the following INFO message to the logs:

Commit authorized for partition [partId] (task [taskId], attempt [attemptId]stage [stageId].[stageAttempt])

In the end, run requests the DataWriter to commit the write task.

If not authorized, run prints out the following INFO message to the logs and throws a CommitDeniedException:

Commit denied for partition [partId] (task [taskId], attempt [attemptId]stage [stageId].[stageAttempt])

useCommitCoordinator Flag Disabled

With the given useCommitCoordinator flag disabled, run prints out the following INFO message to the logs:

Writer for partition [partId] is committing.

In the end, run requests the DataWriter to commit the write task.
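The two branches of the useCommitCoordinator flag can be summarized in a small sketch. It is an illustration under stated assumptions rather than the actual implementation: canCommit is a plain function standing in for the request to the OutputCommitCoordinator, commit stands in for the DataWriter commit, and CommitDecisionSketch, commitWith and CommitDeniedSketch are hypothetical names.

```scala
object CommitDecisionSketch {
  // Hypothetical stand-in for the exception thrown when a commit is denied.
  final class CommitDeniedSketch(message: String) extends RuntimeException(message)

  // canCommit: stand-in for asking the coordinator for authorization.
  // commit:    stand-in for requesting the writer to commit.
  def commitWith[M](useCommitCoordinator: Boolean, canCommit: () => Boolean)(commit: () => M): M =
    if (useCommitCoordinator) {
      if (canCommit()) commit()                          // authorized: commit
      else throw new CommitDeniedSketch("Commit denied") // denied: fail the task
    } else {
      commit()                                           // flag disabled: commit without asking
    }
}
```

With the flag disabled, canCommit is never evaluated, which mirrors the text above: the writer commits without requesting authorization.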
