Reactive Programming – Observables

Mastering observables

Important:A newer version of this software with updated documentation is available. Visit the Couchbase Developer Portal for more information.

The following guide helps you getting up to speed with asynchronous programming and Observables in particular. This guide is not tied to the Java SDK exclusively and aims to give you a general understanding of how to build full stack asynchronous applications.

Motivation

Asynchronous and reactive methodologies allow you to better utilize system resources. Instead of wasting a thread waiting for network (or disk) IO, it can be fully utilized to perform other work instead.

A broad range of technologies exists to facilitate this style of programming, ranging from the very limited and not really usable java.util.concurrent.Future, to full blown libraries and runtimes like Akka (and everything in between). For a database driver, the following requirements must be met:

  • Rich functionality
  • Interoperable and not opinionated
  • Performant
  • Small dependency and runtime footprint

After evaluating the requirements and solutions closely, one library stood out: RxJava. It has a very rich set to compose asynchronous workflows, has no dependencies on its own and is used at high-profile companies like Netflix. The Rx model (more on that a little later) is mature and well-thought and the community is vibrant.

We hope that once you read through the introduction and get more familiar with the concept, you never want to go back. We certainly don’t. That said, we fully support blocking operations as well, so you can still use a traditional blocking-based model if you absolutely want to.

The next section gradually introduces you into the world of Observables, the first step to reactive masterhood. If you want to learn more about the motivation, read on here.

Java 8, Lambdas and Anonymous Classes

Before jumping into the details, one thing warrants clarification: RxJava, and therefore the Java SDK fully supports Java 8. This brings some great improvements, most prominently support for lambdas and method references.

Because the Java SDK has support for Java 6 and 7, most of the examples shown in the documentation use anonymous classes instead of lambdas. You are free and even encouraged to use them if you are able to, but Java 8 on production servers is still a few months/years away at most companies.

That said, we expect the SDK to be around for a long time and want to pave the way for the future right now. To whet your appetite, compare Java 6 code to Java 8 (same code):

// Loads 3 documents in parallel
Observable
    .just("doc1", "doc2", "doc3")
    .flatMap(new Func1<String, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(String id) {
            return bucket.get(id);
        }
    }).subscribe(new Action1<JsonDocument>() {
        @Override
        public void call(JsonDocument document) {
            System.out.println("Got: " + document);
        }
    });
// Loads 3 documents in parallel
Observable
    .just("doc1", "doc2", "doc3")
    .flatMap(bucket::get)
    .subscribe(document -> System.out.println("Got: " + document));

Also, RxJava has support for other languages like Scala, Groovy or Clojure. If you are using one of those languages, please refer to the RxJava documentation on how to use the adapters.

Understanding Observables

You can think of a Observable as the push-based, asynchronous cousin (“dual”) of the pull-based, synchronous Iterable. The contract of a Observable is that zero to N data events can happen, followed by a complete event. In addition, an error event can happen at any time, also completing the Observable.

Table 1. The Duality of Iterable & Observable
Event Iterable (Pull) Observable (Push)
retrieve data T next() onNext(T)
discover error throws Exception onError(Exception)
complete returns onCompleted()

A Observable can also be converted into a BlockingObservable, which then, unsurprisingly, behaves very much like a Iterable.

The key element to take away is that a Observable<T> can emit 0 to N events, which is very different of a Future<T>, which only contains one value. Once you start to work on streams instead of single values, you will very much appreciate this fact.

Also, important to understand is that by definition, a Observable does not imply that the underlying code is executed asynchronously. As a consumer of an Observable, you leave the actual implementation to the supplier, who is able to change it later on without you having to adapt your code. Imagine, you are consuming this API:

public interface FooService {
    Observable<String> load();
}

It could be that when load() is called, the String value is fetched right out of a Map in memory (or even a hard-coded value). In this case, there is no need to move the execution away from the caller thread, because the value will be returned instantaneously. If at a later point the implementation needs to be changed so that the String is loaded through a web service (which introduces latency and other semantics), the API doesn’t need to be changed, because the underlying implementation is free to move it to a Scheduler.

Consuming Observables

The first thing you want to do when working with Observables is to consume them. Consuming a Observable means subscribing to it. Here is an example which subscribes and prints out all the items emitted:

