# From Imperative Programming to Fork/Join to Parallel Streams in Java 8

Java 8 brings many features that let you write code in a more concise way. For example, instead of writing code as follows:

Collections.sort(transactions, new Comparator<Transaction>(){ public int compare(Transaction t1, Transaction t2){ return t1.getValue().compareTo(t2.getValue()); } });

you can now write the following more compact code that does the same thing but reads a lot closer to the problem statement:

transactions.sort(comparing(Transaction::getValue));

The major features introduced by Java 8 are lambda expressions, method references and the new Streams API. It is considered the largest language change since the advent of Java 20 years ago. To find detailed practical examples of how you can benefit from these features refer to the book Java 8 in Action: Lambdas, Streams and Functional-style programming written by the authors of this article and Alan Mycroft.

These features enable programmers to write more concise code, and additionally they let programmers benefit from multi-core architecture. In fact, writing programs that execute gracefully in parallel is currently the preserve of Java specialists. However, thanks to its new Streams API, Java 8 changes the game and lets everyone more easily write code that leverages multi-core architecture.

In this article we will compare different methods to compute the variance of a large data set using

- An imperative style
- The fork/join framework
- The Streams API

The variance is used in statistics to measure how far a set of numbers is spread out. It can be calculated by averaging the squared difference from the mean of the set of numbers. For example, given the numbers 40, 30, 50 and 80 representing the ages of a population, we can calculate the variance by:

- calculating the mean: (40 + 30 + 50 + 80) / 4 = 50
- taking the square difference from the mean of the set of numbers: (40-50)
^{2 }+ (30-50)^{2}+ (50-50)^{2}+ (80-50)^{2}= 1400 - finally averaging it: 1400/4 = 350

## Imperative style

A typical imperative implementation of the variance formula is as follows:

public static double varianceImperative(double[] population){ double average = 0.0; for(double p: population){ average += p; } average /= population.length; double variance = 0.0; for(double p: population){ variance += (p - average) * (p - average); } return variance/population.length; }

Why is it imperative? Our implementation describes the computation in terms of *a sequence of statements that change state*. Here, we are explicitly iterating through each element of the population array and updating two local variables (average and variance) at every iteration. This kind of code is excellent for a hardware architecture that only has one CPU. Indeed, it maps very straightforwardly to the instruction set of a CPU.

## Fork/Join framework

However, how would you write this implementation to execute on multiple-core architectures? Should you use threads? Should they synchronise at some point? The fork/join framework introduced in Java 7 alleviated some of these difficulties, so let’s try to develop a parallel version of this algorithm using it.

public class ForkJoinCalculator extends RecursiveTask<Double> { public static final long THRESHOLD = 1_000_000; private final SequentialCalculator sequentialCalculator; private final double[] numbers; private final int start; private final int end; public ForkJoinCalculator(double[] numbers, SequentialCalculator sequentialCalculator) { this(numbers, 0, numbers.length, sequentialCalculator); } private ForkJoinCalculator(double[] numbers, int start, int end, SequentialCalculator sequentialCalculator) { this.numbers = numbers; this.start = start; this.end = end; this.sequentialCalculator = sequentialCalculator; } @Override protected Double compute() { int length = end - start; if (length <= THRESHOLD) { return sequentialCalculator.computeSequentially(numbers, start, end); } ForkJoinCalculator leftTask = new ForkJoinCalculator(numbers, start, start + length/2, sequentialCalculator); leftTask.fork(); ForkJoinCalculator rightTask = new ForkJoinCalculator(numbers, start + length/2, end, sequentialCalculator); Double rightResult = rightTask.compute(); Double leftResult = leftTask.join(); return leftResult + rightResult; } }

Here we develop a *RecursiveTask* splitting an array of doubles until the length of a subarray doesn’t go below a given threshold. At this point the subarray is processed sequentially applying on it the operation defined by the following interface.

public interface SequentialCalculator { double computeSequentially(double[] numbers, int start, int end); }

With this infrastructure it is possible to recalculate the variance in parallel as follows.

public static double varianceForkJoin(double[] population){ final ForkJoinPool forkJoinPool = new ForkJoinPool(); double total = forkJoinPool.invoke(new ForkJoinCalculator(population, new SequentialCalculator() { @Override public double computeSequentially(double[] numbers, int start, int end) { double total = 0; for (int i = start; i < end; i++) { total += numbers[i]; } return total; } })); final double average = total / population.length; double variance = forkJoinPool.invoke(new ForkJoinCalculator(population, new SequentialCalculator() { @Override public double computeSequentially(double[] numbers, int start, int end) { double variance = 0; for (int i = start; i < end; i++) { variance += (numbers[i] - average) * (numbers[i] - average); } return variance; } })); return variance / population.length; }

