BT

New Early adopter or innovator? InfoQ has been working on some new features for you. Learn more

Testing RxJava2

| Posted by Andres Almiray Follow 0 Followers on Feb 28, 2017. Estimated reading time: 15 minutes |

Key Takeaways

  • RxJava 2 includes built-in, test-friendly solutions.
  • Use TestSubscriber to verify Flowables, TestObserver to verify Observables, Singles, Maybes and Completables.
  • Use TestScheduler to have strict control of time.
  • The Awaitility library provides additional control of test context.

This article is a revision of “Testing RXJava”, completely updated to conform to the RxJava2 specification.

You’ve read about RxJava; you’ve played with the samples on the internet, for example in RxJava by Example, and now you have made a commitment to explore reactive opportunities in your own code. But now you are wondering how to test out the new capabilities that you might find in your codebase.

Reactive programming requires a shift in how to reason about a particular problem, because we need to focus not on individual data items but on data flowing as streams of events. These events are often produced and consumed from different threads, and so when writing tests we must keep aware of concurrency concerns. Fortunately RxJava2 provides built-in support for testing Observables and Disposables, built right into the core rxjava dependency.

First Steps

 Let’s revisit the words example from the last article and explore how we can test it out. Let’s begin by setting up the base test harness using JUnit as our testing framework:

import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import org.junit.Test;
import java.util.*;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertThat;
import static org.hamcrest.Matchers.*;


public class RxJavaTest {
    private static final List<String> WORDS = Arrays.asList(
       "the",
       "quick",
       "brown",
       "fox",
       "jumped",
       "over",
       "the",
       "lazy",
       "dog"
    );
}

Our first test will follow a naive approach, given the fact that subscriptions will, by default, run on the calling thread, if no particular Scheduler is supplied. This means we can setup a Subscription and assert its state immediately after the subscription takes place:

