Custom Operators Using RxJava compose() and Transformers

RxJava is a library for event-based asynchronous programming using Observables. It also helps reduce boilerplate code in an efficient, readable way.

Today we are going to talk about the compose() operator of Rx.

Prerequisite

Key Things of Rx

Observable.from(someSource)
    .map(data -> manipulate(data))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(data -> doSomething(data));

As we can see, we form a stream of operators to do our STUFF.

But most of the time we use the same few operators on a regular basis, repeating their code again and again.

Now let’s go Old School: what do we do in our projects when we use the same code repeatedly? We move it into a UTILS class and reuse it whenever needed.

Wrong Way

The following is the anti-pattern which I used for months.

First, creating a method that applies the schedulers:

<T> Observable<T> commonSchedulers(Observable<T> observable) {
    return observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
}

Then, wrapping our Observable chain:

commonSchedulers(
    Observable.from(someSource)
        .map(data -> manipulate(data))
)
.subscribe(data -> doSomething(data));

The code works, but it is not the recommended way: it's no longer a series of operators, so it's hard to follow.

Now, this anti-pattern gets worse when we use it multiple times in a single stream.🥵
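To see how quickly this gets messy, here is a rough sketch with two wrapper helpers in one chain. To keep it runnable without RxJava, it uses java.util.stream.Stream as a stand-in for Observable. The helpers trimmed() and nonEmpty() and the class name are made up for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class NestedWrappersSketch {

    // Hypothetical wrapper helpers, written in the same style as commonSchedulers(observable).
    static Stream<String> trimmed(Stream<String> source) {
        return source.map(String::trim);
    }

    static Stream<String> nonEmpty(Stream<String> source) {
        return source.filter(s -> !s.isEmpty());
    }

    static List<String> clean(List<String> raw) {
        // Reads inside-out: trimmed() runs first, then nonEmpty(), then map(),
        // even though nonEmpty() is the first thing we see on the page.
        return nonEmpty(
                trimmed(
                    raw.stream()))
            .map(String::toUpperCase)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(clean(Arrays.asList(" a ", "  ", "b")));
    }
}
```

The helper that runs first ends up buried deepest, and every extra wrapper adds one more level of nesting around the chain.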

Refer to the GREAT SMALL VIDEO EXAMPLE below for more info about this anti-pattern.

Main Idea of Compose

Important

I know you have probably been writing this kind of code until now. Just refactor that code to use a better way instead. Need tips? Use RxJava or even Kotlin; if you want to stay with plain Java, you can use Future or CompletableFuture.

With that in mind, the wise people behind Rx introduced TRANSFORMERS, which we use with compose() to conquer this anti-pattern and keep the chain from breaking.

INTRODUCING TRANSFORMERS

Now, let’s create an example:

<T> Transformer<T, T> commonSchedulers() {
    return new Transformer<T, T>() {
        @Override
        public Observable<T> call(Observable<T> observable) {
            return observable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
        }
    };
}

Or, with a lambda:

<T> Transformer<T, T> commonSchedulers() {
    return observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
}

Now, let’s see how our code looks:

Observable.from(someSource)
    .map(data -> manipulate(data))
    .compose(commonSchedulers())
    .subscribe(data -> doSomething(data));

Yehhhhhh, we’ve got reusable code and the chain is preserved. It also keeps our code readable.

Confused with flatMap()?

Because they both return an Observable, it looks like both can be used to reuse a series of operators, right?

Difference

  1. When we use compose(), we get the original Observable<T> from the stream. Therefore, operators that affect the whole stream (like subscribeOn() and observeOn()) need to use compose().

     If we use subscribeOn()/observeOn() in flatMap(), it only affects the Observable created inside the flatMap(), not the whole stream.

  2. compose() gets executed immediately when we create the Observable stream, as if we had written the operators inline. flatMap(), by contrast, executes when its onNext() is called, each time it is called. In other words, flatMap() transforms each item, whereas compose() transforms the whole stream.
  3. flatMap() creates a new Observable every time onNext() is called, whereas compose() operates on the stream as it is. Therefore flatMap() is likely to be less efficient.
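The "once per stream" versus "once per item" difference can be made visible with a small counter. The sketch below does not use RxJava: MiniStream is a hypothetical toy class, just enough of an Observable look-alike to count how often each function gets called. It is eager and list-backed, so it only models the call pattern and says nothing about threading or subscription.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Toy stand-in for Observable (hypothetical, NOT RxJava).
final class MiniStream<T> {
    private final List<T> items;

    private MiniStream(List<T> items) {
        this.items = items;
    }

    static <T> MiniStream<T> from(List<T> items) {
        return new MiniStream<>(new ArrayList<>(items));
    }

    <R> MiniStream<R> map(Function<? super T, ? extends R> f) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            out.add(f.apply(item));
        }
        return new MiniStream<>(out);
    }

    // Like compose(): the function is called once and receives the whole stream.
    <R> MiniStream<R> compose(Function<MiniStream<T>, MiniStream<R>> transformer) {
        return transformer.apply(this);
    }

    // Like flatMap(): the function is called once per item and builds a new inner stream each time.
    <R> MiniStream<R> flatMap(Function<? super T, MiniStream<R>> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            out.addAll(mapper.apply(item).items);
        }
        return new MiniStream<>(out);
    }

    List<T> toList() {
        return new ArrayList<>(items);
    }
}

final class ComposeVsFlatMapSketch {

    // Returns {calls made to the compose() function, calls made to the flatMap() function}.
    static int[] countCalls(List<String> source) {
        AtomicInteger composeCalls = new AtomicInteger();
        AtomicInteger flatMapCalls = new AtomicInteger();

        MiniStream.from(source).compose(stream -> {
            composeCalls.incrementAndGet();
            return stream.map(String::length);
        });

        MiniStream.from(source).flatMap(item -> {
            flatMapCalls.incrementAndGet();
            return MiniStream.from(Collections.singletonList(item.length()));
        });

        return new int[] { composeCalls.get(), flatMapCalls.get() };
    }

    public static void main(String[] args) {
        int[] calls = countCalls(Arrays.asList("a", "bb", "ccc"));
        // prints "compose: 1, flatMap: 3"
        System.out.println("compose: " + calls[0] + ", flatMap: " + calls[1]);
    }
}
```

Both versions produce the same lengths, but the flatMap() function runs (and allocates a new inner stream) for every item, while the compose() function runs exactly once no matter how many items flow through.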

If you guys are planning to replace some operators with reusable code, use compose().

But the above comparison doesn’t mean that we should always go for compose(); flatMap() has its own advantages and use cases. It totally depends on our needs.

Create your own Transformers by implementing the Transformer interface:

public class ToLength implements Transformer<String, Integer> {

    public static ToLength toLength() {
        return new ToLength();
    }

    private ToLength() {
        super();
    }

    @Override
    public Observable<Integer> call(Observable<String> source) {
        return source.map(String::length);
    }
}

Note that the toLength transformer converts our Observable<String> into an Observable<Integer> carrying each string's length. We use it like this:

observable.compose(toLength())...
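Because this transformer changes the item type, everything chained after compose() works with Integer instead of String. Here is a minimal sketch of that idea that runs without RxJava. Flow is a hypothetical toy stand-in for Observable, and ToLengthSketch mirrors the shape of ToLength above using java.util.function.Function instead of the Rx Transformer interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Toy stand-in for Observable (hypothetical, NOT RxJava): a list with map() and compose().
final class Flow<T> {
    private final List<T> items;

    private Flow(List<T> items) {
        this.items = items;
    }

    static <T> Flow<T> from(List<T> items) {
        return new Flow<>(new ArrayList<>(items));
    }

    <R> Flow<R> map(Function<? super T, ? extends R> f) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            out.add(f.apply(item));
        }
        return new Flow<>(out);
    }

    // Hands the whole flow to the transformer and returns whatever it builds.
    <R> Flow<R> compose(Function<Flow<T>, Flow<R>> transformer) {
        return transformer.apply(this);
    }

    List<T> toList() {
        return new ArrayList<>(items);
    }
}

// Same shape as ToLength: String items in, Integer items out.
final class ToLengthSketch implements Function<Flow<String>, Flow<Integer>> {

    static ToLengthSketch toLength() {
        return new ToLengthSketch();
    }

    private ToLengthSketch() {
    }

    @Override
    public Flow<Integer> apply(Flow<String> source) {
        return source.map(String::length);
    }
}

final class ToLengthDemo {

    static List<Integer> lengthsDoubled(List<String> words) {
        return Flow.from(words)
            .compose(ToLengthSketch.toLength()) // Flow<String> becomes Flow<Integer>
            .map(n -> n * 2)                    // later operators see Integer items
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(lengthsDoubled(Arrays.asList("a", "bcd")));
    }
}
```

The static factory plus private constructor keeps the call site short, so the chain reads as compose(toLength()) just like any built-in operator.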

Alternatives to compose()

There are other alternatives as well, such as the Operator interface and lift(), which we can also use to create our own custom operators. But we will not cover them in this article.

I will write a separate article for them.

HAPPY CODING….😊