
From Imperative Programming to Fork/Join to Parallel Streams in Java 8


Java 8 brings many features that let you write code in a more concise way. For example, instead of writing code as follows:

Collections.sort(transactions, new Comparator<Transaction>(){
  public int compare(Transaction t1, Transaction t2){
    return t1.getValue().compareTo(t2.getValue());
  }
});

you can now write the following more compact code that does the same thing but reads a lot closer to the problem statement:

transactions.sort(comparing(Transaction::getValue));
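
Note that this compact form assumes a static import of the comparing factory method from java.util.Comparator (a Java 8 addition), along with the new List.sort() default method:

import static java.util.Comparator.comparing;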

The major features introduced by Java 8 are lambda expressions, method references and the new Streams API. Together they represent the largest language change since the advent of Java 20 years ago. For detailed practical examples of how you can benefit from these features, refer to the book Java 8 in Action: Lambdas, Streams and Functional-style Programming, written by the authors of this article together with Alan Mycroft.

These features let programmers write more concise code, and they also make it easier to benefit from multi-core architectures. In fact, writing programs that execute gracefully in parallel has so far been the preserve of Java specialists. Thanks to its new Streams API, however, Java 8 changes the game and lets everyone write code that leverages multi-core architectures far more easily.

In this article we will compare different methods to compute the variance of a large data set using:

  1. An imperative style
  2. The fork/join framework
  3. The Streams API

The variance is used in statistics to measure how far a set of numbers is spread out. It can be calculated by averaging the squared differences from the mean of the set. For example, given the numbers 40, 30, 50 and 80 representing the ages of a population, we can calculate the variance by:

  1. calculating the mean: (40 + 30 + 50 + 80) / 4 = 50
  2. summing the squared differences from the mean: (40−50)² + (30−50)² + (50−50)² + (80−50)² = 1400
  3. finally averaging: 1400 / 4 = 350
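
In symbols, for a population x_1, ..., x_n this is the standard population-variance formula:

\mu = \frac{1}{n}\sum_{i=1}^{n} x_i, \qquad
\sigma^2 = \frac{1}{n}\sum_{i=1}^{n} (x_i - \mu)^2

With the four ages above, n = 4, \mu = 50 and \sigma^2 = 1400 / 4 = 350.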

Imperative style

A typical imperative implementation of the variance formula is as follows:

 public static double varianceImperative(double[] population){
   double average = 0.0;
   for(double p: population){
      average += p;
   }
   average /= population.length;

   double variance = 0.0;
   for(double p: population){
     variance += (p - average) * (p - average);
   }
   return variance/population.length;
}

Why is it imperative? Our implementation describes the computation in terms of a sequence of statements that change state. Here, we are explicitly iterating through each element of the population array and updating two local variables (average and variance) at every iteration. This kind of code is excellent for a hardware architecture that only has one CPU. Indeed, it maps very straightforwardly to the instruction set of a CPU.

Fork/Join framework

However, how would you write this implementation so that it executes on multi-core architectures? Should you use threads? Should they synchronise at some point? The fork/join framework introduced in Java 7 alleviated some of these difficulties, so let’s try to develop a parallel version of this algorithm using it.

import java.util.concurrent.RecursiveTask;

public class ForkJoinCalculator extends RecursiveTask<Double> {

   public static final long THRESHOLD = 1_000_000;

   private final SequentialCalculator sequentialCalculator;
   private final double[] numbers;
   private final int start;
   private final int end;

   public ForkJoinCalculator(double[] numbers, SequentialCalculator sequentialCalculator) {
     this(numbers, 0, numbers.length, sequentialCalculator);
   }

   private ForkJoinCalculator(double[] numbers, int start, int end, SequentialCalculator sequentialCalculator) {
     this.numbers = numbers;
     this.start = start;
     this.end = end;
     this.sequentialCalculator = sequentialCalculator;
   }

   @Override
   protected Double compute() {
     int length = end - start;
     if (length <= THRESHOLD) {
         return sequentialCalculator.computeSequentially(numbers, start, end);
     }
     ForkJoinCalculator leftTask = new ForkJoinCalculator(numbers, start, start + length/2, sequentialCalculator);
     leftTask.fork();
     ForkJoinCalculator rightTask = new ForkJoinCalculator(numbers, start + length/2, end, sequentialCalculator);
     Double rightResult = rightTask.compute();
     Double leftResult = leftTask.join();
     return leftResult + rightResult;
  }
}

Here we develop a RecursiveTask that recursively splits an array of doubles until the length of a subarray falls below a given threshold. At that point the subarray is processed sequentially, applying the operation defined by the following interface:

public interface SequentialCalculator {
  double computeSequentially(double[] numbers, int start, int end);
}

With this infrastructure in place, it is possible to calculate the variance in parallel as follows:

public static double varianceForkJoin(double[] population){
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  double total = forkJoinPool.invoke(new ForkJoinCalculator(population, new SequentialCalculator() {
    @Override
    public double computeSequentially(double[] numbers, int start, int end) {
      double total = 0;
      for (int i = start; i < end; i++) {
        total += numbers[i];
      }
      return total;
    }
  }));
  final double average = total / population.length;
  double variance = forkJoinPool.invoke(new ForkJoinCalculator(population, new SequentialCalculator() {
    @Override
    public double computeSequentially(double[] numbers, int start, int end) {
      double variance = 0;
      for (int i = start; i < end; i++) {
        variance += (numbers[i] - average) * (numbers[i] - average);
      }
      return variance;
    }
  }));
  return variance / population.length;
}
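
As an aside, because SequentialCalculator declares a single abstract method, Java 8 lets you replace the two anonymous classes above with lambda expressions. A sketch of the same computation (the method name varianceForkJoinLambdas is ours, not part of the original code):

public static double varianceForkJoinLambdas(double[] population) {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  // Sum the population in parallel.
  double total = forkJoinPool.invoke(new ForkJoinCalculator(population,
      (numbers, start, end) -> {
        double sum = 0;
        for (int i = start; i < end; i++) {
          sum += numbers[i];
        }
        return sum;
      }));
  final double average = total / population.length;
  // Sum the squared differences from the mean in parallel.
  double variance = forkJoinPool.invoke(new ForkJoinCalculator(population,
      (numbers, start, end) -> {
        double sum = 0;
        for (int i = start; i < end; i++) {
          sum += (numbers[i] - average) * (numbers[i] - average);
        }
        return sum;
      }));
  return variance / population.length;
}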

The bottom line is that, even with the help of the fork/join framework, and even with lambdas trimming the boilerplate, the parallel version is significantly harder to write and debug than its sequential counterpart.

Parallel Streams

Java 8 lets you achieve this in a different way. Instead of writing how a computation should be implemented, you describe what it does in broad brush strokes using the Streams API. As a result, the library can figure out how to implement the computation for you and make use of various optimisations. This style is called declarative programming. In Java 8 specifically, a parallel stream is designed to leverage a multi-core architecture. Let’s see how you can use one to calculate the variance in a faster way.

We assume that you have some familiarity with streams in this section. As a refresher, however, a Stream<T> is a sequence of elements of type T that supports aggregate operations. You can use these operations to create a pipeline, which represents a computation, just like a pipeline of UNIX commands. A parallel stream is simply a stream that executes the pipeline in parallel; you can obtain one by calling the method parallel() on a normal stream. To brush up on what a stream is, refer to the Javadoc documentation.
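
For instance, a minimal pipeline in that style might look like the following sketch, reusing the transactions list from the introduction (getOwner() is a hypothetical accessor, and we assume getValue() returns a numeric amount; neither is part of the article's code):

import java.util.List;
import static java.util.stream.Collectors.toList;

List<String> owners = transactions.stream()
    .filter(t -> t.getValue() > 1000)   // keep only large transactions
    .map(Transaction::getOwner)         // hypothetical accessor
    .collect(toList());                 // terminal operation producing a List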

The good news is that a few numeric operations such as max, min and average are built into the Java 8 API. They can be accessed through primitive specialisations of Stream: IntStream (primitive int-valued elements), LongStream (primitive long-valued elements) and DoubleStream (primitive double-valued elements). For example, you can create a range of numbers with IntStream.rangeClosed(), or calculate the maximum or minimum element of a stream using the methods max() and min().
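
A quick sketch of these primitive-stream operations (values shown in comments are the results you would get):

import java.util.stream.IntStream;

int max = IntStream.rangeClosed(1, 100).max().getAsInt();         // 100
int min = IntStream.rangeClosed(1, 100).min().getAsInt();         // 1
double avg = IntStream.rangeClosed(1, 100).average().orElse(0.0); // 50.5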

Coming back to our initial problem, we would like to use these operations to calculate the variance of a large population. The first step is to create a stream from the population array. We can achieve this using the Arrays.stream() static method:

DoubleStream populationStream = Arrays.stream(population).parallel();

A DoubleStream supports the method average() which we can use:

double average = populationStream.average().orElse(0.0);

The next step is to calculate the variance which makes use of the average. Each element of the population needs first to have the average subtracted from it and the result squared. This can be viewed as a map operation which transforms each element into another one using a lambda expression (double p) -> (p - average) * (p - average). Once this is done we can calculate the sum of all resulting elements by calling the method sum().

But not so fast! Streams can only be consumed once. If we re-use populationStream we will get the surprising error:

java.lang.IllegalStateException: stream has already been operated upon or closed
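
A small sketch of the failure mode, continuing from the populationStream declared above:

double average = populationStream.average().orElse(0.0); // first terminal operation: fine
double sum = populationStream.sum();                     // second terminal operation: throws IllegalStateException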

So we need to get a second stream to calculate the variance, as shown below:

 public static double varianceStreams(double[] population){
   double average = Arrays.stream(population).parallel().average().orElse(0.0);
   double variance = Arrays.stream(population).parallel()
                              .map(p -> (p - average) * (p - average))
                              .sum() / population.length;
   return variance;
}
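
As a quick sanity check against the worked example above, varianceStreams(new double[]{40, 30, 50, 80}) returns 350.0.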

By making use of built-in operations in the Streams API, we’ve rewritten our initial imperative-style code in a declarative, concise way that reads almost like the mathematical definition of variance. Let’s now explore the performance of the three versions of our implementation.

Benchmark

We wrote the three versions of our variance algorithm in very different styles. The streams version is the most concise and is written declaratively, which lets the library decide on an adequate implementation and leverage the multi-core infrastructure. But how do they perform? To find out, let’s create a benchmark comparing the three versions. We calculate the variance of a population of 30 million random numbers between 1 and 140, using JMH (a Java microbenchmarking harness supported by OpenJDK) to measure each version. You can run the benchmark yourself by cloning the project from GitHub.
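
For illustration, a minimal JMH benchmark along these lines might look like the following sketch. The annotations follow standard JMH usage, but the class name and setup details are our assumptions, not the exact harness from the GitHub project; we also assume the three variance methods shown earlier are in scope:

import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
public class VarianceBenchmark {

  double[] population;

  @Setup
  public void setUp() {
    // 30 million random ages between 1 and 140, as in the article's benchmark.
    Random random = new Random();
    population = new double[30_000_000];
    for (int i = 0; i < population.length; i++) {
      population[i] = random.nextInt(140) + 1;
    }
  }

  @Benchmark
  public double imperative() { return varianceImperative(population); }

  @Benchmark
  public double forkJoin() { return varianceForkJoin(population); }

  @Benchmark
  public double streams() { return varianceStreams(population); }
}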

The benchmark was run on a MacBook Pro with a 2.3 GHz quad-core Intel Core i7 and 16 GB of 1600 MHz DDR3 memory, using the following version of JDK 8:

java version "1.8.0-ea"
Java(TM) SE Runtime Environment (build 1.8.0-ea-b121)
Java HotSpot(TM) 64-Bit Server VM (build 25.0-b63, mixed mode)

The results were as follows: the imperative version took 60ms, the fork/join version 22ms, and the streams version 46ms.

These numbers should be treated with caution; you will likely get very different performance if you run the test on a 32-bit JVM, for example. However, it is interesting to notice that adopting the declarative style of the Streams API in Java 8 opens the door to behind-the-scenes optimisations that are not possible in a strictly imperative style, and does so in a much more straightforward way than fork/join.

About the Authors

Raoul-Gabriel Urma started his PhD in Computer Science at the University of Cambridge at the age of 20. His research centers on programming languages and software engineering. He holds an MEng in Computer Science from Imperial College London, where he graduated with first class honors, having won several prizes for technical innovation. He has worked for large companies such as Google, eBay, Oracle, and Goldman Sachs, as well as for several startup projects. In addition, he is a frequent speaker at Java developer conferences and an instructor for Java courses. Twitter: @raoulUK.

Mario Fusco is a senior software engineer at Red Hat working on the core of Drools, the JBoss rules engine. He has extensive experience as a Java developer, having been involved in (and often leading) many enterprise-level projects in industries ranging from media companies to the financial sector. His interests include functional programming and domain-specific languages. Combining these two passions, he created the open source library lambdaj, which provides an internal Java DSL for manipulating collections and allows a bit of functional programming in Java. Twitter: @mariofusco.


Community comments

  • Parallel Java Streams

    by Edward Harned,

    Excellent article. Seldom do I see someone comparing sequential/parallel together. The Oracle developers seem focused only on getting something to work, not how well it works.

Perhaps I can shine a light on why the basic F/J is so much faster than the streams version. The streams do not do parallel processing, they do "paraquential" processing:
coopsoft.com/ar/Calamity2Article.html#para
Since F/J is essentially a failure, it is necessary to switch to sequential processing to avoid stack overflows and out-of-memory errors.

Yes, I wrote the linked article. I've been doing parallel applications for several decades, and I pointed out the problems with F/J years ago, to no avail.

    ed

  • No need to calculate average first when streaming

    by Markus Krüger,

It turns out you don't actually need to know the average before streaming in order to calculate the variance. As explained in the Wikipedia article Algorithms for calculating variance, there are mathematical approaches that let you calculate the variance in a single pass, and they can also be modified to run in parallel. Here is a small snippet that makes use of collect() to do so:


    // Helper class for intermediate fields.
    private static class VarianceTmp {
      double mean, m2;
      int count;
    }

    public static double varianceStreams(double[] population) {
      VarianceTmp tmp = Arrays.stream(population).parallel().collect(
          VarianceTmp::new,
          (acc, x) -> {
            acc.count++;
            double delta = x - acc.mean;
            acc.mean += delta / acc.count;
            acc.m2 += delta * (x - acc.mean);
          },
          (tmpA, tmpB) -> {
            double delta = tmpB.mean - tmpA.mean;
            int countAB = tmpA.count + tmpB.count;
            tmpA.mean += delta * tmpB.mean / countAB;
            tmpA.m2 += tmpB.m2
                + delta * delta * tmpA.count * tmpB.count / countAB;
            tmpA.count = countAB;
          });
      return (tmp.count <= 1) ? 0 : tmp.m2 / (tmp.count - 1);
    }


    An added benefit of this approach is that the input argument doesn't need to be an array; it could be an iterator or a stream, so there is no need to keep all values in memory at once.

  • Re: No need to calculate average first when streaming

    by Rajesh Joshi,

    The emphasis in the article, as I see it, is more on parallel computing and readability. I agree there are better algorithms; however, I think you are missing the point.

  • Re: No need to calculate average first when streaming

    by Markus Krüger,

    I did understand that the focus of the article wasn't on how to calculate variance; that was kind of obvious from the title. :-) And if the article had at least mentioned that the implementation presented was very naive and pointed out that much better approaches exist, that would have been fine. I agree that a proper implementation of calculating variance would have made the article harder to read.

    However, as the article currently stands, a reader might believe that the implementations shown here are suitable for real use. Sadly, that is not the case, for reasons listed in the Wikipedia article I link to.

    Commenting on how to improve the calculation of variance also gave me a possibility to demonstrate the use of Stream.collect(), which I believe to be a useful tool.

  • Other performance from my laptop

    by benjamin fuentes,

    I got different results from my Core i3 laptop; here they are for 30M records:


    The information about OS

    Name of the OS: Windows 8.1
    Version of the OS: 6.3
    Architecture of THe OS: amd64
    Available processors (cores): 4
    Free memory (bytes): 60689448
    Maximum memory (bytes): 919076864

    average :49.99299774874399
    Sequential variance : 833.454385192641
    Time : 160ms

    average :49.992997748747015
    Fork variance : 833.4543851928006
    Time : 237ms

    average :49.99299774874777
    Stream variance : 833.4543851928023
    Time : 273ms



    Weird that the average calculation differs on the same array of elements ...

  • Re: Other performance from my laptop

    by benjamin fuentes,

    Small correction to the stream variance calculation above using collect(), and new results (variance2 from Markus Krüger really ROCKS!!!):

    ---------------


    The information about OS

    Name of the OS: Windows 8.1
    Version of the OS: 6.3
    Architecture of THe OS: amd64
    Available processors (cores): 4
    Free memory (bytes): 60689448
    Maximum memory (bytes): 919076864

    average :50.00379441649668
    Sequential variance : 833.6599070601363
    Time : 158ms

    average :50.003794416505926
    Fork variance : 833.6599070600581
    Time : 236ms

    average :50.00379441650572
    Stream variance : 833.6599070600663
    Time : 274ms

    average :50.00379441650572
    Stream variance2 : 833.6599070600663
    Time : 68ms



    ---------------

    // Helper class for intermediate fields.
    public static class VarianceTmp {
      double mean, m2;
      int count;
    }

    public static double varianceStreams2(double[] population) {
      VarianceTmp varianceTmp = Arrays.stream(population).parallel().collect(
          VarianceTmp::new,
          (tmp, x) -> {
            tmp.count++;
            double delta = x - tmp.mean;
            tmp.mean += delta / tmp.count;
            tmp.m2 += delta * (x - tmp.mean);
          },
          (tmpA, tmpB) -> {
            double delta = tmpB.mean - tmpA.mean;
            int countAB = tmpA.count + tmpB.count;
            tmpA.mean += delta * tmpB.mean / countAB;
            tmpA.m2 += tmpB.m2
                + delta * delta * tmpA.count * tmpB.count / countAB;
            tmpA.count = countAB;
          });
      return (varianceTmp.count <= 1) ? 0 : varianceTmp.m2 / (varianceTmp.count - 1);
    }

  • Re: Other performance from my laptop

    by Markus Krüger,

    Interesting results, thanks for sharing. However, for a fair comparison of runtimes, you should modify sequential variance and fork variance as well to use a one-pass algorithm. Here are a couple of implementations you could use:


    public static double varianceImperative(double[] population) {
      double average = 0;
      double variance = 0;
      int count = 0;
      for (double p : population) {
        double delta = p - average;
        average += delta / ++count;
        variance += delta * (p - average);
      }
      return variance / (count - 1);
    }


    private static double varianceForkJoin(final double[] values) {
      class VarianceTmp {
        double mean, m2;
        int count;
      }
      @SuppressWarnings("serial")
      class VarianceTask extends RecursiveTask<VarianceTmp> {
        private int from;
        private int to;
        public VarianceTask(int from, int to) {
          this.from = from; this.to = to;
        }
        @Override
        protected VarianceTmp compute() {
          if (to - from <= 1_000_000) {
            VarianceTmp tmp = new VarianceTmp();
            double mean = 0, m2 = 0;
            int count = 0;
            for (int i = from; i < to; i++) {
              double delta = values[i] - mean;
              mean += delta / ++count;
              m2 += delta * (values[i] - mean);
            }
            tmp.mean = mean; tmp.m2 = m2; tmp.count = count;
            return tmp;
          }

          int split = from + (to - from) / 2;
          VarianceTask taskA = new VarianceTask(from, split);
          VarianceTask taskB = new VarianceTask(split, to);
          taskB.fork();
          VarianceTmp tmpA = taskA.compute(), tmpB = taskB.join();
          double delta = tmpB.mean - tmpA.mean;
          int countAB = tmpA.count + tmpB.count;
          tmpA.mean =
              (tmpA.count * tmpA.mean + tmpB.count * tmpB.mean)
                  / (countAB);
          tmpA.m2 += tmpB.m2
              + delta * delta * tmpA.count * tmpB.count / countAB;
          tmpA.count = countAB;
          return tmpA;
        }
      }
      VarianceTmp tmp =
          new ForkJoinPool().invoke(new VarianceTask(0, values.length));
      return (tmp.count <= 1) ? 0 : tmp.m2 / (tmp.count - 1);
    }

    Also note that my previous way of combining means was imprecise. I've replaced


    tmpA.mean += delta * tmpB.mean / countAB;


    with


    tmpA.mean = (tmpA.count * tmpA.mean + tmpB.count * tmpB.mean) / (countAB);


    in the fork/join code; you should do the same for the stream code.

  • Re: No need to calculate average first when streaming

    by Markus Krüger,

    The preceding code contains a precision bug. Please replace


    tmpA.mean += delta * tmpB.mean / countAB;

    with

    tmpA.mean = (tmpA.count * tmpA.mean + tmpB.count * tmpB.mean) / (countAB);

  • Compensated sum

    by Paul Sandoz,

    The reason for some of the differences between the F/J version and the stream version is that the average operation performs a compensated sum (Kahan summation).

    If you want a more apples-to-apples comparison do:

    public static double varianceStreams(double[] population) {
      double average = Arrays.stream(population).parallel()
          .reduce(0.0, Double::sum) / population.length;
      double variance = Arrays.stream(population).parallel()
          .map(p -> (p - average) * (p - average))
          .reduce(0.0, Double::sum) / population.length;
      return variance;
    }

    My guess is that if you do that, the results will be much closer (my results indicate a parallel uncompensated sum is about 2x faster than a compensated sum).

    I thought I could optimize the average operation by removing some redundant computation, but it appears the Kahan summation dominates. See the following issue for details:

    bugs.openjdk.java.net/browse/JDK-8035561

    Oh, and one can safely ignore the first comment on this thread referring to paraquential; it's completely wrong.

  • Re: Compensated sum

    by Edward Harned,

    Not so fast. Paraquential doesn’t just mean a single submission of each Task with invoke(); it also pertains to sequential processing anywhere parallel processing should be done.

    Here, there is one ReduceOps$ReduceOp.evaluateParallel call:
    return new ReduceTask<>(this, helper, spliterator).invoke().get();
    for the stream, but since worker threads do not save individual ReduceTask results in a common object (the parallel technique), ReduceTask.onComplete() uses getLocalResult() for each individual ReduceTask sequentially; hence, paraquential.
