ExpressionEncoder — Expression-Based Encoder

ExpressionEncoder[T] is a generic Encoder of JVM objects of the type T to and from internal binary rows.

ExpressionEncoder[T] uses expressions for a serializer and a deserializer.

Note
ExpressionEncoder is the only supported implementation of Encoder, which is explicitly enforced when a Dataset is created (even though the Dataset data structure accepts a bare Encoder[T]).
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
val stringEncoder = ExpressionEncoder[String]
scala> val row = stringEncoder.toRow("hello world")
row: org.apache.spark.sql.catalyst.InternalRow = [0,100000000b,6f77206f6c6c6568,646c72]

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
scala> val unsafeRow = row match { case ur: UnsafeRow => ur }
unsafeRow: org.apache.spark.sql.catalyst.expressions.UnsafeRow = [0,100000000b,6f77206f6c6c6568,646c72]
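
The enforcement mentioned in the note can be observed through the encoderFor helper of the org.apache.spark.sql.catalyst.encoders package object. The sketch below assumes a Spark 2.x spark-shell session, where encoderFor turns an implicit Encoder[A] into an ExpressionEncoder[A] and fails for any other Encoder implementation. The value names are chosen for illustration only.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}

// A bare Encoder[Long] in implicit scope (assumption: Spark 2.x, where
// Encoders.scalaLong is backed by an ExpressionEncoder)
implicit val longEncoder: Encoder[Long] = Encoders.scalaLong

// encoderFor narrows the implicit Encoder to an ExpressionEncoder
// and reports an error for any other implementation
val exprEncoder: ExpressionEncoder[Long] = encoderFor[Long]

println(exprEncoder.schema.treeString)
```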

ExpressionEncoder uses serializer expressions to encode (aka serialize) a JVM object of type T to an internal binary row format (i.e. InternalRow).

Note
It is assumed that all serializer expressions contain at least one and the same BoundReference.

ExpressionEncoder uses a deserializer expression to decode (aka deserialize) a JVM object of type T from internal binary row format.

ExpressionEncoder is flat when the serializer uses a single expression (which also means that objects of type T are not created from constructor parameters only, as objects of Product or DefinedByConstructorParams types are).

Internally, an ExpressionEncoder creates an UnsafeProjection (for the input serializer), an InternalRow (of size 1), and a safe Projection (for the input deserializer). They are all internal lazy attributes of the encoder.

Table 1. ExpressionEncoder’s (Lazily-Initialized) Internal Properties
Property Description

constructProjection

Projection generated for the deserializer expression

Used exclusively when ExpressionEncoder is requested for a JVM object from a Spark SQL row (i.e. InternalRow).

extractProjection

UnsafeProjection generated for the serializer expressions

Used exclusively when ExpressionEncoder is requested for an encoded version of a JVM object as a Spark SQL row (i.e. InternalRow).

inputRow

GenericInternalRow (with the underlying storage array) of size 1 (i.e. it can only store a single JVM object of any type).

Used…​FIXME

Note
The Encoders object contains the default ExpressionEncoders for Scala and Java primitive types and a few other common types, e.g. boolean, long, String, java.sql.Date, java.sql.Timestamp, Array[Byte].
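
As a quick illustration of the note above, the following sketch looks up a few of the predefined encoders and prints their schemas. It relies only on the public Encoders.STRING, Encoders.scalaLong, Encoders.DATE and Encoders.BINARY members. The value names are made up for the example.

```scala
import org.apache.spark.sql.Encoders

// Predefined encoders for common Scala and Java types
val stringEnc = Encoders.STRING
val longEnc   = Encoders.scalaLong
val dateEnc   = Encoders.DATE
val binaryEnc = Encoders.BINARY

// Each encoder describes its encoded form as a single-column schema
Seq(stringEnc, longEnc, dateEnc, binaryEnc).foreach { enc =>
  println(enc.schema.treeString)
}
```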

Creating ExpressionEncoder — apply Method

apply[T : TypeTag](): ExpressionEncoder[T]
Caution
FIXME
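
A minimal sketch of calling apply, assuming a spark-shell session and a Person case class that is introduced here only for illustration. It creates an encoder from the TypeTag of the type and inspects the schema and the deserializer expression derived from it.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Hypothetical type used only for this example
case class Person(id: Long, name: String)

// apply derives the encoder from the implicit TypeTag of Person
val personEncoder = ExpressionEncoder[Person]()

// The schema has one column per constructor parameter
println(personEncoder.schema.treeString)

// The (still unresolved) deserializer expression tree
println(personEncoder.deserializer.numberedTreeString)
```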

Creating ExpressionEncoder Instance

ExpressionEncoder takes the following when created:

  • Schema

  • Flag whether ExpressionEncoder is flat or not

  • Serializer expressions (to convert objects of type T to internal rows)

  • Deserializer expression (to convert internal rows to objects of type T)

  • Scala’s ClassTag for the JVM type T

Creating Deserialize Expression — ScalaReflection.deserializerFor Method

deserializerFor[T: TypeTag]: Expression

deserializerFor creates an expression to deserialize from internal binary row format to a Scala object of type T.

import org.apache.spark.sql.catalyst.ScalaReflection.deserializerFor
val timestampDeExpr = deserializerFor[java.sql.Timestamp]
scala> println(timestampDeExpr.numberedTreeString)
00 staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp"), true)
01 +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp")
02    +- getcolumnbyordinal(0, TimestampType)

val tuple2DeExpr = deserializerFor[(java.sql.Timestamp, Double)]
scala> println(tuple2DeExpr.numberedTreeString)
00 newInstance(class scala.Tuple2)
01 :- staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2"), true)
02 :  +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2")
03 :     +- getcolumnbyordinal(0, TimestampType)
04 +- upcast(getcolumnbyordinal(1, DoubleType), DoubleType, - field (class: "scala.Double", name: "_2"), - root class: "scala.Tuple2")
05    +- getcolumnbyordinal(1, DoubleType)

Internally, deserializerFor calls the recursive internal variant of deserializerFor with a single-element walked type path, i.e. - root class: "[clsName]".

Tip
Read up on Scala’s TypeTags in TypeTags and Manifests.
Note
deserializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.

Recursive Internal deserializerFor Method

deserializerFor(
  tpe: `Type`,
  path: Option[Expression],
  walkedTypePath: Seq[String]): Expression
Table 2. JVM Types and Deserialize Expressions (in evaluation order)
JVM Type (Scala or Java) Deserialize Expressions

Option[T]

java.lang.Integer

java.lang.Long

java.lang.Double

java.lang.Float

java.lang.Short

java.lang.Byte

java.lang.Boolean

java.sql.Date

java.sql.Timestamp

java.lang.String

java.math.BigDecimal

scala.BigDecimal

java.math.BigInteger

scala.math.BigInt

Array[T]

Seq[T]

Map[K, V]

SQLUserDefinedType

User Defined Types (UDTs)

Product (including Tuple) or DefinedByConstructorParams

Creating Serialize Expression — ScalaReflection.serializerFor Method

serializerFor[T: TypeTag](inputObject: Expression): CreateNamedStruct

serializerFor creates a CreateNamedStruct expression to serialize a Scala object of type T to internal binary row format.

import org.apache.spark.sql.catalyst.ScalaReflection.serializerFor

import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types.TimestampType
val boundRef = BoundReference(ordinal = 0, dataType = TimestampType, nullable = true)

val timestampSerExpr = serializerFor[java.sql.Timestamp](boundRef)
scala> println(timestampSerExpr.numberedTreeString)
00 named_struct(value, input[0, timestamp, true])
01 :- value
02 +- input[0, timestamp, true]

Internally, serializerFor calls the recursive internal variant of serializerFor with a single-element walked type path, i.e. - root class: "[clsName]", and pattern matches on the result expression.

Caution
FIXME the pattern match part
Tip
Read up on Scala’s TypeTags in TypeTags and Manifests.
Note
serializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.

Recursive Internal serializerFor Method

serializerFor(
  inputObject: Expression,
  tpe: `Type`,
  walkedTypePath: Seq[String],
  seenTypeSet: Set[`Type`] = Set.empty): Expression

serializerFor creates an expression for serializing an object of type T to an internal row.

Caution
FIXME

Encoding JVM Object to Internal Binary Row Format — toRow Method

toRow(t: T): InternalRow

toRow encodes (aka serializes) a JVM object t as an internal binary row.

Internally, toRow stores t as the only JVM object in inputRow and converts the inputRow to an unsafe binary row (using extractProjection).

In case of any exception while serializing, toRow reports a RuntimeException:

Error while encoding: [initial exception]
[multi-line serializer]
Note

toRow is mostly used when SparkSession is requested for:

Decoding JVM Object From Internal Binary Row Format — fromRow Method

fromRow(row: InternalRow): T

fromRow decodes (aka deserializes) a JVM object from a row InternalRow (with the required values only).

Internally, fromRow uses constructProjection with row and gets the 0th element of type ObjectType that is then cast to the output type T.

In case of any exception while deserializing, fromRow reports a RuntimeException:

Error while decoding: [initial exception]
[deserializer]
Note

fromRow is used for:

  • Dataset operators, i.e. head, collect, collectAsList, toLocalIterator

  • Structured Streaming’s ForeachSink
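
toRow and fromRow can be exercised together as a round trip. The sketch below assumes a Spark 2.x spark-shell session, where both methods are available on ExpressionEncoder (Spark 3.0 and later replaced them with createSerializer and createDeserializer). A tuple type is used to keep the example free of user-defined classes. The value names are illustrative.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

val encoder = ExpressionEncoder[(Long, String)]()

// toRow may return the same row object on every call,
// so the result is copied before it is kept around
val row = encoder.toRow((1L, "Jacek")).copy()

// fromRow needs an encoder whose deserializer has been
// resolved and bound to the schema
val boundEncoder = encoder.resolveAndBind()
val decoded = boundEncoder.fromRow(row)

println(decoded)
```

Calling fromRow on the encoder without resolveAndBind is expected to fail, because the deserializer expression still contains unresolved references.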

Creating ExpressionEncoder For Tuple — tuple Method

tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_]
tuple[T](e: ExpressionEncoder[T]): ExpressionEncoder[Tuple1[T]]
tuple[T1, T2](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2]): ExpressionEncoder[(T1, T2)]
tuple[T1, T2, T3](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3]): ExpressionEncoder[(T1, T2, T3)]
tuple[T1, T2, T3, T4](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3],
  e4: ExpressionEncoder[T4]): ExpressionEncoder[(T1, T2, T3, T4)]
tuple[T1, T2, T3, T4, T5](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3],
  e4: ExpressionEncoder[T4],
  e5: ExpressionEncoder[T5]): ExpressionEncoder[(T1, T2, T3, T4, T5)]

tuple…​FIXME

Note
tuple is used when…​FIXME
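
A short sketch of the two-argument variant of tuple, based only on the signatures above. It combines two encoders created with ExpressionEncoder.apply and prints the schema of the resulting tuple encoder. The value names are illustrative.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

val longEncoder   = ExpressionEncoder[Long]()
val stringEncoder = ExpressionEncoder[String]()

// Combines the two encoders into an encoder of (Long, String)
val pairEncoder: ExpressionEncoder[(Long, String)] =
  ExpressionEncoder.tuple(longEncoder, stringEncoder)

// The fields follow the tuple accessors, i.e. _1 and _2
println(pairEncoder.schema.treeString)
```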

resolveAndBind Method

resolveAndBind(
  attrs: Seq[Attribute] = schema.toAttributes,
  analyzer: Analyzer = SimpleAnalyzer): ExpressionEncoder[T]

resolveAndBind…​FIXME

// A very common use case
case class Person(id: Long, name: String)
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Person].schema

import org.apache.spark.sql.catalyst.encoders.{RowEncoder, ExpressionEncoder}
import org.apache.spark.sql.Row
val encoder: ExpressionEncoder[Row] = RowEncoder.apply(schema).resolveAndBind()

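import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String
// Internal rows hold Catalyst values: a Long for LongType and a UTF8String for StringType
val row = InternalRow(1L, UTF8String.fromString("Jacek"))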

val deserializer = encoder.deserializer

scala> deserializer.eval(row)
java.lang.UnsupportedOperationException: Only code-generated evaluation is supported
  at org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow.eval(objects.scala:1105)
  ... 54 elided

import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext
val code = deserializer.genCode(ctx).code
Note

resolveAndBind is used when:

  • InternalRowDataWriterFactory is requested to create a DataWriter

  • Dataset is requested for the deserializer expression (to convert internal rows to objects of type T)

  • TypedAggregateExpression is created

  • JdbcUtils is requested to resultSetToRows

  • Spark Structured Streaming’s FlatMapGroupsWithStateExec physical operator is requested for the state deserializer (i.e. stateDeserializer)

  • Spark Structured Streaming’s ForeachSink is requested to add a streaming batch (i.e. addBatch)
