BT

Testing RxJava

| Posted by Andres Almiray on Oct 18, 2016. Estimated reading time: 15 minutes |

Key takeaways

  • RxJava includes built-in, test-friendly solutions.
  • Use TestSubscriber to verify Observables.
  • Use TestScheduler to have strict control of time.
  • The Awaitility library provides additional control of test context.

This article has been updated to accommodate the new changes and challenges that have appeared. Visit “Testing RXJava2” for the newer version.

You’ve read about RxJava; you’ve played with the samples on the internet, for example in RxJava by Example, and now you have made a commitment to explore reactive opportunities in your own code. But now you are wondering how to test out the new capabilities that you might find in your codebase.

Reactive programming requires a shift in how to reason about a particular problem, because we need to focus not on individual data items but on data flowing as streams of events. These events are often produced and consumed from different threads, and so when writing tests we must keep aware of concurrency concerns. Fortunately RxJava provides built-in support for testing Observables and Subscriptions, built right into the core rxjava dependency.

First Steps

Let’s revisit the words example from the last article and explore how we can test it out. Let’s begin by setting up the base test harness using JUnit as our testing framework:

import rx.Observable;
import rx.observers.TestSubscriber;
import rx.plugins.RxJavaHooks;
import rx.schedulers.Schedulers;
import java.util.*;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.*;
import org.junit.Test;
import static org.junit.Assert.assertThat;

public class RxJavaTest {
    private static final List<String> WORDS = Arrays.asList(
       "the",
       "quick",
       "brown",
       "fox",
       "jumped",
       "over",
       "the",
       "lazy",
       "dog"
    );
}

Our first test will follow a naive approach, given the fact that subscriptions will, by default, run on the calling thread, if no particular Scheduler is supplied. This means we can setup a Subscription and assert its state immediately after the subscription takes place:

@Test
public void testInSameThread() {
    // given:
    List<String> results = new ArrayList<>();
    Observable<String> observable = Observable
       .from(WORDS)
       .zipWith(Observable.range(1, Integer.MAX_VALUE),
           (string, index) -> String.format("%2d. %s", index, string));

       // when:
    observable.subscribe(results::add);

    // then:
    assertThat(results, notNullValue());
    assertThat(results, hasSize(9));
    assertThat(results, hasItem(" 4. fox"));
}

Notice that we used an explicit List<String> to accumulate our results, along with a real subscriber. Given the simple nature of this test you may think that using an explicit accumulator as such is good enough, but remember that production grade observables may encapsulate errors or produce unexpected events; the simple subscriber plus accumulator combo is not sufficient to cover those cases. But don’t fret, RxJava provides a TestSubscriber type that can be used in such cases. Let’s refactor the previous test using this type

@Test
public void testUsingTestSubscriber() {
    // given:
    TestSubscriber<String> subscriber = new TestSubscriber<>();

    Observable<String> observable = Observable
        .from(WORDS)
        .zipWith(Observable.range(1, Integer.MAX_VALUE),
           (string, index) -> String.format("%2d. %s", index, string));

    // when:
    observable.subscribe(subscriber);

    // then:
    subscriber.assertCompleted();
    subscriber.assertNoErrors();
    subscriber.assertValueCount(9);
    assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox"));
}

TestSubscriber replaces the custom accumulator, but it also provides some additional behavior. For example it’s capable of telling us how many events were received and the data related to each event. It can also assert that the subscription was successfully completed and that no errors appeared during the consumption of the Observable. The current Observable under test does not produce any errors, but as we learned back in RxJava by Example Observables treat exceptions exactly the same as data events. We can simulate an error by concatenating an exception event in the following way

@Test
public void testFailure() {
    // given:
    TestSubscriber<String> subscriber = new TestSubscriber<>();
    Exception exception = new RuntimeException("boom!");

    Observable<String> observable = Observable
        .from(WORDS)
        .zipWith(Observable.range(1, Integer.MAX_VALUE),
           (string, index) -> String.format("%2d. %s", index, string))
        .concatWith(Observable.error(exception));

    // when:
    observable.subscribe(subscriber);

    // then:
    subscriber.assertError(exception);
    subscriber.assertNotCompleted();
}

 

All is well in our limited use case. But actual production code can vary greatly, so let’s consider some more complex production cases.

Custom Schedulers