@Test
public void testInSameThread() {
   // given:
   List<String> results = new ArrayList<>();
   Observable<String> observable = Observable.fromIterable(WORDS)
       .zipWith(Observable.range(1, Integer.MAX_VALUE),
           (string, index) -> String.format("%2d. %s", index, string));

   // when:
   observable.subscribe(results::add);

   // then:
   assertThat(results, notNullValue());
   assertThat(results, hasSize(9));
   assertThat(results, hasItem(" 4. fox"));

Notice that we used an explicit List<String> to accumulate our results, along with a real subscriber. Given the simple nature of this test you may think that using an explicit accumulator as such is good enough, but remember that production grade observables may encapsulate errors or produce unexpected events; the simple subscriber plus accumulator combo is not sufficient to cover those cases. But don’t fret, RxJava provides a TestObserver type that can be used in such cases. Let’s refactor the previous test using this type


@Test
public void testUsingTestObserver() {
   // given:
   TestObserver<String> observer = new TestObserver<>();
   Observable<String> observable = Observable.fromIterable(WORDS)
       .zipWith(Observable.range(1, Integer.MAX_VALUE),
           (string, index) -> String.format("%2d. %s", index, string));

   // when:
   observable.subscribe(observer);

   // then:
   observer.assertComplete();
   observer.assertNoErrors();
   observer.assertValueCount(9);
   assertThat(observer.values(), hasItem(" 4. fox"));
}

TestObserver replaces the custom accumulator, but it also provides some additional behavior. For example it’s capable of telling us how many events were received and the data related to each event. It can also assert that the subscription was successfully completed and that no errors appeared during the consumption of the Observable. The current Observable under test does not produce any errors, but as we learned back in RxJava by Example Observables treat exceptions exactly the same as data events. We can simulate an error by concatenating an exception event in the following way

@Test
public void testFailure() {
   // given:
   TestObserver<String> observer = new TestObserver<>();
   Exception exception = new RuntimeException("boom!");

   Observable<String> observable = Observable.fromIterable(WORDS)
       .zipWith(Observable.range(1, Integer.MAX_VALUE),
           (string, index) -> String.format("%2d. %s", index, string))
       .concatWith(Observable.error(exception));

   // when:
   observable.subscribe(observer);

   // then:
   observer.assertError(exception);
   observer.assertNotComplete();
}

All is well in our limited use case. But actual production code can vary greatly, so let’s consider some more complex production cases.

Custom Schedulers

Quite often you’ll find cases in production code where Observables are executed on a specific thread, or “scheduler” in Rx parlance. Many Observable operations take an optional Scheduler parameter as an additional argument. RxJava defines a set of named Schedulers that can be used at any time. Some of these are io and computation, (which are shared threads) and newThread. You can also supply your own custom Scheduler implementation. Let’s change the observable code by specifying the computation Scheduler.

@Test
public void testUsingComputationScheduler() {
   // given:
   TestObserver<String> observer = new TestObserver<>();
   Observable<String> observable = Observable.fromIterable(WORDS)
       .zipWith(Observable.range(1, Integer.MAX_VALUE),
           (string, index) -> String.format("%2d. %s", index, string));

   // when:
   observable.subscribeOn(Schedulers.computation())
       .subscribe(observer);

   await().timeout(2, SECONDS)
       .until(observer::valueCount, equalTo(9));

   // then:
   observer.assertComplete();
   observer.assertNoErrors();
   assertThat(observer.values(), hasItem(" 4. fox"));
}

You’ll quickly discover there’s something wrong with this test once you run it. The subscriber performs its assertions on the test thread however the Observable produces values on a background thread (the computation thread). This means the subscriber’s assertions may be executed before the Observable has produced all relevant events, resulting in a failing test.

There are a few strategies we can choose to turn the test green:

  • Turn the Observable into a blocking .
  • Force the test to wait until a certain condition is met.
  • Switch the computation scheduler for an immediate one.

We’ll cover each strategy starting with the one that requires less effort: turning the Observable into a blocking Observable. This technique works regardless of the Scheduler in use. The assumption is that data is produced in a background thread, causing subscribers to be notified in that same background thread.

What we'd like to do is force all events to be produced and the Observable to complete before the next statement in the test is evaluated. This is done by calling blockingIterable() on the Observable itself.

@Test
public void testUsingBlockingCall() {
   // given:
   Observable<String> observable = Observable.fromIterable(WORDS)
       .zipWith(Observable.range(1, Integer.MAX_VALUE),
           (string, index) -> String.format("%2d. %s", index, string));

   // when:
   Iterable<String> results = observable
       .subscribeOn(Schedulers.computation())
       .blockingIterable();

   // then:
   assertThat(results, notNullValue());
   assertThat(results, iterableWithSize(9));
   assertThat(results, hasItem(" 4. fox"));
}

While this approach may be acceptable for the trivial code we have shown, it may not be practical for actual production code. What if the producer takes a long time to create all the data? This will make the test a slow one, increasing build times. There may be other timing issues as well. Fortunately TestObserver provides a set of methods that can force the test to wait for the termination event. Here’s how it can be done:

        @Test
	    public void testUsingComputationScheduler() {
    	// given:
    	TestObserver<String> observer = new TestObserver<>();
    	Observable<String> observable = Observable.fromIterable(WORDS)
        	.zipWith(Observable.range(1, Integer.MAX_VALUE),
            	(string, index) -> String.format("%2d. %s", index, string));

    	// when:
    	observable.subscribeOn(Schedulers.computation())
        	.subscribe(observer);

    	observer.awaitTerminalEvent(2, SECONDS);

    	// then:
    	observer.assertComplete();
    	observer.assertNoErrors();
    	assertThat(observer.values(), hasItem(" 4. fox"));
	    }

 If that were not enough I’d now like to point your attention to a handy library named Awaitility. Simply put, Awaitility is a DSL that allows you to express expectations about an asynchronous system in a concise and easy to read manner. You can include the Awaitility dependency using Maven:

<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <version>2.0.0</version>
    <scope>test</scope>
</dependency>

Or using Gradle:

testCompile 'org.awaitility:awaitility:2.0.0'

The entry point of the Awaitility DSL is the org.awaitility.Awaitility.await() method see lines 13-14 in the example below). From there you can define conditions that must be met in order to let the test continue. You may decorate conditions with timeouts and other temporal constraints, for example minimum, maximum or duration range. Revisiting the previous test with Awaitility in tow results in the following code

1 @Test
2 public void testUsingComputationScheduler_awaitility() {
3     // given:
4     TestObserver<String> observer = new TestObserver<>();
5     Observable<String> observable = Observable.fromIterable(WORDS)
6         .zipWith(Observable.range(1, Integer.MAX_VALUE),
7             (string, index) -> String.format("%2d. %s", index, string));
8
9     // when:
10     observable.subscribeOn(Schedulers.computation())
11         .subscribe(observer);
12
13     await().timeout(2, SECONDS)
14         .until(observer::valueCount, equalTo(9));
15
16     // then:
17     observer.assertComplete();
18     observer.assertNoErrors();
19     assertThat(observer.values(), hasItem(" 4. fox"));
20 }

