Avro Data Source
Spark SQL supports structured queries over Avro-encoded data, both in Avro files and in binary columns of a DataFrame.
Note: Apache Avro is a data serialization format that uses JSON-defined schemas and a compact binary encoding.
The Avro data source is provided by the spark-avro external module. You should include it as a dependency in your Spark application (e.g. via spark-submit --packages or in build.sbt).
The following shows how to include the spark-avro module in a spark-shell session.
$ ./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:2.4.0
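If you manage dependencies with sbt instead, the equivalent entry in build.sbt would be along these lines (a sketch, assuming Spark 2.4.0 built for Scala 2.12):

```scala
// build.sbt — pull in the external spark-avro module alongside Spark SQL
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "2.4.0" % Provided,
  "org.apache.spark" %% "spark-avro" % "2.4.0"
)
```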
Name | Description |
---|---|
from_avro | Parses an Avro-encoded binary column and converts it to a Catalyst value per a JSON-encoded Avro schema |
to_avro | Converts a column to an Avro-encoded binary column |
After the module is loaded, you should import the org.apache.spark.sql.avro package to have the from_avro and to_avro functions available.
import org.apache.spark.sql.avro._
Converting Column to Avro-Encoded Binary Column — to_avro Method
to_avro(data: Column): Column
to_avro creates a Column with the CatalystDataToAvro unary expression (over the Catalyst expression of the given data column).
import org.apache.spark.sql.avro._
val q = spark.range(1).withColumn("to_avro_id", to_avro('id))
scala> q.show
+---+----------+
| id|to_avro_id|
+---+----------+
| 0| [00]|
+---+----------+
val logicalPlan = q.queryExecution.logical
scala> println(logicalPlan.numberedTreeString)
00 'Project [id#33L, catalystdatatoavro('id) AS to_avro_id#35]
01 +- Range (0, 1, step=1, splits=Some(8))
import org.apache.spark.sql.avro.CatalystDataToAvro
// Let's use QueryExecution.analyzed instead
// https://issues.apache.org/jira/browse/SPARK-26063
val analyzedPlan = q.queryExecution.analyzed
val toAvroExpr = analyzedPlan.expressions.drop(1).head.children.head.asInstanceOf[CatalystDataToAvro]
scala> println(toAvroExpr.sql)
to_avro(`id`, bigint)
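to_avro is not limited to simple types. As a sketch (assuming the same spark-shell session with spark-avro on the classpath), it can also encode a complex-typed column such as a struct:

```scala
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.struct

val people = Seq((0L, "zero")).toDF("id", "name")

// Encode the whole (id, name) struct into a single Avro-encoded binary column
val encoded = people.select(to_avro(struct('id, 'name)) as "avro_person")
```

The resulting avro_person column is of BinaryType, which makes it a convenient shape for sinks that expect a single opaque payload (e.g. a Kafka value column).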
Converting Avro-Encoded Column to Catalyst Value — from_avro Method
from_avro(data: Column, jsonFormatSchema: String): Column
from_avro creates a Column with the AvroDataToCatalyst unary expression (over the Catalyst expression of the given data column and the jsonFormatSchema JSON-encoded schema).
import org.apache.spark.sql.avro._
val data = spark.range(1).withColumn("to_avro_id", to_avro('id))
// Use from_avro to decode to_avro-encoded id column
val jsonFormatSchema = s"""
|{
| "type": "long",
| "name": "id"
|}
""".stripMargin
val q = data.select(from_avro('to_avro_id, jsonFormatSchema) as "id_from_avro")
scala> q.show
+------------+
|id_from_avro|
+------------+
| 0|
+------------+
val logicalPlan = q.queryExecution.logical
scala> println(logicalPlan.numberedTreeString)
00 'Project [avrodatatocatalyst('to_avro_id,
01 {
02 "type": "long",
03 "name": "id"
04 }
05 ) AS id_from_avro#77]
06 +- Project [id#66L, catalystdatatoavro(id#66L) AS to_avro_id#68]
07 +- Range (0, 1, step=1, splits=Some(8))
import org.apache.spark.sql.avro.AvroDataToCatalyst
// Let's use QueryExecution.analyzed instead
// https://issues.apache.org/jira/browse/SPARK-26063
val analyzedPlan = q.queryExecution.analyzed
val fromAvroExpr = analyzedPlan.expressions.head.children.head.asInstanceOf[AvroDataToCatalyst]
scala> println(fromAvroExpr.sql)
from_avro(`to_avro_id`, bigint)
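Beyond the from_avro and to_avro functions, the spark-avro module also registers the avro file format with DataFrameReader and DataFrameWriter. A minimal round-trip sketch (with a hypothetical /tmp path):

```scala
// Write a DataFrame out as Avro files and load it back
val ids = spark.range(3)
ids.write.mode("overwrite").format("avro").save("/tmp/ids-avro")

val loaded = spark.read.format("avro").load("/tmp/ids-avro")
// loaded has the same single bigint column named id
```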