BT

Diffuser les Connaissances et l'Innovation dans le Développement Logiciel d'Entreprise

Contribuez

Sujets

Sélectionner votre région

Accueil InfoQ Articles Debezium Et Quarkus : Les Patterns De Capture De Modification De Données Pour Éviter Les Problèmes

Debezium Et Quarkus : Les Patterns De Capture De Modification De Données Pour Éviter Les Problèmes

Points Clés

  • Les écritures doubles sont quelque chose à éviter car vous risquez de perdre des données à cause d'écritures simultanées
  • Debezium lit les journaux de transactions de la base de données et envoie un événement pour chaque nouvel enregistrement
  • Vous pouvez soit utiliser Debezium intégré dans le code Java ou le serveur Debezium en tant que service externe
  • kcat est un outil pour inspecter les sujets Kafka

Dans la partie 2 de cette série, nous avons découvert l'intégration entre Apache Kafka Streams et Quarkus, où nous avons développé une application simple produisant des événements sur un topic Kafka et les consommer et les traiter en temps réel avec Kafka Streams.

Dans cet exemple, nous avons simulé une société de streaming de films. Nous avons stocké des films (Movies) dans un topic Kafka et, dans un autre topic Kafka, nous avons conservé chaque occurrence lorsqu'un utilisateur a arrêté de regarder un film et capturé l'heure à laquelle il avait été lu. Nous avons post-traité ces événements en temps réel pour compter le nombre de fois qu'un film est lu pendant plus de 10 minutes.

La figure suivante montre l'architecture de l'application :

Mais toutes les informations étaient stockées dans des topics Kafka. Mais il est peu probable que cela puisse se produire dans un projet du monde réel.

Les informations sur les films sont probablement stockées dans une base de données traditionnelle ; un cache distribué pour accélérer les requêtes, ou les films sont indexés dans un moteur de recherche. Mais gardons les choses simples; supposons que les informations sur les films sont stockées dans une base de données.

Cela soulève la question de savoir comment nous pouvons conserver les mêmes données dans deux systèmes différents, dans la base de données comme emplacement principal et dans le topic des films Kafka pour traiter les données avec Kafka Streams.

Cet article vous apprendra comment avoir correctement les mêmes données sous différentes formes.

La double écriture (Dual Writes)

La première chose qui peut venir à l'esprit pour résoudre ce problème est l'approche à double écriture. C'est une approche simple car il est de la responsabilité de votre code d'application de conserver les données à tous les endroits. Par exemple, une insertion d'un nouveau film doit exécuter une insertion dans la base de données et déclencher un événement dans le topic Kafka.

En termes de code, cela pourrait être quelque chose comme :

@Channel("movies")
Emitter<Record<Long, String>> movieEmitter;
 
private static ObjectMapper objectMapper = new ObjectMapper();
 
