Spark SQL — Structured Queries on Large Scale

Like Apache Spark in general, Spark SQL is all about distributed in-memory computations. The primary difference between Spark SQL and the "bare" Spark Core RDD computation model is that Spark SQL offers a framework for loading, querying and persisting structured and semi-structured data using structured queries. These queries can equally be expressed in SQL (with subqueries), HiveQL, or the high-level, SQL-like, declarative, type-safe Dataset API (a structured query DSL). Regardless of the query language you choose, every query ends up as a tree of Catalyst expressions that is further optimized before it runs over your large distributed data sets.
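As a minimal sketch of the point above, the snippet below writes the same query once in SQL and once with the Dataset API, then prints the optimized Catalyst plan of each so you can compare them. The local SparkSession and the people data set are assumptions made for illustration only.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell getOrCreate() returns the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("sql-vs-dataset-api")
  .getOrCreate()
import spark.implicits._

// Hypothetical data set used only for this illustration.
val people = Seq(("Alice", 29), ("Bob", 17)).toDF("name", "age")
people.createOrReplaceTempView("people")

// The same structured query, expressed in two ways.
val viaSql = spark.sql("SELECT name FROM people WHERE age >= 18")
val viaDsl = people.where($"age" >= 18).select("name")

// Both queries are Catalyst trees; print the optimized logical plans to compare.
println(viaSql.queryExecution.optimizedPlan)
println(viaDsl.queryExecution.optimizedPlan)

val sqlNames = viaSql.as[String].collect().toSeq
val dslNames = viaDsl.as[String].collect().toSeq
```

Which form to use is mostly a matter of taste and tooling, since both go through the same analysis and optimization phases.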

As of Spark 2.0, Spark SQL is the de facto primary, feature-rich interface to Spark’s underlying in-memory distributed platform (hiding Spark Core’s RDDs behind higher-level abstractions).

// Found at http://stackoverflow.com/a/32514683/1305344
val dataset = Seq(
   "08/11/2015",
   "09/11/2015",
   "09/12/2015").toDF("date_string")

// registerTempTable is deprecated since Spark 2.0
dataset.createOrReplaceTempView("dates")

// Inside spark-shell
// 'EEEE' formats the full day-of-week name
scala> sql(
  """SELECT date_string,
        from_unixtime(unix_timestamp(date_string,'MM/dd/yyyy'), 'EEEE') AS dow
      FROM dates""").show
+-----------+--------+
|date_string|     dow|
+-----------+--------+
| 08/11/2015| Tuesday|
| 09/11/2015|  Friday|
| 09/12/2015|Saturday|
+-----------+--------+

Like SQL and NoSQL databases, Spark SQL offers query performance optimizations through Catalyst’s logical query plan optimizer, code generation (which can often beat your own hand-written code!) and the Tungsten execution engine with its own internal binary row format.

Spark SQL introduces a tabular data abstraction called Dataset, which evolved from DataFrame (since Spark 2.0, DataFrame in Scala is a type alias for Dataset[Row]). The Dataset abstraction is designed to make processing large amounts of structured tabular data on Spark infrastructure simpler and faster.
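The relation between Dataset and DataFrame can be sketched as follows. The Person case class and the sample records are hypothetical, and the snippet assumes a local SparkSession (for example, pasted into spark-shell).

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical record type used only for this illustration.
case class Person(name: String, age: Int)

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("dataset-vs-dataframe")
  .getOrCreate()
import spark.implicits._

// A strongly-typed Dataset: the compiler knows every record is a Person.
val people: Dataset[Person] = Seq(Person("Alice", 29), Person("Bob", 17)).toDS()

// Typed transformations take plain Scala functions over Person.
val adults: Dataset[Person] = people.filter(_.age >= 18)
val adultNames = adults.map(_.name).collect().toSeq

// The untyped view of the same data: a DataFrame, i.e. Dataset[Row].
val untyped: DataFrame = people.toDF()
val columns = untyped.columns.toSeq
```

The typed form catches a misspelled field at compile time, whereas the untyped form reports a misspelled column name only when the query is analyzed.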

Note

Quoting Apache Drill, whose description applies to Spark SQL perfectly:

A SQL query engine for relational and NoSQL databases with direct queries on self-describing and semi-structured data in files, e.g. JSON or Parquet, and HBase tables without needing to specify metadata definitions in a centralized store.

The following snippet shows a batch ETL pipeline that reads JSON files and saves a subset of their data as CSV files.

spark.read
  .format("json")
  .load("input-json")
  .select("name", "score")
  .where($"score" > 15)
  .write
  .format("csv")
  .save("output-csv")

With Structured Streaming, however, the above static batch query becomes dynamic and continuous, paving the way for continuous applications.

import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

spark.readStream
  .format("json")
  .schema(schema)
  .load("input-json")
  .select("name", "score")
  .where('score > 15)
  .writeStream
  .format("console")
  .start

// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +-----+-----+
// | name|score|
// +-----+-----+
// |Jacek| 20.5|
// +-----+-----+

As of Spark 2.0, the main data abstraction of Spark SQL is Dataset. It represents structured data, i.e. records with a known schema. Knowing the schema enables a compact binary representation of the data, using a compressed columnar format, that is stored in managed objects outside the JVM’s heap. It is supposed to speed up computations by reducing memory usage and garbage collection.

Spark SQL supports predicate pushdown to optimize performance of Dataset queries and can also generate optimized code at runtime.
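To see predicate pushdown at work, one option is to write a small Parquet file, read it back with a filter, and inspect the physical plan. The sketch below assumes a local SparkSession and a temporary directory, and the sample rows are made up for illustration. With Parquet, the scan node in the printed plan typically lists the filter among its pushed filters.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("predicate-pushdown")
  .getOrCreate()
import spark.implicits._

// Assumption: a throwaway location under the system temp directory.
val path = Files.createTempDirectory("pushdown-demo").resolve("scores").toString

// Hypothetical sample data written as Parquet.
Seq(("Alice", 20.5), ("Bob", 10.0)).toDF("name", "score").write.parquet(path)

// The filter is declared on the Dataset, but Spark SQL can hand it to the data source.
val highScores = spark.read.parquet(path).where($"score" > 15)

// Print the physical plan and look at the file scan node.
highScores.explain()

val result = highScores.as[(String, Double)].collect().toSeq
```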

Spark SQL comes with different APIs to work with:

  1. Dataset API (formerly DataFrame API) with a strongly-typed LINQ-like Query DSL that Scala programmers will likely find very appealing to use.

  2. Structured Streaming API (aka Streaming Datasets) for continuous incremental execution of structured queries.

  3. SQL, which non-programmers will likely use as their query language through direct integration with Hive.

  4. JDBC interface (through Thrift JDBC/ODBC Server) that JDBC/ODBC fans can use to connect their tools to Spark’s distributed query engine.

Spark SQL comes with a uniform interface for data access in distributed storage systems like Cassandra or HDFS (Hive, Parquet, JSON) using specialized DataFrameReader and DataFrameWriter objects.

Spark SQL allows you to execute SQL-like queries on large volumes of data that can live in Hadoop HDFS or Hadoop-compatible file systems like S3. It can access data from different data sources: files or tables.

Spark SQL defines three types of functions:

There are two supported catalog implementations — in-memory (default) and hive — that you can set using spark.sql.catalogImplementation setting.
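A minimal sketch of choosing and checking the catalog implementation follows. It assumes a fresh local SparkSession, because the setting is fixed when the session is created and cannot be changed afterwards. Calling enableHiveSupport() on the builder is the usual way to select hive; the explicit config call below simply restates the default.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("catalog-implementation")
  // Assumption: no session exists yet, so this setting takes effect.
  .config("spark.sql.catalogImplementation", "in-memory")
  .getOrCreate()

// Read the setting back from the session's configuration.
val catalogImpl = spark.conf.get("spark.sql.catalogImplementation")
println(catalogImpl)

// The user-facing catalog API looks the same on top of either implementation.
spark.range(3).createOrReplaceTempView("numbers")
val tableNames = spark.catalog.listTables().collect().map(_.name).toSeq
```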

From user@spark:

If you already loaded csv data into a dataframe, why not register it as a table, and use Spark SQL to find max/min or any other aggregates? SELECT MAX(column_name) FROM dftable_name …​ seems natural.

If you’re more comfortable with SQL, it might be worth registering this DataFrame as a table and generating a SQL query against it (generate a string with a series of min-max calls).
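The suggestion in the quotes above could look like the sketch below: register the DataFrame as a temporary view, generate one SQL string with a MIN and a MAX call per column, and run it. The view name dftable, the sample data and the local SparkSession are assumptions made for illustration. Column names with special characters would additionally need backtick quoting.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("min-max-via-sql")
  .getOrCreate()
import spark.implicits._

// Hypothetical data standing in for the CSV that was already loaded.
val df = Seq((1, 10.0), (3, 2.5), (2, 7.0)).toDF("id", "score")
df.createOrReplaceTempView("dftable")

// Generate a series of min-max calls, one pair per column.
val aggregates = df.columns.flatMap { c =>
  Seq(s"MIN($c) AS min_$c", s"MAX($c) AS max_$c")
}
val query = s"SELECT ${aggregates.mkString(", ")} FROM dftable"

// A global aggregate always produces exactly one row.
val row = spark.sql(query).head()
println(query)
println(row)
```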

You can parse data from external data sources and let the schema inferencer deduce the schema.
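As a small illustration of schema inference, the sketch below feeds two JSON records to the JSON reader without specifying a schema and prints what was inferred. The records and the local SparkSession are assumptions. Reading JSON from a Dataset[String] requires Spark 2.2 or later; with files you would pass a path instead.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("schema-inference")
  .getOrCreate()
import spark.implicits._

// Hypothetical JSON records, one document per line.
val jsonLines = Seq(
  """{"name":"Jacek","score":20.5}""",
  """{"name":"Agata","score":7}""").toDS()

// No schema given: Spark SQL scans the records and infers one.
val inferred = spark.read.json(jsonLines)
inferred.printSchema()

// score mixes 20.5 and 7, so the inferred type has to hold both.
val scoreType = inferred.schema("score").dataType.simpleString
val nameType = inferred.schema("name").dataType.simpleString
```

Inference costs an extra pass over the data, which is why streaming sources (as in the Structured Streaming example earlier) expect an explicit schema by default.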

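// Example 1
// max and sum come from functions (spark-shell imports them automatically)
import org.apache.spark.sql.functions.{max, sum}
val df = Seq(1 -> 2).toDF("i", "j")
val query = df.groupBy('i)
  .agg(max('j).as("aggOrdering"))
  .orderBy(sum('j))
  .as[(Int, Int)]
// Double parentheses pass the tuple explicitly as a single argument
query.collect.contains((1, 2)) // true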

// Example 2
val df = Seq((1, 1), (-1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
scala> sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp").show
+-------------------+
|(IF((a > 0), a, 0))|
+-------------------+
|                  1|
|                  0|
+-------------------+
