High Load Trading Transaction Processing with Reveno CQRS/Event Sourcing Framework

Transaction processing is everywhere nowadays, from individual retail web sites using relational databases for processing purchases, to real-time trading systems processing 100k+ orders per second.

Reveno is a new JVM-based, lock-free transaction processing framework built on the CQRS and event-sourcing patterns. It is a simple yet powerful tool that does not compromise on performance. All transactions are persisted to read-only journals, and the latest state of the domain model can be restored by replaying these events in sequence. All runtime operations are performed in memory, so throughput can reach the order of millions of transactions per second, with mean latency on the order of microseconds. Despite all this power, Reveno is still a general-purpose framework: it covers a variety of use cases through a rich set of engine configurations. For example, you can vary the durability configuration from very relaxed (to gain additional throughput) to highly restrictive (when you have a low tolerance for data loss).
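The replay idea can be illustrated in a few lines of plain Java. This is a conceptual sketch only, not Reveno's journal format or API; the names JournalReplaySketch, Journal and BalanceChanged are hypothetical and exist purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Conceptual event-sourcing sketch: state is rebuilt from a journal, never stored directly. */
final class JournalReplaySketch {

    /** A journaled event: a signed balance change in minor currency units. */
    static final class BalanceChanged {
        final long delta;
        BalanceChanged(long delta) { this.delta = delta; }
    }

    /** In-memory stand-in for a persisted journal; events are only ever appended. */
    static final class Journal {
        private final List<BalanceChanged> events = new ArrayList<>();
        void append(BalanceChanged event) { events.add(event); }
        List<BalanceChanged> events() { return new ArrayList<>(events); }
    }

    /** Restores the latest balance by applying every event in its original order. */
    static long replay(Journal journal) {
        long balance = 0;
        for (BalanceChanged event : journal.events()) {
            balance += event.delta;
        }
        return balance;
    }

    public static void main(String[] args) {
        Journal journal = new Journal();
        journal.append(new BalanceChanged(5150));
        journal.append(new BalanceChanged(-1213));
        System.out.println(replay(journal)); // prints 3937
    }
}
```

Because the journal is the source of truth, restarting amounts to constructing an empty model and calling replay again.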

Requirements for the full range of use cases will of course vary greatly, and a general framework should take the full range of possibilities into account. In this article I’d like to provide an example of a simple trading system implementation using the Reveno framework.

At this point let's examine how Reveno deals with your domain model. As a CQRS based framework, Reveno separates your domain into transactional and query models. Neither of those models has any restrictions on declaration at all; there are no required annotations, base classes or even so much as a Serializable interface; just simple POJOs do the job.

There is no single approach that serves every possible use case, and it is up to the application designer to determine how rollbacks are handled in the transactional model. Reveno provides two primary options for implementing objects in your domain: mutable and immutable. They are handled differently under the hood, and each has its own pros and cons. Immutable objects are cheap and synchronization-free in Java, and are quite in vogue of late. Reveno handles them very efficiently, but as with any immutable domain they generate extra garbage. Unless that additional GC work is a showstopper, the immutable model should be your default option. In contrast, the mutable model requires taking additional serialized snapshots of the objects used during transaction execution, which can negatively affect performance. Fortunately, if you stick with the mutable model and need maximal performance and minimal GC impact, you can use an additional feature of the mutable model, “Compensating Actions”. Simply stated, these are manual rollback actions that you implement alongside your normal transaction handlers. For more detailed information, please refer to the official Reveno documentation page.
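To make the idea of a manual rollback concrete, here is a minimal sketch of the general compensating-action pattern on a mutable object. It does not use Reveno's API; MutableAccount, AddBalance and execute are hypothetical names, and the rule that a balance may not go negative is an assumption added only to trigger a failure.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Compensating actions on a mutable model: every change carries its own manual undo. */
final class CompensatingActionSketch {

    /** Mutable entity, changed in place (no snapshot is taken). */
    static final class MutableAccount {
        long balance;
    }

    /** A state change paired with the action that reverses it. */
    static final class AddBalance {
        final long amount;
        AddBalance(long amount) { this.amount = amount; }

        void apply(MutableAccount account) {
            if (account.balance + amount < 0) {
                throw new IllegalStateException("insufficient funds");
            }
            account.balance += amount;
        }

        void compensate(MutableAccount account) {
            account.balance -= amount;
        }
    }

    /** Applies all actions; on failure, undoes the already applied ones in reverse order. */
    static boolean execute(MutableAccount account, List<AddBalance> actions) {
        Deque<AddBalance> applied = new ArrayDeque<>();
        try {
            for (AddBalance action : actions) {
                action.apply(account);
                applied.push(action);
            }
            return true;
        } catch (IllegalStateException e) {
            while (!applied.isEmpty()) {
                applied.pop().compensate(account);
            }
            return false;
        }
    }
}
```

The trade-off is visible here: no copies or snapshots are created, but the correctness of the rollback depends entirely on the hand-written compensate method.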

Now that we have laid out some ground rules, let’s get our hands dirty with a little code. Our artificial trading system will have a number of accounts, each of which can have zero or more orders. We must support maintenance actions such as account creation and order processing. In practice a system of this kind might be required to handle a decent workload, say from 10 to possibly 500k transactions per second or more. To complicate matters, it will be very sensitive to latency, and frequent large latency spikes can directly result in financial loss.

Installation

If you use any popular build tool like Maven, Gradle, Sbt, etc., you can add a Reveno dependency from Maven Central. For now, there are three available libraries:

  • reveno-core – includes all Reveno core packages, responsible for engine initialization, transaction processing, etc.
  • reveno-metrics – includes packages responsible for gathering metrics from the working engine and forwarding them to Graphite, Slf4j, etc.
  • reveno-cluster – makes it possible to run Reveno in a cluster with a Master-Slave architecture, thus providing failover capability.

You can refer to the full installation guide with examples on the Reveno Installation page.

Defining the transactional model

Let’s begin our development effort by defining our domain model. As we suggested earlier, we will build it up from simple POJOs. My personal preference is to opt for immutable objects: they greatly simplify things, they bypass any concurrency issues, and perhaps most importantly, they are highly performant in Reveno, since there is no need to preserve snapshots of accessed objects. Reveno allows us to work with immutables out of the box (making it a good tutorial on how to cope with immutability in Java generally).

Let’s first define an entity to represent a typical trading account in our system (instance variables are public here for simplicity, but in practice there is no restriction):

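// LongSet and LongOpenHashSet are assumed here to be the fastutil primitive collections
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;

public class TradeAccount {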
    public final long id;
    public final long balance;
    public final String currency;
    private final LongSet orders;

    public TradeAccount(long id, String currency) {
        this(id, 0, currency, new LongOpenHashSet());
    }

    private TradeAccount(long id, long balance, 
                         String currency, LongSet orders) {
        this.id = id;
        this.balance = balance;
        this.currency = currency;
        this.orders = orders;
    }

    public LongSet orders() {
        return new LongOpenHashSet(orders);
    }
}

As we can see, this class is fully immutable. But this kind of value object carries no functionality and suffers from a disease commonly known as “anemia”; it would be much better if our TradeAccount did something useful, say handling orders and monetary computations:

public class TradeAccount {
    public final long id;
    public final long balance;
    public final String currency;
    private final LongSet orders;

    public TradeAccount(long id, String currency) {
        this(id, 0, currency, new LongOpenHashSet());
    }

    private TradeAccount(long id, long balance, 
                         String currency, LongSet orders) {
        this.id = id;
        this.balance = balance;
        this.currency = currency;
        this.orders = orders;
    }

    public TradeAccount addBalance(long amount) {
        return new TradeAccount(id, balance + amount, currency, orders);
    }

    public TradeAccount addOrder(long orderId) {
        LongSet orders = new LongOpenHashSet(this.orders);
        orders.add(orderId);
        return new TradeAccount(id, balance, currency, orders);
    }

    public TradeAccount removeOrder(long orderId) {
        LongSet orders = new LongOpenHashSet(this.orders);
        orders.remove(orderId);
        return new TradeAccount(id, balance, currency, orders);
    }

    public LongCollection orders() {
        return new LongOpenHashSet(orders);
    }

}

Now it’s much more useful. Before moving to the actual transaction processing details, it’s worth mentioning how Reveno actually works with its transactional model. All entities are stored in a repository, which is accessible from all types of handlers (we will cover them in more detail shortly). Such entities refer to each other by IDs and are accessed from the repository by the ID. IDs are restricted to type long, due to internal performance optimizations.
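The repository idea can be pictured as a map from entity type and long ID to the current (immutable) instance. The sketch below is a simplified stand-in written for illustration, not Reveno's implementation; the method names store and remap mirror the calls used later in this article, while RepositorySketch and its get method are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

/** Simplified repository: entities are looked up by type and long ID. */
final class RepositorySketch {
    private final Map<Class<?>, Map<Long, Object>> entities = new HashMap<>();

    /** Stores (or overwrites) an entity under the given ID. */
    <T> void store(long id, T entity) {
        put(entity.getClass(), id, entity);
    }

    /** Returns the entity of the given type and ID, if present. */
    <T> Optional<T> get(Class<T> type, long id) {
        Map<Long, Object> byId = entities.get(type);
        return byId == null ? Optional.empty() : Optional.ofNullable(type.cast(byId.get(id)));
    }

    /** Replaces an entity with the result of applying an update function to it. */
    <T> void remap(long id, Class<T> type, BiFunction<Long, T, T> update) {
        T current = get(type, id)
                .orElseThrow(() -> new IllegalArgumentException("no entity with id " + id));
        put(type, id, update.apply(id, current));
    }

    private void put(Class<?> type, long id, Object entity) {
        entities.computeIfAbsent(type, k -> new HashMap<>()).put(id, entity);
    }
}
```

With immutable entities, remap never modifies the old instance; it simply swaps the reference held under the ID for a new one.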

The Order class is similar, and for brevity we will not show the source code here, but you can download the full demo source code on GitHub, and find additional links at the end of this article.

Defining the query model

So far we have explored how to build the Reveno transactional model. Logically, it is essential to define the query side as well. In Reveno, the queries are built by defining “views”, where each view represents some entity in the transaction model. Apart from designing your view classes, you should also provide mappers for each view type. We will cover them in more detail below.

When a transaction completes successfully, Reveno maps the changed entities, guaranteeing that view updates happen-before command completion. By default, the query model in Reveno is in-memory. Let’s define a view for our TradeAccount class:

public class TradeAccountView {
    public final double balance;
    public final Set<OrderView> orders;

    public TradeAccountView(double balance, Set<OrderView> orders) {
        this.balance = balance;
        this.orders = orders;
    }
}

Our TradeAccountView class contains a collection of other views (in this case OrderView instances), which is very handy for querying, serialization, JSON conversion and so on. The Reveno mapper offers many useful methods that simplify tasks such as mapping a collection of IDs to a collection of views. We will see this in action shortly.

Defining commands and transaction actions

In order to perform any transaction in Reveno we must first execute a “command” object. A command object can be a simple POJO with a special handler registered in the system. Typically, commands are used to perform some aggregation and validation logic, using read-only access to the repository. Most importantly, commands are contractually obligated to dispatch “transaction actions” (also called “state mutators”).

Transaction actions are the components that make state changes to the domain model, and they are executed with read-write access to the repository. A transaction action object can also be a POJO with handlers registered in the system. All such actions are gathered into a single atomic transaction within the scope of the currently executing command. After successful execution, transaction actions are persisted to the underlying storage, and can be replayed after a restart or any kind of failure.

In our trading system we will need to create new trading accounts with some initial balance. As before, we start by defining a transaction command:

public class CreateAccount {
    public final String currency;
    public final double initialBalance;

    public CreateAccount(String currency, double initialBalance) {
        this.currency = currency;
        this.initialBalance = initialBalance;
    }

    public static class CreateAccountAction {
        public final CreateAccount info;
        public final long id;

        public CreateAccountAction(CreateAccount info, long id) {
            this.info = info;
            this.id = id;
        }
    }
}

Here we actually have two classes: CreateAccount is the command, and CreateAccountAction is the transaction action. This split is not required in general; if the command and transaction action data happen to match precisely, you can safely reuse the same class. In our case, however, we receive monetary amounts as type double (say, from some legacy terminal), while our internal engine stores money values as longs to avoid floating-point rounding errors.
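The handlers in this article call toLong and fromLong helpers whose bodies are not shown. A minimal sketch of such a conversion follows, assuming a fixed scale of 10,000 minor units per currency unit; the scale, the class name MoneySketch and the rounding policy are assumptions made for illustration, not the demo project's actual code.

```java
/** Fixed-point money conversion sketch: doubles at the boundary, longs inside the engine. */
final class MoneySketch {

    /** Assumed scale: four decimal places are kept (hypothetical choice). */
    static final long SCALE = 10_000L;

    /** Converts an external double amount to internal minor units, rounding to nearest. */
    static long toLong(double amount) {
        return Math.round(amount * SCALE);
    }

    /** Converts internal minor units back to a double for display in views. */
    static double fromLong(long units) {
        return units / (double) SCALE;
    }

    public static void main(String[] args) {
        // Binary floating point cannot represent most decimal fractions exactly.
        System.out.println(0.1 + 0.2 == 0.3);                          // prints false
        // The same sum in fixed-point longs is exact.
        System.out.println(toLong(0.1) + toLong(0.2) == toLong(0.3));  // prints true

        long balance = toLong(5.15) - toLong(1.213);
        System.out.println(fromLong(balance));                          // prints 3.937
    }
}
```

All arithmetic inside the engine then happens on longs, and doubles appear only at the edges, in commands and views.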

Now we can instantiate a Reveno engine and define command and transaction action handlers:

Reveno reveno = new Engine(pathToEngineFolder);

reveno.domain().command(CreateAccount.class, long.class, (c, ctx) -> {
    long accountId = ctx.id(TradeAccount.class);
    ctx.executeTxAction(new CreateAccount.CreateAccountAction(c, accountId));
    if (c.initialBalance > 0) {
        ctx.executeTxAction(new ChangeBalance(
                            accountId, toLong(c.initialBalance)));
    }
    return accountId;
});

reveno.domain().transactionAction(CreateAccount.CreateAccountAction.class,
        (a, ctx) -> ctx.repo().store(a.id,
                new TradeAccount(a.id, a.info.currency)));

reveno.domain().transactionAction(ChangeBalance.class,
        (a, ctx) -> ctx.repo().remap(a.accountId, TradeAccount.class,
                (id, e) -> e.addBalance(a.amount)));

There’s a lot going on here, so let’s take a close look. First, we define a CreateAccount command handler, which generates the next account ID and executes a transaction action for the actual account creation. If an initial balance is specified, the ChangeBalance transaction action is executed as well. It’s worth noting that the ctx.executeTxAction call doesn’t block; all supplied transaction actions are executed in a single thread after the command handler completes successfully, so if a rollback is required in any transaction action handler, the changes made by all of them are rolled back together. (The actual rollback mechanism depends on the transactional model in use.)
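The deferred, all-or-nothing behaviour described above can be sketched without Reveno as follows. This is only a conceptual model under simplifying assumptions (a single long balance as the whole state, actions as plain functions); DeferredActionsSketch, CommandContext and commit are hypothetical names, although executeTxAction deliberately echoes the call used in the handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

/** Deferred transaction actions: queued by the command, applied atomically afterwards. */
final class DeferredActionsSketch {

    /** Collects actions while the command handler runs; nothing executes yet. */
    static final class CommandContext {
        private final List<LongUnaryOperator> actions = new ArrayList<>();

        void executeTxAction(LongUnaryOperator action) {
            actions.add(action); // returns immediately, the action is only queued
        }
    }

    private long balance; // committed state

    long balance() { return balance; }

    /** Applies the queued actions to a working value; commits only if all succeed. */
    boolean commit(CommandContext ctx) {
        long working = balance;
        try {
            for (LongUnaryOperator action : ctx.actions) {
                working = action.applyAsLong(working);
            }
        } catch (RuntimeException e) {
            return false; // committed state was never touched, so rollback is free
        }
        balance = working;
        return true;
    }
}
```

With immutable state, rolling back simply means discarding the working value, which is one reason the immutable model is cheap to handle.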

Mapping entities to the query model

Since our transactional and query models are segregated, we need to define mappers that transform an entity to its view side representation. There is no need to explicitly call them from our code, since Reveno automatically understands which entities in the repository are dirty and will call the appropriate ones. Let’s see how TradeAccount maps to TradeAccountView:

reveno.domain().viewMapper(TradeAccount.class, 
                           TradeAccountView.class, (id,e,r) ->
                            new TradeAccountView(fromLong(e.balance), 
                            r.linkSet(e.orders(), OrderView.class)));

Here, id is the identity of the entity, e is the entity itself, and r is a special mapping context with useful methods. The magic really happens in the r.linkSet(..) call, which lazily maps a collection of ID pointers to a collection of the corresponding views.
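One way such lazy mapping could work is a set that stores only the IDs and resolves each view when it is actually read. The sketch below shows that general technique in plain Java; it is not how Reveno implements linkSet, and LazyLinkSketch and its lookup parameter are hypothetical.

```java
import java.util.AbstractSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.function.LongFunction;

/** Lazy ID-to-view linking: views are resolved on access, not when the set is created. */
final class LazyLinkSketch {

    /** Returns a read-only set view over ids that calls lookup only during iteration. */
    static <V> Set<V> linkSet(Collection<Long> ids, LongFunction<V> lookup) {
        return new AbstractSet<V>() {
            @Override
            public Iterator<V> iterator() {
                Iterator<Long> idIterator = ids.iterator();
                return new Iterator<V>() {
                    @Override
                    public boolean hasNext() { return idIterator.hasNext(); }

                    @Override
                    public V next() { return lookup.apply(idIterator.next()); }
                };
            }

            @Override
            public int size() {
                return ids.size(); // known without resolving any view
            }
        };
    }
}
```

A consequence of resolving on access is that a reader always sees the latest version of each linked view, without the parent view having to be rebuilt when a child changes.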

We can define the Order -> OrderView mapping in the same fashion:

reveno.domain().viewMapper(Order.class, OrderView.class, (id,e,r) ->
        new OrderView(fromLong(e.price), e.size, e.symbol, 
                      r.get(TradeAccountView.class, e.accountId)));

As you may notice, our query model consists of immutable objects, just like the entities in the transactional model. This greatly simplifies the mapping logic. Again, this isn’t a restriction, but if you use mutable views, the entire responsibility for mapping correctness rests on you.

Executing commands

Transaction processing in Reveno is asynchronous by nature. When you execute a command on a running engine, the method call immediately returns a CompletableFuture, which eventually provides the result. Internally, Reveno has a “pipeline” with a number of stages, each of which runs on its own thread. Passing objects one by one between them would be extremely costly, and that’s where batch processing comes into play. Under high load, Reveno processes batches at each stage, and this is what provides high throughput in the first place.
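The combination of futures and batching can be sketched with standard JDK classes. The following is a deliberately simplified single-stage model, not Reveno's pipeline; BatchingStageSketch, Request, submit and drainBatch are hypothetical names, and in a real stage drainBatch would run in a loop on the stage's own thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** One pipeline stage that accepts requests asynchronously and processes them in batches. */
final class BatchingStageSketch {

    /** A queued balance change together with the future its caller is holding. */
    static final class Request {
        final long amount;
        final CompletableFuture<Long> result = new CompletableFuture<>();
        Request(long amount) { this.amount = amount; }
    }

    private final BlockingQueue<Request> inbox = new LinkedBlockingQueue<>();
    private long balance;

    /** Enqueues a request and returns immediately with a not-yet-completed future. */
    CompletableFuture<Long> submit(long amount) {
        Request request = new Request(amount);
        inbox.add(request);
        return request.result;
    }

    /** Takes everything currently queued in one call and processes it as a batch. */
    int drainBatch() {
        List<Request> batch = new ArrayList<>();
        inbox.drainTo(batch); // one hand-off for the whole batch instead of one per item
        for (Request request : batch) {
            balance += request.amount;
            request.result.complete(balance);
        }
        return batch.size();
    }
}
```

The more requests accumulate while a stage is busy, the larger the next batch becomes, so the cost of each hand-off is spread over more work as load increases.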

After we are done with all of the declaration work and business logic implementation, we can start using the engine. First of all, we need to start it up:

reveno.startup();

After that we can create a new trading account in the system. You should note that there is also a synchronous version of the executeCommand() method, which is useful for testing or working with examples:

long accountId = reveno.executeSync(new CreateAccount("USD", 5.15));

In this case, Reveno will under the hood call the appropriate command and transaction action handlers, which in turn will create a new account with USD currency and an initial balance of $5.15. We can check for correctness as follows:

System.out.println(reveno.query().find(TradeAccountView.class, 
                                       accountId).balance);

This will print «5.15». To make this example more interesting, let’s add a new order to the account:

long orderId = reveno.executeSync(
                        new MakeOrder(accountId, "EUR/USD", 1, 1.213));

Here we create a new buy order for one EUR/USD lot with price 1.213. After that, we can again check our account for changes:

System.out.println(reveno.query().find(TradeAccountView.class, 
                                       accountId).orders.size());

This will print «1», since we now have one pending order on our account. Finally, let's execute our order, which will open one EUR/USD position and decrement the balance by 1.213. The final balance should be 5.15 - 1.213 = 3.937:

reveno.executeSync(new ExecuteOrder(orderId));
// the balance is expected to be 3.937, after order successfully executed
System.out.println(reveno.query().find(TradeAccountView.class, 
                                       accountId).balance);

Persistence

As mentioned in our introduction, Reveno is first and foremost a transaction processing framework. You can safely restart your engine from the same directory and see the most recent state of your model. We have tried to make every part of it configurable, and that applies to durability as well. You can check the available options by calling reveno.config().

Publishing events

Reveno has its own event processing subsystem. Your job is to define your event class (which may be a POJO) and handlers for it, and to publish events from transaction actions. It is important to note that an event is published only after the command executes successfully, and that event handling is completely asynchronous. All view mappings strictly happen-before event handler execution.

The result of event execution is also persisted to storage, and if an event completed successfully, it will generally not be handled again on engine restart. This behavior is not strictly guaranteed, however, so if you want your handlers to be fully idempotent, you should check the EventMetadata.isReplay flag, which is provided to every event handler.
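A handler that respects a replay flag might look roughly like the sketch below. It uses a hypothetical Metadata class as a stand-in for the metadata object passed to handlers, so the exact accessor and the "skip side effects on replay" policy are assumptions for illustration rather than Reveno's documented behavior.

```java
import java.util.ArrayList;
import java.util.List;

/** Event handler sketch that avoids repeating side effects for replayed events. */
final class ReplayAwareHandlerSketch {

    /** Hypothetical stand-in for the metadata passed to an event handler. */
    static final class Metadata {
        private final boolean replay;
        Metadata(boolean replay) { this.replay = replay; }
        boolean isReplay() { return replay; }
    }

    /** Stand-in for an external side effect, such as sending a notification. */
    private final List<String> notifications = new ArrayList<>();

    void onBalanceChanged(long accountId, Metadata metadata) {
        if (metadata.isReplay()) {
            return; // the event may already have been handled before the restart
        }
        notifications.add("balance changed for account " + accountId);
    }

    List<String> notifications() {
        return new ArrayList<>(notifications);
    }
}
```

Whether skipping is the right reaction depends on the side effect; for some handlers, checking an external record of already processed events would be the safer choice.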

Let's extend our example by publishing and handling events on balance changes in a trade account. First of all, we should declare the event class with the appropriate fields:

public class BalanceChangedEvent {
    public final long accountId;

    public BalanceChangedEvent(long accountId) {
        this.accountId = accountId;
    }
}

When the balance of any account changes, we only need to know the account ID, since in the handler we can query for the corresponding view. This is how we can declare an event handler in our case:

reveno.events().eventHandler(BalanceChangedEvent.class, (e, m) -> {
    TradeAccountView account = reveno.query().find(TradeAccountView.class, 
                                                   e.accountId);
    System.out.println(String.format(
                          "New balance of account %s from event is: %s", 
                        e.accountId, account.balance));
});

Accordingly, we should add another line to the ChangeBalance transaction action handler declaration:

reveno.domain().transactionAction(ChangeBalance.class, (a, ctx) -> {
    ctx.repo().remap(a.accountId, TradeAccount.class,
                     (id, e) -> e.addBalance(a.amount));
    // publish an event to all listeners
    ctx.eventBus().publishEvent(new BalanceChangedEvent(a.accountId));
});

Since the ChangeBalance transaction action is used in multiple commands, by adding that event publication, we will always receive events about it. You should also note that the publishEvent call returns immediately, and the event is published eventually. Ultimately, we will receive the following output:

New balance of account 1 from event is: 5.15

New balance of account 1 from event is: 3.937

Performance check

Now that we have everything up and running, it would be interesting to see what load our application can actually handle. Reveno comes with a very useful reveno-metrics library, which helps to keep track of the performance metrics of a running engine. Like everything else in Reveno, the metrics library is optimized with off-heap memory and lock-free code, so it has an extremely low impact on overall performance. It also supports popular monitoring systems like Graphite.

(It should be noted that reveno-metrics is primarily a performance monitoring tool, not a microbenchmarking framework. For accurate benchmark results, consider using JMH or similar tools.)

After initializing metric collection in our code to use the Reveno Slf4j sink and running the ChangeBalance command 45 million times (with warmup iterations) on a MacBook Pro with a 2.7 GHz i5 CPU, we obtained the following results:

  • reveno.instances.MAC-15_local.default.latency.mean: 68804
  • reveno.instances.MAC-15_local.default.latency.min: 775
  • reveno.instances.MAC-15_local.default.latency.max: 522265
  • reveno.instances.MAC-15_local.default.throughput.hits: 1183396

These numbers mean a mean latency of about 68 microseconds, a minimum of 775 nanoseconds and a maximum of about 522 microseconds, at an overall throughput of 1,183,396 transactions per second. These are quite impressive results considering the amount of work done in the background and the level of durability.
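The unit conversion behind these figures is straightforward, assuming the latency metrics are reported in nanoseconds, as the interpretation above implies. The small sketch below uses only the JDK; LatencyUnitsSketch is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;

/** Converts the reported latency values (assumed to be nanoseconds) to whole microseconds. */
final class LatencyUnitsSketch {

    /** Truncating conversion from nanoseconds to microseconds. */
    static long nanosToWholeMicros(long nanos) {
        return TimeUnit.NANOSECONDS.toMicros(nanos);
    }

    public static void main(String[] args) {
        System.out.println(nanosToWholeMicros(68_804));  // mean: prints 68
        System.out.println(nanosToWholeMicros(522_265)); // max:  prints 522
        System.out.println(nanosToWholeMicros(775));     // min:  prints 0, i.e. under one microsecond
    }
}
```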

Conclusion

Today Reveno is a young but rapidly growing framework. You can visit our official website to learn more about it. We are always open to suggestions and feedback. You can also participate in our Google group, report bugs on the Issues page, or write directly to support@reveno.org with private enquiries or anything else.

The full demo for this article is located on GitHub. You can also find examples of Reveno usage (or contribute your own via pull request).

About the Author

Artem Dmitriev is a software engineer at the ad-tech startup GetIntent, which delivers a large-scale real-time bidding demand-side platform. In recent years Artem has been working heavily on building high-load systems on the JVM. His background is in the development of core engines for multi-market trading platforms with tough latency and throughput requirements. Artem has a passion for open-source software and its development. He welcomes feedback and can be reached at art.dm.ser@gmail.com.
