Who says a stream with RxRelay
doesn’t crash?
Motivation
It all started with my encounter with a crashing stream …
Requirement
The application needed to support functionality, which allowed the user to load more data on scrolling – known as LoadMore
. When a user would request the application to load more data, the application could potentially encounter a network issue (such as network not available, extended latency, or timeout, etc). When this happened, the application would need to show a Retry button. If the user decided to tap the Retry button, the application would again fire a LoadMore
event. This would in turn issue the API call to load more data.
Naive Implementation
I used Subject
to receive LoadMore
events from the view. Then, I flatMap()
the LoadMore
event to trigger the API call. In case there were any Network issues, inside onError()
, I told the view to show Retry
button. When the user clicked on Retry
, it would fire another LoadMore
event to my RxJava Subject.
loadMoreSubject
.flatMap {
storyRepository.fetchNextStories()
}
.subscribe ({
//success show stories on view
}, {
//error
showRetryView()
})
However, I noticed that once I receive a No Internet
error, nothing happened if I clicked the Retry
button. This happens because the stream crashes after a terminal event like onComplete()
or onError()
. The terminal event will cause any future events we pass to this subject to be ignored as there will be no client to listen to the events.
“You shall not pass!”
Incomplete truth
After doing some research and asking some friends, I was pointed to a library called RxRelay
which seemed to be solving a similar issue :
Relay: A Subject except without the ability to call onComplete or onError.
Subjects are useful to bridge the gap between non-Rx APIs. However, they are stateful in a damaging way: when they receive an onComplete or onError they no longer become usable for moving data. This is the observable contract and sometimes it is the desired behavior. Most times it is not.
Relays are simply Subjects without the aforementioned property. They allow you to bridge non-Rx APIs into Rx easily, and without the worry of accidentally triggering a terminal state.
After reading the above description, I was happy to have found an easy way out and switched all my PublishSubject
instantly to PublishRelay
.
Surprisingly, my poor Retry
button still wouldn’t work as intended.
This made me wonder about the difference between a Subject
and a Relay
, and the actual problem Relay
solves. We will come back to this later, but let’s look at how to fix the Retry
button issue first.
Solution – Wrapper Objects
If you look at the source code of the Response<T>
object from Retrofit
or the Result<T>
object from retrofit-rxjava-adapter
or even Rxjava’s own wrapper object type called Notification<T>
. You will notice that, Response
,Result
and Notification
, they all have a similar structure: Each wrap the actual data along with error and pass the data through the stream as a single entity.
Solution using Notification :
Rxjava
provides a way to solve our problem by using the Notification
wrapper, which combines the data and error into a single notification event. Let’s use Notification
to wrap our data and internet error :
loadMoreSubject
.flatMap {
storyRepository.fetchNextStories()
//wrap success
.map { Notification.createOnNext<List<String>>(it) }
//wrap error
.onErrorReturn { Notification.createOnError(it) }
}
.subscribe ({
if (it.isOnError) {
if (it.error.isInternetError()) {
//expected and recoverable error
showRetryView()
} else {
//check for other expected errors or throw
}
} else {
//success
}
}, {
//unexpected and irrecoverable error
})
We used the flatMap()
operator on load more events, (as we did earlier) to fetch more stories and wrap the data into a Notification
by using Notification.createOnNext()
. In case of error, we use RxJava’s onErrorReturn()
operator which does not let our stream crash and instead we will wrap our error inside a notification using Notification.createOnError()
. We will then pass the Notification
forward so that the subscriber can recover from the error if possible.
Solution using our own wrapper object :
It’s not really necessary to use these existing wrappers. In fact, we can easily also create our own wrapper object. Now, let’s create a wrapper object to wrap our stories and error into a StoryResult
object :
StoryResult wrapper :
data class StoryResult(val stories: List<Story> = ArrayList(),
val throwable: Throwable?) {
val isError
get() = throwable == null
}
Now let’s use StoryResult
to fix our issue :
loadMoreSubject
.flatMap {
storyRepository.fetchNextStories()
.map { StoryResult(stories = it) }
.onErrorReturn { StoryResult(throwable = it) }
}
.subscribe ({
if (it.isOnError) {
if (it.error.isInternetError()) {
//expected and recoverable error
showRetryView()
} else {
//check for other expected errors or throw
}
} else {
//success
}
}, {
//irrecoverable error
})
Fun-fact: If you are familiar with MVI architectural pattern, you can take advantage of a similar wrapper object to deliver result notifications by wrapping data and error together within a single construct. This enables a crash-free stream in case of expected errors.
Recoverable and Non-recoverable errors :
In an app, there are two kinds of errors that can happen. Expected or recoverable errors like Internet, Server Error, etc. and Unexpected or irrecoverable errors. We usually plan for expected errors to happen and have a backup plan for fixing such errors. For example, in case of unreliable Internet, we can ask the user to try again. In these cases it doesn’t make sense to let our stream reach a terminal state. On the other hand, unexpected or irrecoverable errors are usually the ones which depict that our system is in an unstable state and recovering from this unstable stable is not possible. Only in such cases we should let our stream terminate and we will need to re-initialize our stream and start from scratch, like in the case of a NullPointerException
.
Subjects vs Relays
After we have seen how we could solve our problem by using wrapper objects. One question still remains. What is all the talk about Relays
being a better alternative? If my stream still crashed when I used a Relay
earlier, then how exactly a Relay
doesn’t let the stream terminate?
We will look into these questions now. But first, I will mention one thing:
“Relays do not help us if we get an error while consuming the events from it and mapping into something else i.e. downstream. In fact, a Relay
would also let the stream crash just like a Subject
in this case. On the other hand, Relays are beneficial because they do not accept any terminal event from the producer, i.e from the top.”
How Relays do that actually comes from the fact that Relay
is actually a Consumer
and an Observable
whereas a Subject
is an Observer
and an Observable
. In order to understand this, let’s look at the difference between a Consumer
and an Observer
first.
Observer vs Consumer
From the source code :
Observer
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
An Observer
observes all events like onSubscribe
, onNext
, onError
and onComplete
.
Consumer
/**
* A functional interface (callback) that accepts a single value.
* @param <T> the value type
*/
public interface Consumer<T> {
/**
* Consume the given value.
* @param t the value
* @throws Exception on error
*/
void accept(T t) throws Exception;
}
Consumer consumes a single type of event at once like Consumer<Data>
, Consumer<Throwable>
.
Thus a Subject
observes all the events like onSubscribe
, onNext
, onError
and onComplete
where as a Relay
handles only one type of event, i.e. onNext()
.
We have multiple overloads of the Observable.subscribe()
function. The two relevant ones here are :
Observable.subscribe(Observer<? super T> observer)
Observable.subscribe(Consumer<? super T> onNext)
In order to understand the benefit of using Relay
let us take an example.
Example
We create an Observable
which emits four integers and then completes.
val completingObservable = Observable.fromArray(1, 2, 3, 4)
We then subscribe to this completingObservable
into a Subject
because a Subject
is also an Observer
.
completingObservable.subscribe(subject)
Then, we can subscribe to the above subject
. When we do this, we will notice, once the subject
receives onComplete()
event from the completingObservable
, it will terminate. Any future events we try to put into this subject using subject.onNext(5)
will simply get ignored. This is because completingObservable.subscribe(subject)
uses the Observer
overload of the subscribe
function and thus all the events, including terminal events are forwarded to the subject.
Let’s now subscribe to our completingObservable
into a Relay
as a Relay
is also a Consumer
.
completingObservable.subscribe(relay)
If we will subscribe to the above relay
, we will notice that relay
ignores the onComplete()
event from the completingObservable
. Relay
will keep receiving any further events we send relay.accept(5)
. This is because completingObservable.subscribe(relay)
uses the Consumer
overload of subscribe
function and thus only onNext
event is forwarded to the relay
.
Thus Relay
is beneficial in this case as it doesn’t let our stream crash.
You can find the above examples of Subject vs Relays here on my rx playground repository.
Conclusion
-
“Subscribing to a Subject:” When subscribing to a
Subject
vsRelay
and consuming events,Relay
has no benefit over Subjects. We can use wrapper objects in this case to recover from expected errors. -
“Sending events to a Subject”: When sending events into a
Subject
vsRelay
, in case there’s a need of exposing aSubject
or aRelay
in the form of aRxBus
of events to communicate with other parts where-in different parts can send events, we need to take extra care and preferRxRelay
over aSubject
. This ensures terminal events are not forwarded to the bus, which can make it unusable.
References
- StackOverflow question : RxJava Relay vs Subjects
- StackOverflow question I asked : PublishSubject stops emitting after onError()
These results and observations are according to the scenarios I have faced. If you feel there can be some improvements, please let me know!
Thank you for reading!
(Photo by Samuel Zeller)