RateStreamSource

`RateStreamSource` is a streaming source that generates consecutive numbers with timestamps, which makes it useful for testing and PoCs.

`RateStreamSource` is created for the `rate` format (registered by `RateSourceProvider`).

```scala
val rates = spark
  .readStream
  .format("rate") // <-- use RateStreamSource
  .option("rowsPerSecond", 1)
  .load
```
`RateStreamSource` uses a predefined schema that cannot be changed.
```scala
val schema = rates.schema
scala> println(schema.treeString)
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
```
Name | Type
---|---
timestamp | TimestampType
value | LongType
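To make the schema concrete, here is a pure-Scala sketch (no Spark needed) of the kind of rows the source emits: consecutive `value`s, each paired with a `timestamp`. `RateRow`, `RateRows.rateRows`, and the even spacing of timestamps within each second are assumptions made for illustration, not `RateStreamSource`'s actual code.

```scala
import java.sql.Timestamp

// Hypothetical row type mirroring the (timestamp, value) schema above.
final case class RateRow(timestamp: Timestamp, value: Long)

object RateRows {
  // Hypothetical generator: emits `rowsPerSecond` consecutive values per second,
  // starting at 0, and spreads their timestamps evenly within each second
  // (the even spacing is an assumption made for illustration).
  def rateRows(startTimeMs: Long, rowsPerSecond: Long, seconds: Long): Seq[RateRow] = {
    require(rowsPerSecond > 0, "rowsPerSecond must be greater than 0")
    (0L until seconds * rowsPerSecond).map { v =>
      RateRow(new Timestamp(startTimeMs + v * 1000L / rowsPerSecond), v)
    }
  }
}
```

For example, `RateRows.rateRows(0L, 2, 2)` yields the values 0 to 3 stamped at 0, 500, 1000 and 1500 ms.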
Tip: Enable the `DEBUG` logging level for `RateStreamSource` to see what happens inside. Refer to Logging.
Getting Maximum Available Offsets: getOffset Method
```scala
getOffset: Option[Offset]
```
Note: `getOffset` is part of the Source Contract.
Caution: FIXME
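The details of `getOffset` are left open above. As a hedged illustration only: because `getBatch` (described next) derives seconds from its offsets, one plausible model is "whole seconds elapsed since the source started, never decreasing". The class `ElapsedSecondsOffsets`, its injected clock function, and the use of a plain `Long` instead of Spark's `Offset` type are all assumptions for this sketch.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical, simplified model of a rate source's offset bookkeeping:
// the offset is the number of whole seconds elapsed since the source started.
final class ElapsedSecondsOffsets(startTimeMs: Long, now: () => Long) {
  // Latest time observed so far; never moves backwards.
  private var lastTimeMs: Long = startTimeMs

  def getOffset: Option[Long] = {
    val current = now()
    if (current > lastTimeMs) lastTimeMs = current
    Some(TimeUnit.MILLISECONDS.toSeconds(lastTimeMs - startTimeMs))
  }
}
```

Injecting the clock keeps the sketch deterministic, so the offset progression can be checked without waiting for real time to pass.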
Generating DataFrame for Streaming Batch: getBatch Method
```scala
getBatch(start: Option[Offset], end: Offset): DataFrame
```
Note: `getBatch` is part of the Source Contract.
Internally, `getBatch` calculates the seconds to start from and end at from the input `start` and `end` offsets, assuming `0` as the start when no `start` offset is given.
`getBatch` then calculates the range of values to generate (`rangeStart` and `rangeEnd`) for the start and end seconds.
You should see the following DEBUG message in the logs:

```
DEBUG RateStreamSource: startSeconds: [startSeconds], endSeconds: [endSeconds], rangeStart: [rangeStart], rangeEnd: [rangeEnd]
```
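The first steps of `getBatch` can be sketched in plain Scala. This is a simplified model, not Spark's code: offsets are plain `Long`s holding seconds, and it assumes a constant rate where the first value of second `s` is `s * rowsPerSecond`, ignoring any other source options. `BatchRange` and `BatchRanges.compute` are hypothetical names.

```scala
// Hypothetical, simplified model of getBatch's first steps.
// Offsets are plain Longs holding seconds (assumption).
final case class BatchRange(startSeconds: Long, endSeconds: Long, rangeStart: Long, rangeEnd: Long) {
  // Mirrors the shape of the DEBUG message shown above.
  def debugMessage: String =
    s"startSeconds: $startSeconds, endSeconds: $endSeconds, rangeStart: $rangeStart, rangeEnd: $rangeEnd"
}

object BatchRanges {
  def compute(start: Option[Long], end: Long, rowsPerSecond: Long): BatchRange = {
    val startSeconds = start.getOrElse(0L) // no start offset: begin at second 0
    val endSeconds = end
    // Assumed constant rate: the first value of second s is s * rowsPerSecond.
    BatchRange(startSeconds, endSeconds, startSeconds * rowsPerSecond, endSeconds * rowsPerSecond)
  }
}
```

For instance, `BatchRanges.compute(None, 3L, 5L)` gives `rangeStart` 0 and `rangeEnd` 15, so that batch would cover values 0 to 14.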
If `rangeStart` and `rangeEnd` are equal, `getBatch` creates an empty `DataFrame` (with the schema) and returns it.
Otherwise, when the range is not empty, `getBatch` creates a `DataFrame` using the `SparkContext.range` operator from `rangeStart` to `rangeEnd` with `numPartitions` partitions.
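The empty-versus-partitioned decision can be modelled without Spark. The sketch below returns no partitions for an empty range, and otherwise splits `[rangeStart, rangeEnd)` into `numPartitions` contiguous slices, which approximates how `SparkContext.range` distributes a range. `BatchPartitions.partitions` is a hypothetical helper, and nested `Seq`s stand in for the `DataFrame`.

```scala
// Hypothetical model of getBatch's last step: no rows when the range is empty,
// otherwise [rangeStart, rangeEnd) split into numPartitions contiguous slices
// (an approximation of how SparkContext.range splits a range).
object BatchPartitions {
  def partitions(rangeStart: Long, rangeEnd: Long, numPartitions: Int): Seq[Seq[Long]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    if (rangeStart == rangeEnd) Seq.empty // stands in for the empty DataFrame
    else {
      val n = rangeEnd - rangeStart
      (0 until numPartitions).map { i =>
        val from = rangeStart + i * n / numPartitions
        val end = rangeStart + (i + 1) * n / numPartitions
        (from until end).toSeq
      }
    }
  }
}
```

With `partitions(0L, 10L, 3)` the slices are 0 to 2, 3 to 5, and 6 to 9: every value appears exactly once and slice sizes differ by at most one.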
Creating RateStreamSource Instance
`RateStreamSource` takes the following when created:
`RateStreamSource` initializes the internal registries and counters.