Backpressure (Back Pressure)

Backpressure is to make applications robust against data surges.

With backpressure you can guarantee that your Spark Streaming application is stable, i.e. receives data only as fast as it can process it.

Note
Backpressure shifts the trouble of buffering input records to the sender so it keeps records until they could be processed by a streaming application. You could alternatively use dynamic allocation feature in Spark Streaming to increase the capacity of streaming infrastructure without slowing down the senders.

Backpressure is disabled by default and can be turned on using spark.streaming.backpressure.enabled setting.

You can monitor a streaming application using web UI. It is important to ensure that the batch processing time is shorter than the batch interval. Backpressure introduces a feedback loop so the streaming system can adapt to longer processing times and avoid instability.

Note
Backpressure is available since Spark 1.5.

RateController

Tip
Read up on back pressure in Wikipedia.

RateController is a contract for single-dstream StreamingListeners that listens to batch completed updates for a dstream and maintains a rate limit, i.e. an estimate of the speed at which this stream should ingest messages. With every batch completed update event it calculates the current processing rate and estimates the correct receving rate.

Note
RateController works for a single dstream and requires a RateEstimator.

The contract says that RateControllers offer the following method:

protected def publish(rate: Long): Unit

When created, it creates a daemon single-thread executor service called stream-rate-update and initializes the internal rateLimit counter which is the current message-ingestion speed.

When a batch completed update happens, a RateController grabs processingEndTime, processingDelay, schedulingDelay, and numRecords processed for the batch, computes a rate limit and publishes the current value. The computed value is set as the present rate limit, and published (using the sole abstract publish method).

Computing a rate limit happens using the RateEstimator’s compute method.

Caution
FIXME Where is this used? What are the use cases?

InputDStreams can define a RateController that is registered to JobScheduler's listenerBus (using ssc.addStreamingListener) when JobScheduler starts.

RateEstimator

RateEstimator computes the rate given the input time, elements, processingDelay, and schedulingDelay.

It is an abstract class with the following abstract method:

def compute(
    time: Long,
    elements: Long,
    processingDelay: Long,
    schedulingDelay: Long): Option[Double]

You can control what RateEstimator to use through spark.streaming.backpressure.rateEstimator setting.

The only possible RateEstimator to use is the pid rate estimator.

PID Rate Estimator

PID Rate Estimator (represented as PIDRateEstimator) implements a proportional-integral-derivative (PID) controller which acts on the speed of ingestion of records into an input dstream.

Warning
The PID rate estimator is the only possible estimator. All other rate estimators lead to IllegalArgumentException being thrown.

It uses the following settings:

  • spark.streaming.backpressure.pid.proportional (default: 1.0) can be 0 or greater.

  • spark.streaming.backpressure.pid.integral (default: 0.2) can be 0 or greater.

  • spark.streaming.backpressure.pid.derived (default: 0.0) can be 0 or greater.

  • spark.streaming.backpressure.pid.minRate (default: 100) must be greater than 0.

Tip

Enable INFO or TRACE logging level for org.apache.spark.streaming.scheduler.rate.PIDRateEstimator logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.scheduler.rate.PIDRateEstimator=TRACE

Refer to Logging.

When the PID rate estimator is created you should see the following INFO message in the logs:

INFO PIDRateEstimator: Created PIDRateEstimator with proportional = [proportional], integral = [integral], derivative = [derivative], min rate = [minRate]

When the pid rate estimator computes the rate limit for the current time, you should see the following TRACE message in the logs:

TRACE PIDRateEstimator:
time = [time], # records = [numElements], processing time = [processingDelay], scheduling delay = [schedulingDelay]

If the time to compute the current rate limit for is before the latest time or the number of records is 0 or less, or processing delay is 0 or less, the rate estimation is skipped. You should see the following TRACE message in the logs:

TRACE PIDRateEstimator: Rate estimation skipped

And no rate limit is returned.

Otherwise, when this is to compute the rate estimation for next time and there are records processed as well as the processing delay is positive, it computes the rate estimate.

Once the new rate has already been computed, you should see the following TRACE message in the logs:

TRACE PIDRateEstimator:
 latestRate = [latestRate], error = [error]
 latestError = [latestError], historicalError = [historicalError]
 delaySinceUpdate = [delaySinceUpdate], dError = [dError]

If it was the first computation of the limit rate, you should see the following TRACE message in the logs:

TRACE PIDRateEstimator: First run, rate estimation skipped

No rate limit is returned.

Otherwise, when it is another limit rate, you should see the following TRACE message in the logs:

TRACE PIDRateEstimator: New rate = [newRate]

And the current rate limit is returned.

results matching ""

    No results matching ""