Quite often you’ll find cases in production code where Observables are executed on a specific thread, or “scheduler” in Rx parlance. Many Observable operations take an optional Scheduler parameter as an additional argument. RxJava defines a set of named Schedulers that can be used at any time. Some of these are io and computation, (which are shared threads) and newThread. You can also supply your own custom Scheduler implementation. Let’s change the observable code by specifying the computation Scheduler.

@Test
public void testUsingComputationScheduler() {
    // given:
    TestSubscriber<String> subscriber = new TestSubscriber<>();
    Observable<String> observable = Observable
        .from(WORDS)
        .zipWith(Observable.range(1, Integer.MAX_VALUE),
            (string, index) -> String.format("%2d. %s", index, string));

    // when:
    observable
        .subscribeOn(Schedulers.computation())
        .subscribe(subscriber);

    // then:
    subscriber.assertCompleted();
    subscriber.assertNoErrors();
    assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox"));
}

You’ll quickly discover there’s something wrong with this test once you run it. The subscriber performs its assertions on the test thread however the Observable produces values on a background thread (the computation thread). This means the subscriber’s assertions may be executed before the Observable has produced all relevant events, resulting in a failing test.

There are a few strategies we can choose to turn the test green:

  • Turn the Observable into a blocking Observable.
  • Force the test to wait until a certain condition is met.
  • Switch the computation scheduler for an immediate one.

We’ll cover each strategy starting with the one that requires less effort: turning the Observable into a blocking Observable. This technique works regardless of the Scheduler in use. The assumption is that data is produced in a background thread, causing subscribers to be notified in that same background thread.

What we'd like to do is force all events to be produced and the Observable to complete before the next statement in the test is evaluated. This is done by calling toBlocking() on the Observable itself.

@Test
public void testUsingBlockingCall() {
    // given:
    Observable<String> observable = Observable.from(WORDS)
    .zipWith(Observable.range(1, Integer.MAX_VALUE),
          (string, index) -> String.format("%2d. %s", index, string));

    // when:
    Iterable<String> results = observable
    	.subscribeOn(Schedulers.computation())
    	.toBlocking()
    	.toIterable();

    // then:
    assertThat(results, notNullValue());
    assertThat(results, iterableWithSize(9));
    assertThat(results, hasItem(" 4. fox"));
}

While this approach may be acceptable for the trivial code we have shown, it may not be practical for actual production code. What if the producer takes a long time to create all the data? This will make the test a slow one, increasing build times. There may be other timing issues as well. I’d now like to point your attention to a handy library named Awaitility. Simply put, Awaitility is a DSL that allows you to express expectations about an asynchronous system in a concise and easy to read manner. You can include the Awaitility dependency using Maven:

<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <version>2.0.0</version>
    <scope>test</scope>
</dependency>

Or using Gradle:

testCompile 'org.awaitility:awaitility:2.0.0'

The entry point of the Awaitility DSL is the org.awaitility.Awaitility.await() method see lines 13-14 in the example below). From there you can define conditions that must be met in order to let the test continue. You may decorate conditions with timeouts and other temporal constraints, for example minimum, maximum or duration range. Revisiting the previous test with Awaitility in tow results in the following code

1    @Test 
2    public void testUsingComputationScheduler() { 
3      // given: 
4      TestSubscriber<String> subscriber = new TestSubscriber<>(); 
5      Observable<String> observable = Observable.from(WORDS) 
6          .zipWith(Observable.range(1, Integer.MAX_VALUE), 
7              (string, index) -> String.format("%2d. %s", index, string)); 
8    
9      // when: 
10     observable.subscribeOn(Schedulers.computation()) 
11               .subscribe(subscriber); 
12   
13     await().timeout(2, SECONDS) 
14            .until(subscriber::getValueCount, equalTo(9)); 
15   
16     // then: 
17     subscriber.assertCompleted(); 
18     subscriber.assertNoErrors(); 
19     assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox")); 
20   } 

This version does not change the nature of the Observable in any way, which allows you to test unaltered production code without any modification. This version of the test awaits at most 2 seconds for the Observable to performs its job by checking the state of the subscriber. If everything goes well, the subscriber’s state checks out to 9 events before the 2 second timeout elapses.

Awaitility plays nicely with Hamcrest matchers, Java 8 lambdas, and method references, thus resulting in concise and readable conditions. There are also ready made extensions for popular JVM languages such as Groovy and Scala.

