BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Articles Functional-Style Callbacks Using Java 8's CompletableFuture

Functional-Style Callbacks Using Java 8's CompletableFuture

 

In preparing a talk about parallel streams in Java recently, I noticed that the classic article “The Free Lunch is Over” (TFLiO) has passed its tenth birthday. For most programmers, this article and its accompanying publicity were their first warnings that the 40-year trend of exponentially increasing processor speed was coming to an end—in fact, had already ended. Its replacement by a different trend, that of increasing processor numbers on each chip, meant that programmers had to make, in Herb Sutter’s words, “a fundamental turn towards concurrency”.

In TFLiO, Sutter observed that the vast majority of programmers “don’t grok concurrency”, but added “once grokked, lock-based programming isn’t that much harder than OO”. The first statement remains undeniably true, but ten years of further experience with lock-based concurrency has done nothing to confirm the second. Fortunately, Java programmers have mostly been able to avoid testing it out, since at the time TFLiO was published, Java 5 had just become available, and its high-level concurrency utilities were beginning to be used. These have enabled Java developers to mostly avoid the need for fine-grained reasoning about synchronisation and critical sections.

From concurrency to parallelism, and back

The Java 5 concurrency library was focused on asynchronous task handling, based on a model of producer threads creating tasks and handing them off to task-consumers via blocking queues. This model was augmented in Java 7 and 8 with support for an alternative style of task execution, involving the decomposition of a task’s data set into subsets, each of which can then be processed by independent homogenous subtasks.

The basic library for this style is the fork/join framework, which allows the programmer to prescribe how a data set should be split, and supports the submission of subtasks to a standard default thread pool; the “common” ForkJoinPool. (Throughout this article, unqualified class and interface names will refer to types in the package java.util.concurrent.) In Java 8, fork/join parallelism was made still more accessible via the mechanism of parallel streams. But not all problems are amenable to this style of parallel processing: element processing must be independent, and the data set must be sufficiently large and the per-element processing costs sufficiently high for parallel speedup to compensate for the overhead of setting up the fork/join framework.

Meanwhile, the focus on the innovation of parallel streams in Java 8 has diverted attention from a very substantial addition to the concurrency library, in the form of the class CompletableFuture<T>. This article will explore CompletableFuture, with the aim of explaining why it is so useful in programming systems that depend on the interaction of different heterogenous asynchronously-executed tasks, and how it complements fork/join-style parallelism, including parallel streams.

A page renderer

Our starting point will be an example borrowed from “Java Concurrency in Practice” (JCiP), the classic exposition of the Java 5 concurrent utilities. In JCiP §6.3, Brian Goetz explores the development of a web page renderer whose task for each page is to render its text and also to download and render its images. Image downloads take a long time, during which the CPU has nothing to do but wait. So the obvious strategy renders a page by first initiating the downloads for all its images, then using the time before they complete to render the page text, and finally rendering the downloaded images.

The first JCiP version of the page renderer uses the concept of a Future, an interface that exposes methods allowing a client to monitor the progress of a task being executed by a different thread. In Listing 1, a Callable, representing the task of downloading all the images of the page, is submitted to an Executor, which returns a Future through which the state of the download task can be interrogated. When the main thread has finished rendering the text of the page, it calls Future.get, a method that blocks until the result of all the downloads are completely available as a List<ImageData>. The obvious drawback is the coarse-grained nature of the download task; no images are available for rendering until all have been downloaded. We shall see next how to alleviate that problem.

public void renderPage(CharSequence source) {
List<ImageInfo> info = scanForImageInfo(source);
// create Callable representing download of all images
final Callable<List<ImageData>> task = () ->
  info.stream()
    	.map(ImageInfo::downloadImage)
    	.collect(Collectors.toList());
	// submit download task to the executor
	Future<List<ImageData>> images = executor.submit(task);
	// renderText(source);
try {
   // get all downloaded images (blocking until all are available)
   final List<ImageData> imageDatas = images.get();
   // render images
   imageDatas.forEach(this::renderImage);
} catch (InterruptedException e) {
   // Re-assert the thread’s interrupted status
   Thread.currentThread().interrupt();
   // We don’t need the result, so cancel the task too
   images.cancel(true);
} catch (ExecutionException e) {
  throw launderThrowable(e.getCause()); }
}

Listing 1. Using a Future to wait for all images to download

Some simplifying assumptions make this example and its subsequent variants manageable: we assume the existence of types ImageInfo (roughly, a URL) and ImageData (the binary data of an image), and of methods scanForImageInfo, downloadImage, renderText, renderImage, launderThrowable, and ImageInfo.downloadImage. The instance variable executor would have been declared with type ExecutorService and appropriately initialized. The code of the original JCiP examples has been modernized in places for this article with Java 8 lambdas and streams.