Observable
    .just(1, 2, 3)
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("Completed Observable.");
        }

        @Override
        public void onError(Throwable throwable) {
            System.err.println("Whoops: " + throwable.getMessage());
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("Got: " + integer);
        }
    });

This prints:

Got: 1
Got: 2
Got: 3
Completed Observable.

You can see that our Observer gets notified of every event and also receives the completed event.

Note:A well-formed Observable will invoke its Subscriber’s onNext method zero or more times, and then will invoke either the onCompleted or onError method exactly once.

You can also test the error case by throwing an artificial exception when the value 2 is emitted:

Observable
    .just(1, 2, 3)
    .doOnNext(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            if (integer.equals(2)) {
                throw new RuntimeException("I don't like 2");
            }
        }
    })
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("Completed Observable.");
        }

        @Override
        public void onError(Throwable throwable) {
            System.err.println("Whoops: " + throwable.getMessage());
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("Got: " + integer);
        }
    });

This prints:

Got: 1
Whoops: I don't like 2

The first value gets through without problems, the second value throws an exception and therefore terminates the observable (and no subsequent values are allowed to be emitted after a error event).

Note:The subscribe method also returns a Subscription which you can use to unsubscribe and therefore do not receive further events.

Even if you don’t unsubscribe explicitly, operations like take do that for you implicitly. The following code only takes the first five values and then unsubscribes:

Observable
    .just("The", "Dave", "Brubeck", "Quartet", "Time", "Out")
    .take(5)
    .subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
            System.out.println("Completed Observable.");
        }

        @Override
        public void onError(Throwable throwable) {
            System.err.println("Whoops: " + throwable.getMessage());
        }

        @Override
        public void onNext(String name) {
            System.out.println("Got: " + name);
        }
    });

This prints:

Got: The
Got: Dave
Got: Brubeck
Got: Quartet
Got: Time
Completed Observable.
Note:If you take a close look at the API, subscribe() can be fed with either an Observer or a Subscriber. Unless you are implementing a custom Observer, always use Subscriber (because otherwise it will be wrapped in one internally anyway and you are saving unnecessary object allocations).

You do not need to implement the full Subscriber every time. If you are only interested in the data events, you can subscribe like this:

Observable
    .just(1, 2, 3)
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            System.out.println("Got: " + integer);
        }
    });

Be aware though that if an error happens, the following exception will be propagated:

Exception in thread "main" rx.exceptions.OnErrorNotImplementedException
	at rx.Observable$36.onError(Observable.java:8412)
	at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:128)
	at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:97)
	at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:67)
	at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:78)
	at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:76)
	...

It is recommended to always implement an error handler right from the beginning, since things can and will go wrong at some point. It can come in handy though if you just want to try things out quickly or for illustrative purposes.

From Async to Sync

As long as your Observable works on the same thread all the time, there is no need for communication between threads since only one is executing. When your Observable flow gets executed on a different thread though, you need to take some extra care to make sure you are not missing values. This is not specific to Observables though, every time you need to deal with parallel threads you need to think about synchronization and communication.

The following code emits a increasing value every second, and this is done on a different thread:

public static void main(String... args) {
    Observable
        .interval(1, TimeUnit.SECONDS)
        .subscribe(new Action1<Long>() {
            @Override
            public void call(Long counter) {
                System.out.println("Got: " + counter);
            }
        });
}

It works perfectly fine, the only problem is though chances are you won’t see anything printed out. This is because your main thread exits before the background thread had a chance to run and emit values.

A common way to deal with such a situation is to add a CountDownLatch, which allows you to synchronize between different threads. One thread counts down the latch, the other one waits until it is counted down:

final CountDownLatch latch = new CountDownLatch(5);
Observable
    .interval(1, TimeUnit.SECONDS)
    .subscribe(new Action1<Long>() {
        @Override
        public void call(Long counter) {
            latch.countDown();
            System.out.println("Got: " + counter);
        }
    });

latch.await();

This prints and then exits:

Got: 0
Got: 1
Got: 2
Got: 3
Got: 4
Note:One common mistake is to use Thread.sleep() instead of a latch to synchronize the execution between threads. This is a bad idea because it not really synchronizes anything, but just keeps one thread alive for a specific amount of time. If the actual calls take less time you are wasting time, and if it takes longer you won’t get the desired effect. If you do this in unit tests, be prepared for a good amount of non-determinism and randomly failing tests. Always use a CountDownLatch.