The bottom line is that, even with the help of the fork/join framework, the parallel version is significantly harder to write, and eventually debug, than its sequential counterpart.

## Parallel Streams

Java 8 lets you achieve this in a different way. Instead of writing *how* a computation should be implemented, you describe *what* it does in broad brush strokes using the Streams API. As a result, the library can figure out how to implement the computation for you and make use of various optimisations. This style is called *declarative programming*. In Java 8 specifically, a parallel stream is designed to leverage a multi-core architecture. Let’s see how you can use them to run our first attempt of calculating the variance in a faster way.

We assume that you have some familiarity with streams in this section. However as a refresher, a Stream<T> is a sequence of elements T that support aggregate operations. You can use these operations to create a pipeline which represents a computation just like a pipeline of UNIX commands. A parallel stream is simply a stream that will execute the pipeline in parallel and can be obtained by calling the method parallel() on a normal stream. To brush up on what a stream is, refer to the Javadoc documentation.

The good news is that a few numeric operations such as max, min and average are built-in in the Java 8 API. They can be accessed through primitive specialisations of a Stream: IntStream (primitive int-valued elements), LongStream (primitive long-valued elements) and DoubleStream (primitive double-valued elements). For example, you can simply create a range of numbers with IntStream.rangeClosed(), calculate the maximum or minimum element in a stream using the method max() and min().

Coming back to our initial problem, we would like to use these operations to calculate the variance of a large population. The first step is to create a stream from the population array. We can achieve this using the Arrays.stream() static method:

DoubleStream populationStream = Arrays.stream(population).parallel();

A DoubleStream supports the method average() which we can use:

double average = populationStream.average().orElse(0.0);

The next step is to calculate the variance which makes use of the average. Each element of the population needs first to have the average subtracted from it and the result squared. This can be viewed as a map operation which transforms each element into another one using a lambda expression (double p) -> (p - average) * (p - average). Once this is done we can calculate the sum of all resulting elements by calling the method sum().

But not so fast! Streams can only be consumed once. If we re-use populationStream we will get the surprising error:

java.lang.IllegalStateException: stream has already been operated upon or closed

So we need to get a second stream to calculate the variance as show below:

public static double varianceStreams(double[] population){ double average = Arrays.stream(population).parallel().average().orElse(0.0); double variance = Arrays.stream(population).parallel() .map(p -> (p - average) * (p - average)) .sum() / population.length; return variance; }

By making use of built-in operations in the Streams API we’ve rewritten our initial imperative style code in a declarative and concise way which reads almost like the mathematical definition of the variance. Let’s now explore the performance of the three versions of our implementation.

## Benchmark

We wrote the three versions of our variance algorithm in very different styles. The streams version is the most concise and is written declaratively, which allows the library to decide on an adequate implementation and leverage the multi-core infrastructure. However, you may wonder how they perform? To find out let’s create a benchmark to see how the different versions compare. We calculate the variance of a population of 30 million random numbers between 1 and 140. We used jmh to investigate the performance of each version. Jmh is a Java harness supported by OpenJDK. You can run the benchmark yourself by cloning the project from GitHub.

The benchmark was run on a Macbook Pro 2.3 GHz quad-core Intel Core i7, with 16 GB 1600 MHz DDR3. In addition, we used the following version of JDK8:

java version "1.8.0-ea" Java(TM) SE Runtime Environment (build 1.8.0-ea-b121) Java HotSpot(TM) 64-Bit Server VM (build 25.0-b63, mixed mode)

The results are illustrated in the histogram below. The imperative version took 60ms, the fork/join version 22ms and the streams version 46ms.

These numbers should be treated with caution. It’s likely that you will get very different performance if you run the test on a 32-bit JVM for example. However, it is interesting to notice that adopting a different programming style using the Streams API in Java 8 opens the door for optimisations behind the scenes that are not possible in a strictly imperative style and in a much more straightforward way than is possible with fork/join.

## About the Authors

**Raoul-Gabriel Urma** started his PhD in Computer Science at the University of Cambridge at the age of 20. His research centers on programming languages and software engineering. He holds a MEng in Computer Science from Imperial College London and graduated with first class honors, having won several prizes for technical innovation. He has worked for large companies such as Google, eBay, Oracle, and Goldman Sachs, as well as for several startup projects. In addition, he is a frequent speaker at Java developer conferences and is also an instructor for Java courses. Twitter: @raoulUK and Website.

