```scala
import org.apache.spark.sql.types._
val schema = new StructType()
  .add("intCol", IntegerType)
  .add("doubleCol", DoubleType)
  .add("intCol2", IntegerType)
  .add("string", BinaryType)

val capacity = 4 * 1024 // 4k

import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
val columns = schema.fields.map { field =>
  new OnHeapColumnVector(capacity, field.dataType)
}

import org.apache.spark.sql.vectorized.ColumnarBatch
val batch = new ColumnarBatch(columns.toArray)

// Add a single row: [1, 1.1, NULL, "Hello"]
columns(0).putInt(0, 1)
columns(1).putDouble(0, 1.1)
columns(2).putNull(0)
columns(3).putByteArray(0, "Hello".getBytes(java.nio.charset.StandardCharsets.UTF_8))
batch.setNumRows(1)
assert(batch.getRow(0).numFields == 4)
```
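Continuing the example, the values can be read back through the row-wise view. This is a quick sanity-check sketch using the standard InternalRow getters:

```scala
// Read the row back through the row-wise view
import java.nio.charset.StandardCharsets
val row = batch.getRow(0)
assert(row.getInt(0) == 1)
assert(row.getDouble(1) == 1.1)
assert(row.isNullAt(2))
assert(new String(row.getBinary(3), StandardCharsets.UTF_8) == "Hello")
```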
ColumnarBatch — ColumnVectors as Row-Wise Table

ColumnarBatch lets you work with multiple ColumnVectors as a row-wise table.
ColumnarBatch is created when:

- InMemoryTableScanExec physical operator is requested to createAndDecompressColumn
- VectorizedParquetRecordReader is requested to initBatch
- OrcColumnarBatchReader is requested to initBatch
- ColumnVectorUtils is requested to toBatch
- ArrowPythonRunner is requested for an Iterator[ColumnarBatch] (i.e. newReaderIterator)
- ArrowConverters is requested for an ArrowRowIterator (i.e. fromPayloadIterator)
ColumnarBatch takes an array of ColumnVectors to be created and immediately initializes the internal MutableColumnarRow.

The number of columns in a ColumnarBatch is the number of ColumnVectors the batch was created with.
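Continuing the example above, the column count simply mirrors the ColumnVector array passed to the constructor:

```scala
// One column per ColumnVector passed to the constructor
assert(batch.numCols() == 4)
```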
| Note | ColumnarBatch is an Evolving contract that is not a stable API yet and can change from one Spark release to another. In other words, using the contract is like treading on thin ice. |
ColumnarBatch uses the following internal properties:

| Name | Description |
|---|---|
| numRows | Number of rows in the batch (set via setNumRows) |
| row | MutableColumnarRow over the ColumnVectors |
Iterator Over InternalRows (in Batch) — rowIterator Method

```java
Iterator<InternalRow> rowIterator()
```

rowIterator…FIXME
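A minimal usage sketch, continuing the example above. Note that the returned iterator is a java.util.Iterator over InternalRows, and the same mutable row object may be reused across iterations, so copy values out if they need to outlive the loop:

```scala
import org.apache.spark.sql.catalyst.InternalRow

val rows = batch.rowIterator() // java.util.Iterator[InternalRow]
while (rows.hasNext) {
  val row: InternalRow = rows.next()
  if (!row.isNullAt(0)) {
    println(s"intCol = ${row.getInt(0)}")
  }
}
```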
Specifying Number of Rows (in Batch) — setNumRows Method

```java
void setNumRows(int numRows)
```

In essence, setNumRows resets the batch and makes it available for reuse.

Internally, setNumRows simply sets the internal numRows counter to the given numRows.
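A sketch of the reuse pattern, assuming the writable OnHeapColumnVectors from the example above (reset() clears a WritableColumnVector so it can be written again):

```scala
// Reuse the batch: clear the vectors, write fresh values, set the new row count
columns.foreach(_.reset())
columns(0).putInt(0, 42)
columns(1).putDouble(0, 2.2)
columns(2).putInt(0, 7)
columns(3).putByteArray(0, "World".getBytes(java.nio.charset.StandardCharsets.UTF_8))
batch.setNumRows(1)
assert(batch.getRow(0).getInt(0) == 42)
```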