A technique unique to Observables is to convert it into a BlockingObservable to achieve the same effect. In simple terms, it converts a Observable into a Iterable and making it execute on the caller thread, blocking it until one or more values arrive. This technique is used extensively in the documentation to show concepts, while not having to deal with CountDownLatches all the time. It can also be used if you for some reason are not able to use asynchronous computations.

The conversion itself doesn’t do any blocking in the first place, only subsequent calls will:

// This does not block.
BlockingObservable<Long> observable = Observable
    .interval(1, TimeUnit.SECONDS)
    .toBlocking();

// This blocks and is called for every emitted item.
observable.forEach(new Action1<Long>() {
    @Override
    public void call(Long counter) {
        System.out.println("Got: " + counter);
    }
});

Since this will run forever, you are free to chain any asynchronous computations before. So you ca build an asynchronous workflow and then block at the very end. This resembles the same code as with the CountDownLatch before:

Observable
    .interval(1, TimeUnit.SECONDS)
    .take(5)
    .toBlocking()
    .forEach(new Action1<Long>() {
        @Override
        public void call(Long counter) {
            System.out.println("Got: " + counter);
        }
    });

If you know that only a single value is every returned, you can use the single() method:

int value = Observable
    .just(1)
    .toBlocking()
    .single();

Be aware though that if more items get emitted, you get an exception:

Exception in thread "main" java.lang.IllegalArgumentException: Sequence contains too many elements
	at rx.internal.operators.OperatorSingle$1.onNext(OperatorSingle.java:58)
	at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:76)
	at rx.Subscriber.setProducer(Subscriber.java:148)
	at rx.Subscriber.setProducer(Subscriber.java:144)
	....

The same thing happens if no value gets emitted:

Exception in thread "main" java.util.NoSuchElementException: Sequence contains no elements
	at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
	at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
	at rx.Subscriber.setProducer(Subscriber.java:148)
	at rx.Subscriber.setProducer(Subscriber.java:144)
	at rx.Subscriber.setProducer(Subscriber.java:144)
	at rx.Subscriber.setProducer(Subscriber.java:144)
	....

As an alternative, you can use singleOrDefault() so that a fallback value gets returned.

You can use this technique with the Java SDK if you are loading a document and it does not exist:

JsonDocument doc = bucket.get("id").toBlocking().singleOrDefault(null);
if (doc == null) {
    System.err.println("Document not found!");
} else {
    System.out.println(doc);
}

If you check out the API documentation of the BlockingObservable, you will discover many more possibilities, including iterators or grabbing the first and/or last valuess.

One last thing that comes in handy with blocking calls: sometimes you want to collect all emitted values into a list. You can combine the blocking calls with the toList() operator to achieve something like this:

List<Integer> list = Observable
    .just(1, 2, 3)
    .toList()
    .toBlocking()
    .single();

// Prints: [1, 2, 3]
System.out.println(list);

Creating Observables

There are many ways to create Observables, and you’ve already seen just() and interval(). There are many more of those convenience methods available on the Observable class, but they all boil down to the create() method. You can simulate the example from before with this:

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        try {
            if (!subscriber.isUnsubscribed()) {
                for (int i = 0; i < 5; i++) {
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        } catch (Exception ex) {
            subscriber.onError(ex);
        }
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        System.out.println("Got: " + integer);
    }
});

Every time a Subscriber subscribes, the call() method is executed. You can then call onNext, onComplete and onError as you wish, but keep in mind that both onComplete and onError should only be called once, and afterwards no subsequent onNext is allowed to follow so that the contract is met.

You can see that no blocking call is needed, because the Observable is completely handled on the current thread. In the section on Schedulers, you learn more about that.

Note:This example shows why it is crucial to call subscribe() on the Observable, because only such a call triggers the actual execution of the pipeline. This is a little different with Subjects, which are covered later in this guide. Nevertheless, always call subscribe() on your Observables.

Refer to the RxJava documentation on many more methods that you can use to create Observables. If you are dealing with the Java SDK, in most places this is done for you, but there are situation where it comes in handy.

