Debezium and Quarkus: Change Data Capture Patterns to Avoid Dual-Writes Problems

Key Takeaways

  • Avoid dual writes, as you may end up losing data because of concurrent writes
  • Debezium reads the database transaction log and sends an event for each new record
  • You can either use Debezium embedded in Java code or Debezium Server as an external service
  • kcat is a tool to inspect Kafka topics

In part 2 of this series, we learned about the integration between Apache Kafka Streams and Quarkus, where we developed a simple application producing events to a Kafka topic and consuming and processing them in real time with Kafka Streams.

In that example, we simulated a movie-streaming company. We stored movies in one Kafka topic and, in another Kafka topic, we held each occurrence of a user stopping a movie and captured the time it had been played. We post-processed these events in real time to count the number of times a movie is played for more than 10 minutes.

The following figure shows the architecture of the application:

In that example, however, all the information was stored in Kafka topics, which is unlikely in a real-world project.

Movie information is more likely stored in a traditional database, cached in a distributed cache to speed up queries, or indexed in a search engine. But let's keep things simple and assume that movie information is stored in a database.

This raises the question of how we can maintain the same data in two different systems: the database as the primary store, and the Kafka movies topic for processing data with Kafka Streams.

This article will teach you how to correctly maintain the same data across multiple systems.

Dual Writes

The first thing that may come to mind to solve this problem is the dual writes approach. It's straightforward, as your application code is responsible for maintaining the data in all the places. For example, an insert of a new Movie should execute an insert into the database and fire an event to the Kafka topic.

In terms of code, this could be something like:

@Channel("movies")
Emitter<Record<Long, String>> movieEmitter;
 
private static ObjectMapper objectMapper = new ObjectMapper();
 
public Movie dualWriteInsert(Movie movie) throws JsonProcessingException {
    // Inserts to DB
    movie.persist();
 
    // Send an event to movies topic
    final String payloadJson = objectMapper.writeValueAsString(movie);
    long id = movie.id;
 
    movieEmitter.send(Record.of(id, payloadJson));

    return movie;
}

This looks correct and is easy to implement, but if you try it, you'll eventually run into some subtle issues. Let's explore some of them:

  • If data is persisted in the database but the send to the Kafka topic fails, you could wrap both operations in a transaction block. This fixes the atomicity problem, because there will be a rollback in case of an error, but you pay a big price in performance: the bigger the transaction scope, the longer you block the database. And it still doesn't fix concurrency (see the sketch after this list).

  • What happens if two concurrent users want to update the same Movie entry simultaneously? The first request might update the database and send the event to Kafka together, and then the second request might update the Movie again; in this case, the database and the Kafka topic are aligned. But suppose the first request only persists to the database, then the second request runs both its persist and its send, and only afterwards does the first request send its event to Kafka. Now the Movie data in the database and in the Kafka topic have diverged, leading to inconsistencies between systems. Of course, you could synchronize the whole method, but this would mean a huge performance loss.
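
To make the failure mode concrete, here is a minimal sketch of the transactional variant, reusing the emitter and mapper from the snippet above (the method name is illustrative, and javax.transaction.Transactional is assumed):

// Wraps both writes in a JTA transaction: the database insert rolls back if an
// exception is thrown, but an already-sent Kafka event cannot be "unsent", and
// Emitter.send is asynchronous, so a broker failure may surface only after the
// transaction has committed.
@Transactional
public Movie dualWriteInsertInTransaction(Movie movie) throws JsonProcessingException {
    // Inserts to DB; rolled back if the method throws
    movie.persist();

    // Sends an event to the movies topic; not covered by the database rollback
    final String payloadJson = objectMapper.writeValueAsString(movie);
    movieEmitter.send(Record.of(movie.id, payloadJson));

    return movie;
}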

This last problem occurs because of the nature of mixing different systems: a database transaction guarantees consistency within its own persistence layer, but not across systems.

2-Phase Commit

One possible solution to this problem is the 2-Phase Commit protocol. Although this could be a good solution, two problems are present:

  • First of all, not all systems support distributed transactions and the 2-Phase Commit protocol.
  • The protocol is expensive because of the communication required between all the parties for coordination purposes.

It is a possible solution, but not a generic one; in this specific case, Apache Kafka doesn't support distributed transactions, so let's explore another solution.

Change Data Capture

Change Data Capture (CDC) is a pattern used to track data that has changed (i.e., new entries added, records updated, etc.) and trigger an event, making it possible for the application to react to the change.

There are several ways of implementing CDC, for example using timestamps, versions, or status indicators at the row level, and periodically querying for elements from a specific point (i.e., SELECT all elements WHERE status=not_read). But this approach has drawbacks: you regularly access the database for no business purpose, and deletions of entries are hard to detect. A sketch of this polling approach follows.
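
To make the drawbacks concrete, here is a minimal sketch of the polling approach, using plain JDBC and assuming a hypothetical status column on the Movie table (all names are illustrative):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

public class PollingCdc {

    // Runs one polling cycle: every call hits the database even when nothing
    // has changed, and rows deleted since the last poll are never observed.
    public void poll(String jdbcUrl, String user, String password) throws Exception {
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             Statement query = conn.createStatement();
             PreparedStatement markRead =
                 conn.prepareStatement("UPDATE Movie SET status = 'read' WHERE id = ?")) {

            ResultSet rs = query.executeQuery(
                "SELECT id, name FROM Movie WHERE status = 'not_read'");
            while (rs.next()) {
                publish(rs.getLong("id"), rs.getString("name"));
                markRead.setLong(1, rs.getLong("id"));
                markRead.executeUpdate();
            }
        }
    }

    // Placeholder for the actual change propagation (e.g., send to Kafka)
    private void publish(long id, String name) {
    }
}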

Another option is using database triggers: any change fires a trigger that stores an event in a specific event table. It works, and you can capture any change, but you are still periodically polling the database.

Most databases have a transaction log that records all changes made to the database. Log scanners read this log and capture any change in a non-intrusive way. The benefits of this approach are:

  • Minimal impact on the database.
  • Changes are transparent to the application; no inserts into special columns are necessary.
  • Transactional integrity is preserved.
  • No changes to the database schema are needed.

Log scanners are the best approach, and one of the most popular open-source projects is Debezium.

Debezium

Debezium is an open-source project for change data capture using the log scanner approach. Start the database, and configure Debezium to consume data from the transaction log of that database. At this point, for every insert, delete, or update committed to the database, Debezium will trigger an event so an application can subscribe to it and react accordingly.

But why do Debezium, CDC, and Kafka help us fix the dual-writes problem? An Apache Kafka topic is composed of one or more partitions, and each partition keeps events in arrival order (events are always appended at the end of the partition). So if we want to maintain the order of concurrent operations (to avoid misplaced data between systems), a Kafka topic resolves this part of the problem.

But of course, we still have the other part: reading from the database in the correct order in the case of concurrent operations. The CDC log scanner approach assures that the contents are in the correct order after transaction commit, and it is non-intrusive. Debezium makes this possible.

You can operate Debezium in two different ways, and both are valid depending on the use case: Debezium Server or the embedded Debezium Engine.

Debezium Server

Debezium Server runs Debezium as a Kafka Connect instance. Kafka Connect is a standalone service that uses connectors to different data systems to move large data sets into and out of Kafka. Since connectors use the Kafka API, they are scalable, fault-tolerant, and low latency.

Consider the following example: you want to export content from one Kafka topic to an index engine like ElasticSearch. You have two options:

  • Create an application (like the one we saw in part 1 of this series) using the Kafka API to read events from a Kafka topic and then use the ElasticSearch client to populate data to the index.
  • Use the ElasticSearch Kafka Connect connector, which already implements all this logic; you only need to configure it and start it.

Debezium does the same, but in the other direction: it reads the transaction log from a database and sends the content to a Kafka topic.
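
To give an idea of what this looks like in practice, here is a hedged sketch of registering a Debezium MySQL connector with a Kafka Connect instance through its REST API. It assumes Connect listens on localhost:8083; the hostnames and credentials are illustrative, and the connector properties mirror the embedded configuration we will build later:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "movies-mysql-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "secret",
      "database.dbname": "moviesdb",
      "database.server.id": "10181",
      "database.server.name": "movies-mysql-db-server",
      "table.include.list": "moviesdb.OutboxEvent"
    }
  }'

Kafka Connect then runs the connector and writes the captured changes to Kafka topics prefixed with the database.server.name value.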

One of the great things about Debezium is that it can connect to several databases, such as MySQL, MongoDB, PostgreSQL, Oracle, SQL Server, Db2, Cassandra, and Vitess.

Debezium Engine

The usual way to run Debezium is through Debezium Server, as it’s not intrusive to the application; it’s a service that takes data changes and populates a Kafka topic.

But not all applications require the level of fault tolerance or scalability offered by Kafka Connect. Also, sometimes an application must capture the data change event but execute some custom logic instead of sending the change to a messaging system, or send it to a messaging system that Debezium doesn't support.

In these cases, the debezium-api module defines a small API to embed the Debezium Engine in the application.

So far, we’ve learned that dual writes are something to avoid. The solution is to use Change Data Capture to get data directly from the transaction log and push it to a Kafka topic, so any other system can consume it “transactionally” and in order.

Outbox Pattern 

If you arrived at this point, you might wonder: “OK, nice, I can use CDC to react to data changes, but I’m exposing an internal entity to external systems.” While this is true, allow me to introduce the Outbox Pattern to avoid this problem.

The Outbox Pattern provides an outbox table where you record all operations on your entities (possibly with denormalized data). The CDC system (Debezium in our case) then reacts to changes placed in the outbox table, not the entity table, keeping the data model isolated from other systems:

The important part you need to be aware of is that the entity modification and the outbox insert must happen within the same transaction.
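
To visualize the shape of that table, the outbox table used later in this article looks roughly like the following sketch. The column names are taken from the insert statement shown in the Running section; the exact types the extension generates may differ:

CREATE TABLE OutboxEvent (
  id                 VARCHAR(36)  NOT NULL, -- event identifier
  aggregatetype      VARCHAR(255) NOT NULL, -- e.g., "Movie"
  aggregateid        VARCHAR(255) NOT NULL, -- id of the affected entity
  type               VARCHAR(255) NOT NULL, -- e.g., "MovieCreated"
  timestamp          DATETIME(6)  NOT NULL, -- when the event was recorded
  payload            VARCHAR(8000),         -- event content as JSON
  tracingspancontext VARCHAR(256),          -- optional tracing metadata
  PRIMARY KEY (id)
);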

Let’s start putting all these pieces together in a Quarkus project and fix the problem we introduced at the beginning: how to insert a movie into the database and also populate it into an external system (a Kafka topic).

Movie Plays Debezium

Instead of handcrafting code for each use case, let’s see how to use Debezium Embedded and how it’s integrated with Quarkus to solve this problem.

Creating the Project

Navigate to the Quarkus start page and select the RESTEasy Reactive and RESTEasy Reactive Jackson extensions (to implement JAX-RS endpoints and marshal/unmarshal events between JSON, Java objects, and byte arrays), Panache and the MySQL driver (to insert movies into the MySQL database), and SmallRye Reactive Messaging (to interact with Kafka). Also, uncheck the Starter Code generation option.

The following screenshot shows the selection:

You can skip this manual step and navigate to the Quarkus Generator link, where all the dependencies are already selected. Then push the Generate your application button to download a zip file with the scaffolded application.

Unzip the file and open the project in your favorite IDE.

Development

Before we start to code, we need to add two new dependencies: one for using the Debezium Engine and another for adding the Debezium Quarkus Outbox extension.

Debezium Engine

Open the pom.xml file and add the following dependencies.

In the dependencyManagement section:

<dependency>
   <groupId>io.debezium</groupId>
   <artifactId>debezium-bom</artifactId>
   <version>1.9.4.Final</version>
   <type>pom</type>
   <scope>import</scope>
</dependency>

In the dependencies section:

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-ddl-parser</artifactId>
</dependency>
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-embedded</artifactId>
</dependency>
<!-- We connect to a MySQL database, so we need debezium MySQL connector -->
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-connector-mysql</artifactId>
</dependency>

This is to use the Debezium Engine embedded in the application. None of these dependencies would be required if we used Debezium Server, since it’s a standalone service.

Debezium Quarkus Outbox 

Quarkus integrates with the Outbox Pattern through the Debezium Quarkus Outbox extension. 

Open the pom.xml file and add the following dependencies.

In the dependencyManagement section:

<dependency>
   <groupId>${quarkus.platform.group-id}</groupId>
   <artifactId>quarkus-debezium-bom</artifactId>
   <version>${quarkus.platform.version}</version>
   <type>pom</type>
   <scope>import</scope>
</dependency>

Note that the version of the BOM is aligned with the Quarkus version, 2.10.1.Final in this case.

In the dependencies section:

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-quarkus-outbox</artifactId>
</dependency>

Implementation

You can choose not to use the Outbox Pattern, or to implement it yourself; in either case, these dependencies are not required. But we’ll use the extension to simplify the development.

With all these dependencies in place, create the Movie entity annotated with JPA annotations and extend the PanacheEntity class:

import javax.persistence.Entity;
 
import io.quarkus.hibernate.orm.panache.PanacheEntity;
 
@Entity
public class Movie extends PanacheEntity {
  
   // No worries, Quarkus will change these fields
   // to private and auto-generate getters/setters at compilation time
   public String name;
   public String director;
   public String genre;
 
}

The next step is to create an HTTP endpoint to insert the movie content into the database using JAX-RS annotations:

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
 
import org.jboss.logging.Logger;
 
@Path("/movie")
public class MovieResource {
 
   // Service to insert the movie data into Movie and Outbox tables
   @Inject
   MovieService movieService;
 
   // Injects the logger
   @Inject
   Logger logger;
 
   // Http Post method to insert a movie
   @POST
   public Movie insert(Movie movie) {
       logger.info("New Movie inserted " + movie.name);
       System.out.println(":)");
      
       return movieService.insertMovie(movie);
   }
}

Since we are using the Debezium Quarkus Outbox extension, we need to create an entity representing the content stored in the outbox table. The entity must implement the ExportedEvent interface and its required methods, which identify the kind of event put in the outbox table.

import java.time.Instant;
 
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
 
import io.debezium.outbox.quarkus.ExportedEvent;
 
public class MovieEvent implements ExportedEvent<String, JsonNode> {
 
   private static ObjectMapper mapper = new ObjectMapper();
 
   // The aggregate type enclosed in the event
   private static final String TYPE = "Movie";
   // The event type
   private static final String EVENT_TYPE = "MovieCreated";

   private final long movieId;
   private final JsonNode jsonNode;
   private final Instant timestamp;

   // Saves Movie info in the class
   public MovieEvent(Movie movie) {
       this.movieId = movie.id;
       this.timestamp = Instant.now();
       // Saves movie content in a string column in JSON format
       this.jsonNode = convertToJson(movie);
   }

   @Override
   public String getAggregateId() {
       return String.valueOf(this.movieId);
   }
 
   @Override
   public String getAggregateType() {
       return TYPE;
   }
 
   @Override
   public JsonNode getPayload() {
       return jsonNode;
   }
 
   @Override
   public Instant getTimestamp() {
       return timestamp;
   }
 
   @Override
   public String getType() {
       return EVENT_TYPE;
   }
  
   private JsonNode convertToJson(Movie movie) {
       ObjectNode asJson = mapper.createObjectNode()
               .put("id", movie.id)
               .put("name", movie.name)
               .put("director", movie.director)
               .put("genre", movie.genre);
      
       return asJson;
   }
 
}

The last step before adding the Debezium logic is to implement the MovieService class with the insert logic. This logic should persist the movie into the Movie table and the MovieEvent entity into the OutboxEvent table managed by the extension.

The extension provides a specific CDI event to persist any object implementing the ExportedEvent interface. The only thing to do is fire the event, and the data is automatically persisted.

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.transaction.Transactional;
 
import io.debezium.outbox.quarkus.ExportedEvent;
 
@ApplicationScoped
public class MovieService {
  
   // CDI event used to persist Outbox entities
   @Inject
   Event<ExportedEvent<?, ?>> event;

   // Both inserts happen within the same transaction
   @Transactional
   public Movie insertMovie(Movie movie) {
 
       // Persists data
       movie.persist();
      
       // Persists outbox content
       event.fire(new MovieEvent(movie));
      
       return movie;
   }
}

The last step is to configure the Debezium Engine and start it embedded within the application.

To configure the engine, you need to set the database information (hostname, port, credentials) and the database and tables Debezium should monitor to trigger events.

import java.io.File;
import java.io.IOException;
 
import javax.enterprise.inject.Produces;
 
import org.eclipse.microprofile.config.inject.ConfigProperty;
 
import io.debezium.config.Configuration;
 
public class DebeziumConfiguration {
  
   // Debezium needs the database URL and credentials to log in and
   // monitor transaction logs
   @ConfigProperty(name = "quarkus.datasource.jdbc.url")
   String url;
 
   @ConfigProperty(name = "quarkus.datasource.password")
   String password;
 
   @ConfigProperty(name = "quarkus.datasource.username")
   String username;
 
   @Produces
   public Configuration configureDebezium() throws IOException {
 
       // Custom helper to extract the hostname, port, and database name
       // from the JDBC URL (a sketch follows this class)
       MySqlJdbcParser jdbcParser = MySqlJdbcParser.parse(url);
      
       File fileOffset = File.createTempFile("offset", ".dat");
       File fileDbHistory = File.createTempFile("dbhistory", ".dat");
 
       return io.debezium.config.Configuration.create()
           .with("name", "movies-mysql-connector")
           // configures MySQL connector
           .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
           .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
           .with("offset.storage.file.filename", fileOffset.getAbsolutePath())
           .with("offset.flush.interval.ms", "60000")
           // Configures database location
           .with("database.hostname", jdbcParser.getHost())
           .with("database.port", jdbcParser.getPort())
           .with("database.user", "root")
           .with("database.allowPublicKeyRetrieval", "true")
           .with("database.password", password)
           .with("database.dbname", jdbcParser.getDatabase())
           .with("database.include.list", jdbcParser.getDatabase())
            // Debezium only sends events for modifications of the OutboxEvent table, not all tables
           .with("table.include.list", jdbcParser.getDatabase() + ".OutboxEvent")
           .with("include.schema.changes", "false")
           .with("database.server.id", "10181")
           .with("database.server.name", "movies-mysql-db-server")
           .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
           .with("database.history.file.filename", fileDbHistory.getAbsolutePath())
       .build();
   }
 
}
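
The MySqlJdbcParser used above is a custom helper, not a Debezium or Quarkus class. A minimal sketch, assuming a JDBC URL of the form jdbc:mysql://host:port/database, could look like this:

import java.net.URI;

// Minimal sketch of the custom helper: it strips the "jdbc:" prefix so the
// remainder parses as a standard URI, then exposes host, port, and database.
public class MySqlJdbcParser {

    private final String host;
    private final int port;
    private final String database;

    private MySqlJdbcParser(String host, int port, String database) {
        this.host = host;
        this.port = port;
        this.database = database;
    }

    public static MySqlJdbcParser parse(String jdbcUrl) {
        URI uri = URI.create(jdbcUrl.substring("jdbc:".length()));
        // getPath() returns "/database", so drop the leading slash
        return new MySqlJdbcParser(uri.getHost(), uri.getPort(),
                uri.getPath().substring(1));
    }

    public String getHost() { return host; }

    public int getPort() { return port; }

    public String getDatabase() { return database; }
}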

The DebeziumListener CDI class starts Debezium when the application is up and running.

The Debezium Engine doesn’t run in a separate thread on its own, so we need to provide a thread to run it in parallel without blocking the application thread. Using ManagedExecutor is the correct way to obtain an executor thread within Quarkus to run Debezium.

Then we need to instantiate the Debezium Engine using the DebeziumEngine class, setting the configuration properties created in the previous step. One of the most important steps is registering a method triggered every time Debezium generates an event. The notifying method registers this callback; in our example, we named it handleChangeEvent.

This method receives each event, and we can implement any logic we wish, from sending it to a Kafka topic to manipulating it and sending it to another service; anything you can implement in Java.

import java.io.IOException;
 
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
 
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
 
import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.kafka.Record;
 
import static io.debezium.data.Envelope.FieldName.*;
import static io.debezium.data.Envelope.Operation;
 
@ApplicationScoped
public class DebeziumListener {
  
   private static ObjectMapper objectMapper = new ObjectMapper();
 
   // Start the Debezium engine in a different thread
   ManagedExecutor executor;
 
   // Debezium configuration object
   Configuration configuration;
 
   private DebeziumEngine<RecordChangeEvent<SourceRecord>> engine;
 
   public DebeziumListener(ManagedExecutor executor, Configuration configuration) {
       this.executor = executor;
       this.configuration = configuration;
   }
 
   // Interface to send events to movies Kafka topic
   @Channel("movies")
   Emitter<Record<Long, JsonNode>> movieEmitter;
 
   void onStart(@Observes StartupEvent event) {
 
       // Configures Debezium engine
       this.engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
           .using(this.configuration.asProperties())
            // For each event triggered by Debezium, the handleChangeEvent method is called
           .notifying(this::handleChangeEvent)
           .build();
 
       // Starts Debezium in different thread
       this.executor.execute(this.engine);
   }
 
   void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
 
       // For each triggered event, we get the information
       SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
       Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
 
       if (sourceRecordChangeValue != null) {
           Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
 
           // Only insert operations are processed
           if(operation == Operation.CREATE) {
 
                // Get insertion info
                Struct struct = (Struct) sourceRecordChangeValue.get(AFTER);
                String type = struct.getString("type");
                String payload = struct.getString("payload");

                // Matches the EVENT_TYPE set in MovieEvent
                if ("MovieCreated".equals(type)) {
                   try {
                       final JsonNode payloadJson = objectMapper.readValue(payload, JsonNode.class);
                       long id = payloadJson.get("id").asLong();
 
                       // Populate content to Kafka topic
                       movieEmitter.send(Record.of(id, payloadJson));
                   } catch (JsonProcessingException e) {
                       throw new IllegalArgumentException(e);
                   }
               }
           }
       }
   }
 
   void onStop(@Observes ShutdownEvent event) throws IOException {
       if (this.engine != null) {
           this.engine.close();
       }
   }
 
}

Running

This example is self-contained, so you don’t need to start anything; Quarkus will do it for you.

Panache and the Kafka connector integrate with Quarkus DevServices. For this reason, we don’t need to start a Kafka cluster or a MySQL database, nor configure them, as Quarkus Dev mode takes care of everything. Just remember to have a working container runtime on your computer, such as Podman or any other OCI-compliant tool.

Before running the application, we’ll add two configuration properties to the application to make things more traceable; in the application.properties file, add the following lines:

quarkus.hibernate-orm.log.sql=true
quarkus.debezium-outbox.remove-after-insert=false

The first line logs the SQL statements executed against the database. This is useful to validate that both tables (Movie and OutboxEvent) receive inserts.

The second one prevents outbox entries from being deleted once processed, so we can inspect the OutboxEvent table.

In one terminal window, start the service:

./mvnw clean quarkus:dev

…
2022-07-07 11:36:22,942 INFO  [io.deb.con.mys.MySqlStreamingChangeEventSource] (debezium-mysqlconnector-movies-mysql-db-server-change-event-source-coordinator) Waiting for keepalive thread to start
2022-07-07 11:36:22,948 INFO  [io.deb.con.mys.MySqlStreamingChangeEventSource] (debezium-mysqlconnector-movies-mysql-db-server-change-event-source-coordinator) Keepalive thread is running
2022-07-07 11:37:43,889 INFO  [org.acm.MovieResource] (executor-thread-1) New Movie inserted string

After a few seconds, a Kafka cluster, MySQL instance, and the application are up and running.

Inspect the running containers to validate instances:

docker ps

CONTAINER ID   IMAGE                          COMMAND                  CREATED          STATUS          PORTS
fa316bfae219   vectorized/redpanda:v21.11.3   "sh -c 'while [ ! -f…"   49 seconds ago   Up 45 seconds   8081-8082/tcp, 9644/tcp, 0.0.0.0:55002->9092/tcp
4c220f7ee066   mysql:8.0                      "docker-entrypoint.s…"   50 seconds ago   Up 46 seconds   33060/tcp, 0.0.0.0:60652->3306/tcp
e41cae02ff02   testcontainers/ryuk:0.3.3      "/app"                   53 seconds ago   Up 50 seconds   0.0.0.0:60650->8080/tcp

The Kafka cluster (Redpanda) runs at port 55002, and the MySQL instance (container id 4c220f7ee066) at port 60652.

NOTE: Ports and IDs might be different in each case.
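
If you need to look up a mapped port later, the docker port command prints it; for example, for the MySQL container above (your container id will differ):

docker port 4c220f7ee066 3306

0.0.0.0:60652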

In another terminal window, run the curl command to insert a new Movie.

curl -X 'POST' \
  'http://localhost:8080/movie' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "Minions: The Rise of Gru",
  "director": "Kyle Balda",
  "genre": "Animation"
}'

Inspect the Quarkus terminal window and see the SQL statements run against the database:

:)
Hibernate:
    select
        next_val as id_val
    from
        hibernate_sequence for update


Hibernate:
    update
        hibernate_sequence
    set
        next_val= ?
    where
        next_val=?

// Insert into Movie 

Hibernate:
    insert
    into
        Movie
        (director, genre, name, id)
    values
        (?, ?, ?, ?)

// The OutboxEvent table automatically receives an insert

Hibernate:
    insert
    into
        OutboxEvent
        (aggregatetype, aggregateid, type, timestamp, payload, tracingspancontext, id)
    values
        (?, ?, ?, ?, ?, ?, ?)

To validate that Debezium detects the change and pushes it to the movies Kafka topic, run the kcat tool to consume from the topic, setting the exposed port of the Kafka service:

kcat -b localhost:55002 -C -t movies

{"id":1,"name":"Minions: The Rise of Gru","director":"Kyle Balda","genre":"Animation"}
% Reached end of topic movies [0] at offset 1

Conclusions

We’ve implemented a solution that fixes the dual writes problem between a database and an external system by using Debezium to read transaction logs and trigger an event for every change.

In this example, we used Debezium Embedded, and we implemented the logic to execute when an event was fired. 

The embedded approach might work in some scenarios, but in others (especially in brownfield projects, or projects requiring high scalability and fault tolerance), Debezium Server might suit better. With Debezium Server (as a Kafka Connect process), no change in your code is required (no embedded dependencies), as Debezium is a standalone process that connects to the database transaction log, detects changes, and sends them to a Kafka topic. Since events are ordered, any system can consume these changes from the topic.

Although the Outbox Pattern is not mandatory when using Debezium (in the end, Debezium can listen for changes in any table), it’s a good practice to isolate your data, and the Outbox Pattern helps you with this.

Integrating (micro)service architectures might seem easy initially, but when you start integrating data, things become more complex, and the Debezium project is here to help you with this task.

Source code is available on GitHub.


Community comments

  • Interesting concept but is it really necessary?

    by Michael Segel,

    CDC is both a pattern and an implementation.
    CDC exists today in most databases. (Greenplum and HBase* do not support it, but my search was not extensive.)

    CDC is essentially a post-transaction trigger that calls a stored procedure to update a backing table/log table, capturing the changes made by the transaction.

    In your example,
    You could persist your information first to the database which has CDC enabled for that table. This just writes the data into an internal table/log. Then you would have an external listener capture the event, and then write it to your stream. You will always have two writes. The issue is if they are concurrent or cascading.

    Conversely you could have first written to the stream, and then made the database a consumer of the data on the stream. Again cascading writes.

    I'm not sure of the need for Debezium.
    So what am I missing?

    Thx

    *HBase writes to the WAL, but this is bypassed in some operations. It's possible to write a coprocessor to implement CDC, but why? HBase should just die a swift death... ;-)

  • What's the message of the blog entry?

    by Sebastian Morkisch,

    I read this entry with great interest. I liked your style of concise description. Also a plus are the admittedly few, but existing sources you rely on.
    Unfortunately, you left the topic on CDC with Kafka too soon.

    Could you clarify what log(s) the debezium software is trying to read? Is it the transaction/query/binary log?

    This debezium software seems to introduce a huge security risk and possibly violates privacy policies as it duplicates sensitive data on multiple systems that would only have lived on Kafka, one backend, processing business logic and a DB. This approach implies further audits, hardening and a lot of time to write a protection requirement statement.

    I see a possible need for CDC but don't see why debezium should simplify everything. The key message wasn't transported, or not synchronized with my mind. ;-)

  • Re: What's the message of the blog entry?

    by Alex Soto,

    Debezium uses transaction logs to read changes on the DB and translate them to Kafka. Of course, you are duplicating information, but sometimes it's required; depending on your use case, you might be able to do it or not. In the end, it depends on the requirements.

  • Re: Interesting concept but is it really necessary?

    by Alex Soto,

    Basically, you've got two big advantages. The first one: Debezium reads from the transaction log, which means a change is propagated only when there is no error in the DB. The second advantage is that Debezium uses Kafka Connect, which helps make this part redundant, scalable, and resilient... So can you implement what you mention here? Yes, but you'll end up reimplementing Debezium.
