BT

Diffuser les Connaissances et l'Innovation dans le Développement Logiciel d'Entreprise

Contribuez

Sujets

Sélectionner votre région

Accueil InfoQ Articles Kafka Streams Et Quarkus  : Traitement En Temps Réel D'Événements

Kafka Streams Et Quarkus  : Traitement En Temps Réel D'Événements

Favoris

Points Clés

  • Kafka Streams vous permet de traiter, transformer, joindre et manipuler des événements Kafka en temps réel.
  • Quarkus s'intègre à Kafka Streams, il vous suffit donc de définir la topologie.
  • Le mode Quarkus Dev aide à tester le code Kafka Streams en fournissant un cluster Kafka.
  • Kafka Streams Interactive Queries est le moyen de consommer des données en temps réel de manière synchrone.

Dans la partie 1 de cette série, nous avons découvert l'intégration entre Apache Kafka et Quarkus où nous avons développé une application simple produisant et consommant des événements à partir de deux topics de Kafka.

Dans cet exemple, nous avons simulé une société de streaming de films. Nous avons stocké les films (Movies) dans un topic Kafka et, dans un autre topic Kafka, nous avons stocké chaque occurrence lorsqu'un utilisateur a arrêté de regarder un film et capturé la durée de lecture.

La figure suivante montre l'architecture de l'application :

Nous avons vu que la consommation de messages est simple ; vous les obtenez tant qu'ils sont produits, mais rien de plus. Mais si vous avez besoin d'un traitement en temps réel des données (par exemple, filtrer des événements ou manipuler des événements) ou si vous avez besoin d'établir des corrélations entre des événements, le simple fait d'utiliser l'API consommatrice de Kafka n'est peut-être pas la meilleure approche car le code résultant devient complexe.

Kafka Streams

Le projet Kafka Streams vous aide à consommer des flux d'événements en temps réel au fur et à mesure qu'ils sont produits, à appliquer n'importe quelle transformation, à joindre des flux, etc., et éventuellement écrire de nouvelles représentations de données dans un topic.

Kafka Streams est idéal pour les applications de streaming sans état et avec état, implémente des opérations basées sur le temps (par exemple, le regroupement d'événements autour d'une période donnée) et a à l'esprit l'évolutivité, la fiabilité et la maintenabilité toujours présentes dans l'écosystème Kafka.

Un Kafka Stream est composé de trois éléments : les entrées (source processors), les sorties (sink processors) et les processeurs (stream processors).

Source processors : un processeur source représente un topic Kafka. Un processeur source envoie les événements à un ou plusieurs processeurs de flux.

Stream processors : les processeurs de flux appliquent des transformations/logiques aux flux d'entrée comme la jointure, le regroupement, le comptage, le mappage, etc. Un processeur de flux peut être connecté à un autre processeur de flux et/ou à un processeur récepteur.

Sink processors : un processeur récepteur représente les données de sortie et est connecté à un topic Kafka.

Une topology est un graphe acrylique (graphe sans cycles) composé de sources, de processeurs et de récepteurs, puis transmis à une instance de Kafka Streams qui commencera l'exécution de la topologie.

Kafka Streams and Quarkus

Quarkus s'intègre à Kafka Streams à l'aide de l'extension Quarkus KStreams.

Premiers pas avec Quarkus

Le moyen le plus rapide d'utiliser Quarkus consiste à ajouter les dépendances requises via la page de démarrage. Chaque service peut avoir des dépendances différentes et vous pouvez choisir entre Java 11 ou Java 17. Pour l'intégration entre Quarkus et Kafka Streams, vous devez ajouter au moins l'extension Kafka Streams.

L'application en cours de développement

