import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
val stringEncoder = ExpressionEncoder[String]
scala> val row = stringEncoder.toRow("hello world")
row: org.apache.spark.sql.catalyst.InternalRow = [0,100000000b,6f77206f6c6c6568,646c72]
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
scala> val unsafeRow = row match { case ur: UnsafeRow => ur }
unsafeRow: org.apache.spark.sql.catalyst.expressions.UnsafeRow = [0,100000000b,6f77206f6c6c6568,646c72]
ExpressionEncoder — Expression-Based Encoder
ExpressionEncoder[T] is a generic Encoder of JVM objects of the type T to and from internal binary rows.
ExpressionEncoder[T] uses expressions for a serializer and a deserializer.
Note: ExpressionEncoder is the only supported implementation of Encoder which is explicitly enforced when Dataset is created (even though Dataset data structure accepts a bare Encoder[T]).
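The enforcement mentioned above can be observed with the encoderFor helper from the org.apache.spark.sql.catalyst.encoders package object. The following is a minimal sketch, assuming Spark 2.x on the classpath (e.g. in spark-shell); the value names bare and exprEnc are made up for illustration.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}

// A "bare" Encoder[String] as a Dataset would accept it
val bare: Encoder[String] = Encoders.STRING

// encoderFor accepts any Encoder but only lets ExpressionEncoders through
// (assumption: Spark 2.x behaviour)
val exprEnc: ExpressionEncoder[String] = encoderFor[String](bare)

println(exprEnc.schema.treeString)
```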
ExpressionEncoder uses serializer expressions to encode (aka serialize) a JVM object of type T to an internal binary row format (i.e. InternalRow).
Note: It is assumed that all serializer expressions contain at least one and the same BoundReference.
ExpressionEncoder uses a deserializer expression to decode (aka deserialize) a JVM object of type T from internal binary row format.
ExpressionEncoder is flat when the serializer uses a single expression, which also means that objects of type T are not created from constructor parameters only (as is the case for Product or DefinedByConstructorParams types).
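As a rough illustration of the single-expression case, the sketch below compares the number of serializer expressions of a String encoder with that of a tuple encoder. It assumes Spark on the classpath (e.g. spark-shell); the value names are illustrative only.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// String is serialized with a single expression
val stringEnc = ExpressionEncoder[String]()

// A tuple is a Product, so there is one serializer expression per field
val pairEnc = ExpressionEncoder[(Long, String)]()

println(stringEnc.serializer.size)
println(pairEnc.serializer.size)
println(pairEnc.schema.fieldNames.mkString(", "))
```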
Internally, an ExpressionEncoder creates an UnsafeProjection (for the input serializer), an InternalRow (of size 1), and a safe Projection (for the input deserializer). They are all internal lazy attributes of the encoder.
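| Property | Description |
|---|---|
| constructProjection | Safe Projection for the deserializer expression. Used exclusively when fromRow decodes a JVM object from an internal row. |
| extractProjection | UnsafeProjection for the serializer expressions. Used exclusively when toRow encodes a JVM object as an internal row. |
| inputRow | InternalRow of size 1. Used…FIXME |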
Note
|
Encoders object contains the default ExpressionEncoders for Scala and Java primitive types, e.g. boolean, long, String, java.sql.Date, java.sql.Timestamp, Array[Byte].
|
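A few of these default encoders can be looked up directly. The sketch below assumes Spark 2.x (where the encoders returned by Encoders are ExpressionEncoders) and only prints the one-column schema of each.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{BinaryType, LongType, TimestampType}

// Default encoders for a Scala primitive, java.sql.Timestamp and Array[Byte]
val longEnc = Encoders.scalaLong
val timestampEnc = Encoders.TIMESTAMP
val binaryEnc = Encoders.BINARY

// Each of them describes a single column
Seq(longEnc, timestampEnc, binaryEnc).foreach(e => println(e.schema.simpleString))
```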
Creating ExpressionEncoder Instance
ExpressionEncoder takes the following when created:
- Serializer expressions (to convert objects of type T to internal rows)
- Deserializer expression (to convert internal rows to objects of type T)
- Scala’s ClassTag for the JVM type T
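The three values listed above can be inspected on an encoder that Spark builds for you. This is a minimal sketch, assuming Spark on the classpath; enc is an illustrative name.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.ObjectType

val enc = ExpressionEncoder[String]()

// Serializer expressions (objects of type T to internal rows)
enc.serializer.foreach(e => println(e.treeString))

// Deserializer expression (internal rows to objects of type T)
println(enc.deserializer.treeString)

// ClassTag for the JVM type T
println(enc.clsTag.runtimeClass)
```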
Creating Deserialize Expression — ScalaReflection.deserializerFor Method
deserializerFor[T: TypeTag]: Expression
deserializerFor creates an expression to deserialize from internal binary row format to a Scala object of type T.
import org.apache.spark.sql.catalyst.ScalaReflection.deserializerFor
val timestampDeExpr = deserializerFor[java.sql.Timestamp]
scala> println(timestampDeExpr.numberedTreeString)
00 staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp"), true)
01 +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp")
02    +- getcolumnbyordinal(0, TimestampType)
val tuple2DeExpr = deserializerFor[(java.sql.Timestamp, Double)]
scala> println(tuple2DeExpr.numberedTreeString)
00 newInstance(class scala.Tuple2)
01 :- staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2"), true)
02 :  +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2")
03 :     +- getcolumnbyordinal(0, TimestampType)
04 +- upcast(getcolumnbyordinal(1, DoubleType), DoubleType, - field (class: "scala.Double", name: "_2"), - root class: "scala.Tuple2")
05    +- getcolumnbyordinal(1, DoubleType)
Internally, deserializerFor calls the recursive internal variant of deserializerFor with a single-element walked type path, i.e. - root class: "[clsName]".
Tip: Read up on Scala’s TypeTags in TypeTags and Manifests.

Note: deserializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.
Recursive Internal deserializerFor Method
deserializerFor(
  tpe: `Type`,
  path: Option[Expression],
  walkedTypePath: Seq[String]): Expression
| JVM Type (Scala or Java) | Deserialize Expressions |
|---|---|
| User Defined Types (UDTs) | |
Creating Serialize Expression — ScalaReflection.serializerFor Method
serializerFor[T: TypeTag](inputObject: Expression): CreateNamedStruct
serializerFor creates a CreateNamedStruct expression to serialize a Scala object of type T to internal binary row format.
import org.apache.spark.sql.catalyst.ScalaReflection.serializerFor
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types.TimestampType
val boundRef = BoundReference(ordinal = 0, dataType = TimestampType, nullable = true)
val timestampSerExpr = serializerFor[java.sql.Timestamp](boundRef)
scala> println(timestampSerExpr.numberedTreeString)
00 named_struct(value, input[0, timestamp, true])
01 :- value
02 +- input[0, timestamp, true]
Internally, serializerFor calls the recursive internal variant of serializerFor with a single-element walked type path, i.e. - root class: "[clsName]", and pattern matches on the result expression.
Caution: FIXME the pattern match part

Tip: Read up on Scala’s TypeTags in TypeTags and Manifests.

Note: serializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.
Recursive Internal serializerFor Method
serializerFor(
  inputObject: Expression,
  tpe: `Type`,
  walkedTypePath: Seq[String],
  seenTypeSet: Set[`Type`] = Set.empty): Expression
serializerFor creates an expression for serializing an object of type T to an internal row.
Caution: FIXME
Encoding JVM Object to Internal Binary Row Format — toRow Method
toRow(t: T): InternalRow
toRow encodes (aka serializes) a JVM object t as an internal binary row.
Internally, toRow sets the only JVM object to be t in inputRow and converts the inputRow to an unsafe binary row (using extractProjection).
In case of any exception while serializing, toRow reports a RuntimeException:
Error while encoding: [initial exception]
[multi-line serializer]
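The two internal steps of toRow can be approximated with public Catalyst classes. The sketch below is not the encoder's own code: it rebuilds a one-slot input row and an unsafe projection by hand, assuming Spark on the classpath. The local names inputRow and extractProjection merely mirror the internal attributes described above.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection

val enc = ExpressionEncoder[String]()

// A row of size 1 that holds the JVM object to serialize
val inputRow = new GenericInternalRow(1)

// An UnsafeProjection generated from the serializer expressions
val extractProjection = GenerateUnsafeProjection.generate(enc.serializer)

// Step 1: put the object into the only slot; step 2: project to an UnsafeRow
inputRow(0) = "hello world"
val unsafeRow = extractProjection(inputRow)

println(unsafeRow.getString(0))
```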
Decoding JVM Object From Internal Binary Row Format — fromRow Method
fromRow(row: InternalRow): T
fromRow decodes (aka deserializes) a JVM object from a row (an InternalRow with the required values only).
Internally, fromRow uses constructProjection with row and gets the 0th element of type ObjectType that is then cast to the output type T.
In case of any exception while deserializing, fromRow reports a RuntimeException:
Error while decoding: [initial exception]
[deserializer]
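A round trip through toRow and fromRow could look as follows. This is a sketch for Spark 2.x only (later versions replaced toRow and fromRow with other methods); note that the deserializer has to be resolved and bound with resolveAndBind before fromRow can be used.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

val stringEncoder = ExpressionEncoder[String]()

// Encode a JVM object to an internal binary row (Spark 2.x API)
val encoded = stringEncoder.toRow("hello world")

// Decode it back using a resolved and bound copy of the encoder
val boundEncoder = stringEncoder.resolveAndBind()
val decoded = boundEncoder.fromRow(encoded)

println(decoded)
```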
Creating ExpressionEncoder For Tuple — tuple Method
tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_]
tuple[T](e: ExpressionEncoder[T]): ExpressionEncoder[Tuple1[T]]
tuple[T1, T2](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2]): ExpressionEncoder[(T1, T2)]
tuple[T1, T2, T3](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3]): ExpressionEncoder[(T1, T2, T3)]
tuple[T1, T2, T3, T4](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3],
  e4: ExpressionEncoder[T4]): ExpressionEncoder[(T1, T2, T3, T4)]
tuple[T1, T2, T3, T4, T5](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3],
  e4: ExpressionEncoder[T4],
  e5: ExpressionEncoder[T5]): ExpressionEncoder[(T1, T2, T3, T4, T5)]
tuple…FIXME
Note: tuple is used when…FIXME
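While the description of tuple is still to be written, the two-encoder variant can at least be tried out. The sketch below, assuming Spark on the classpath, combines two single-column encoders and prints the schema of the resulting tuple encoder; pairEncoder is an illustrative name.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.{LongType, StringType}

// Combine two encoders into an encoder of pairs
val pairEncoder: ExpressionEncoder[(Long, String)] =
  ExpressionEncoder.tuple(ExpressionEncoder[Long](), ExpressionEncoder[String]())

// One field per input encoder, named after the tuple fields
println(pairEncoder.schema.treeString)
```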
resolveAndBind Method
resolveAndBind(
  attrs: Seq[Attribute] = schema.toAttributes,
  analyzer: Analyzer = SimpleAnalyzer): ExpressionEncoder[T]
resolveAndBind…FIXME
// A very common use case
case class Person(id: Long, name: String)
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Person].schema
import org.apache.spark.sql.catalyst.encoders.{RowEncoder, ExpressionEncoder}
import org.apache.spark.sql.Row
val encoder: ExpressionEncoder[Row] = RowEncoder.apply(schema).resolveAndBind()
import org.apache.spark.sql.catalyst.InternalRow
val row = InternalRow(1, "Jacek")
val deserializer = encoder.deserializer
scala> deserializer.eval(row)
java.lang.UnsupportedOperationException: Only code-generated evaluation is supported
at org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow.eval(objects.scala:1105)
... 54 elided
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext
val code = deserializer.genCode(ctx).code