**Mario Fusco** is a senior software engineer at Red Hat working at the development of the core of Drools, the JBoss rules engine. He has extensive experience as a Java developer, having been involved in (and often leading) many enterprise level projects in industries ranging from media companies to the financial sector. Among his interests are functional programming and domain specific languages. By leveraging these two passions, he created the open source library lambdaj with the purposes of providing an internal Java DSL for manipulating collections and for allowing a bit of functional programming in Java. Twitter: @mariofusco.

## Hello stranger!

You need to Register an InfoQ account or Login or login to post comments. But there's so much more behind being registered.## Get the most out of the InfoQ experience.

### Tell us what you think

**Parallel Java Streams**
by
Edward Harned

Perhaps I can shine a light on why the basic F/J is so much faster than the streams version. The streams do not do parallel processing, they do paraquential,

coopsoft.com/ar/Calamity2Article.html#para

Since F/J is essentially a failure it is necessary to switch to sequential to avoid stack overflows and out of memory errors.

Yes, I wrote the article. I've been doing parallel applications for several decades and I pointed out the problems with F/J years ago to no avail.

ed

**No need to calculate average first when streaming**
by
Markus Krüger

// Helper class for intermediate fields.

private class VarianceTmp {

double mean, m2;

int count;

}

public static double varianceStreams(double[] population) {

VarianceTmp tmp = Arrays.stream(population).parallel().collect(

VarianceTmp::new,

(tmp, x) -> {

tmp.count++;

double delta = x - tmp.mean;

tmp.mean += delta / tmp.count;

tmp.m2 += delta * (x - tmp.mean);

},

(tmpA, tmpB) -> {

double delta = tmpB.mean - tmpA.mean;

int countAB = tmpA.count + tmpB.count;

tmpA.mean += delta * tmpB.mean / countAB;

tmpA.m2 += tmpB.m2

+ delta * delta * tmpA.count * tmpB.count / countAB;

tmpA.count = countAB;

});

return (tmp.count <= 1) ? 0 : tmp.m2 / (tmp.count - 1);

}

An added benefit of this approach is that the input argument doesn't need to be an array, it could be an iterator or stream; there is no need to keep all values in memory at once.

**Re: No need to calculate average first when streaming**
by
Rajesh Joshi

**Re: No need to calculate average first when streaming**
by
Markus Krüger

However, as the article currently stands, a reader might believe that the implementations shown here are suitable for real use. Which sadly is not the case, for reasons listed in the wikipedia article I link to.

Commenting on how to improve the calculation of variance also gave me a possibility to demonstrate the use of Stream.collect(), which I believe to be a useful tool.

**Other performance from my laptop**
by
benjamin fuentes

The information about OS

Name of the OS: Windows 8.1

Version of the OS: 6.3

Architecture of THe OS: amd64

Available processors (cores): 4

Free memory (bytes): 60689448

Maximum memory (bytes): 919076864

average :49.99299774874399

Sequential variance : 833.454385192641

Time : 160ms

average :49.992997748747015

Fork variance : 833.4543851928006

Time : 237ms

average :49.99299774874777

Stream variance : 833.4543851928023

Time : 273ms

weird, that average calculation differs on same array of elements ...

**Re: Other performance from my laptop**
by
benjamin fuentes

---------------

The information about OS

Name of the OS: Windows 8.1

Version of the OS: 6.3

Architecture of THe OS: amd64

Available processors (cores): 4

Free memory (bytes): 60689448

Maximum memory (bytes): 919076864

average :50.00379441649668

Sequential variance : 833.6599070601363

Time : 158ms

average :50.003794416505926

Fork variance : 833.6599070600581

Time : 236ms

average :50.00379441650572

Stream variance : 833.6599070600663

Time : 274ms

average :50.00379441650572

Stream variance2 : 833.6599070600663

Time : 68ms

---------------

// Helper class for intermediate fields.

public static class VarianceTmp {

double mean, m2;

int count;

}

public static double varianceStreams2(double[] population) {

VarianceTmp varianceTmp = Arrays.stream(population).parallel().collect(

VarianceTmp::new,

(tmp, x) -> {

tmp.count++;

double delta = x - tmp.mean;

tmp.mean += delta / tmp.count;

tmp.m2 += delta * (x - tmp.mean);

},

(tmpA, tmpB) -> {

double delta = tmpB.mean - tmpA.mean;

int countAB = tmpA.count + tmpB.count;

tmpA.mean += delta * tmpB.mean / countAB;

tmpA.m2 += tmpB.m2

+ delta * delta * tmpA.count * tmpB.count / countAB;

tmpA.count = countAB;

});

return (varianceTmp.count <= 1) ? 0 : varianceTmp.m2 / (varianceTmp.count - 1);

}

