BT

Distributions de calculs sur un cluster avec JPPF

| Écrit par Julien Sebrien Suivre 0 Abonnés le 23 févr. 2015. Durée de lecture estimée: 9 minutes |

Introduction

Dans cet article, nous allons voir comment il est possible d'exécuter des tâches (Runnable/Callable) de manière distribuée sur un cluster JPPF.

JPPF

JPPF est un framework facilitant la parallélisation de tâches computationnelles intensives, ainsi que leur distribution sur une grille de calcul.

Cas pratique

A titre d'exemple, nous allons soumettre un ensemble de traitements calculant la liste des nombres premiers compris entre 1 et une valeur maximale aléatoire, fixée à l'initialisation.

Les étapes permettant d'effectuer ces tâches sont :

  • Démarrage d'un "Driver" JPPF, et d'au moins 2 "Node" (nous verrons pour quelle raison dans la suite de l'article). Des informations complémentaires sur l'architecture d'une grille de calcul JPPF sont accessibles ici.
  • Instanciation d'un "client" JPPF et connexion au driver, puis soumission d'un job qui créera les traitements destinés à être parallélisés sur la grille. Ces traitements seront représentés par des instances implémentant Callable, et soumis à un JPPFExecutorService, qui exploitera le cluster existant pour exécuter les tâches soumises.
  • Attente de la terminaison du job et sortie du programme.

Tout le code présenté dans cet article sera en Java.

Prérequis

Pour exécuter le programme détaillé par la suite, vous aurez besoin de :

  • JDK 7 ou supérieur
  • Git
  • Maven
  • JPPF 4.2.4 ou supérieur

Démarrage du Driver JPPF

Télécharger et décompresser les distributions JPPF "server/driver" et "node", disponibles ici.

Puis exécuter le script :

  • Sous Windows :
startDriver.bat
  • Sous Unix/linux :
./startDriver.sh

La console affichera la log suivante :

driver process id: 9668, uuid: C8CDDAF1-C3AF-8C5D-8D55-62D9AAAC980C
management initialized and listening on port 11198
ClientClassServer initialized
NodeClassServer initialized
ClientJobServer initialized
NodeJobServer initialized
Acceptor initialized
- accepting plain connections on port 11111
- accepting secure connections on port 11443
JPPF Driver initialization complete

Démarrage du premier Node JPPF

  • Sous Windows :
startNode.bat
  • Sous Unix/linux :
./startNode.sh

La console affichera la log suivante :

node process id: 9400, uuid: 8567FE0A-E66C-DEA4-85EF-13F23AE2384D
Attempting connection to the class server at 194.119.69.152:11111
RemoteClassLoaderConnection: Reconnected to the class server
JPPF Node management initialized on port 12001
Attempting connection to the node server at 194.119.69.152:11111
Reconnected to the node server
Node successfully initialized

Démarrage du second Node JPPF

  • Sous Windows :
startNode.bat
  • Sous Unix/linux :
./startNode.sh

La console affichera la log suivante :

ode process id: 4548, uuid: 16E60458-10F9-65DD-0406-FB42A5CCD3E5
Attempting connection to the class server at 194.119.69.152:11111
RemoteClassLoaderConnection: Reconnected to the class server
JPPF Node management initialized on port 12002
Attempting connection to the node server at 194.119.69.152:11111
Reconnected to the node server
Node successfully initialized

Noter que le port de management est désormais 12002.

Création d'un client JPPF

Un Client nécessite au minimum l'adresse et le port du driver :

public final class JobUtil {
  ...
  public static JPPFClient buildClient(){
    TypedProperties props = JPPFConfiguration.getProperties();
    props.setProperty("jppf.discovery.enabled""false");
    props.setProperty("jppf.drivers""driver1");
    props.setProperty("driver1.jppf.server.host""localhost");
    props.setProperty("driver1.jppf.server.port""11111");
    return new JPPFClient();
  }
}

Soumission d'un job avec une tâche

Lorsque l'on utilise JPPF, des jobs, contenant 1 ou plusieurs tâches, sont soumis au driver à l'aide du client. Une tâche est la plus petite unité d'exécution. Chacun des noeuds actifs et disponibles sont conçus pour n'exécuter qu'une ou plusieurs tâches d'un même job à la fois. Ainsi, un noeud ne peut exécuter plusieurs tâches appartenant à des jobs distincts simultanément, à un instant donné.

Dans le cas présent, nous allons soumettre un job, contenant une unique tâche (ComputingTask). Nous décrirons l'objectif de cette tâche dans la section suivante. Toutes les tâches soumises à un cluster JPPF doivent implémenter Serializable.

Optionnellement, il est possible de paramétrer des "job listeners" afin d'être averti des changements de statut des jobs. Nous utiliserons ici un listener simple (ComputingJobListener), dont le rôle est d'afficher tous les évènements reçus et produits par le framework JPPF, durant toute l'exécution du job.

Voici le code permettant de soumettre un job avec une ComputingTask :

...
public static void main(String[] args) {
  try (JPPFClient jppfClient = JobUtil.buildClient()){
    JPPFJob job = new JPPFJob(String.valueOf(System.currentTimeMillis()));
    job.setBlocking(true);
    Task<?> task = job.add(new ComputingTask(new ComputingTaskParams(10)));
    task.setId(String.valueOf(System.currentTimeMillis()));
    job.addJobListener(new ComputingJobListener());
    List<Task<?>> results = jppfClient.submitJob(job);
    ...
  } catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
  }
}
...

