BT

Diffuser les Connaissances et l'Innovation dans le Développement Logiciel d'Entreprise

Contribuez

Sujets

Sélectionner votre région

Accueil InfoQ Articles De la Programmation Impérative - aux forks/joins - aux Streams Parallèles en Java8

De la Programmation Impérative - aux forks/joins - aux Streams Parallèles en Java8

Favoris

Java 8 apporte de nombreuses nouveautés qui permettent d'écrire du code de manière plus concise. Par exemple, à la place de ce code :

Collections.sort(transactions, new Comparator<Transaction>(){
  public int compare(Transaction t1, Transaction t2){
    return t1.getValue().compareTo(t2.getValue());
  }
});

vous pouvez maintenant écrire le code suivant, plus compacte, qui remplie la même fonctionnalité, mais qui, à la lecture, s'avère beaucoup plus proche du problème posé :

transactions.sort(comparing(Transaction::getValue));

Les principales fonctionnalités de Java 8 sont les expressions lambda, les références de méthodes et la nouvelle API Streams. Ces changements sont considérés comme étant les plus importants depuis l'apparition de Java, il y a 20 ans. Pour trouver des exemples pratiques détaillés, de la manière dont vous pouvez tirer partie au mieux de ces fonctionnalités, vous pouvez vous référer au livre Java 8 in Action: Lambdas, Streams and Functional-style programming écrit par les auteurs de cet article et Alan Mycroft.

Ces fonctionnalités permettent aux développeurs d'écrire du code plus concis, et, en plus, de tirer partie des architectures multi-coeurs. En fait, écrire des programmes qui tirent correctement partie de la parallélisation est, dans l'état actuel des choses, réservé aux spécialistes de Java. Maintenant, grâce à sa nouvelle API Streams, Java 8 change la donne et permet à n'importe qui d'écrire plus facilement du code qui exploite les architectures multi-coeurs.

Dans cet article, nous allons comparer différentes méthodes pour calculer la variance d'un grand ensemble de données, en utilisant :

  1. le style impératif
  2. le framework fork/join
  3. l'API Streams

La variance est utilisée en statistiques pour mesurer la dispersion d'un ensemble de données. Elle peut être calculée par moyenne des carrés des écarts à la moyenne. Par exemple, si nous prenons les nombres 40, 30, 50 et 80 qui représentent les âges d'une population, nous pouvons calculer la variance de la manière suivante :

  1. calculer la moyenne : (40 + 30 + 50 + 80) / 4 = 50
  2. additionner les écarts élevés au carré : (40-50)2 + (30-50)2 + (50-50)2 + (80-50)2 = 1400
  3. et finalement, calculer la moyenne de cette somme : 1400/4 = 350

Style Impératif

Une implémentation typique de la formule de la variance serait la suivante :

public static double varianceImperative(double[] population){
   double average = 0.0;
   for(double p: population){
      average += p;
   }
   average /= population.length;

   double variance = 0.0;
   for(double p: population){
     variance += (p - average) * (p - average);
   }
   return variance/population.length;
}

Pourquoi est-elle impérative ? Notre implémentation décrit le calcul comme un ensemble d'instructions qui modifient l'état. Ici, nous faisons une itération explicite sur chacun des éléments du tableau de la population et nous mettons à jour deux variables locales (average et variance) a chaque itération. Ce type de code est excellent pour les architectures physiques qui ne disposent que d'un seul CPU. En effet, ces instructions correspondent pratiquement directement aux instructions qui sont soumises au CPU.

Fork/Join framework

Comment écririez-vous cette implémentation pour une exécution dans une architecture multi-coeurs ? Devrions-nous utiliser des threads? Devrions-nous les synchroniser par endroits ? Le framework fork/join qui est apparu dans Java 7 a permis d'atténuer ce type de problèmes, essayons maintenant de développer une version parallélisée de l'algorithme en se basant sur fork/join.

public class ForkJoinCalculator extends RecursiveTask<Double> {

    public static final long THRESHOLD = 1_000_000;

    private final SequentialCalculator sequentialCalculator;
    private final double[] numbers;
    private final int start;
    private final int end;

    public ForkJoinCalculator(double[] numbers, SequentialCalculator sequentialCalculator) {
        this(numbers, 0, numbers.length, sequentialCalculator);
    }