**Re: Other performance from my laptop**
by
Markus Krüger

public static double varianceImperative(double[] population){

double average = 0;

double variance = 0;

int count = 0;

for (double p: population) {

double delta = p - average;

average += delta / ++count;

variance += delta * (p - average);

}

return variance / (count - 1);

}

private static double varianceForkJoin(final double[] values) {

class VarianceTmp {

double mean, m2;

int count;

}

@SuppressWarnings("serial")

class VarianceTask extends RecursiveTask<VarianceTmp> {

private int from;

private int to;

public VarianceTask(int from, int to) {

this.from = from; this.to = to;

}

@Override

protected VarianceTmp compute() {

if (to - from <= 1_000_000) {

VarianceTmp tmp = new VarianceTmp();

double mean = 0, m2 = 0;

int count = 0;

for (int i = from; i < to; i++) {

double delta = values - mean;

mean += delta / ++count;

m2 += delta * (values - mean);

}

tmp.mean = mean; tmp.m2 = m2; tmp.count = count;

return tmp;

}

int split = from + (to - from) / 2;

VarianceTask taskA = new VarianceTask(from, split);

VarianceTask taskB = new VarianceTask(split, to);

taskB.fork();

VarianceTmp tmpA = taskA.compute(), tmpB = taskB.join();

double delta = tmpB.mean - tmpA.mean;

int countAB = tmpA.count + tmpB.count;

tmpA.mean =

(tmpA.count * tmpA.mean + tmpB.count * tmpB.mean)

/ (countAB);

tmpA.m2 += tmpB.m2

+ delta * delta * tmpA.count * tmpB.count / countAB;

tmpA.count = countAB;

return tmpA;

}

}

VarianceTmp tmp =

new ForkJoinPool().invoke(new VarianceTask(0, values.length));

return (tmp.count <= 1) ? 0 : tmp.m2 / (tmp.count - 1);

}

Also note that my previous way of combining means was imprecise. I've replaced

tmpA.mean += delta * tmpB.mean / countAB;

with

tmpA.mean = (tmpA.count * tmpA.mean + tmpB.count * tmpB.mean) / (countAB);

in the fork/join code, you should do the same for the stream code.

**Re: No need to calculate average first when streaming**
by
Markus Krüger

tmpA.mean += delta * tmpB.mean / countAB;

with

tmpA.mean = (tmpA.count * tmpA.mean + tmpB.count * tmpB.mean) / (countAB);

**Compensated sum**
by
Paul Sandoz

If you want a more apples-to-apples comparison do:

public static double varianceStreams(double[] population) {

double average = Arrays.stream(population).parallel()

.reduce(0.0, Double::sum) / population.length;

double variance = Arrays.stream(population).parallel()

.map(p -> (p - average) * (p - average))

.reduce(0.0, Double::sum) / population.length;

return variance;

}

My guess is if you do that then the results will be much closer (my results indicate a parallel uncompensated sum is about 2x faster than a compensated sum)

I thought i could optimize the average operation by removing some redundant computation, but it appears the Kahan summation dominates. See the following issue for details:

bugs.openjdk.java.net/browse/JDK-8035561

Oh, and one can safely ignore the first comment on this thread referring to paraquential, it's completely wrong.

**Re: Compensated sum**
by
Edward Harned

Here, there is one ReduceOps$ReduceOp.evaluateParallel

return new ReduceTask<>(this, helper, spliterator).invoke().get();

for the stream but since WorkThreads do not save individual ReduceTask results in a common object (the parallel technique), the ReduceTask.onComplete() uses getLocalResult() for each individual ReduceTask sequentially, hence paraquential.

<>

## Educational Content

## Weathering the Data Storm

Claudia Perlich Nov 21, 2014

## UI: The Functional Final Frontier

David Nolen Nov 21, 2014

Parallel Java Streamsby Edward Harned PostedNo need to calculate average first when streamingby Markus Krüger PostedRe: No need to calculate average first when streamingby Rajesh Joshi PostedRe: No need to calculate average first when streamingby Markus Krüger PostedRe: No need to calculate average first when streamingby Markus Krüger PostedOther performance from my laptopby benjamin fuentes PostedRe: Other performance from my laptopby benjamin fuentes PostedRe: Other performance from my laptopby Markus Krüger PostedCompensated sumby Paul Sandoz PostedRe: Compensated sumby Edward Harned Posted