BT

MongoDB : la magie des Reactive-Streams avec les Capped Collections

| par Slim Ouertani Suivre 7 Abonnés le 21 mai 2015. Durée de lecture estimée: 2 minutes |

MongoDB vient de publier son driver Java pour les traitements réactifs et non-bloquants. Bien que ce driver repose entièrement sur son homologue MongoDB Async Java Driver, il met pleinement en œuvre l'API reactive-streams pour fournir une interopérabilité transparente avec d'autres flux réactifs dans l'écosystème JVM.

En guise de rappel, MongoDB offre une fonctionnalité intéressante connexe : les capped collections sous forme d’un anneau circulaire de taille fixe. Le présent article essaie d’exploiter les capped collections d’une manière réactive.

Avant les reactive-streams

Tugdual Grall a publié un article sur l’utilisation des capped collections avec les tailables cursors. Les 'tailables cursors' représentent un moyen permettant la consommation des données générées à partir des capped collections au fil de l’eau. Tug a fait le tour des différentes implémentations en Node.js, Java et Scala.

Reprenons sa modélisation Java :

 

       MongoClient mongoClient = new MongoClient();
       DBCollection coll = mongoClient.getDB("chat").getCollection("messages");
       DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural"1).get())
               .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
       System.out.println("== open cursor ==");
       Runnable task = () -> {
           System.out.println("\tWaiting for events");
           while (cur.hasNext()) {
               DBObject obj = cur.next();
               System.out.println( obj );
           }
       };
       new Thread(task).start();

 

Bien que l’implémentation est asynchrone, il est remarquable que l'opération cur.hasNext() est malheureusement bloquante (initialisée au moment de la création du curseur).

Les reactive-streams

Avant de débarquer avec les capped collections, commençons par définir notre Subscriber.

 

    class SubscriberImpl implements Subscriber<Document> {
        @Override
        public void onSubscribe(Subscription s) {
             s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Document document) {
            System.out.println(document.toJson());
        }

        @Override
        public void onError(Throwable thrwbl) {
        }

        @Override
        public void onComplete() {
        }
    }

 

Au moment de la souscription, cette classe va prendre le relais conformément au protocole suivant :

onSubscribe onNext* (onError | onComplete)?

En résumé, cela signifie que la méthode onSubscribe est toujours invoquée, suivie d'un nombre éventuellement illimité de signaux onNext (on affiche les documents comme l’exemple précédent), suivi par un signal onError s'il y a un échec, ou un signal onComplete lorsque plus aucun élément n’est disponible.

Avec les reactive-streams, pas de majeurs changements par rapport au driver standard (sauf au niveau des imports) :

 

    import com.mongodb.reactivestreams.client.*;

        MongoClient mongoClient = MongoClients.create();
        MongoDatabase database = mongoClient.getDatabase("chat");
        MongoCollection<Document> messages = database.getCollection("messages");

 

Les reactive-streams entrent en jeu avec :

 

FindPublisher<Document> findPublisher = messages.find().cursorType(CursorType.Tailable);

 

L’invocation de MongoCollection.find retourne un Publisher au sens reactive-streams au lieu d'un curseur. findPublisher va servir à streamer les documents de notre capped collection messages pour tout subscriber ceux voulant être notifiés.

C’est le moment de brancher notre SubscriberImpl :

 

findPublisher.subscribe(new SubscriberImpl());

 

Désormais, toutes les insertions dans la collection messages seront automatiquement reflétées sur la console et c’est toute la magie des reactive-streams.

db.messages.insert({ "name" : "MongoDB", "type" : "database" })

Conclusion

Ce qui est intéressant avec le driver reactive-streams, c'est qu'il n'y a plus besoin de faire des itérations bloquantes. Le streaming des flux de données est traité d’une manière déclarative, asynchrone et non bloquante.

Références

Evaluer cet article

Pertinence
Style

Bonjour étranger!

Vous devez créer un compte InfoQ ou cliquez sur pour déposer des commentaires. Mais il y a bien d'autres avantages à s'enregistrer.

Tirez le meilleur d'InfoQ

Donnez-nous votre avis

Html autorisé: a,b,br,blockquote,i,li,pre,u,ul,p

M'envoyer un email pour toute réponse à l'un de mes messages dans ce sujet
Commentaires de la Communauté

Html autorisé: a,b,br,blockquote,i,li,pre,u,ul,p

M'envoyer un email pour toute réponse à l'un de mes messages dans ce sujet

Html autorisé: a,b,br,blockquote,i,li,pre,u,ul,p

M'envoyer un email pour toute réponse à l'un de mes messages dans ce sujet

Discuter

Se connecter à InfoQ pour interagir sur ce qui vous importe le plus.


Récupérer votre mot de passe

Follow

Suivre vos sujets et éditeurs favoris

Bref aperçu des points saillants de l'industrie et sur le site.

Like

More signal, less noise

Créez votre propre flux en choisissant les sujets que vous souhaitez lire et les éditeurs dont vous désirez suivre les nouvelles.

Notifications

Restez à jour

Paramétrez vos notifications et ne ratez pas le contenu qui vous importe

BT