Designing, Implementing, and Using Reactive APIs

Key Takeaways

  • Before pursuing a reactive design, ensure that using reactive programming is right for your project
  • Reactive methods always return something because they build an execution framework, but do not start execution
  • Reactive programming allows you to declaratively state sequential and parallel relationships between operations, pushing execution optimization into the framework
  • Errors are first-class items in the flow and should be handled directly
  • Because many flows are asynchronous, special care must be taken when testing using synchronous frameworks
This article is derived from a talk given at SpringOne. You can see the video of that talk here.

 

In the last couple of years there’s been a strong push towards reactive programming in the Java world.  Whether it’s the success of NodeJS developers using non-blocking APIs, the explosion of latency-inducing microservices, or simply a desire to more efficiently utilize computing resources, many developers are starting to look at reactive programming as a viable programming model. 

Fortunately, Java developers are spoiled for choice when it comes to reactive frameworks and how to properly use them.  There aren’t very many “wrong” ways to write reactive code, but therein lies the rub; there are very few “right” ways to write reactive code.

In this article, we aim to give you some opinionated suggestions about how you should write reactive code.  These opinions come from years of experience developing a large-scale reactive API, and while they might not all be right for you, we hope they give you some direction as you get started on your reactive journey.

The examples in this article all come from the Cloud Foundry Java Client.  This project uses Project Reactor for its reactive framework.  We chose Reactor for the Java Client because of its close integration with the Spring team, but all of the concepts that we discuss apply to other reactive frameworks such as RxJava.  While it will be helpful if you have some understanding of Cloud Foundry, it is not required.  The examples have self-explanatory naming that will guide you through the reactive concept each is demonstrating.

Reactive programming is a vast subject and is well beyond the scope of this article, but for our purposes, let’s define it broadly as a way to build event-driven systems in a more fluent way than we would with a traditional imperative programming style.  The goal is to move imperative logic to an asynchronous, non-blocking, functional style that is easier to understand and reason about.

Many of the imperative APIs designed for these behaviors (threads, NIO callbacks, etc.) are not considered easy to use correctly and reliably, and in many cases using these APIs still requires a fair amount of explicit management in application code.  The promise of a reactive framework is that these concerns can be handled behind the scenes, allowing the developer to write code that focuses primarily on application functionality.

Should I Use Reactive Programming?

The very first question to ask yourself when designing a reactive API is whether you even want a reactive API!  Reactive APIs are not the correct choice for absolutely everything.  There are demonstrable downsides to reactive programming (debugging being the biggest these days, but both frameworks and IDEs are working on it).  Instead, you choose a reactive API when the value gained significantly outweighs the downsides.  When making this judgment, there are a couple of obvious patterns for which reactive programming is a good fit.

Networking

Network requests inherently involve (relatively) large latencies, and waiting on those responses to return is often the single largest waste of resources in a system.  In a non-reactive application, those waiting requests often block threads and consume stack memory, idly waiting for a response to arrive.  Remote failures and timeouts are often not handled systemically and explicitly because the provided APIs don’t make it easy to do so.  Finally, the payloads from remote calls are often of an unknown and unbounded size, leading to heap memory exhaustion.  Reactive programming, combined with non-blocking IO, addresses these kinds of issues because it gives you a clear and explicit API for each one.

Highly Concurrent Operations

Coordination of highly concurrent operations, like network requests or parallelizable CPU-intensive calculations, is also a good fit.  Reactive frameworks, while allowing explicit management of threading, excel when they are left to manage threading automatically.  Operators like .flatMap() transparently run the operations they are given concurrently, maximizing the use of available resources.

Massively Scaled Applications

The servlet model of one thread per connection has served us well for years.  But with microservices we’ve started to see applications massively scaled (25, 50, even 100 instances of a single stateless application) to handle connection loads even while the CPUs sit mostly idle.  Choosing non-blocking IO, with reactive programming to make it palatable, breaks this linkage between connections and threads and makes much more efficient use of available resources.  To be clear, the advantage here is often startling.  It often takes many more instances of an application built on Tomcat with hundreds or thousands of threads to handle the same load as the same application built on Netty with eight threads.

While this shouldn’t be considered an exhaustive list of places where reactive programming is useful, the key to remember here is that if your application doesn’t fit into one of these categories, you may be adding complexity without gaining any value.

What Should a Reactive API Return?

If you’ve answered the first question, and you’ve determined that your application will benefit from a reactive API, it’s time to design your API.  A good place to start is deciding what primitives your reactive API should return.

All of the reactive frameworks in the Java world (including Java 9’s Flow) are converging on the Reactive Streams Specification.  This specification defines a low-level interop API, but isn’t considered a reactive framework (i.e. it doesn’t specify the operators available to streams).
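
To make the "low-level interop API" concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces (Java 9+), not Reactor.  The FlowSketch and CollectingSubscriber names are hypothetical, invented for this illustration.  The subscriber requests one item at a time, which is the demand signalling that the specification defines and that frameworks build their operators on top of.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class FlowSketch {

  /** Hypothetical subscriber that asks for exactly one item at a time. */
  static final class CollectingSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new ArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      subscription.request(1);  // nothing is delivered until demand is signalled
    }

    @Override
    public void onNext(String item) {
      received.add(item);
      subscription.request(1);  // ask for the next item only when ready for it
    }

    @Override
    public void onError(Throwable t) {
      done.countDown();
    }

    @Override
    public void onComplete() {
      done.countDown();
    }
  }

  /** Publishes the items with the JDK's SubmissionPublisher and returns what arrived. */
  static List<String> publishAndCollect(String... items) {
    CollectingSubscriber subscriber = new CollectingSubscriber();
    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (String item : items) {
        publisher.submit(item);
      }
    }  // close() signals onComplete once the buffered items have been delivered
    try {
      if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("timed out waiting for completion");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return subscriber.received;
  }
}
```

Note how much ceremony is needed even for this trivial case, and that there is no .map() or .flatMap() anywhere in sight.  That gap is what a framework such as Reactor or RxJava fills.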

There are two main types in Project Reactor that everything builds on.  The Flux<T> type represents 0 to N values flowing through the system.  The Mono<T> type represents 0 to 1 values.  Inside the Java Client we use Mono almost exclusively since it maps clearly to a single request, single response model.

Flux<Application> listApplications() {...}

Flux<String> listApplicationNames() {
  return listApplications()
    .map(Application::getName);
}

void printApplicationName() {
  listApplicationNames()
    .subscribe(System.out::println);
}

In this example, the listApplications() method executes a network call and returns a Flux of 0 to N Application instances.  We then use the .map() operator to transform each Application into a String of its name.  That Flux of application names is then consumed and printed out to console.

Flux<Application> listApplications() {...}

Mono<List<String>> listApplicationNames() {
  return listApplications()
    .map(Application::getName)
    .collectList();
}

Mono<Boolean> doesApplicationExist(String name) {
  return listApplicationNames()
    .map(names -> names.contains(name));
}

Monos don’t have a flow in the same way that Fluxes do, but since they are conceptually a flow of one element, the operators that we use generally have the same name.  In this example, in addition to mapping to a Flux of application names, we collect those names into a single List. The Mono that contains that List can then be transformed, in this case, to a Boolean indicating whether a name is contained within it.  It might be counterintuitive, but it is common to return a Mono of collections (e.g. Mono<List<String>>) if the item you are working with is logically a collection of items rather than a stream of them.

Unlike in imperative APIs, void is not an appropriate reactive return type. Instead, every method must return either a Flux or a Mono.  This probably seems odd (there are still behaviors that have nothing to return!) but is a result of the basic operation of a reactive flow.  The execution of code that calls reactive APIs (e.g. .flatMap().map()...) is building a framework for data to flow through, but not actually transforming data.  Only at the end, when .subscribe() is called, does data begin moving through the flow, being transformed by the operations as it goes. This lazy execution is why reactive programming is built on top of lambdas, and why the types are always returned; there must always be something to .subscribe() to.

void delete(String id) {
  this.restTemplate.delete(URI, id);
}

public void cleanup(String[] args) {
  delete("test-id");
}

The imperative and blocking example above can return void as the execution of the network call begins immediately and does not return until the response is received.

Mono<Void> delete(String id) {
  return this.httpClient.delete(URI, id);
}

public void cleanup(String[] args) throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);

  delete("test-id")
    .subscribe(n -> {}, t -> {
      t.printStackTrace();
      latch.countDown();  // release the latch on error too
    }, latch::countDown);

  latch.await();
}

In this reactive example the network call doesn’t begin until .subscribe() is called, after delete() returns, because it is the framework for making that call, not the result of the call itself.  In this case, we have the equivalent of a void return type by using a Mono<Void> that returns 0 items, signaling onComplete() only after the response has been received.
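
The difference between "the framework for making a call" and "the call itself" can be sketched without any reactive library at all.  The Deferred class below is a deliberately tiny, hypothetical stand-in for a Mono (it is not how Reactor is implemented): building and transforming it records what to do, and only subscribe() actually does it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

final class LazySketch {

  /** Records side effects so we can observe when the "network call" really happens. */
  static final List<String> LOG = new ArrayList<>();

  /** Hypothetical stand-in for Mono: a recipe that runs only when subscribed to. */
  static final class Deferred<T> {
    private final Supplier<T> source;

    Deferred(Supplier<T> source) {
      this.source = source;
    }

    /** Adds a transformation step to the recipe without running anything. */
    <R> Deferred<R> map(Function<T, R> fn) {
      return new Deferred<>(() -> fn.apply(source.get()));
    }

    /** Runs the whole recipe and returns its result. */
    T subscribe() {
      return source.get();
    }
  }

  /** Describes a delete; the pretend network call is deferred until subscribe(). */
  static Deferred<String> delete(String id) {
    return new Deferred<>(() -> {
      LOG.add("DELETE " + id);
      return id;
    });
  }

  public static void main(String[] args) {
    Deferred<String> flow = delete("test-id").map(String::toUpperCase);
    System.out.println("calls so far: " + LOG.size());  // nothing has run yet
    System.out.println(flow.subscribe());
    System.out.println("calls so far: " + LOG.size());
  }
}
```

If delete() returned void here, the caller would have nothing to call subscribe() on, and the delete would never run.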

Scope of Methods

Once you’ve decided on what your APIs need to return, you then need to look at what each one of your methods (both API and implementation) will do.  On the Java Client, we’ve found that designing methods such that they are small and reusable pays dividends.  It enables each of these methods to be more easily composed into larger operations.  It also allows them to be more flexibly combined into parallel or sequential operations.  As a bonus, it also makes potentially complex flows much more readable.

Mono<ListApplicationsResponse> getPage(int page) {
  return this.client.applicationsV2()
    .list(ListApplicationsRequest.builder()
      .page(page)
      .build());
}

void getResources() {
  getPage(1)
    .flatMapMany(response -> Flux.range(2, response.getTotalPages() - 1)
      .flatMap(page -> getPage(page))
      .startWith(response))
    .subscribe(System.out::println);
}

This example demonstrates how we call a paginated API.  The first request to getPage() retrieves the first page of results.  Included in that first page of results is the total number of pages we need to retrieve to get the complete result.  Because the getPage() method is small, reusable, and without side effects, we are then able to reuse the method and call for page 2 through totalPages in parallel!
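
One detail in that example is easy to get wrong: the second argument to Flux.range() is a count, not an upper bound, which is why the code passes totalPages - 1.  The following JDK-only sketch mirrors that (start, count) contract with a hypothetical range() helper so the arithmetic can be checked in isolation.  The clamp to zero is a defensive choice made for this sketch, not something the example above does.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class PageRangeSketch {

  /** Mirrors a (start, count) range: count values beginning at start. */
  static List<Integer> range(int start, int count) {
    return IntStream.range(start, start + count)
      .boxed()
      .collect(Collectors.toList());
  }

  /** Pages still to fetch after page 1 has told us the total page count. */
  static List<Integer> remainingPages(int totalPages) {
    return range(2, Math.max(0, totalPages - 1));
  }

  public static void main(String[] args) {
    System.out.println(remainingPages(4));  // pages 2 through 4
    System.out.println(range(2, 4));        // one page too many: 2 through 5
    System.out.println(remainingPages(1));  // a single-page result needs no more requests
  }
}
```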

Sequential and Parallel Coordination

These days, nearly all significant performance improvements come from concurrency.  We know this and yet many systems are either only concurrent with respect to incoming connections or are not concurrent at all.  Much of this situation can be traced back to the fact that implementing a highly concurrent system is difficult and error-prone.  One of the key benefits of reactive programming is that you define sequential and parallel relationships between operations, and let the framework determine the optimal way to make use of the available resources.

Take a look at the previous example again: the first call to getPage() is guaranteed to happen sequentially before the subsequent calls for each additional page.  In addition, since those subsequent calls to getPage() happen inside a .flatMap(), the framework is responsible for optimally multi-threading their execution and joining the results back together, propagating any errors that might have occurred.
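
For comparison, here is a rough sketch of the same "one request first, then the rest concurrently" relationship written with the JDK's CompletableFuture rather than Reactor.  The Page class, the fixed TOTAL_PAGES value, and the simulated getPage() are all assumptions made for the illustration.  Unlike .flatMap(), which emits results as they arrive, this version waits for every page and keeps them in page order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CoordinationSketch {

  /** Assumed total reported by the pretend server. */
  static final int TOTAL_PAGES = 4;

  /** Hypothetical response: the page number plus the total page count. */
  static final class Page {
    final int number;
    final int totalPages;

    Page(int number, int totalPages) {
      this.number = number;
      this.totalPages = totalPages;
    }
  }

  /** Simulates an asynchronous request for one page. */
  static CompletableFuture<Page> getPage(int number) {
    return CompletableFuture.supplyAsync(() -> new Page(number, TOTAL_PAGES));
  }

  /** Fetches page 1 first, then all remaining pages concurrently. */
  static CompletableFuture<List<Integer>> getAllPageNumbers() {
    return getPage(1).thenCompose(first -> {
      // Sequential step is done; start every remaining request without waiting.
      List<CompletableFuture<Page>> rest = new ArrayList<>();
      for (int n = 2; n <= first.totalPages; n++) {
        rest.add(getPage(n));
      }
      CompletableFuture<Void> allDone =
          CompletableFuture.allOf(rest.toArray(new CompletableFuture<?>[0]));
      // Join the results back together once every request has completed.
      return allDone.thenApply(ignored -> {
        List<Integer> numbers = new ArrayList<>();
        numbers.add(first.number);
        for (CompletableFuture<Page> page : rest) {
          numbers.add(page.join().number);
        }
        return numbers;
      });
    });
  }
}
```

The explicit list of futures, the allOf() call, and the manual join are exactly the coordination code that the reactive version leaves to the framework.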

Conditional Logic

Unlike in imperative programming, errors are considered values in reactive programming. This means that they pass through the flow’s operations.  These errors can be passed all the way to consumers, or flows can change behavior based on them.  This behavior change can manifest as the transformation of errors or generating new results based on an error.

public Mono<AppStatsResponse> getApplication(GetAppRequest request) {
  return client.applications()
    .statistics(AppStatsRequest.builder()
      .applicationId(request.id())
      .build())
    .onErrorResume(ExceptionUtils.statusCode(APP_STOPPED_ERROR),
      t -> Mono.just(AppStatsResponse.builder().build()));
}

In this example, a request is made to get the statistics for a running application.  If everything works as expected, the response is passed back to the consumer. However, if an error is received (with a specific status code), then an empty response is returned. The consumer never sees the error and execution proceeds with a default value, as if the error was never signaled.
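
The same "recover from one specific error, propagate everything else" idea can be sketched with the JDK's CompletableFuture (Java 9+ for failedFuture()).  This is an analogy rather than Reactor's API: AppStoppedException, requestStatistics(), and the string payloads are hypothetical names chosen for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class FallbackSketch {

  /** Hypothetical error standing in for the "application stopped" status code. */
  static final class AppStoppedException extends RuntimeException {
    AppStoppedException() {
      super("application is stopped");
    }
  }

  /** Simulates the remote call: succeeds for running apps, fails for stopped ones. */
  static CompletableFuture<String> requestStatistics(boolean running) {
    return running
      ? CompletableFuture.completedFuture("cpu=12%")
      : CompletableFuture.failedFuture(new AppStoppedException());
  }

  /** Replaces the one expected error with an empty default; rethrows anything else. */
  static CompletableFuture<String> getStatistics(boolean running) {
    return requestStatistics(running).exceptionally(t -> {
      // Dependent stages may wrap the original error in a CompletionException.
      Throwable cause =
          (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
      if (cause instanceof AppStoppedException) {
        return "";  // default value; the consumer never sees the error
      }
      throw new CompletionException(cause);
    });
  }
}
```

As in the reactive version, the caller of getStatistics() cannot tell whether the empty value came from the server or from the recovery path.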

As discussed previously, it is valid for a flow to complete without sending any items. This is often the equivalent of returning null (of which a void return type is a special case). Like the error case, this completion without any items can be passed all the way to consumers, or flows can change behavior based on it.

public Flux<GetDomainsResponse> getDomains(GetDomainsRequest request) {
  return requestPrivateDomains(request.getId())
    .switchIfEmpty(requestSharedDomains(request.getId()));
}

In this example, getDomains() returns a domain that could be in one of two different buckets.  First the private domains are searched, and if that completes successfully, albeit without any results, then the shared domains are searched.

public Mono<String> getDomainId(GetDomainIdRequest request) {
  return getPrivateDomainId(request.getName())
    .switchIfEmpty(getSharedDomainId(request.getName()))
    .switchIfEmpty(ExceptionUtils.illegalState(
      "Domain %s not found", request.getName()));
}

It can also be the case that the absence of items indicates an error condition.  In this example, if no private or shared domain can be found, a new IllegalStateException is generated and passed through to the consumer.

Sometimes however, you want to make decisions based not on errors or emptiness, but on the values themselves.  While it’s possible to implement this logic using operators, it often proves to be more complex than is worthwhile.  In this case, you should just use imperative conditional statements.

public Mono<String> getDomainId(String domain, String organizationId) {
  return Mono.justOrEmpty(domain)  // empty when domain is null
    .flatMap(d -> getPrivateDomainId(d, organizationId)
      .switchIfEmpty(getSharedDomainId(d))
      .switchIfEmpty(
          ExceptionUtils.illegalState("Domain %s not found", d)))
    .switchIfEmpty(getSharedDomainIds()
      .switchIfEmpty(getPrivateDomainIds(organizationId))
      .next()  // select first returned
      .switchIfEmpty(ExceptionUtils.illegalState("Domain not found")));
}

This example returns the id of a given domain name, within a given organization (a hierarchical container).  There’s a twist here though - if the domain is null, the id of the first shared domain is returned or, if there are no shared domains, the id of the first private domain scoped to the organization.  If the domain is not null, the explicit domain name is searched for and its id is returned.  If you find this code confusing, don’t despair, we do too!

public Mono<String> getDomainId(String domain, String organizationId) {
  if (domain == null) {
    return getSharedDomainIds()
      .switchIfEmpty(getPrivateDomainIds(organizationId))
      .next()
      .switchIfEmpty(ExceptionUtils.illegalState("Domain not found"));
  } else {
    return getPrivateDomainId(domain, organizationId)
      .switchIfEmpty(getSharedDomainId(domain))
      .switchIfEmpty(
          ExceptionUtils.illegalState("Domain %s not found", domain));
  }
}

This example is equivalent, but uses imperative conditional statements.  Much more understandable, wouldn’t you agree?

Testing

In practice, most useful flows will be asynchronous.  This is problematic when testing because testing frameworks are aggressively synchronous, registering a pass or a failure long before asynchronous results would be returned.  To compensate for this, you must block the main thread until the results are returned and then move those results to the main thread for assertions.

@Test
public void noLatch() {
  Mono.just("alpha")
    .subscribeOn(Schedulers.single())
    .subscribe(s -> assertEquals("bravo", s));
}

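This example, which emits a String on a non-main thread, unexpectedly passes.  The root cause of this test passing, when it clearly should not, is that the assertion runs on the scheduler’s thread (if it runs at all before the test ends), so the noLatch() method returns without an AssertionError ever being thrown on the thread that the test framework is watching.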
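
The underlying reason is not specific to reactive frameworks, and a plain-JDK sketch shows it.  In the hypothetical LostAssertionSketch below, an AssertionError thrown on a worker thread never reaches the calling thread, even though the caller waits for the worker to finish.  What a given reactive framework does with an exception thrown inside a subscriber is framework-specific; this only illustrates the thread-level behavior.

```java
final class LostAssertionSketch {

  /** Returns true only if the failure on the worker thread reached the caller. */
  static boolean callerSawFailure() {
    Thread worker = new Thread(() -> {
      // Stands in for an assertion made inside a subscriber on another thread.
      throw new AssertionError("expected:<bravo> but was:<alpha>");
    });
    // Swallow the error quietly instead of printing the default stack trace.
    worker.setUncaughtExceptionHandler((thread, error) -> { });
    worker.start();
    try {
      worker.join();  // waiting for the thread does not rethrow its failure
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (AssertionError e) {
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println("caller saw failure: " + callerSawFailure());
  }
}
```

A test framework is in the position of the caller here: unless the result is brought back to its thread, it has nothing to fail on.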

@Test
public void latch() throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);
  AtomicReference<String> actual = new AtomicReference<>();

  Mono.just("alpha")
    .subscribeOn(Schedulers.single())
    .subscribe(actual::set, t -> latch.countDown(), latch::countDown);

  latch.await();
  assertEquals("bravo", actual.get());
}

This example, while admittedly clunky, uses a CountDownLatch to ensure that the latch() method does not return until after the flow has completed.  Once the latch releases, an assertion is made in the main thread that will throw an AssertionError, causing the test to fail.

You would be forgiven for looking at that code and refusing to implement all of your tests that way; we certainly did.  Luckily, Reactor provides a StepVerifier class to facilitate testing.

The testing of a reactive design requires more than just blocking.  You often need to assert against multiple values and expected errors, while ensuring that unexpected errors cause a test failure.  StepVerifier addresses each of these.

@Test
public void testMultipleValues() {
  Flux.just("alpha", "bravo")
    .as(StepVerifier::create)
    .expectNext("alpha")
    .expectNext("bravo")
    .expectComplete()
    .verify(Duration.ofSeconds(5));
}

In this example, the StepVerifier is used to expect that exactly alpha and bravo are emitted and then the flow completes.  If either one of them isn’t emitted, an extra element is emitted, or an error is generated, the test will fail.

@Test
public void shareFails() {
  this.domains
    .share(ShareDomainRequest.builder()
      .domain("test-domain")
      .organization("test-organization")
      .build())
    .as(StepVerifier::create)
    .consumeErrorWith(t -> assertThat(t)
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessage("Private domain test-domain does not exist"))
    .verify(Duration.ofSeconds(5));
}

This example uses some of StepVerifier’s more advanced features and asserts not only that an error has been signaled, but that it is an IllegalArgumentException and that the message matches what is expected.

CountDownLatches

One of the key things to remember about reactive frameworks is that they can only coordinate their own operations and threading models.  Many of the environments that reactive code runs in are long-lived processes that outlast any individual thread (e.g. Servlet containers).  In these environments, the asynchronous nature of reactive programming isn’t a problem.  However, there are some environments, like the testing examples above, where the process will end before the threads executing a flow have finished their work.

public static void main(String[] args) {
  Mono.just("alpha")
    .delaySubscription(Duration.ofSeconds(1))
    .subscribeOn(Schedulers.single())
    .subscribe(System.out::println);
}

Just like the test method, this main() method will terminate before alpha is emitted.

public static void main(String[] args) throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);

  Mono.just("alpha")
    .delaySubscription(Duration.ofSeconds(1))
    .subscribeOn(Schedulers.single())
    .subscribe(System.out::println, t -> latch.countDown(),
               latch::countDown);

  latch.await();
}

And just like in the testing example, a CountDownLatch can ensure that the main thread doesn’t terminate before the flow, regardless of what thread it is executing on, terminates.

Blocking Flows

It is quite common today, and for the foreseeable future, to interact with blocking APIs in reactive programming.  In order to bridge between the two, it can be appropriate to block while waiting for a result.  However, some of the benefits of reactive programming, such as efficient resource usage, are lost when bridging to a blocking API in this manner.  Because of this you’ll want to keep your code reactive for as long as possible, only blocking at the last moment.  Also worth noting is that the logical conclusion of this idea is that a reactive API can be made blocking, but a blocking API can never be made reactive.

Mono<User> requestUser(String name) {...}

User getUser(String name) {
  return requestUser(name)
    .block();
}

In this example, .block() is used to bridge the single result from a Mono to an imperative return type.

Flux<User> requestUsers() {...}

List<User> listUsers() {
  return requestUsers()
    .collectList()
    .block();
}

Like in the previous example, .block() is used to bridge a result to an imperative return type, but before that can happen, the Flux must be collected into a single List.

Error Handling

As described earlier, errors are values that flow through the system.  This means that there is never an appropriate point to catch an exception.  You should, however, handle them either as part of the flow, or as the subscriber.  The .subscribe() method has between 0 and 3 parameters that allow you to handle each item as it arrives, handle an error if it is generated, and handle the completion of a flow.

public static void main(String[] args) throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);

  Flux.concat(Mono.just("alpha"), Mono.error(new IllegalStateException()))
    .subscribe(System.out::println, t -> {
      t.printStackTrace();
      latch.countDown();
    }, latch::countDown);

  latch.await();
}

In this example, both a value and an error are passed to the subscriber.  It is important to remember, when using a CountDownLatch, that only one of onError() or onComplete() is called.  Therefore you must release the latch in both the error and success cases.

Composable Method References

As you can imagine, any programming model that leans heavily on lambdas is susceptible to “callback hell”.  However, with a little discipline and method references, it doesn’t have to be.  Something that any reasonable Ruby developer would tell you is that small private methods (even one-liners!) are really valuable when it comes to readability.  If you name your methods well and use method reference syntax, you can create very readable flows.

public Flux<ApplicationSummary> list() {
  return Mono
    .zip(this.cloudFoundryClient, this.spaceId)
    .flatMap(function(DefaultApplications::requestSpaceSummary))
    .flatMapMany(DefaultApplications::extractApplications)
    .map(DefaultApplications::toApplicationSummary);
}

In this example, the flow reads pretty well.  In order to get a Flux<ApplicationSummary> we start by passing in the cloudFoundryClient and a spaceId.  We use those to request a space summary, extract the applications from that space summary, and then map each of those applications to an application summary.  For any individual operation, we don’t know how it behaves, but at this point we don’t need to.  IDEs make it quite easy to traverse to those method references if needed, but this code doesn’t have the clutter of the implementation of each of them.

Point Free Style

Throughout this article, you may have noticed that we use a very compact style.  This is called point-free style.  Its primary benefit is that it helps the developer think about composing functions (a high-level concern) rather than shuffling data (a low-level concern).  We wouldn’t say that this is a hard requirement when writing reactive code, but we find that most people prefer it (eventually).

Mono<Void> deleteApplication(String name) {
  return PaginationUtils
    .requestClientV2Resources(page -> this.client.applicationsV2()
      .list(ListApplicationsRequest.builder()
        .name(name)
        .page(page)
        .build()))
    .single()
    .map(applicationResource -> applicationResource.getMetadata().getId())
    .flatMap(applicationId -> this.client.applicationsV2()
      .delete(DeleteApplicationRequest.builder()
        .applicationId(applicationId)
        .build()));
}

If you look at this example, you can imagine the many places where variables could be assigned and results returned to make it look more like traditional imperative code.  However, this isn’t likely to increase its readability.  Instead, adding more curly braces, semicolons, equals signs, and return statements, while identifying more explicitly where data is coming from and going to, is likely to obscure the actual point of the flow itself.

Reactive programming is a vast subject, and nearly everyone is just getting started with it.  At this point there are very few “wrong” answers when writing reactive code, but at the same time the abundance of choice leaves many developers confused as to the best way to get started.   We hope that our opinions, born of experience on a large-scale project, help you on your reactive journey and we encourage you to drive the state of the art by experimenting and contributing your discoveries back to the community.

 

Ben Hale is the lead of Pivotal’s Cloud Foundry Java Experience team and is responsible for the ecosystem around Java applications running on Cloud Foundry.

 

 

Paul Harris is the Lead Developer of Pivotal’s Cloud Foundry Java Client and is responsible for enabling Java applications that orchestrate and manage Cloud Foundry.


Community comments

  • good documentation but need some proof-reading

    by Li Yunpeng,

    the code samples may need a few proof-reading work. And samples are not quite straightforward to understand the underlying business purpose IMHO.


    public Flux<GetDomainsResponse> getDomains(GetDomainsRequest request) {
    return requestPrivateDomains(request.getId())
    .switchIfEmpty(requestSharedDomains(request.getId)); // <-- missing "()"
    }

  • Misleading example IMHO

    by Pedro Roson Fernandez,

    Hello, just testing examples as I read. In "Scope of Methods":


    getPage(1)
    .map(PaginatedResponse::getTotalPages)
    .flatMapMany(totalPages -> Flux.range(2, totalPages)
    .flatMap(page -> getPage(page)))
    .subscribe(System.out::println);


    Reading the text commenting the code and the figures in the code I thought that:

    • Page 1 would get to the subscribe method (it doesn't, it's just used to take totalPages value)
    • Assuming totalPages is 4, Flux.range(2, totalPages) would get pages from 2 to 4, but it's getting til 5 (second parameter is a count not upper bound)


    Very nice reading anyway. I keep reading.

  • Re: Misleading example IMHO

    by Paul Harris,

    Thanks for the comment Pedro. The examples were originally intended for use on slides, and sacrificed correctness for easy readability. Obviously that's not good enough here! There's an update incoming that should address your point, and you can always take a look at our source (github.com/cloudfoundry/cf-java-client/blob/mas...) for a more comprehensive example.

  • Will only reactive client API suffice ?

    by Dibakar Sen,

    "Finally, the payloads from remote calls are often of an unknown and unbounded size, leading to heap memory exhaustion" - Can you please elaborate on this statement ?

    My understanding is.... a remote network call, for instance a REST api call which returns a collection of items needs to be unmarshalled into the corresponding collection of domain objects. Whether the consumer of the REST api is reactive/non-reactive will not matter anyways because the remote endpoint sent all the data at once to the consumer. How will the use of a reactive client be more memory/resource efficient ?

  • Re: Will only reactive client API suffice ?

    by Paul Harris,

    Thanks for the comment Dibakar. When we say unknown/unbounded we're talking about the number of objects that might be returned. For example, if we call a logging endpoint, we don't know in advance how many events will be returned, and indeed we don't know that it will ever stop returning events. But as long as the endpoint returns them as discrete objects, perhaps as a 'Flux<Event>', or as a paginated response, reactor will only request as many objects as we can handle. Reactor has been carefully tuned so that it makes good choices about how much to take, avoiding problems like heap exhaustion, but it's also possible to change its defaults to handle unusual scenarios.