    private ForkJoinCalculator(double[] numbers, int start, int end, SequentialCalculator sequentialCalculator) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
        this.sequentialCalculator = sequentialCalculator;
    }

    @Override
    protected Double compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return sequentialCalculator.computeSequentially(numbers, start, end);
        }
        ForkJoinCalculator leftTask = new ForkJoinCalculator(numbers, start, start + length/2, sequentialCalculator);
        leftTask.fork();
        ForkJoinCalculator rightTask = new ForkJoinCalculator(numbers, start + length/2, end, sequentialCalculator);
        Double rightResult = rightTask.compute();
        Double leftResult = leftTask.join();
        return leftResult + rightResult;
    }
}

Nous avons développé un RecursiveTask qui divise le tableau de doubles jusqu'à ce que la taille des sous-tableaux descende en dessous d'une limite fixée. Ensuite, chaque sous-tableau est traité séquentiellement en appliquant l'opération définie par l'interface suivante :

public interface SequentialCalculator {
  double computeSequentially(double[] numbers, int start, int end);
}

Avec cette infrastructure, il est possible de recalculer la variance parallèlement, de la manière suivante :

public static double varianceForkJoin(double[] population){
   final ForkJoinPool forkJoinPool = new ForkJoinPool();
   double total = forkJoinPool.invoke(new ForkJoinCalculator(population, new SequentialCalculator() {
     @Override
     public double computeSequentially(double[] numbers, int start, int end) {
       double total = 0;
       for (int i = start; i < end; i++) {
         total += numbers[i];
       }
       return total;
     }
  }));
  final double average = total / population.length;
  double variance = forkJoinPool.invoke(new ForkJoinCalculator(population, new SequentialCalculator() {
    @Override
    public double computeSequentially(double[] numbers, int start, int end) {
      double variance = 0;
      for (int i = start; i < end; i++) {
        variance += (numbers[i] - average) * (numbers[i] - average);
      }
      return variance;
    }
 }));
 return variance / population.length;
}

La dernière ligne renvoie le résultat. Même avec l'aide du framework fork/join, on constate que la version parallélisée est significativement plus complexe à écrire, et éventuellement à débugger, que la version séquentielle.

Streams parallélisés

