Transcript
GV: My name is Felix. I am a Committer on the Venice Database Project, which was open sourced last year. I've been doing data infrastructure since 2011, and I've spent the majority of that time at LinkedIn. You can find some of my thoughts on my blog at philosopherping.com. I'd like to share a nice quote from Winston Churchill: the nice thing about having the modern data stack, data mesh, real-time analytics, and feature stores is you always know exactly which database to choose for the job. Of course, Winston didn't say that. As far as I know, no one ever said that. In fact, I hear people saying the opposite. Year after year, they say the database landscape is getting more confusing. There are just too many options to pick from. Here's a bunch of data systems, most of which you're probably familiar with, or have at least heard about. What I want to do in this talk is give you a new lens to look at this landscape through and categorize these systems, because hiding in plain sight within this bunch of systems are several derived data systems. I think it's useful if we understand the differences between primary data and derived data, and then which systems are optimized for each. Then we can make better choices about our architectures.
Outline
First, I want to lay the groundwork for this discussion by talking about what primary data is. Then I want to use that to contrast it with derived data: what is that? Finally, the third section is about four derived data use cases that we have at LinkedIn: real-time analytics, graph, ML feature storage, and search. For each of those, I'm going to cover the specialized system that we use to serve these use cases at scale at LinkedIn.
What Is Primary Data?
What is primary data? Some related terms: you could say primary data is source of truth data, or that a primary data system is a system of record. Here's an attempt at defining primary data. Primary data is that which is created or mutated as a direct result of a user action. Here, I'm putting the emphasis on direct. Hang on to that, it will make sense when we contrast it with derived data. What are we looking for in a primary data system? Typically, we'll want that system to provide durability. That means when you write the data into the primary data system, you expect the data to be persistent. You don't want it to be only in memory, and then you have a power outage and you lose it, or something like that. You would want it to be on a durable medium, like spindles or flash. You might also want the system to guard against machine failures, in which case, you would benefit from the system having replication. Besides machine failures, you might also care about human failures: I deployed a version of my application that corrupts the data, let's roll back to yesterday's backup. Backups may be useful to guard against that, or other scenarios, of course.
Finally, perhaps the most important is that, usually, primary data systems benefit from strong consistency. I say usually, and I'm putting some caveats on these things, because none of these are actually hard requirements. You could choose to put your primary data anywhere you'd like. It turns out that in many cases, these are useful characteristics. For strong consistency in particular, we usually want it for user experience reasons. Imagine that the user is editing their account details in your system. If the user submits an edit to their account details, and then reloads the page, or maybe even loads it on a different device that they have, they would expect the edit that was just acknowledged to be there. You achieve that by having strong consistency.
Some examples of primary data systems include the following. We've got the traditional relational databases of the transactional flavor, the OLTP databases like MySQL, Postgres, and others. You could also store primary data in a NoSQL offering, like Cassandra or Mongo. Of course, nowadays, the cloud is all the rage, so Cosmos DB, DynamoDB, and other similar offerings could certainly host primary data as well. One last thing about primary data is, how does the data escape the system that it's in? Because sometimes we want to get access to that data elsewhere. That's critical to our talk here. One way is to ETL a whole snapshot of the database to another location, like your Hadoop grid, or maybe some cloud storage like S3 or Azure Blob Store. Another way is to listen in on the changes as they happen. We call that Change Data Capture. At LinkedIn, we're pretty big on Change Data Capture: we built Databus more than a decade ago, then replaced it with our next-gen Change Data Capture system called Brooklin, which we open sourced a few years ago. It supports change capture out of MySQL, Oracle, and others. There are other options as well; Debezium, for example, is a pretty popular Change Data Capture system nowadays.
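To make the Change Data Capture idea concrete, here is a minimal sketch of what consuming a change stream can look like, assuming a Debezium-style event shape with before/after row images and an operation type. The event structure and the toy sink are illustrative only, not the actual Brooklin or Debezium API.

```python
# Minimal sketch of replaying CDC events into a derived copy of the data.
# The ChangeEvent shape is an assumption modeled on Debezium-style events.
from dataclasses import dataclass
from typing import Optional

@dataclass
class ChangeEvent:
    table: str                 # source table the change came from
    op: str                    # "c" = create, "u" = update, "d" = delete
    before: Optional[dict]     # row image before the change (None for inserts)
    after: Optional[dict]      # row image after the change (None for deletes)

def apply_change(event: ChangeEvent, sink: dict) -> None:
    """Replay one change event into a toy key-value 'derived' store."""
    key = (event.after or event.before)["id"]
    if event.op == "d":
        sink.pop(key, None)
    else:
        sink[key] = event.after

# Example: a member record created, then updated, flowing into the derived copy.
derived = {}
apply_change(ChangeEvent("members", "c", None, {"id": 42, "headline": "Engineer"}), derived)
apply_change(ChangeEvent("members", "u", {"id": 42, "headline": "Engineer"},
                         {"id": 42, "headline": "Staff Engineer"}), derived)
print(derived)  # {42: {'id': 42, 'headline': 'Staff Engineer'}}
```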
What Is Derived Data?
Once the primary data escapes the system it's in, somehow, then it might become derived data. What does that mean? Here are some related terms. Derived data may be processed, computed, reformatted, massaged, or inferred from other data. You can pick and choose any of these terms. Sometimes, many of these things are going to happen at the same time. The output of this transformation is the derived data, and it is derived from some other data. Because this data is processed, it is indirect. Earlier, I told you to hang on to the notion that primary data is the result of a direct action of the user; derived data may also be tied to a user action, but only indirectly. That's an important distinction. It's not direct. There's been some processing step in between. Maybe the data has been loaded into a Hadoop grid or some object storage in order to get processed in batch over there, or maybe we've been doing some stream processing on the Change Data Capture. Sometimes we're doing both of these at the same time, like a Lambda architecture.
There are many different types of derivation of the data. I'm going to go over many examples of how data can be derived for specialized use cases throughout the rest of the talk. I think it's useful to subcategorize derivation into two flavors, physical and logical. What I mean by that is physical derivation doesn't change the content of the data, but it changes how it's organized. For example, you may have a table in your database which is organized in a row-oriented format, which makes it ideal for looking up a single row or a small number of rows, and retrieving all of that quickly. Then, if you want to do some analytics on this data, the row-oriented table will still work, but it could be more optimal to reformat the same data into a column-oriented format. That would be an example of massaging the data into a different structure. The data is the same, but it's optimized for another use case.
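As a toy illustration of this physical derivation, here is the same data laid out row-oriented and column-oriented. The row layout is handy for fetching one record; the columnar layout lets an analytical scan touch only the columns it needs. The table and column names are invented.

```python
# The same small dataset in a row-oriented and a column-oriented layout.
rows = [
    {"member_id": 1, "country": "US", "views": 12},
    {"member_id": 2, "country": "CA", "views": 7},
    {"member_id": 3, "country": "US", "views": 31},
]

# Physical derivation: pivot the same data into one list per column.
columns = {
    "member_id": [r["member_id"] for r in rows],
    "country":   [r["country"] for r in rows],
    "views":     [r["views"] for r in rows],
}

# Point lookup is natural on the row layout...
print(rows[1])                  # {'member_id': 2, 'country': 'CA', 'views': 7}
# ...while an aggregation only needs to scan a single column in the columnar layout.
print(sum(columns["views"]))    # 50
```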
A logical derivation is where the data itself changes. The input is different from the output. It could be a little different or very different. An example of a logical derivation where the data is just a little different is maybe you're putting the data in lowercase, or maybe you're stripping the plural suffix off a word when you find it. These are tokenization techniques that you may find in a search use case, for example. The data looks pretty similar, but it's a little bit transformed, and that makes it more useful for this type of use case. You could also have a logical derivation that makes the data completely unrecognizable. For example, in machine learning, you might have a training job that takes as input some text that's been generated by users, and then maybe also some user actions about whether they liked a certain thing, or whether they shared it, and so on, all of these input signals. Then, the output of the ML training job is just a bunch of floating-point values. Those may be the weights of a neural net, or maybe some scores about a prediction of the likelihood that the user will click on a given article if it was presented to them. It has some relation to the input, but it's completely unrecognizable from our perspective. That would be an example of a logical derivation that changes the data a lot.
We covered in the last section what qualities a primary data system may benefit from. Let's do the same for derived data. Given that derived data is processed, it's great if the derived data system has first class support for ingesting these processed outputs, so ingesting batch data and ingesting stream data. There wouldn't be much of a point in having the derived data system if it didn't have some specialized capability. If the derived data system does exactly the same stuff as your primary data system, then you might as well just have your primary data system. Either these are capabilities that are not found at all in the primary data system, or maybe they're there but not implemented as efficiently. That brings me to the last point: oftentimes, a derived data system is going to offer superior performance in some niche application.
The thing which we typically do not find in a derived data system is strong consistency. That's in opposition to the primary data systems. If you think about it, again, from the user standpoint, the user experience, it typically does not matter for a derived data system to have strong consistency, because the data in it is not appearing there as a direct result of a user action. It's appearing after some background or asynchronous process has done something with the data. By the time the data comes out of the stream processor that generated it, it doesn't matter too much whether the last mile, the interaction between the stream processor and the derived data system, is strongly consistent or not. The whole thing that happened before that was already asynchronous anyway. Therefore, from the user's perspective, the whole thing is asynchronous, regardless of whether the last mile is strongly consistent or not. And if you don't have strong consistency, then you don't have read-after-write semantics either. It's important when using those systems to be cognizant of that and to make sure we don't misuse them by assuming they provide something that they do not.
Here are a few examples of open source derived data systems. There are search indices like Solr and Elasticsearch, real-time analytics systems like Pinot, Druid, and ClickHouse, and ML feature storage systems like Venice. I want to linger for a little moment on these two search indices, Solr and Elasticsearch, to highlight some of the subtlety about consistency, because both of these systems offer a REST endpoint for writing documents into the index. You have a REST endpoint, you post to it, and then it gets acknowledged with a 200 OK, something like that. You could easily assume that because the REST endpoint acknowledged your request, that's it, your data is there. It's persistent and readable. If you do a query against the search index, you will find the document that you just put in there. That would be only halfway true. When the POST comes back successfully, your data is durable, but it is not yet readable. That's an important distinction. Why is it not readable yet? Because these search indices have a periodic process that will go in and do what they call a refresh or a commit. Depending on the system, the terminology is different, but it's the same idea. After this refresh of the index happens, the data that was persisted since the last refresh becomes readable as well.
You could force the system to give you strong consistency. You could say, I'll post my document, and then I'll force a refresh. That would make it look like the search index has strong consistency. It would truly have strong consistency at this point. What I want to get at is that this is not really a production grade way to use this system. If you were doing this trick, where you post your document and then force a refresh, you could theoretically use Solr or Elasticsearch as your system of record; this could be your source of truth. When you look at the best practices for these systems, you see that this is not quite desirable. They tell you, actually, don't even bother sending one document at a time, because that's awfully inefficient for this system. Instead, accumulate some batch of documents, and once you reach some sweet spot, maybe 5000 documents, depending on the size of each document, of course, only then send the batch. That is the more efficient way of using this system. If you send it too many documents to index, the system may kick them back. It may say, I'm overloaded, you've got to back off and try again later.
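Here is a rough sketch of that behavior against a hypothetical local Elasticsearch instance. The index name, document, and localhost URL are assumptions. The acknowledgment means the write is durable, but a search may not see the document until the next refresh, unless we force one.

```python
# Sketch of the "durable but not yet readable" behavior of a search index,
# assuming a local Elasticsearch with default settings.
import requests

BASE = "http://localhost:9200"

# Index a document: acknowledged and durable, but not necessarily searchable yet.
requests.put(f"{BASE}/articles/_doc/1", json={"title": "derived data systems"})

# A search issued immediately afterwards may or may not return the document.
resp = requests.post(f"{BASE}/articles/_search",
                     json={"query": {"match": {"title": "derived"}}})
print(resp.json()["hits"]["total"])

# Forcing a refresh makes everything indexed so far visible to search. This is
# the "strong consistency" trick, but it is not a production-grade pattern.
requests.post(f"{BASE}/articles/_refresh")
```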
You see that these best practices don't align very well with this being a system of record. In fact, the production grade way to run this system is to put a buffer in front. Imagine that you have Kafka as your buffer: you write documents, one at a time, into Kafka. Then you have some connecting application, like Kafka Connect, that will accumulate those documents into batches of the right size, and then post those batches to the search index at the rate the search index prefers. If the search index kicks a batch back, then the connecting application will take care of the backoff, retries, and all of the logic to make this reliable, scalable, and efficient. The interesting thing here is that although Solr and Elasticsearch, which are quite old, maybe more than a decade old, were built with a REST endpoint that makes it look like you're going to get strong consistency out of this thing, in practice, they're not really standalone. You ought to use something else as part of the mix. If you look at more recent derived data systems, like Venice and Pinot, you see that those systems were built from the ground up to use a buffer. Essentially, their write API is a buffer. You give a buffer to those systems, or they even have a buffer built in. You see the evolution of these derived data systems: we cut to the chase. In the newer systems, instead of dumping the complexity on the user to stand up all this infrastructure around it, the system is just built to work in the production grade way out of the box.
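Here is a hedged sketch of that buffering pattern: documents are read one at a time from a stream (Kafka in the talk), accumulated into batches, and flushed to the search index with backoff and retries when it pushes back. The consume_stream() and index_bulk() callables are placeholders, not a real connector API.

```python
# Sketch of a connector that batches documents from a buffer into bulk writes.
import time

BATCH_SIZE = 5000   # the "sweet spot" depends on document size

class IndexOverloadedError(Exception):
    """Hypothetical signal that the index asked us to back off (e.g. HTTP 429)."""

def flush_with_retries(batch, index_bulk, max_attempts=5):
    for attempt in range(max_attempts):
        try:
            index_bulk(batch)            # e.g. one POST to a bulk endpoint
            return
        except IndexOverloadedError:
            time.sleep(2 ** attempt)     # exponential backoff before retrying
    raise RuntimeError("could not flush batch after retries")

def run_connector(consume_stream, index_bulk):
    batch = []
    for doc in consume_stream():         # one document at a time off the buffer
        batch.append(doc)
        if len(batch) >= BATCH_SIZE:     # flush once we reach the batch size
            flush_with_retries(batch, index_bulk)
            batch = []
```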
Derived Data Use Cases at LinkedIn - Real-Time Analytics
Speaking of production grade, I want to now make this even more concrete by talking about four categories of derived data use cases that we have at LinkedIn, because each of those has between hundreds and thousands of unique use cases. Let's get started with real-time analytics. For that we use Pinot, which ingests a bunch of data from Hadoop and Kafka. At LinkedIn, we're old school, so we still use Hadoop. We probably have one of the biggest Hadoop deployments on the planet at this point. In your mind's eye, whenever you see this slide, you can substitute S3 or Azure Blob Store for Hadoop, or whatever your favorite thing is for storing batch data. Then, similarly, instead of Kafka, you could substitute Kinesis, or Pulsar, or whatever, as the streaming input. An example of a use case served by Pinot is who viewed my profile. Here you can see the count of unique viewers per week in the past 90 days, and you have various toggles at the top to filter the data. Pinot is great for that. You can ingest events data, like profile view events, for example, and index it inside Pinot. That works great because Pinot is perfect for immutable data, meaning not that the whole dataset is immutable, but rather that each record in the dataset is immutable. You can append a new record, but you cannot change old records. It's also great for time-series data. If you have a metric that comes out once per minute, and then maybe there's a bunch of different metrics that have different dimensions attached to them, and each of them comes out once a minute, this would also be a great fit. Once you have that data inside Pinot, you can do expressive SQL queries on top of that, like filtering, aggregation, GROUP BY, ORDER BY. This is like what I just showed in the example use case.
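As a hedged illustration, here is the kind of SQL that could sit behind a "who viewed my profile" chart: an aggregation over immutable profile-view events, filtered to the last 90 days and grouped by week. The table, columns, and broker URL are made up, and exact function support depends on your Pinot version; this only shows the shape of the query (filter, aggregate, GROUP BY, ORDER BY).

```python
# Illustrative query against a hypothetical Pinot broker SQL endpoint.
import requests

sql = """
SELECT dateTrunc('week', viewTimeMillis) AS weekStart,
       DISTINCTCOUNT(viewerId)           AS uniqueViewers
FROM profileViewEvents
WHERE vieweeId = 12345
  AND viewTimeMillis >= ago('P90D')
GROUP BY dateTrunc('week', viewTimeMillis)
ORDER BY dateTrunc('week', viewTimeMillis) DESC
LIMIT 13
"""

resp = requests.post("http://localhost:8099/query/sql", json={"sql": sql})
print(resp.json())
```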
In terms of types of derivation that are supported by Pinot, one of them is to transcode the data from a bunch of formats. The data coming in from the stream into Pinot could be in Avro format, which is what we use at LinkedIn, or it could be in Protobuf, or Thrift format, or others. The data that comes from the batch source could be in ORC or Parquet. Pinot will transcode all of that into its native representation of each of the data types. Then, it's going to take those bits of data and encode them using column-oriented techniques. For example, dictionary encoding: if a column of your table has a somewhat low cardinality, then instead of encoding each of the values itself, you can assign an ID to each distinct value and encode the IDs, and if the IDs are smaller than the values, then you have a space saving. Then, you can also combine that with run-length encoding, which means that instead of recording every single value of the column, you encode, essentially, how many times each value repeats. For example, I had the value 10, 100 times, and then I had the value 12, 150 times, and so on. This makes the data even smaller, especially if the column is sorted, but even if it's not sorted, you can still leverage run-length encoding. Depending on the shape of the data, it could yield some pretty big space savings.
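Here is a toy illustration of those two encodings. Dictionary encoding replaces repeated values with small integer IDs; run-length encoding then stores (id, count) pairs instead of every occurrence. This is only a sketch of the idea, not Pinot's actual on-disk format.

```python
# Toy dictionary encoding followed by run-length encoding of a column.
def dictionary_encode(values):
    dictionary = {}                       # value -> id
    ids = []
    for v in values:
        ids.append(dictionary.setdefault(v, len(dictionary)))
    return dictionary, ids

def run_length_encode(ids):
    runs = []                             # list of (id, run_length)
    for i in ids:
        if runs and runs[-1][0] == i:
            runs[-1] = (i, runs[-1][1] + 1)
        else:
            runs.append((i, 1))
    return runs

column = ["US"] * 100 + ["CA"] * 150 + ["US"] * 20   # low-cardinality column
dictionary, ids = dictionary_encode(column)
print(dictionary)                  # {'US': 0, 'CA': 1}
print(run_length_encode(ids))      # [(0, 100), (1, 150), (0, 20)]
```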
Why does that matter? In the case of Pinot, for many of the use cases, we want to have the whole dataset in RAM in order to support very fast queries. How big the data is directly affects cost, because RAM is costly. Furthermore, the less data there is to scan, the faster our scans can go. Minimizing space improves both storage cost and query performance. Besides these physical derivation techniques, Pinot also supports a variety of logical derivations. You can, for example, project some of the columns of the data and throw away the rest. You can also filter out some of the rows of the data. If you have some complex types in the data, as you could find in Avro or Protobuf, like some struct or nested record type of situation in the data, then you can flatten that out, which makes it easier to query in SQL. Pinot is open source. If you want to find information about it, there's a bunch of great information on the web. The official documentation is really good, you can find it at docs.pinot.apache.org. There are also a bunch of videos, interviews, podcasts, and so on about Pinot. I'm sure you'll find information if you're looking for it.
Use Case - Graph
Our next system is our graph database, which is called LIquid. It also ingests data from Hadoop and Kafka, in our case. I want to quickly clarify, though, that graph databases are not inherently derived data systems. There are graph databases which are intended to be primary data systems. For example, Neo4j, from my understanding, is designed to be a primary data system; it provides strong consistency and so on. In our case, what we wanted to do with this graph data is to join data coming out of a bunch of other databases. We have a bunch of source of truth databases for each of our datasets, and we want to join all of those into the economic graph, and then run queries on that economic graph. For us, it made sense to build LIquid as a derived data system. One of the usages is finding out the degrees of separation between the logged-in member and any other member of the site. Next to each name, you see 1st, 2nd, or 3rd. This is the degree of separation between you and the other member. You see that sprinkled across the whole site, next to each name, but it also affects things you don't see. There are visibility rules, for example, that are based on the degree of separation. This affects a bunch of things throughout the whole site.
Another use case served by LIquid is mutual connections. If you look at someone's profile, you can see the mutual connections, which are essentially my first-degree connections who also happen to be first-degree connections of the other person. That is achieved by a graph traversal query in LIquid. Another query you can do with LIquid is your connections that work at a company. This means, essentially, give me all of my first-degree connections who work at a given company. There's a bunch more. These graph traversal queries can be used to express even more complex things. For example, I could say, give me all of my second-degree connections that have gone to one of the same schools that I went to, and where the time that we went there overlaps. Essentially, we have a chance of having been classmates. You could do these things with joins in SQL, and in some cases, that works great. But if the query is a bit complex, if it has self joins or recursive joins, you can still do it in SQL, but sometimes it starts getting pretty hairy. A graph-oriented language is another tool in the toolkit, and sometimes it makes it much easier to express some of these queries. In some cases, it also makes them much more performant.
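To show the shape of these traversals, here is a toy version using in-memory adjacency sets. LIquid's actual query language and engine are very different; this just illustrates "mutual connections" and "second-degree connections" on a tiny invented graph.

```python
# Toy first- and second-degree traversals over an adjacency-set graph.
connections = {
    "alice": {"bob", "carol"},
    "bob":   {"alice", "dave"},
    "carol": {"alice", "dave", "erin"},
    "dave":  {"bob", "carol"},
    "erin":  {"carol"},
}

def mutual_connections(a, b):
    """First-degree connections of a that are also first-degree connections of b."""
    return connections[a] & connections[b]

def second_degree(a):
    """Connections of connections, excluding a and a's own first-degree set."""
    firsts = connections[a]
    return {c for f in firsts for c in connections[f]} - firsts - {a}

print(mutual_connections("alice", "dave"))  # {'bob', 'carol'}
print(second_degree("alice"))               # {'dave', 'erin'}
```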
What kinds of derivation are supported in LIquid? We transcode the data from Avro and other formats, as is the case with Pinot, because LIquid has its own representation of the data. We convert the data to triples. That means subject, predicate, object. An example of a triple is member ID connected to member ID. Another example might be member ID works at company ID, or member ID studied at school ID. There can be a variety of things that are represented. Essentially, this is the whole data model. You can represent everything via triples. It's a very simple, but also very powerful abstraction for the data. This allows us to join together a bunch of disparate datasets that exist throughout our ecosystem. In terms of logical derivation, as data gets ingested into LIquid, you can filter out if there are some records you don't want in there. You can also do other kinds of massaging to the data to make it more amenable to being queried in the graph.
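Here is a minimal sketch of that triple data model: every fact is a (subject, predicate, object) tuple, and a query is just a pattern match over those tuples. This is an illustration of the abstraction, not LIquid's actual engine or query language, and the identifiers are invented.

```python
# Facts as (subject, predicate, object) triples, with a tiny pattern matcher.
triples = [
    ("member:1", "connectedTo", "member:2"),
    ("member:1", "worksAt",     "company:9"),
    ("member:2", "worksAt",     "company:9"),
    ("member:2", "studiedAt",   "school:4"),
]

def match(subject=None, predicate=None, obj=None):
    """Return triples matching the given pattern; None acts as a wildcard."""
    return [t for t in triples
            if (subject is None or t[0] == subject)
            and (predicate is None or t[1] == predicate)
            and (obj is None or t[2] == obj)]

# "Which of member:1's connections work at company:9?"
coworkers = {o for _, _, o in match("member:1", "connectedTo")
             if match(o, "worksAt", "company:9")}
print(coworkers)   # {'member:2'}
```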
Use Case - ML Feature Storage
For ML features, we store them primarily in Venice. Again, Venice ingests data from Hadoop and Kafka. Venice was open sourced last year. It is a project I work on. One of the use cases that we serve with Venice is people you may know. Actually, people you may know is served by both LIquid and Venice. The flow of data in this use case goes roughly like this. First of all, we query LIquid to find out all the second-degree connections that the logged-in user might know. That's a huge set, way too many to present. There is some early filtering happening right there inside LIquid, based on some heuristics that are part of the economic graph. What comes out of that is a set of 5000 candidates. These are people you potentially might know. Five thousand is still a lot, so we want to whittle that down further. At that point, we turn around and query Venice. For each of the 5000 members, we're going to query a bunch of ML features that relate to them, and we're also going to push down some ML feature computation inside Venice. There are embeddings, which are arrays of floats representing the member. We do things like dot product or cosine similarity, these types of vector math, inside Venice, and return just the result of that computation. These features that are either fetched directly or computed all come out of Venice, and then are fed into a machine learning model that will do some inference. For each of the members that are part of the candidate set, we will infer a score, which is how likely it is that you might know this person. Then we'll rank that and return the top K. That's why you see very few recommendations, but hopefully highly relevant ones.
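Here is a hedged sketch of just the scoring step in that flow: for each candidate that came out of the graph, look up an embedding (a plain dict standing in for Venice here), compute cosine similarity against the viewer's embedding, and keep the top K. The feature values are made up, and the real system uses many more signals and a real inference model.

```python
# Toy candidate scoring: cosine similarity between embeddings, then top-K.
import heapq
import math

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

viewer_embedding = [0.9, 0.1, 0.3]
candidate_embeddings = {            # stand-in for per-member features in Venice
    "member:7": [0.8, 0.2, 0.1],
    "member:8": [0.1, 0.9, 0.4],
    "member:9": [0.7, 0.0, 0.5],
}

def score(candidate_id):
    return cosine(viewer_embedding, candidate_embeddings[candidate_id])

top_k = heapq.nlargest(2, candidate_embeddings, key=score)
print(top_k)        # the two candidates most similar to the viewer
```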
The feed is another example, where we're going to take the posts that have been created by your network. That's going to be a huge set, again, so we cannot present all of that. We want to do some relevance scoring, for which we retrieve a bunch of ML features out of Venice, pass them into an ML inference model, score each of the posts, rank them, and return the top K most relevant ones. Both of these, people you may know and the feed, are what we call recommender use cases. Venice is great for loading lots of data quickly and frequently. The output of ML training jobs is very high volume, and the ML engineers want to refresh that data frequently, maybe once a day or several times a day, the whole dataset. Or, alternatively, they may want to stream in the changes as soon as they happen. Once the data is in Venice, the queries that are supported on the data are fairly simple compared to the other systems we looked at. There is no SQL, or graph traversal, or anything like that; it's essentially primary key lookups. There is a little more than that: you can project some fields out of the document, or you can push down some vector math, like I said earlier, to be executed inside the servers, colocated with the data, for higher performance. Essentially, these are the types of capabilities that are useful for ML inference workloads.
In terms of the types of derivation supported in Venice, it's basically all physical. We reorganize the data in certain ways, but we don't really change the logical content of the data. I mentioned earlier that ML training is a type of logical derivation, but it's not happening directly inside Venice. It's happening in some processing framework upstream of Venice, before the data lands in Venice. In terms of what happens inside Venice itself, we transcode data from other formats. If the data is already in Avro, then we keep it in Avro inside Venice. Then we sometimes compress the data if it's amenable to that, like if there's a lot of repetition in the data; we have found that using Zstandard compression is extremely effective. What we do there is, because a lot of our data comes from batch pushes, we have all the data there in a Hadoop grid, which is a great place to inspect the data and train a compression dictionary on it. We train a dictionary that is optimized for that specific dataset and push it alongside the data. That allows us to leverage the repetitiveness that exists across records in the dataset. In the most extreme cases, we've seen datasets get compressed 40x, a huge compression ratio. In most cases, it's not that good. Still, it gives us a lot of savings on this system, which is primarily storage bound, with lots of data stored in it and lots of data getting written to it.
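Here is a hedged sketch of dictionary-trained Zstandard compression using the open source zstandard Python bindings, not Venice itself. The idea is the same: train a dictionary on a sample of the dataset, then compress each record with that shared dictionary to exploit cross-record repetition. The sample records and sizes are invented, and training needs a reasonably large, representative set of samples to succeed.

```python
# Sketch of training a Zstandard dictionary on sample records, then using it.
import json
import zstandard

# Stand-in for records in a batch push: lots of repeated field names and values.
samples = [
    json.dumps({"memberId": i, "country": "US", "industry": "software",
                "scores": [0.1 * (i % 7), 0.2, 0.3]}).encode()
    for i in range(2000)
]

# Train a dictionary sized for this dataset (the size here is arbitrary).
dictionary = zstandard.train_dictionary(16384, samples)

compressor = zstandard.ZstdCompressor(dict_data=dictionary)
record = samples[0]
compressed = compressor.compress(record)
print(len(record), "->", len(compressed), "bytes")

# Decompression needs the same dictionary shipped alongside the data.
decompressor = zstandard.ZstdDecompressor(dict_data=dictionary)
assert decompressor.decompress(compressed) == record
```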
Then we put the data in our storage engine, which is RocksDB. We use both of RocksDB's formats: the block-based format, which is optimized for when the dataset is larger than RAM and the cold data spills over to SSD, and the plain table format, which is optimized for datasets that fit entirely in RAM. Depending on the use case, we choose one or the other. Besides that, we join datasets from various sources, a similar idea as in LIquid. We also have a type of derivation that we call collection merging. What that means is, in your Venice table schema, you can define that some column is a collection type, like a map or a set. Then, you can add elements into that collection one at a time, and Venice will do the merging for you. You can also remove elements in the same way, without needing to know what the rest of the collection is. If you want to find out more about Venice, we've got our documentation up at venicedb.org. We've got all our community resources there, like our Slack instance, our blog posts, and stuff like that, if you want to learn more.
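As a toy sketch of the collection merging semantics: writers send add and remove deltas for a set-typed column, and the server merges them into the stored value, so no writer ever needs to read or rewrite the whole collection. This only illustrates the idea, not Venice's actual write path or API; the store, key, and column names are invented.

```python
# Toy server-side merge of add/remove deltas into a set-typed column.
store = {"member:1": {"skills": {"java", "kafka"}}}

def apply_delta(key, column, add=(), remove=()):
    record = store.setdefault(key, {})
    current = record.setdefault(column, set())
    current |= set(add)        # union in the added elements
    current -= set(remove)     # drop the removed elements
    return current

apply_delta("member:1", "skills", add={"python"})
apply_delta("member:1", "skills", remove={"kafka"})
print(store["member:1"]["skills"])   # {'java', 'python'}
```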
Use Case - Search
The last system I want to cover is for our search use cases. That system is called Galene. Again, it ingests data from batch and streams. An example is the search box you see at the top. If you type Ashish in there, you'll see a bunch of people named Ashish, and a few companies also named Ashish. If you typed the right keyword, you could also find schools or pages and so on. This is what we call a federated search. It queries many search verticals, and then coalesces all of that into a single view. Galene is great for retrieving documents that match some predicate that you're interested in, then scoring the relevance of each of those documents, which is achieved via arbitrary code and ML models that run directly inside the searchers, and then ranking those documents by their relevance score and returning the top K most relevant ones. This is essentially what Galene does. The way it achieves that is it has two kinds of data structures, a forward index and an inverted index, which are essentially the way that it physically derives the data coming into it. Actually, there are two flavors of forward index. One is for the data that is present in the documents, like maybe the documents are tagged with the author of the document, or the date it was modified. These are characteristics that may be useful as part of the search or as part of the ranking. These are stored in a forward index, which looks like the left-hand table here. For each doc ID, there is some associated value, like the content or maybe some metadata about the document, such as the author, timestamp, and other things. The other flavor of forward index is called Da Vinci, which is used for ML features. Da Vinci is actually a subcomponent of Venice, the previous system I talked about. Venice is actually a component inside Galene, and the Da Vinci part is open source. You can reuse it in your own applications if you'd like.
The inverted index is implemented with Lucene, which is the same technology underpinning Solr and Elasticsearch, at least from my understanding. In addition to Lucene, we've implemented our own proprietary special sauce on top of that to make it more performant. The inverted index looks roughly like the table on the right-hand side. For each of the tokens that are present in the corpus, we store a list of the document IDs that are known to contain that token. What do I mean by token? Tokenization is a type of logical derivation that helps make the data easier to search. The simplest form of tokenization could be viewed as a physical derivation: just split the text by spaces, and each of the words becomes a token. That can work, but it's a little simplistic and it doesn't work super great, because those tokens are not clean enough. In order to make the tokens more useful for searching, we're going to do additional things: we're going to strip out punctuation, and we'll put everything in lowercase. We might also substitute a word with its root word. Let's say the language has a notion of gender, and of course all languages have plural and singular; these are all variants of the same root word, and we may index on the root word instead. Then, similarly, at query time, we transform whatever the user provides into the root word, so that it matches inside the inverted index. You could also do synonyms. All of these transformations make it more likely that you get a hit when you search.
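Here is a tiny inverted index with the kind of tokenization just described: lowercase, strip punctuation, and a crude plural-stripping rule standing in for real stemming. Galene and Lucene do far more than this, but the shape is the same: token mapped to the set of doc IDs containing it, with query text going through the same normalization.

```python
# Toy tokenization plus inverted index construction and lookup.
import re
from collections import defaultdict

def tokenize(text):
    tokens = re.findall(r"[a-z0-9]+", text.lower())   # lowercase, drop punctuation
    # Naive stand-in for stemming: strip a trailing "s" from longer words.
    return [t[:-1] if t.endswith("s") and len(t) > 3 else t for t in tokens]

docs = {
    1: "Databases for derived data",
    2: "A database for primary data",
}

inverted = defaultdict(set)                # token -> set of doc IDs
for doc_id, text in docs.items():
    for token in tokenize(text):
        inverted[token].add(doc_id)

# Query-time text goes through the same tokenization, so "Database" matches both docs.
query_tokens = tokenize("Database")
print(set.intersection(*(inverted[t] for t in query_tokens)))   # {1, 2}
```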
Here's a high-level architecture of what the Galene system looks like. This is a simplified architecture. Essentially, you got a request coming from the application hitting a broker node, that will then essentially split the request across a bunch of shards, which are searchers. Each of these searchers has the three data structures I talked about, the inverted index for doing the document retrieval, then the forward index that contains other metadata about the documents that we may want to use for scoring, and ML features that we also use for scoring that are stored in Da Vinci. The reason we have Da Vinci in there is it supports a higher rate of refresh of the data, which our ML engineers prefer. We've got all of this inside the searcher, and then the searcher returns the top K most relevant documents that it found, to the broker. The broker coalesces all of those results, does another round of top K, and then returns that to the application.
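Here is a minimal sketch of that scatter-gather pattern: fan the query out to each searcher shard, take each shard's local top K, then merge and re-rank into a global top K at the broker. The scoring function and shard contents are invented; the real system scores with ML models inside the searchers.

```python
# Toy scatter-gather: per-shard top-K, then a global top-K merge at the broker.
import heapq

def score(doc, query):
    return sum(doc.count(term) for term in query.split())   # toy relevance score

def search_shard(shard_docs, query, k):
    """Each searcher returns its local top-k (score, doc_id) pairs."""
    scored = [(score(doc, query), doc_id) for doc_id, doc in shard_docs.items()]
    return heapq.nlargest(k, scored)

shards = [
    {"d1": "data systems for derived data", "d2": "primary data"},
    {"d3": "derived data systems", "d4": "unrelated document"},
]

K = 3
query = "derived data"
local_results = [search_shard(shard, query, K) for shard in shards]   # scatter
global_top_k = heapq.nlargest(
    K, (hit for results in local_results for hit in results))        # gather and merge
print(global_top_k)
```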
Conclusion - To Derive or Not to Derive?
First of all, small scale is easier. If your data is of a small scale, maybe you can do everything I just talked about in a single system. You could do everything in Postgres, for example. Why bother having more moving parts, if one will do the job? Even if you're at a large scale, you could still take advantage of this principle, by trying out an experiment at smaller scale. Let's say you don't yet have a dedicated solution for search, and you want to add a search functionality to your business, but you feel like you don't have the business justification for it. Maybe you can build a very quick and dirty prototype of search on top of your relational database, expose that to just 1% or a 10th of a percent of your users, gather some usage data on that. Maybe that solution is sufficiently scalable for that fraction of your users, and it gives you some useful input about whether that's a good product market fit. It can help you justify, now that this is demonstrably useful, let's build a large-scale version of this with a specialized system.
Another way to say what I just said is, use what works until it doesn't. Make your stack as simple as possible, but no simpler. That's important, because sometimes having fewer moving parts is what makes your stack simpler, but sometimes fewer moving parts is actually what makes it more complicated. At some point, you reach a level of workload where doing everything in one system is a liability. If you have a transactional workload that gets negatively impacted by some analytical workload that happens in the same box, that may not be a good thing. In that case, you may think about splitting it into separate systems. Maybe you still choose the same tech on both sides: maybe Postgres is your transactional system and Postgres is your analytical system, and they're just connected by some replication in between. Maybe that works great, at least for a while. Then, since you have that separation anyway, you could also start feeding that replication stream into a more specialized solution as well. Keep your options open. Most importantly, avoid the sunk cost fallacy. Just because your architecture worked five years ago doesn't mean it's still the best today. Your workload constantly grows and changes, so you have to adapt to that and not be beholden to past design choices. That's difficult, but try to stay agile and adapt as much as you can.