RateStreamSource
RateStreamSource is a streaming source that generates consecutive numbers, each with a timestamp, which makes it useful for testing and proofs of concept (PoCs).
RateStreamSource is created for the rate format (which is registered by RateSourceProvider).
val rates = spark
  .readStream
  .format("rate") // <-- use RateStreamSource
  .option("rowsPerSecond", 1)
  .load
RateStreamSource uses a predefined schema that cannot be changed.
val schema = rates.schema
scala> println(schema.treeString)
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
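To watch the generated rows, the rate source can be wired to the console sink. The following is a minimal sketch, assuming a local Spark installation with Spark SQL on the classpath; the object name RateSourceDemo, the helper rates, the local[2] master, and the 10-second run time are illustrative choices, not part of RateStreamSource itself.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object RateSourceDemo {
  // Builds a streaming DataFrame backed by the rate source.
  // rowsPerSecond is passed through as the source option of the same name.
  def rates(spark: SparkSession, rowsPerSecond: Long = 1L): DataFrame =
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", rowsPerSecond)
      .load()

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for a quick experiment.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("RateSourceDemo")
      .getOrCreate()

    // Print every micro-batch (timestamp and value columns) to the console.
    val query = rates(spark)
      .writeStream
      .format("console")
      .option("truncate", false)
      .start()

    // Let the query run for about 10 seconds, then shut down.
    query.awaitTermination(10000L)
    query.stop()
    spark.stop()
  }
}
```

Keeping the source definition in a separate method makes it easy to inspect the schema without starting a query.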
Tip: Enable the DEBUG logging level for the RateStreamSource logger to see what happens inside. Refer to Logging.
Getting Maximum Available Offsets — getOffset Method
getOffset: Option[Offset]
Note: getOffset is a part of the Source Contract.
Caution: FIXME
Generating DataFrame for Streaming Batch — getBatch Method
getBatch(start: Option[Offset], end: Offset): DataFrame
Note: getBatch is a part of the Source Contract.
Internally, getBatch calculates the seconds to start from and end at (from the input start and end offsets), or assumes 0 when an offset is not available.
getBatch then calculates the range of values to generate (rangeStart and rangeEnd) for the start and end seconds.
You should see the following DEBUG message in the logs:
DEBUG RateStreamSource: startSeconds: [startSeconds], endSeconds: [endSeconds], rangeStart: [rangeStart], rangeEnd: [rangeEnd]
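The two calculations above can be sketched in plain Scala. This is a simplified model, not Spark's implementation: offsets are modelled as plain Long seconds, and valueAtSecond assumes a constant rate of rowsPerSecond values per second with no ramp-up. The names RateRangeSketch, Ranges, and compute are hypothetical.

```scala
object RateRangeSketch {
  // The four numbers reported in the DEBUG message.
  final case class Ranges(
      startSeconds: Long,
      endSeconds: Long,
      rangeStart: Long,
      rangeEnd: Long)

  // Assumption: constant rate, so the first value not yet generated
  // after `seconds` seconds is seconds * rowsPerSecond.
  def valueAtSecond(seconds: Long, rowsPerSecond: Long): Long =
    seconds * rowsPerSecond

  // start is optional (the very first batch has no start offset) and
  // falls back to 0; end is always given.
  def compute(start: Option[Long], end: Long, rowsPerSecond: Long): Ranges = {
    val startSeconds = start.getOrElse(0L)
    val endSeconds = end
    Ranges(
      startSeconds,
      endSeconds,
      valueAtSecond(startSeconds, rowsPerSecond),
      valueAtSecond(endSeconds, rowsPerSecond))
  }
}
```

Under these assumptions, RateRangeSketch.compute(Some(2L), 5L, 10L) gives startSeconds 2, endSeconds 5, rangeStart 20, and rangeEnd 50.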
If the start and end ranges are equal, getBatch creates an empty DataFrame (with the schema) and returns.
Otherwise, when the ranges are different, getBatch creates a DataFrame using the SparkContext.range operator (for the start and end ranges, and with numPartitions partitions).
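The empty-versus-range decision can be modelled without Spark. The sketch below is only an illustration: it treats the range as half-open (rangeStart inclusive, rangeEnd exclusive, as SparkContext.range does) and splits it into contiguous, roughly even slices. How values are really assigned to partitions is not described here, so the slicing is an assumption, and RateBatchSketch and batchPartitions are hypothetical names.

```scala
object RateBatchSketch {
  // Returns the values of a batch grouped by partition.
  // An empty batch (rangeStart == rangeEnd) has no partitions at all.
  def batchPartitions(
      rangeStart: Long,
      rangeEnd: Long,
      numPartitions: Int): Seq[Seq[Long]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    require(rangeStart <= rangeEnd, "rangeStart must not exceed rangeEnd")

    if (rangeStart == rangeEnd) {
      Seq.empty
    } else {
      val length = rangeEnd - rangeStart
      // Assumption: partition i covers a contiguous slice of the range.
      (0 until numPartitions).map { i =>
        val from = rangeStart + (i * length) / numPartitions
        val upTo = rangeStart + ((i + 1) * length) / numPartitions
        (from until upTo).toVector
      }
    }
  }
}
```

For example, RateBatchSketch.batchPartitions(0L, 10L, 3) yields three slices holding 0 to 2, 3 to 5, and 6 to 9, while batchPartitions(5L, 5L, 3) is empty.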
Creating RateStreamSource Instance
RateStreamSource takes the following when created:
RateStreamSource initializes the internal registries and counters.