The reason that this code must wait for all the downloads to complete is because the Future interface that it uses to represent the download tasks is quite limited as a model of asynchronously executed tasks. Future allows a client to interrogate the task for its result, blocking if necessary, or for its status—that is, whether it has been completed or been cancelled. But Future itself does not provide any way of supplying a callback method, which would allow the page rendering thread to be notified when each individual image download is complete.

Listing 2 improves on the coarse granularity of the previous example by submitting the page download tasks to a CompletionService, whose poll and take methods yield the corresponding Future instances in the order in which their tasks have completed, unlike listing 1 that processed the tasks in the order they were submitted. The platform implementation of this interface, ExecutorCompletionService, achieves this by wrapping each task in a FutureTask, a Future implementation that additionally allows a completion callback to be supplied. The callback action specified to the Futures created in ExecutorCompletionService encapsulate the addition of the completed task to a queue, ready for a client to interrogate.

 public void renderPage(CharSequence source) { 
   List<ImageInfo> info = scanForImageInfo(source); 
   CompletionService<ImageData> completionService = 
     new ExecutorCompletionService<>(executor); 

   // submit each download task to the completion service 
   info.forEach(imageInfo -> 
     completionService.submit(imageInfo::downloadImage)); 

   renderText(source); 

   // retrieve each RunnableFuture as it becomes 
   // available (and when we are ready to process it). 
   for (int t = 0; t < info.size(); t++) { 
     Future<ImageData> imageFuture = completionService.take(); 
     renderImage(imageFuture.get()); 
   } 
 }

Listing 2: using a CompletionService to render images as they become available (interrupt and error handling omitted for brevity)

Introducing CompletableFuture

Listing 2 represented the state of the art from Java 5 until last year, when the expressiveness of Java for programming asynchronous systems was greatly improved by the introduction of the class CompletableFuture (CF). This class is an implementation of Future that places callbacks, executed in a different thread from the task, on the same footing as synchronous continuation functions, executed in the same thread. It avoids the biggest problem of conventional callbacks, that of separating the flow of control into different event handlers, by allowing CF instances to be composed with callback methods to form new CFs.

An example is the method thenAccept, which accepts a Consumer (a user-supplied void-bearing function) and returns a new CF. The CF returned has the effect of applying the Consumer to the result of completing the original CF. Like many other CF methods, thenAccept has two further variants: in one, the Consumer is executed by a thread from the common fork/join pool; in the second, it is executed by a thread from an Executor that you supply with the call. This triplication of overloads—running synchronously, running asynchronously in the common ForkJoinPool, and running asynchronously in a supplied thread pool—shoulders most of the responsibility for the otherwise daunting total of nearly 60 CompletableFuture methods.

Here is an example of thenAccept, being used in the reimplementation of the page renderer:

public void renderPage(CharSequence source) { 
        List<ImageInfo> info = scanForImageInfo(source); 
        info.forEach(imageInfo -> 
               CompletableFuture 
       		.supplyAsync(imageInfo::downloadImage) 
       		.thenAccept(this::renderImage)); 
        renderText(source); 
 }

Listing 3: Using CompletableFuture to implement page rendering

Although Listing 3 is notably more concise than the previous incarnations, its style becomes easy to read with a little practice. The factory method supplyAsync returns a new CF that will be completed by running the specified Supplier in the common ForkJoinPool, with the Supplier’s result as the result of the CF. The method thenAccept returns a new CF that will be completed by executing the specified Consumer—in this case rendering the image supplied—on the result of the CF produced by supplyAsync.

To be sure, thenAccept is not the only way of composing a CF with a function. Methods that compose CFs with functions can take the following arguments:

  • a function that will be applied to the CF’s result. The methods that accept these are:
    • thenCompose, for functions that return a CompletableFuture
    • thenApply, for functions that return other types;
    • thenAccept for functions that return void;
  • a Runnable. This is accepted by the method thenRun;
  • functions that can handle both normal and exceptional termination. CF composes both respectively with the methods:
    • handle, for functions that take a value and a Throwable and return a value;
    • whenComplete, for functions that take a value and a Throwable and return void.

Extending the page renderer

Extensions of this example can illustrate other features of CompletableFuture. For example, suppose that we want to use an icon as a visual indicator where an image download has timed out or failed. CF exposes a method get(long, TimeUnit) that throws a TimeoutException after the specified period if the CF has not completed by that time. It can be used to define a function from ImageInfo to ImageData (Listing 4).