Java 8 permet de faire ce traitement d'une manière différente. Au lieu d'écrire comment (comment faire le calcul), vous décrivez grossièrement quoi (ce qu'il doit faire) en utilisant l'API Streams. Ensuite, la bibliothèque peut décider la meilleurs méthode pour effectuer le calcul à votre place et utiliser un ensemble d'optimisations. On appelle ce style de programmation la programmation déclarative. En Java 8 spécifiquement, un stream parallélisé est conçu pour s'appuyer sur les architectures multi-coeurs. Voyons comment vous pouvez les utiliser pour tenter de calculer la variance de manière plus rapide.

Nous partons du principe que vous avez quelques connaissances sur les streams dans cette section. Afin de vous rafraîchir la mémoire, un Stream est une séquence d'éléments T qui supporte les opérations d'agrégation. Vous pouvez utiliser ces opérations pour créer un pipeline de calcul exactement comme un pipeline de commandes UNIX. Un stream parallélisé est simplement un stream qui exécutera le pipeline de manière parallélisée et qui peut être obtenu simplement en appelant la méthode parallel() sur un stream normal. Pour rafraîchir vos connaissances sur ce qu'est un stream, vous pouvez vous référer à la javadoc.

La bonne nouvelle est que quelques opérations numériques telles que max, min et average sont nativement intégrées dans l'API de Java 8. Elles sont accessibles par des versions spécialisées de Stream pour les primitives : IntStream (éléments primitifs de type int), LongStream (éléments primitifs de type long) et DoubleStream (éléments primitifs de type double). Par exemple, vous pouvez simplement créer une série de nombres à l'aide de IntStream.rangeClosed() et calculer le minimum ou le maximum du stream à l'aide des méthodes max() et min().

Revenons à notre problème initial, nous aimerions utiliser ces opérations pour calculer la variance d'une large population. La première étape est de créer un stream à partir du tableau de population. Nous pouvons effectuer cette opération à l'aide de la méthode statique Arrays.stream() :

DoubleStream populationStream = Arrays.stream(population).parallel();

Un DoubleStream supporte la méthode average() que nous pouvons utiliser :

double average = populationStream.average().orElse(0.0);

L'étape suivante est de calculer la variance qui utilise cette moyenne. Chaque élément de la population a d'abord besoin de la moyenne soustraite à lui-même, le résultat est ensuite porté au carré. Ceci peut être vu comme une opération de type map qui transforme chaque élément en un autre en utlisant une expression lambda (double p) -> (p - average) * (p - average). Une fois que c'est fait, nous pouvons calculer la somme des éléments résultants à l'aide de la méthode sum().

Mais pas si vite ! Les streams peuvent être consommés uniquement une fois. Si nous réutilisons populationStream, nous aurons la surprenante erreur :

 

java.lang.IllegalStateException: stream has already been operated upon or closed

 

Nous avons donc besoin d'un second stream pour calculer la viariance comme nous l'avons vu plus tôt :

public static double varianceStreams(double[] population){
   double average = Arrays.stream(population).parallel().average().orElse(0.0);
   double variance = Arrays.stream(population).parallel()
                              .map(p -> (p - average) * (p - average))
                              .sum() / population.length;
   return variance;
}

En utilisant des opérations natives de l'API Streams, nous avons réécrit notre version initiale du code impératif de manière déclarative et beaucoup plus concise, qui se lit pratiquement comme une définition mathématique de la variance. Intéressons-nous à présent aux performances des trois versions implémentées.

Benchmark

Nous avons écrit les trois versions de notre algorithme de variance dans trois styles de code très différents. La version à base de streams est la plus concise et est écrite de manière déclarative, ce qui permet à la librairie de décider de l'implémentation la plus adéquate pour tirer profit au mieux de l'infrastructure multi-coeurs. Cependant, vous vous posez peut être des questions sur les performances ? Pour connaître la réponse, créons un benchmark pour comparer les différentes versions. Nous calculons la variance d'une population de 30 millions de nombres compris entre 1 et 140. Nous avons utilisé jmh pour relever les performances de chacune des versions. Jmh est un outil de benchmark supporté par OpenJDK. Vous pouvez lancer ce benchmark vous-mêmes en clonant le projet sur GitHub.

Le benchmark a été lancé sur un Macbook Pro 2.3Ghz quad core Intel Core i7, avec 16Go de DDR3 à 1600MHz. De plus, nous avons utilisé la version suivante du JDK 8 :

java version "1.8.0-ea"
Java(TM) SE Runtime Environment (build 1.8.0-ea-b121)
Java HotSpot(TM) 64-Bit Server VM (build 25.0-b63, mixed mode)

Les résultats sont illustrés dans l'histogramme ci-dessous. La version impérative a pris 60ms, la version à base de fork/join a pris 22ms et la version avec les streams a pris 46ms.

Ces chiffres doivent être considérés avec précaution. Il est probable que vous obteniez des performances très différentes si vous lancez ce test sur une JVM 32-bit par exemple. Il est intéressant de noter que le fait d'adopter un style de programmation différent basé sur l'API Strams de Java 8 nous ouvre les portes d'une optimisation "en coulisse" qui n'est pas possible avec un code purement impératif, et d'une manière beaucoup plus directe que ce que nous offre la version fork/join.

A propos des auteurs

Raoul-Gabriel Urma a commencé son doctorat d'informatique à l'Université de Cambridge à l'age de 20 ans. Ses recherches se concentrent sur les langages de programmation et l'ingénierie logicielle. Il est titulaire d'un Master of Engineering de l'Imperial College of London dont il a été major, en ayant remporté un ensemble de prix pour l'innovation technique. Il a travaillé pour de grosses sociétés telles que Google, eBay, Oracle et Goldman Sachs, aussi bien que pour des startups. De plus, il intervient fréquemment dans les conférences autour de Java et donne aussi des cours de Java. Twitter : @raoulUK et Website.

Mario Fusco est un Ingénieur logiciel senior chez Red Hat qui travaille sur les développements du noyau de Drools, le moteur de règles de JBoss. Il a beaucoup d'expérience en tant que Développeur Java, ayant été impliqué (et souvent à la tête) de beaucoup de projets d'entreprises allant des compagnies de médias au secteur financier. Parmi ses centres d'intérêts, on trouve la programmation fonctionnelle et les langages dédiés (DSL). En partant de ses deux passions, il a créé la librairie open source lanbdaj dont l'objet est de proposer un langage DSL Java qui permet de manipuler les collections et qui permet de faire un peu de programmation fonctionnelle en Java. Twitter : @mariofusco.

Evaluer cet article

Pertinence
Style

Contenu Éducatif

BT