RateStreamSource

RateStreamSource is a streaming source that generates consecutive numbers (with timestamps) at a given rate, which makes it useful for testing and PoCs.

RateStreamSource is created for the rate format (which is registered by RateSourceProvider).

val rates = spark
  .readStream
  .format("rate") // <-- use RateStreamSource
  .option("rowsPerSecond", 1)
  .load
Table 1. RateStreamSource’s Options

Name            Default Value          Description
numPartitions   (default parallelism)  Number of partitions to use
rampUpTime      0 (seconds)            Time to ramp up before the generating speed reaches rowsPerSecond
rowsPerSecond   1                      Number of rows to generate per second (has to be greater than 0)
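The options above can be combined in a single query. A minimal sketch (the option values are arbitrary examples):

```scala
// Rate source with all options set explicitly
val rates = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 10)  // 10 rows per second
  .option("rampUpTime", "5s")   // ramp up before reaching rowsPerSecond
  .option("numPartitions", 2)   // distribute generated rows across 2 partitions
  .load
```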

RateStreamSource uses a predefined schema that cannot be changed.

val schema = rates.schema
scala> println(schema.treeString)
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
Table 2. RateStreamSource’s Dataset Schema (in the positional order)

Name       Type
timestamp  TimestampType
value      LongType
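Since the schema is fixed, a matching case class can be used for typed access to the streaming Dataset (e.g. rates.as[Rate]). A minimal sketch; the Rate class name is arbitrary:

```scala
import java.sql.Timestamp

// Case class mirroring the rate source's fixed schema
// (timestamp: TimestampType, value: LongType)
final case class Rate(timestamp: Timestamp, value: Long)

// With a SparkSession in scope, the streaming Dataset could be typed as:
//   val typedRates = rates.as[Rate]  // requires import spark.implicits._
val sample = Rate(new Timestamp(0L), 0L)
println(sample)
```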

Table 3. RateStreamSource’s Internal Registries and Counters

Name         Description
clock        ManualClock or SystemClock (per the useManualClock flag)
lastTimeMs
maxSeconds
startTimeMs

Tip

Enable INFO or DEBUG logging levels for org.apache.spark.sql.execution.streaming.RateStreamSource to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.RateStreamSource=DEBUG

Refer to Logging.

Getting Maximum Available Offsets — getOffset Method

getOffset: Option[Offset]
Note
getOffset is a part of the Source Contract.
Caution
FIXME

Generating DataFrame for Streaming Batch — getBatch Method

getBatch(start: Option[Offset], end: Offset): DataFrame
Note
getBatch is a part of the Source Contract.

Internally, getBatch calculates the seconds to start from and end at (from the input start and end offsets), assuming 0 when the start offset is undefined.

getBatch then calculates the values to generate for the start and end seconds.

You should see the following DEBUG message in the logs:

DEBUG RateStreamSource: startSeconds: [startSeconds], endSeconds: [endSeconds], rangeStart: [rangeStart], rangeEnd: [rangeEnd]

If the start and end ranges are equal, getBatch creates an empty DataFrame (with the schema) and returns it.

Otherwise, when the ranges are different, getBatch creates a DataFrame using SparkContext.range operator (for the start and end ranges and numPartitions partitions).
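Ignoring rampUpTime, the value range for a batch can be modeled as a plain function of the start and end seconds and rowsPerSecond. This is a simplified sketch of the computation described above, not the actual implementation:

```scala
// Simplified model of getBatch's range computation (rampUpTime ignored):
// the value generated by a given second is seconds * rowsPerSecond, so a
// batch covering startSeconds to endSeconds spans
// [startSeconds * rowsPerSecond, endSeconds * rowsPerSecond).
def valueRange(startSeconds: Long, endSeconds: Long, rowsPerSecond: Long): (Long, Long) = {
  require(endSeconds >= startSeconds, "end must not precede start")
  val rangeStart = startSeconds * rowsPerSecond
  val rangeEnd   = endSeconds * rowsPerSecond
  (rangeStart, rangeEnd)
}

// 1 row per second, batch from second 0 to second 5 -> values 0 until 5
println(valueRange(0, 5, 1))
```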

Creating RateStreamSource Instance

RateStreamSource takes the following when created:

  • SQLContext

  • Path to the metadata

  • Rows per second

  • RampUp time in seconds

  • Number of partitions

  • Flag whether to use ManualClock (true) or SystemClock (false)

RateStreamSource initializes the internal registries and counters.
