SparkSession — The Entry Point to Spark SQL
SparkSession is the entry point to Spark SQL. It is one of the very first objects you create while developing a Spark SQL application.
As a Spark developer, you create a SparkSession using the SparkSession.builder method (that gives you access to the Builder API that you use to configure the session).

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .appName("My Spark Application")  // optional and will be autogenerated if not specified
  .master("local[*]")               // only for demo and testing purposes, use spark-submit instead
  .enableHiveSupport()              // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .withExtensions { extensions =>
    extensions.injectResolutionRule { session =>
      ...
    }
    extensions.injectOptimizerRule { session =>
      ...
    }
  }
  .getOrCreate

Once created, SparkSession allows for creating a DataFrame (based on an RDD or a Scala Seq), creating a Dataset, accessing the Spark SQL services (e.g. ExperimentalMethods, ExecutionListenerManager, UDFRegistration), executing a SQL query, loading a table and, last but not least, accessing the DataFrameReader interface to load a dataset of the format of your choice (to some extent).
You can enable Apache Hive support, which includes connectivity to an external Hive metastore.
Note: In order to disable the pre-configured Hive support, set the spark.sql.catalogImplementation internal configuration property to in-memory (so the in-memory catalog is used instead of Hive's).
You can have as many SparkSessions as you want in a single Spark application. The common use case is to keep relational entities separate logically in catalogs per SparkSession.
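For instance, temporary views registered in one session are not visible in another. A minimal sketch (the view name is illustrative):

val spark2 = spark.newSession()
spark.range(5).createOrReplaceTempView("nums")  // temp view registered in the first session
spark.catalog.tableExists("nums")               // true
spark2.catalog.tableExists("nums")              // false, i.e. not visible in the other session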
In the end, you stop a SparkSession using SparkSession.stop method.
spark.stop
| Method | Description |
|---|---|
| active | (New in 2.4.0) |
| builder | Object method to create a Builder to get the current SparkSession or create a new one |
| catalog | Access to the current metadata catalog of relational entities, e.g. database(s), tables, functions, table columns, and temporary views |
| clearActiveSession | Object method |
| clearDefaultSession | Object method |
| conf | Access to the current runtime configuration |
| createDataFrame | Creates a DataFrame from an RDD or a local collection |
| createDataset | Creates a Dataset from a local collection or an RDD |
| emptyDataFrame | Creates an empty DataFrame |
| emptyDataset | Creates an empty Dataset |
| experimental | Access to the current ExperimentalMethods |
| getActiveSession | Object method |
| getDefaultSession | Object method |
| listenerManager | Access to the current ExecutionListenerManager |
| newSession | Creates a new SparkSession |
| range | Creates a Dataset with a single Long column |
| read | Access to the current DataFrameReader to load data from external data sources |
| sessionState | Access to the current SessionState |
| setActiveSession | Object method |
| setDefaultSession | Object method |
| sharedState | Access to the current SharedState |
| sparkContext | Access to the underlying SparkContext |
| sql | "Executes" a SQL query |
| sqlContext | Access to the underlying SQLContext |
| stop | Stops the associated SparkContext |
| table | Loads data from a table |
| time | Executes a code block and prints out (to standard output) the time taken to execute it |
| udf | Access to the current UDFRegistration |
| version | Returns the version of Apache Spark |
Note: baseRelationToDataFrame acts as a mechanism to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy; SparkSession uses it to bridge the two.
Creating SparkSession Using Builder Pattern — builder Object Method
builder(): Builder
builder creates a new Builder that you use to build a fully-configured SparkSession using a fluent API.
import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
Tip: Read about the Fluent interface design pattern in Wikipedia, the free encyclopedia.
Accessing Version of Spark — version Method
version: String
version returns the version of Apache Spark in use.
Internally, version uses the spark.SPARK_VERSION value, which is the version property in the spark-version-info.properties file on the CLASSPATH.
Creating Empty Dataset (Given Encoder) — emptyDataset Operator
emptyDataset[T: Encoder]: Dataset[T]
emptyDataset creates an empty Dataset (assuming that future records will be of type T).
scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]
scala> strings.printSchema
root
|-- value: string (nullable = true)
emptyDataset creates a LocalRelation logical query plan.
Creating Dataset from Local Collections or RDDs — createDataset Methods
createDataset[T : Encoder](data: RDD[T]): Dataset[T]
createDataset[T : Encoder](data: Seq[T]): Dataset[T]
createDataset creates a Dataset from a local Scala collection, i.e. Seq[T], Java’s List[T], or a distributed RDD[T].
scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> one.show
+-----+
|value|
+-----+
| 1|
+-----+
createDataset creates a LocalRelation (for the input data collection) or LogicalRDD (for the input RDD[T]) logical operators.
Tip: You may want to consider the implicits object and the toDS method instead, which do the conversion for you.
Internally, createDataset first looks up the implicit expression encoder in scope to access the AttributeReferences (of the schema).
Note: Only unresolved expression encoders are currently supported.
The expression encoder is then used to map elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
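As the tip above hints, with the implicits in scope a local collection can be converted directly. A minimal sketch:

import spark.implicits._
// toDS looks up the implicit encoder (here Encoder[Int]) and builds the Dataset
val ds = Seq(1, 2, 3).toDS
ds.show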
Creating Dataset With Single Long Column — range Operator
range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
The range family of methods creates a Dataset of Long numbers.
scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
| 0|
| 2|
+---+
Note: The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism for the number of partitions (numPartitions).
Internally, range creates a new Dataset[Long] with Range logical plan and Encoders.LONG encoder.
Creating Empty DataFrame — emptyDataFrame Method
emptyDataFrame: DataFrame
emptyDataFrame creates an empty DataFrame (with no rows and columns).
It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
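A quick sanity check in the Scala REPL (output shown for illustration):

scala> spark.emptyDataFrame.printSchema
root

scala> spark.emptyDataFrame.count
res1: Long = 0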
Creating DataFrames from Local Collections or RDDs — createDataFrame Method
createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
// private[sql]
createDataFrame(rowRDD: RDD[Row], schema: StructType, needsConversion: Boolean): DataFrame
createDataFrame creates a DataFrame using RDD[Row] and the input schema. It is assumed that the rows in rowRDD all match the schema.
Caution: FIXME
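A minimal sketch of the RDD[Row]-plus-schema variant (the column names and rows are illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))
// The rows are assumed to match the schema; Spark does not verify that upfront
val rows = spark.sparkContext.parallelize(Seq(Row("Ann", 30), Row("Bob", 25)))
val people = spark.createDataFrame(rows, schema)
people.show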
Executing SQL Queries (aka SQL Mode) — sql Method
sql(sqlText: String): DataFrame
sql executes the sqlText SQL statement and creates a DataFrame.
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]
scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []
// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")
scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata| false|
+---------+-----------+
Internally, sql requests the current ParserInterface to execute a SQL query that gives a LogicalPlan.
Note: sql uses SessionState to access the current ParserInterface.
sql then creates a DataFrame using the current SparkSession (itself) and the LogicalPlan.
Tip: spark-sql is the main SQL environment in Spark to work with pure SQL statements (where you do not have to use Scala to execute them).
Accessing UDFRegistration — udf Attribute
udf: UDFRegistration
udf attribute gives access to UDFRegistration that allows registering user-defined functions for SQL-based queries.
val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)
val strs = ('a' to 'c').map(_.toString).toDS
strs.createOrReplaceTempView("strs")
scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
| a| A|
| b| B|
| c| C|
+-----+-----+
Internally, it is simply an alias for SessionState.udfRegistration.
Loading Data From Table — table Method
table(tableName: String): DataFrame (1)
// private[sql]
table(tableIdent: TableIdentifier): DataFrame
1. Parses tableName to a TableIdentifier and calls the other table
table creates a DataFrame (wrapper) from the input tableName table (but only if available in the session catalog).
scala> spark.catalog.tableExists("t1")
res1: Boolean = true
// t1 exists in the catalog
// let's load it
val t1 = spark.table("t1")
Accessing Metastore — catalog Attribute
catalog: Catalog
catalog attribute is a (lazy) interface to the current metastore, i.e. data catalog (of relational entities like databases, tables, functions, table columns, and views).
Tip: All methods in Catalog return Datasets.
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default| null| MANAGED| false|
| strs| null| null|TEMPORARY| true|
+------------------+--------+-----------+---------+-----------+
Internally, catalog creates a CatalogImpl (that uses the current SparkSession).
Accessing DataFrameReader — read Method
read: DataFrameReader
read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
val spark: SparkSession = // create instance
val dfReader: DataFrameReader = spark.read
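For example, loading a CSV file (the path and options are illustrative):

val cities = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data/cities.csv")  // hypothetical path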
Getting Runtime Configuration — conf Attribute
conf: RuntimeConfig
conf returns the current RuntimeConfig.
Internally, conf creates a RuntimeConfig (when requested the very first time and cached afterwards) with the SQLConf of the SessionState.
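A couple of typical uses (the property values are illustrative):

spark.conf.get("spark.sql.warehouse.dir")           // read a configuration property
spark.conf.set("spark.sql.shuffle.partitions", 8L)  // set a runtime-configurable property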
streams Attribute
streams: StreamingQueryManager
streams attribute gives access to StreamingQueryManager (through SessionState).
val spark: SparkSession = ...
spark.streams.active.foreach(println)
experimentalMethods Attribute
experimental: ExperimentalMethods
experimental is an extension point with ExperimentalMethods, which is a per-session collection of extra strategies and Rule[LogicalPlan]s.
Note: experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules.
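A sketch of plugging a custom (here no-op, purely illustrative) optimizer rule into this extension point:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A do-nothing rule, just to show where a Rule[LogicalPlan] would be registered
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

spark.experimental.extraOptimizations = Seq(NoopRule)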
Creating New SparkSession — newSession Method
newSession(): SparkSession
newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).
scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a
Stopping SparkSession — stop Method
stop(): Unit
stop stops the SparkSession, i.e. stops the underlying SparkContext.
Creating DataFrame for BaseRelation — baseRelationToDataFrame Method
baseRelationToDataFrame(
baseRelation: BaseRelation): DataFrame
Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside LogicalRelation.
Note: LogicalRelation is a logical plan adapter for BaseRelation (so BaseRelation can be part of a logical plan).
Creating SessionState Instance — instantiateSessionState Internal Method
instantiateSessionState(className: String, sparkSession: SparkSession): SessionState
instantiateSessionState loads the class given by className and uses it to create and build a BaseSessionStateBuilder.
instantiateSessionState may report an IllegalArgumentException while instantiating the class of a SessionState:
Error while instantiating '[className]'
Note: instantiateSessionState is used exclusively when SparkSession is requested for the SessionState per the spark.sql.catalogImplementation configuration property (and one is not available yet).
sessionStateClassName Internal Method
sessionStateClassName(
conf: SparkConf): String
sessionStateClassName gives the name of the class of the SessionState per spark.sql.catalogImplementation, i.e. org.apache.spark.sql.hive.HiveSessionStateBuilder for hive and org.apache.spark.sql.internal.SessionStateBuilder for in-memory.
Note: sessionStateClassName is used exclusively when SparkSession is requested for the SessionState (and one is not available yet).
Creating DataFrame From RDD Of Internal Binary Rows and Schema — internalCreateDataFrame Internal Method
internalCreateDataFrame(
catalystRows: RDD[InternalRow],
schema: StructType,
isStreaming: Boolean = false): DataFrame
internalCreateDataFrame creates a DataFrame with a LogicalRDD.
Creating SparkSession Instance
SparkSession takes the following when created:
- Optional SharedState
- Optional SessionState
Accessing ExperimentalMethods — experimental Method
experimental: ExperimentalMethods
experimental…FIXME
Accessing ExecutionListenerManager — listenerManager Method
listenerManager: ExecutionListenerManager
listenerManager…FIXME
setActiveSession Object Method
setActiveSession(session: SparkSession): Unit
setActiveSession…FIXME