Comme mentionné au début de cet article, dans la partie 1 de la série, nous avons développé une société de streaming de films avec deux topics Kafka : un pour stocker une liste de films et un deuxième topic stockant chaque fois qu'un utilisateur arrête de regarder un film, stockant la région de l'utilisateur où le film a été joué (la clé de l'événement), ainsi que l'identifiant du film et le temps qu'il a été regardé comme valeur.

Toute cette logique a été créée dans un service producteur nommé Movie Plays Producer développé avec Quarkus.

De plus, nous avons également développé un service Movie Plays Consumer développé avec Quakus, qui consommait des événements des deux rubriques et les affichait dans la console (et en tant qu'événements côté serveur HTTP).

Mais il n'y a pas eu de traitement des données ; il a juste été diffusé tel qu'il a été reçu. Que se passe-t-il si nous voulons faire une jointure entre les topics movies et playtimemovies pour obtenir la durée de chaque film non pas avec l'identifiant mais avec toutes les informations sur le film ?

La mise en œuvre de cette logique avec uniquement des messages Kafka pourrait devenir une tâche complexe car vous devez stocker les informations des film dans une Map, puis, pour chaque nouvel événement de playedmovie lu, faire la correspondance.

KStream Movie Plays

Au lieu de créer du code pour chaque cas d'utilisation, voyons comment utiliser Kafka Streams et comment il est intégré à Quarkus pour résoudre ce problème.

Création du projet

Accédez à la page de démarrage de Quarkus et sélectionnez l'extension Apache Kafka Streams pour l'intégration à Kafka Streams. Sélectionnez ensuite les extensions RestEasy et RestEasy Jackson pour le marshaling/unmarshaling des événements depuis/vers JSON-Java Object-Byte Array. Décochez également l'option Started Code generation.

Dans la capture d'écran suivante, vous pouvez le voir :

Vous avez la possibilité d'ignorer cette étape manuelle et d'accéder au lien Kafka Stream Quarkus Generator où toutes les dépendances sont sélectionnées. Appuyez ensuite sur le bouton Generate votre application pour télécharger le fichier zip de l'application échafaudée.

Décompressez le fichier et ouvrez le projet dans votre IDE préféré.

Développement

La première chose à faire lors du développement d'un Kafka Stream est de créer la Topology et définissez les sources, les processeurs et les puits.

Dans Quarkus, il vous suffit de créer une classe CDI avec une méthode renvoyant une instance de type Topology.

Créez une nouvelle classe nommée TopologyProducer qui implémentera les événements consommateurs des deux topics et les joindra. Enfin, le résultat est envoyé à un processeur récepteur qui envoie le résultat en sortie sur la console.

Un élément non encore expliqué, et vraiment utile dans ces cas d'utilisation, est les Tables de Kafka.

Un topic peut contenir plusieurs événements avec la même clé. Par exemple, vous pouvez insérer un film avec une clé, puis mettre à jour le film en utilisant la même clé en créant un nouvel événement :

Mais si vous souhaitez joindre un topic movies avec un topic playtimemovies, quel événement avec la clé 1 doit être utilisé ? Le premier ou le second ? Dans ce cas précis, ce serait le dernier car c'est celui qui contient la version la plus mise à jour du film. Pour obtenir la dernière version de chacun des événements, Kafka Streams a le concept d'une table (KTable/GlobalKTable).

Kafka Streams naviguera dans un topic donné, obtiendra la dernière version de chaque événement et le placera dans une instance de table.

L'extension KafkaStream n'enregistre pas automatiquement une classe SerDes comme cela s'est produit avec l'intégration de Kafka Messaging, nous devons donc les enregistrer manuellement dans la topologie.

package org.acme;
 
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
 
import io.quarkus.kafka.client.serialization.ObjectMapperSerde;
 
@ApplicationScoped
public class TopologyProducer {
  
   private static final String MOVIES_TOPIC = "movies";
   private static final String PLAY_MOVIES_TOPIC = "playtimemovies";
 
   @Produces
   public Topology getTopCharts() {
 
       final StreamsBuilder builder = new StreamsBuilder();
 
    // SerDes for Movie and PlayedMovie
      
       final ObjectMapperSerde<Movie> movieSerder = new ObjectMapperSerde<>(Movie.class);
       final ObjectMapperSerde<MoviePlayed> moviePlayedSerder = new ObjectMapperSerde<>(MoviePlayed.class);
 
	// Creation of a Global Kafka Table for Movies topic
 
       final GlobalKTable<Integer, Movie> moviesTable = builder.globalTable(
               MOVIES_TOPIC,
               Consumed.with(Serdes.Integer(), movieSerder));
 
	// Stream connected to playtimemovies topic, every event produced there is consumed by this stream
 
       final KStream<String, MoviePlayed> playEvents = builder.stream(
               PLAY_MOVIES_TOPIC, Consumed.with(Serdes.String(), moviePlayedSerder));
       
    //  PlayedMovies has the region as key, and the object as value. Let’s map the content so the key is the movie id (to do the join) and leave the object as value
    // Moreover, we do the join using the keys of the movies table (movieId) and the keys of the stream (we changed it to be the movieId too in the map method).
 
    // Finally, the result is streamed to console
 
       playEvents
           .map((key, value) -> KeyValue.pair(value.id, value)) // Now key is the id field
           .join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)
           .print(Printed.toSysOut());
       return builder.build();
 
   }
}

Les POJO Movie et MoviePlayed contiennent les attributs requis pour la logique :

Le code de l'objet Movie est :

package org.acme;
 
public class Movie {
  
   public int id;
   public String name;
   public String director;
   public String genre;
 
   public Movie(int id, String name, String director, String genre) {
       this.id = id;
       this.name = name;
       this.director = director;
       this.genre = genre;
   }
}

Le code de l'objet MoviePlayed est :

package org.acme;
 
public class MoviePlayed {
  
   public int id;
   public long duration;
 
   public MoviePlayed(int id, long duration) {
       this.id = id;
       this.duration = duration;
   }
 
}

La dernière étape avant d'exécuter l'application Kafka Stream consiste à configurer certains paramètres, dont le plus important est quarkus.kafka-streams.topics. Il s'agit d'une liste de topics devant être présents dans le cluster Kafka avant que la topologie ne commence à traiter les données comme condition préalable.

Ouvrez le fichier src/main/resources/application.properties et ajoutez les lignes suivantes :

kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=1000
kafka-streams.metadata.max.age.ms=500
kafka-streams.auto.offset.reset=earliest
kafka-streams.metrics.recording.level=DEBUG
 
quarkus.kafka-streams.topics=playtimemovies,movies

Il est maintenant temps de tester le flux. Démarrons le producteur qui a été développé dans l'article précédent. Le code source du producteur peut être trouvé ici.

Quarkus KStreams s'intègre à Quarkus DevServices. Pour cette raison, nous n'avons pas besoin de démarrer un cluster Kafka ni de configurer son emplacement car le mode Quarkus Dev s'occupera de tout. N'oubliez pas d'avoir un environnement d'exécution de conteneur fonctionnel sur votre ordinateur, tel que Podman ou tout autre outil compatible OCI.

Dans une fenêtre de terminal, démarrez le service producteur :

cd movie-plays-producer
./mvnw compile quarkus:dev

2022-04-11 07:49:31,900 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 287 minutes
2022-04-11 07:49:31,941 INFO  [io.quarkus] (Quarkus Main Thread) movie-plays-producer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 4.256s.
2022-04-11 07:49:31,942 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-04-11 07:49:31,943 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
2022-04-11 07:49:32,399 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 162 minutes
2022-04-11 07:49:32,899 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 255 minutes
2022-04-11 07:49:33,404 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Sing 2 played for 264 minutes
2022-04-11 07:49:33,902 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 28 minutes
2022-04-11 07:49:34,402 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 137 minutes
2022-04-11 07:49:34,903 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 277 minutes
2022-04-11 07:49:35,402 INFO  [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 141 minutes

Dans une autre fenêtre de terminal, démarrez le code Kafka Stream que nous avons développé maintenant :

./mvnw compile quarkus:dev

2022-04-11 07:54:59,321 INFO  [org.apa.kaf.str.pro.int.StreamTask] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] task [1_0] Restored and ready to run
2022-04-11 07:54:59,322 INFO  [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] Restoration took 74 ms for all tasks [1_0]
2022-04-11 07:54:59,322 INFO  [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2022-04-11 07:54:59,324 INFO  [org.apa.kaf.str.KafkaStreams] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-client [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea] State transition from REBALANCING to RUNNING
[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
[KSTREAM-LEFTJOIN-0000000005]: 2, Movie [director=Jonathan Frakes, genre=Space, id=2, name=Star Trek: First Contact]
[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
[KSTREAM-LEFTJOIN-0000000005]: 1, Movie [director=Peter Jackson, genre=Fantasy, id=1, name=The Hobbit]
[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
[KSTREAM-LEFTJOIN-0000000005]: 3, Movie [director=Jared Bush, genre=Animation, id=3, name=Encanto]
[KSTREAM-LEFTJOIN-0000000005]: 5, Movie [director=Garth Jennings, genre=Jukebox Musical Comedy, id=5, name=Sing 2]

Ce qui est affiché dans la sortie est un événement (résultant de la jointure) où la clé est le movieId et la valeur est le movie lui-même. Ce que nous avons maintenant, c'est que chaque fois qu'un film est arrêté, Kafka Stream le traite et l'affiche avec toutes les informations du film.

Jusqu'à présent, pas si compliqué, et vous pensez peut-être que cela ne vaut pas la peine d'utiliser Kafka Streams uniquement pour ce cas d'utilisation. Mais commençons à ajouter plus d'exigences, pour que vous voyiez à quel point c'est puissant.

Au lieu de générer un événement chaque fois qu'un utilisateur arrête le film, envoyons l'événement uniquement pour les films que l'utilisateur a regardés pendant plus de 10 minutes.

Utilisez la méthode filter pour filtrer les films par durée.

playEvents
       .filter((region, event) -> event.duration >= 10) // filters by duration
       .map((key, value) -> KeyValue.pair(value.id, value))
       .join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)
        .print(Printed.toSysOut());

Redémarrez l'application et vous remarquerez que les films d'une durée inférieure ou égale à 10 minutes ne seront pas traités.

Nous commençons à voir que Kafka Streams aide à la propreté du code, mais ajoutons la dernière exigence. Nous ne sommes pas intéressés par la durée de lecture de chaque film, mais par le nombre de fois que chaque film est lu pendant plus de 10 minutes.

Jusqu'à présent, le traitement des événements est sans état car l'événement a été reçu, traité et envoyé à un processeur récepteur (soit vers un topic, soit comme sortie sur la console), mais pour compter le nombre de fois qu'un film a été lu, nous avons besoin d'un peu de mémoire pour se souvenir du nombre de fois qu'un film a été lu et incrémenter de un lorsqu'il est regardé à nouveau par n'importe quel utilisateur pendant plus de 10 minutes. Le traitement des événements doit être effectué de manière dynamique.

La première chose que nous devons créer est une classe Java pour stocker le nom du film et le nombre de fois qu'il est visualisé.

public class MoviePlayCount {
   public String name;
   public int count;
 
   public MoviePlayCount aggregate(String name) {
       this.name = name;
       this.count++;
 
       return this;
   }
 
   @Override
   public String toString() {
       return "MoviePlayCount [count=" + count + ", name=" + name + "]";
   }
  
}

C'est la classe qui compte, mais elle a encore besoin de deux choses :

  • Un emplacement pour stocker les instances de cette classe afin qu'elles ne soient pas réinitialisées à chaque fois qu'un événement est déclenché.
  • Une logique qui appelle la méthode aggregate chaque fois qu'un événement est déclenché dans un topic playtimemovies.

Pour le premier problème, nous devons utiliser l'interface KeyValueBytesStoreSupplier.

public static final String COUNT_MOVIE_STORE = "countMovieStore";

KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(COUNT_MOVIE_STORE);

Pour le deuxième problème, Kafka Streams utilise la méthode aggregate pour agréger les résultats.

Dans notre cas d'utilisation, le nombre de fois qu'un film est lu pendant plus de 10 minutes.

// MoviePlayCount might be serialized/deserialized too
final ObjectMapperSerde<MoviePlayCount> moviePlayCountSerder = new ObjectMapperSerde<>(MoviePlayCount.class);

// This is the join call seen before, where key is the movie id and value is the movie
.join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)
// Group events per key, in this case movie id
.groupByKey(Grouped.with(Serdes.Integer(), movieSerder))
 // Aggregate method gets the MoviePlayCount object if already created (if not it creates it) and calls its aggregate method to increment by one the viwer counter
 .aggregate(MoviePlayCount::new,
             (movieId, movie, moviePlayCounter) -> moviePlayCounter.aggregate(movie.name),
              Materialized.<Integer, MoviePlayCount> as(storeSupplier)
                  .withKeySerde(Serdes.Integer())
                  .withValueSerde(moviePlayCountSerder)
             )

Redémarrez l'application et le nombre de fois qu'un film est lu s'affiche sur la console.

CONSEIL : Pour redémarrer l'application, appuyez sur la lettre "s" dans le terminal et elle redémarre automatiquement.

Après avoir redémarré l'application, le terminal commence à afficher les statistiques de chaque film :

[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=13, name=Cruella]
[KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=11, name=Encanto]
[KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=14, name=Sing 2]
[KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=15, name=Star Trek: First Contact]
[KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=16, name=The Hobbit]
[KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=16, name=Star Trek: First Contact]
[KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=12, name=Encanto]
[KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=17, name=Star Trek: First Contact]
[KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=15, name=Sing 2]
[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=14, name=Cruella]
[KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=17, name=The Hobbit]
[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=15, name=Cruella]
[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=16, name=Cruella]

Requêtes interactives

Les résultats agrégés sont transmis à la console lorsque nous définissons le processeur sink comme flux System.out .

.toStream()
.print(Printed.toSysOut());

Mais vous pouvez également envoyer le flux résultant à un topic Kafka :

.to("counter_movies",                      Produced.with(Serdes.Integer(), moviePlayCountSerder)
);

Mais que se passe-t-il si vous n'êtes pas intéressé à réagir à chaque fois qu'un nouvel événement est envoyé, mais demandez simplement combien de fois un film spécifique a été joué à ce moment-là ?

Les requêtes interactives de Kafka Streams vous permettent d'interroger directement le store d'état sous-jacent pour la valeur associée à une clé donnée.

Tout d'abord, créons une classe avec le nom MoviePlayCountData pour stocker le résultat de la requête. Nous le faisons de cette manière pour dissocier les classes utilisées dans Kafka Streams des classes utilisées dans le reste de l'application.

public class MoviePlayCountData {
 
   private String name;
   private int count;
  
   public MoviePlayCountData(String name, int count) {
       this.name = name;
       this.count = count;
   }
 
   public int getCount() {
       return count;
   }
 
   public String getName() {
       return name;
   }
 
}

Créez maintenant une classe nommée InteractiveQueries pour implémenter l'accès au store d'état (KeyValueBytesStoreSupplier) et interrogez le nombre de fois qu'un film a été lu par son id.

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
 
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
 
import java.util.Optional;
 
@ApplicationScoped
public class InteractiveQueries {
  
   @Inject
   KafkaStreams streams;
 
  
   public Optional<MoviePlayCountData> getMoviePlayCountData(int id) {
       // gets the state store and get the movie count by movie id
       MoviePlayCount moviePlayCount = getMoviesPlayCount().get(id);
       // If there is a result
       if (moviePlayCount != null) {
           // Wrap the result into MoviePlayCountData
           return Optional.of(new MoviePlayCountData(moviePlayCount.name, moviePlayCount.count));
       } else {
           return Optional.empty();
       }
   }
 
 
   // Gets the state store
   private ReadOnlyKeyValueStore<Integer, MoviePlayCount> getMoviesPlayCount() {
       while (true) {
           try {
               return streams.store(fromNameAndType(TopologyProducer.COUNT_MOVIE_STORE, QueryableStoreTypes.keyValueStore()));
           } catch (InvalidStateStoreException e) {
               // ignore, store not ready yet
           }
       }
   }
 
}

Nous pouvons maintenant ajouter un endpoint REST simple pour exécuter cette requête.

import java.util.Optional;
 
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
 
@Path("/movie")
public class MovieCountResource {
 
   // Injects the previous class to make queries
   @Inject
   InteractiveQueries interactiveQueries;
 
   @GET
   @Path("/data/{id}")
   public Response movieCountData(@PathParam("id") int id) {
       Optional<MoviePlayCountData> moviePlayCountData = interactiveQueries.getMoviePlayCountData(id);
 
       // Depending on the result returns the value or a 404
       if (moviePlayCountData.isPresent()) {
           return Response.ok(moviePlayCountData.get()).build();
       } else {
           return Response.status(Status.NOT_FOUND.getStatusCode(),
                   "No data found for movie " + id).build();
       }
 
   }
}

Le schéma du Kafka Stream implémenté est illustré dans la figure suivante :

Mise à l'échelle

Les applications Kafka Streams peuvent être mises à l'échelle, de sorte que les flux sont distribués via plusieurs instances. Dans ce cas, chaque instance contient un sous-ensemble des résultats de l'agrégation, donc pour obtenir le résultat total de l'agrégation, vous devez extraire les données directement de l'autre instance en redirigeant la requête sur l'API REST vers cette instance.

Kafka Streams fournit une API pour savoir si les données demandées se trouvent dans le magasin Kafka Streams local ou dans un autre hébergeur.

Bien que le processus ne soit pas compliqué, il dépasse le cadre de cet article.

Conclusions

Jusqu'à présent, vous avez vu qu'il est facile de connecter une application Quarkus à Apache Kafka et de commencer à produire et à consommer des messages/événements à partir d'un topic. De plus, vous avez vu que Kafka Streams nous permet de consommer des messages Kafka mais aussi de les traiter en temps réel, en appliquant des transformations, en filtrant et, par exemple, en consommant les données résultantes de manière synchrone. Il s'agit d'une technologie puissante qui évolue facilement pour offrir une expérience en temps réel lorsque les données à traiter changent constamment.

Mais nous n'avons pas abordé le dernier problème de cette architecture. Habituellement, les données ne sont pas stockées à un seul endroit. Les informations sur le film peuvent être stockées dans une base de données relationnelle et les informations sur le film lu dans un topic Kafka. Alors, comment maintenez-vous les informations mises à jour aux deux endroits afin que Kafka Streams puisse joindre correctement les données ?

Il y a une partie manquante ici, un projet appelé Debezium pour nous aider là-dessus. Nous consacrerons un article entier à Debezium et Quarkus. Restez à l'écoute.

 

Au sujet de l’Auteur

Evaluer cet article

Pertinence
Style

Contenu Éducatif

Bonjour étranger!

Vous devez créer un compte InfoQ ou cliquez sur pour déposer des commentaires. Mais il y a bien d'autres avantages à s'enregistrer.

Tirez le meilleur d'InfoQ

Html autorisé: a,b,br,blockquote,i,li,pre,u,ul,p

Commentaires de la Communauté

Html autorisé: a,b,br,blockquote,i,li,pre,u,ul,p

Html autorisé: a,b,br,blockquote,i,li,pre,u,ul,p

BT