Bio Greg Young is co-founder and CTO of IMIS, a stock market analytics firm in Vancouver BC. With over 10 years of varied experience in computer science from embedded operating systems to business systems and everything in between, he brings a pragmatic and often times unusual viewpoint to discussions.
1. Hi, I am Eric Evans and I am here with Greg Young, who has been doing some very interesting work applying Domain Driven Design to an application that has very high transaction volumes. So I wanted to know more about what they did and what it led to. Greg, can you give us a quick summary of what you did, the problems you encountered and the solutions?
Sure, I am with a company called IMIS in Vancouver, BC, and we work with stock market analytic data. We are connected, for instance, to Toronto, to New York and to NASDAQ. Each of these systems sends us thousands of messages per second, which we then take and bring into our domain. Once we have them in our domain, we model them with Domain Driven Design; the purpose of our modeling is so that we can do things like algorithmic trading, back-test new strategies with our data model and run long term analytics on all this data. The main problems that we run into are simply because we have so much data coming into our domain. I think the largest system I had seen done with Domain Driven Design was maybe processing a hundred transactions per second. We are more in the neighborhood of tens of thousands of transactions per second, so looking at our system we decided to make a few subtle changes within Domain Driven Design, in particular in how objects interact with their repositories as compared to the way it was done previously.
2. Most people think of a repository as an abstraction of a collection, and objects live there. We go there and ask for an object, the object's state changes and the repository puts the new state in. But you found that you had to do it a different way.
Right. I still think in general we follow the repository pattern; what we've actually done is make objects responsible for tracking their own state changes. I would use the example of an order in our system. We would actually say to the order, "give me your state changes" and the order would return "OK, this is what has been changed since it was opened". We can then take the state changes from the repository and push them through a pipeline or directly process them to a database.
Yes absolutely, and we also still go, one repository per aggregate root, so when you actually make the call for state changes to an aggregate root, it will actually crawl the children and they become a single unit of work at the repository.
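The "give me your state changes" interface and the one-repository-per-aggregate-root rule described here can be sketched roughly as follows. This is a minimal Python illustration, not IMIS's code: the names StateChange, Order, Symbol, SymbolRepository and get_state_changes are hypothetical, and the "pipeline" is just an in-memory list.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class StateChange:
    """One explicit change; the kind and payload fields are illustrative."""
    kind: str
    payload: dict


@dataclass
class Order:
    """Child object that records its own changes as they happen."""
    order_id: str
    volume: int
    _changes: list = field(default_factory=list)

    def remove_volume(self, amount: int) -> None:
        self.volume -= amount
        self._changes.append(
            StateChange("VolumeRemoved", {"order_id": self.order_id, "amount": amount})
        )

    def get_state_changes(self) -> list:
        """Return and clear the changes recorded since the last call."""
        changes, self._changes = self._changes, []
        return changes


@dataclass
class Symbol:
    """Aggregate root (assumed here to own its orders)."""
    ticker: str
    orders: dict = field(default_factory=dict)
    _changes: list = field(default_factory=list)

    def add_order(self, order: Order) -> None:
        self.orders[order.order_id] = order
        self._changes.append(
            StateChange("OrderAdded", {"order_id": order.order_id, "volume": order.volume})
        )

    def get_state_changes(self) -> list:
        """Crawl the children and merge everything into one unit of work."""
        unit_of_work, self._changes = self._changes, []
        for order in self.orders.values():
            unit_of_work.extend(order.get_state_changes())
        return unit_of_work


class SymbolRepository:
    """One repository per aggregate root; saving forwards deltas, not snapshots."""

    def __init__(self) -> None:
        self.pipeline = []  # stand-in for a real pipeline or database writer

    def save(self, symbol: Symbol) -> None:
        changes = symbol.get_state_changes()
        if changes:
            self.pipeline.append(changes)
```

Calling save twice in a row forwards nothing the second time, because the aggregate hands out each change only once.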
Some of these state changes might be orders coming in or out of the system; we also have a lot of different pieces of data that are aggregated at our aggregate roots. For instance, if I have a symbol, I have a value on my symbol that represents my last trade for that symbol; I might also have my order books and, derived from my order books, I also have my best bid and my best ask, which constantly move on either side of the last trade. In order to build back all the information we don't persist it all down; we persist down the delta changes within the aggregate root so that we can then rebuild it.
Something similar, yes.
An order will originally come through on the order create message. On the order create message we will locate the symbol for the order, then we will add it to the appropriate order book and the order will sit there. Eventually somebody will want to trade with that order, so what we actually get across is a message that says that this trade has occurred. When we get that message, we actually go find the order in the order book that was represented within the trade and it generates a removal of volume from that order. That is a separate message. When we actually go to broker, to dispatch, messages to the rest of the system we actually have a separate message that represents the volume removal. Part of the reason that we actually have to have a separate message is that many exchanges support what is called iceberg, which is the concept of "I am only showing that my order has 2000 shares in it, but hidden I am actually selling 20000, so 2000 will trade and I am told that I still have 2000 left".
Once we actually get a removed traded volume - if it comes down to the point where there is none left, we're told that this order traded and now you have zero left - that actually forces removal of the order from the order book. There are other cases where orders can be removed as well; someone puts up an order and then decides to cancel it. For us to actually snapshot down the entire order book every time one of these things happens would be a vast amount of data. Big symbols, somebody like Google, are going to have thousands of orders on either side of their order book, so you really need to move these across as delta-based changes.
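To make the delta idea concrete, here is a small Python sketch of rebuilding one side of an order book by replaying delta messages instead of storing snapshots. The message kinds (OrderCreated, TradedVolumeRemoved, OrderCancelled) and their fields are assumptions made for illustration, not the actual IMIS message set. Note that the volume message carries the remaining visible volume as reported, which is what lets an iceberg order stay at 2000 after 2000 shares trade.

```python
def apply_delta(book: dict, delta: dict) -> None:
    """Apply one delta to a book that maps order_id -> visible volume."""
    kind = delta["kind"]
    if kind == "OrderCreated":
        book[delta["order_id"]] = delta["volume"]
    elif kind == "TradedVolumeRemoved":
        # The remaining volume is taken from the message rather than
        # computed, because an iceberg order may refill from hidden volume.
        remaining = delta["remaining"]
        if remaining == 0:
            del book[delta["order_id"]]  # fully traded: leaves the book
        else:
            book[delta["order_id"]] = remaining
    elif kind == "OrderCancelled":
        book.pop(delta["order_id"], None)
    else:
        raise ValueError(f"unknown delta kind: {kind}")


def rebuild(deltas: list) -> dict:
    """Rebuild the book from scratch by replaying every delta in order."""
    book = {}
    for delta in deltas:
        apply_delta(book, delta)
    return book


deltas = [
    {"kind": "OrderCreated", "order_id": "a", "volume": 2000},  # iceberg order
    {"kind": "OrderCreated", "order_id": "b", "volume": 500},
    {"kind": "OrderCreated", "order_id": "c", "volume": 300},
    {"kind": "TradedVolumeRemoved", "order_id": "a", "traded": 2000, "remaining": 2000},
    {"kind": "TradedVolumeRemoved", "order_id": "b", "traded": 500, "remaining": 0},
    {"kind": "OrderCancelled", "order_id": "c"},
]
book = rebuild(deltas)
```

Each message is a few fields long regardless of how many orders sit in the book, which is the point of moving deltas rather than snapshots.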
8. So basically your messages come from a bounded context, are transformed by an anticorruption layer and are then picked up by some process in another context, which either subscribes to that message or perhaps is the receiving end of a pipeline.
Correct, and the main determination that we use is the bounded context itself. How much data out of the core domain does it really need? Is it something that only needs to listen to very few state changes? Or is it something that really needs to listen to a majority of the state changes? A great example of something that really needs to listen to a majority of the state changes would be our analytic databases. They maintain, in near real time, a persistence model of de-normalized data which is then used for reporting purposes in other areas. We do have inconsistencies; more often they are flat de-normalizations like writing up quotes or moving standard deviations or moving averages, the types of information that you really want to have in other areas of the system for analytic purposes.
9. In your case the requirements and the nature of the domain actually make these delta-based objects central concepts. It isn't just "OK, I had an order and it was for 1000 shares and now it's just for 500 shares." The fact that I executed a trade and that caused a 500 share reduction is an important concept in its own right that you are making explicit.
Correct, in your own words, we are taking implicit items and we are making them explicit. In most domains, when you actually go to save with the repository, the state changes that happened to the object in the time that you had it are implicitly represented down to the database when it does a comparison between what the object was and what the object is now. We have simply gone through and made those state changes explicit and part of our ubiquitous language. So when we talk about objects changing within a domain, we don't just have a change which is then persisted. We have typed changes, and changes always occur in groupings. As an example, we have a very specific message for when a best bid or best ask changes, and we actually send the two down together. We also have specific messages for when trades occur and the associated behaviors that can occur with them out of the state transition.
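A typed change that always travels as a grouping might look like the following Python sketch. The class names BestBidAskChanged and BestQuoteTracker are hypothetical, and prices are plain floats for brevity. The tracker emits one message carrying both the best bid and the best ask whenever either of them moves, and nothing when an order lands behind the best price.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BestBidAskChanged:
    """Typed state change: both sides of the quote are sent down together."""
    symbol: str
    best_bid: Optional[float]
    best_ask: Optional[float]


class BestQuoteTracker:
    """Derives best bid/ask from resting prices and reports typed changes."""

    def __init__(self, symbol: str) -> None:
        self.symbol = symbol
        self.bids = []
        self.asks = []
        self._best = (None, None)

    def add_bid(self, price: float) -> list:
        self.bids.append(price)
        return self._detect_change()

    def add_ask(self, price: float) -> list:
        self.asks.append(price)
        return self._detect_change()

    def _detect_change(self) -> list:
        # Best bid is the highest bid; best ask is the lowest ask.
        best = (max(self.bids, default=None), min(self.asks, default=None))
        if best == self._best:
            return []  # the order sat behind the best price: no typed change
        self._best = best
        return [BestBidAskChanged(self.symbol, best[0], best[1])]
```

A subscriber that only cares about the quote can listen for this one message type and ignore every individual order message.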
10. So I can see how this is clarifying in the model itself and it might have potentially evolved as a domain model without the constraint of the high volume. But tell me something about the implications for this on your ability to scale.
Well, first, scaling is very, very important. Needless to say, there are very few people in the world that run tens of thousands of transactions on a single machine. We also have very high availability requirements because if we crash and we have money in the market, it's not a good position to be in. Hopefully it goes the way we want it to. So what we actually do with these state transitions is that we have a framework underneath them which allows us either to run pipelines or to run almost peer to peer, publish-subscribe patterns. We have actually gone through our domain and broken it down at bounded contexts, and we asynchronously map through to the bounded contexts as opposed to our bounded contexts coming back to our root domain.
The data model that we store it in is drastically different; oftentimes words have different meanings in the other context.
Actually it is a little bit backwards, the anti corruption layer is actually done in our message handlers on the other end of the pipeline. So the anti corruption layer, instead of living within the core domain, actually lives within the bounded context.
Within the receiving domain. Because our state transitions are part of the ubiquitous language and are part of our core domain, that is what the bounded context actually interacts with. It doesn't know of any of our domain objects. Those state changes have actually become part of our domain language.
A publish-subscribe would be very good, for instance, within a buying application: an application that, let's say, watches the stock prices or watches order books and is looking for a good situation to jump in, or a situation which says this is a stock you really need to get right now. The reason the publish-subscribe works so well there is because we have a distributed number of servers. When using publish-subscribe the context is actually subscribing to many different instances of the domain, and each different instance is partitioned to hold different pieces of data. The context itself is oblivious to where it is getting the data from.
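The shape of that arrangement, with a context subscribing by message type while several partitioned domain instances publish, can be sketched in a few lines of Python. This is an in-process toy with hypothetical names (Bus, DomainInstance, TradeOccurred); a real deployment would use a network transport, which is not shown.

```python
from collections import defaultdict


class Bus:
    """Minimal publish-subscribe hub keyed by message kind."""

    def __init__(self) -> None:
        self._handlers = defaultdict(list)

    def subscribe(self, kind: str, handler) -> None:
        self._handlers[kind].append(handler)

    def publish(self, kind: str, message: dict) -> None:
        for handler in self._handlers[kind]:
            handler(message)


class DomainInstance:
    """One partition of the domain; it only holds its own symbols."""

    def __init__(self, name: str, symbols: set, bus: Bus) -> None:
        self.name = name
        self.symbols = symbols
        self.bus = bus

    def on_trade(self, symbol: str, price: float) -> None:
        if symbol not in self.symbols:
            raise KeyError(f"{symbol} is not partitioned onto {self.name}")
        # The published message says nothing about which instance sent it.
        self.bus.publish("TradeOccurred", {"symbol": symbol, "price": price})


bus = Bus()
seen = []
bus.subscribe("TradeOccurred", seen.append)  # the subscribing context

instance_1 = DomainInstance("instance-1", {"AAA", "BBB"}, bus)
instance_2 = DomainInstance("instance-2", {"CCC"}, bus)
instance_1.on_trade("AAA", 10.0)
instance_2.on_trade("CCC", 42.5)
```

The subscriber receives both trades through the same handler and never learns which instance held which symbol.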
No, that probably wouldn't work too well. As for what we actually tend to partition on, we do two things. We do things at a symbol level, because order books are always in one symbol and they are a very manageable partitioning plan for us. The other thing we have done in the past is to actually start partitioning based on company, so that we have all the data for a related company localized into a single domain because they tend to interact. Let's say Nortel has options and Nortel has the equity.
And what we do is we can not only partition data across many servers and build a cluster of our domain, but we also mirror our clusters. When we mirror our clusters, we actually just simply partition based on other analysis. Whereas Nike and Microsoft live on server 1 in the first cluster, we guarantee they will not live together on the same server in the second cluster. So that way, even if we were to lose two servers, we will actually lose only one symbol.
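One simple way to get the property described here (two symbols that share a server in the first cluster never share a server in the mirror) is sketched below in Python. The placement rule is an assumption chosen for illustration, not the analysis IMIS uses: symbol number i goes to server i mod n in the first cluster and server i div n in the mirror, which works as long as there are at most n squared symbols.

```python
def place(symbols: list, n_servers: int) -> tuple:
    """Return (primary, mirror) dicts mapping symbol -> server index.

    Writing i = q * n + r, the primary server is r and the mirror server
    is q. Two different symbols with the same r must differ in q, so they
    are never co-located in both clusters.
    """
    if len(symbols) > n_servers ** 2:
        raise ValueError("this simple scheme needs at most n_servers**2 symbols")
    primary, mirror = {}, {}
    for i, symbol in enumerate(sorted(symbols)):
        primary[symbol] = i % n_servers
        mirror[symbol] = i // n_servers
    return primary, mirror


def lost_symbols(primary: dict, mirror: dict, dead_primary: int, dead_mirror: int) -> list:
    """Symbols with no live copy after one server dies in each cluster."""
    return [
        s for s in primary
        if primary[s] == dead_primary and mirror[s] == dead_mirror
    ]


symbols = [f"S{i}" for i in range(9)]
primary, mirror = place(symbols, 3)
```

With this layout, losing one server in each cluster takes out at most one symbol entirely; losing two servers in the same cluster takes out none, since the other cluster still holds every symbol.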
17. Now, this is a pretty interesting example of where understanding the domain is critical to the ability to partition across different servers. The fact that you have modeled these things in ways that allow you to take a company, and to know that you can safely take everything of one company and separate it from another company's, is basically domain knowledge.
Absolutely, we have found in pretty much everything that we have done that your partitioning boundaries are almost always on your aggregate roots. And when you actually start coming down to your data models, you can actually start to separate your data models based upon your aggregate roots as well. So in our scaling, as far as partitioning and mirroring is concerned, it has really been the aggregate roots that led us down whatever route we took.
18. Yeah I have seen this. Aggregate boundaries seem to cover several different issues. The one that is talked about in the book most is the transaction boundaries and determining what has to be synchronously kept in sync according to the rules. They make good boundaries for partitioning and distribution.
The other thing which we have noticed with aggregate roots is that whether or not something lives in an aggregate root is actually me explicitly saying that it should belong to the same data model as the aggregate root. Oftentimes we have data which is shared between many aggregate roots, which we would deliberately put a service over and have the aggregate root actually talk to the service when it needs to. And the reason that we do this is so that, in our backend data model, we can take that separate piece of data which is being used in many aggregate roots and actually pull it out into its own data storage to share between them. We then use a database per aggregate root or a set of databases which are partitioned on the aggregate root.
Oh yeah actually we do. Basically what we have done is build an interface on to our domain objects, which is "give me your state changes". Now an aggregate root is responsible for not only giving its own state changes but giving the state changes of its children as well. These changes get pulled together and form a single unit of work. We are using them to define transaction boundaries within our messages.
At times you will actually have to do an entire walk through the aggregate. At other times, depending where you place behaviors, you can actually do like a subscribed method, where the aggregate listens for the changes underneath so it doesn't have to do a full walk of all its children.
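The second option, where the aggregate listens to its children instead of walking them, could look something like this Python sketch. The names OrderBook, Order and on_change are hypothetical and changes are plain tuples for brevity. Each child reports a change at the moment it happens, so collecting the unit of work never requires visiting every order.

```python
class Order:
    """Child that reports each change to whoever is listening."""

    def __init__(self, order_id: str, volume: int, on_change) -> None:
        self.order_id = order_id
        self.volume = volume
        self._on_change = on_change

    def remove_volume(self, amount: int) -> None:
        self.volume -= amount
        self._on_change(("VolumeRemoved", self.order_id, amount))


class OrderBook:
    """Aggregate that subscribes to its children rather than crawling them."""

    def __init__(self) -> None:
        self.orders = {}
        self._pending = []

    def _record(self, change: tuple) -> None:
        # Goes through self each time, so it always hits the current list
        # even after get_state_changes has swapped in a fresh one.
        self._pending.append(change)

    def add_order(self, order_id: str, volume: int) -> Order:
        order = Order(order_id, volume, self._record)
        self.orders[order_id] = order
        self._record(("OrderAdded", order_id, volume))
        return order

    def get_state_changes(self) -> list:
        """Hand over the pending unit of work without visiting any child."""
        changes, self._pending = self._pending, []
        return changes
```

The trade-off is that the cost moves from the collection step to every individual change, which tends to pay off when an aggregate has thousands of children and only a few change per message.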
21. If you were going to give people a couple of takeaways from what you have learned, what would be the critical insights that allowed this scaling to tens of thousands of transactions per second, distribution over many servers, etc.?
Probably the biggest one would be that when you look at your bounded contexts, they need not be synchronous and they need not always ask the domain for its data; there is nothing wrong with me pushing data from the domain in a momentarily inconsistent way to the anticorruption layer which then saves it on the other side and the other side has now its own data model. A perfect example would be the reporting; it's perfectly ok if I recognize that I may be 3 seconds behind my production system. My domain might be a few seconds ahead of my reporting model.
Or the analytics system; reporting is the prime example. To give you an idea, we often have times where we watch a single SQL server hitting 800 writes per second, just off one connection doing de-normalizations. Now what happens when he gets 2000 writes per second? He'll get them done, but it's going to take him a few seconds. He's going to momentarily fall behind, but in general he stays within a few seconds of what our production domain is actually doing. Now, in this case, we have brought together a real-time de-normalization system. This need exists in nearly every system today. Everybody wants reports and you should never report off your transactional model. Everybody has this other model that sits there, and in this other model they store data in completely different ways and things have different meanings. By setting up this asynchronous pipeline they can actually bring reporting to their system in a near real-time fashion and drop all of these nasty things like batch reports or data dumps that happen overnight, which so commonly fail.
23. The things that allowed you to do this were, first of all, that you represented state changes explicitly and sent those messages across: as opposed to reporting the current state, you report a stream of changes to state. And second, that you allow these processes to run asynchronously by feeding this message queue from one side and picking it up from the other side.
The anticorruption layer picks it up on the other side and then de-normalizes it into whatever form it needs to be in, and it may also be translating it.
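The asynchronous hand-off described in this exchange can be sketched with Python's standard queue and threading modules. Everything named here is hypothetical (translate, run_reporting, the field names ticker and notional); the point is only the shape: the domain side puts state changes on a queue and moves on, while the receiving context translates each one in its own anticorruption layer and folds it into a de-normalized reporting model that may briefly lag.

```python
import queue
import threading


def translate(change: dict) -> dict:
    """Anticorruption layer on the receiving side: core-domain change in,
    reporting-model row out. The reporting context uses its own vocabulary."""
    return {
        "ticker": change["symbol"],
        "notional": change["price"] * change["volume"],
    }


def run_reporting(pipeline: queue.Queue, report: dict) -> None:
    """Consume state changes until a None sentinel arrives."""
    while True:
        change = pipeline.get()
        try:
            if change is None:
                return
            row = translate(change)
            totals = report.setdefault(row["ticker"], {"trades": 0, "notional": 0.0})
            totals["trades"] += 1
            totals["notional"] += row["notional"]
        finally:
            pipeline.task_done()


pipeline = queue.Queue()
report = {}
worker = threading.Thread(target=run_reporting, args=(pipeline, report))
worker.start()

# The domain side publishes and continues; it never waits for reporting.
pipeline.put({"symbol": "XYZ", "price": 10.0, "volume": 100})
pipeline.put({"symbol": "XYZ", "price": 11.0, "volume": 50})
pipeline.put(None)  # sentinel: shut the consumer down

pipeline.join()  # only this demonstration waits for the consumer to catch up
worker.join()
```

If the producer outruns the consumer, the queue simply grows for a while and the report catches up afterwards, which mirrors the "a few seconds behind" behavior described above.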
24. The other thing I am hearing is that you have one component there which has the most critical need to be up to date, which is the actual transactional processing. This is the part which says "These are the orders, the trades come in and now the orders are adjusted accordingly". That part has to be as close to real time as possible, you probably have some parameters for this. Can you give us a feeling as how far behind can you be on that?
When we deal with the actual market, we often come over a fiber, for instance across the country. Sometimes they have their own queues which kind of fill up as they are sending messages out, so we might have a little bit of lag on their side. Generally speaking, by the time it gets to us, let's say it's a 13 millisecond lag, 17 millisecond lag across the entire country. Once we actually get the data, we do a little bit of processing and we write it down to our first pipeline. This pipeline is partitioned. Then the database that is listening for this section (when I say database, we have a process that is called database, but it's actually the domain) will pick that up, apply it within the domain and dispatch out its changes. The time from our receiving the message to dispatching out the changes is generally less than 20 milliseconds.
25. Generally less than 20 milliseconds. Oh, great! That was really interesting, thanks, and I look forward to hearing more about where this leads, and I think that a lot of people can use some of these ideas. This is the first time that I have been an interviewer instead of an interviewee.