```scala
import org.apache.spark.sql.types._
val schema = new StructType()
  .add("intCol", IntegerType)
  .add("doubleCol", DoubleType)
  .add("intCol2", IntegerType)
  .add("string", BinaryType)

val capacity = 4 * 1024 // 4k

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
val columns = schema.fields.map { field =>
  new OnHeapColumnVector(capacity, field.dataType)
}

import org.apache.spark.sql.vectorized.ColumnarBatch
val batch = new ColumnarBatch(columns.toArray)

// Add a single row: [1, 1.1, NULL, "Hello"]
columns(0).putInt(0, 1)
columns(1).putDouble(0, 1.1)
columns(2).putNull(0)
columns(3).putByteArray(0, "Hello".getBytes(java.nio.charset.StandardCharsets.UTF_8))
batch.setNumRows(1)

assert(batch.getRow(0).numFields == 4)
```
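Continuing the example, the row just written can be read back through the batch. This is a minimal sketch; getRow gives an InternalRow, whose standard accessors (getInt, getDouble, isNullAt, getBinary) are used below.

```scala
// Read the row back field by field
val row = batch.getRow(0)
assert(row.getInt(0) == 1)
assert(row.getDouble(1) == 1.1)
assert(row.isNullAt(2))

// The binary column holds the UTF-8 bytes of "Hello"
val bytes = row.getBinary(3)
assert(new String(bytes, java.nio.charset.StandardCharsets.UTF_8) == "Hello")
```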
ColumnarBatch — ColumnVectors as Row-Wise Table

ColumnarBatch allows working with multiple ColumnVectors as a row-wise table.

ColumnarBatch is created when:
- InMemoryTableScanExec physical operator is requested to createAndDecompressColumn
- VectorizedParquetRecordReader is requested to initBatch
- OrcColumnarBatchReader is requested to initBatch
- ColumnVectorUtils is requested to toBatch
- ArrowPythonRunner is requested for an Iterator[ColumnarBatch] (i.e. newReaderIterator)
- ArrowConverters is requested for an ArrowRowIterator (i.e. fromPayloadIterator)
ColumnarBatch takes an array of ColumnVectors to be created. ColumnarBatch immediately initializes the internal MutableColumnarRow.

The number of columns in a ColumnarBatch is the number of ColumnVectors the batch was created with.
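For illustration, a small sketch of that relationship, reusing batch and columns from the example at the top of the page (numCols and column are public methods of ColumnarBatch):

```scala
// One column per ColumnVector the batch was created with
assert(batch.numCols == columns.length)

// Columns are accessible by ordinal
assert(batch.column(0) == columns(0))
```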
Note: The ColumnarBatch contract is not a stable API yet and can change from one release to another. In other words, using the contract is like treading on thin ice.
Name | Description
---|---
numRows | Number of rows in the batch (specified using setNumRows)
row | The internal MutableColumnarRow over the ColumnVectors
Iterator Over InternalRows (in Batch) — rowIterator Method

```java
Iterator<InternalRow> rowIterator()
```

rowIterator…FIXME
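Even though the internals are yet to be described, here is a short usage sketch, assuming the batch built in the first example; rowIterator returns a Java Iterator, hence the asScala conversion:

```scala
import scala.collection.JavaConverters._

// Traverse the rows of the batch as InternalRows
batch.rowIterator.asScala.foreach { row =>
  println(s"intCol=${row.getInt(0)} doubleCol=${row.getDouble(1)}")
}
```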
Specifying Number of Rows (in Batch) — setNumRows Method

```java
void setNumRows(int numRows)
```

In essence, setNumRows resets the batch and makes it available for reuse.

Internally, setNumRows simply sets the numRows internal property to the given numRows.
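A sketch of the reuse pattern, building on the first example again; reset below is a method of the writable OnHeapColumnVectors used there, and clearing the vectors before rewriting them is an assumption of this sketch rather than something setNumRows does itself:

```scala
// Overwrite the batch with a fresh row and re-publish the row count
columns.foreach(_.reset())
columns(0).putInt(0, 2)
columns(1).putDouble(0, 2.2)
columns(2).putInt(0, 42)
columns(3).putByteArray(0, "World".getBytes(java.nio.charset.StandardCharsets.UTF_8))
batch.setNumRows(1)

assert(batch.getRow(0).getInt(0) == 2)
assert(!batch.getRow(0).isNullAt(2))
```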