public Movie dualWriteInsert(Movie movie) throws JsonProcessingException {
    // Inserts to DB
    movie.persist();
 
    // Send an event to movies topic
    final String payloadJson = objectMapper.writeValueAsString(movie);
    long id = movie.id;
 
    movieEmitter.send(Record.of(id, payloadJson));
   

Cela semble correct, facile à mettre en œuvre et fonctionne jusqu'à ce que vous commenciez à rencontrer des problèmes étranges si vous l'essayez. Explorons-en quelques-uns :

  • Si les données sont conservées dans la base de données mais échouent lorsqu'elles sont envoyées au topic Kafka, vous pouvez encapsuler les deux opérations dans un bloc de transaction. Cela peut résoudre le problème de transaction car il y aura une annulation en cas d'erreur. Vous payez un prix élevé en termes de performances ; plus la portée de la transaction est grande, plus vous bloquez la base de données. Cela ne corrige pas la simultanéité.

  • Que se passe-t-il si deux utilisateurs simultanés souhaitent mettre à jour simultanément la même entrée de film ? Il peut arriver que l'exécution de la première requête mette à jour la base de données et envoie complètement l'événement à Kafka, puis que l'exécution de la deuxième requête mette à jour à nouveau le film. Dans ce cas, la base de données et le contenu du topic Kafka sont alignés. Mais que se passe-t-il si l'exécution de la première requête ne persiste que jusqu'à l'opération de base de données, puis la deuxième requête exécute la persitance et l'envoi à Kafka. Après cela, la première requête envoie l'événement au topic Kafka. À ce moment, les données de base de données Movie et les données du topic Kafka ont divergé pour avoir des valeurs différentes, entraînant des incohérences entre les données. Bien sûr, vous pouvez synchroniser l'ensemble de la méthode, mais cela entraînerait une énorme perte de performances.

Ce dernier problème se produit en raison de la nature du mélange de différents systèmes ; une transaction de base de données opère au sein de sa couche de persistance mais pas entre les systèmes.

2-Phase Commit

Une solution possible à ce problème est d'utiliser le protocole 2-Phase Commit. Bien que cela puisse être une bonne solution, deux problèmes sont présents :

  • Tout d'abord, tous les systèmes ne prennent pas en charge les transactions distribuées, et le 2-Phase Commit
  • Ce protocole présente des problèmes en raison de la communication entre toutes les parties requises à des fins de coordination.

C'est une solution possible, mais ce n'est pas une solution générique, et pour ce cas précis, Apache Kafka ne prend pas en charge les transactions distribuées, alors explorons une autre solution.

La capture du changement de données (Change Data Capture)

Change Data Capture (CDC) est un pattern utilisé pour suivre les données qui ont changé (c'est-à-dire les nouvelles entrées ajoutées, les registres mis à jour, etc.) et déclencher un événement, permettant à l'application de réagir au changement.

Il existe plusieurs façons d'implémenter CDC, par exemple, en utilisant des horodatages, des versions ou des indicateurs d'état au niveau de la ligne, de sorte que vous vérifiez périodiquement les éléments à partir d'un point spécifique (c'est-à-dire, SELECT all elements WHERE status=not_read). Mais cette approche présente l'inconvénient que vous accédez régulièrement à la base de données à des fins non métier ou que vous devez gérer les suppressions d'entrées.

Une autre option consiste à utiliser des triggers de base de données, c'est-à-dire que toute modification déclenche un événement et le stocke dans une table d'événements spécifique. Ça marche; vous pouvez capturer n'importe quel événement, mais vous interrogez toujours périodiquement la base de données.

La plupart des bases de données ont un journal des transactions qui enregistre toutes les modifications apportées à la base de données. Les analyseurs de journaux analysent ce journal et capturent tout changement de manière non intrusive. Les avantages de cette approche sont :

  • Impact minimal dans la base de données.
  • Les modifications sont transparentes pour l'application ; les insertions dans des colonnes spéciales sont inutiles.
  • Intégrité transactionnelle
  • Aucune modification du schéma de la base de données

Les scanners de journaux sont la meilleure approche, et l'un des projets open source les plus populaires est Debezium.

Debezium

Debezium est un projet open source pour la capture de changement de données de l'approche du scanner de journaux. Démarrez la base de données et configurez Debezium pour consommer les données du journal des transactions de cette base de données. À ce stade, pour chaque insertion, suppression ou mise à jour validée dans la base de données, Debezium déclenchera un événement afin qu'une application puisse s'y enregistrer et réagir en conséquence.

Mais pourquoi Debezium, CDC et Kafka nous aident-ils à résoudre le problème des doubles écritures ? Un topic Apache Kafka est composé d'une ou plusieurs partitions. Chaque partition ordonne les événements dans l'ordre d'arrivée (les événements sont toujours ajoutés à la fin de la partition). Donc, si nous voulons maintenir l'ordre des opérations simultanées (pour éviter d'avoir des données mal placées entre les systèmes), le topic de Kafka résout cette partie du problème.

Mais bien sûr, nous avons toujours l'autre partie, la lecture d'une base de données dans le bon ordre dans le cas d'opérations concurrentes. Les approches CDC et scanner de journaux garantissent que le contenu est dans le bon ordre après l'engagement de la transaction et qu'il est non intrusif. Debezium rend cela possible.

Vous pouvez utiliser Debezium de deux manières différentes, et les deux sont valables selon le cas d'utilisation. Ces deux méthodes sont le serveur Debezium ou le moteur Debezium (intégré).

Le serveur Debezium

Le serveur Debezium exécute Debezium en tant qu'instance Kafka Connect. Kafka Connect est un processus autonome lancé par un consommateur et/ou un producteur pour lire les données de Kafka. Kafka Connect définit des connecteurs vers différents systèmes de données, puis déplace de grands ensembles de données vers et depuis Kafka. Étant donné que les connecteurs utilisent l'API Kafka, ils sont évolutifs, tolérants aux pannes et avec une faible latence.

Supposons l'exemple suivant ; vous souhaitez exporter le contenu d'un topic Kafka vers un moteur d'index comme ElasticSearch. Vous avez deux options :

  • Créez une application (comme celle que nous avons vue dans la partie 1 de cette série) en utilisant L'API Kafka pour lire les événements d'un topic Kafka, puis utiliser le client ElasticSearch pour remplir les données de l'index.
  • Utilisez ElasticSearch Kafka Connect, qui implémente déjà toute cette logique, et vous n'avez qu'à configurer et démarrer.

Debezium fait la même chose, mais lit le journal des transactions à partir d'une base de données et envoie le contenu à un topic Kafka.

L'un des avantages de Debezium est qu'il peut se connecter à plusieurs bases de données telles que MySQL, MongoDB, PostgreSQL, Oracle DB, SQL Server, DB 2, Cassandra et Vitesse.

Le moteur Debezium

La manière habituelle d'exécuter Debezium est via un serveur Debezium, car il n'est pas intrusif pour l'application ; c'est un service qui prend les modifications de données et remplit un topic Kafka.

Mais toutes les applications ne nécessitent pas le même niveau de tolérance aux pannes ou d'évolutivité offert par Kafka Connect. De plus, l'application doit parfois capturer l'événement de modification des données, mais exécuter une logique personnalisée et ne pas envoyer la modification à un système de messagerie ou à un système de messagerie non pris en charge.

Dans ces cas, un module debezium-api définit une petite API pour intégrer le moteur Debezium dans l'application.

Jusqu'à présent, nous avons appris que les doubles écritures sont quelque chose à éviter. La solution utilise Change Data Capture pour obtenir des données directement à partir du journal des transactions et les transmettre à un topic Kafka afin que tout autre système puisse les consommer de manière et dans un ordre « transactionnel ».

Le Pattern Outbox

Si vous êtes arrivé à ce stade, vous vous demandez peut-être : "OK, bien, je peux utiliser CDC pour réagir aux modifications de données, mais j'expose l'entité interne à des systèmes externes." Bien que cela soit vrai, permettez-moi de vous présenter le pattern Outbox pour éviter ce problème.

Le pattern Outbox fournit une table outbox dans laquelle vous enregistrez toutes les opérations des entités (peut-être avec des données dénormalisées). Ensuite, le système CDC (Debezium dans notre cas) réagit aux modifications placées dans la table outbox et non dans la table des entités, ce qui isole le modèle de données des autres systèmes :

La partie importante dont vous devez être conscient est que les modifications d'entité et outbox doivent se trouver dans la même transaction.

Commençons à assembler toutes ces pièces dans un projet Quarkus et corrigeons le problème que nous avons présenté au début, comment insérer un film dans la base de données et également le remplir dans un système externe (topic Kafka).

La mise en oeuvre de Debezium

Au lieu de créer du code pour chaque cas d'utilisation, voyons comment utiliser Debezium Embedded et comment il est intégré à Quarkus pour résoudre ce problème.

La création du projet

Accédez à la start page de Quarkus et sélectionnez les extensions RestEasy Reactive et RestEasy Reactive Jackson pour le marshaling/unmarshaling d'événements depuis/vers JSON-tableau de byte Java et l'implémentation des endpoints JAX-RS, le pilote Panache et MySQL pour insérer des films dans la base de données MySQL, et la messagerie réactive SmallRye pour interagir avec Kafka. Décochez également l'option Started Code generation.

Dans la capture d'écran suivante, vous pouvez le voir :

Vous pouvez ignorer cette étape manuelle et accéder au lien Quarkus Generator, où toutes les dépendances sont sélectionnées. Appuyez ensuite sur le bouton Generate your application pour télécharger le fichier zip de l'application créée.

Décompressez le fichier et ouvrez le projet dans votre IDE préféré.

Le développement

Avant de commencer à coder, nous devons ajouter deux nouvelles dépendances : une pour utiliser le moteur Debezium et une autre pour ajouter l'extension Debezium Quarkus Outbox.

Le moteur Debezium

Ouvrez le fichier pom.xml et ajoutez les dépendances suivantes.

Dans la section dependencyManagement :

<dependency>
   <groupId>io.debezium</groupId>
   <artifactId>debezium-bom</artifactId>
   <version>1.9.4.Final</version>
   <type>pom</type>
   <scope>import</scope>
</dependency>

Dans la section dependencies :

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-ddl-parser</artifactId>
</dependency>
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-embedded</artifactId>
</dependency>
<!-- We connect to a MySQL database, so we need debezium MySQL connector -->
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-connector-mysql</artifactId>
</dependency>

Il s'agit d'utiliser le moteur Debezium intégré à l'application. Aucune de ces dépendances ne serait nécessaire si nous utilisions Debezium Server car il s'agit d'un service autonome.

Debezium Quarkus Outbox 

Quarkus s'intègre au modèle Outbox via l'extension Debezium Quarkus Outbox.

Ouvrez le fichier pom.xml et ajoutez les dépendances suivantes.

Dans la section dependencyManagement :

<dependency>
   <groupId>${quarkus.platform.group-id}</groupId>
   <artifactId>quarkus-debezium-bom</artifactId>
   <version>${quarkus.platform.version}</version>
   <type>pom</type>
   <scope>import</scope>
 </dependency>

Notez que la version du BOM est alignée sur la version de Quarkus, 2.10.1.Final dans ce cas.

Dans la section dependencies :

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-quarkus-outbox</artifactId>
</dependency>

L'implémentation

Vous pouvez choisir de ne pas utiliser le pattern Outbox ou de l'implémenter vous-même ; dans ce cas, aucune de ces dépendances n'est requise. Mais nous allons l'utiliser pour simplifier le développement.

Avec toutes ces dépendances en place, créez l'entité Movie annotée avec les annotations JPA et qui étend la classe PanacheEntity :

import javax.persistence.Entity;
 
import io.quarkus.hibernate.orm.panache.PanacheEntity;
 
@Entity
public class Movie extends PanacheEntity {
  
   // No worries Quarkus will change them
   // to private and auto-generate getters/setters at compilation time
   public String name;
   public String director;
   public String genre;
 
}

L'étape suivante consiste à créer un endpoint HTTP pour insérer le contenu du film dans la base de données à l'aide d'annotations JAX-RS :

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
 
import org.jboss.logging.Logger;
 
@Path("/movie")
public class MovieResource {
 
   // Service to insert the movie data into Movie and Outbox tables
   @Inject
   MovieService movieService;
 
   // Injects the logger
   @Inject
   Logger logger;
 
   // Http Post method to insert a movie
   @POST
   public Movie insert(Movie movie) {
       logger.info("New Movie inserted " + movie.name);
       System.out.println(":)");
      
       return movieService.insertMovie(movie);
   }
}

Puisque nous utilisons l'extension Debezium Quarkus Outbox, nous devons créer une entité représentant le contenu stocké dans la table outbox. L'entité doit implémenter ExportedEvent et implémentez les méthodes requises pour identifier le type d'événement mis dans la table outbox.

import java.time.Instant;
 
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
 
import io.debezium.outbox.quarkus.ExportedEvent;
 
public class MovieEvent implements ExportedEvent<String, JsonNode> {
 
   private static ObjectMapper mapper = new ObjectMapper();
 
   // Set the type enclosed inside the event
   private static final String TYPE = "Movie";
   // Set the event type
   private static final String EVENT_TYPE = "MovieCreated";
 
   private final long gameId;
   private final JsonNode jsonNode;
   private final Instant timestamp;
 
   // Saves Game info in the class
   public MovieEvent(Movie movie) {
       this.gameId = movie.id;
       this.timestamp = Instant.now();
       // Saves game content in a string column in JSON format
       this.jsonNode = convertToJson(movie);
   }
 
   @Override
   public String getAggregateId() {
       return String.valueOf(this.gameId);
   }
 
   @Override
   public String getAggregateType() {
       return TYPE;
   }
 
   @Override
   public JsonNode getPayload() {
       return jsonNode;
   }
 
   @Override
   public Instant getTimestamp() {
       return timestamp;
   }
 
   @Override
   public String getType() {
       return EVENT_TYPE;
   }
  
   private JsonNode convertToJson(Movie movie) {
       ObjectNode asJson = mapper.createObjectNode()
               .put("id", movie.id)
               .put("name", movie.name)
               .put("director", movie.director)
               .put("genre", movie.genre);
      
       return asJson;
   }
 
}

La dernière étape avant d'ajouter la logique Debezium au code est d'implémenter la classe MovieService avec la logique d'insertion. Cette logique doit conserver le film dans la table Movie et l'entité MovieEvent dans une table gérée par l'extension de table OutboxEvent.

L'extension fournit un événement CDI spécifique pour conserver un événement qui implémente l'interface ExportedEvent. La seule chose à faire est de déclencher un événement et les données sont automatiquement conservées.

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.transaction.Transactional;
 
import io.debezium.outbox.quarkus.ExportedEvent;
 
@ApplicationScoped
public class MovieService {
  
   // CDI event interface triggering Outbox entities
   @Inject
   Event<ExportedEvent<?, ?>> event;
 
   // Transaction method
   @Transactional
   public Movie insertMovie(Movie movie) {
 
       // Persists data
       movie.persist();
      
       // Persists outbox content
       event.fire(new MovieEvent(movie));
      
       return movie;
   }
}

La dernière étape consiste à configurer le moteur Debezium et à le démarrer intégré dans l'application.

Pour configurer le moteur, vous devez définir les informations de la base de données (nom d'hôte, port, informations d'identification), ainsi que la base de données et les tables que Debezium doit surveiller pour déclencher des événements.

import java.io.File;
import java.io.IOException;
 
import javax.enterprise.inject.Produces;
 
import org.eclipse.microprofile.config.inject.ConfigProperty;
 
import io.debezium.config.Configuration;
 
public class DebeziumConfiguration {
  
   // Debezium needs Database URL and credentials to login and
   // monitor transaction logs
   @ConfigProperty(name = "quarkus.datasource.jdbc.url")
   String url;
 
   @ConfigProperty(name = "quarkus.datasource.password")
   String password;
 
   @ConfigProperty(name = "quarkus.datasource.username")
   String username;
 
   @Produces
   public Configuration configureDebezium() throws IOException {
 
       // Custom class to get database name or hostname of Database server
       MySqlJdbcParser jdbcParser = MySqlJdbcParser.parse(url);
      
       File fileOffset = File.createTempFile("offset", ".dat");
       File fileDbHistory = File.createTempFile("dbhistory", ".dat");
 
       return io.debezium.config.Configuration.create()
           .with("name", "movies-mysql-connector")
           // configures MySQL connector
           .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
           .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
           .with("offset.storage.file.filename", fileOffset.getAbsolutePath())
           .with("offset.flush.interval.ms", "60000")
           // Configures database location
           .with("database.hostname", jdbcParser.getHost())
           .with("database.port", jdbcParser.getPort())
           .with("database.user", "root")
           .with("database.allowPublicKeyRetrieval", "true")
           .with("database.password", password)
           .with("database.dbname", jdbcParser.getDatabase())
           .with("database.include.list", jdbcParser.getDatabase())
           // Debezium only sends events for the modifications of OutboxEvent table and not all tables
           .with("table.include.list", jdbcParser.getDatabase() + ".OutboxEvent")
           .with("include.schema.changes", "false")
           .with("database.server.id", "10181")
           .with("database.server.name", "movies-mysql-db-server")
           .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
           .with("database.history.file.filename", fileDbHistory.getAbsolutePath())
       .build();
   }
 
}

La classe CDI DebeziumListener démarre Debezium lorsque l'application est en cours d'exécution.

Le moteur Debezium ne s'exécute pas dans un thread séparé, nous devons donc fournir un thread à exécuter en parallèle, sans bloquer le thread d'application. L'utilisation d'un ManagedExecutor est la bonne façon de fournir un thread executor dans Quarkus pour exécuter Debezium.

Ensuite, nous devons instancier le moteur Debezium en utilisant la classe DebeziumEngine, définissant les propriétés de configuration créées à l'étape précédente. L'une des étapes les plus importantes consiste à enregistrer une méthode déclenchée chaque fois que Debezium génère un événement. La méthode notifying enregistre cette méthode personnalisée, et dans notre exemple, nous l'avons nommée handleChangeEvent.

Cette méthode reçoit l'événement et nous pouvons implémenter n'importe quelle logique que nous souhaitons, de l'envoi à un topic Kafka, sa manipulation et son envoi à un autre service, tout ce que vous pouvez implémenter en Java.

import java.io.IOException;
 
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
 
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
 
import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.kafka.Record;
 
import static io.debezium.data.Envelope.FieldName.*;
import static io.debezium.data.Envelope.Operation;
 
@ApplicationScoped
public class DebeziumListener {
  
   private static ObjectMapper objectMapper = new ObjectMapper();
 
   // Start the Debezium engine in a different thread
   ManagedExecutor executor;
 
   // Debezium configuration object
   Configuration configuration;
 
   private DebeziumEngine<RecordChangeEvent<SourceRecord>> engine;
 
   public DebeziumListener(ManagedExecutor executor, Configuration configuration) {
       this.executor = executor;
       this.configuration = configuration;
   }
 
   // Interface to send events to movies Kafka topic
   @Channel("movies")
   Emitter<Record<Long, JsonNode>> movieEmitter;
 
   void onStart(@Observes StartupEvent event) {
 
       // Configures Debezium engine
       this.engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
           .using(this.configuration.asProperties())
           // For each event triggered by Debezium, the handleChangeEvnt method is called
           .notifying(this::handleChangeEvent)
           .build();
 
       // Starts Debezium in different thread
       this.executor.execute(this.engine);
   }
 
   void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
 
       // For each triggered event, we get the information
       SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
       Struct sourceRecordChangeValue= (Struct) sourceRecord.value();
 
       if (sourceRecordChangeValue != null) {
           Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
 
           // Only insert operations are processed
           if(operation == Operation.CREATE) {
 
               // Get insertation info
               Struct struct = (Struct) sourceRecordChangeValue.get(AFTER);
               String type = struct.getString("type");
               String payload = struct.getString("payload");
 
               if ("GameCreated".equals(type)) {
                   try {
                       final JsonNode payloadJson = objectMapper.readValue(payload, JsonNode.class);
                       long id = payloadJson.get("id").asLong();
 
                       // Populate content to Kafka topic
                       movieEmitter.send(Record.of(id, payloadJson));
                   } catch (JsonProcessingException e) {
                       throw new IllegalArgumentException(e);
                   }
               }
           }
       }
   }
 
   void onStop(@Observes ShutdownEvent event) throws IOException {
       if (this.engine != null) {
           this.engine.close();
       }
   }
 
}

L'exécution

Cet exemple est autonome, vous n'avez donc pas besoin de démarrer quoi que ce soit car Quarkus le fera pour vous.

Panache et Kafka Connector s'intègrent à Quarkus DevServices. Pour cette raison, nous n'avons pas besoin de démarrer un cluster Kafka ou une base de données MySQL ni de les configurer car le mode Quarkus Dev s'occupera de tout. N'oubliez pas d'avoir un environnement d'exécution de conteneur fonctionnel sur vos ordinateurs, tel que Podman ou tout autre outil compatible OCI.

Avant d'exécuter l'application, nous ajouterons deux propriétés de configuration à l'application pour rendre les choses plus traçables ; dans le fichier application.properties, ajoutez les lignes suivantes :

quarkus.hibernate-orm.log.sql=true
quarkus.debezium-outbox.remove-after-insert=false

La première ligne enregistre les instructions SQL exécutées dans la base de données. Ceci est utile pour valider les deux tables (Movies et OutboxEvent) lorsqu'il insère des données.

Le second évite que Debezium supprime les données de la table outbox après leur consommation.

Dans une fenêtre de terminal, démarrez le service :

./mvnw clean quarkus:dev

…
2022-07-07 11:36:22,942 INFO  [io.deb.con.mys.MySqlStreamingChangeEventSource] (debezium-mysqlconnector-movies-mysql-db-server-change-event-source-coordinator) Waiting for keepalive thread to start
2022-07-07 11:36:22,948 INFO  [io.deb.con.mys.MySqlStreamingChangeEventSource] (debezium-mysqlconnector-movies-mysql-db-server-change-event-source-coordinator) Keepalive thread is running
2022-07-07 11:37:43,889 INFO  [org.acm.MovieResource] (executor-thread-1) New Movie inserted string

Après quelques secondes, un cluster Kafka, une instance MySQL et l'application sont opérationnels.

Inspectez les conteneurs en cours d'exécution pour valider les instances :

docker ps

CONTAINER ID   IMAGE                          COMMAND                  CREATED          STATUS          PORTS                                    
         
fa316bfae219   vectorized/redpanda:v21.11.3   "sh -c 'while [ ! -f…"   49 seconds ago   Up 45 seconds   8081-8082/tcp, 9644/tcp, 0.0.0.0:55002->9092/tcp

4c220f7ee066   mysql:8.0                      "docker-entrypoint.s…"   50 seconds ago   Up 46 seconds   33060/tcp, 0.0.0.0:60652->3306/tcp

e41cae02ff02   testcontainers/ryuk:0.3.3      "/app"                   53 seconds ago   Up 50 seconds   0.0.0.0:60650->8080/tcp


 

Le cluster Kafka s'exécute sur le port 55002 et MySQL avec l'identifiant (4c220f7ee066) sur le port 60652.

REMARQUE : Les ports et les identifiants peuvent être différents dans chaque cas.

Dans une autre fenêtre de terminal, exécutez la commande curl pour insérer un nouveau film.

curl -X 'POST' \
  'http://localhost:8080/movie' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "Minions: The Rise of Gru",
  "director": "Kyle Balda",
  "genre": "Animation"
}'

