Streaming Log Analytics with Kafka

Summary

Kresten Thorup discusses how and why they use Kafka internally and demos how they utilize it as a straightforward event-sourcing model for distributed deployments. He presents customer cases on utilizing Kafka to manage and buffer massive volumes of data ingest.

Bio

Kresten Thorup provides technical leadership and vision at Humio. He has been a contributor to several open source projects, including GCC, GNU Objective-C, the GNU Compiler for Java, Emacs, and Apache Geronimo/Yoko.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

My name is Kresten. I've been coming to QCon many times; I was part of the team that started organizing it, 12-13 years ago. I've been doing a lot of different things and, three years ago, I started working on this project called Humio, which is a log aggregator. I'd been in the consulting business for many years doing all kinds of different projects and, 10 years ago, we started working on a real, live, big healthcare system in Denmark that really had to stay up 24/7. One of the things we realized is that, to build a system that stays up 24/7, you really need to gather your logs, because that's the only way to really understand a big distributed system when things go wrong.

Unlike a lot of people, we argue you should log everything. It's not just because we want to sell a lot of licenses, of course; it's also because when you have a problem in production, it's always something you didn't think about. There are lots of people out there arguing you should scale down on your logs: for various reasons, you cut down or sample logs, reducing them or turning them into schematized data. That's really nice once you have the data, but often, in an error situation, you want to find the things you didn't think about. You say, "I wish I had indexed that. I wish I had put that stuff in the database, in my logs."

Our experience is you just want to log everything, as much as possible. That's what we set out to do, build the system that would enable people to do that. We're also very much on a mission to make everybody use logging. If you're not using logging in your distributed system, centralized logging, that's really something you should consider doing because it gives you an insight you really don't have any other way of getting to.

This particular healthcare system runs in several locations; it's the centralized prescription system in Denmark that integrates with 50-odd other healthcare systems. On many occasions, we've called up those integrated systems and said, "Hey, you have a problem. You're sending wrong data," or, "Your system doesn't work," and these hospitals maybe realized, "Oh, the whole system hasn't been working for three hours," but typically, these IT systems are so unstable that nobody cares anyway. Logging a lot, and making that easy, is really our mission.

In this particular talk, I'm not going to be talking so much about the product we have; I'm going to be talking about how we use Kafka to build it. Humio is a log analytics system, a centralized logging facility that you can install on-prem. Nowadays, it's very much in vogue for a startup to build SaaS or cloud services. That's all very nice and dandy, and you have a lot of tools available in that space, but your system very easily gets messy, with lots of microservices and components running all over the place, if you're running a SaaS service. It's very convenient to be behind an API, but when you're building an on-prem system, you really want to build a monolith, something that's super easy to install in one chunk.

We decided that the only secondary component we're putting into our product is Kafka, so that's a requirement when you install it, and then we delegate a lot of the hard parts of doing distributed systems to Kafka, because it's a really nice, stable system. You can use it for leader election, for sequencing things, or for managing volumes of data, so there are lots of things we'd like it to do for us. I'll talk a little bit about how we do that in Humio in this talk.

Data-Driven SecOps

Just to give you an idea of the kind of customers we have: one of our customers is a healthcare organization with about 30,000 employees. This is the security operations team there; they're logging about 20 terabytes a day. That's all the Windows logs from 30,000 PCs, a bunch of ADs and various other application servers, and then BRO network data, which is high-level information about all network traffic. They're putting all this into one data stream that you can then search across to mitigate problems, mitigate viruses. For instance, they use this to service the HelpDesk, they use it if somebody can't log in, and they use it to find viruses as they happen. If CryptoLocker comes along and starts spewing out, trying to overwrite lots of files on a network drive, the BRO network data will see, "Ooh, there are a lot of file writes that have high entropy."

That's fed into Humio, which basically has two major components. One is what we call the complex event processing part, or the real-time streaming-queries part, which could observe, say, "Is there a host that has a high number of writes to disk with high entropy?" High entropy would mean it's encrypted. In that case, they issue a trigger that could take that machine off the network, and they can go investigate. The other part is for incident response, so we also store all these logs. Those two things come together quite nicely because we have the same query language to operate on both of them.

Humio Ingest Data Flow

Really, the high-level architecture of how the system works: one side of things is how data flows in. To get the data there, you need a whole bunch of agents that are installed on various machines, picking up logs from various places. That could be file shippers, or network devices that can send us logs directly, all kinds of things. Then, in Humio, there are three kinds of nodes. It's not a micro-services system; it's one binary that can run in three modes: as an ingest node, as a digest node, or as a storage node. Or it can take on several roles, actually.

In the role of an ingest node, it has various APIs; it does authentication, some field extraction and validation on the data that comes in, and turns it into a more efficient format. Then there are the digest nodes that run the real-time queries, this complex event-processing engine, and also produce files that are later sent off to storage, for backfilling or for doing MapReduce jobs back in time.

If you look at one of these digest nodes as data comes in, all the events are fed through a state machine, the complex event-processing engine. Say there's currently a standing query sitting there that just wants to count all the errors; it would look like this in our query language: match a regex of "error", and then just count the things. That query gets compiled into being part of the state machine; it will just sit there and constantly maintain an internal state, which is the current count. If you have a dashboard, or an alert that triggers every few seconds, you can go read that value and show it on the screen. The same engine can be used to take the same compiled query and run it over a series of events in the event store. It's a very naive, very simple implementation of block processing. That would come out with a large count of things.
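To make that concrete, here is a minimal Java sketch of the idea, with hypothetical names that are not Humio's actual internals: a "compiled" standing query is just a predicate plus an accumulator, and the same object can be driven by the live ingest stream or replayed over stored events.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical sketch: a "compiled" standing query is a predicate plus an
// accumulator that keeps its state (here, a running count) in memory.
class CountingQuery {
    private final Predicate<String> matches;
    private long count = 0;

    CountingQuery(String regex) {
        this.matches = Pattern.compile(regex).asPredicate();
    }

    // Live path: called for every event as it flows through the ingest pipeline.
    void onEvent(String event) {
        if (matches.test(event)) count++;
    }

    // Historic path: the same compiled query replayed over stored events.
    long runOver(List<String> storedEvents) {
        storedEvents.forEach(this::onEvent);
        return count;
    }

    long currentCount() { return count; }  // polled by dashboards and alerts
}
```

The point of the design is that dashboards read an in-memory value instead of re-running a query against a database, while historic searches reuse the exact same compiled query over stored blocks.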

Humio Query Flow

We looked a bit at the ingest flow; the query flow goes a little differently. A browser, as I'll show you in a minute, issues a query to an API front-end node, and that API node will then initiate the query and schedule it across the nodes. One of the things a query says is, "I run over a given period of time and I'm interested in these kinds of data," so the API node decides where that data is and sends off partial queries to various nodes; you can run many digest nodes and many storage nodes. On the digest nodes will be the data that's work in progress, the files currently being built, and on the storage nodes will be the historic data.

To sum up, we do things quite differently from a lot of other logging solutions. Instead of stashing everything in a database and then running queries all the time, we have two separate query streams, or query engines, two ways of looking at it. One is the real-time processing, where we basically have these materialized views that serve dashboards and alerts; that's all processed in memory and we keep the intermediate results in memory. That gives super nice, fast response times for the queries you know about, that you're aware of upfront.

If you want to look at something you were not aware of upfront, then we shift to a brute-force query. That's what we call a historic query, and that's what runs on these storage nodes; it's essentially a MapReduce engine that just runs really fast. When I say MapReduce, people think of Hadoop, something batchy and very slow. To make it really fast, you need to own the entire stack; you need to own the datastore and all the pieces in between, and optimizing all the way down the query path is what really makes this thing scream.

That was the intro. I'd like to briefly show what this looks like. This is the main UI where you just go search for something. In this case, we're looking at the SSHD logs from some of our operational machines. I hope you can read it, but this is basically just syslog data. When you look at a log stream, there are certain things that can be known upfront, like the timestamp on the left, the host next to it, then which application is shipping it and the PID of that process; but the rest, on the right-hand side here, is just free text.

If you look closely, there are actually a lot of these lines that look like this; this is somebody trying to do a dictionary attack on one of our servers. I want to search for this with a regex over, in this case, the last 30 minutes, or maybe over, let's say, the last 30 days; that one comes back pretty quickly as well. This is a very simple search interface. Maybe you want to know if there's more than this one guy trying to attack us, and it turns out there is. I can pull out from a stream like this and say, "I want to pick out the IP address," as just a named group in this regex.

What happens here is that each of these lines then gets an IP attribute, which we can find down here at the bottom. We know the IP; it looks like that. Once we have that, we can show the top IP addresses, like this. It's a very simple language that feels like streaming things through a pipeline. That's how you work with the data, and then you can show it as a pie chart or a bar chart. Let me show a pie chart. This is what a pie chart looks like. If we're really lucky, there's actually an attack going on right now.
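For readers following along in prose, here is a rough Java equivalent of what the demo query does; the regex is a guess at a typical sshd failure line, not the one used on stage.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: extract the attacker IP with a named group and count per IP.
class TopAttackers {
    static final Pattern FAILED_LOGIN =
            Pattern.compile("Invalid user \\S+ from (?<ip>\\d+\\.\\d+\\.\\d+\\.\\d+)");

    static Map<String, Long> topIps(Iterable<String> logLines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : logLines) {
            Matcher m = FAILED_LOGIN.matcher(line);
            if (m.find()) counts.merge(m.group("ip"), 1L, Long::sum);
        }
        return counts; // sort by value descending to get the "top" view
    }
}
```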

What we were looking at there were these historic MapReduce queries, but we can also look at what's going on right now, as we speak. As soon as I say, "I want to run this query as live," it takes the same query, runs it as part of the ingest pipeline, and maintains the state over a time window of the last 15 minutes. Fortunately, there is a nice attack going on right now; I'm not always that lucky when I'm doing a presentation. You can see there's one guy in particular attacking us. These visualizations, you can put them on a dashboard and you get something like this. That's the basic functionality of Humio.

Use Kafka for the “Hard Parts”

Let's go back to the presentation. I'd like to talk about how we use Kafka for the hard parts of building a distributed, reliable system. There are a couple of cases I want to get into, focusing on coordination and on how we use Kafka as [inaudible 00:15:03] the commit-log/ingest buffer for data. As some of you might realize, this is somewhat similar to what KSQL does. Since we started this project, they've added this component called KSQL that lets you programmatically do the same things. We try to do this in a more interactive and user-friendly manner, but in essence, the engine we've built is very similar to KSQL.

For those of you who don't know Kafka, here's a two-slide Kafka 101. It's a really nice, reliable, distributed log/queue system. A given Kafka queue, or topic, consists of a number of partitions, and that's how you're able to scale out and make it run really fast. Messages within one of these partitions are sequential, but across partitions, there's no guarantee of order of delivery. You replicate each of the partitions for durability, and then you just add consumers to parallelize work.

Let's look at the graphics of the same thing. As you push data into Kafka, you have a piece of software, the producer, that hashes some key pertaining to the data, and that decides which partition the data goes into. These partitions can then live on different machines and themselves be replicated, as I said. Once the data is in one of these partitions, it's safe and replicated, given that you've configured it that way. Then you can scale out; that's scaling volume, and you can have hundreds of partitions. You can scale the processing of the data by having just as many consumers, which are processes or machines sitting there, taking data off and doing something with it. That was Kafka 101.
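As a reminder of what that looks like in code, here is a minimal Java producer using the standard Kafka client; the broker address and topic name are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IngestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The key is hashed by the default partitioner, and that hash decides
            // which partition (and therefore which broker) the record lands on.
            producer.send(new ProducerRecord<>("ingest", "datasource-42", "a log line"));
        }
    }
}
```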

Coordination “Global Data”

The first use case I want to talk a little bit about is how we do coordination. We actually built our own Zookeeper and built it into Humio, and this is one of those things where people say, "Oh, you shouldn't build your own coordination because that's really difficult." But you know what? The really difficult part about Zookeeper, or Raft, or etcd is doing the distributed log in a consistent way, and we just delegate that to Kafka. Our little Zookeeper-like system means that every process, every one of these nodes, has a full copy of the equivalent of Zookeeper. It's a tree-structured hierarchy, similar to a JSON structure, and it means we can make local decisions very fast without going across the network. It also means we can run queries over this data, iterate over all of it, at very low latency.

You could say, "It's a simple thing where we just have a single partition Kafka because that serializes all messages, and that single partition of course has some replicas to make sure that it's reliable storage." Then the things we push through Kafka are operations that mutate this tree, which is essentially a big JSON object that's very simple. Because it's sequenced by Kafka, we can reliably execute this on a number of nodes and know that they all have the same state. The way to bootstrap this is if you just pick a snapshot of this tree from one of the other nodes, and it knows what's the most recent update that was done to this tree, so we can just replay from Kafka from that point on. That's a very simple way to build an event-style system for coordinating knowledge across the system.

One of the config properties of Kafka, if you're a consumer, is how long you want to wait, because to make Kafka really scream in terms of volume, in terms of throughput, you set a high latency: you want your buffers to fill up before you do processing. For something like this, since we don't have much traffic, this only changes maybe tens of times a second, so we can easily have a low latency on this queue, this Kafka topic that drives it. Right now, this is baked into Humio, but it's actually quite an obvious thing to pull out and provide as an open-source project one day in itself.
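The consumer settings involved would look roughly like this; the values are illustrative, not Humio's actual configuration.

```java
import java.util.Properties;

// Assumed settings: trade throughput for latency on the low-traffic coordination
// topic by not letting the consumer wait for buffers to fill before returning data.
public class LowLatencyConsumerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "coordination-reader");   // hypothetical group id
        props.put("fetch.min.bytes", "1");      // hand back data as soon as any is available
        props.put("fetch.max.wait.ms", "10");   // ...and wait at most 10 ms for more
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```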

One of the things we use this data for, this metadata that every node has a copy of, is to optimize queries. Part of what we keep in this data set is where each of these files is located. To schedule a query, you need to run over some of the files over here and some of the files over there. Since every node has this global knowledge of where all the segment files are in the system, that's a very easy problem to solve.

Log Store Design

The log store is quite different from what you see in a typical database. We have a minimal index; the index is so minimal that every node in the cluster can keep it in memory. That gives us coarse-grained chunks of data that can later be processed, and then we have a very fast "grep" implementation, essentially, to do that. At a very high level, our data is pushed into files that are on the order of 10 gigabytes each, and for each file we have a little piece of index that everybody keeps in memory: the start time, meaning the first log entry that goes into that file, the end time, and a little more metadata, which is the tags, so we know what kind of information is in this segment file.
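The per-segment index entry he describes might be modeled like this; a hypothetical shape, shown as a Java record, not Humio's real on-disk format.

```java
import java.util.Map;

// Hypothetical shape of the tiny per-segment index entry every node keeps in memory:
// a time range plus the tags describing what kind of data the ~10 GB segment holds.
record SegmentIndexEntry(
        String segmentFileId,     // where the compressed segment lives
        long startTimeMillis,     // timestamp of the first event in the file
        long endTimeMillis,       // timestamp of the last event in the file
        Map<String, String> tags  // e.g. {"host": "dc1-web", "type": "accesslog"}
) {}
```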

Then there's a very simple and highly efficient thing that makes everything go fast and saves resources: just compress these files. Typically, you can compress log files 10x, no problem. This really means that if you're storing 1 month of data and you're ingesting 30 gigabytes a day, that compresses to 90 gigabytes of data and less than 1 megabyte of in-memory index. Or, if you have a system that ingests 1 terabyte a day, that becomes approximately 4 terabytes of data on disk, and still less than 1 megabyte of this very coarse-grained index. This is quite different from Elastic, where you build these huge indexes and data actually bloats quite heavily when you put it in. It's a very fine database, but it wasn't really built for high-volume ingest.

Query

Then, to run a query: all nodes in the system know where all these segment files are. You can look at it this way: they all have this time index and these tags, this metadata, associated with each segment. Maybe there's a data stream of web logs from data center one, and another stream, another set of segments, that also comes from data center one but is application logs. Then, from data center two, we have the web logs, and so on and so forth. In a large-scale system, we often have 100,000 different combinations of these things. When you run a query in Humio, you say, "I want to search from this time to that time," so you give a time interval, and you also typically say, "I want to look for these tags." That really narrows down the segments you need to search, and that's how we make it run fast.
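Narrowing by time interval and tags can be sketched as a filter over that in-memory index, reusing the hypothetical SegmentIndexEntry record from the earlier sketch.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of query planning against the in-memory index (names are illustrative):
// keep only segments whose time range overlaps the query and whose tags match.
class SegmentSelector {
    static List<SegmentIndexEntry> select(List<SegmentIndexEntry> index,
                                          long queryStart, long queryEnd,
                                          Map<String, String> requiredTags) {
        return index.stream()
                .filter(s -> s.endTimeMillis() >= queryStart && s.startTimeMillis() <= queryEnd)
                .filter(s -> requiredTags.entrySet().stream()
                        .allMatch(t -> t.getValue().equals(s.tags().get(t.getKey()))))
                .collect(Collectors.toList());
    }
}
```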

From there on, making it run fast is a matter of having a super fast "grep" that runs on the decompressed data. How that works is a whole other talk, but essentially, we don't compress a segment as one big chunk, we compress it in 64K chunks. That means you can move the data onto the CPU, and when it decompresses, the decompressed data fits in the level-two cache of the CPU. That gives us at least an order of magnitude or two more speed; you can do really crazy string processing at amazing speeds when the data is in the level-two cache. Then you move on to the next chunk, so it processes these chunks in a streaming fashion. You can come and ask me about that; I have other talks about it.
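A simplified illustration of the chunking idea, using the JDK's Deflater/Inflater rather than whatever codec Humio actually uses: each 64 KB chunk is compressed independently, so it can later be decompressed into a small, cache-friendly buffer and scanned in a streaming fashion.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Illustrative only; Humio's actual codec and chunking details may differ.
class ChunkedCodec {
    static final int CHUNK = 64 * 1024;

    static List<byte[]> compress(byte[] data) {
        List<byte[]> chunks = new ArrayList<>();
        for (int off = 0; off < data.length; off += CHUNK) {
            int len = Math.min(CHUNK, data.length - off);
            Deflater d = new Deflater();
            d.setInput(data, off, len);
            d.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            while (!d.finished()) out.write(buf, 0, d.deflate(buf));
            d.end();
            chunks.add(out.toByteArray()); // each chunk is independently decompressible
        }
        return chunks;
    }

    static byte[] decompressChunk(byte[] chunk) throws Exception {
        Inflater inf = new Inflater();
        inf.setInput(chunk);
        byte[] out = new byte[CHUNK];   // small enough to stay cache-resident while scanning
        int n = 0;
        while (!inf.finished()) n += inf.inflate(out, n, out.length - n);
        inf.end();
        return Arrays.copyOf(out, n);
    }
}
```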

I showed this slide before. When a query comes in from a browser, as we saw, I looked for the syslog logs for a given time interval, be that the last 30 days or the last 5 minutes. Any node in the system, because it has this Zookeeper Lite that we built ourselves on top of Kafka, knows which nodes to go and ask to do processing for parts of the query, so it does MapReduce on those selected segment files around the system, gathers the results, and sends them back. That's the first use case for Kafka.

Durability

The other big thing is that you want to make sure you don't lose people's data, and that's always a danger when you build your own database. How do you build a system where you're certain that you're not dropping data? That's the worst thing that can be said about a database vendor. For that, of course, we use Kafka again. In between these nodes, the arrow between ingest and digest, that's really Kafka sitting there. Ensuring durability is something that happens in two phases. First, in the ingest phase, you remember the API node that would validate the data and maybe reorganize it into a slightly more efficient format, and then push it to Kafka. Once we have an ACK from Kafka, that's when we can send a 200 back to the client.

Now the client knows the data is safely stored inside Humio when it gets a 200 back. If it gets a 500 back, or something else, it can retry. That's typically how log shippers work. Most of our customers use Elastic's Filebeat for shipping logs, that's a very common thing, and it works exactly that way: it tries to push the data, and if it gets a 500, it retries. There's actually also a result code that says, "Hold back a little, try again later," a back-pressure result code; I don't remember, it's 300-something, 308 maybe. The shipper will wait a little and try again. That concludes the first level of making sure that at least the data is persistent. Once it's in Kafka, it's in three replicas, or whatever it's configured with, and it's nice and safe.
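The ingest-side handshake can be sketched like this; the topic name, handler shape, and return codes are assumptions, but the key point is that the 200 is only sent after Kafka acknowledges the write with acks=all.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch of the ingest-side durability handshake.
class IngestHandler {
    private final KafkaProducer<String, String> producer;

    IngestHandler() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");   // wait for the in-sync replicas, not just the leader
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    int handleIngest(String dataSourceKey, String validatedEvent) {
        try {
            // Block until Kafka has acknowledged the record as replicated.
            producer.send(new ProducerRecord<>("humio-ingest", dataSourceKey, validatedEvent)).get();
            return 200;   // safe to tell the log shipper the data is stored
        } catch (Exception e) {
            return 500;   // the shipper (e.g. Filebeat) will retry later
        }
    }
}
```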

Eventually, the data in Kafka is going to roll off, and we want to make sure it's stored somewhere else before the queue gets too old. Inside a digest node, what happens is, first, we run the query engine, and then we push the event into a work-in-progress buffer. It could be that this particular kind of log is fairly slow, there's not much traffic coming in, so we have a work-in-progress buffer that we keep in memory and, whenever it's full, we push it to the segment file. That's our segment file, not Kafka's segment files.

Once we push it to a file, as part of appending that block of data, we store the offset into Kafka where we took the data from. When we start up again, we can always find the last block in that segment, go back, and then replay the ingest from Kafka from that offset. That's actually the normal mode of stopping Humio: you just throw away all this work in progress and replay the data when you start up again.
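A sketch of that restart path, with hypothetical names: the digest node seeks the consumer back to the offset recorded in the last flushed segment block and reprocesses from there.

```java
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Simplified restart path: the last flushed segment block recorded the Kafka
// offset its data came from, so we discard in-memory work in progress and
// replay everything newer than that offset.
class DigestReplay {
    static void resume(KafkaConsumer<String, String> consumer,
                       int partition, long offsetStoredInLastSegmentBlock) {
        TopicPartition tp = new TopicPartition("humio-ingest", partition);
        consumer.assign(List.of(tp));
        consumer.seek(tp, offsetStoredInLastSegmentBlock + 1);  // re-consume from there
        while (true) {
            for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(100))) {
                // run the real-time query engine and refill the work-in-progress buffer
                process(rec.value());
            }
        }
    }

    static void process(String event) { /* feed the query engine and WIP buffer */ }
}
```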

A Kafka queue is configured with some retention: some amount of time and some amount of data. When it runs full, it simply deletes the old data. You want to configure the Kafka topics you're using for this ingest buffering so that they can deal with the downtime you might expect in your system. If you want to be able to take down some digest nodes for a day, you want to make sure you have Kafka queues that can contain the data that flows in during an entire day.
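Sizing that retention might look like this with Kafka's AdminClient; the numbers are illustrative, and note that retention.bytes applies per partition.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

// Illustrative sizing only: give the ingest topic enough retention to survive,
// say, a full day of digest-node downtime at the expected ingest rate.
public class CreateIngestTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("humio-ingest", 24, (short) 3)  // 24 partitions, 3 replicas
                    .configs(Map.of(
                            "retention.ms", String.valueOf(24L * 60 * 60 * 1000),  // keep at least a day
                            "retention.bytes", String.valueOf(1_000_000_000_000L)  // ~1 TB per partition
                    ));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```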

The most important performance metric in the system is this ingest latency. That's the time from when an individual log line arrives at the API level until it's written into the work-in-progress buffer; at that point it's safe, because we can always replay it from there. You'll see a p50, which is just putting it in the buffer, and a higher p90, or p95, or p99, every once in a while when the buffer gets flushed. This ingest latency is really what gives you the feel of the system being snappy, because once an event is in the ingest buffer, we can query it; this ingest buffer is also a virtual segment that can be subject to MapReduce in the live system.
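A minimal sketch of the metric itself, not Humio's implementation: stamp the event when it arrives at the API node, observe when it lands in the work-in-progress buffer, and report percentiles over the recorded samples.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Naive percentile bookkeeping for the ingest-latency metric described above.
class IngestLatency {
    private final List<Long> samplesMillis = new ArrayList<>();

    synchronized void record(long arrivedAtApiMillis, long writtenToWipBufferMillis) {
        samplesMillis.add(writtenToWipBufferMillis - arrivedAtApiMillis);
    }

    synchronized long percentile(double p) {       // e.g. percentile(50), percentile(99)
        List<Long> sorted = new ArrayList<>(samplesMillis);
        Collections.sort(sorted);
        int idx = (int) Math.ceil(p / 100.0 * sorted.size()) - 1;
        return sorted.get(Math.max(idx, 0));
    }
}
```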

One of the things we're proud of is that we have really nice low latency, sub-second. This is our production system, where we're putting, I think, 3 or 4 terabytes a day in on 6 machines. Having the data available for search, or for updating a dashboard, within a sub-second is really nice. It means alerts trigger within that time interval, so for the hacker who installed CryptoLocker, it's sub-second until we get the event that could cut that machine off from the network, or generate an alert or an email or whatever.

This is one of the big issues with logging systems that are based on a database, especially in high-volume scenarios: if you get a DDoS attack or something, then all of a sudden, your logging system is 15 minutes behind, and you have to wait 15 minutes before you can actually see what happened, or you can only see what happened in your system 15 minutes ago. This latency is super critical, also when you are in this incident-response mode.

All this data is being pushed into Kafka. One of the things we monitor is the volume we're seeing for each partition in Kafka; our default set-up is just 24 partitions. Here's a snapshot I took yesterday of what that volume looked like across the 24 partitions that make up our ingest load. That's an interesting thing to measure, because if this is very uneven, maybe you have a problem, maybe you're not using resources well enough. This is the thing: you have this hash function in the producer that decides which data goes into which partition. If this is completely out of whack, maybe one of your Kafka nodes gets very slow because it has so much data to process.

That's one part of the issue, if this is uneven. Another part of the issue is what's going on afterwards, in the consumers. Maybe the processing that happens there is very fast for some of these partitions and very slow for others. The way this can happen is that, in any given Humio set-up, there will typically be log sources that are super fast, like the network traffic from a given network node. If you're monitoring network traffic, there's typically a high load of data coming from that. Or it could be other individual log sources that are very high-velocity while most are comparatively low-velocity. That can produce a lot of data in a given partition.

It could also be that you run a very expensive real-time query on the back end of it; there are a lot of stream-processing functions. Yahoo has this library of stream-processing functions, for instance, that can count distinct values with some statistics, based on bitmaps, and it takes up a lot of memory. The top function I showed before is quite expensive because it needs to maintain a lot of state. Some of these functions sitting on the other end are expensive and relatively slow. That could also be a reason why a partition is behind: it's either the data volume or the processing time for the ingest.

Partitions Falling behind

What we did was measure these ingest latencies, not just for the different partitions, but also for the different kinds of data that flow through them. Then, for the slow ones (the reason could be volume, or it could be processing time), we incrementally add, on a log scale, more randomness to the key that goes into that hash function.

Here's yet another presentation of the same thing, illustrating the high volume. We're multiplexing a much larger set of data streams: over here are the individual data sources being ingested, maybe 100,000 different ones, and they're being squished into maybe just 24 partitions, and then pulled out again. Some of the processing over here is fairly slow. For each of these 100,000 entities, we're actually keeping track of this ingest latency, and then we use that to adjust the hash function, so we have a stateful hash function, instead of the default one, that makes sure the workload gets distributed up front.
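The stateful hashing idea can be sketched like this (hypothetical code, not Humio's): each data source carries a "spread" factor that doubles when that source falls behind, and the partitioning key is salted with a random number below the spread, so a hot source fans out over more partitions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

// A spread of 1 behaves like default hashing; a spread of 8 fans the source
// out over up to 8 different keys, and therefore up to 8 partitions.
class StatefulKeyer {
    private final Map<String, Integer> spread = new ConcurrentHashMap<>();

    String partitionKey(String dataSourceId) {
        int s = spread.getOrDefault(dataSourceId, 1);
        int salt = ThreadLocalRandom.current().nextInt(s);
        return dataSourceId + "#" + salt;   // Kafka's default partitioner hashes this key
    }

    // Called by the latency monitor when this source's partition(s) fall behind.
    void fallingBehind(String dataSourceId) {
        spread.merge(dataSourceId, 2, (old, x) -> Math.min(old * 2, 64)); // log-scale increase, capped
    }
}
```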

Data Model

To understand a little more about how we do that, you have to understand a little more about the basic data model. In Humio, we have repositories that contain data sources, and a data source is made up of a sequence of events, or log lines. A data source is identified by a unique combination of tags. This is the typical problem you see in time-series databases, where you have arbitrarily many new tags being added. To deal with that, you hash the tag combination; that becomes the name of the time series, and that's the hash we use, at least initially, for distributing the workload over the Kafka partitions.

There are some problems with this. If some of these tags have super-high variability, we get many different combinations; if somebody decides to put user names or IP addresses into these tags, then we can end up with millions of time series. This can cause a lot of issues, because we create a lot of data sources, which grows the metadata and overloads this in-memory representation of what's in the system, our own little Zookeeper that runs locally in every node.

What we do in Humio is observe these and notice the high-variability ones, and then hash those tag values down into a lower-value domain. For instance, if I have a tag that says "user equals Kresten" and Humio has observed that user is a high-variability tag, then it will actually hash that value, say, "user is equal to 13 now," maybe out of 16, so we reduce the value domain of that tag. That reduces the number of data sources we have to deal with. Then, on the other side, at query time, when I say, "I want to search for user equals Kresten," that query is in fact rewritten to "user equals 13," and then I search through all the users that happen to hash to 13. That's another aspect of it.
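A sketch of that tag-value folding, with an assumed bucket count of 16: the same fold is applied at ingest and at query-rewrite time, and the exact value is then applied as a filter over the much smaller candidate set.

```java
// Illustrative only; bucket count and hashing scheme are assumptions.
class TagHasher {
    static final int BUCKETS = 16;

    static String foldTagValue(String value) {
        return String.valueOf(Math.floorMod(value.hashCode(), BUCKETS));
    }
}

// Ingest:  user=Kresten     -> stored under the folded tag, e.g. user=13
// Query:   "user = Kresten" -> rewritten to "user = <foldTagValue("Kresten")>",
//                              then the matching segments are filtered for the exact value.
```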

Multiplexing in Kafka

As I said previously, ideally we would really like to have something like 100,000 dynamic topics in Kafka that perform and scale indefinitely. But it's one of the natures of Kafka that it's really nice and stable as long as you don't create or move around partitions. One of the things that makes a Kafka system stable and easy to work with is not creating new topics, because every time you do that, you end up moving data around, and when you mess with things, they tend to go wrong. That's also the reason there have been a number of other similar systems, competitors, open-source projects, that try to build this multiplexing mechanism so you can have any number of data streams mapped on top of this kind of system.

The default in Kafka works for many different workloads, but this also illustrates a problem you often see: under-utilization in a standard Kafka set-up. Because you use the standard hash function, you'll either see some data sources overload individual nodes, so the load in the Kafka cluster becomes very uneven (one node is super loaded because it holds the partition whose hash values correspond to maybe half of all the data flowing through the system), or you'll have a system where Kafka falls behind, because some of the data-processing units or processes sitting behind it happen to be very slow. You really have to know your data to utilize this optimally. We found that having a stateful hashing function that we manage ourselves, instead of the default set-up, really lets us utilize the system much better.

Using Kafka in an On-Prem Product

We've discussed this many times. When we started, of course, ideally, in an on-prem product everything is just in the box. But we've actually been fairly happy with deciding to use Kafka as a component, a required component. We can really leverage the stability and the fault tolerance; we don't have to deal with leader election, failover, or data persistence. We get durability and we get this serialization of our global state, in a very nice package. Typically, the larger customers already have a lot of Kafka installed and know how to run it efficiently, so you just tell them, "We need three topics: one for the ingest queue, one for updating or coordinating the global state, and then one more we use for chatter, information that is not so important, low-priority messaging across the cluster." That has worked really well.

The only real issues are with smaller customers that need to run Kafka and Zookeeper and are afraid of that, or uncomfortable with it. The only issue we've seen once we get up and running is the Zookeeper that runs underneath as part of the Kafka package; that tends to run out of disk space and is a little hard to maintain. There's so much work we didn't have to do because we just built on top of this; it's definitely something we can recommend to other people doing a similar kind of thing. You can imagine that in the future, one day, maybe we'll want to build our own, nicely-integrated replacement, but there's a lot of engineering, a lot of faults that have seen the light of day and been fixed, to make Kafka as stable as it is today.

There are some other, smaller issues, and I can go over some of them if you're looking into using Kafka for a project. We're running on the JVM and we've had a lot of trouble with long GC pauses. One of them was actually in the Kafka client: the libraries that interact with Kafka use compression of various kinds to compress the data before it's sent to Kafka and to decompress it as it comes out. We just replaced those with ones that ensure you don't lock. One was Gzip; there's actually a pure Java version. It might be a little slower, but surprisingly, you get rid of these second-long pauses that sometimes happen.

Also, the LZ4 implementation that Kafka uses was going through byte arrays and JNI, and that can sometimes cause a global lock in the JVM. If a lot of what your system does is sit there decompressing data out of Kafka, then that happens all the time and it can really block the system quite a lot. In a high-performance system, you don't want that. So we just disabled compression in Kafka and did our own compression, correctly, before and after. There are a lot of little modifications and experiences like that.
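The workaround he describes could look roughly like this; the configuration and helper are assumptions, but the idea is to set compression.type=none and compress the payload with a pure-Java codec before sending, so no JNI is involved on the hot path.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.zip.GZIPOutputStream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: application-level gzip (pure Java) with Kafka's own compression disabled.
public class OwnCompressionProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("compression.type", "none");   // let the application do the compressing
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");

        byte[] payload = gzip("a batch of log events...".getBytes(StandardCharsets.UTF_8));
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("humio-ingest", "datasource-42", payload));
        }
    }

    static byte[] gzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) { gz.write(data); }
        return bos.toByteArray();
    }
}
```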

Another thing: every once in a while, you need to reset your Kafka and Zookeeper, particularly when you're setting things up and doing your first dry runs. You should be aware that once you reset Kafka, all the offsets, the positions in the queues, reset to zero as well. To handle a reset like that, there's a notion of a cluster ID that you can get from the Kafka API. If you keep that persisted, you can check it and use it as an epoch, so you can tell whether Kafka was reset and the offsets you stored into Kafka are no longer valid at all.
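A sketch of that check using the Kafka AdminClient; where the previous cluster ID is persisted is up to the application.

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

// Read Kafka's cluster ID and compare it with the one recorded last time we ran.
// If it changed, the cluster was reset and any offsets stored against it are stale.
public class ClusterEpochCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            String currentClusterId = admin.describeCluster().clusterId().get();
            String rememberedClusterId = loadPersistedClusterId(); // from our own storage
            if (!currentClusterId.equals(rememberedClusterId)) {
                System.out.println("Kafka was reset: discard stored offsets and start over");
            }
        }
    }

    static String loadPersistedClusterId() { return "previously-saved-id"; } // placeholder
}
```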

Along the way, KSQL also came along, and Confluent has been demoing it here at the conference today. That's a very similar system from a technology point of view, in the way the stream-processing engine works and queries are executed. There is one slight difference, though: at Humio, we built the system as a MapReduce-style system where we move the computation to the data, and that's actually a lot more efficient, especially if you need this snappy user experience, whereas with KSQL you move the data to the KSQL engine; it works by basically streaming the Kafka topics in again.

We're quite complementary. We do this interactive and user-friendly experience; KSQL is a library for building these kinds of things. You could say, if we started today, "We should just build an API on top of KSQL," but it still would have been a very different, and not quite as snappy, experience.

Final Thoughts

Some final thoughts. There are a lot of difficult problems that go away by using Kafka. I can recommend it even for a product that, like ours, is built and delivered to be installed on-prem. We also operate it ourselves; we have a small SaaS offering with some customers on it, so we can feel all the pain of operating Humio before the customers do. We're really happy with that decision. Maybe someday we'll build our own, but that would take a lot of resources; there are many years of good knowledge in that system.

Questions and Answers

Participant 1: You had mentioned Zookeeper on top of Kafka, but you deploy Zookeeper to keep Kafka running [inaudible 00:46:40]?

Thorup: The question is: we implemented Zookeeper on top of Kafka, but Kafka depends on Zookeeper to run, so we have Zookeeper running anyway. Well, maybe we could have used Zookeeper, if Zookeeper had a good API for getting a replica of all the data. For this to work, we needed every node to have a kind of in-sync copy of the entire data set in Zookeeper. I'd say what we built is a distributed Zookeeper that has part of itself running in every node in the cluster, so it runs in memory, in all the clients. This applies to both Zookeeper and etcd; maybe they got that recently, a way to subscribe to all updates across the tree and build an in-memory model. That's just one thing we considered.

Also, it would be nice if Zookeeper or etcd had a query language, so you could take a piece of a query, move it into Zookeeper or etcd, run the wider query there, and gather the results. Zookeeper and etcd are these very point-wise key-value stores that are not very efficient if you need to, in this case, ask, "Tell me all the files that are pertinent to this time interval and these tags." That's a query we can run on a local copy of that data much more efficiently than if we had to do lots of round trips to Zookeeper. That's why we pulled it in.

Participant 2: Good talk. My question revolves around the state size of log analyzers. Typically, in Splunk or Elasticsearch, you may have 10-12 nodes, depending upon the usage. In terms of Humio, how do you foresee the estate looking?

Thorup: Are you asking about the state size of the...?

Participant 2: Of the Humio set-up.

Thorup: Of the entire Humio set-up?

Participant 2: Yes.

Thorup: On a typical node, in Humio, we run on a big-memory machine, and Humio itself typically uses 16-20 gigabytes of maybe a 128-gigabyte machine. The state inside Humio is actually very small; we can run thousands of these live queries, all the widget servicing, from even a single node. All the rest of the memory in the system is used for filesystem caches. Remember, we compress these files; you compress them and store them, and the filesystem cache is full of compressed files. You can imagine, if you have 100 gigs free, you can actually store a terabyte of logs on a single machine, essentially all in memory, because these files are compressed and they happen to be in memory because you recently wrote them or recently queried them. Then it becomes super efficient. Essentially, it's an in-memory system like that.

Moderator: Well, it's smaller than Splunk. I think, from my experience anyway.

Thorup: Oh yes.

 


 

Recorded at: Jul 24, 2019