This version does not change the nature of the Observable in any way, which allows you to test unaltered production code without any modification. This version of the test awaits at most 2 seconds for the Observable to performs its job by checking the state of the subscriber. If everything goes well, the subscriber’s state checks out to 9 events before the 2 second timeout elapses.

Awaitility plays nicely with Hamcrest matchers, Java 8 lambdas, and method references, thus resulting in concise and readable conditions. There are also ready made extensions for popular JVM languages such as Groovy and Scala.

The final strategy we’ll cover makes use of the extension mechanism that RxJava exposes as part of its API. RxJava defines a series of extension points that enable you to tweak almost every aspect of its default behavior. This extension mechanism effectively allows us to supply tailor made values for a particular RxJava feature. We’ll take advantage of this mechanism to let our test inject a specific Scheduler regardless of the one specified by the production code. The behavior we’re looking for is encapsulated in the RxJavaPlugins class. Assuming our production code relies on the computation() scheduler we’re going to override its default value, returning a Scheduler that makes event processing happen in the same thread as the caller code; this is the Schedulers.trampoline() scheduler. Here’s how the test looks now:


1 @Test
2 public void testUsingRxJavaPluginsWithImmediateScheduler() {
3     // given:
4     RxJavaPlugins.setComputationSchedulerHandler(scheduler -> 
                                                   Schedulers.trampoline());
5     TestObserver<String> observer = new TestObserver<>();
6     Observable<String> observable = Observable.fromIterable(WORDS)
7         .zipWith(Observable.range(1, Integer.MAX_VALUE),
8             (string, index) -> String.format("%2d. %s", index, string));
9
10     try {
11         // when:
12         observable.subscribeOn(Schedulers.computation())
13             .subscribe(observer);
14
15         // then:
16         observer.assertComplete();
17         observer.assertNoErrors();
18         observer.assertValueCount(9);
19         assertThat(observer.values(), hasItem(" 4. fox"));
20     } finally {
21         RxJavaPlugins.reset();
22     }
23 }

The production code is unaware that the computation() scheduler is an immediate one during testing. Please take note that you must reset the hook otherwise the immediate scheduler setting may leak, resulting in broken tests all over the place. The usage of a try/finally block obscures the intention of the test code a bit, but fortunately we can refactor out this behavior using a JUnit rule, making the test slimmer and more readable as a result. Here’s one possible implementation for such a rule

private static class ImmediateSchedulersRule implements TestRule {
   @Override
   public Statement apply(final Statement base, Description description) {
       return new Statement() {
           @Override
           public void evaluate() throws Throwable {
               RxJavaPlugins.setIoSchedulerHandler(scheduler -> 
                                                Schedulers.trampoline());
               RxJavaPlugins.setComputationSchedulerHandler(scheduler -> 
                                                Schedulers.trampoline());
               RxJavaPlugins.setNewThreadSchedulerHandler(scheduler -> 
                                                Schedulers.trampoline());

               try {
                   base.evaluate();
               } finally {
                   RxJavaPlugins.reset();
               }
           }
       };
   }
}

We override two other scheduler producing methods for good measure, making this rule more generic for other testing purposes down the road. Usage of this rule in a new testcase class is straightforward, we simply declare a field with the new type annotated with @Rule, like so

@Rule
public final ImmediateSchedulersRule schedulers = 
                                         new ImmediateSchedulersRule();

@Test
public void testUsingImmediateSchedulersRule() {
   // given:
   TestObserver<String> observer = new TestObserver<>();
   Observable<String> observable = Observable.fromIterable(WORDS)
       .zipWith(Observable.range(1, Integer.MAX_VALUE),
           (string, index) -> String.format("%2d. %s", index, string));

   // when:
   observable.subscribeOn(Schedulers.computation())
       .subscribe(observer);

   // then:
   observer.assertComplete();
   observer.assertNoErrors();
   observer.assertValueCount(9);
   assertThat(observer.values(), hasItem(" 4. fox"));
}