Inspectez la fenêtre du terminal Quarkus et observez les instructions SQL exécutées sur la base de données :

:)
Hibernate:
    select
        next_val as id_val
    from
        hibernate_sequence for update


Hibernate:
    update
        hibernate_sequence
    set
        next_val= ?
    where
        next_val=?

// Insert into Movie 

Hibernate:
    insert
    into
        Movie
        (director, genre, name, id)
    values
        (?, ?, ?, ?)

// Automatically OutboxEvent table receives an insert

Hibernate:
    insert
    into
        OutboxEvent
        (aggregatetype, aggregateid, type, timestamp, payload, tracingspancontext, id)
    values
        (?, ?, ?, ?, ?, ?, ?)

Pour valider que Debezium détecte le changement et le pousse vers le topic Movies Kafka, exécutez l'outil kcat pour interrogez un topic Kafka, en définissant le port exposé du service.

kcat -b localhost:55002 -C -t movies

{"id":1,"name":"Minions: The Rise of Gru","director":"Kyle Balda","genre":"Animation"}
% Reached end of topic movies [0] at offset 1

Conclusions

Nous avons implémenté une solution qui résout le problème de double écriture entre une base de données et un système externe en utilisant Debezium pour lire les journaux de transactions et déclencher un événement pour chaque modification.

