protected def publish(rate: Long): Unit
Backpressure (Back Pressure)
Backpressure makes applications robust against data surges.
With backpressure you can guarantee that your Spark Streaming application is stable, i.e. receives data only as fast as it can process it.
Backpressure shifts the burden of buffering input records to the sender, which keeps records until the streaming application can process them. Alternatively, you could use the dynamic allocation feature in Spark Streaming to increase the capacity of the streaming infrastructure without slowing down the senders.
Backpressure is disabled by default and can be turned on using the spark.streaming.backpressure.enabled setting.
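As a minimal sketch of turning the setting on programmatically, assuming spark-streaming is on the classpath: the master URL, application name, and 5-second batch interval below are placeholders chosen for illustration, not values from this text.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder master and application name (assumptions for this sketch).
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("BackpressureDemo")
  // Backpressure is off by default, so it has to be enabled explicitly.
  .set("spark.streaming.backpressure.enabled", "true")

// The batch interval is an arbitrary example value.
val ssc = new StreamingContext(conf, Seconds(5))
```

The same property could also be passed on the command line with --conf when submitting the application.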
You can monitor a streaming application using the web UI. It is important to ensure that the batch processing time is shorter than the batch interval. Backpressure introduces a feedback loop so the streaming system can adapt to longer processing times and avoid instability.
Backpressure has been available since Spark 1.5.
Read up on back pressure in Wikipedia.
RateController is a contract for single-dstream StreamingListeners that listen to batch completed updates for a dstream and maintain a rate limit, i.e. an estimate of the speed at which this stream should ingest messages. With every batch completed update event, a RateController calculates the current processing rate and estimates the correct receiving rate.
The contract says that RateControllers offer the following method: protected def publish(rate: Long): Unit.
When created, a RateController creates a daemon single-thread executor service called stream-rate-update and initializes the internal rateLimit counter, which is the current message-ingestion speed.
When a batch completed update happens, a RateController takes the details of the batch, including the numRecords processed for the batch, computes a rate limit and publishes the current value. The computed value is set as the present rate limit and published (using the sole abstract method of the contract).
Computing a rate limit is delegated to the RateEstimator.
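The flow described above can be sketched in plain Scala. This is a simplified, hypothetical model and not Spark's own class: EstimatorSketch, RateControllerSketch, onBatchCompleted, getLatestRate, and the initial value of -1 are names and choices made up for illustration, and the update runs synchronously here rather than on the stream-rate-update executor.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the estimator: returns a new rate, if there is one.
trait EstimatorSketch {
  def compute(time: Long, elements: Long, processingDelay: Long, schedulingDelay: Long): Option[Double]
}

abstract class RateControllerSketch(estimator: EstimatorSketch) {
  // The sole abstract method: how a new rate is handed to the ingestion side.
  protected def publish(rate: Long): Unit

  // Current message-ingestion speed; -1 means "no limit computed yet" (an assumption).
  private val rateLimit = new AtomicLong(-1L)

  def getLatestRate: Long = rateLimit.get()

  // Called once per completed batch: compute, store, then publish the rate limit.
  def onBatchCompleted(endTime: Long, numRecords: Long, processingDelay: Long, schedulingDelay: Long): Unit =
    estimator.compute(endTime, numRecords, processingDelay, schedulingDelay).foreach { newRate =>
      rateLimit.set(newRate.toLong)
      publish(getLatestRate)
    }
}

// Usage: a toy estimator that halves the record count, and a controller that records what it publishes.
val published = ArrayBuffer.empty[Long]
val halving = new EstimatorSketch {
  def compute(time: Long, elements: Long, processingDelay: Long, schedulingDelay: Long): Option[Double] =
    if (elements > 0) Some(elements / 2.0) else None
}
val controller = new RateControllerSketch(halving) {
  protected def publish(rate: Long): Unit = { published += rate; () }
}
controller.onBatchCompleted(1000L, 500L, 10L, 0L) // published now holds 250
```

When the estimator returns no value, nothing is stored or published, which mirrors the "no rate limit is returned" cases described for the PID rate estimator.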
FIXME Where is this used? What are the use cases?
RateEstimator computes the rate from the inputs of its compute method.
It is an abstract class with the following abstract method:
def compute(time: Long, elements: Long, processingDelay: Long, schedulingDelay: Long): Option[Double]
You can control which RateEstimator to use through the spark.streaming.backpressure.rateEstimator setting.
The only possible RateEstimator to use is the pid rate estimator.
PID Rate Estimator (represented as PIDRateEstimator) implements a proportional-integral-derivative (PID) controller which acts on the speed of ingestion of records into an input dstream.
The PID rate estimator is the only possible estimator. All other rate estimators lead to an error.
It uses the following settings:
spark.streaming.backpressure.pid.proportional (default: 1.0) can be 0 or greater.
spark.streaming.backpressure.pid.integral (default: 0.2) can be 0 or greater.
spark.streaming.backpressure.pid.derived (default: 0.0) can be 0 or greater.
spark.streaming.backpressure.pid.minRate (default: 100) must be greater than 0.
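The settings above could be applied together as in the following sketch, which assumes Spark is on the classpath. The values shown are simply the defaults listed above, written out explicitly so they are easy to adjust.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // The PID settings only matter once backpressure itself is enabled.
  .set("spark.streaming.backpressure.enabled", "true")
  // pid is the only estimator available.
  .set("spark.streaming.backpressure.rateEstimator", "pid")
  // Weights must be 0 or greater.
  .set("spark.streaming.backpressure.pid.proportional", "1.0")
  .set("spark.streaming.backpressure.pid.integral", "0.2")
  .set("spark.streaming.backpressure.pid.derived", "0.0")
  // The minimum rate must be greater than 0.
  .set("spark.streaming.backpressure.pid.minRate", "100")
```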
The PID rate estimator is used by DirectKafkaInputDStream and input dstreams with receivers (aka ReceiverInputDStreams).
Enable the TRACE logging level for the PIDRateEstimator logger to see what happens inside.
Refer to Logging.
When the PID rate estimator is created you should see the following INFO message in the logs:
INFO PIDRateEstimator: Created PIDRateEstimator with proportional = [proportional], integral = [integral], derivative = [derivative], min rate = [minRate]
When the PID rate estimator computes the rate limit for the current time, you should see the following TRACE message in the logs:
TRACE PIDRateEstimator: time = [time], # records = [numElements], processing time = [processingDelay], scheduling delay = [schedulingDelay]
If the time to compute the current rate limit for is not after the latest time, or the number of records is 0 or less, or the processing delay is 0 or less, the rate estimation is skipped. You should see the following TRACE message in the logs:
TRACE PIDRateEstimator: Rate estimation skipped
And no rate limit is returned.
Otherwise, when the time is later than the latest time, records have been processed, and the processing delay is positive, it computes the rate estimate.
Once the new rate has been computed, you should see the following TRACE message in the logs:
TRACE PIDRateEstimator: latestRate = [latestRate], error = [error] latestError = [latestError], historicalError = [historicalError] delaySinceUpdate = [delaySinceUpdate], dError = [dError]
If it was the first computation of the rate limit, you should see the following TRACE message in the logs:
TRACE PIDRateEstimator: First run, rate estimation skipped
No rate limit is returned.
Otherwise, for any subsequent computation of the rate limit, you should see the following TRACE message in the logs:
TRACE PIDRateEstimator: New rate = [newRate]
And the current rate limit is returned.
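The decision flow traced by these log messages can be condensed into a small, self-contained sketch. It is a hypothetical model and not Spark's class: the text does not spell out the arithmetic, so the PID update below (and the batchIntervalMillis parameter) should be read as an assumption in the usual PID form, reusing the variable names that appear in the TRACE message.

```scala
// Hypothetical, simplified model of a PID-based rate estimator.
final class PidRateEstimatorSketch(
    batchIntervalMillis: Long, // assumed input, used to scale the scheduling delay
    proportional: Double = 1.0,
    integral: Double = 0.2,
    derivative: Double = 0.0,
    minRate: Double = 100.0) {

  require(batchIntervalMillis > 0, "batch interval must be positive")
  require(proportional >= 0 && integral >= 0 && derivative >= 0, "PID weights must be 0 or greater")
  require(minRate > 0, "minRate must be greater than 0")

  private var firstRun = true
  private var latestTime = -1L
  private var latestRate = -1.0
  private var latestError = -1.0

  def compute(time: Long, numElements: Long, processingDelay: Long, schedulingDelay: Long): Option[Double] =
    synchronized {
      if (time > latestTime && numElements > 0 && processingDelay > 0) {
        val delaySinceUpdate = (time - latestTime).toDouble / 1000          // seconds
        val processingRate = numElements.toDouble / processingDelay * 1000  // records per second
        val error = latestRate - processingRate                             // proportional term
        val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // integral term
        val dError = (error - latestError) / delaySinceUpdate               // derivative term
        val newRate = math.max(
          latestRate - proportional * error - integral * historicalError - derivative * dError,
          minRate)
        latestTime = time
        if (firstRun) {
          // First run: remember the observed rate, but return no limit yet.
          latestRate = processingRate
          latestError = 0.0
          firstRun = false
          None
        } else {
          latestRate = newRate
          latestError = error
          Some(newRate)
        }
      } else {
        None // rate estimation skipped
      }
    }
}

val estimator = new PidRateEstimatorSketch(batchIntervalMillis = 1000L)
val first = estimator.compute(1000L, 1000L, 500L, 0L)   // None: first run
val second = estimator.compute(2000L, 1000L, 1000L, 0L) // Some(1000.0)
```

In the second call the batch was processed at 1000 records per second against a previous rate of 2000, so with the default weights the proportional term alone pulls the limit down to 1000.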