Function<ImageInfo, ImageData> infoToData = imageInfo -> { 
   CompletableFuture<ImageData> imageDataFuture = 
       CompletableFuture.supplyAsync(imageInfo::downloadImage, executor); 
   try { 
       return imageDataFuture.get(5, TimeUnit.SECONDS); 
   } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       imageDataFuture.cancel(true); 
       return ImageData.createIcon(e); 
   } catch (ExecutionException e) { 
       throw launderThrowable(e.getCause()); 
   } catch (TimeoutException e) { 
       return ImageData.createIcon(e); 
   } 
}

Listing 4: Using CompletableFuture.get to implement a timeout

Now the page can be rendered by means of successive calls to infoToData. Each call synchronously returns a downloaded image, so to download them in parallel requires a new asynchronous task to be created for each. A suitable factory method for this is CompletableFuture.runAsync(), similar to supplyAsync but taking a Runnable rather than a Supplier:

public void renderPage(CharSequence source) throws InterruptedException { 
       List<ImageInfo> info = scanForImageInfo(source); 
       info.forEach(imageInfo -> 
           CompletableFuture.runAsync(() -> 
               renderImage(infoToData.apply(imageInfo)), executor)); 
}

Now consider a further requirement, to place an indicator on the page when all requests have either completed or timed out—a situation that can be signalled by a return from the join method of all the corresponding CompletableFutures. The static method allOf is provided for this situation, to allow the creation of a void-returning CompletableFuture that is completed when all its components complete. (The join method can normally be used to return a CF result, but to see the results of CFs composed by allOf, they must be queried individually.)

public void renderPage(CharSequence source) { 
       List<ImageInfo> info = scanForImageInfo(source); 
       CompletableFuture[] cfs = info.stream() 
           .map(ii -> CompletableFuture.runAsync( 
               () -> renderImage(mapper.apply(ii)), executor)) 
           .toArray(CompletableFuture[]::new); 
       CompletableFuture.allOf(cfs).join(); 
       renderImage(ImageData.createDoneIcon()); 
  }

Combining Multiple CompletableFutures

A further group of methods allows the combination of multiple CFs. We have already seen the static method allOf, which completes when all its components have completed; its dual, anyOf—also void-bearing—completes when any of its components complete. Apart from these two methods, all others in this group are instance methods that compose the receiver in some way with another CF, then pass the result to a supplied function.

To show how these can work, let’s extend the next example in JCiP, that of a travel reservation portal, so that the progress of a connected set of bookings is recorded in a TripPlan, containing the total price together with a list of the service suppliers used:

 interface TripPlan { 
       List<ServiceSupplier> getSuppliers(); 
       int getPrice(); 
       TripPlan combine(TripPlan); 
   }

A ServiceSupplier—an airline, say, or a hotel—can create a TripPlan: (In reality, of course, ServiceSupplier.createPlan would accept arguments corresponding to destination, travel class, and so on.)

interface ServiceSupplier { 
    TripPlan createPlan(); 
    String getAlliance();       // for use later 
}

To select the best trip plan, we need to query each of the service providers for their plan for our trip, then compare the resulting plans using a Comparator that reflects our choice criteria (in this case, simply the lowest-priced):

TripPlan selectBestTripPlan(List<ServiceSupplier> serviceList) { 
   List<CompletableFuture<TripPlan>> tripPlanFutures = serviceList.stream() 
     .map(svc -> CompletableFuture.supplyAsync(svc::createPlan, executor)) 
     .collect(toList()); 
    
   return tripPlanFutures.stream() 
     .min(Comparator.comparing(cf -> cf.join().getPrice())) 
     .get().join(); 
}

Notice the intermediate collect operation, made necessary by the laziness of intermediate operations on streams. Without it, the stream terminal operation would be min, whose execution requires first the execution of join for each element of tripPlanFutures. As the code stands, by contrast, the terminal operation is collect which accumulate the CF values resulting from the map operation without blocking, allowing the underlying tasks to execute concurrently.

