Introduction
RxJava is created by Netflix in 2012. RxJava is Observable,
Observable
Observer reacts to when Observer subscribes to an Observable. This pattern facilitates concurrent operations because it does not block while waiting for Observable to emit objects. But it creates a sentry as observer. observer is sometimes called a subscriber, watcher, or reactor.
Observable equal Iterable, which pull. The pull is blocking the thread until the producer comes new values when consumer pulls values. The push is more flexible because logical and practical.
Subscribe
The Subscribe is way to connect an observer to an Observable. Observer implements onNext
,
onError
, onCompleted
method. onNext is emissions, onError
and onCompleted
is notifications.
For example of using onNext
, onCompleted
, and onError
:
//PublishSubject is later explain.
PublishSubject subject = PublishSubject.create();
subject.onNext("one"); // before subscribe.
subject.subscribe(System.out::println); //subscribe!!
subject.onNext("two"); // after subscribe.
subject.onNext("three");
subject.onComplete(); // complete.
Output
two
three
Using onComplete
before all of onNext
finish:
PublishSubject subject = PublishSubject.create();
subject.onNext("one"); // before subscribe.
subject.subscribe(System.out::println); // subscribe!!
subject.onNext("two");
subject.onNext("three");
subject.onComplete(); // complete.
subject.onNext("fore"); // after complete.
Output
two
three
Example of onError
method that need to a little note:
PublishSubject subject = PublishSubject.create();
subject.onNext("one");
subject.subscribe(System.out::println, System.out::println);
subject.onNext("two");
subject.onNext("three");
subject.onError(new Exception("Oops")); // error
subject.onNext("fore");
(appropriate) Output
two
three
java.lang.Exception: Oops
Why do onError
method need to note? It's because I encountered when wrote following code:
PublishSubject subject = PublishSubject.create();
subject.onNext("one");
subject.subscribe(System.out::println);
subject.onNext("two");
subject.onNext("three");
subject.onError(new Exception("Oops")); // error
subject.onNext("fore");
(unappropriate) Output
io.reactivex.exceptions.OnErrorNotImplementedException: Oops
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
at io.reactivex.subjects.PublishSubject$PublishDisposable.onError(PublishSubject.java:273)
at io.reactivex.subjects.PublishSubject.onError(PublishSubject.java:201)
at Example01.esObserver(Example01.java:14)
at Main.main(Main.java:4)
Caused by: java.lang.Exception: Oops
... 2 more
Exception in thread "main" io.reactivex.exceptions.OnErrorNotImplementedException: Oops
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
at io.reactivex.subjects.PublishSubject$PublishDisposable.onError(PublishSubject.java:273)
at io.reactivex.subjects.PublishSubject.onError(PublishSubject.java:201)
at Example01.esObserver(Example01.java:14)
at Main.main(Main.java:4)
Caused by: java.lang.Exception: Oops
... 2 more
This exception isn't intend, actually this exception seem to often occur. Cause of this exception is
occur for not implement onError
to your subscriber.
The following subscribe
method is:
subscribe()
subscribe(Consumer<? super T> onNext)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Comsumer<? super Disposable> onSubscribe)
subscribe(Observer<? super T> observer)
If you use onError
, then you have to implement error handling in subscribe
method.
Summary
onNext
method calls whenever the Observable emits an item. onComplete
method
notifies that the Observable properly finished. onComplete
method is named as
onCompleted
in RxJava1, but changed to removing d
in RxJava2 due to
Reactive-Streams compatibility.
Subject
A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an observer and as an Observable. Because it is an observer and an observable. observer can subscribe to one or more Observables. Observable can pass through the items it observes and also emit new items.
PublishSubject
PublishSubject emits items of subscribed Observers. This is emitted immediately after created items. So, there is risk that one or more items may be lost between the Subject is created and the observer subscribes. If you want to guarantee to delivery all items, you need to form with Create, or using ReplaySubject instead.
If when error occured, PublishSubject not emit any items to
subsequent observers, but instead emit error notification. PublishSubject
is derive from Processor in the Reactive Streams specification,
so nulls is not allowed as parameters to onNext
and onError
.
If this occured, then calls will throw NullPointerException.
Since a PublishSubject is an Observable, don't support backpressure.
data:image/s3,"s3://crabby-images/b696b/b696b617865fde6bd52af2828e6c2f5e634d1b1f" alt=""
This subject not have a public constructor. A new empty instance of
PublishSubject can create by create
method.
When the PublishSubject is terminated by onError
or onComplete
, late Observer only receive terminal event. Since a
PublishSubject don't keep/cache items, a new Observer not receive
past items. When PublishSubject implemented the Observer interface,
you are not that need to implement onSbscribe
if the subject is using as
standalone. Only when calling onSbscribe
after
PublishSubject became tarminal state, immediately
Disposable is processed. onNext
, onError
and onComplete
should implement to be serialized which call from the same thread or
not overlapping defferent threads through external means of serialization. toSerialized
into Subject is available to all Subjects.
This supports serialization and also protects reentrance. The PublishSubject
supports hasComplete
, hasThrowable
, getThrowable
and
hasObserver
which the standard state-peeking method.
Backpressure
Backpressure occurs when in an Flowable processing pipeline, an Observable is emitting items faster than an operator can ingest items.
Asynchronous operators have buffers to hold items such onNext
until can be processed.
There is problem when processing huge items. This problem lead
OutOfMemoryError. Like as error handling receive operators to deal
with by using onError
, backpressure is another feature of data flow programmer has
to think about and handle by using onBackpressure
.