Dans cet exemple, nous avons utilisé Debezium Embedded et nous avons implémenté la logique à exécuter lorsqu'un événement a été déclenché.

L'approche intégrée peut fonctionner dans certains scénarios, mais dans d'autres (en particulier dans les projets de friche industrielle ou les projets où vous avez besoin d'une évolutivité et d'une tolérance aux pannes élevées), Debezium Server pourrait mieux convenir. Avec Debezium Server (en tant que processus Kafka Connect), aucune modification de votre code n'est requise (pas de dépendances intégrées), car Debezium est un processus autonome se connectant à un journal des transactions de base de données, détectant les modifications et les envoyant à un topic Kafka. Étant donné que les événements sont ordonnés, n'importe quel système peut consommer ces modifications à partir du topic.

Bien qu'un pattern Outbox ne soit pas obligatoire lors de l'utilisation de Debezium (à la toute fin, Debezium peut écouter les changements dans n'importe quelle table), c'est une bonne pratique pour isoler vos données, et un pattern Outbox vous aide à cela.

Intégrer des architectures de (micro)services peut sembler facile au départ, mais lorsque vous commencez à intégrer des données, les choses deviennent plus complexes, et le projet Debezium est là pour vous aider dans cette tâche.

Le code source est disponible sur GitHub.

 

Au sujet de l’Auteur

Evaluer cet article

Pertinence
Style

Contenu Éducatif

BT