Collection Functions

Table 1. (Subset of) Standard Functions for Handling Collections

Name            Description
explode         Creates a new row for each element in the given array or map column.
explode_outer   (new in 2.2.0) Creates a new row for each element in the given array or map column. Unlike explode, produces null when the array or map is null or empty.
from_json       Extracts data from arbitrary JSON-encoded values into a StructType or ArrayType of StructType elements with the specified schema.

explode Function

Caution
FIXME
import org.apache.spark.sql.functions.explode
scala> Seq(Array(0,1,2)).toDF("array").withColumn("num", explode('array)).show
+---------+---+
|    array|num|
+---------+---+
|[0, 1, 2]|  0|
|[0, 1, 2]|  1|
|[0, 1, 2]|  2|
+---------+---+
Note
explode function is the equivalent of the flatMap operator for Dataset.
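
For comparison, a minimal sketch of the same flattening with flatMap (assuming spark.implicits._ is in scope, as in spark-shell):

// flatMap gives a Dataset[Int] and, unlike explode, keeps no other columns
val nums = Seq(Array(0, 1, 2)).toDS.flatMap(arr => arr)
scala> nums.show
+-----+
|value|
+-----+
|    0|
|    1|
|    2|
+-----+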

explode_outer Function

explode_outer(e: Column): Column

explode_outer generates a new row for each element in the e array or map column.

Note
Unlike explode, explode_outer generates null when the array or map is null or empty.
import org.apache.spark.sql.functions.explode_outer
val arrays = Seq((1, Seq.empty[String])).toDF("id", "array")
scala> arrays.printSchema
root
 |-- id: integer (nullable = false)
 |-- array: array (nullable = true)
 |    |-- element: string (containsNull = true)
scala> arrays.select(explode_outer($"array")).show
+----+
| col|
+----+
|null|
+----+
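
For contrast, a quick sketch of explode on the very same dataset: it simply drops the row with the empty array, so no rows come out.

import org.apache.spark.sql.functions.explode
scala> arrays.select(explode($"array")).show
+---+
|col|
+---+
+---+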

Internally, explode_outer creates a Column with GeneratorOuter and Explode Catalyst expressions.

val explodeOuter = explode_outer($"array").expr
scala> println(explodeOuter.numberedTreeString)
00 generatorouter(explode('array))
01 +- explode('array)
02    +- 'array

Extracting Data from Arbitrary JSON-Encoded Values — from_json Functions

from_json(e: Column, schema: StructType, options: Map[String, String]): Column  (1)
from_json(e: Column, schema: DataType, options: Map[String, String]): Column    (2)
from_json(e: Column, schema: StructType): Column  (3)
from_json(e: Column, schema: DataType): Column    (4)
from_json(e: Column, schema: String, options: Map[String, String]): Column  (5)
  1. Calls <2> with the StructType converted to DataType

  2. The main variant (creates the underlying Catalyst expression)

  3. Calls <1> with empty options

  4. Calls <2> with empty options

  5. (2.3.0) Uses schema as a DataType in the JSON format or falls back to StructType in the DDL format

from_json parses a column with a JSON-encoded value into a StructType or ArrayType of StructType elements with the specified schema.

val jsons = Seq("""{ "id": 0 }""").toDF("json")

import org.apache.spark.sql.types._
val schema = new StructType()
  .add($"id".int.copy(nullable = false))

import org.apache.spark.sql.functions.from_json
scala> jsons.select(from_json($"json", schema) as "ids").show
+---+
|ids|
+---+
|[0]|
+---+
Note

A schema can be one of the following:

  1. DataType as a Scala object or in the JSON format

  2. StructType in the DDL format
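
For example, a minimal sketch with a schema in the DDL format, parsed explicitly with StructType.fromDDL (assuming Spark 2.3.0 or later, where fromDDL is available):

// A minimal sketch (Spark 2.3.0+): a DDL-formatted schema for from_json
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType
val ddlSchema = StructType.fromDDL("id INT")
scala> Seq("""{ "id": 0 }""").toDF("json").select(from_json($"json", ddlSchema) as "ids").show
+---+
|ids|
+---+
|[0]|
+---+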

// Define the schema for JSON-encoded messages
// Note that the schema is nested (on the addresses field)
import org.apache.spark.sql.types._
val addressesSchema = new StructType()
  .add($"city".string)
  .add($"state".string)
  .add($"zip".string)
val schema = new StructType()
  .add($"firstName".string)
  .add($"lastName".string)
  .add($"email".string)
  .add($"addresses".array(addressesSchema))
scala> schema.printTreeString
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- zip: string (nullable = true)

// Generate the JSON-encoded schema
// That's the variant of the schema that from_json accepts
val schemaAsJson = schema.json

// Use prettyJson to print out the JSON-encoded schema
// Only for demo purposes
scala> println(schema.prettyJson)
{
  "type" : "struct",
  "fields" : [ {
    "name" : "firstName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "lastName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "email",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "addresses",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "city",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "state",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "zip",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  } ]
}

// Let's "validate" the JSON-encoded schema
import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)
scala> println(dt.sql)
STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>

// No exception means that the JSON-encoded schema should be fine
// Use it with from_json
val rawJsons = Seq("""
  {
    "firstName" : "Jacek",
    "lastName" : "Laskowski",
    "email" : "jacek@japila.pl",
    "addresses" : [
      {
        "city" : "Warsaw",
        "state" : "N/A",
        "zip" : "02-791"
      }
    ]
  }
""").toDF("rawjson")
val people = rawJsons
  .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
  .select("json.*") // <-- flatten the struct field
  .withColumn("address", explode($"addresses")) // <-- explode the array field
  .drop("addresses")  // <-- no longer needed
  .select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field
scala> people.show
+---------+---------+---------------+------+-----+------+
|firstName| lastName|          email|  city|state|   zip|
+---------+---------+---------------+------+-----+------+
|    Jacek|Laskowski|jacek@japila.pl|Warsaw|  N/A|02-791|
+---------+---------+---------------+------+-----+------+
Note
options controls how a JSON-encoded value is parsed and accepts the same options as the json data source format.
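
For example, a sketch that passes allowUnquotedFieldNames (one of the json format options) so that a field name without quotes still parses:

// A sketch: json format options apply to from_json as well
// allowUnquotedFieldNames relaxes the parser to accept { id: 0 }
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val unquoted = Seq("""{ id: 0 }""").toDF("json")
val jsonOpts = Map("allowUnquotedFieldNames" -> "true")
scala> unquoted.select(from_json($"json", new StructType().add($"id".int), jsonOpts) as "ids").show
+---+
|ids|
+---+
|[0]|
+---+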

Internally, from_json creates a Column with a JsonToStructs unary expression.
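
Mirroring the explode_outer sketch above, you can reveal the expression yourself:

// A minimal sketch: inspect the Catalyst expression behind from_json
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val jsonExpr = from_json($"json", new StructType().add($"id".int)).expr
println(jsonExpr.numberedTreeString)  // prints the JsonToStructs expression tree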

Note
from_json (i.e. the JsonToStructs expression it creates) uses a JSON parser in FAILFAST parsing mode that simply fails early when a corrupted or malformed record is found (and hence does not support the columnNameOfCorruptRecord JSON option).
// Note the unquoted field name that makes the value malformed JSON
val jsons = Seq("""{ id: 0 }""").toDF("json")

import org.apache.spark.sql.types._
val schema = new StructType()
  .add($"id".int.copy(nullable = false))
  .add($"corrupted_records".string)
val opts = Map("columnNameOfCorruptRecord" -> "corrupted_records")
scala> jsons.select(from_json($"json", schema, opts) as "ids").show
+----+
| ids|
+----+
|null|
+----+
Note
from_json corresponds to SQL’s from_json.
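
A quick sketch on the SQL side (with the schema given as a DDL-formatted string):

// A minimal sketch (spark-shell): from_json in SQL
scala> spark.sql("""SELECT from_json('{ "id": 0 }', 'id INT') AS ids""").show
+---+
|ids|
+---+
|[0]|
+---+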