La tâche ComputingTask

Cette tâche va instancier un JPPFExecutorService afin d'exécuter des traitements simples, dont l'exécution sera parallélisée au sein de la grille JPPF existante (qui doit contenir au moins 2 noeuds actifs ici). Chaque traitement soumis (implémentant l'interface Callable) a pour fonction de générer la liste des nombres premiers compris entre 1 et une valeur maximale aléatoire.

Voici le code de la tâche ComputingTask qui sera exécuté sur un noeud JPPF :

public class ComputingTask extends AbstractTask<Boolean>{
  ...
  @Override
  public void run() {
    System.out.println("Starting Computing task...");
    try(JPPFClient jppfClient = JobUtil.buildClient();){
      JPPFExecutorService executorService = new JPPFExecutorService(jppfClient);
      ExecutorServiceConfiguration executorConfig = ((JPPFExecutorService)executorService).getConfiguration();
      ExecutionPolicy uuidExclusion = ExecutionPolicy.Not(new Equal("jppf.uuid"falseNodeRunner.getUuid()));
      executorConfig.getJobConfiguration().getSLA().setExecutionPolicy(uuidExclusion);
      int parallelism = params.getParallelism();
      List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(parallelism);
      for(int i=0 ; i < parallelism ; i++){
        futures.add(executorService.submit(new PrimeNumberGenerator(i, random.nextInt(100))));
      }
      for(int i=0 ; i < parallelism ; i++){
        System.out.println("PrimeNumberGenerator("+i+") terminated with: "+ futures.get(i).get());
      }
      executorService.shutdownNow();
      setResult(Boolean.TRUE);
    }catch(Exception e){
      System.err.println(e.getMessage());
      e.printStackTrace();
      setResult(Boolean.FALSE);
    }
    System.out.println("Computing task ended!");
  }
}

Concrètement, nous utilisons un objet de type JPPFClient, qui va être utilisé pour initialiser une instance de JPPFExecutorService, à qui nous soumettrons nos instances de Callable. Ensuite, nous informons le programme appelant du succès de l'exécution en appelant setResult(Boolean.TRUE).

Néanmoins, il y a un détail à prendre en compte lors de la création de notre JPPFExecutorService. Le framework JPPF va créer un job par Callable soumis. Ainsi, nous devons nous assurer que ces nouveaux jobs "fils" ne soient pas mis en attente dans la queue du noeud en cours d'utilisation (car un noeud ne peut exécuter des tâches de plusieurs jobs simultanément). Avec JPPF, il est possible de résoudre ce problème en paramétrant une politique d'exécution stipulant que chacun des jobs fils (plus spécifiquement, l'unique tâche de ces nouveaux jobs) ne puissent être exécutés par le noeud courant.

Voici le code permettant de configurer cette politique d'exécution :

JPPFExecutorService executorService = new JPPFExecutorService(jppfClient);
ExecutorServiceConfiguration executorConfig = ((JPPFExecutorService)executorService).getConfiguration();
ExecutionPolicy uuidExclusion = ExecutionPolicy.Not(new Equal("jppf.uuid"falseNodeRunner.getUuid()));
executorConfig.getJobConfiguration().getSLA().setExecutionPolicy(uuidExclusion);

Dès lors que chaque job "fils" (correspondant à chaque Callable soumis au JPPFExecutorService) sera traité par le framework JPPF, cette politique de répartition de l'exécution sera appliquée :

ExecutionPolicy uuidExclusion = ExecutionPolicy.Not(new Equal("jppf.uuid"falseNodeRunner.getUuid()));

Ici, nous spécifions que le noeud qui devra être choisi pour exécuter ces jobs "fils" devra être différent du noeud courant (dont l'id est NodeRunner.getUuid()).

Si nous n'avions pas spécifié cette politique d'exécution, les jobs "fils" pourraient être mis en attente dans la file d'attente du noeud courant. Dans ce cas précis, la tâche originale (ComputingTask) serait indéfiniment en attente de l'exécution de ces jobs "fils"(via les appels Future.get()), car un noeud ne peut exécuter plusieurs tâches (provenant de plusieurs jobs) de manière concurrente. On serait typiquement confronté à un "deadlock".

C'est la raison pour laquelle il est nécessaire de démarrer au moins 2 noeuds pour faire fonctionner cet exemple. Le driver sera ainsi en mesure de dispatcher les jobs "fils" sur les noeuds libres (n'exécutant pas la ComputingTask) de la grille de calcul.

Implémentation du PrimeNumberGenerator

La classe PrimeNumberGenerator est un Callable qui affiche simplement les nombres premiers compris entre 1 et une valeur maximale fixée à l'initialisation :

public class PrimeNumberGenerator implements Callable<Boolean>Serializable {
  ...
  @Override
  public Boolean call() throws Exception {
    StringBuilder builder = new StringBuilder();
    for (int i = 1; i < limit; i++) {
      boolean isPrime = true;
      for (int j = 2; j < i; j++) {
        if (i % j == 0) {
          isPrime = false;
          break;
        }
      }
    if (isPrime){
      builder.append(i).append(" ");
    }
  }
  System.out.println("PrimeNumberGenerator("+index+") - Prime numbers between 1 and " + limit + ":   "+builder.toString());
  return Boolean.TRUE;
  }
}

Exécution du programme

Une fois le programme lancé, nous obtenons la log suivante dans la console du noeud JPPF libre exécutant les callables :

PrimeNumberGenerator(0) - Prime numbers between 1 and 30: 1 2 3 5 7 11 13 17 19 23 29
PrimeNumberGenerator(1) - Prime numbers between 1 and 65: 1 2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61
PrimeNumberGenerator(2) - Prime numbers between 1 and 57: 1 2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53
PrimeNumberGenerator(3) - Prime numbers between 1 and 59: 1 2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53
PrimeNumberGenerator(4) - Prime numbers between 1 and 31: 1 2 3 5 7 11 13 17 19 23 29
PrimeNumberGenerator(5) - Prime numbers between 1 and 27: 1 2 3 5 7 11 13 17 19 23
PrimeNumberGenerator(6) - Prime numbers between 1 and 5: 1 2 3
PrimeNumberGenerator(7) - Prime numbers between 1 and 52: 1 2 3 5 7 11 13 17 19 23 29 31 37 41 43 47
PrimeNumberGenerator(8) - Prime numbers between 1 and 6: 1 2 3 5
PrimeNumberGenerator(9) - Prime numbers between 1 and 17: 1 2 3 5 

Si plus de 2 noeuds sont lancés, les 10 callables (de type PrimeNumberGenerator) seront exécutés de manière concurrente sur l'ensemble des noeuds libres du cluster !

Le code complet est accessible sur Github

Conclusion

Il est finalement relativement simple de distribuer du calcul sur un cluster avec JPPF. En effet, il suffit d'utiliser l'implémentation de l'interface ExecutorService proposée par le framework afin de profiter instantanément d'une scalabilité horizontale !

Au sujet de l'Auteur

Julien est un développeur, bloggeur, passionné par les nouvelles technologies, et travaille à la mise en place de solutions basées sur Cassandra, Elasticsearch, Spark, etc. pour des acteurs bancaires ou des startups. Sur son temps libre, julien s'intéresse aussi tout particulièrement aux dernières avancées concernant le machine learning et les algorithmes génétiques.

Evaluer cet article

Pertinence
Style

Bonjour étranger!

Vous devez créer un compte InfoQ ou cliquez sur pour déposer des commentaires. Mais il y a bien d'autres avantages à s'enregistrer.

Tirez le meilleur d'InfoQ

Donnez-nous votre avis

Html autorisé: a,b,br,blockquote,i,li,pre,u,ul,p

M'envoyer un email pour toute réponse à l'un de mes messages dans ce sujet
Commentaires de la Communauté

Html autorisé: a,b,br,blockquote,i,li,pre,u,ul,p

M'envoyer un email pour toute réponse à l'un de mes messages dans ce sujet

Html autorisé: a,b,br,blockquote,i,li,pre,u,ul,p

M'envoyer un email pour toute réponse à l'un de mes messages dans ce sujet

Discuter

Se connecter à InfoQ pour interagir sur ce qui vous importe le plus.


Récupérer votre mot de passe

Follow

Suivre vos sujets et éditeurs favoris

Bref aperçu des points saillants de l'industrie et sur le site.

Like

More signal, less noise

Créez votre propre flux en choisissant les sujets que vous souhaitez lire et les éditeurs dont vous désirez suivre les nouvelles.

Notifications

Restez à jour

Paramétrez vos notifications et ne ratez pas le contenu qui vous importe

BT