Spark SQL — Queries Over Structured Data on Massive Scale

Like Apache Spark in general, Spark SQL is all about distributed in-memory computations. What Spark SQL adds on top of the "bare" RDD computation model of Spark Core is a framework for loading, querying and persisting structured and semi-structured data. Queries can be expressed in good ol' SQL, HiveQL, or the high-level, SQL-like, declarative, type-safe Dataset API called Structured Query DSL.

Note
Under the covers, structured queries are automatically compiled into corresponding RDD operations.

Regardless of the query language you choose, all queries end up as a tree of Catalyst expressions that is further optimized on the way to your large distributed data sets.

As of Spark 2.0, Spark SQL is the de facto primary and most feature-rich interface to Spark’s underlying in-memory distributed platform (hiding Spark Core’s RDDs behind higher-level abstractions).

// Define the schema using a case class
case class Person(name: String, age: Int)

// you could read people from a CSV file
// It's been a while since you saw RDDs, hasn't it?
// Excuse me for bringing you the old past.
import org.apache.spark.rdd.RDD
val peopleRDD: RDD[Person] = sc.parallelize(Seq(Person("Jacek", 10)))

// Convert RDD[Person] to Dataset[Person] and run a query

// Automatic schema inference from existing RDDs
scala> val people = peopleRDD.toDS
people: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]

// Query for teenagers using Scala Query DSL
scala> val teenagers = people.where('age >= 10).where('age <= 19).select('name).as[String]
teenagers: org.apache.spark.sql.Dataset[String] = [name: string]

scala> teenagers.show
+-----+
| name|
+-----+
|Jacek|
+-----+

// You could however want to use good ol' SQL, couldn't you?

// 1. Register people Dataset as a temporary view in Catalog
people.createOrReplaceTempView("people")

// 2. Run SQL query
val teenagers = sql("SELECT * FROM people WHERE age >= 10 AND age <= 19")
scala> teenagers.show
+-----+---+
| name|age|
+-----+---+
|Jacek| 10|
+-----+---+

When Hive support is enabled, Spark developers can read and write data located in existing Apache Hive deployments using HiveQL.

sql("CREATE OR REPLACE TEMPORARY VIEW v1 (key INT, value STRING) USING csv OPTIONS ('path'='people.csv', 'header'='true')")

// Queries are expressed in HiveQL
sql("FROM v1").show

scala> sql("desc EXTENDED v1").show(false)
+----------+---------+-------+
|col_name  |data_type|comment|
+----------+---------+-------+
|# col_name|data_type|comment|
|key       |int      |null   |
|value     |string   |null   |
+----------+---------+-------+

Like SQL and NoSQL databases, Spark SQL offers query performance optimizations through the Logical Query Plan Optimizer, code generation (that can often beat your own hand-written code!) and the Tungsten execution engine with its own Internal Binary Row Format.

Spark SQL introduces a tabular data abstraction called Dataset (previously DataFrame). The Dataset abstraction is designed to make processing large amounts of structured tabular data on Spark infrastructure simpler and faster.
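
In the Scala API, DataFrame is a type alias for Dataset[Row], which is one way to read the relationship between the two names. The sketch below moves between the untyped and the typed view of the same data. The Score case class, the sample rows and the local master are assumptions made for illustration only.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Hypothetical record type used only for this illustration
case class Score(name: String, score: Double)

// In spark-shell, getOrCreate simply returns the existing session
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dataset-vs-dataframe")
  .getOrCreate()
import spark.implicits._

// Untyped view: columns are resolved by name at runtime
val df: DataFrame = Seq(("Jacek", 20.5), ("Agata", 12.0)).toDF("name", "score")

// Compiles because DataFrame and Dataset[Row] are the same type
val rows: Dataset[Row] = df

// Typed view: field access is checked by the Scala compiler
val scores: Dataset[Score] = df.as[Score]
val highScorers: Array[String] = scores.filter(_.score > 15).map(_.name).collect()
```

The typed variant trades some optimizer visibility (lambdas are opaque to Catalyst) for compile-time safety, so which view to prefer depends on the query.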

Note

Quoting Apache Drill, in words that apply to Spark SQL perfectly:

A SQL query engine for relational and NoSQL databases with direct queries on self-describing and semi-structured data in files, e.g. JSON or Parquet, and HBase tables without needing to specify metadata definitions in a centralized store.

The following snippet shows a batch ETL pipeline that processes JSON files and saves a subset of their records as CSV files.

spark.read
  .format("json")
  .load("input-json")
  .select("name", "score")
  .where($"score" > 15)
  .write
  .format("csv")
  .save("output-csv")

With the Structured Streaming feature, however, the above static batch query becomes dynamic and continuous, paving the way for continuous applications.

import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

spark.readStream
  .format("json")
  .schema(schema)
  .load("input-json")
  .select("name", "score")
  .where('score > 15)
  .writeStream
  .format("console")
  .start

// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +-----+-----+
// | name|score|
// +-----+-----+
// |Jacek| 20.5|
// +-----+-----+

As of Spark 2.0, the main data abstraction of Spark SQL is Dataset. It represents structured data, that is, records with a known schema. This structured representation enables a compact binary representation using a compressed columnar format that is stored in managed objects outside the JVM’s heap. It is supposed to speed up computations by reducing memory usage and garbage collection.

Spark SQL supports predicate pushdown to optimize performance of Dataset queries and can also generate optimized code at runtime.
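
A minimal sketch of how one might observe predicate pushdown, assuming a Parquet source: write a tiny data set, read it back with a filter, and print the query plans. The temporary path and the sample rows are hypothetical. With file formats such as Parquet, the scan node of the physical plan typically lists the filters handed to the data source, but the exact plan text differs between Spark versions, so no output is shown here.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("pushdown-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical location and data, used only for this illustration
val path = Files.createTempDirectory("scores").resolve("parquet").toString
Seq(("Jacek", 20.5), ("Agata", 12.0)).toDF("name", "score").write.parquet(path)

// The filter is declared after the read, yet the optimizer may push it into the scan
val highScores = spark.read.parquet(path).where($"score" > 15)

// Prints the parsed, analyzed and optimized logical plans and the physical plan
highScores.explain(true)

val names = highScores.select("name").as[String].collect()
```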

Spark SQL comes with several APIs to work with:

  1. Dataset API (formerly DataFrame API) with a strongly-typed LINQ-like Query DSL that Scala programmers will likely find very appealing to use.

  2. Structured Streaming API (aka Streaming Datasets) for continuous incremental execution of structured queries.

  3. SQL as the query language (through direct integration with Hive), which non-programmers will likely use.

  4. JDBC interface (through the Thrift JDBC/ODBC Server) that JDBC/ODBC fans can use to connect their tools to Spark’s distributed query engine.

Spark SQL comes with a uniform interface for data access in distributed storage systems like Cassandra or HDFS (Hive, Parquet, JSON) using specialized DataFrameReader and DataFrameWriter objects.
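
The uniform interface can be sketched as a round trip between two formats, where only the format name changes. spark.read returns a DataFrameReader and Dataset.write returns a DataFrameWriter. The temporary directory, the file names and the sample row below are assumptions for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrameReader, DataFrameWriter, Row, SaveMode, SparkSession}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("reader-writer-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical locations, used only for this illustration
val dir = Files.createTempDirectory("uniform-io")
val jsonPath = dir.resolve("people-json").toString
val parquetPath = dir.resolve("people-parquet").toString

// Prepare some JSON input
Seq(("Jacek", 10)).toDF("name", "age").write.json(jsonPath)

// Reading: the same DataFrameReader handles every built-in format
val reader: DataFrameReader = spark.read
val people = reader.format("json").load(jsonPath)

// Writing: the same DataFrameWriter handles every built-in format
val writer: DataFrameWriter[Row] = people.write
writer.format("parquet").mode(SaveMode.Overwrite).save(parquetPath)

val count = spark.read.format("parquet").load(parquetPath).count()
```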

Spark SQL allows you to execute SQL-like queries on large volumes of data that can live in Hadoop HDFS or Hadoop-compatible file systems like S3. It can access data from different kinds of data sources: files or tables.

Spark SQL defines the following types of functions:

Two catalog implementations are supported, in-memory (default) and hive, and you select one with the spark.sql.catalogImplementation setting.
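
As a small sketch, the catalog implementation in use can be read back from the session's runtime configuration. The local master and the application name are assumptions. The setting is fixed when the session is created, so it cannot be switched afterwards.

```scala
import org.apache.spark.sql.SparkSession

// Without Hive support the session uses the in-memory catalog.
// Calling .enableHiveSupport() on the builder selects the hive catalog instead
// (it requires the Hive classes on the classpath, so it is not called here).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("catalog-sketch")
  .getOrCreate()

// Either "in-memory" or "hive", depending on how the session was created
val catalogImpl: String = spark.conf.get("spark.sql.catalogImplementation")
println(s"Catalog implementation: $catalogImpl")
```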

From user@spark:

If you already loaded csv data into a dataframe, why not register it as a table, and use Spark SQL to find max/min or any other aggregates? SELECT MAX(column_name) FROM dftable_name …​ seems natural.

If you’re more comfortable with SQL, it might be worth registering this DataFrame as a table and generating a SQL query against it (generate a string with a series of min-max calls).
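
The "generate a string with a series of min-max calls" idea from the quotes above could look like the following sketch. minMaxQuery is a hypothetical helper, not part of Spark, and the column names are made up. The table name dftable_name is taken from the quote.

```scala
// Hypothetical helper: builds a single SELECT with MIN and MAX for every column
def minMaxQuery(table: String, columns: Seq[String]): String = {
  val aggregates = columns.flatMap { c =>
    Seq(s"MIN(`$c`) AS `min_$c`", s"MAX(`$c`) AS `max_$c`")
  }
  s"SELECT ${aggregates.mkString(", ")} FROM `$table`"
}

val query = minMaxQuery("dftable_name", Seq("age", "score"))
// query:
// SELECT MIN(`age`) AS `min_age`, MAX(`age`) AS `max_age`, MIN(`score`) AS `min_score`, MAX(`score`) AS `max_score` FROM `dftable_name`
```

In spark-shell, after df.createOrReplaceTempView("dftable_name"), the generated string can be passed to sql, for example sql(minMaxQuery("dftable_name", df.columns.toSeq)).show.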

You can parse data from external data sources and let schema inference deduce the schema.
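
A minimal sketch of schema inference for a CSV file, assuming a small hypothetical file with a header line. The header and inferSchema options are standard options of the CSV data source. The file name and its contents are made up for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("schema-inference-sketch")
  .getOrCreate()

// Hypothetical input file, created only for this illustration
val csvFile = Files.createTempFile("people", ".csv")
Files.write(csvFile, "name,age,score\nJacek,10,20.5\n".getBytes(StandardCharsets.UTF_8))

val people = spark.read
  .option("header", "true")      // first line holds the column names
  .option("inferSchema", "true") // costs an extra pass over the data to guess column types
  .csv(csvFile.toString)

people.printSchema()

// Column name -> inferred type, for programmatic inspection
val inferred = people.schema.fields.map(f => f.name -> f.dataType).toMap
```

Without inferSchema, every CSV column is read as a string. For production pipelines, passing an explicit schema avoids the extra pass over the data.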

// Example 1
val df = Seq(1 -> 2).toDF("i", "j")
val query = df.groupBy('i)
  .agg(max('j).as("aggOrdering"))
  .orderBy(sum('j))
  .as[(Int, Int)]
query.collect contains (1, 2) // true

// Example 2
val df = Seq((1, 1), (-1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
scala> sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp").show
+-------------------+
|(IF((a > 0), a, 0))|
+-------------------+
|                  1|
|                  0|
+-------------------+
