BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Articles From Imperative Programming to Fork/Join to Parallel Streams in Java 8

From Imperative Programming to Fork/Join to Parallel Streams in Java 8

Lire ce contenu en français

Java 8 brings many features that let you write code in a more concise way. For example, instead of writing code as follows:

Collections.sort(transactions, new Comparator<Transaction>(){
  public int compare(Transaction t1, Transaction t2){
    return t1.getValue().compareTo(t2.getValue());
  }
});

you can now write the following more compact code that does the same thing but reads a lot closer to the problem statement:

transactions.sort(comparing(Transaction::getValue));

The major features introduced by Java 8 are lambda expressions, method references and the new Streams API. It is considered the largest language change since the advent of Java 20 years ago. To find detailed practical examples of how you can benefit from these features refer to the book Java 8 in Action: Lambdas, Streams and Functional-style programming written by the authors of this article and Alan Mycroft.

These features enable programmers to write more concise code, and additionally they let programmers benefit from multi-core architecture. In fact, writing programs that execute gracefully in parallel is currently the preserve of Java specialists. However, thanks to its new Streams API, Java 8 changes the game and lets everyone more easily write code that leverages multi-core architecture.

In this article we will compare different methods to compute the variance of a large data set using

  1. An imperative style
  2. The fork/join framework
  3. The Streams API

The variance is used in statistics to measure how far a set of numbers is spread out. It can be calculated by averaging the squared difference from the mean of the set of numbers. For example, given the numbers 40, 30, 50 and 80 representing the ages of a population, we can calculate the variance by:

  1. calculating the mean: (40 + 30 + 50 + 80) / 4 = 50
  2. taking the square difference from the mean of the set of numbers: (40-50)2 + (30-50)2 + (50-50)2 + (80-50)2 = 1400
  3. finally averaging it: 1400/4 = 350

Imperative style

A typical imperative implementation of the variance formula is as follows:

 public static double varianceImperative(double[] population){
   double average = 0.0;
   for(double p: population){
      average += p;
   }
   average /= population.length;

   double variance = 0.0;
   for(double p: population){
     variance += (p - average) * (p - average);
   }
   return variance/population.length;
}

Why is it imperative? Our implementation describes the computation in terms of a sequence of statements that change state. Here, we are explicitly iterating through each element of the population array and updating two local variables (average and variance) at every iteration. This kind of code is excellent for a hardware architecture that only has one CPU. Indeed, it maps very straightforwardly to the instruction set of a CPU.

Fork/Join framework

However, how would you write this implementation to execute on multiple-core architectures? Should you use threads? Should they synchronise at some point? The fork/join framework introduced in Java 7 alleviated some of these difficulties, so let’s try to develop a parallel version of this algorithm using it.

 public class ForkJoinCalculator extends RecursiveTask<Double> {

   public static final long THRESHOLD = 1_000_000;

   private final SequentialCalculator sequentialCalculator;
   private final double[] numbers;
   private final int start;
   private final int end;

   public ForkJoinCalculator(double[] numbers, SequentialCalculator sequentialCalculator) {
     this(numbers, 0, numbers.length, sequentialCalculator);
   }

   private ForkJoinCalculator(double[] numbers, int start, int end, SequentialCalculator sequentialCalculator) {
     this.numbers = numbers;
     this.start = start;
     this.end = end;
     this.sequentialCalculator = sequentialCalculator;
   }

   @Override
   protected Double compute() {
     int length = end - start;
     if (length <= THRESHOLD) {
         return sequentialCalculator.computeSequentially(numbers, start, end);
     }
     ForkJoinCalculator leftTask = new ForkJoinCalculator(numbers, start, start + length/2, sequentialCalculator);
     leftTask.fork();
     ForkJoinCalculator rightTask = new ForkJoinCalculator(numbers, start + length/2, end, sequentialCalculator);
     Double rightResult = rightTask.compute();
     Double leftResult = leftTask.join();
     return leftResult + rightResult;
  }
}

Here we develop a RecursiveTask splitting an array of doubles until the length of a subarray doesn’t go below a given threshold. At this point the subarray is processed sequentially applying on it the operation defined by the following interface.

public interface SequentialCalculator {
  double computeSequentially(double[] numbers, int start, int end);
}

With this infrastructure it is possible to recalculate the variance in parallel as follows.

 public static double varianceForkJoin(double[] population){
   final ForkJoinPool forkJoinPool = new ForkJoinPool();
   double total = forkJoinPool.invoke(new ForkJoinCalculator(population, new SequentialCalculator() {
     @Override
     public double computeSequentially(double[] numbers, int start, int end) {
       double total = 0;
       for (int i = start; i < end; i++) {
         total += numbers[i];
       }
       return total;
     }
  }));
  final double average = total / population.length;
  double variance = forkJoinPool.invoke(new ForkJoinCalculator(population, new SequentialCalculator() {
    @Override
    public double computeSequentially(double[] numbers, int start, int end) {
      double variance = 0;
      for (int i = start; i < end; i++) {
        variance += (numbers[i] - average) * (numbers[i] - average);
      }
      return variance;
    }
 }));
 return variance / population.length;
}

