import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .appName("My Spark Application") // optional and will be autogenerated if not specified
  .master("local[*]")              // only for demo and testing purposes, use spark-submit instead
  .enableHiveSupport()             // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .withExtensions { extensions =>
    extensions.injectResolutionRule { session =>
      ...
    }
    extensions.injectOptimizerRule { session =>
      ...
    }
  }
  .getOrCreate
SparkSession — The Entry Point to Spark SQL
SparkSession is the entry point to Spark SQL. It is one of the very first objects you create while developing a Spark SQL application.
As a Spark developer, you create a SparkSession using the SparkSession.builder method (that gives you access to the Builder API that you use to configure the session).
Once created, SparkSession allows for creating a DataFrame (based on an RDD or a Scala Seq), creating a Dataset, accessing the Spark SQL services (e.g. ExperimentalMethods, ExecutionListenerManager, UDFRegistration), executing a SQL query, loading a table and, last but not least, accessing the DataFrameReader interface to load a dataset of the format of your choice (to some extent).
You can enable Apache Hive support, including support for an external Hive metastore.
Note: In order to disable the pre-configured Hive support in the spark object, use the spark.sql.catalogImplementation internal configuration property with the in-memory value (which uses InMemoryCatalog instead).
You can have as many SparkSessions as you want in a single Spark application. The common use case is to keep relational entities separate logically in catalogs per SparkSession.
In the end, you stop a SparkSession using the SparkSession.stop method.
spark.stop
Method | Description
---|---
builder | Object method to create a Builder to get the current SparkSession or create a new one
catalog | Access to the current metadata catalog of relational entities, e.g. database(s), tables, functions, table columns, and temporary views
conf | Access to the current runtime configuration
createDataFrame | Creates a DataFrame from a local collection or an RDD
createDataset | Creates a Dataset from a local collection or an RDD
emptyDataFrame | Creates an empty DataFrame
emptyDataset | Creates an empty Dataset
experimental | Access to the current ExperimentalMethods
listenerManager | Access to the current ExecutionListenerManager
newSession | Creates a new SparkSession
range | Creates a Dataset with a single Long column
read | Access to the current DataFrameReader to load data from external data sources
sessionState | Access to the current SessionState
sharedState | Access to the current SharedState
sparkContext | Access to the underlying SparkContext
sql | "Executes" a SQL query
sqlContext | Access to the underlying SQLContext
stop | Stops the associated SparkContext
table | Loads data from a table
time | Executes a code block and prints out (to standard output) the time taken to execute it
udf | Access to the current UDFRegistration
version | Returns the version of Apache Spark
Note: baseRelationToDataFrame acts as a mechanism to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy that SparkSession uses to bridge them.
Creating SparkSession Using Builder Pattern — builder Object Method
builder(): Builder
builder creates a new Builder that you use to build a fully-configured SparkSession using a fluent API.
import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
Tip: Read about the Fluent interface design pattern in Wikipedia, the free encyclopedia.
Accessing Version of Spark — version Method
version: String
version returns the version of Apache Spark in use.
Internally, version uses the spark.SPARK_VERSION value that is the version property in the spark-version-info.properties properties file on CLASSPATH.
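For example, in spark-shell (the exact version string depends on the Spark distribution on the classpath):
scala> spark.version
res0: String = 2.4.5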
Creating Empty Dataset (Given Encoder) — emptyDataset Operator
emptyDataset[T: Encoder]: Dataset[T]
emptyDataset creates an empty Dataset (assuming that future records will be of type T).
scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]
scala> strings.printSchema
root
|-- value: string (nullable = true)
emptyDataset creates a LocalRelation logical query plan.
Creating Dataset from Local Collections or RDDs — createDataset Methods
createDataset[T : Encoder](data: RDD[T]): Dataset[T]
createDataset[T : Encoder](data: Seq[T]): Dataset[T]
createDataset creates a Dataset from a local Scala collection, i.e. Seq[T], Java’s List[T], or a distributed RDD[T].
scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> one.show
+-----+
|value|
+-----+
| 1|
+-----+
createDataset creates a LocalRelation (for the input data collection) or a LogicalRDD (for the input RDD[T]) logical operator.
Tip: You may want to consider using the implicits object and the toDS method instead.
Internally, createDataset first looks up the implicit expression encoder in scope to access the AttributeReferences (of the schema).
Note: Only unresolved expression encoders are currently supported.
The expression encoder is then used to map elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
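For the RDD[T] variant, a minimal sketch (assuming an active spark session; spark.implicits._ provides the Encoder[Int]):
import spark.implicits._ // brings the implicit Encoder[Int] into scope

val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3))
val ds = spark.createDataset(rdd) // backed by a LogicalRDD logical operator
ds.show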
Creating Dataset With Single Long Column — range Operator
range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
The range family of methods creates a Dataset of Long numbers.
scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
| 0|
| 2|
+---+
Note: The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism for the number of partitions (numPartitions).
Internally, range creates a new Dataset[Long] with a Range logical plan and the Encoders.LONG encoder.
Creating Empty DataFrame — emptyDataFrame method
emptyDataFrame: DataFrame
emptyDataFrame creates an empty DataFrame (with no rows and columns).
It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
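A quick check in spark-shell (the empty schema prints just the root node and the row count is 0):
scala> spark.emptyDataFrame.printSchema
root

scala> spark.emptyDataFrame.count
res1: Long = 0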
Creating DataFrames from Local Collections or RDDs — createDataFrame Method
createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
// private[sql]
createDataFrame(rowRDD: RDD[Row], schema: StructType, needsConversion: Boolean): DataFrame
createDataFrame creates a DataFrame using RDD[Row] and the input schema. It is assumed that the rows in rowRDD all match the schema.
Caution: FIXME
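A minimal sketch of the RDD[Row] plus StructType variant (the column names and values are made up for illustration):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// the schema the rows are expected to match
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)))
val rowRDD = spark.sparkContext.parallelize(Seq(Row(0L, "zero"), Row(1L, "one")))

val df = spark.createDataFrame(rowRDD, schema)
df.show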
Executing SQL Queries (aka SQL Mode) — sql Method
sql(sqlText: String): DataFrame
sql executes the sqlText SQL statement and creates a DataFrame.
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]
scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []
// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")
scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata| false|
+---------+-----------+
Internally, sql requests the current ParserInterface to execute a SQL query that gives a LogicalPlan.
Note: sql uses SessionState to access the current ParserInterface.
sql then creates a DataFrame using the current SparkSession (itself) and the LogicalPlan.
Tip: spark-sql is the main SQL environment in Spark to work with pure SQL statements (where you do not have to use Scala to execute them).
Accessing UDFRegistration — udf Attribute
udf: UDFRegistration
The udf attribute gives access to UDFRegistration that allows registering user-defined functions for SQL-based queries.
val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)
val strs = ('a' to 'c').map(_.toString).toDS
strs.createOrReplaceTempView("strs")
scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
| a| A|
| b| B|
| c| C|
+-----+-----+
Internally, it is simply an alias for SessionState.udfRegistration.
Loading Data From Table — table Method
table(tableName: String): DataFrame (1)
// private[sql]
table(tableIdent: TableIdentifier): DataFrame
1. Parses tableName to a TableIdentifier and calls the other table method
table creates a DataFrame (wrapper) from the input tableName table (but only if available in the session catalog).
scala> spark.catalog.tableExists("t1")
res1: Boolean = true
// t1 exists in the catalog
// let's load it
val t1 = spark.table("t1")
Accessing Metastore — catalog Attribute
catalog: Catalog
The catalog attribute is a (lazy) interface to the current metastore, i.e. data catalog (of relational entities like databases, tables, functions, table columns, and views).
Tip: All methods in Catalog return Datasets.
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default| null| MANAGED| false|
| strs| null| null|TEMPORARY| true|
+------------------+--------+-----------+---------+-----------+
Internally, catalog creates a CatalogImpl (that uses the current SparkSession).
Accessing DataFrameReader — read method
read: DataFrameReader
The read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
val spark: SparkSession = // create instance
val dfReader: DataFrameReader = spark.read
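A short sketch of loading data through the reader (the file name and options are made up for illustration):
// read a hypothetical CSV file into a DataFrame
val people = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("people.csv")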
Getting Runtime Configuration — conf Attribute
conf: RuntimeConfig
conf returns the current RuntimeConfig.
Internally, conf creates a RuntimeConfig (when requested the very first time and cached afterwards) with the SQLConf of the SessionState.
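For example, assuming an active spark session:
// read a configuration property (set in the builder example above)
val warehouseDir = spark.conf.get("spark.sql.warehouse.dir")

// set a runtime-configurable property
spark.conf.set("spark.sql.shuffle.partitions", 8L)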
streams Attribute
streams: StreamingQueryManager
The streams attribute gives access to StreamingQueryManager (through SessionState).
val spark: SparkSession = ...
spark.streams.active.foreach(println)
experimentalMethods Attribute
experimental: ExperimentalMethods
experimentalMethods is an extension point with ExperimentalMethods that is a per-session collection of extra strategies and Rule[LogicalPlan]s.
Note: experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules.
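A minimal sketch of registering an extra optimization rule through ExperimentalMethods (the rule itself is a made-up no-op, only to show where custom rules are plugged in):
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// a do-nothing rule, for illustration only
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

spark.experimental.extraOptimizations = Seq(NoopRule)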
Creating SparkSession Instance — newSession method
newSession(): SparkSession
newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).
scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a
Stopping SparkSession — stop Method
stop(): Unit
stop stops the SparkSession, i.e. stops the underlying SparkContext.
Create DataFrame for BaseRelation — baseRelationToDataFrame Method
baseRelationToDataFrame(
  baseRelation: BaseRelation): DataFrame
Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside LogicalRelation.
Note: LogicalRelation is a logical plan adapter for BaseRelation (so BaseRelation can be part of a logical plan).
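A minimal sketch of a hand-rolled BaseRelation (mixing in TableScan so it can actually produce rows; the relation and its single id column are made up for illustration):
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// a relation with a single id column backed by a local RDD
val relation = new BaseRelation with TableScan {
  override val sqlContext: SQLContext = spark.sqlContext
  override val schema: StructType = StructType(StructField("id", LongType, nullable = false) :: Nil)
  override def buildScan(): RDD[Row] = spark.sparkContext.parallelize(Seq(Row(0L), Row(1L)))
}

val df = spark.baseRelationToDataFrame(relation)
df.show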
Creating SessionState Instance — instantiateSessionState Internal Method
instantiateSessionState(className: String, sparkSession: SparkSession): SessionState
instantiateSessionState finds the className that is then used to create and build a BaseSessionStateBuilder.
instantiateSessionState may report an IllegalArgumentException while instantiating the class of a SessionState:
Error while instantiating '[className]'
Note: instantiateSessionState is used exclusively when SparkSession is requested for SessionState per the spark.sql.catalogImplementation configuration property (and one is not available yet).
sessionStateClassName Internal Method
sessionStateClassName(
  conf: SparkConf): String
sessionStateClassName gives the name of the class of the SessionState per the spark.sql.catalogImplementation configuration property, i.e. org.apache.spark.sql.hive.HiveSessionStateBuilder for hive and org.apache.spark.sql.internal.SessionStateBuilder for in-memory.
Note: sessionStateClassName is used exclusively when SparkSession is requested for the SessionState (and one is not available yet).
Creating DataFrame From RDD Of Internal Binary Rows and Schema — internalCreateDataFrame Internal Method
internalCreateDataFrame(
  catalystRows: RDD[InternalRow],
  schema: StructType,
  isStreaming: Boolean = false): DataFrame
internalCreateDataFrame creates a DataFrame with a LogicalRDD.
Creating SparkSession Instance
SparkSession takes the following when created:
- Optional SharedState
- Optional SessionState
Accessing ExperimentalMethods — experimental Method
experimental: ExperimentalMethods
experimental …FIXME
Accessing ExecutionListenerManager — listenerManager Method
listenerManager: ExecutionListenerManager
listenerManager …FIXME
setActiveSession Object Method
setActiveSession(session: SparkSession): Unit
setActiveSession …FIXME