Standard Functions for Collections (Collection Functions)

Table 1. (Subset of) Standard Functions for Handling Collections

array_contains

array_contains(column: Column, value: Any): Column

Returns whether the given array column contains the value (null if the array is null).

explode

explode(e: Column): Column

Creates a new row for each element in the given array or map column. A null or empty array/map produces no rows.

explode_outer

explode_outer(e: Column): Column

Creates a new row for each element in the given array or map column. Unlike explode, if the array/map is null or empty then null is produced.

from_json

from_json(e: Column, schema: DataType): Column
from_json(e: Column, schema: DataType, options: Map[String, String]): Column
from_json(e: Column, schema: String, options: Map[String, String]): Column
from_json(e: Column, schema: StructType): Column
from_json(e: Column, schema: StructType, options: Map[String, String]): Column

Extracts data from arbitrary JSON-encoded values into a StructType or ArrayType of StructType elements with the specified schema.

map_keys

map_keys(e: Column): Column

Returns an unordered array containing the keys of the given map column.

map_values

map_values(e: Column): Column

Returns an unordered array containing the values of the given map column.

posexplode

posexplode(e: Column): Column

Creates a new row for each element (with its position) in the given array or map column.

posexplode_outer

posexplode_outer(e: Column): Column

Creates a new row for each element (with its position) in the given array or map column. Unlike posexplode, if the array/map is null or empty then the row (null, null) is produced.

reverse

reverse(e: Column): Column

Returns a reversed string or an array with the elements in reverse order.

Note
Support for reversing arrays is new in 2.4.0.

size

size(e: Column): Column

Returns the size of the given array or map. Returns -1 if the input is null.

reverse Collection Function

reverse(e: Column): Column

reverse returns a reversed string or an array with the elements in reverse order (reversing arrays is supported as of Spark 2.4.0).
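
A minimal usage sketch (assuming spark-shell with implicits in scope) that reverses both a string column and an array column:

import org.apache.spark.sql.functions.reverse
val q = Seq((Seq(1, 2, 3), "abc")).toDF("ns", "s")
  .select(reverse($"ns") as "ns_rev", reverse($"s") as "s_rev")
q.show
// expected: ns_rev = [3, 2, 1] and s_rev = cba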

size Collection Function

size(e: Column): Column

size returns the size of the given array or map. Returns -1 if null.

Internally, size creates a Column with Size unary expression.

import org.apache.spark.sql.functions.size
val c = size('id)
scala> println(c.expr.asCode)
Size(UnresolvedAttribute(ArrayBuffer(id)))
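
A quick usage sketch (assuming spark-shell with implicits in scope) that also demonstrates the -1 result for a null array:

import org.apache.spark.sql.functions.size
val ns = Seq((1, Seq(1, 2, 3)), (2, Seq.empty[Int]), (3, null)).toDF("id", "ns")
ns.select($"id", size($"ns") as "n").show
// expected n values: 3, 0 and -1, respectively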

posexplode Collection Function

posexplode(e: Column): Column

posexplode creates a new row for each element in the given array or map column and adds a pos column with the (0-based) position of the element (the value itself goes to a col column for arrays, or key and value columns for maps).
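
A minimal sketch (assuming spark-shell with implicits in scope) that shows the extra pos column:

import org.apache.spark.sql.functions.posexplode
Seq(Array("a", "b", "c")).toDF("letters")
  .select(posexplode($"letters"))
  .show
// expected rows: (0, a), (1, b) and (2, c) in the pos and col columns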

posexplode_outer Collection Function

posexplode_outer(e: Column): Column

posexplode_outer works like posexplode, except that when the array or map is null or empty it produces a single row with nulls (including a null position) instead of dropping the row.
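
A minimal sketch (assuming spark-shell with implicits in scope); an empty array still yields one row:

import org.apache.spark.sql.functions.posexplode_outer
Seq((1, Seq.empty[String])).toDF("id", "letters")
  .select($"id", posexplode_outer($"letters"))
  .show
// expected: a single row with pos = null and col = null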

explode Collection Function

explode(e: Column): Column

explode creates a new row for each element in the given array or map column. If the array/map is null or empty, no rows are produced for it.

import org.apache.spark.sql.functions.explode
scala> Seq(Array(0,1,2)).toDF("array").withColumn("num", explode('array)).show
+---------+---+
|    array|num|
+---------+---+
|[0, 1, 2]|  0|
|[0, 1, 2]|  1|
|[0, 1, 2]|  2|
+---------+---+
Note
The explode standard function is an equivalent of the flatMap operator for Dataset (see the sketch below).
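
A rough flatMap-based counterpart of the example above (a sketch; spark-shell implicits assumed, and note that the original array column is not carried along):

val ds = Seq(Array(0, 1, 2)).toDS
ds.flatMap(arr => arr).show
// expected: rows 0, 1 and 2 in a single value column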

explode_outer Collection Function

explode_outer(e: Column): Column

explode_outer generates a new row for each element in the e array or map column.

Note
Unlike explode, explode_outer produces a single row with null when the array or map is null or empty (explode produces no rows in that case).
import org.apache.spark.sql.functions.explode_outer
val arrays = Seq((1,Seq.empty[String])).toDF("id", "array")
scala> arrays.printSchema
root
 |-- id: integer (nullable = false)
 |-- array: array (nullable = true)
 |    |-- element: string (containsNull = true)
scala> arrays.select(explode_outer($"array")).show
+----+
| col|
+----+
|null|
+----+

Internally, explode_outer creates a Column with GeneratorOuter and Explode Catalyst expressions.

val explodeOuter = explode_outer($"array").expr
scala> println(explodeOuter.numberedTreeString)
00 generatorouter(explode('array))
01 +- explode('array)
02    +- 'array

Extracting Data from Arbitrary JSON-Encoded Values — from_json Collection Function

from_json(e: Column, schema: StructType, options: Map[String, String]): Column (1)
from_json(e: Column, schema: DataType, options: Map[String, String]): Column (2)
from_json(e: Column, schema: StructType): Column (3)
from_json(e: Column, schema: DataType): Column  (4)
from_json(e: Column, schema: String, options: Map[String, String]): Column (5)
  1. Calls <2> with StructType converted to DataType

  2. The main variant that creates a Column with a JsonToStructs Catalyst expression

  3. Calls <1> with empty options

  4. Calls <2> with empty options

  5. Uses schema as DataType in the JSON format or falls back to StructType in the DDL format

from_json parses a column with a JSON-encoded value into a StructType or ArrayType of StructType elements with the specified schema.

val jsons = Seq("""{ "id": 0 }""").toDF("json")

import org.apache.spark.sql.types._
val schema = new StructType()
  .add($"id".int.copy(nullable = false))

import org.apache.spark.sql.functions.from_json
scala> jsons.select(from_json($"json", schema) as "ids").show
+---+
|ids|
+---+
|[0]|
+---+
Note

A schema can be one of the following:

  1. DataType as a Scala object or in the JSON format

  2. StructType in the DDL format (as in the sketch below)
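
A minimal sketch of the DDL-format variant (assuming spark-shell with implicits in scope; the column names and the DDL string are made up for the illustration):

import org.apache.spark.sql.functions.from_json
val jsons = Seq("""{ "id": 0, "name": "zero" }""").toDF("json")
val parsed = jsons.select(from_json($"json", "id INT, name STRING", Map.empty[String, String]) as "parsed")
parsed.printSchema
// expected: parsed is a struct with id (integer) and name (string) fields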

// Define the schema for JSON-encoded messages
// Note that the schema is nested (on the addresses field)
import org.apache.spark.sql.types._
val addressesSchema = new StructType()
  .add($"city".string)
  .add($"state".string)
  .add($"zip".string)
val schema = new StructType()
  .add($"firstName".string)
  .add($"lastName".string)
  .add($"email".string)
  .add($"addresses".array(addressesSchema))
scala> schema.printTreeString
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- zip: string (nullable = true)

// Generate the JSON-encoded schema
// That's the variant of the schema that from_json accepts
val schemaAsJson = schema.json

// Use prettyJson to print out the JSON-encoded schema
// Only for demo purposes
scala> println(schema.prettyJson)
{
  "type" : "struct",
  "fields" : [ {
    "name" : "firstName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "lastName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "email",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "addresses",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "city",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "state",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "zip",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  } ]
}

// Let's "validate" the JSON-encoded schema
import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)
scala> println(dt.sql)
STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>

// No exception means that the JSON-encoded schema should be fine
// Use it with from_json
val rawJsons = Seq("""
  {
    "firstName" : "Jacek",
    "lastName" : "Laskowski",
    "email" : "[email protected]",
    "addresses" : [
      {
        "city" : "Warsaw",
        "state" : "N/A",
        "zip" : "02-791"
      }
    ]
  }
""").toDF("rawjson")
val people = rawJsons
  .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
  .select("json.*") // <-- flatten the struct field
  .withColumn("address", explode($"addresses")) // <-- explode the array field
  .drop("addresses")  // <-- no longer needed
  .select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field
scala> people.show
+---------+---------+---------------+------+-----+------+
|firstName| lastName|          email|  city|state|   zip|
+---------+---------+---------------+------+-----+------+
|    Jacek|Laskowski|[email protected]|Warsaw|  N/A|02-791|
+---------+---------+---------------+------+-----+------+
Note
The options argument controls how a JSON value is parsed and accepts the same options as the json data source format.

Internally, from_json creates a Column with JsonToStructs unary expression.

Note
from_json (through the JsonToStructs expression it creates) uses a JSON parser in FAILFAST parsing mode that simply fails early when a corrupted/malformed record is found (and hence does not support the columnNameOfCorruptRecord JSON option); such records end up as null, as the following example shows.
val jsons = Seq("""{ id: 0 }""").toDF("json")

import org.apache.spark.sql.types._
val schema = new StructType()
  .add($"id".int.copy(nullable = false))
  .add($"corrupted_records".string)
val opts = Map("columnNameOfCorruptRecord" -> "corrupted_records")
scala> jsons.select(from_json($"json", schema, opts) as "ids").show
+----+
| ids|
+----+
|null|
+----+
Note
from_json corresponds to SQL’s from_json.

array_contains Collection Function

array_contains(column: Column, value: Any): Column

array_contains creates a Column that checks whether the column argument (an array) contains the given value, which has to be of the same type as the elements of the array.

Internally, array_contains creates a Column with an ArrayContains expression.

// Arguments must be an array followed by a value of same type as the array elements
import org.apache.spark.sql.functions.array_contains
val c = array_contains(column = $"ids", value = 1)

val ids = Seq(Seq(1,2,3), Seq(1), Seq(2,3)).toDF("ids")
val q = ids.filter(c)
scala> q.show
+---------+
|      ids|
+---------+
|[1, 2, 3]|
|      [1]|
+---------+

array_contains corresponds to SQL’s array_contains.

import org.apache.spark.sql.functions.array_contains
val c = array_contains(column = $"ids", value = Array(1, 2))
val e = c.expr
scala> println(e.sql)
array_contains(`ids`, [1,2])
Tip
Use SQL’s array_contains when the value argument should also come from a column (as in the examples below).
val codes = Seq(
  (Seq(1, 2, 3), 2),
  (Seq(1), 1),
  (Seq.empty[Int], 1),
  (Seq(2, 4, 6), 0)).toDF("codes", "cd")
scala> codes.show
+---------+---+
|    codes| cd|
+---------+---+
|[1, 2, 3]|  2|
|      [1]|  1|
|       []|  1|
|[2, 4, 6]|  0|
+---------+---+

val q = codes.where("array_contains(codes, cd)")
scala> q.show
+---------+---+
|    codes| cd|
+---------+---+
|[1, 2, 3]|  2|
|      [1]|  1|
+---------+---+

// array_contains standard function with Columns does NOT work. Why?!
// Asked this question on StackOverflow --> https://stackoverflow.com/q/50412939/1305344
val q = codes.where(array_contains($"codes", $"cd"))
scala> q.show
java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.ColumnName cd
  at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:77)
  at org.apache.spark.sql.functions$.array_contains(functions.scala:3046)
  ... 50 elided

// Thanks Russel for this excellent "workaround"
// https://stackoverflow.com/a/50413766/1305344
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.ArrayContains
val q = codes.where(new Column(ArrayContains($"codes".expr, $"cd".expr)))
scala> q.show
+---------+---+
|    codes| cd|
+---------+---+
|[1, 2, 3]|  2|
|      [1]|  1|
+---------+---+

map_keys Collection Function

map_keys(e: Column): Column

map_keys returns an unordered array containing the keys of the given map column.
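
A minimal sketch (assuming spark-shell with implicits in scope):

import org.apache.spark.sql.functions.map_keys
Seq((1, Map("a" -> 1, "b" -> 2))).toDF("id", "m")
  .select(map_keys($"m") as "keys")
  .show
// expected: a single row with keys = [a, b] (order not guaranteed)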

map_values Collection Function

map_values(e: Column): Column

map_values returns an unordered array containing the values of the given map column.
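
A minimal sketch (assuming spark-shell with implicits in scope):

import org.apache.spark.sql.functions.map_values
Seq((1, Map("a" -> 1, "b" -> 2))).toDF("id", "m")
  .select(map_values($"m") as "values")
  .show
// expected: a single row with values = [1, 2] (order not guaranteed)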
