SparkSession — The Entry Point to Spark SQL

SparkSession is the entry point to Spark SQL. It is one of the very first objects you create while developing a Spark SQL application.

As a Spark developer, you create a SparkSession using the SparkSession.builder method (which gives you access to the Builder API that you use to configure the session).

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .appName("My Spark Application")  // optional and will be autogenerated if not specified
  .master("local[*]")               // only for demo and testing purposes, use spark-submit instead
  .enableHiveSupport()              // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .withExtensions { extensions =>
    extensions.injectResolutionRule { session =>
      ...
    }
    extensions.injectOptimizerRule { session =>
      ...
    }
  }
  .getOrCreate

Once created, SparkSession allows for creating a DataFrame (based on an RDD or a Scala Seq), creating a Dataset, accessing the Spark SQL services (e.g. ExperimentalMethods, ExecutionListenerManager, UDFRegistration), executing a SQL query, loading a table, and, last but not least, accessing the DataFrameReader interface to load a dataset in the format of your choice (to some extent).

Note

The spark object in spark-shell (the instance of SparkSession that is auto-created) has Hive support enabled.

In order to disable the pre-configured Hive support in the spark object, use the spark.sql.catalogImplementation internal configuration property with the in-memory value (which uses InMemoryCatalog as the external catalog instead).

$ spark-shell --conf spark.sql.catalogImplementation=in-memory

You can have as many SparkSessions as you want in a single Spark application. A common use case is to keep relational entities logically separate in catalogs per SparkSession.
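
For example (a minimal sketch, assuming an existing spark session): newSession gives you another session that shares the underlying SparkContext (and SharedState) but keeps its own runtime configuration, UDFs and temporary views.

// A new session shares the SparkContext and SharedState,
// but temporary views stay local to the session that created them.
val anotherSession = spark.newSession()

spark.range(1).createOrReplaceTempView("demo_view")
assert(spark.catalog.tableExists("demo_view"))
assert(!anotherSession.catalog.tableExists("demo_view"))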

In the end, you stop a SparkSession using SparkSession.stop method.

spark.stop
Table 1. SparkSession API (Object and Instance Methods)
Method Description

active

active: SparkSession

(New in 2.4.0)

builder

builder(): Builder

Object method to create a Builder to get the current SparkSession instance or create a new one.

catalog

catalog: Catalog

Access to the current metadata catalog of relational entities, e.g. database(s), tables, functions, table columns, and temporary views.

clearActiveSession

clearActiveSession(): Unit

Object method

clearDefaultSession

clearDefaultSession(): Unit

Object method

close

close(): Unit

conf

conf: RuntimeConfig

Access to the current runtime configuration

createDataFrame

createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame

createDataset

createDataset[T : Encoder](data: RDD[T]): Dataset[T]
createDataset[T : Encoder](data: Seq[T]): Dataset[T]

emptyDataFrame

emptyDataFrame: DataFrame

emptyDataset

emptyDataset[T: Encoder]: Dataset[T]

experimental

experimental: ExperimentalMethods

Access to the current ExperimentalMethods

getActiveSession

getActiveSession: Option[SparkSession]

Object method

getDefaultSession

getDefaultSession: Option[SparkSession]

Object method

implicits

import spark.implicits._

listenerManager

listenerManager: ExecutionListenerManager

Access to the current ExecutionListenerManager

newSession

newSession(): SparkSession

Creates a new SparkSession

range

range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

Creates a Dataset[java.lang.Long]

read

read: DataFrameReader

Access to the current DataFrameReader to load data from external data sources

sessionState

sessionState: SessionState

Access to the current SessionState

Internally, sessionState clones the optional parent SessionState (if given when creating the SparkSession) or creates a new SessionState using the BaseSessionStateBuilder defined by the spark.sql.catalogImplementation configuration property.

setActiveSession

setActiveSession(session: SparkSession): Unit

Object method

setDefaultSession

setDefaultSession(session: SparkSession): Unit

Object method

sharedState

sharedState: SharedState

Access to the current SharedState

sparkContext

sparkContext: SparkContext

Access to the underlying SparkContext

sql

sql(sqlText: String): DataFrame

"Executes" a SQL query

sqlContext

sqlContext: SQLContext

Access to the underlying SQLContext

stop

stop(): Unit

Stops the associated SparkContext

table

table(tableName: String): DataFrame

Loads data from a table

time

time[T](f: => T): T

Executes a code block and prints out (to standard output) the time taken to execute it

udf

udf: UDFRegistration

Access to the current UDFRegistration

version

version: String

Returns the version of Apache Spark

Note
baseRelationToDataFrame acts as a mechanism to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy that SparkSession uses to bridge them.

Creating SparkSession Using Builder Pattern — builder Object Method

builder(): Builder

builder creates a new Builder that you use to build a fully-configured SparkSession using a fluent API.

import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
Tip
Read about Fluent interface design pattern in Wikipedia, the free encyclopedia.
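
A minimal sketch of the fluent API in action: getOrCreate returns the thread's active SparkSession if there is one and only builds a new session (from the builder's configuration) otherwise.

import org.apache.spark.sql.SparkSession

val s1 = SparkSession.builder.appName("demo").master("local[*]").getOrCreate()
// The second builder piggybacks on the session that already exists
val s2 = SparkSession.builder.getOrCreate()
assert(s1 eq s2)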

Accessing Version of Spark — version Method

version: String

version returns the version of Apache Spark in use.

Internally, version uses the spark.SPARK_VERSION value, which is the version property in the spark-version-info.properties properties file on the CLASSPATH.

Creating Empty Dataset (Given Encoder) — emptyDataset Operator

emptyDataset[T: Encoder]: Dataset[T]

emptyDataset creates an empty Dataset (assuming that future records will be of type T).

scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]

scala> strings.printSchema
root
 |-- value: string (nullable = true)

emptyDataset creates a LocalRelation logical query plan.
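
A quick way to confirm that claim (a sketch, using the strings Dataset from the example above):

// The logical plan behind an empty Dataset is a LocalRelation with no rows
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
assert(strings.queryExecution.logical.isInstanceOf[LocalRelation])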

Creating Dataset from Local Collections or RDDs — createDataset Methods

createDataset[T : Encoder](data: RDD[T]): Dataset[T]
createDataset[T : Encoder](data: Seq[T]): Dataset[T]

createDataset creates a Dataset from a local collection (i.e. Seq[T] or Java’s List[T]) or a distributed RDD[T].

scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> one.show
+-----+
|value|
+-----+
|    1|
+-----+

createDataset creates a LocalRelation (for the input data collection) or LogicalRDD (for the input RDD[T]) logical operators.

Tip

You may want to consider using the implicits object and the toDS method instead.

val spark: SparkSession = ...
import spark.implicits._

scala> val one = Seq(1).toDS
one: org.apache.spark.sql.Dataset[Int] = [value: int]

Internally, createDataset first looks up the implicit expression encoder in scope to access the AttributeReferences (of the schema).

Note
Only unresolved expression encoders are currently supported.

The expression encoder is then used to map elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
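
For comparison, a minimal sketch of the RDD[T] variant (the implicit Encoder[Int] comes from spark.implicits._):

import spark.implicits._

// createDataset also accepts a distributed RDD[T]
val ints = spark.sparkContext.parallelize(Seq(1, 2, 3))
val ds = spark.createDataset(ints)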

Creating Dataset With Single Long Column — range Operator

range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

The range family of methods creates a Dataset of Long numbers.

scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
|  0|
|  2|
+---+
Note
The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism as the number of partitions (numPartitions).

Internally, range creates a new Dataset[Long] with Range logical plan and Encoders.LONG encoder.
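
A quick way to check the default number of partitions mentioned in the note above (a sketch; it should hold in a local session):

// Without an explicit numPartitions, range falls back on SparkContext.defaultParallelism
assert(spark.range(4).rdd.getNumPartitions == spark.sparkContext.defaultParallelism)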

Creating Empty DataFrame — emptyDataFrame method

emptyDataFrame: DataFrame

emptyDataFrame creates an empty DataFrame (with no rows and columns).

It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
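
For example (REPL output abbreviated): the schema has no columns and the row count is zero.

scala> spark.emptyDataFrame.printSchema
root

scala> spark.emptyDataFrame.count
res0: Long = 0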

Creating DataFrames from Local Collections or RDDs — createDataFrame Method

createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
// private[sql]
createDataFrame(rowRDD: RDD[Row], schema: StructType, needsConversion: Boolean): DataFrame

createDataFrame creates a DataFrame using an RDD[Row] and the input schema. It is assumed that all the rows in rowRDD match the schema.

Caution
FIXME
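
A minimal sketch of the RDD[Row]-plus-schema variant (the column names and values are made up for illustration):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// The rows have to match the schema (mismatches surface only when an action runs)
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)))
val rows = spark.sparkContext.parallelize(Seq(Row(1L, "one"), Row(2L, "two")))
val people = spark.createDataFrame(rows, schema)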

Executing SQL Queries (aka SQL Mode) — sql Method

sql(sqlText: String): DataFrame

sql executes the sqlText SQL statement and creates a DataFrame.

Note

sql is imported in spark-shell so you can execute SQL statements as if sql were a part of the environment.

scala> :imports
 1) import spark.implicits._       (72 terms, 43 are implicit)
 2) import spark.sql               (1 terms)
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]

scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []

// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")

scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata|      false|
+---------+-----------+

Internally, sql requests the current ParserInterface to parse the SQL statement, which gives a LogicalPlan.

Note
sql uses SessionState to access the current ParserInterface.

sql then creates a DataFrame using the current SparkSession (itself) and the LogicalPlan.

Tip

spark-sql is the main SQL environment in Spark to work with pure SQL statements (where you do not have to use Scala to execute them).

spark-sql> show databases;
default
Time taken: 0.028 seconds, Fetched 1 row(s)

Accessing UDFRegistration — udf Attribute

udf: UDFRegistration

The udf attribute gives access to UDFRegistration, which allows registering user-defined functions for use in SQL-based queries.

val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)

import spark.implicits._  // for toDS
val strs = ('a' to 'c').map(_.toString).toDS
strs.createOrReplaceTempView("strs")

scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
|    a|    A|
|    b|    B|
|    c|    C|
+-----+-----+

Internally, it is simply an alias for SessionState.udfRegistration.

Loading Data From Table — table Method

table(tableName: String): DataFrame   (1)
// private[sql]
table(tableIdent: TableIdentifier): DataFrame

1. Parses tableName to a TableIdentifier and calls the other table variant

table creates a DataFrame that wraps the input tableName table (provided the table is available in the session catalog).

scala> spark.catalog.tableExists("t1")
res1: Boolean = true

// t1 exists in the catalog
// let's load it
val t1 = spark.table("t1")

Accessing Metastore — catalog Attribute

catalog: Catalog

catalog attribute is a (lazy) interface to the current metastore, i.e. data catalog (of relational entities like databases, tables, functions, table columns, and views).

Tip
The list* methods in Catalog (e.g. listDatabases, listTables, listFunctions, listColumns) return Datasets.
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
|              name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default|       null|  MANAGED|      false|
|              strs|    null|       null|TEMPORARY|       true|
+------------------+--------+-----------+---------+-----------+

Internally, catalog creates a CatalogImpl (that uses the current SparkSession).

Accessing DataFrameReader — read method

read: DataFrameReader

read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.

val spark: SparkSession = // create instance
val dfReader: DataFrameReader = spark.read
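
For example (a sketch; the file path and reader options are made up for illustration):

// Loading a CSV file into a DataFrame
val people = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/people.csv")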

Getting Runtime Configuration — conf Attribute

conf: RuntimeConfig

conf returns the current RuntimeConfig.

Internally, conf creates a RuntimeConfig (when requested the very first time and cached afterwards) with the SQLConf of the SessionState.
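
For example (a sketch): reading and changing a runtime property, here the number of shuffle partitions (which defaults to 200).

scala> spark.conf.get("spark.sql.shuffle.partitions")
res0: String = 200

scala> spark.conf.set("spark.sql.shuffle.partitions", "8")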

readStream method

readStream: DataStreamReader

readStream returns a new DataStreamReader.
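
For example, a streaming Dataset backed by the built-in rate source (a sketch):

// The rate source generates rows with a timestamp and a monotonically increasing value
val rates = spark.readStream.format("rate").load
assert(rates.isStreaming)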

streams Attribute

streams: StreamingQueryManager

streams attribute gives access to StreamingQueryManager (through SessionState).

val spark: SparkSession = ...
spark.streams.active.foreach(println)

experimental Attribute

experimental: ExperimentalMethods

experimental is an extension point with ExperimentalMethods, which is a per-session collection of extra strategies and Rule[LogicalPlan]s.

Note
experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules.
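
A minimal sketch of registering an extra (here: no-op) optimization rule through experimental:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A do-nothing rule, just to show where extra optimizations are registered
object MyNoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

spark.experimental.extraOptimizations = Seq(MyNoopRule)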

Creating SparkSession Instance — newSession method

newSession(): SparkSession

newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).

scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a

Stopping SparkSession — stop Method

stop(): Unit

stop stops the SparkSession, i.e. stops the underlying SparkContext.

Creating DataFrame for BaseRelation — baseRelationToDataFrame Method

baseRelationToDataFrame(
  baseRelation: BaseRelation): DataFrame

Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside LogicalRelation.

Note
LogicalRelation is a logical plan adapter for BaseRelation (so BaseRelation can be part of a logical plan).
Note

baseRelationToDataFrame is used when:

Creating SessionState Instance — instantiateSessionState Internal Method

instantiateSessionState(className: String, sparkSession: SparkSession): SessionState

instantiateSessionState loads the class given by className and uses it to create a BaseSessionStateBuilder, which in turn builds the SessionState.

instantiateSessionState may report an IllegalArgumentException while instantiating the class of a SessionState:

Error while instantiating '[className]'
Note
instantiateSessionState is used exclusively when SparkSession is requested for SessionState per spark.sql.catalogImplementation configuration property (and one is not available yet).

sessionStateClassName Internal Method

sessionStateClassName(
  conf: SparkConf): String

sessionStateClassName gives the name of the class of the SessionState per spark.sql.catalogImplementation, i.e. org.apache.spark.sql.hive.HiveSessionStateBuilder for hive and org.apache.spark.sql.internal.SessionStateBuilder for in-memory.

Note
sessionStateClassName is used exclusively when SparkSession is requested for the SessionState (and one is not available yet).

Creating DataFrame From RDD Of Internal Binary Rows and Schema — internalCreateDataFrame Internal Method

internalCreateDataFrame(
  catalystRows: RDD[InternalRow],
  schema: StructType,
  isStreaming: Boolean = false): DataFrame

internalCreateDataFrame creates a DataFrame with a LogicalRDD.

Note

internalCreateDataFrame is used when:

Creating SparkSession Instance

SparkSession takes the following when created:

clearActiveSession Object Method

clearActiveSession(): Unit

clearActiveSession…​FIXME

clearDefaultSession Object Method

clearDefaultSession(): Unit

clearDefaultSession…​FIXME

Accessing ExperimentalMethods — experimental Method

experimental: ExperimentalMethods

experimental…​FIXME

getActiveSession Object Method

getActiveSession: Option[SparkSession]

getActiveSession…​FIXME
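
A small sketch of how the object-level session registry behaves (it also exercises setActiveSession and clearActiveSession, covered in this page): the active session is kept per thread, while the default session is shared across the JVM.

import org.apache.spark.sql.SparkSession

// The active session is thread-local; getActiveSession returns an Option
SparkSession.setActiveSession(spark)
assert(SparkSession.getActiveSession.contains(spark))

SparkSession.clearActiveSession()
assert(SparkSession.getActiveSession.isEmpty)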

getDefaultSession Object Method

getDefaultSession: Option[SparkSession]

getDefaultSession…​FIXME

Accessing ExecutionListenerManager — listenerManager Method

listenerManager: ExecutionListenerManager

listenerManager…​FIXME
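
For example, registering a QueryExecutionListener that reports successful query executions (a sketch):

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Prints the action name and duration of every successfully executed query
spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName took ${durationNs / 1e6} ms")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed with $exception")
})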

setActiveSession Object Method

setActiveSession(session: SparkSession): Unit

setActiveSession…​FIXME

setDefaultSession Object Method

setDefaultSession(session: SparkSession): Unit

setDefaultSession…​FIXME

Accessing SharedState — sharedState Method

sharedState: SharedState

sharedState…​FIXME

Measuring Duration of Executing Code Block — time Method

time[T](f: => T): T

time…​FIXME
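
For example (a sketch; the reported duration is obviously illustrative):

scala> spark.time(spark.range(100000).count)
Time taken: 48 ms
res0: Long = 100000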