The bottom line is that, even with the help of the fork/join framework, the parallel version is significantly harder to write, and eventually debug, than its sequential counterpart.

Parallel Streams

Java 8 lets you achieve this in a different way. Instead of writing how a computation should be implemented, you describe what it does in broad brush strokes using the Streams API. As a result, the library can figure out how to implement the computation for you and make use of various optimisations. This style is called declarative programming. In Java 8 specifically, a parallel stream is designed to leverage a multi-core architecture. Let’s see how you can use them to run our first attempt of calculating the variance in a faster way.

We assume that you have some familiarity with streams in this section. However as a refresher, a Stream<T> is a sequence of elements T that support aggregate operations. You can use these operations to create a pipeline which represents a computation just like a pipeline of UNIX commands. A parallel stream is simply a stream that will execute the pipeline in parallel and can be obtained by calling the method parallel() on a normal stream. To brush up on what a stream is, refer to the Javadoc documentation.

The good news is that a few numeric operations such as max, min and average are built-in in the Java 8 API. They can be accessed through primitive specialisations of a Stream: IntStream (primitive int-valued elements), LongStream (primitive long-valued elements) and DoubleStream (primitive double-valued elements). For example, you can simply create a range of numbers with IntStream.rangeClosed(), calculate the maximum or minimum element in a stream using the method max() and min().

Coming back to our initial problem, we would like to use these operations to calculate the variance of a large population. The first step is to create a stream from the population array. We can achieve this using the Arrays.stream() static method:

DoubleStream populationStream = Arrays.stream(population).parallel();

A DoubleStream supports the method average() which we can use:

double average = populationStream.average().orElse(0.0);

The next step is to calculate the variance which makes use of the average. Each element of the population needs first to have the average subtracted from it and the result squared. This can be viewed as a map operation which transforms each element into another one using a lambda expression (double p) -> (p - average) * (p - average). Once this is done we can calculate the sum of all resulting elements by calling the method sum().

But not so fast! Streams can only be consumed once. If we re-use populationStream we will get the surprising error:

java.lang.IllegalStateException: stream has already been operated upon or closed

So we need to get a second stream to calculate the variance as show below:

 public static double varianceStreams(double[] population){
   double average = Arrays.stream(population).parallel().average().orElse(0.0);
   double variance = Arrays.stream(population).parallel()
                              .map(p -> (p - average) * (p - average))
                              .sum() / population.length;
   return variance;
}

By making use of built-in operations in the Streams API we’ve rewritten our initial imperative style code in a declarative and concise way which reads almost like the mathematical definition of the variance. Let’s now explore the performance of the three versions of our implementation.

Benchmark

We wrote the three versions of our variance algorithm in very different styles. The streams version is the most concise and is written declaratively, which allows the library to decide on an adequate implementation and leverage the multi-core infrastructure. However, you may wonder how they perform? To find out let’s create a benchmark to see how the different versions compare. We calculate the variance of a population of 30 million random numbers between 1 and 140. We used jmh to investigate the performance of each version. Jmh is a Java harness supported by OpenJDK. You can run the benchmark yourself by cloning the project from GitHub.

The benchmark was run on a Macbook Pro 2.3 GHz quad-core Intel Core i7, with 16 GB 1600 MHz DDR3. In addition, we used the following version of JDK8:

java version "1.8.0-ea"
Java(TM) SE Runtime Environment (build 1.8.0-ea-b121)
Java HotSpot(TM) 64-Bit Server VM (build 25.0-b63, mixed mode)

The results are illustrated in the histogram below. The imperative version took 60ms, the fork/join version 22ms and the streams version 46ms.

These numbers should be treated with caution. It’s likely that you will get very different performance if you run the test on a 32-bit JVM for example. However, it is interesting to notice that adopting a different programming style using the Streams API in Java 8 opens the door for optimisations behind the scenes that are not possible in a strictly imperative style and in a much more straightforward way than is possible with fork/join.

About the Authors

Raoul-Gabriel Urma started his PhD in Computer Science at the University of Cambridge at the age of 20. His research centers on programming languages and software engineering. He holds a MEng in Computer Science from Imperial College London and graduated with first class honors, having won several prizes for technical innovation. He has worked for large companies such as Google, eBay, Oracle, and Goldman Sachs, as well as for several startup projects. In addition, he is a frequent speaker at Java developer conferences and is also an instructor for Java courses. Twitter: @raoulUK and Website.

Mario Fusco is a senior software engineer at Red Hat working at the development of the core of Drools, the JBoss rules engine. He has extensive experience as a Java developer, having been involved in (and often leading) many enterprise level projects in industries ranging from media companies to the financial sector. Among his interests are functional programming and domain specific languages. By leveraging these two passions, he created the open source library lambdaj with the purposes of providing an internal Java DSL for manipulating collections and for allowing a bit of functional programming in Java. Twitter: @mariofusco.

Rate this Article

Adoption
Style

BT