protected def publish(rate: Long): Unit
Backpressure (Back Pressure)
Backpressure is to make applications robust against data surges.
With backpressure you can guarantee that your Spark Streaming application is stable, i.e. receives data only as fast as it can process it.
Note
|
Backpressure shifts the trouble of buffering input records to the sender so it keeps records until they could be processed by a streaming application. You could alternatively use dynamic allocation feature in Spark Streaming to increase the capacity of streaming infrastructure without slowing down the senders. |
Backpressure is disabled by default and can be turned on using spark.streaming.backpressure.enabled setting.
You can monitor a streaming application using web UI. It is important to ensure that the batch processing time is shorter than the batch interval. Backpressure introduces a feedback loop so the streaming system can adapt to longer processing times and avoid instability.
Note
|
Backpressure is available since Spark 1.5. |
RateController
Tip
|
Read up on back pressure in Wikipedia. |
RateController
is a contract for single-dstream StreamingListeners that listens to batch completed updates for a dstream and maintains a rate limit, i.e. an estimate of the speed at which this stream should ingest messages. With every batch completed update event it calculates the current processing rate and estimates the correct receving rate.
Note
|
RateController works for a single dstream and requires a RateEstimator.
|
The contract says that RateControllers offer the following method:
When created, it creates a daemon single-thread executor service called stream-rate-update and initializes the internal rateLimit
counter which is the current message-ingestion speed.
When a batch completed update happens, a RateController
grabs processingEndTime
, processingDelay
, schedulingDelay
, and numRecords
processed for the batch, computes a rate limit and publishes the current value. The computed value is set as the present rate limit, and published (using the sole abstract publish
method).
Computing a rate limit happens using the RateEstimator’s compute
method.
Caution
|
FIXME Where is this used? What are the use cases? |
InputDStreams can define a RateController
that is registered to JobScheduler's listenerBus
(using ssc.addStreamingListener
) when JobScheduler starts.
RateEstimator
RateEstimator
computes the rate given the input time
, elements
, processingDelay
, and schedulingDelay
.
It is an abstract class with the following abstract method:
def compute(
time: Long,
elements: Long,
processingDelay: Long,
schedulingDelay: Long): Option[Double]
You can control what RateEstimator
to use through spark.streaming.backpressure.rateEstimator setting.
The only possible RateEstimator
to use is the pid rate estimator.
PID Rate Estimator
PID Rate Estimator (represented as PIDRateEstimator
) implements a proportional-integral-derivative (PID) controller which acts on the speed of ingestion of records into an input dstream.
Warning
|
The PID rate estimator is the only possible estimator. All other rate estimators lead to IllegalArgumentException being thrown.
|
It uses the following settings:
-
spark.streaming.backpressure.pid.proportional
(default: 1.0) can be 0 or greater. -
spark.streaming.backpressure.pid.integral
(default: 0.2) can be 0 or greater. -
spark.streaming.backpressure.pid.derived
(default: 0.0) can be 0 or greater. -
spark.streaming.backpressure.pid.minRate
(default: 100) must be greater than 0.
Note
|
The PID rate estimator is used by DirectKafkaInputDStream and input dstreams with receivers (aka ReceiverInputDStreams). |
Tip
|
Enable Add the following line to
Refer to Logging. |
When the PID rate estimator is created you should see the following INFO message in the logs:
INFO PIDRateEstimator: Created PIDRateEstimator with proportional = [proportional], integral = [integral], derivative = [derivative], min rate = [minRate]
When the pid rate estimator computes the rate limit for the current time, you should see the following TRACE message in the logs:
TRACE PIDRateEstimator:
time = [time], # records = [numElements], processing time = [processingDelay], scheduling delay = [schedulingDelay]
If the time to compute the current rate limit for is before the latest time or the number of records is 0 or less, or processing delay is 0 or less, the rate estimation is skipped. You should see the following TRACE message in the logs:
TRACE PIDRateEstimator: Rate estimation skipped
And no rate limit is returned.
Otherwise, when this is to compute the rate estimation for next time and there are records processed as well as the processing delay is positive, it computes the rate estimate.
Once the new rate has already been computed, you should see the following TRACE message in the logs:
TRACE PIDRateEstimator:
latestRate = [latestRate], error = [error]
latestError = [latestError], historicalError = [historicalError]
delaySinceUpdate = [delaySinceUpdate], dError = [dError]
If it was the first computation of the limit rate, you should see the following TRACE message in the logs:
TRACE PIDRateEstimator: First run, rate estimation skipped
No rate limit is returned.
Otherwise, when it is another limit rate, you should see the following TRACE message in the logs:
TRACE PIDRateEstimator: New rate = [newRate]
And the current rate limit is returned.