The Java SDK does not expose bulk methods anymore on the API, because you can do this already with the help of Observables. Compare these two examples, one only loads one document, the other loads a few (you’ll learn about flatMap() in the next section):

// Loads one document and prints it:
bucket
    .get("doc1")
    .subscribe(new Action1<JsonDocument>() {
        @Override
        public void call(JsonDocument document) {
            System.out.println("Got: " + document);
        }
    });
// Loads 3 documents in parallel
Observable
    .just("doc1", "doc2", "doc3")
    .flatMap(new Func1<String, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(String id) {
            return bucket.get(id);
        }
    }).subscribe(new Action1<JsonDocument>() {
        @Override
        public void call(JsonDocument document) {
            System.out.println("Got: " + document);
        }
    });

Transforming Observables

Observables can transform their values in various ways. One of the most basic ones is map(), which converts the incoming value into a different one. You surely like division, so here is the FizzBuzz game:

Observable
    .interval(10, TimeUnit.MILLISECONDS)
    .take(20)
    .map(new Func1<Long, String>() {
        @Override
        public String call(Long input) {
            if (input % 3 == 0) {
                return "Fizz";
            } else if (input % 5 == 0) {
                return "Buzz";
            }
            return Long.toString(input);
        }
    })
    .toBlocking()
    .forEach(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
        }
    });

The map function is used to convert the input number into a string and do some checks to satisfy the FizzBuzz game. As a more practical example, consider loading a document from the Java SDK and only extracting the firstname of a user before passing it on:

bucket
    .get("id")
    .map(new Func1<JsonDocument, String>() {
        @Override
        public String call(JsonDocument document) {
            return document.content().getString("firstname");
        }
    }).subscribe();

A variation of map() is called flatMap(), which allows you to do those transformations with asynchronous calles. Taking the example from above, we want to map from String (the document ID) to a JsonDocument (the loaded document). With a normal map(), call you would either need to block on the Observable or at some point deal with a Observable<Observable<JsonDocument>>.

Thankfully, flatMap() flattens the resulting values for us and return them into the original flow:

// Loads 3 documents in parallel
Observable
    .just("doc1", "doc2", "doc3")
    .flatMap(new Func1<String, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(String id) {
            return bucket.get(id);
        }
    }).subscribe(new Action1<JsonDocument>() {
        @Override
        public void call(JsonDocument document) {
            System.out.println("Got: " + document);
        }
    });

You can see that flatMap() returns an Observable<T> whereas the normal map just returns <T>. You will use flatMap() a lot when dealing with flows like this, so keep it in mind.

Another helpful transformation is scan(). It applies a function to each value emitted by an Observable, sequentially, and emits each successive value. We can use it to aggregate values like this:

Observable
    .just(1, 2, 3, 4, 5)
    .scan(new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer sum, Integer value) {
            return sum + value;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            System.out.println("Sum: " + integer);
        }
    });

This prints:

Sum: 1
Sum: 3
Sum: 6
Sum: 10
Sum: 15

Finally, groupBy() comes in handy, which emits one Observable by each group, defined by a function. The following example emits two Observables, one for even and one for odd values:

Observable
    .just(1, 2, 3, 4, 5)
    .groupBy(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            return integer % 2 == 0;
        }
    }).subscribe(new Action1<GroupedObservable<Boolean, Integer>>() {
        @Override
        public void call(GroupedObservable<Boolean, Integer> grouped) {
            grouped.toList().subscribe(new Action1<List<Integer>>() {
                @Override
                public void call(List<Integer> integers) {
                    System.out.println(integers + " (Even: " + grouped.getKey() + ")");
                }
            });
        }
    });

This prints:

[1, 3, 5] (Even: false)
[2, 4] (Even: true)

Combined with the Java SDK, this technique can be used to separate returned Documents based on their content. The following example uses a view to load all documents from the beer-sample bucket, groups them by type and counts the number of occurrences:

bucket
    .query(ViewQuery.from("my_design_doc", "my_view"))
    .flatMap(ViewResult::rows)
    .flatMap(ViewRow::document)
    .groupBy(document -> document.content().getString("type"))
    .subscribe(observable ->
        observable.count().subscribe(integer ->
            System.out.println(observable.getKey() + ": " + integer)
        )
    );