The final strategy we’ll cover makes use of the extension mechanism that RxJava exposes as part of its API. RxJava defines a series of extension points that enable you to tweak almost every aspect of its default behavior. This extension mechanism effectively allows us to supply tailor made values for a particular RxJava feature. We’ll take advantage of this mechanism to let our test inject a specific Scheduler regardless of the one specified by the production code. The behavior we’re looking for is encapsulated in the RxJavaHooks class. Assuming our production code relies on the computation() scheduler we’re going to override its default value, returning a Scheduler that makes event processing happen in the same thread as the caller code; this is the Schedulers.immediate() scheduler. Here’s how the test looks now:

1    @Test 
2    public void testUsingRxJavaHooksWithImmediateScheduler() { 
3      // given: 
4      RxJavaHooks.setOnComputationScheduler(scheduler -> Schedulers.immediate()); 
5      TestSubscriber<String> subscriber = new TestSubscriber<>(); 
6      Observable<String> observable = Observable.from(WORDS) 
7          .zipWith(Observable.range(1, Integer.MAX_VALUE), 
8              (string, index) -> String.format("%2d. %s", index, string)); 
9    
10     try { 
11        // when: 
12        observable.subscribeOn(Schedulers.computation()) 
13        .subscribe(subscriber); 
14   
15        // then: 
16        subscriber.assertCompleted(); 
17        subscriber.assertNoErrors(); 
18        subscriber.assertValueCount(9); 
19        assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox")); 
20    } finally { 
21        RxJavaHooks.reset(); 
22    } 
23   } 

The production code is unaware that the computation() scheduler is an immediate one during testing. Please take note that you must reset the hook otherwise the immediate scheduler setting may leak, resulting in broken tests all over the place. The usage of a try/finally block obscures the intention of the test code a bit, but fortunately we can refactor out this behavior using a JUnit rule, making the test slimmer and more readable as a result. Here’s one possible implementation for such a rule

public class ImmediateSchedulersRule implements TestRule {
    @Override
    public Statement apply(final Statement base, Description description) {
        return new Statement() {
            @Override
            public void evaluate() throws Throwable {
                RxJavaHooks
                    .setOnIOScheduler(scheduler -> Schedulers.immediate());
                RxJavaHooks
                    .setOnComputationScheduler(scheduler -> Schedulers.immediate());
                RxJavaHooks
                    .setOnNewThreadScheduler(scheduler -> Schedulers.immediate());
                try {
                     base.evaluate();
                } finally {
                     RxJavaHooks.reset(); }
                }
        };
    }
}

We override two other scheduler producing methods for good measure, making this rule more generic for other testing purposes down the road. Usage of this rule in a new testcase class is straightforward, we simply declare a field with the new type annotated with @Rule, like so

@Rule
public final ImmediateSchedulersRule schedulers = new ImmediateSchedulersRule();

@Test
public void testUsingImmediateSchedulersRule() {
    // given:
    TestSubscriber<String> subscriber = new TestSubscriber<>();
    Observable<String> observable = Observable.from(WORDS)
        .zipWith(Observable.range(1, Integer.MAX_VALUE),
            (string, index) -> String.format("%2d. %s", index, string));

    // when:
    observable.subscribeOn(Schedulers.computation())
        .subscribe(subscriber);

    // then:
    subscriber.assertNoErrors();
    subscriber.assertCompleted();
    subscriber.assertValueCount(9);
    assertThat(subscriber.getOnNextEvents(), hasItem(" 4. fox"));
}

In the end we get the same behavior as before but with less clutter. Let’s take a moment to look back what we have accomplished so far:

  • Subscribers process data in the same thread as long as there’s no specific Scheduler in use. This means we can make assertions on a subscriber right after it subscribes to an Observable.
  • TestSubscriber can accumulate events and provide additional assertions on its state.
  • Any Observable can be turned into a blocking Observable, thus enabling us to synchronously wait for events to be produced, regardless of the Scheduler used by the observable.
  • RxJava exposes an extension mechanism that enables developers to override its defaults, and inject those right into the production code.
  • Awaitility can be used to test out concurrent code using a DSL.

Each one of these techniques comes in handy in different scenarios, however all them are connected by a common thread (pun intended): the test code waits for the Observable to complete before making assertions on the subscriber’s state. What if there was a way to inspect the Observable’s behavior as it produces the data? In other words, what if it were possible to programmatically debug the Observable in place? We'll see a technique for doing that next.