In the end we get the same behavior as before but with less clutter. Let’s take a moment to look back what we have accomplished so far:

  • Subscribers process data in the same thread as long as there’s no specific in use. This means we can make assertions on a subscriber right after it subscribes to an .
  • TestObserver can accumulate events and provide additional assertions on its state.
  • Any can be turned into a blocking , thus enabling us to synchronously wait for events to be produced, regardless of the used by the observable.
  • RxJava exposes an extension mechanism that enables developers to override its defaults, and inject those right into the production code.
  • Awaitility can be used to test out concurrent code using a DSL.

Each one of these techniques comes in handy in different scenarios, however all them are connected by a common thread (pun intended): the test code waits for the Observable to complete before making assertions on the subscriber’s state. What if there was a way to inspect the Observable’s behavior as it produces the data? In other words, what if it were possible to programmatically debug the Observable in place? We'll see a technique for doing that next.

Playing with Time

So far we’ve tested observables and subscriptions in a black box manner. Now we’ll have a look at another technique that allows us to manipulate time in such a way that we can pop the hood and look at a subscriber’s state while the Observable is still active, in other words, we’ll use a white box testing technique. Once again, it’s RxJava to the rescue, with its TestScheduler class. This particular Scheduler enables you to specify exactly how time passes inside of it. You can for example advance time by half a second, or make it leap 5 seconds. We’ll start by creating an instance of this new Scheduler and then pass it to the test code

1 @Test
2 public void testUsingTestScheduler() {
3     // given:
4     TestScheduler scheduler = new TestScheduler();
5     TestObserver<String> observer = new TestObserver<>();
6     Observable<Long> tick = Observable.interval(1, SECONDS, scheduler);
7
8     Observable<String> observable = Observable.fromIterable(WORDS)
9         .zipWith(tick,
10             (string, index) -> String.format("%2d. %s", index, string));
11
12     observable.subscribeOn(scheduler)
13         .subscribe(observer);
14
15     // expect:
16     observer.assertNoValues();
17     observer.assertNotComplete();
18
19     // when:
20     scheduler.advanceTimeBy(1, SECONDS);
21
22     // then:
23     observer.assertNoErrors();
24     observer.assertValueCount(1);
25     observer.assertValues(" 0. the");
26
27     // when:
28     scheduler.advanceTimeTo(9, SECONDS);
29     observer.assertComplete();
30     observer.assertNoErrors();
31     observer.assertValueCount(9);
32 }

The "production" code changed a little, as we’re now using an interval that is tied to the Scheduler to produce the numbering (line 6) instead of a range. This has the side-effect of producing numbers starting with 0 instead of the original 1. Once the Observable and the test scheduler are configured we immediately assert that the subscriber has no values (line 16), and has not completed or generated any errors (line 17). This is a sanity check as the scheduler has not moved at this point and so no values should have been produced by the observable nor received by the subscriber.

Next we move time by one whole second (line 20), this should cause the Observable to produce the first value, and that’s exactly what the next set of assertions checks (lines 23-25).

We next advance time to 9 seconds from now. Mind you that this means moving to exactly 9 seconds from the scheduler’s start, (and not advancing 9 seconds after already advancing 1 before, which would result in the scheduler looking at 10 seconds after it’s start). In other words, advanceTimeBy() moves the scheduler’s time relative to its current position, whereas advanceTimeTo()moves the scheduler’s time in an absolute fashion. We make another round of assertions (lines 29-31) to ensure the Observable has produced all data and that the subscriber has consumed it all as well. One more thing to note about the usage of TestScheduler is that real time moves immediately, which means our test does not have to wait 9 seconds to complete.

As you can see using this scheduler is quite handy but it requires you to supply the scheduler to the Observable under test. This will not play well with Observables that use a specific scheduler. But wait a second, we saw earlier how we can switch a scheduler without affecting production code using RxJavaPlugins, but this time supplying a TestScheduler instead of an immediate scheduler. We can even go so far as to apply the same technique of a custom JUnit rule, allowing the previous code to be rewritten in a more reusable form. First the new rule:

private static class TestSchedulerRule implements TestRule {
   private final TestScheduler testScheduler = new TestScheduler();

   public TestScheduler getTestScheduler() {
       return testScheduler;
   }

   @Override
   public Statement apply(final Statement base, Description description) {
       return new Statement() {
           @Override
           public void evaluate() throws Throwable {
               RxJavaPlugins.setIoSchedulerHandler(scheduler -> 
                                                   testScheduler);
               RxJavaPlugins.setComputationSchedulerHandler(scheduler -> 
                                                   testScheduler);
               RxJavaPlugins.setNewThreadSchedulerHandler(scheduler -> 
                                                   testScheduler);

               try {
                   base.evaluate();
               } finally {
                   RxJavaPlugins.reset();
               }
           }
       };
   }
}

 

