A look at Back pressure and its handling in RxJS

muhammad abdulmoiz
Published in codeburst
Oct 29, 2017


Back Pressure

In data stream processing, we run into situations where the consumer cannot keep up with the producer because data arrives faster than it can be processed. This situation is known as back pressure.

Back pressure can be handled with the following approaches:

1- Lossy operations.

2- Lossless operations.

Operations in which the consumer isn't interested in every transmitted value, and where data loss doesn't affect the overall process, are referred to as lossy operations. An example is tracking value changes on a DOM input element, since the complete value can still be picked up from later emissions.

Operations where data loss can cause error conditions in a system, i.e. each chunk of data is important and the system can't afford to lose any of it, are referred to as lossless operations. An example is bank transaction information.

RxJS and Back Pressure

Cold and hot Observables

Before digging into the details of back pressure handling, we must understand how observables behave.

A cold observable starts emitting its sequence of items only when an observer subscribes, so the integrity of the sequence is preserved for every observer.

A hot observable starts emitting data as soon as it is created. Subscribers usually start receiving values from somewhere in the middle of the sequence, depending on when they subscribe.
Such an observable emits items at its own pace, and it is up to its observers to keep up.

Cold observables are ideal subjects for the reactive pull model of back pressure, whereas hot observables are better candidates for approaches such as pausable buffering.
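The difference can be sketched without RxJS at all. The following plain-JavaScript sketch is only an illustration: `coldNumbers` and `HotSource` are hypothetical names, not RxJS APIs. It assumes a cold source replays its full sequence for every subscriber, while a hot source pushes values only to whoever is subscribed at that moment.

```javascript
// Cold: every subscriber triggers a fresh run of the producer,
// so each one receives the whole sequence from the start.
function coldNumbers(count) {
  return {
    subscribe(next) {
      for (let i = 0; i < count; i++) next(i);
    },
  };
}

// Hot: the producer runs independently of its subscribers.
// A subscriber only sees values emitted after it subscribed.
class HotSource {
  constructor() {
    this.listeners = [];
  }
  subscribe(next) {
    this.listeners.push(next);
  }
  emit(value) {
    this.listeners.forEach((next) => next(value));
  }
}

const coldA = [];
const coldB = [];
const cold = coldNumbers(3);
cold.subscribe((x) => coldA.push(x));
cold.subscribe((x) => coldB.push(x)); // subscribes later, still gets 0, 1, 2

const early = [];
const late = [];
const hot = new HotSource();
hot.subscribe((x) => early.push(x));
hot.emit(0);
hot.emit(1);
hot.subscribe((x) => late.push(x)); // joins mid-stream
hot.emit(2);

console.log(coldA, coldB); // both hold 0, 1, 2
console.log(early, late); // early holds 0, 1, 2; late holds only 2
```

The late subscriber of the hot source has no way to ask for the values it missed, which is why hot sources need buffering or dropping strategies rather than a pull model.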

Handling via Debounce approach

Debounce is a candidate for the lossy approach: a value is emitted only if no new event is triggered within the given time period. In other words, while events keep firing faster than that period, the subscription handler does not run.

var source = Rx.Observable
  .interval(500 /* ms */)
  .take(12);
var subscription = source
  .debounceTime(1000)
  .subscribe({
    next: (x) => console.log(x)
  });

Output:
11

The source emits every 500 ms, which is shorter than the 1000 ms debounce time, so each value is superseded by the next one before the quiet period elapses. Only the last value, 11, is emitted, once the source completes.
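To make the rule concrete, here is a small plain-JavaScript simulation. It is not RxJS and uses no timers: `simulateDebounce` is a hypothetical helper that works on a list of timestamped events, and it assumes that a value still pending when the source completes is emitted.

```javascript
// events: [{ time, value }] sorted by time (ms).
// completeAt: the time at which the source completes.
// Returns the values a debounce with quiet period `dueTime` lets through.
function simulateDebounce(events, dueTime, completeAt) {
  const emitted = [];
  events.forEach((event, i) => {
    const next = events[i + 1];
    // Time of whatever happens next: another value, or completion.
    const nextTime = next ? next.time : completeAt;
    // Assumption: a gap exactly equal to dueTime counts as quiet.
    const quietLongEnough = nextTime - event.time >= dueTime;
    // The last value is still pending at completion, so it is flushed.
    const pendingAtCompletion = !next;
    if (quietLongEnough || pendingAtCompletion) emitted.push(event.value);
  });
  return emitted;
}

// Same shape as the article's source: 12 values, one every 500 ms.
const ticks = Array.from({ length: 12 }, (_, i) => ({
  time: 500 * (i + 1),
  value: i,
}));
const debounced = simulateDebounce(ticks, 1000, 6000);
console.log(debounced); // [ 11 ]

// A burst of typing followed by a pause, then one more keystroke.
const typed = simulateDebounce(
  [
    { time: 0, value: "a" },
    { time: 100, value: "ab" },
    { time: 200, value: "abc" },
    { time: 2000, value: "abcd" },
  ],
  1000,
  2500
);
console.log(typed); // [ 'abc', 'abcd' ]
```

The second run shows the typical use: intermediate keystrokes are dropped, and only the value that survives a quiet period is passed on.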

Handling via throttle approach

Throttle is also lossy: after a value is emitted, any events triggered within the throttle time period are not emitted, so the data for that period is lost.

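var source = Rx.Observable
  .interval(500 /* ms */)
  .take(12);
var subscription = source
  // The duration selector returns an observable; the throttle window
  // stays closed until that observable emits.
  .throttle(() => Rx.Observable.interval(1000))
  .subscribe({
    next: (x) => console.log(x)
  });

Output:
0
2
4
6
8
10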

All values emitted within each 1000 ms throttle period are dropped, which results in a series of even numbers.
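The same behavior can be traced by hand with a plain-JavaScript simulation. `simulateThrottle` is a hypothetical helper, not an RxJS operator. It assumes a leading-edge throttle and assumes that a value arriving exactly when the window closes is let through, which matches the output above; real timers may order such ties differently.

```javascript
// events: [{ time, value }] sorted by time (ms).
// Emits a value, then ignores everything that arrives while the
// throttle window of `duration` ms is still open.
function simulateThrottle(events, duration) {
  const emitted = [];
  let windowEndsAt = -Infinity; // no window open at the start
  for (const { time, value } of events) {
    if (time >= windowEndsAt) {
      emitted.push(value);
      windowEndsAt = time + duration; // open a new window
    }
    // Otherwise the value falls inside an open window and is dropped.
  }
  return emitted;
}

// Same shape as the article's source: 12 values, one every 500 ms.
const throttleTicks = Array.from({ length: 12 }, (_, i) => ({
  time: 500 * (i + 1),
  value: i,
}));
const throttled = simulateThrottle(throttleTicks, 1000);
console.log(throttled); // [ 0, 2, 4, 6, 8, 10 ]
```

Unlike debounce, throttle lets the first value through immediately and then goes quiet, so the consumer sees a steady, reduced rate instead of only the final value.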

Pausable Approaches

The ability to pause observables is a powerful concept in RxJS, and it comes in both lossy and lossless versions.
An example of the pausable approach:

// Create an observable that emits a number every 500 ms
const source = Rx.Observable.interval(500);
// Create a pauser subject that controls the stream
const pauser = new Rx.Subject();
const pausable = pauser.switchMap(paused => {
  // Switch based on the paused state
  return paused ? Rx.Observable.never() : source;
});
pausable.subscribe(x => console.log(x));

pauser.next(false); // start streaming from the source
// Pause the stream after three seconds
setTimeout(function () {
  pauser.next(true); // pause streaming
}, 3000);

To understand pausable observables, try running the code and observing the output. The comments in the code explain it to some extent.
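The `switchMap` example is the lossy flavor: nothing produced while paused reaches the subscriber. For contrast, here is a minimal plain-JavaScript sketch of the lossless flavor, where values that arrive during a pause are queued and flushed on resume. `PausableBuffer` is a hypothetical class written for this illustration, not an RxJS API, and it assumes an unbounded queue.

```javascript
class PausableBuffer {
  constructor(deliver) {
    this.deliver = deliver; // consumer callback
    this.paused = false;
    this.queue = []; // values held back while paused
  }

  // Called by the producer for every new value.
  push(value) {
    if (this.paused) {
      this.queue.push(value);
    } else {
      this.deliver(value);
    }
  }

  pause() {
    this.paused = true;
  }

  // Flush everything that was held back, in arrival order.
  resume() {
    this.paused = false;
    while (this.queue.length > 0 && !this.paused) {
      this.deliver(this.queue.shift());
    }
  }
}

const received = [];
const gate = new PausableBuffer((x) => received.push(x));
gate.push(0);
gate.push(1);
gate.pause();
gate.push(2); // queued, not delivered yet
gate.push(3); // queued, not delivered yet
const whilePaused = received.slice();
gate.resume(); // delivers 2 and 3
gate.push(4);

console.log(whilePaused); // [ 0, 1 ]
console.log(received); // [ 0, 1, 2, 3, 4 ]
```

The trade-off is memory: if the consumer stays paused for long, the queue keeps growing, so a real implementation would likely cap it or fall back to a lossy strategy.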

Buffer count Approach

Values are collected in a buffer until it reaches a given count, and the buffer is then emitted to subscribers as a single array. The following is an example:

var source = Rx.Observable
  .interval(500 /* ms */)
  .bufferCount(2)
  .take(12);
var subscription = source
  .subscribe({
    next: (x) => console.log(x)
  });

Output:
0,1
2,3 and so on…

It's a lossless approach: every value reaches the subscriber, just grouped into batches.
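The grouping itself is easy to see on a plain array. The following sketch uses a hypothetical helper, `chunkByCount`, which is not the RxJS operator but mimics its grouping on a finite list, including the assumption that a partial buffer left over at the end is still emitted.

```javascript
// Split `values` into consecutive buffers of at most `size` items.
function chunkByCount(values, size) {
  const buffers = [];
  for (let i = 0; i < values.length; i += size) {
    buffers.push(values.slice(i, i + size));
  }
  return buffers;
}

const counted = chunkByCount([0, 1, 2, 3, 4], 2);
console.log(counted); // [ [ 0, 1 ], [ 2, 3 ], [ 4 ] ]

// Flattening the buffers gives back every original value: nothing is lost.
const flattened = [].concat(...counted);
console.log(flattened); // [ 0, 1, 2, 3, 4 ]
```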

Buffer time Approach

Values are buffered for a given time span, and when that span elapses the buffer is emitted to all subscribers. The following is an example:

var source = Rx.Observable
  .interval(500 /* ms */)
  .bufferTime(2000)
  .take(12);
var subscription = source
  .subscribe({
    next: (x) => console.log(x)
  });

Output:
0,1
2,3
4,5,6
7,8,9,10
11,12,13,14

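This ensures that no data is lost: every value is held in the buffer for the time window in which it arrives. The number of values per buffer can vary from window to window, depending on how the timers line up.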
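A timer-free simulation shows why buffer sizes vary. `bufferByTime` is a hypothetical helper, not the RxJS operator. It assumes fixed, back-to-back windows starting at time 0, and that a value landing exactly on a boundary belongs to the window that is closing. The event times are made up for the illustration.

```javascript
// events: [{ time, value }] sorted by time (ms).
// Groups values into consecutive windows of `span` ms until `completeAt`.
function bufferByTime(events, span, completeAt) {
  const buffers = [];
  for (let start = 0; start < completeAt; start += span) {
    const end = start + span;
    const inWindow = events
      .filter((e) => e.time > start && e.time <= end)
      .map((e) => e.value);
    buffers.push(inWindow); // an empty window yields an empty buffer
  }
  return buffers;
}

// Ten values, one every 600 ms, starting at 300 ms.
const timedEvents = Array.from({ length: 10 }, (_, i) => ({
  time: 300 + 600 * i,
  value: i,
}));
const timed = bufferByTime(timedEvents, 2000, 6000);
console.log(timed); // [ [ 0, 1, 2 ], [ 3, 4, 5, 6 ], [ 7, 8, 9 ] ]
```

The windows are equally long, but they catch three, four and three values respectively, because the source rate does not divide evenly into the window length. Every value still appears in exactly one buffer.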

RxJS Library update
RxJS 4.x supported pausable and controlled observable APIs. These were not implemented in the 5.x rewrite of the library, because the same behavior can be achieved by composing other observable APIs. The approaches themselves remain essential for handling back pressure. The complete migration list from 4.x to 5.x can be viewed at https://github.com/ReactiveX/rxjs/blob/master/MIGRATION.md

Conclusion

Back pressure is a very common scenario when working with data streams. Using the approaches above, a developer can build a solution that handles back pressure in the way that suits the problem at hand.
