SQLExecution Helper Object

SQLExecution defines the spark.sql.execution.id Spark property that is used to track multiple Spark jobs that together constitute a single structured query execution (so they can be reported as a single execution unit).

import org.apache.spark.sql.execution.SQLExecution
scala> println(SQLExecution.EXECUTION_ID_KEY)
spark.sql.execution.id

Actions of a structured query are executed using SQLExecution.withNewExecutionId static method that sets spark.sql.execution.id as Spark Core’s local property and "stitches" different Spark jobs as parts of one structured query action (that you can then see in web UI’s SQL tab).
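The stitching works through thread-local properties: withNewExecutionId sets spark.sql.execution.id before the action runs and restores the previous value afterwards, so every Spark job submitted on that thread carries the id. The save-run-restore pattern can be sketched in plain Scala (a mutable map stands in for SparkContext's per-thread local properties; the names below are illustrative, not Spark's API):

```scala
import scala.collection.mutable

// Stand-in for SparkContext's per-thread local properties (illustrative only)
val localProps = mutable.Map.empty[String, String]

val EXECUTION_ID_KEY = "spark.sql.execution.id"
var nextExecutionId = 0L

def withNewExecutionId[T](body: => T): T = {
  // Remember whatever id was set before, assign a fresh one for this action
  val oldId = localProps.get(EXECUTION_ID_KEY)
  nextExecutionId += 1
  localProps(EXECUTION_ID_KEY) = nextExecutionId.toString
  try body
  finally oldId match {
    case Some(id) => localProps(EXECUTION_ID_KEY) = id  // restore the previous id
    case None     => localProps -= EXECUTION_ID_KEY     // or clear it
  }
}

// Every "job" scheduled inside the action sees the same execution id
val seen = withNewExecutionId {
  Seq(1, 2, 3).map(_ => localProps(EXECUTION_ID_KEY))
}
```

Note how the previous value (if any) is restored in the finally block, which is why Spark jobs scheduled outside an action carry no execution id.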

Tip

Use SparkListener to listen to SparkListenerSQLExecutionStart events and know the execution ids of structured queries that have been executed in a Spark SQL application.

// The "SQLAppStatusListener" idea is borrowed from
// Spark SQL's org.apache.spark.sql.execution.ui.SQLAppStatusListener
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.ui.{SparkListenerDriverAccumUpdates, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
class SQLAppStatusListener extends SparkListener {
  @volatile private var queryExecution: QueryExecution = _
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
    case _ => // Ignore
  }
  def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
    // Find the QueryExecution for the Dataset action that triggered the event
    // This is the SQL-specific way
    queryExecution = SQLExecution.getQueryExecution(event.executionId)
  }
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // Find the QueryExecution for the Dataset action that triggered the event
    // This is the general Spark Core way using local properties
    val executionIdStr = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    // Note that the Spark job may or may not be part of a structured query
    if (executionIdStr != null) {
      queryExecution = SQLExecution.getQueryExecution(executionIdStr.toLong)
    }
  }
  def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {}
  def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {}
}

val sqlListener = new SQLAppStatusListener()
spark.sparkContext.addSparkListener(sqlListener)
Note
Jobs without spark.sql.execution.id key are not considered to belong to SQL query executions.

SQLExecution keeps track of all execution ids and their QueryExecutions in executionIdToQueryExecution internal registry.

Tip
Use SQLExecution.getQueryExecution to find the QueryExecution for an execution id.

Executing Dataset Action (with Zero or More Spark Jobs) Under New Execution Id — withNewExecutionId Method

withNewExecutionId[T](
  sparkSession: SparkSession,
  queryExecution: QueryExecution)(body: => T): T

withNewExecutionId executes the body query action under a new, auto-generated execution id so that all Spark jobs scheduled by the query action are marked as parts of the same Dataset action execution.

withNewExecutionId allows for collecting all the Spark jobs (even those executed on separate threads) under a single SQL query execution for reporting purposes, e.g. reporting them as one structured query in web UI's SQL tab.

Note
If there is another execution id already set, it is replaced for the course of the current action.

In addition, the QueryExecution variant posts SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events (to LiveListenerBus event bus) before and after executing the body action, respectively. It is used to inform SQLListener when a SQL query execution starts and ends.

Note
Nested execution ids are not supported in the QueryExecution variant.
Note

withNewExecutionId is used when:

  • Dataset is requested to withNewExecutionId and withAction

  • DataFrameWriter is requested to run a command

  • Spark Structured Streaming’s StreamExecution commits a batch to a streaming sink

  • Spark Thrift Server’s SparkSQLDriver runs a command

Finding QueryExecution for Execution ID — getQueryExecution Method

getQueryExecution(executionId: Long): QueryExecution

getQueryExecution gives the QueryExecution registered for the given executionId, or null if the id is not known.
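Internally the lookup is just a read from the executionIdToQueryExecution registry. The idea can be sketched as follows (a simplified stand-in, not Spark's actual classes):

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-in for a QueryExecution (illustrative only)
final case class QueryExecution(description: String)

// Simplified stand-in for SQLExecution's executionIdToQueryExecution registry
val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]()

def getQueryExecution(executionId: Long): QueryExecution =
  executionIdToQueryExecution.get(executionId) // null when the id is unknown

// An execution registers itself before its Spark jobs run
executionIdToQueryExecution.put(42L, QueryExecution("show tables"))
```

A ConcurrentHashMap is a natural fit here since executions registered on one thread may be looked up from listener threads.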

Executing Action (with Zero or More Spark Jobs) Tracked Under Given Execution Id — withExecutionId Method

withExecutionId[T](
  sc: SparkContext,
  executionId: String)(body: => T): T

withExecutionId executes the body action as part of executing multiple Spark jobs under executionId execution identifier.

def body = println("Hello World")
scala> SQLExecution.withExecutionId(sc = spark.sparkContext, executionId = "Custom Name")(body)
Hello World
Note

withExecutionId is used when:

  • BroadcastExchangeExec is requested for relationFuture (that is executed on a separate thread)

  • SubqueryExec is requested for relationFuture (that is executed on a separate thread)

checkSQLExecutionId Method

checkSQLExecutionId(sparkSession: SparkSession): Unit

checkSQLExecutionId makes sure that the spark.sql.execution.id local property is set on the current thread, i.e. that writing out the result of a structured query happens as part of a tracked query execution (and fails fast when Spark runs in testing mode and the property is missing).

Note
checkSQLExecutionId is used exclusively when FileFormatWriter is requested to write the result of a structured query.
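Such a guard boils down to a presence check on the thread's local properties. A minimal sketch (a plain Map stands in for SparkContext's local properties; the exception and names are illustrative):

```scala
val EXECUTION_ID_KEY = "spark.sql.execution.id"

// Sketch: fail fast when no execution id has been set for the current thread
def checkSQLExecutionId(localProps: Map[String, String]): Unit =
  if (!localProps.contains(EXECUTION_ID_KEY)) {
    throw new IllegalStateException(s"$EXECUTION_ID_KEY is not set")
  }

// Passes silently when an execution id is present
checkSQLExecutionId(Map(EXECUTION_ID_KEY -> "1"))
```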

withSQLConfPropagated Method

withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T

withSQLConfPropagated sets the SQL configuration properties of the given sparkSession as local properties of the SparkContext (so they are propagated to tasks), executes the body action, and restores the previous local property values afterwards.

Note

withSQLConfPropagated is used when:

  • SQLExecution is requested to withNewExecutionId and withExecutionId

  • TextInputJsonDataSource is requested to inferFromDataset

  • MultiLineJsonDataSource is requested to infer
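withSQLConfPropagated follows the same save-run-restore pattern as the execution-id handling: the session's SQL configuration is copied into thread-local properties for the duration of body and the previous values are restored afterwards. A plain-Scala sketch (maps stand in for the session conf and SparkContext local properties; all names are illustrative):

```scala
import scala.collection.mutable

// Stand-ins for the session's SQL conf and SparkContext local properties
val sqlConf = Map("spark.sql.caseSensitive" -> "true")
val localProps = mutable.Map.empty[String, String]

def withSQLConfPropagated[T](body: => T): T = {
  // Save the current values, then overwrite with the session's SQL conf
  val saved = sqlConf.keys.map(k => k -> localProps.get(k)).toMap
  sqlConf.foreach { case (k, v) => localProps(k) = v }
  try body
  finally saved.foreach {
    case (k, Some(v)) => localProps(k) = v  // restore the previous value
    case (k, None)    => localProps -= k    // or clear it
  }
}

// Inside body, the SQL conf is visible as a local property
val inside = withSQLConfPropagated { localProps.get("spark.sql.caseSensitive") }
```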
