emptyDataFrame: DataFrame
SQLContext
|
Caution
|
As of Spark 2.0.0 |
In the pre-Spark 2.0 era, SQLContext was the entry point for Spark SQL. Whatever you did in Spark SQL had to start with creating an instance of SQLContext.
A SQLContext object requires a SparkContext, a CacheManager, and a SQLListener. They are all transient and do not participate in serializing a SQLContext.
You should use SQLContext for the following:
Creating SQLContext Instance
You can create a SQLContext using one of the following:
- SQLContext(sc: SparkContext)
- SQLContext.getOrCreate(sc: SparkContext)
- SQLContext.newSession() allows for creating a new instance of SQLContext with a separate SQL configuration (through a shared SparkContext).
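The three options above can be sketched as follows. This is a minimal example that assumes a standalone Scala script with Spark 2.x on the classpath; the local master and the application name are assumptions made for illustration only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and a made-up application name
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-demo"))

// 1. Constructor (deprecated as of Spark 2.0, kept for backward compatibility)
val fromConstructor = new SQLContext(sc)

// 2. Factory method on the SQLContext companion object
val fromFactory = SQLContext.getOrCreate(sc)

// 3. A new session derived from an existing SQLContext
val session = fromFactory.newSession()

// Every instance is backed by the same SparkContext
println(session.sparkContext eq fromFactory.sparkContext)
println(fromConstructor.sparkContext eq sc)
```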
Setting Configuration Properties
You can set Spark SQL configuration properties using:
- setConf(props: Properties): Unit
- setConf(key: String, value: String): Unit
You can get the current value of a configuration property by key using:
- getConf(key: String): String
- getConf(key: String, defaultValue: String): String
- getAllConfs: immutable.Map[String, String]
|
Note
|
Properties that start with spark.sql are reserved for Spark SQL. |
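A short sketch of the setters and getters listed above, assuming a standalone Scala script with Spark 2.x on the classpath. The keys `spark.sql.demo.key` and `spark.sql.demo.missing` are hypothetical and exist only for this illustration.

```scala
import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and a made-up application name
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-demo"))
val sqlContext = SQLContext.getOrCreate(sc)

// Set a single property by key
sqlContext.setConf("spark.sql.shuffle.partitions", "8")

// Set several properties at once
val props = new Properties()
props.setProperty("spark.sql.demo.key", "demo-value") // hypothetical key
sqlContext.setConf(props)

// Read the values back
val partitions = sqlContext.getConf("spark.sql.shuffle.partitions")
val fallback = sqlContext.getConf("spark.sql.demo.missing", "n/a") // key is not set
val all = sqlContext.getAllConfs

println(s"partitions=$partitions fallback=$fallback")
println(all.get("spark.sql.demo.key"))
```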
Creating DataFrames
emptyDataFrame
emptyDataFrame creates an empty DataFrame. It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
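As a quick check of the description above, the following sketch inspects the schema and the row count of the empty DataFrame. It assumes a standalone Scala script with Spark 2.x on the classpath; the master and the application name are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType

// Assumption: a local master and a made-up application name
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-demo"))
val sqlContext = SQLContext.getOrCreate(sc)

val empty = sqlContext.emptyDataFrame

// No columns and no rows
val hasEmptySchema = empty.schema == StructType(Nil)
val rowCount = empty.count()
println(s"empty schema: $hasEmptySchema, rows: $rowCount")
```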
createDataFrame for RDD and Seq
createDataFrame[A <: Product](rdd: RDD[A]): DataFrame
createDataFrame[A <: Product](data: Seq[A]): DataFrame
The createDataFrame family of methods creates a DataFrame from an RDD or a Seq of Scala's Product types, such as case classes or tuples.
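A minimal sketch of both variants, using tuples as the Product type to keep the example short. It assumes a standalone Scala script with Spark 2.x on the classpath; the data values are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and a made-up application name
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-demo"))
val sqlContext = SQLContext.getOrCreate(sc)

val data = Seq(("hello", 0), ("world!", 1))

// From a local Seq of tuples
val fromSeq = sqlContext.createDataFrame(data)

// From an RDD of tuples
val fromRdd = sqlContext.createDataFrame(sc.parallelize(data))

// Tuple fields become columns _1 and _2; toDF gives them readable names
val named = fromSeq.toDF("text", "id")
named.printSchema()
```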
createDataFrame for RDD of Row with Explicit Schema
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
This variant of createDataFrame creates a DataFrame from an RDD of Row and an explicit schema.
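The explicit-schema variant might be used as in the sketch below. It assumes a standalone Scala script with Spark 2.x on the classpath; the column names and values are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumption: a local master and a made-up application name
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-demo"))
val sqlContext = SQLContext.getOrCreate(sc)

// The schema describes each column: name, type, and nullability
val schema = StructType(Seq(
  StructField("text", StringType, nullable = false),
  StructField("id", IntegerType, nullable = false)))

// Rows are untyped, so their values must line up with the schema
val rowRDD = sc.parallelize(Seq(Row("hello", 0), Row("world!", 1)))

val df = sqlContext.createDataFrame(rowRDD, schema)
df.printSchema()
```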
Registering User-Defined Functions (UDF)
udf: UDFRegistration
udf method gives you access to UDFRegistration to manipulate user-defined functions. Functions registered using udf are available for Hive queries only.
|
Tip
|
Read up on UDFs in UDFs — User-Defined Functions document. |
// Create a DataFrame
val df = Seq("hello", "world!").zip(0 to 1).toDF("text", "id")
// Register the DataFrame as a temporary table in Hive
df.registerTempTable("texts")
scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| texts| true|
+---------+-----------+
scala> sql("SELECT * FROM texts").show
+------+---+
| text| id|
+------+---+
| hello| 0|
|world!| 1|
+------+---+
// Just a Scala function
val my_upper: String => String = _.toUpperCase
// Register the function as UDF
spark.udf.register("my_upper", my_upper)
scala> sql("SELECT *, my_upper(text) AS MY_UPPER FROM texts").show
+------+---+--------+
| text| id|MY_UPPER|
+------+---+--------+
| hello| 0| HELLO|
|world!| 1| WORLD!|
+------+---+--------+
Caching DataFrames in In-Memory Cache
isCached(tableName: String): Boolean
isCached method asks CacheManager whether the tableName table is cached in memory. It simply requests the CachedData from CacheManager and, if it exists, assumes the table is cached.
cacheTable(tableName: String): Unit
You can cache a table in memory using cacheTable.
|
Caution
|
Why would I want to cache a table? |
uncacheTable(tableName: String): Unit
clearCache(): Unit
uncacheTable and clearCache remove one or all in-memory cached tables.
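The caching methods above can be exercised together as in this sketch. It assumes a standalone Scala script with Spark 2.x on the classpath; the table name `numbers` is hypothetical, and registerTempTable is used to match the rest of this document.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and a made-up application name
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-demo"))
val sqlContext = SQLContext.getOrCreate(sc)

// Register a small temporary table to cache (hypothetical name)
sqlContext.range(5).registerTempTable("numbers")

sqlContext.cacheTable("numbers")
val cachedAfterCache = sqlContext.isCached("numbers")

// Remove a single table from the cache
sqlContext.uncacheTable("numbers")
val cachedAfterUncache = sqlContext.isCached("numbers")

// Cache again, then remove everything from the cache
sqlContext.cacheTable("numbers")
sqlContext.clearCache()
val cachedAfterClear = sqlContext.isCached("numbers")

println(s"$cachedAfterCache $cachedAfterUncache $cachedAfterClear")
```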
Implicits — SQLContext.implicits
The implicits object is a helper class with methods to convert objects into Datasets and DataFrames, and also comes with many Encoders for "primitive" types as well as the collections thereof.
|
Note
|
Import the implicits by
|
It holds Encoders for Scala "primitive" types like Int, Double, String, and their collections.
It offers support for creating a Dataset from an RDD of any type (for which an encoder exists in scope), from case classes or tuples, and from a Seq.
It also offers conversions from Scala's Symbol or $ to Column.
It also offers conversions from an RDD or a Seq of Product types (e.g. case classes or tuples) to DataFrame. It has direct conversions from an RDD of Int, Long, and String to a DataFrame with a single column named _1.
|
Note
|
It is not possible to call toDF methods on RDD objects of "primitive" types other than Int, Long, and String.
|
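A brief sketch of the conversions that the implicits object provides, assuming a standalone Scala script with Spark 2.x on the classpath. The data and column names are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and a made-up application name
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-demo"))
val sqlContext = SQLContext.getOrCreate(sc)

// The import needs a stable identifier, hence the val above
import sqlContext.implicits._

// Seq of tuples to DataFrame
val df = Seq(("hello", 0), ("world!", 1)).toDF("text", "id")

// RDD of tuples to DataFrame
val fromRdd = sc.parallelize(Seq(("a", 1), ("b", 2))).toDF("letter", "n")

// $"..." and Symbol are converted to Column
val ids = df.select($"id")
val texts = df.select(Symbol("text"))

ids.show()
texts.show()
```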
Creating Datasets
createDataset[T: Encoder](data: Seq[T]): Dataset[T]
createDataset[T: Encoder](data: RDD[T]): Dataset[T]
createDataset family of methods creates a Dataset from a collection of elements of type T, be it a regular Scala Seq or Spark’s RDD.
It requires that there is an encoder in scope.
|
Note
|
Importing SQLContext.implicits brings many encoders into scope. |
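The sketch below creates Datasets from a Seq and from an RDD, relying on the encoders that the implicits import provides. It assumes a standalone Scala script with Spark 2.x on the classpath; the element values are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and a made-up application name
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-demo"))
val sqlContext = SQLContext.getOrCreate(sc)

// Brings Encoder[String] and Encoder[Int] (among others) into scope
import sqlContext.implicits._

// From a local Seq
val words = sqlContext.createDataset(Seq("hello", "world!"))

// From an RDD
val numbers = sqlContext.createDataset(sc.parallelize(Seq(1, 2, 3)))

// Datasets are typed, so collect returns the element type directly
val collectedWords: Array[String] = words.collect()
val collectedNumbers: Array[Int] = numbers.collect().sorted
println(collectedWords.mkString(", "))
println(collectedNumbers.mkString(", "))
```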
Accessing DataFrameReader (read method)
read: DataFrameReader
The experimental read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
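To illustrate read, the sketch below first writes a small Parquet dataset to a temporary directory and then loads it back through DataFrameReader. It assumes a standalone Scala script with Spark 2.x on the classpath and a writable local file system; the directory name is made up.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and a made-up application name
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-demo"))
val sqlContext = SQLContext.getOrCreate(sc)

// A fresh, not-yet-existing output path inside a temporary directory
val path = Files.createTempDirectory("sqlcontext-read").resolve("numbers").toString

// Write three rows with a single id column, then read them back
sqlContext.range(3).write.parquet(path)
val loaded = sqlContext.read.parquet(path)

loaded.printSchema()
println(loaded.count())
```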
Creating External Tables
createExternalTable(tableName: String, path: String): DataFrame
createExternalTable(tableName: String, path: String, source: String): DataFrame
createExternalTable(tableName: String, source: String, options: Map[String, String]): DataFrame
createExternalTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame
The experimental createExternalTable family of methods is used to create an external table tableName and return a corresponding DataFrame.
|
Caution
|
FIXME What is an external table? |
It assumes parquet as the default data source format that you can change using spark.sql.sources.default setting.
Dropping Temporary Tables
dropTempTable(tableName: String): Unit
dropTempTable method drops a temporary table tableName.
|
Caution
|
FIXME What is a temporary table? |
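A small sketch of registering and then dropping a temporary table, assuming a standalone Scala script with Spark 2.x on the classpath. The table name `tmp_numbers` is hypothetical, and registerTempTable is used to match the rest of this document.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and a made-up application name
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-demo"))
val sqlContext = SQLContext.getOrCreate(sc)

// Register a temporary table (hypothetical name)
sqlContext.range(3).registerTempTable("tmp_numbers")
val existsBefore = sqlContext.tableNames().contains("tmp_numbers")

// Drop it; the underlying data is not affected
sqlContext.dropTempTable("tmp_numbers")
val existsAfter = sqlContext.tableNames().contains("tmp_numbers")

println(s"before drop: $existsBefore, after drop: $existsAfter")
```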
Creating Dataset[Long] (range method)
range(end: Long): Dataset[Long]
range(start: Long, end: Long): Dataset[Long]
range(start: Long, end: Long, step: Long): Dataset[Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long]
The range family of methods creates a Dataset[Long] with the sole id column of LongType for given start, end, and step.
|
Note
|
The first three variants use SparkContext.defaultParallelism for the number of partitions numPartitions.
|
scala> spark.range(5)
res0: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> .show
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+
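The variant with an explicit start, end, step, and number of partitions might be used as in the sketch below. It assumes a standalone Scala script with Spark 2.x on the classpath; the result is converted with toDF so that the rows can be read uniformly.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and a made-up application name
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-demo"))
val sqlContext = SQLContext.getOrCreate(sc)

// start = 0, end = 10 (exclusive), step = 3, numPartitions = 2
val ids = sqlContext.range(0, 10, 3, 2).toDF()

// Collect the single id column as Longs
val values = ids.collect().map(_.getLong(0)).sorted
println(values.mkString(", "))
```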
Creating DataFrames for Table
table(tableName: String): DataFrame
table method returns the existing tableName table as a DataFrame.
Listing Existing Tables
tables(): DataFrame
tables(databaseName: String): DataFrame
tables methods return a DataFrame that holds the names of existing tables in a database.
scala> spark.tables.show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| t| true|
| t2| true|
+---------+-----------+
The schema consists of two columns - tableName of StringType and isTemporary of BooleanType.
|
Note
|
tables is a result of SHOW TABLES [IN databaseName].
|
tableNames(): Array[String]
tableNames(databaseName: String): Array[String]
tableNames is similar to tables, except that it returns Array[String], a collection of table names.
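The sketch below compares tables and tableNames for the same catalog, assuming a standalone Scala script with Spark 2.x on the classpath. The table name `listed` is hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and a made-up application name
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-demo"))
val sqlContext = SQLContext.getOrCreate(sc)

// Register a temporary table so that there is something to list
sqlContext.range(1).registerTempTable("listed")

// tables gives a DataFrame; pick the tableName column out of it
val namesFromTables = sqlContext.tables()
  .select("tableName")
  .collect()
  .map(_.getString(0))

// tableNames gives the names directly
val names = sqlContext.tableNames()

println(namesFromTables.sorted.mkString(", "))
println(names.sorted.mkString(", "))
```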
Accessing StreamingQueryManager
streams: StreamingQueryManager
The streams method returns a StreamingQueryManager that is used to…TK
|
Caution
|
FIXME |
Managing Active SQLContext for JVM
SQLContext.getOrCreate(sparkContext: SparkContext): SQLContext
SQLContext.getOrCreate method returns an active SQLContext object for the JVM or creates a new one using a given sparkContext.
|
Note
|
It is a factory-like method that works on SQLContext class.
|
Interestingly, there are two helper methods to set and clear the active SQLContext object - setActive and clearActive respectively.
setActive(spark: SQLContext): Unit
clearActive(): Unit
Executing SQL Queries
sql(sqlText: String): DataFrame
sql executes the sqlText SQL query.
|
Note
|
It supports Hive statements through HiveContext. |
scala> sql("set spark.sql.hive.version").show(false)
16/04/10 15:19:36 INFO HiveSqlParser: Parsing command: set spark.sql.hive.version
+----------------------+-----+
|key |value|
+----------------------+-----+
|spark.sql.hive.version|1.2.1|
+----------------------+-----+
scala> sql("describe database extended default").show(false)
16/04/10 15:21:14 INFO HiveSqlParser: Parsing command: describe database extended default
+-------------------------+--------------------------+
|database_description_item|database_description_value|
+-------------------------+--------------------------+
|Database Name |default |
|Description |Default Hive database |
|Location |file:/user/hive/warehouse |
|Properties | |
+-------------------------+--------------------------+
// Create temporary table
scala> spark.range(10).registerTempTable("t")
16/04/14 23:34:31 INFO HiveSqlParser: Parsing command: t
scala> sql("CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t")
16/04/14 23:34:38 INFO HiveSqlParser: Parsing command: CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t
scala> spark.tables.show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| t| true|
| t2| true|
+---------+-----------+
sql parses sqlText using a dialect that can be set up using spark.sql.dialect setting.
|
Tip
|
You may also use spark-sql shell script to interact with Hive. |
Internally, it uses SessionState.sqlParser.parsePlan(sql) method to create a LogicalPlan.
|
Caution
|
FIXME Review |
scala> sql("show tables").show(false)
16/04/09 13:05:32 INFO HiveSqlParser: Parsing command: show tables
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|dafa |false |
+---------+-----------+
|
Tip
|
Enable Add the following line to
Refer to Logging. |
Creating New Session
newSession(): SQLContext
You can use the newSession method to create a new session without the cost of instantiating a new SQLContext from scratch.
newSession returns a new SQLContext that shares SparkContext, CacheManager, SQLListener, and ExternalCatalog.
|
Caution
|
FIXME Why would I need that? |
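One possible use is isolating configuration and temporary tables between sessions while reusing the same SparkContext. The sketch below illustrates that, assuming a standalone Scala script with Spark 2.x on the classpath; the key `spark.sql.demo.session` and the table name `only_in_first` are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: a local master and a made-up application name
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("sqlcontext-demo"))

val first = SQLContext.getOrCreate(sc)
val second = first.newSession()

// A property set in one session is not visible in the other (hypothetical key)
second.setConf("spark.sql.demo.session", "only-in-second")
val inFirst = first.getConf("spark.sql.demo.session", "unset")
val inSecond = second.getConf("spark.sql.demo.session", "unset")

// Temporary tables are registered per session as well
first.range(1).registerTempTable("only_in_first")
val visibleInFirst = first.tableNames().contains("only_in_first")
val visibleInSecond = second.tableNames().contains("only_in_first")

println(s"conf: first=$inFirst second=$inSecond")
println(s"table: first=$visibleInFirst second=$visibleInSecond")
```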