Followed by the actual test code (in a new testcase class), to use our test rule:


@Rule
public final TestSchedulerRule testSchedulerRule = new TestSchedulerRule();

@Test
public void testUsingTestSchedulersRule() {
   // given:
   TestObserver<String> observer = new TestObserver<>();

   Observable<String> observable = Observable.fromIterable(WORDS)
       .zipWith(Observable.interval(1, SECONDS),
           (string, index) -> String.format("%2d. %s", index, string));

   observable.subscribeOn(Schedulers.computation())
       .subscribe(observer);

   // expect
   observer.assertNoValues();
   observer.assertNotComplete();

   // when:
   testSchedulerRule.getTestScheduler().advanceTimeBy(1, SECONDS);

   // then:
   observer.assertNoErrors();
   observer.assertValueCount(1);
   observer.assertValues(" 0. the");

   // when:
   testSchedulerRule.getTestScheduler().advanceTimeTo(9, SECONDS);
   observer.assertComplete();
   observer.assertNoErrors();
   observer.assertValueCount(9);
}

And there you have it. The usage of a TestScheduler injected via RxJavaPlugins allow you to write the test code without altering the composition of the original Observable yet it gives you the means to modify time and make assertions at specific points during the execution of the observable itself. All the techniques showed in this article should give you enough options to test out RxJava enabled code.

The Future

RxJava is one of the first libraries to provide reactive programming capabilities to Java. Version 2.0 has been redesigned to better align its API with the Reactive Streams specification, which provides a standard for asynchronous stream processing with non-blocking back pressure, targeting Java and JavaScript runtimes. I highly encourage you to review the API changes made since version 2.0; you can find a detailed description of these changes at the RxJava wiki.

In terms of testing you’ll see that the core types (Observable, Maybe, and Single) now sport a handy method named test() that creates a TestObserver instance for you on the spot. You may now chain method calls on TestObserver and there are some new assertion methods found on this type, as well.

This is an updated version of “Testing RXJava

About the Author

Andres Almiray is a Java/Groovy developer and Java Champion with more than 17 years of experience in software design and development. He has been involved in web and desktop application development since the early days of Java. He is a true believer of Open Source and has participated in popular projects like Groovy, JMatter, Asciidoctor and more. Founding member and current project lead of the Griffon framework. Spec lead for JSR 377.

Rate this Article

Adoption Stage
Style

Hello stranger!

You need to Register an InfoQ account or or login to post comments. But there's so much more behind being registered.

Get the most out of the InfoQ experience.

Tell us what you think

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Email me replies to any of my messages in this thread

Corrections by Dávid Karnok

Hi, here are some corrections to the article:

Key takeaways:
- RxJava *2* includes built-in, test-friendly solutions.
- Use TestSubscriber to verify *Flowables*, TestObserver to verify Observables, Singles, Maybes and Completables.

Fortunately RxJava2 provides built-in support for testing Observables and *Disposables*, built right into the core rxjava dependency

> This means we can setup a Subscription and assert
With upper case, it usually means the interface, the lowercase "subscription" means that subscribe() was called on a sequence. Also Observables use Disposable as their means to represent a "connection" between the supplier and the consumer in the stages.

> observer::valueCount
Reading the value count is not thread-safe in v2 and is not recommended to wait for a particular number of items. The shortcoming will be fixed sometime in a future patch version.

Re: Corrections by Victor Grazi

I guess you would know! Most of these have been corrected. Still chewing on the last one!

Re: Corrections by Andres Almiray

I see. The code was adapted from v1 where asserting value count used to work. Is there a workaround until this issue is fixed?

Re: Corrections by Dávid Karnok

Fixed and released with 2.0.7: github.com/ReactiveX/RxJava/pull/5155

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Email me replies to any of my messages in this thread

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Email me replies to any of my messages in this thread

4 Discuss

Login to InfoQ to interact with what matters most to you.


Recover your password...

Follow

Follow your favorite topics and editors

Quick overview of most important highlights in the industry and on the site.

Like

More signal, less noise

Build your own feed by choosing topics you want to read about and editors you want to hear from.

Notifications

Stay up-to-date

Set up your notifications and don't miss out on content that matters to you

BT