JP

Learn RxJava2

update: 21 Mar 2018

Introduction

RxJava is created by Netflix in 2012. RxJava is Observable,

Observable

Observer reacts to when Observer subscribes to an Observable. This pattern facilitates concurrent operations because it does not block while waiting for Observable to emit objects. But it creates a sentry as observer. observer is sometimes called a subscriber, watcher, or reactor.

Observable equal Iterable, which pull. The pull is blocking the thread until the producer comes new values when consumer pulls values. The push is more flexible because logical and practical.

Subscribe

The Subscribe is way to connect an observer to an Observable. Observer implements onNext, onError, onCompleted method. onNext is emissions, onError and onCompleted is notifications.

For example of using onNext, onCompleted, and onError:


        //PublishSubject is later explain.
        PublishSubject subject = PublishSubject.create();

        subject.onNext("one");  // before subscribe.
        subject.subscribe(System.out::println); //subscribe!!
        subject.onNext("two");  // after subscribe.
        subject.onNext("three");
        subject.onComplete();  // complete.
      

Output


        two
        three
      

Using onComplete before all of onNext finish:


        PublishSubject subject = PublishSubject.create();

        subject.onNext("one");  // before subscribe.
        subject.subscribe(System.out::println); // subscribe!!
        subject.onNext("two");
        subject.onNext("three");
        subject.onComplete();  // complete.
        subject.onNext("fore");  // after complete.
      

Output


        two
        three
      

Example of onError method that need to a little note:


        PublishSubject subject = PublishSubject.create();
        subject.onNext("one");
        subject.subscribe(System.out::println, System.out::println);
        subject.onNext("two");
        subject.onNext("three");
        subject.onError(new Exception("Oops"));  // error
        subject.onNext("fore");
      

(appropriate) Output


        two
        three
        java.lang.Exception: Oops
      

Why do onError method need to note? It's because I encountered when wrote following code:


        PublishSubject subject = PublishSubject.create();
        subject.onNext("one");
        subject.subscribe(System.out::println);
        subject.onNext("two");
        subject.onNext("three");
        subject.onError(new Exception("Oops"));  // error
        subject.onNext("fore");
      

(unappropriate) Output


        io.reactivex.exceptions.OnErrorNotImplementedException: Oops
                at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
                at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
                at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
                at io.reactivex.subjects.PublishSubject$PublishDisposable.onError(PublishSubject.java:273)
                at io.reactivex.subjects.PublishSubject.onError(PublishSubject.java:201)
                at Example01.esObserver(Example01.java:14)
                at Main.main(Main.java:4)
        Caused by: java.lang.Exception: Oops
                ... 2 more
        Exception in thread "main" io.reactivex.exceptions.OnErrorNotImplementedException: Oops
                at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
                at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
                at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
                at io.reactivex.subjects.PublishSubject$PublishDisposable.onError(PublishSubject.java:273)
                at io.reactivex.subjects.PublishSubject.onError(PublishSubject.java:201)
                at Example01.esObserver(Example01.java:14)
                at Main.main(Main.java:4)
        Caused by: java.lang.Exception: Oops
                ... 2 more
      

This exception isn't intend, actually this exception seem to often occur. Cause of this exception is occur for not implement onError to your subscriber.
The following subscribe method is:


        subscribe()
        subscribe(Consumer<? super T> onNext)
        subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
        subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
        subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Comsumer<? super Disposable> onSubscribe)
        subscribe(Observer<? super T> observer)
      

If you use onError, then you have to implement error handling in subscribe method.

Summary

onNext method calls whenever the Observable emits an item. onComplete method notifies that the Observable properly finished. onComplete method is named as onCompleted in RxJava1, but changed to removing d in RxJava2 due to Reactive-Streams compatibility.

Subject

A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an observer and as an Observable. Because it is an observer and an observable. observer can subscribe to one or more Observables. Observable can pass through the items it observes and also emit new items.

PublishSubject

PublishSubject emits items of subscribed Observers. This is emitted immediately after created items. So, there is risk that one or more items may be lost between the Subject is created and the observer subscribes. If you want to guarantee to delivery all items, you need to form with Create, or using ReplaySubject instead.

If when error occured, PublishSubject not emit any items to subsequent observers, but instead emit error notification. PublishSubject is derive from Processor in the Reactive Streams specification, so nulls is not allowed as parameters to onNext and onError. If this occured, then calls will throw NullPointerException.

Since a PublishSubject is an Observable, don't support backpressure.

This subject not have a public constructor. A new empty instance of PublishSubject can create by create method. When the PublishSubject is terminated by onError or onComplete, late Observer only receive terminal event. Since a PublishSubject don't keep/cache items, a new Observer not receive past items. When PublishSubject implemented the Observer interface, you are not that need to implement onSbscribe if the subject is using as standalone. Only when calling onSbscribe after PublishSubject became tarminal state, immediately Disposable is processed. onNext, onError and onComplete should implement to be serialized which call from the same thread or not overlapping defferent threads through external means of serialization. toSerialized into Subject is available to all Subjects. This supports serialization and also protects reentrance. The PublishSubject supports hasComplete, hasThrowable, getThrowable and hasObserver which the standard state-peeking method.

Backpressure

Backpressure occurs when in an Flowable processing pipeline, an Observable is emitting items faster than an operator can ingest items.

Asynchronous operators have buffers to hold items such onNext until can be processed. There is problem when processing huge items. This problem lead OutOfMemoryError. Like as error handling receive operators to deal with by using onError, backpressure is another feature of data flow programmer has to think about and handle by using onBackpressure.

Home