BT

Diffuser les Connaissances et l'Innovation dans le Développement Logiciel d'Entreprise

Contribuez

Sujets

Sélectionner votre région

Accueil InfoQ Actualités MongoDB : la magie des Reactive-Streams avec les Capped Collections

MongoDB : la magie des Reactive-Streams avec les Capped Collections

Favoris

MongoDB vient de publier son driver Java pour les traitements réactifs et non-bloquants. Bien que ce driver repose entièrement sur son homologue MongoDB Async Java Driver, il met pleinement en œuvre l'API reactive-streams pour fournir une interopérabilité transparente avec d'autres flux réactifs dans l'écosystème JVM.

En guise de rappel, MongoDB offre une fonctionnalité intéressante connexe : les capped collections sous forme d’un anneau circulaire de taille fixe. Le présent article essaie d’exploiter les capped collections d’une manière réactive.

Avant les reactive-streams

Tugdual Grall a publié un article sur l’utilisation des capped collections avec les tailables cursors. Les 'tailables cursors' représentent un moyen permettant la consommation des données générées à partir des capped collections au fil de l’eau. Tug a fait le tour des différentes implémentations en Node.js, Java et Scala.

Reprenons sa modélisation Java :

 

       MongoClient mongoClient = new MongoClient();
       DBCollection coll = mongoClient.getDB("chat").getCollection("messages");
       DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural"1).get())
               .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
       System.out.println("== open cursor ==");
       Runnable task = () -> {
           System.out.println("\tWaiting for events");
           while (cur.hasNext()) {
               DBObject obj = cur.next();
               System.out.println( obj );
           }
       };
       new Thread(task).start();

 

Bien que l’implémentation est asynchrone, il est remarquable que l'opération cur.hasNext() est malheureusement bloquante (initialisée au moment de la création du curseur).

Les reactive-streams

Avant de débarquer avec les capped collections, commençons par définir notre Subscriber.

 

    class SubscriberImpl implements Subscriber<Document> {
        @Override
        public void onSubscribe(Subscription s) {
             s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Document document) {
            System.out.println(document.toJson());
        }

        @Override
        public void onError(Throwable thrwbl) {
        }

        @Override
        public void onComplete() {
        }
    }

 

Au moment de la souscription, cette classe va prendre le relais conformément au protocole suivant :

onSubscribe onNext* (onError | onComplete)?

En résumé, cela signifie que la méthode onSubscribe est toujours invoquée, suivie d'un nombre éventuellement illimité de signaux onNext (on affiche les documents comme l’exemple précédent), suivi par un signal onError s'il y a un échec, ou un signal onComplete lorsque plus aucun élément n’est disponible.

Avec les reactive-streams, pas de majeurs changements par rapport au driver standard (sauf au niveau des imports) :

 

    import com.mongodb.reactivestreams.client.*;

        MongoClient mongoClient = MongoClients.create();
        MongoDatabase database = mongoClient.getDatabase("chat");
        MongoCollection<Document> messages = database.getCollection("messages");

 

Les reactive-streams entrent en jeu avec :

 

FindPublisher<Document> findPublisher = messages.find().cursorType(CursorType.Tailable);

 

L’invocation de MongoCollection.find retourne un Publisher au sens reactive-streams au lieu d'un curseur. findPublisher va servir à streamer les documents de notre capped collection messages pour tout subscriber ceux voulant être notifiés.

C’est le moment de brancher notre SubscriberImpl :

 

findPublisher.subscribe(new SubscriberImpl());

 

Désormais, toutes les insertions dans la collection messages seront automatiquement reflétées sur la console et c’est toute la magie des reactive-streams.

db.messages.insert({ "name" : "MongoDB", "type" : "database" })

Conclusion

Ce qui est intéressant avec le driver reactive-streams, c'est qu'il n'y a plus besoin de faire des itérations bloquantes. Le streaming des flux de données est traité d’une manière déclarative, asynchrone et non bloquante.

Références

Evaluer cet article

Pertinence
Style

Contenu Éducatif

BT