
Simplifying Real-Time ML Pipelines with Quix Streams



Tomáš Neubauer discusses Quix Streams, an open-source Python library that helps data scientists and ML engineers to build real-time ML pipelines.


Tomáš Neubauer is a co-founder and the CTO at Quix, where he works as a technical authority for the engineering team and is responsible for the direction of the company across the full technical stack. He was previously technical lead at McLaren, where he led the architecture uplift for Formula 1 real-time telemetry acquisition.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.


Neubauer: I want to talk about real-time ML pipelines, and how to build projects that involve machine learning and real-time data. We're going to talk about the streaming landscape, the different architecture decisions you can make, and the advantages and disadvantages of each solution. I'm Tomas Neubauer, CTO and co-founder at Quix. Before that, I worked at McLaren, where I led a team that connected F1 cars to the factory and to the cloud, so that people in the factory could build real-time decision insight on top of telemetry from the F1 cars, without traveling to a track somewhere around the world every second weekend. This was quite a challenging use case, because the amount of data coming from the sensors was enormous. There were roughly 30 million numerical values per minute per car. Some of the sensors were particularly high resolution, more than 1 kilohertz, which means more than 1000 numbers per second from a single sensor. We quickly realized that a database-centric architecture would not work, because we simply couldn't find a database able to persist and query that amount of data at that scale. We were forced to look around, and we adopted streaming, with Kafka as the broker. We successfully connected sensors from the track to the cloud, and people in the factory could use the data. The problem was that our team was truly cross-functional. We had chassis engineers, mechanical engineers, simulation engineers, data scientists, ML engineers, and none of them really liked how Kafka worked. They couldn't really leverage the power of this technology, because they mostly used Python, and every tool for Kafka was in Java. We were not using its full potential. This is where I first started thinking about the gaps in the toolset around Kafka.


I want to build this simple demo with you. We're going to have an application running on the phone here. It's a fitness app that sends GPS location, speed, altitude, heart rate, and gForce data from all dimensions, and we're going to use it to detect cyclists crashing with this fitness app on a bike. We're going to display all of that in a simple frontend application.

Streaming vs. Batch

The question is, how are we going to build this? What's going to be the architecture? There are a couple of options. First, we need to choose between streaming and batch. If we go for batch, it's probably going to look like this. This is an AWS example. Imagine you have multiple phones sending data. You deploy an API gateway to the edge, and you send data through it into something like Kinesis, which will funnel the data to S3 buckets. You can then load that data into SageMaker and analyze it, and later label it and train the model. At the end of that training, you will end up with a pkl file that you then deploy to Spark in a Step Function, where you load data in batches from the S3 bucket and analyze whether there's any crash in it. Later, you output the result to another S3 bucket, where you use AWS Lambda to push it to the frontend application, again through an API gateway. As you can see here, it's database centric, we have S3 buckets everywhere. Another problem is that there will be a delay between the phone and the information actually being displayed in the frontend, because we have polling involved in this pipeline. We also have the problem of vendor-specific technology. If you want to migrate to a different cloud, or possibly on-premise, you are going to have a lot of work migrating Kinesis to an Azure or Google technology. Lastly, databases are not really great for scale. If you really need to get to high traffic, a database is going to be the bottleneck of your architecture.

If we go for streaming, the architecture is going to look a bit different. At the beginning, it is the same: we're going to use a WebSocket gateway, because that's the right thing to do. Then we send this data to a topic in a Kafka broker, from where we first sink it to a database, so we can load it into Google Colab or SageMaker and use it for analysis and model training. At the end of that, we again produce a pkl file, which we can use in our service. If I go to such a Jupyter Notebook, here, you can see that my colleague loaded the data from the database. Here we have gForce data for all dimensions, and we label it as shaking or not shaking. Then we plot it to see what's going on, with some analysis of all the sensors there. We add some features to the data. We then use TensorFlow to train the model. Then we backtest on unseen data to see possible bias. At the end, we package this in a pkl file and publish it to a model registry, in our case, a blob store. Going back, this is basically where we get the pkl file, and then we can deploy it in the service. The service processes the data coming from the phone data topic in real time, and outputs the results to the alerts topic. This is basically where we are using the model. The rest is the same. We send it to the WebSocket gateway, and then to the frontend. Now you see, the difference here is that we don't need the database to actually detect crashes. The moment we have trained the model, we can even disable the persistence and completely forget the database. It's going to work. Now, instead of a database, the center of our architecture is a broker, Apache Kafka, for example, which is a highly scalable and resilient technology.

When you're deciding between batch and streaming, you might be thinking it's just about different technologies in your architecture. Actually, it's more than that. It's about the approach to solving common problems. It is about the mindset when you are building your pipeline. Imagine a very simple operation, where we want to calculate the delta of the forces applied to the phone between consecutive data points. As the bike is going around, it's sending data points, and they are being collected in the database. In batch, it would look something like this: a table with rows, where each row has a timestamp and values in the columns. Now we can quickly add a new column. That would be a new feature, the total force applied to the device. That's easy. Then we can do a delta on that column. Here, you see, we just get the previous row's value and subtract it. In streaming, though, we don't have a table. As data points are collected, they go straight to the pipeline, piece by piece. That means that we can calculate the feature. That's easy. That's a stateless operation. For the delta, though, we're going to need stateful processing, because the next time around we're not going to have the previous row of data available. Obviously, for the first row, we can't calculate the delta. That's the same as with batch. With the second data point, we have lost the previous row, but we have a state. Now we can calculate the delta with the previous value. Then we update the state. We continue in the same fashion. The complicated thing here is to make the state persistent in case of a restart, and that's actually not a trivial thing to solve.
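The stateless-versus-stateful distinction above can be sketched in a few lines of plain Python. This is only an illustration, not the code from the talk: the column names `gForceX`, `gForceY`, `gForceZ` come from the demo, while `total_force`, `batch_deltas`, and `StreamingDelta` are hypothetical names, and summing absolute values is an assumed definition of "total force".

```python
def total_force(row):
    # Stateless feature: depends only on the current row.
    # Assumption: "total force" is the sum of absolute values per axis.
    return abs(row["gForceX"]) + abs(row["gForceY"]) + abs(row["gForceZ"])


def batch_deltas(rows):
    # Batch: the whole table is available, so the previous row is simply
    # the row at index i - 1. The first row has no delta.
    totals = [total_force(r) for r in rows]
    return [None if i == 0 else totals[i] - totals[i - 1]
            for i in range(len(totals))]


class StreamingDelta:
    # Streaming: rows arrive one at a time, so the previous value has to be
    # carried between calls as explicit state.
    def __init__(self, state=None):
        self.state = state if state is not None else {"previous": None}

    def process(self, row):
        current = total_force(row)
        previous = self.state["previous"]
        delta = None if previous is None else current - previous
        self.state["previous"] = current  # update state for the next message
        return delta


rows = [
    {"gForceX": 0.0, "gForceY": 0.0, "gForceZ": 10.0},
    {"gForceX": 1.0, "gForceY": -2.0, "gForceZ": 10.0},
    {"gForceX": 0.0, "gForceY": 0.0, "gForceZ": 9.0},
]

op = StreamingDelta()
streamed = [op.process(r) for r in rows]
print(streamed)            # [None, 3.0, -4.0]
print(batch_deltas(rows))  # same values, computed from the full table
```

Both paths give the same deltas, but the streaming version only works as long as `op.state` survives. If the process restarts and the state is lost, the next message is treated as a first row again, which is the persistence problem described above.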

ML Deployment (REST API vs. Streaming)

This is obviously a machine learning use case. When you're building ML-powered architectures, you have two major options to deploy your model: web API services and streaming. If we zoom in on the model behind an API, it looks like this. You have an input with a feature, you call the API, and the response is whether it's a crash or not. What are the issues with such an architecture? First of all, CPU overhead on every call. The phone that I'm going to use for the demo is going to send sensor data at 4 hertz, which means that there will be a message 4 times per second. If that's 100 phones, or 1000 phones running at the same time, you see there are lots of requests, and they will cause unnecessary CPU load just orchestrating the requests. It also introduces delay, because each HTTP request takes time to be processed by the middleware. The biggest problem, really, is what happens if your service can't handle the traffic. First of all, it's going to get slower and slower to respond to requests, until the response time is longer than the timeout on the client, and you start losing requests. Basically, that's a disruption in service and you're losing your data. You can obviously scale your API services, but you can never predict what traffic you will get. With streaming, this is no longer a problem, because the service is running straight on top of Kafka. Therefore, we can leverage Kafka's temporary persistence and checkpointing system to make sure we're not going to lose any message. If the service gets overwhelmed with traffic, in the worst case, we will get the answer a bit later, but we will never miss a single message. Here we're directly embedding the model into a streaming service, and that is a more resilient and more scalable solution.
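The overload argument above can be made concrete with a toy simulation. This is a deliberately simplified sketch, not a model of any real HTTP stack or of Kafka itself: `simulate` is a hypothetical helper, time advances in discrete ticks, and the only difference between the two runs is whether a waiting message expires after `timeout_ticks` (the request/response case) or waits indefinitely in a durable log (the streaming case).

```python
from collections import deque


def simulate(arrivals_per_tick, capacity_per_tick, ticks, timeout_ticks=None):
    """Return (processed, dropped, backlog) after `ticks` steps.

    timeout_ticks=None models a durable log: pending messages never expire.
    A number models clients that give up after waiting that many ticks.
    """
    pending = deque()  # arrival tick of every message still waiting
    processed = dropped = 0
    for tick in range(ticks):
        pending.extend([tick] * arrivals_per_tick)
        if timeout_ticks is not None:
            # Requests that waited too long are lost: the client timed out.
            while pending and tick - pending[0] >= timeout_ticks:
                pending.popleft()
                dropped += 1
        # The service handles at most `capacity_per_tick` messages per tick.
        for _ in range(min(capacity_per_tick, len(pending))):
            pending.popleft()
            processed += 1
    return processed, dropped, len(pending)


# 4 messages arrive per tick, but the service can only handle 3.
api_result = simulate(4, 3, ticks=10, timeout_ticks=2)
stream_result = simulate(4, 3, ticks=10, timeout_ticks=None)
print("request/response:", api_result)
print("durable log:     ", stream_result)
```

In the run with a timeout, some messages end up in `dropped`. In the run without one, nothing is dropped and the excess shows up as backlog instead, which corresponds to "we will get the answer a bit later".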

If we're going to do that, we're going to build a stream processing application. That means we have to choose one of the architecture designs for such a project. The first is using Kafka client libraries, which use the Kafka producer and consumer API directly. That's a lightweight, elegant solution, but it has its downsides. The second option is to add a full-fledged stream processing framework like Apache Flink. That's also not a silver bullet. The Kafka producer and consumer API works very well for simple processing, when you process message by message without any context. You can use it in any language, and you don't add any dependency to your architecture. The problem is when you start using stateful processing, when you need state and context, and need to get data from multiple messages. That is super complicated, and it takes proper software engineering. This is the moment when you might be tempted to use something like Flink. With full-fledged stream processing frameworks like Flink or Spark, you're basically running your processing in a separate, server-side cluster, where data and code get redistributed by the engine and executed in a scalable, stateful, and resilient manner. The problem is that you are bringing in a Java dependency. The whole ecosystem is really JVM. That means it's quite difficult to use from Python. Also, debugging is quite difficult, because the code is not running in your IDE, the code is running on the server. Then it gets really complicated when you try to combine the classical world, for example, a Kubernetes deployment, with your Flink jobs. You can't just slap the Flink code into an existing microservice, because it's not running in that Docker container together with your Node.js, Java, or C# microservice, it's running in Flink.

When you use these frameworks from Python, you run into common problems, like, for example, handling JAR files. Even though you have a way of connecting, for example, Flink to Kafka, you need to use the Flink connector, and therefore, if your Kafka is configured with some more advanced settings, like Kerberos or SSL certificates, it gets really difficult to use. Then, if you get over that, you might be thinking this is pretty cool. I can do SQL, and that's powerful. Until the moment you need to do something real, because then you have to build custom functions, user-defined functions. If you build them in Python, the problem is that, because of the JVM engine, they have to run a Python environment on the side and connect to it over a socket connection, as you can see here in the official documentation from Flink. That's obviously not ideal. If you have any problem, trying to understand Kerberos is super difficult. What we really need at that moment, I think, is debugging: going line by line to see what is going on and why the code is not working. The question is, is there a third way? We believe that there is. That's why we built a new library called Quix Streams, which is basically something between the client libraries and the full-fledged stream processing frameworks. It's still light. It's still just a library. It tries to solve the more advanced, difficult problems by leveraging Kafka and its features.

Our Approach (Kafka + Kubernetes + Python)

What is our approach then? How are we going to build this demo? I'm going to leverage Kafka, Python, and containers in Kubernetes to build a resilient, scalable solution. The most important concept to grasp to understand the demo is what is sometimes called a pub/sub service. Basically, we subscribe to a topic, load the messages, process them, and publish the output of that processing to another topic. Why is this a good idea? First of all, it's a very scalable solution. Those partitions are redistributed in a cluster.

Each topic consists of an [inaudible 00:18:58] partition, and data gets redistributed across those partitions. The partitions are then assigned to multiple instances of the processing code, each running in a container, in a deployment in Kubernetes. Here, each instance gets a third of the traffic. You can add more instances to scale this processing. It's fault tolerant. First of all, those partitions get replicated two or three times, based on your configuration. If one of the nodes in a Kafka broker fails, streaming is still ensured, and your compute deployments get replicated too. When one instance restarts, the others take over the traffic and the failed container gets scheduled again. It's a resilient and scalable solution on the data delivery side and on the compute side as well.
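How keys map to partitions, and partitions to processing instances, can be sketched as follows. This is an illustration under stated assumptions, not Kafka's actual implementation: `partition_for` and `assign_partitions` are hypothetical helpers, `zlib.crc32` stands in for the hash used by a real partitioner (the Java client's default partitioner uses murmur2 for keyed messages), and round-robin stands in for the consumer group's assignment strategy.

```python
import zlib


def partition_for(key, num_partitions):
    # A stable hash means the same key (for example, one phone's stream)
    # always lands in the same partition, which preserves per-key ordering.
    # Assumption: crc32 is a stand-in for the real partitioner's hash.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_partitions(num_partitions, instances):
    # Round-robin assignment of partitions to the running instances.
    assignment = {name: [] for name in instances}
    for partition in range(num_partitions):
        owner = instances[partition % len(instances)]
        assignment[owner].append(partition)
    return assignment


# Three partitions, three instances: each instance owns a third of the data.
healthy = assign_partitions(3, ["worker-0", "worker-1", "worker-2"])
print(healthy)

# If worker-1 restarts, its partition is taken over by the survivors
# until the container is scheduled again.
degraded = assign_partitions(3, ["worker-0", "worker-2"])
print(degraded)

print(partition_for("tomas-phone", 3) == partition_for("tomas-phone", 3))  # True
```

The point of the sketch is that scaling and failover are both just a matter of recomputing the partition-to-instance mapping, while the key-to-partition mapping stays fixed.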


Let's try to build a real-time application powered by machine learning. Now we are in my PyCharm. Here we have a starting template for processing code with the Quix Streams library. The Quix Streams library is the third way. Here, I'm just going to run the code and see if we are getting data. I'm running my companion app here, which is sending telemetry data. If I just move this to the right, there we are, we can see we have a stream coming from Tomas's phone. That's good. I'll stop the code, and here I can start building my service. First of all, I'm going to get a data frame. It's not a real pandas data frame, it's a streaming data frame, but it has the same interface, so we can use the knowledge we have from batch in streaming. Here I'm going to get gForceX, gForceY, and gForceZ. Then I will print the header, and then I'm going to iterate over the rows as they are being streamed to the pipeline. There we are. If we go to the top, we see four columns here, and now the data is being streamed. If I take the phone, you can see the values immediately changing as I move the phone. We have data connected to my code. First, you see that sometimes we have gaps. That's because not every row contains these three columns. I'm going to do some filtering here and keep only the rows where the value is not None.

Then we're going to need to create a new column called gForceTotal. We saw that in the Jupyter Notebook. That's basically the total amount of force applied to the phone from all dimensions. Here we're just going to use the absolute values. There we are. Last, to get some meaningful information from this column, we're going to calculate a 10-second rolling window to see if there is substantially more force applied over that period. Let's call this column gForceTotal-10s, and that's again going to be classical pandas. Because we already have quite a lot of data, I will just adjust the print_width to 15, and let's run it. There we are. If we go to the top, we see we now have these columns. If I go to the bottom, you see the rolling average is 9.9gs. That's because we're on planet Earth, and we have gravitational forces. If I start shaking my phone, you see it's going up. If I put it back on the table, it's going down again. What if I now stopped the service and restarted it? We definitely don't want to start from 0, because we would lose some data. The library takes care of the state, and uses Kafka checkpointing to make sure that we recover from the same state.
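The rolling window and its recovery after a restart can be illustrated without any streaming library. The sketch below is not the Quix Streams API: `RollingMean`, `snapshot`, and `restore` are hypothetical names, the window is a plain in-memory deque, and a JSON string stands in for the Kafka-backed checkpoint mentioned above.

```python
import json
from collections import deque


class RollingMean:
    """Time-based rolling mean over the last `window_seconds` of samples."""

    def __init__(self, window_seconds=10.0, samples=None):
        self.window_seconds = window_seconds
        self.samples = deque(tuple(s) for s in (samples or []))

    def add(self, timestamp, value):
        self.samples.append((timestamp, value))
        # Drop samples that have fallen out of the window (t - window, t].
        while self.samples[0][0] <= timestamp - self.window_seconds:
            self.samples.popleft()
        return sum(v for _, v in self.samples) / len(self.samples)

    def snapshot(self):
        # Serialize the state so it can be stored outside the process.
        return json.dumps({"window_seconds": self.window_seconds,
                           "samples": list(self.samples)})

    @classmethod
    def restore(cls, payload):
        data = json.loads(payload)
        return cls(data["window_seconds"], data["samples"])


window = RollingMean(window_seconds=10.0)
window.add(0.0, 10.0)          # phone resting on the table
window.add(5.0, 16.0)          # phone being shaken
checkpoint = window.snapshot()

# Simulated restart: one service recovers the state, the other starts empty.
recovered = RollingMean.restore(checkpoint).add(12.0, 10.0)
cold_start = RollingMean(window_seconds=10.0).add(12.0, 10.0)
print(recovered)   # 13.0, still reflects the shake from before the restart
print(cold_start)  # 10.0, the shake has been forgotten
```

The difference between `recovered` and `cold_start` is the data loss the restart would cause without persisted state.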

I'm going to start shaking to get to some higher numbers. There we are, we are at 16. Now I'm going to stop the processing and start it again. You see that we were at 10, and now it's going back to 9. If I go back to the top, we started at 17, then 16, 14, 12, 10, and 9. The first row is recovering the state of the previous run. This is what we want. Now, it's time to use the machine learning model. I'm just going to borrow the code here from my notes to get the model. Here on the top, and then I have to do it here. Now I'm just going to create another column called shaking. Obviously, we're not crashing a bike here in my office, so that's why we're detecting shaking and not crashing. There we are. The number 0 tells us that there is no shaking. If I shake now, you see there are 1s, which means it's successfully detecting my shaking. That's working, so I can try it again.
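Loading a pickled model artifact at service start-up and using it to fill a `shaking` column can be sketched like this. Everything model-related here is a stand-in: the talk's model is trained with TensorFlow, whereas this sketch pickles a hypothetical dictionary with a single `threshold` parameter, uses an in-memory buffer in place of the blob store, and `predict_shaking` is an invented helper name.

```python
import io
import pickle

# Training side (stand-in): the "model" is just one threshold on the
# 10-second rolling g-force. A real model would be a trained estimator.
model_params = {"threshold": 12.0}

# Publish the artifact. An in-memory buffer stands in for the blob store.
blob = io.BytesIO()
pickle.dump(model_params, blob)

# Service start-up: load the artifact once, before processing any rows.
# Only unpickle artifacts from a source you trust.
blob.seek(0)
loaded_params = pickle.load(blob)


def predict_shaking(row, params):
    # 1 means shaking was detected, 0 means no shaking.
    return 1 if row["gForceTotal-10s"] > params["threshold"] else 0


rows = [
    {"gForceTotal-10s": 9.9},   # phone resting on the table
    {"gForceTotal-10s": 16.0},  # phone being shaken
]
for row in rows:
    row["shaking"] = predict_shaking(row, loaded_params)

print([row["shaking"] for row in rows])  # [0, 1]
```

The design point is that the model is loaded once and then applied row by row inside the streaming service, rather than being called over HTTP for every message.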

The last bit we want to do here is get this information to our product, in our case, in this demo, a simple frontend application. Here, I have a frontend. If I refresh it to see just the current session, there we are. You see I'm in Prague, and you also see the gForce information coming from the phone here. If I shake, nothing will happen, because we haven't written the result to the output_topic yet. Doing that is just one line of code. Here, I will get the output_stream, write the row, and because that's an asynchronous operation, I will do that. Now if I shake, there we are, we have a crash detected in our frontend, so this demo is completed.




Recorded at:

Feb 02, 2024