This code queries the view, extracts all rows, loads the full document for each row, groups it by the “type” property in the JSON document and then uses the count() operator to count the number of rows emitted by each observable. This prints something like:

brewery: 1412
beer: 5891

Filtering Observables

In addition to transforming Observables, you can also filter them. Filtering doesn’t change the emitted values itself, but rather how much and at which point (and if at all) they are emitted.

For example, you can filter based on some criteria:

// This will only let 3 and 4 pass.
Observable
    .just(1, 2, 3, 4)
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            return integer > 2;
        }
    }).subscribe();

Or take only the first N values emitted and then unsubscribe:

// Only 1 and 2 will pass.
Observable
    .just(1, 2, 3, 4)
    .take(2)
    .subscribe();

Or use only the first or last value emitted:

// Only 1 will pass
Observable
    .just(1, 2, 3, 4)
    .first()
    .subscribe();
// Only 4 will pass
Observable
    .just(1, 2, 3, 4)
    .last()
    .subscribe();

Finally, you can use distinct() to suppress duplicate values:

// 1, 2, 3, 4 will be emitted
Observable
    .just(1, 2, 1, 3, 4, 2)
    .distinct()
    .subscribe();
Note:distinct() also allows you to pass in a function which returns the key to select by. You can use this for example to separate out duplicate JsonDocuments.

Combining Observables

Multiple Observables can also be merged to form a combined one. Depending on how you want those to be merged, there are different operators available. Two of the most used ones are merge() and zip() which are covered here.

Merge really just merges all emitted values by the source observables in the order they arrive:

Observable
    .merge(evens, odds)
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            System.out.println(integer);
        }
    });

This prints something similar to:

2
4
6
8
10
1
3
5
7
9

With the zip operator, you can combine two streams in the stricly same order, defined by a function:

Observable<Integer> evens = Observable.just(2, 4, 6, 8, 10);
Observable<Integer> odds = Observable.just(1, 3, 5, 7, 9);

Observable
    .zip(evens, odds, (v1, v2) -> v1 + " + " + v2 + " is: " + (v1 + v2))
    .subscribe(System.out::println);

This zips each pairs togehter in order and prints:

2 + 1 is: 3
4 + 3 is: 7
6 + 5 is: 11
8 + 7 is: 15
10 + 9 is: 19

Error Handling

Error handling is a vital component of every real world application and needs to be considered from the start. RxJava provides sophisticated mechanisms to deal with errors that happen inevitably in your Observable flows.

In general, you want to react in the following ways:

  • Return a default value instead
  • Flip over to a backup Observable
  • Retry the Observable (immediately or with backoff)

Returning a default value is a good idea if you cannot afford to retry or you just don’t care (maybe because the flow is not absolutely crucial to your data flow). The following code throws an exception at the first emitted item, but falls back to a default value:

Note that you can pass in a function which also takes the Exception, so you can return different values for different exception types or use it for logging purposes.

// Prints:
// Default
// Oops: I don't like: Apples
Observable
    .just("Apples", "Bananas")
    .doOnNext(s -> {
        throw new RuntimeException("I don't like: " + s);
    })
    .onErrorReturn(throwable -> {
        System.err.println("Oops: " + throwable.getMessage());
        return "Default";
    }).subscribe(System.out::println);

You can also flip to a backup observable which will be called if the first one fails. The Java SDK has a getFromReplica() command which allows you to read stale data from its replicas and treat availability for consistency on reads. You can use this approach to fall back:

bucket
    .get("id")
    .onErrorResumeNext(bucket.getFromReplica("id", ReplicaMode.ALL))
    .subscribe();

Normally you want to have more control on which observable should be run next depending on the type of error. The following example will only go to the replica if a TimeoutException happened (if not the error is passed down):

bucket
    .get("id")
    .timeout(500, TimeUnit.MILLISECONDS)
    .onErrorResumeNext(new Func1<Throwable, Observable<? extends JsonDocument>>() {
        @Override
        public Observable<? extends JsonDocument> call(Throwable throwable) {
            if (throwable instanceof TimeoutException) {
                return bucket.getFromReplica("id", ReplicaMode.ALL);
            }
            return Observable.error(throwable);
        }
    });

Finally, it is possible to retry the Observable by resubscribing. This can be done as quickly as possible, or with a backoff interval (which is preferred when external resources are involved).