If the tasks of retrieving the best airline and hotel trip plans are independent, then we would wish to initiate them at the same time, like the image downloads of the previous example. To compose two CFs in this way, use CompletableFuture.thenCombine, which executes both the receiver and the supplied CF in parallel, then combines their result using the supplied function (we assume that the variables airlineshotels, and (later) cars have been declared with the type List<TravelService> and appropriately initialized:

CompletableFuture 
       .supplyAsync(() -> selectBestTripPlan(airlines)) 
       .thenCombine( 
           CompletableFuture.supplyAsync(() -> selectBestTripPlan(hotels)), 
              TripPlan::combine); 

It is instructive to extend the example. Suppose that service suppliers each belong to a travel alliance, represented by a String property alliance. Having booked airline and hotel independently, we might decide that if both belong to the same alliance, then the only car hire services worth considering are those that also belong to that alliance:

  private TripPlan addCarHire(TripPlan p) { 
       List<String> alliances = p.getSuppliers().stream() 
           .map(ServiceSupplier::getAlliance) 
           .distinct() 
           .collect(toList()); 
       if (alliances.size() == 1) { 
           return p.combine(selectBestTripPlan(cars, alliances.get(0))); 
       } else { 
           return p.combine(selectBestTripPlan(cars)); 
       } 
   }

A new overload of selectBestTripPlan will accept a String for the favored alliance and, if it is present, use it to filter the stream of services to be used:

  private TripPlan selectBestTripPlan( 
       List<ServiceSupplier> serviceList, String favoredAlliance) { 

       List<CompletableFuture<TripPlan>> tripPlanFutures = serviceList.stream() 
           .filter(ts -> 
               favoredAlliance == null || ts.getAlliance().equals(favoredAlliance)) 
           .map(svc -> CompletableFuture.supplyAsync(svc::createPlan, executor)) 
           .collect(toList()); 

       ... 
   }

In this case, the CF to select the car hire service is dependent on the CF of the combined flight and hotel reservation tasks. It can complete only when both the flight and hotel bookings have been made. The method that implements this relationship is thenCompose:

CompletableFuture.supplyAsync(() -> selectBestTripPlan(airlines)) 
       .thenCombine( 
            CompletableFuture.supplyAsync(() -> selectBestTripPlan(hotels)), 
                TripPlan::combine) 
       .thenCompose(p -> CompletableFuture.supplyAsync(() -> addCarHire(p)));

The CF resulting from the combined flight and hotel bookings is executed and its result, the combined TripPlan, becomes the input to the function argument of thenCompose. The resulting CF neatly encapsulates the dependency relationships of the different asynchronous services. Key to the conciseness of this code is that although thenCompose composes two CFs, it does not return a CompletableFuture<CompletableFuture<TripPlan>> as you might expect, but a CompletableFuture<TripPlan>. So no matter how many levels of composition are applied in creating a CF, it is not nested but flattened out, and only one operation is required to retrieve its result. This is the characteristic of the “bind” operation (the name comes from Haskell) of a monad like CF, and explains some of the enthusiasm for monads: in this case, for example, we have been able to write in functional form what would otherwise have required an awkward series of explicit task definitions in separate callbacks.

The method thenCombine is only one of a number of ways in which two CFs can be composed. Others include:

  • thenAcceptBoth: like thenCombine but taking a function that returns void;
  • runAfterBoth, which accepts a Runnable for execution after both CFs complete;
  • applyToEither, which takes a unary function, supplying it with the result of whichever CF completes first;
  • acceptEither: like applyToEither but taking a unary function with void result;
  • runAfterEither: accepts a Runnable for execution after either CF completes.

Conclusion

It would not be possible in the space of a short article to explore an API like CompletableFuture thoroughly, but I hope that the examples here have given a sense of the style of concurrent programming that it enables. Composing CompletableFutures with one another, and with other functions, allows the construction of pipeline-like recipes for multiple tasks, with control over synchronous or asynchronous execution and over the dependencies between them. Aspects that you might like to explore in more detail include exception handling, the practical aspects of choosing and configuring an executor, and the interesting challenges that arise in the design of an asynchronous API.

I hope to have clarified the relationship between the two styles of concurrent programming offered in Java 8. In the situations in which fork/join parallelism (including parallel streams) applies, it can be extremely effective in distributing work across multiple cores. But the criteria for its use are quite narrow: the data set should be large and efficiently splittable, operations on individual data elements should be (reasonably) independent of one another, these operations should be fairly expensive, and they should be CPU-intensive. If these conditions do not hold, and especially if your tasks spend much time blocked on I/O or network requests, then CompletableFuture is the better alternative. As Java programmers, we are fortunate to have a platform library that integrates these complementary approaches.

About the Author

Maurice Naftalin has worked in the software industry for four decades as developer, researcher, trainer and author. He is a Java Champion, author or co-author of books on the features introduced in Java 5 and Java 8, and a JavaOne Rockstar in 2013 and 2014. Enjoy Maurice’s treatment on Java Lambdas in his recent book Mastering Lambdas - Java Programming in a Multicore World.

 

Rate this Article

Adoption
Style

BT