Playing with Time

So far we’ve tested observables and subscriptions in a black box manner. Now we’ll have a look at another technique that allows us to manipulate time in such a way that we can pop the hood and look at a subscriber’s state while the Observable is still active, in other words, we’ll use a white box testing technique. Once again, it’s RxJava to the rescue, with its TestScheduler class. This particular Scheduler enables you to specify exactly how time passes inside of it. You can for example advance time by half a second, or make it leap 5 seconds. We’ll start by creating an instance of this new Scheduler and then pass it to the test code

1    @Test 
2    public void testUsingTestScheduler() { 
3     // given: 
4     TestScheduler scheduler = new TestScheduler(); 
5     TestSubscriber<String> subscriber = new TestSubscriber<>(); 
6     Observable<Long> tick = Observable.interval(1, SECONDS, scheduler); 
7    
8     Observable<String> observable = Observable.from(WORDS) 
9     .zipWith(tick, (string, index) -> String.format("%2d. %s", index, string)); 
10   
11    observable.subscribeOn(scheduler) 
12    .subscribe(subscriber); 
13   
14    // expect: 
15    subscriber.assertNoValues(); 
16    subscriber.assertNotCompleted(); 
17   
18    // when: 
19    scheduler.advanceTimeBy(1, SECONDS); 
20   
21    // then: 
22    subscriber.assertNoErrors(); 
23    subscriber.assertValueCount(1); 
24    subscriber.assertValues(" 0. the"); 
25   
26    // when: 
27    scheduler.advanceTimeTo(9, SECONDS); 
28    subscriber.assertCompleted(); 
29    subscriber.assertNoErrors(); 
30    subscriber.assertValueCount(9);
31   }

The "production" code changed a little, as we’re now using an interval that is tied to the Scheduler to produce the numbering (line 6) instead of a range. This has the side-effect of producing numbers starting with 0 instead of the original 1. Once the Observable and the test scheduler are configured we immediately assert that the subscriber has no values (line 15), and has not completed or generated any errors (line 16). This is a sanity check as the scheduler has not moved at this point and so no values should have been produced by the observable nor received by the subscriber.

Next we move time by one whole second (line 19), this should cause the Observable to produce the first value, and that’s exactly what the next set of assertions checks (lines 22-24).

We next advance time to 9 seconds from now. Mind you that this means moving to exactly 9 seconds from the scheduler’s start, (and not advancing 9 seconds after already advancing 1 before, which would result in the scheduler looking at 10 seconds after it’s start). In other words, advanceTimeBy() moves the scheduler’s time relative to its current position, whereas advanceTimeTo()moves the scheduler’s time in an absolute fashion. We make another round of assertions (lines 28-30) to ensure the Observable has produced all data and that the subscriber has consumed it all as well. One more thing to note about the usage of TestScheduler is that real time moves immediately, which means our test does not have to wait 9 seconds to complete.

As you can see using this scheduler is quite handy but it requires you to supply the scheduler to the Observable under test. This will not play well with Observables that use a specific scheduler. But wait a second, we saw earlier how we can switch a scheduler without affecting production code using RxJavaHooks, but this time supplying a TestScheduler (lines 13-15) instead of an immediate scheduler. We can even go so far as to apply the same technique of a custom JUnit rule, allowing the previous code to be rewritten in a more reusable form. First the new rule:

1    public class TestSchedulerRule implements TestRule { 
2     private final TestScheduler testScheduler = new TestScheduler(); 
3    
4     public TestScheduler getTestScheduler() { 
5       return testScheduler; 
6     } 
7    
8     @Override 
9     public Statement apply(final Statement base, Description description) { 
10      return new Statement() { 
11        @Override 
12        public void evaluate() throws Throwable { 
13          RxJavaHooks.setOnIOScheduler(scheduler -> testScheduler); 
14          RxJavaHooks.setOnComputationScheduler(scheduler -> testScheduler); 
15          RxJavaHooks.setOnNewThreadScheduler(scheduler -> testScheduler); 
16   
17          try { base.evaluate(); } 
18          finally { RxJavaHooks.reset(); } 
19        } 
20      }; 
21    } 
22   } 

Followed by the actual test code (in a new testcase class), to use our test rule:

23    @Rule 
24    public final TestSchedulerRule testSchedulerRule = new TestSchedulerRule(); 
25   
26    @Test 
27    public void testUsingTestSchedulersRule() { 
28      // given: 
29      TestSubscriber<String> subscriber = new TestSubscriber<>(); 
30   
31      Observable<String> observable = Observable.from(WORDS) 
32        .zipWith(Observable.interval(1, SECONDS), 
33          (string, index) -> String.format("%2d. %s", index, string)); 
34   
35      observable.subscribeOn(Schedulers.computation()) 
36        .subscribe(subscriber); 
37   
38      // expect 
39      subscriber.assertNoValues(); 
40      subscriber.assertNotCompleted(); 
41   
42      // when: 
43      testSchedulerRule.getTestScheduler().advanceTimeBy(1, SECONDS); 
44   
45      // then: 
46      subscriber.assertNoErrors(); 
47      subscriber.assertValueCount(1); 
48      subscriber.assertValues(" 0. the"); 
49   
50      // when: 
51      testSchedulerRule.getTestScheduler().advanceTimeTo(9, SECONDS); 
52      subscriber.assertCompleted(); 
53      subscriber.assertNoErrors(); 
54      subscriber.assertValueCount(9); 
55    }

And there you have it. The usage of a TestScheduler injected via RxJavaHooks allow you to write the test code without altering the composition of the original Observable yet it gives you the means to modify time and make assertions at specific points during the execution of the observable itself. All the techniques showed in this article should give you enough options to test out RxJava enabled code.

The Future

RxJava is one of the first libraries to provide reactive programming capabilities to Java. The upcoming version 2.0 has been redesigned to better align its API with the Reactive Streams specification, which provides a standard for asynchronous stream processing with non-blocking back pressure, targeting Java and JavaScript runtimes. This means there will be some API changes coming up in the next version; you can find a detailed description of these changes at the RxJava wiki.

In terms of testing you’ll see that the core types (Observable, Maybe, and Single) now sport a handy method named test() that creates a TestSubscriber instance for you on the spot. You may now chain method calls on TestSubscriber and there are some new assertion methods found on this type, as well.

This article has been updated to accommodate the new changes and challenges that have appeared. Visit “Testing RXJava2” for the newer version.

About the Author

Andres Almiray is a Java/Groovy developer and Java Champion with more than 17 years of experience in software design and development. He has been involved in web and desktop application development since the early days of Java. He is a true believer of Open Source and has participated in popular projects like Groovy, JMatter, Asciidoctor and more. Founding member and current project lead of the Griffon framework. Spec lead for JSR 377.

Rate this Article

Relevance
Style

Hello stranger!

You need to Register an InfoQ account or or login to post comments. But there's so much more behind being registered.

Get the most out of the InfoQ experience.

Tell us what you think

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Email me replies to any of my messages in this thread

await() ... getValueCount() ... assertCompleted by Dávid Karnok

Note that having the 9 element doesn't mean the onCompleted was fired and registered by the TestSubscriber in the right time thus assertCompleted() may still fail.

Also, you can wait for a terminal event with TestSubscriber.awaitTerminalEvent(time, unit)

Tests with marble diagrams in ASCII form by Alexandre Victoor

Thanks for this article!
Another testing API has been developed within RxJS, using marble diagrams in ASCII form. I felt in love of this API, it makes tests so concise and readable... and so I wrote a Java port of it: MarbleTest4J
I look forward to get your feedback

Re: await() ... getValueCount() ... assertCompleted by Andres Almiray

All examples that make use of TestSubscriber call `assertCompleted()` before checking the final values. I understand by your comment that the inverse may result in failures.

And yes, I totally forgot about `TestSubscriber.awaitTerminalEvent(time, unit) ` even thought it was staring back right at me. Thank you for the reminder :-)

LICENSE of the code available above. by Android Developer

Hi Andres,

I wanted to know what the license of the code mentioned in this wonderful article is. I scanned through the website and couldn't find something suggesting the License of the code in the article. I did not find a License for the code in the InfoQ newsletter numbered 48 as well.

I wanted to put up a GitHub repo and wanted to put a License there. It would be great if you could mention the license of the code in the article.

Regards,
An Android Developer.

Re: LICENSE of the code available above. by Andres Almiray

Hi, I'm glad you found the article to be useful. The code is licensed under GPL3.

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Email me replies to any of my messages in this thread

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Email me replies to any of my messages in this thread

5 Discuss
BT