The following program desperately tries to read the numbers from 1 to 10, but a (not so hidden) flaw makes it randomly throw an exception. If that happens, the code retries. Since lots of values might be already emitted, we can use distinct() to filter those out.

Observable
    .just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    .doOnNext(integer -> {
        if (new Random().nextInt(10) + 1 == 5) {
            throw new RuntimeException("Boo!");
        }
    })
    .retry()
    .distinct()
    .subscribe(System.out::println);
Note:If you only want to retry for a max amount, replace the retry() with a retry(count) call.

If you want to retry with backoff, you can use a technique like the following:

Observable
    .range(1, 10)
    .doOnNext(integer -> {
        if (new Random().nextInt(10) + 1 == 5) {
            throw new RuntimeException("Boo!");
        }
    })
    .retryWhen(attempts ->
        attempts.zipWith(Observable.range(1, 3), (n, i) -> i)
        .flatMap(i -> {
            System.out.println("delay retry by " + i + " second(s)");
            return Observable.timer(i, TimeUnit.SECONDS);
        }))
    .distinct()
    .subscribe(System.out::println);

The attempts get passed into the retryWhen() method and zipped with the number of seconds to wait. The timer method is used to complete once its timer is done. If you run this code a few times to generate an exception (or more), you will see something similar to this:

1
2
3
4
delay retry by 1 second(s)
delay retry by 2 second(s)
5
6
7
8
9
10

Schedulers & Threads

Schedulers in RxJava are used to manage and control concurrency. Some operators implicitly use one and/or allow you to pass in a custom one.

RxJava ships with a bunch of preconfigured Schedulers by default, which are all accessible through the Schedulers class:

  • Schedulers.computation(): Event-loop style scheduler for purely computational work.
  • Schedulers.immediate(): Executes the work immediately on the current thread.
  • Schedulers.io(): Executes work on a Executor-backed pool which grows as needed.
  • Schedulers.newThread(): Creates a new thread for each unit of work.
  • Schedulers.trampoline(): Queues the work on the current thread and gets executed after the current work completes.
  • Schedulers.test(): Test scheduler used for testing and debugging, which allows manual advancing of the clock.

As a rule of thumb, the computation scheduler should always be used for in-memory processing, while the IO scheduler should only be used for blocking-style IO operations (so do not use it together with the Java SDK since it is asynchronous anyway!).

You can instruct an Observable to be executed on such a scheduler in four different ways:

  • Implicitly by using an operator that makes use of one
  • Explicitly by passing the Scheduler to such an operator
  • By using subscribeOn(Scheduler)
  • By using observeOn(Scheduler)

Operators like buffer, replay, skip, delay, parallel and so forth use a Scheduler by default if not instructed otherwise. A list of default schedulers can be found here: https://github.com/ReactiveX/RxJava/wiki/Scheduler#default-schedulers-for-rxjava-observable-operators

As a rule of thumb, all of those operators allow you to pass in a custom Scheduler if needed, but most of the time sticking with the defaults is a good idea.

Note:The Java SDK uses a internal scheduler similar to the computation Scheduler to proper isolate the inner mechanisms from user-land. It is possible to change that Scheduler through the environment, but not recommended.

If you want the whole subscribe chain to be executed on a specific scheduler, you use the subscribeOn() operator. Without a Scheduler set, the following code executes on the main thread:

Observable
    .range(1, 5)
    .map(integer -> {
        System.out.println("Map: (" + Thread.currentThread().getName() + ")");
        return integer + 2;
    })
    .subscribe(integer -> 
        System.out.println("Got: " + integer + " (" + Thread.currentThread().getName() + ")")
    );

This prints:

Map: (main)
Got: 3 (main)
Map: (main)
Got: 4 (main)
Map: (main)
Got: 5 (main)
Map: (main)
Got: 6 (main)
Map: (main)
Got: 7 (main)

If you add subscribeOn() somewhere in the flow (it doesn’t matter where):

Observable
    .range(1, 5)
    .map(integer -> {
        System.out.println("Map: (" + Thread.currentThread().getName() + ")");
        return integer + 2;
    })
    .subscribeOn(Schedulers.computation())
    .subscribe(integer ->
            System.out.println("Got: " + integer + " (" + Thread.currentThread().getName() + ")")
    );

