ExpressionEncoder — Expression-Based Encoder

ExpressionEncoder[T] is a generic Encoder of JVM objects of type T to and from internal binary rows. ExpressionEncoder[T] uses expressions for a serializer and a deserializer.

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
val stringEncoder = ExpressionEncoder[String]

scala> val row = stringEncoder.toRow("hello world")
row: org.apache.spark.sql.catalyst.InternalRow = [0,100000000b,6f77206f6c6c6568,646c72]

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
scala> val unsafeRow = row match { case ur: UnsafeRow => ur }
unsafeRow: org.apache.spark.sql.catalyst.expressions.UnsafeRow = [0,100000000b,6f77206f6c6c6568,646c72]
```
Note: ExpressionEncoder is the only supported implementation of Encoder, which is explicitly enforced when a Dataset is created (even though the Dataset data structure accepts a bare Encoder[T]).
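A quick way to see this enforcement is the encoderFor helper in the org.apache.spark.sql.catalyst.encoders package object, which Dataset uses to turn any Encoder into an ExpressionEncoder (a minimal sketch, assuming a Spark 2.x shell):

```scala
// encoderFor casts a bare Encoder[T] to ExpressionEncoder[T]
// (and fails for any other Encoder implementation)
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}

val exprEnc: ExpressionEncoder[String] = encoderFor(Encoders.STRING)
```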
ExpressionEncoder uses serializer expressions to encode (aka serialize) a JVM object of type T to the internal binary row format (i.e. InternalRow).

Note: It is assumed that all serializer expressions contain at least one and the same BoundReference.

ExpressionEncoder uses a deserializer expression to decode (aka deserialize) a JVM object of type T from the internal binary row format.
ExpressionEncoder is flat when the serializer uses a single expression (which also means that the objects of type T are not created using constructor parameters only, as Product or DefinedByConstructorParams types are).
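For example (a sketch, assuming a Spark 2.x shell where ExpressionEncoder exposes the flat flag):

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

ExpressionEncoder[String]().flat        // true: a single serializer expression
ExpressionEncoder[(String, Int)]().flat // false: one expression per tuple field

case class Person(id: Long, name: String)
ExpressionEncoder[Person]().flat        // false: Person is a Product type
```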
Internally, an ExpressionEncoder creates an UnsafeProjection (for the input serializer), an InternalRow (of size 1), and a safe Projection (for the input deserializer). They are all internal lazy attributes of the encoder.

Property | Description |
---|---|
extractProjection | UnsafeProjection generated for the serializer expressions. Used exclusively when toRow encodes a JVM object as an internal binary row. |
inputRow | GenericInternalRow of size 1 that holds the single JVM object to encode. Used exclusively when toRow encodes a JVM object as an internal binary row. |
constructProjection | Projection generated for the deserializer expression. Used exclusively when fromRow decodes a JVM object from an internal binary row. |
Note: The Encoders object contains the default ExpressionEncoders for Scala and Java primitive types, e.g. boolean, long, String, java.sql.Date, java.sql.Timestamp, Array[Byte].
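For instance (a sketch, assuming a Spark 2.x shell), the factory methods on Encoders give back ExpressionEncoders:

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

Encoders.STRING.isInstanceOf[ExpressionEncoder[_]] // true
Encoders.scalaLong.schema // StructType with a single "value" field of LongType
```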
Creating ExpressionEncoder Instance

ExpressionEncoder takes the following when created (see the sketch after this list):

- Serializer expressions (to convert objects of type T to internal rows)
- Deserializer expression (to convert internal rows to objects of type T)
- Scala's ClassTag for the JVM type T
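In practice you rarely pass these in by hand; the ExpressionEncoder.apply factory derives them from a type via Scala reflection (a minimal sketch, assuming a Spark 2.x shell and a hypothetical Person case class):

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Person(id: Long, name: String)

val personEncoder = ExpressionEncoder[Person]()
personEncoder.serializer   // Seq[Expression], one per field (id, name)
personEncoder.deserializer // Expression that news up a Person
personEncoder.clsTag       // ClassTag[Person]
```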
Creating Deserialize Expression — ScalaReflection.deserializerFor Method

```scala
deserializerFor[T: TypeTag]: Expression
```

deserializerFor creates an expression to deserialize from the internal binary row format to a Scala object of type T.
```scala
import org.apache.spark.sql.catalyst.ScalaReflection.deserializerFor
val timestampDeExpr = deserializerFor[java.sql.Timestamp]
scala> println(timestampDeExpr.numberedTreeString)
00 staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp"), true)
01 +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - root class: "java.sql.Timestamp")
02    +- getcolumnbyordinal(0, TimestampType)

val tuple2DeExpr = deserializerFor[(java.sql.Timestamp, Double)]
scala> println(tuple2DeExpr.numberedTreeString)
00 newInstance(class scala.Tuple2)
01 :- staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2"), true)
02 :  +- upcast(getcolumnbyordinal(0, TimestampType), TimestampType, - field (class: "java.sql.Timestamp", name: "_1"), - root class: "scala.Tuple2")
03 :     +- getcolumnbyordinal(0, TimestampType)
04 +- upcast(getcolumnbyordinal(1, DoubleType), DoubleType, - field (class: "scala.Double", name: "_2"), - root class: "scala.Tuple2")
05    +- getcolumnbyordinal(1, DoubleType)
```
Internally, deserializerFor calls the recursive internal variant of deserializerFor with a single-element walked type path with - root class: "[clsName]".

Tip: Read up on Scala's TypeTags in TypeTags and Manifests.

Note: deserializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.
Recursive Internal deserializerFor Method

```scala
deserializerFor(
  tpe: `Type`,
  path: Option[Expression],
  walkedTypePath: Seq[String]): Expression
```
The recursive deserializerFor pattern matches on the JVM type (Scala or Java) and creates the corresponding deserialize expressions, covering (among others) Option types, Java boxed primitives (e.g. java.lang.Integer, java.lang.Long, java.lang.Double), java.lang.String, java.sql.Date and java.sql.Timestamp, BigDecimal and BigInt, arrays, Seq and Map collections, Product types (case classes and tuples), and User Defined Types (UDTs).
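To see the type-directed dispatch in action, one can request a deserialize expression for an Option type (a sketch, assuming a Spark 2.x shell; the exact tree output is omitted here):

```scala
import org.apache.spark.sql.catalyst.ScalaReflection.deserializerFor

// Option[Double] takes the Option branch of the pattern match
val optionDeExpr = deserializerFor[Option[Double]]
println(optionDeExpr.numberedTreeString)
```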
Creating Serialize Expression — ScalaReflection.serializerFor Method

```scala
serializerFor[T: TypeTag](inputObject: Expression): CreateNamedStruct
```

serializerFor creates a CreateNamedStruct expression to serialize a Scala object of type T to the internal binary row format.
```scala
import org.apache.spark.sql.catalyst.ScalaReflection.serializerFor
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types.TimestampType

val boundRef = BoundReference(ordinal = 0, dataType = TimestampType, nullable = true)
val timestampSerExpr = serializerFor[java.sql.Timestamp](boundRef)
scala> println(timestampSerExpr.numberedTreeString)
00 named_struct(value, input[0, timestamp, true])
01 :- value
02 +- input[0, timestamp, true]
```
Internally, serializerFor calls the recursive internal variant of serializerFor with a single-element walked type path with - root class: "[clsName]" and pattern matches on the result expression.

Caution: FIXME The pattern match part.

Tip: Read up on Scala's TypeTags in TypeTags and Manifests.

Note: serializerFor is used exclusively when ExpressionEncoder is created for a Scala type T.
Recursive Internal serializerFor Method

```scala
serializerFor(
  inputObject: Expression,
  tpe: `Type`,
  walkedTypePath: Seq[String],
  seenTypeSet: Set[`Type`] = Set.empty): Expression
```
serializerFor creates an expression for serializing an object of type T to an internal row.

Caution: FIXME
Encoding JVM Object to Internal Binary Row Format — toRow Method

```scala
toRow(t: T): InternalRow
```

toRow encodes (aka serializes) a JVM object t as an internal binary row.

Internally, toRow stores t as the only JVM object in inputRow and converts the inputRow to an unsafe binary row (using extractProjection).

In case of any exception while serializing, toRow reports a RuntimeException:

```
Error while encoding: [initial exception]
[multi-line serializer]
```
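As a quick illustration (a sketch, assuming a Spark 2.x shell and the hypothetical Person case class from earlier):

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Person(id: Long, name: String)

val personEncoder = ExpressionEncoder[Person]()
val personRow = personEncoder.toRow(Person(0, "Jacek")) // an UnsafeRow with 2 fields
```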
Decoding JVM Object From Internal Binary Row Format — fromRow Method

```scala
fromRow(row: InternalRow): T
```

fromRow decodes (aka deserializes) a JVM object from a row, i.e. an InternalRow (with the required values only).

Internally, fromRow uses constructProjection with row and takes the 0th element of type ObjectType that is then cast to the output type T.

In case of any exception while deserializing, fromRow reports a RuntimeException:

```
Error while decoding: [initial exception]
[deserializer]
```
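Put together, toRow and fromRow give a round trip between a JVM object and its internal binary representation (a sketch, assuming a Spark 2.x shell; note that fromRow requires a resolved and bound encoder):

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// resolveAndBind resolves the deserializer against the encoder's own schema
val stringEncoder = ExpressionEncoder[String]().resolveAndBind()
val binaryRow = stringEncoder.toRow("hello world")
val decoded = stringEncoder.fromRow(binaryRow) // decoded: String = hello world
```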
Creating ExpressionEncoder For Tuple — tuple Method

```scala
tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_]
tuple[T](e: ExpressionEncoder[T]): ExpressionEncoder[Tuple1[T]]
tuple[T1, T2](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2]): ExpressionEncoder[(T1, T2)]
tuple[T1, T2, T3](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3]): ExpressionEncoder[(T1, T2, T3)]
tuple[T1, T2, T3, T4](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3],
  e4: ExpressionEncoder[T4]): ExpressionEncoder[(T1, T2, T3, T4)]
tuple[T1, T2, T3, T4, T5](
  e1: ExpressionEncoder[T1],
  e2: ExpressionEncoder[T2],
  e3: ExpressionEncoder[T3],
  e4: ExpressionEncoder[T4],
  e5: ExpressionEncoder[T5]): ExpressionEncoder[(T1, T2, T3, T4, T5)]
```

tuple …FIXME
Note: tuple is used when…FIXME
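A minimal sketch of combining two encoders into a tuple encoder (assuming a Spark 2.x shell):

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

val pairEncoder: ExpressionEncoder[(Long, String)] =
  ExpressionEncoder.tuple(ExpressionEncoder[Long](), ExpressionEncoder[String]())
pairEncoder.schema // StructType with _1 (LongType) and _2 (StringType) fields
```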
resolveAndBind Method

```scala
resolveAndBind(
  attrs: Seq[Attribute] = schema.toAttributes,
  analyzer: Analyzer = SimpleAnalyzer): ExpressionEncoder[T]
```

resolveAndBind …FIXME
```scala
// A very common use case
case class Person(id: Long, name: String)

import org.apache.spark.sql.Encoders
val schema = Encoders.product[Person].schema

import org.apache.spark.sql.catalyst.encoders.{RowEncoder, ExpressionEncoder}
import org.apache.spark.sql.Row
val encoder: ExpressionEncoder[Row] = RowEncoder.apply(schema).resolveAndBind()

import org.apache.spark.sql.catalyst.InternalRow
val row = InternalRow(1, "Jacek")

val deserializer = encoder.deserializer

scala> deserializer.eval(row)
java.lang.UnsupportedOperationException: Only code-generated evaluation is supported
  at org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow.eval(objects.scala:1105)
  ... 54 elided

import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext
val code = deserializer.genCode(ctx).code
```
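Since the deserializer (here a CreateExternalRow) supports only code-generated evaluation, decoding goes through fromRow, which runs the code-generated constructProjection instead of eval (a sketch continuing the example above; the InternalRow values are assumptions):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

// An InternalRow stores strings as UTF8String and longs as Scala Long
val internalRow = InternalRow(1L, UTF8String.fromString("Jacek"))
val externalRow = encoder.fromRow(internalRow) // org.apache.spark.sql.Row
```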