You can see it is executed on the same thread, but on the computation thread pool:

Map: (RxComputationThreadPool-6)
Got: 3 (RxComputationThreadPool-6)
Map: (RxComputationThreadPool-6)
Got: 4 (RxComputationThreadPool-6)
Map: (RxComputationThreadPool-6)
Got: 5 (RxComputationThreadPool-6)
Map: (RxComputationThreadPool-6)
Got: 6 (RxComputationThreadPool-6)
Map: (RxComputationThreadPool-6)
Got: 7 (RxComputationThreadPool-6)

If you need tighter control which parts are executed on what pool, use observeOn(). Here, the order matters:

Observable
    .range(1, 5)
    .map(integer -> {
        System.out.println("Map: (" + Thread.currentThread().getName() + ")");
        return integer + 2;
    })
    .observeOn(Schedulers.computation())
    .subscribe(integer ->
            System.out.println("Got: " + integer + " (" + Thread.currentThread().getName() + ")")
    );

Everything before the observeOn call is executed in main, everything below in the Scheduler:

Map: (main)
Map: (main)
Map: (main)
Got: 3 (RxComputationThreadPool-6)
Got: 4 (RxComputationThreadPool-6)
Got: 5 (RxComputationThreadPool-6)
Map: (main)
Map: (main)
Got: 6 (RxComputationThreadPool-6)
Got: 7 (RxComputationThreadPool-6)

There is also a way to use Schedulers directly to schedule operations, please refer to the documentation here: https://github.com/ReactiveX/RxJava/wiki/Scheduler#using-schedulers

Subjects

A Subject is a hybrid between a Observable and a Subscriber. So it can both receive and emit events. Most of the time you don’t need Subjects and can handle everything fine through Observables alone, but there are certain cases when they come in handy.

There is a distinction between different Observables that have not been covered yet:

  • A cold Observable waits for a Subscription until it emits values and does this freshly for every Subscriber.
  • A hot Observable begins emitting values upfront and presents them to every subscriber subsequently. Subjects are hot Observables.
Note:Because of the network layer in between, the Java SDK needs to use Subjects for its request/response cycles. This also makes sense because if you subscribe twice to a bucket.get() call, you actually only want one network call instead of two.

Currently, there are four Subjects supported by RxJava, slightly differing in their functionality:

  • AsyncSubject: emits the last value (and only the last value) emitted by the source Observable, and only after that source Observable completes. (If the source Observable does not emit any values, the AsyncSubject also completes without emitting any values.)
  • BehaviorSubject: When an Subscriber subscribes to a BehaviorSubject, it begins by emitting the item most recently emitted by the source Observable (or an optional seed/default value if none has yet been emitted) and then continues to emit any other items emitted later by the source Observable(s).
  • PublishSubject: PublishSubject emits to a subscriber only those items that are emitted by the source Observable(s) subsequent to the time of the subscription.
  • ReplaySubject: ReplaySubject emits to any subscriber all of the items that were emitted by the source Observable(s), regardless of when the subscriber subscribes.

As an example: if you call bucket.get(), a AsyncSubject is created under the covers and returned to you immediately. In addition, it is passed down the IO layer and stored. When a response arrives from the server, the Subject is fed with the response and you get notified appropriately.

If you need to use a Subject, choose wisely which one to use in order to keep resource usage low (some of them cache data for subscribers) especially if you push lots of data through them. You can read more about them here: https://github.com/ReactiveX/RxJava/wiki/Subject

There is one last thing you need to know when dealing with Subjects: because you are not getting new values when resubscribing (since it’s cached), the following won’t work (doing a get call every second):

bucket
    .get("id")
    .delay(1, TimeUnit.SECONDS)
    .repeat()
    .subscribe();

This will only execute one get call, because subsequent attempts only load the cached value. For this reason Observable.defer() was added, which creates a new Observable for every subscriber that comes along:

Observable.defer(new Func0<Observable<JsonDocument>>() {
    @Override
    public Observable<JsonDocument> call() {
        return bucket.get("id");
    }
})
.delay(1, TimeUnit.SECONDS)
.repeat()
.subscribe();

Reference:
http://docs.couchbase.com/developer/java-2.0/observables.html
http://reactivex.io/intro.html

Leave a Reply

Your email address will not be published. Required fields are marked *