Building and Operating a Serverless Data Pipeline

Summary

Will Norman discusses the motivations for switching to a serverless infrastructure, and lessons learned while building and operating such a system at scale, focusing on operability, stability, scalability, and ease of development.

Bio

Will Norman is a Director of Engineering at Intent where he leads the Data Platform team. He has over 10 years of experience architecting and building high-volume systems in the FinTech and AdTech industries.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

Norman: My name's Will Norman. I am here to talk about lessons that we've learned building and operating a serverless data pipeline. How many people have been rudely awakened by a page in the middle of the night telling you that a server is falling over, and you need to get up and fix it? I know I have. One of the low points of my on-call career came from getting a text like this telling me that Vertica had fallen over. I was in a hotel room with my wife, so I had to get my computer out and go to the hotel bathroom, VPN into the network, and bring back up our Vertica cluster so that our jobs could run that night. This was about three years ago, around the time our interest in serverless started at Intent. It was definitely a shove in that direction.

A quick introduction, my name's Will Norman. I'm the director of engineering at Intent. I have about 12 years of experience in New York as a software developer with a primarily FinTech and AdTech background. Intent is a data science company for commerce sites. We basically have a predictive analytics platform that allows sites to maximize the value per user that visits their site. The primary application for doing that is an ad network for travel sites. My team at Intent is the MOD squad. We have squad-based structures at Intent. MOD's a recursive acronym that stands for [inaudible 00:01:35] data, or the data platform team, made up of four engineers and one product manager. With the squad-based structure, the idea is that we have a cross-functional group of engineers who are able to own and operate our infrastructure as well as the applications that run on top of it.

Today, I'll start off talking about what serverless is and define it for the scope of this talk, then talk about the data platform that we've built at Intent, and lessons learned along the way. For the scope of this talk as well, a lot of our experience is with AWS, so there are going to be some AWS-specific examples but, hopefully, a lot of these things are applicable to other platforms as well.

What is Serverless?

To start off with what serverless is, it's a bit of a buzzword and everybody has varying definitions. For us, it's more about using managed services than about having a lack of servers. Of course, there are still servers in the background, but someone else is running those servers for you and providing abstractions on top of them. It's not just the Functions as a Service abstraction. It's other things like Infrastructure as a Service and Database as a Service: Kinesis streams, for instance, or Snowflake, which is a good example of a serverless data warehouse. It allows you to scale on demand and pay for what you use, so you don't have to worry about provisioning capacity as much. You're not over-provisioning to meet demand three months down the road and paying for all that extra capacity; you can scale up and scale down as you please.

One of the biggest things is that it allows you to empower your developers to own their platform. They're not in charge of doing routine tasks like configuring the network stack on a Linux kernel, or running a ZooKeeper cluster, or other things that require domain-specific knowledge. It allows us to focus on the applications that we're building.

Intent Data Platform

Now to talk a little bit about the data platform as it existed when I joined Intent about five years ago. We were using ActiveMQ as a message broker. We had application servers handling ad calls, clicks, conversions, processing data like that, and then shoving that data into these log processor applications, and we had embedded ActiveMQ brokers on each of those applications. The log processors ran on dedicated EC2 instances. They were Java applications. They kept their state locally, so not only did they have an embedded broker, but as they processed events they also rolled those files onto the local file system. Then a cron job came along every so often and rolled those files to S3. Then, of course, S3 was our final destination and point of integration.

Here's a rough overview of what that looked like. As you can see, on the positive side, it's fairly straightforward. It's a pretty traditional architecture. Coming from other places, this looked pretty familiar. I'm sure it looks familiar to most of you all, but there are some issues with this architecture. One is that there are a lot of concerns being handled by these log processors. They're deserializing events and reserializing the data back out to S3. There's a lot of business logic in here. They're reading from databases, writing to databases. Then there are applications built in: for our advertisers, we manage budgets for them, so budget decrementing, processing click fraud, all kinds of stuff is going on in these log processors. If anyone wanted to add new functionality, then it all had to go in here. This became the central mono repo, with multiple squads all working in the same place. This went against our squad charters.

Another issue is scaling. Each log processor handled a certain subset of events. If you wanted to scale just that event type, then you had to basically figure out which log processor had extra capacity in order to handle those new events. Then additionally, if you wanted to add more capacity overall, once you added more log processors you had to figure out how to reshard those events back across them. That calculation was always a bit of a pain. Let's say we had a major new partner coming onto our network and we needed to increase our capacity; that required a lot of lead time to think about, "Ok, how are we going to reshard these things? What type of hardware are we going to add?" All those calculations.

Then from an operations perspective, if one of these instances went away for whatever reason, then that was a huge pain as well. It required remounting EBS volumes, forensically going in and collecting all the data that was kept in that local state, and figuring out how to recover it. When it happened, that was sometimes a several-hour operation where a lot of people had to jump on it, and it was a page and became an emergency.

Let's look at the new world that we have. It's made up of these technologies primarily. Kinesis streams are handling the events. Lambdas are processing events and putting events onto Kinesis Firehose, which is writing the data out to S3. SNS messages are triggering Lambdas. AWS Batch is doing some batch processing at the end of the pipe. There's a lot going on here. When you explain it to people, it can feel a little overwhelming. They get this picture in their head of a bunch of stuff going on at once and flowing all over, but we tried to make things component-based and have a clear isolation of concerns. This is the flow, and once you get a model of it in your head, it's fairly easy to understand how things progress through the system.

Ad servers and other systems put events onto the raw streams. Our data is in an AVRO data format, and then the processor Lambda, per event, will do some data enrichment and then put the event onto the processed stream. That becomes our integration point for other people to build applications. For example, our advertiser budget decrementing is now a separate service owned by the advertiser squad, and they're free to evolve that as necessary. We then put data onto Firehose, and that's responsible for writing it out to S3. When that happens, since we're using AVRO, we end up with basically chunks of bytes and not actual AVRO files. Once those files get written out, we have a partitioner Lambda that takes those chunks of bytes, deserializes them into events, and then writes out the AVRO files to different partitions based off of the date and hour of the event, and also a hash of the event ID so we have even partitions.
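
To make the shape of that step concrete, here is a minimal sketch of a Kinesis-triggered processor Lambda in Java, assuming the aws-lambda-java-events library and the AWS SDK for Java v1. The stream name and the enrich() helper are hypothetical stand-ins; the talk does not show the actual enrichment logic.

```java
// Sketch of a Kinesis-triggered processor Lambda: read raw AVRO bytes, enrich, re-publish.
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import java.nio.ByteBuffer;

public class ProcessorHandler implements RequestHandler<KinesisEvent, Void> {
    private final AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
    private static final String PROCESSED_STREAM = "processed-events"; // hypothetical name

    @Override
    public Void handleRequest(KinesisEvent event, Context context) {
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            ByteBuffer raw = record.getKinesis().getData();   // AVRO-encoded event bytes
            ByteBuffer enriched = enrich(raw);                 // deserialize, enrich, re-serialize

            PutRecordRequest put = new PutRecordRequest()
                    .withStreamName(PROCESSED_STREAM)
                    .withPartitionKey(record.getKinesis().getPartitionKey())
                    .withData(enriched);
            kinesis.putRecord(put);
        }
        return null;
    }

    private ByteBuffer enrich(ByteBuffer raw) {
        // Placeholder for the enrichment step described in the talk.
        return raw;
    }
}
```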

Then at the end, we have a batch process that reads those files - we're still dealing with small files in roughly five-minute increments - and it merges those files together so we have one large file. It also dedupes the data inside that file. Kinesis and Lambda are at-least-once systems, so we end up with duplicates, and this smooths things out at our end destination so that not every end-consumer has to go through and reimplement dedupe logic.

As I mentioned, our data consumers, the one main advantage here is we're now able to have this integration point; anybody can build an application that consumes streaming data. We have our budget updating process, we have people doing real-time monitoring and looking for anomalies. We have our data scientists who are able to train models that come off of here. It really makes it easy if you have an idea of something you want to test out, you can build an application to consume from these streams, and it doesn't involve shoving everything back into this log processor application.

On our team, we have a handful of Spark Jobs that run aggregations that then get written to Redshift. We also have an ETL app that loads the event-level data into Snowflake, and then a batch process that transforms the AVRO into Parquet that makes that consumable by Athena, which is a serverless data warehouse that Amazon offers. The Parquet data is also really nice for people who want to use a Spark Notebook to do data exploration and consume data that way.

Was It Worth the Move?

The question is, was it worth the move? It's a big project and a lot of effort. I would say, absolutely. One of the biggest things is we now have fewer production issues. When we do experience them, they're less severe. First of all, other people are running the infrastructure, so a certain class of problems just goes away. You're not paged in the middle of the night because a server runs out of disc space or whatever the problem may be. Since we have a clear separation of concerns, it makes it easier when something starts to fail to triangulate where it is and fix that thing. It doesn't quite have the blast radius to go and affect other systems as much.

Then, as I mentioned, a separation of concerns for operability as well as just understanding the system. It's easier now when someone wants to make a change to a piece of functionality to point them in one specific place. They don't have to have the context of this huge application in their head. They can go in and make focused changes. The system is horizontally scalable, we no longer have to go through these exercises to figure out, "Ok, X partner's coming in. Our load is going to increase by 20%. What do we need to do to account for that?" Put them on, the system scales. If we need to shrink back down due to seasonality or whatever reason, we're able to do that as well. Things are very elastic.

We removed a bunch of undifferentiated heavy lifting. No one on my team has to be an expert in ActiveMQ or figure out how to build an AMI to deploy these servers to EC2 and configure everything for high availability and all that. Now it's just understanding the services that we're consuming, and how we compose those things and build our applications on top of it.

Total Cost of Ownership

Now, I'm going to talk about some of the lessons that we've learned along the way of building this out. First of all, with regard to total cost of ownership, you have different concerns in this model versus the old model of static servers. You're dealing with on-demand costs, so it becomes a little bit easier to forecast them, and you also aren't paying for your excess capacity. One of the issues with serverless is that since we now have a lot of components, getting a full view of what a service costs becomes a little bit harder. Tagging is definitely the key to bringing transparency to that. Then also, when you're looking at a service initially and trying to scope out how much it costs, there are hidden costs as well from things like writing the CloudWatch logs that may not seem obvious at first, but can be super expensive. An example we've had there is someone turned on debug logs for an API gateway to troubleshoot an issue, and forgot to turn them off. For that month, I think the CloudWatch log cost was something like four times the cost of API Gateway. Now we have the ability to roll things up and see the true cost for our services and keep an eye on stuff.

Another thing to take into account is enterprise support. This has become key because now AWS is our partner in running our infrastructure. When there are issues, it's great for us to be able to reach out to them. Our account managers, we have a really good relationship with them. If it's an issue that just a regular support engineer can't handle, they're able to escalate things to service teams and engineering teams and get them in front of the right people and get things resolved. That's clutch. Then also, the value of being able to focus on your core problems and not do this other heavy lifting, it's hard to take into account upfront but once you have the system built out, then things become a lot more transparent and you have better velocity as a team, and you're freed up to accomplish more.

Think about Data Formats Upfront

Another lesson I would say we learned is that it's super important to think about your data formats upfront. Some of the concerns to take into account are how you're going to be consuming this data and what your ecosystem looks like. If you know that you're going to have a polyglot environment, you need to make sure that you're choosing a format that works with the different languages. Also, if you know that some of your tooling downstream expects data in a certain format, then it can be helpful to use that format, or to think about the complexity of transforming your data to match it.

Another decision is schema versus schemaless. It's easier to get going with a schemaless data format, but as you evolve your system, it definitely becomes much harder to maintain. That was one of the reasons why we chose AVRO. By having a central schema repository, we now have a clear path for data evolution. We don't run into issues where someone changes a device field to device name and all of a sudden it breaks your downstream jobs; we can evolve the data in a very sane way.

Then there's also data validation: who's in charge of making sure you maintain your data quality, and how are you going to check for that quality? Right now, that happens at the front of our system, but anytime there are changes to that it requires some communication as well.

Data at rest and data in flight - it's certainly easier to have the same data format all the way through, but there are advantages of choosing one format for at rest and then another in flight. For example, if you have Parquet as your default at rest format, then some of this tooling and consumption of the data becomes easier. Then in general, JSON, CSV, AVRO, and Parquet seem to be the default data formats for a lot of streaming platforms that we've seen.

As I mentioned, we're using AVRO, and this was one of the early improvements that we made. AVRO is a self-describing data format, so when we serialized an event off of our ad server, it had the chunk of bytes for the event and then a JSON header - plain text describing the schema of the event. What we saw is that we ended up spending more of our bandwidth passing schemas around than we did passing events around. So we created a schema registry concept. This is super popular in the Kafka world, but there's not necessarily an analogous system for Kinesis. What we do is serialize events as just the data and write the MD5 hash of the schema used to serialize the data. All of our schemas get published to S3. When the consumers come up, they read those schemas, calculate the fingerprints, and then know how to deserialize the events as they come in. This was a huge improvement when we rolled it out. It made the data flowing through the system orders of magnitude more efficient.
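
As a rough illustration of that framing - not Intent's actual code - here is a sketch that prefixes each serialized event with the MD5 fingerprint of its schema, using Avro's SchemaNormalization. The lookup map for consumers would be built from the schema files published to S3; that loading step is not shown.

```java
// Sketch of fingerprint-prefixed AVRO framing: [16-byte MD5 of schema][event bytes].
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Map;

public final class SchemaFraming {
    public static byte[] frame(Schema schema, byte[] avroBody)
            throws IOException, NoSuchAlgorithmException {
        byte[] fingerprint = SchemaNormalization.parsingFingerprint("MD5", schema); // 16 bytes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(fingerprint);
        out.write(avroBody);
        return out.toByteArray();
    }

    // schemasByFingerprint would be built from the schemas published to S3.
    public static Schema resolve(byte[] framed, Map<String, Schema> schemasByFingerprint) {
        byte[] fingerprint = Arrays.copyOfRange(framed, 0, 16);
        return schemasByFingerprint.get(Base64.getEncoder().encodeToString(fingerprint));
    }
}
```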

Design for Failure

One of the key things, too, is to design for failure. These are distributed systems. Just because you're not operating them doesn't mean you don't have to be concerned about the failure modes of the system. As I mentioned earlier, Kinesis and Lambda are at-least-once delivery systems. It's super critical to understand, at each point along the way, what kind of guarantees you have around ordering, message delivery, and execution. One way of dealing with an at-least-once system is making sure that your consumers are idempotent, so if you process the same message more than once you get the same result, or if you process the same file more than once you get the same result, and you're tolerant of this multiple processing.

Then from a batch perspective, we're always over-processing data. This means if you have a job that runs each hour and mostly just processes the last hour, instead of just looking at that hour, we're looking at a range of data - let's say 12 hours - and making sure that nothing's changed anywhere in that range. By doing that, if there's an issue upstream, we're able to reprocess that data. Also, having dead letter queues allows you to, if you can't process an event for some reason, shunt that event aside and then come back to it later once you're ready for it.
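
As a sketch of that dead-letter idea under assumed names (the queue URL and handle() method are placeholders, and this is not necessarily how Intent wires theirs up), a consumer can park unprocessable events on an SQS queue and keep the rest of the batch flowing:

```java
// Sketch of the dead-letter pattern: park failed events on SQS, replay them later.
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import java.util.Base64;

public class DeadLetterForwarder {
    private final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
    private static final String DLQ_URL =
            "https://sqs.us-east-1.amazonaws.com/123456789012/event-dlq"; // hypothetical

    public void process(byte[] eventBytes) {
        try {
            handle(eventBytes); // the normal processing path
        } catch (Exception e) {
            // Keep the pipeline moving; the bad event is replayed once the fix ships.
            sqs.sendMessage(DLQ_URL, Base64.getEncoder().encodeToString(eventBytes));
        }
    }

    private void handle(byte[] eventBytes) {
        // Deserialization and enrichment would go here.
    }
}
```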

An example of an issue that we ran into that tested our system for these things is with our schema registry. We use a Maven plugin to publish the schemas to a versioned folder in S3. The plugin had a blip and we overwrote a previous version, and so certain events started failing as data flowed through the pipe. Since we were shoving those events onto a dead letter queue, we weren't stopping the flow for all the other events that were successfully being processed. That allowed us to come through, fix the Maven issue, republish our schemas, and then replay those events back through the pipeline. Since our downstream systems were idempotent and also looking back over historical data, everything just processed as the replayed data flowed through over the hour or however long it took us to fix it. We didn't have to go and actually rerun downstream jobs and everything that consumed the data downstream of that.

Here's an example of knowing your delivery guarantees and where that bit us. As I mentioned, when Firehose writes data, that triggers a Lambda to process the data and turn it into the AVRO files. That uses S3 event notification, which is SNS under the hood. The guarantee wasn't clearly documented there. We didn't follow up; we just assumed that it was guaranteed delivery. As we were building our system out and diffing the new system against the old system, every day or two we were running into subtle diffs. It took us a while to track it down. We finally figured out some of these files weren't actually being processed.

So we built this replay Lambda that basically turns this from an at-most-once guarantee into an at-least-once guarantee. Whenever the partitioner Lambda processes a file, it tags that file in S3. Then, the replay Lambda comes along, looks for untagged files, and basically says, "Reprocess this guy." Since this thing's idempotent, we can deal with the eventual consistency of S3 tagging and all that; if we replay a file, it's not a big deal. If we miss a file, that's where the problems occur.
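
A minimal sketch of that tag-and-replay check, assuming the AWS SDK for Java v1 and a hypothetical "processed" tag name (the scheduling and the actual re-drive of the file are not shown):

```java
// Sketch: the partitioner tags each file it handles; a replay Lambda re-drives untagged ones.
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingResult;
import com.amazonaws.services.s3.model.ObjectTagging;
import com.amazonaws.services.s3.model.SetObjectTaggingRequest;
import com.amazonaws.services.s3.model.Tag;
import java.util.Collections;

public class ReplayCheck {
    private final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

    // Called by the partitioner once a file has been successfully processed.
    public void markProcessed(String bucket, String key) {
        s3.setObjectTagging(new SetObjectTaggingRequest(bucket, key,
                new ObjectTagging(Collections.singletonList(new Tag("processed", "true")))));
    }

    // Called by the replay Lambda to find files that still need re-processing.
    public boolean needsReplay(String bucket, String key) {
        GetObjectTaggingResult tags = s3.getObjectTagging(new GetObjectTaggingRequest(bucket, key));
        return tags.getTagSet().stream()
                .noneMatch(t -> "processed".equals(t.getKey()) && "true".equals(t.getValue()));
    }
}
```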

Then there's the batch process that I alluded to earlier. For each event type, we have an hourly job that goes and basically combines the small files in the partitioned data and then removes duplicates. When it does that, it writes the state of which files it processed to S3, looking at the MD5 hashes of the files. Then every time it runs, it looks at the previous, I think, 12 hours' worth of state. If something has changed, then it's able to reprocess that partition and then update the data in the concatenated bucket. This allows us to deal with failures upstream, or if we had to replay data from upstream, everything flows back through. The one caveat here is that data can now change within this concatenated data bucket for up to 12 hours. That's a basic limitation that we make clear to our consumers upfront: "Our data changes. It isn't final right away." Most of our consumers are used to dealing with that kind of thing. It hasn't been that big of an issue, but it's definitely a thing where it takes education for your end-users.
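
The "did anything change?" check can be as simple as comparing the current listing of a partition against the manifest written on the previous run. Here's a sketch under assumed bucket and prefix conventions, using S3 ETags as the per-file hashes (pagination and manifest storage are elided):

```java
// Sketch of the per-partition change check behind the hourly concatenation job.
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import java.util.Map;
import java.util.TreeMap;

public class PartitionState {
    private final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

    // Map of key -> ETag for every small file currently in the partition
    // (listObjectsV2 pagination elided for brevity).
    public Map<String, String> currentState(String bucket, String partitionPrefix) {
        Map<String, String> state = new TreeMap<>();
        for (S3ObjectSummary obj : s3.listObjectsV2(bucket, partitionPrefix).getObjectSummaries()) {
            state.put(obj.getKey(), obj.getETag());
        }
        return state;
    }

    // Re-concatenate and dedupe only if the partition differs from the last run.
    public boolean changedSince(Map<String, String> previous, Map<String, String> current) {
        return !previous.equals(current);
    }
}
```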

Design for Scalability

Also, designing for scalability. As I mentioned earlier, the system's horizontally scalable, but you need to do some things to make sure that's the case. It's not something you just get for free by using serverless systems. One of the key things is decoupling from non-scalable systems. If your Lambda is hitting MySQL database, for example, eventually you're going to run into the limitations of the database. Also, if you run your Lambdas in a VPC, you can exhaust resources for the subnet that it runs in. Amazon has various resources and limits there. From a decoupling perspective, if you can instead have a layer in between the database and your Lambda, or use horizontally scalable systems like DynamoDB, then that allows you to keep things horizontally scalable.

How you partition your data is important; that's something else we ran into. At first, we were using partitions that were based off of business logic - things were partitioned by publisher name, for instance. That made it easier to go and find data for a specific publisher, but it led to super uneven partitions. When it came time to allocate resources for the concatenation jobs, for example, most partitions were way over-allocated just to be able to deal with the outliers out on the tail. By moving to the hash-based partitioning scheme, we're able to keep everything even, which also makes Spark a lot happier as well.

Think about how you shard your events. Kinesis streams are made up of shards; when you put an event onto the stream, you give it a partition key, and Kinesis hashes that key to pick a shard for you. We use the event GUID, and so things get split evenly across all the shards. There are also options with some of the libraries to just have it generate a random ID if you don't have something random for it. One huge caveat here is that ordering is only guaranteed per shard with Kinesis. If you have ordering requirements - say you have message A and message B behind it, and you have to process A before B - then you have to make sure those two things are on the same shard. That requires a little bit more thinking and effort to make sure that you don't run into hot shard issues.
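
As a sketch of that choice (the stream name and region are hypothetical), a producer using the Kinesis Producer Library - which also batches records, as comes up later in the Q&A - might use the event GUID as the partition key:

```java
// Sketch: a random GUID as partition key spreads events evenly across shards; a business key
// would preserve per-key ordering (ordering is only guaranteed per shard) but risks hot shards.
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import java.nio.ByteBuffer;
import java.util.UUID;

public class RawEventProducer {
    private final KinesisProducer producer =
            new KinesisProducer(new KinesisProducerConfiguration().setRegion("us-east-1"));

    public void publish(byte[] avroEvent) {
        String partitionKey = UUID.randomUUID().toString(); // or the GUID already on the event
        // Error handling on the returned future is elided in this sketch.
        producer.addUserRecord("raw-events", partitionKey, ByteBuffer.wrap(avroEvent));
    }
}
```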

Then also think about fan-out patterns. How are you going to be consuming this data? With Kinesis especially, there's a limitation on how many consumers you can have per stream, and there are also now different ways to consume that data. There's the standard way, which is a polling mechanism under the covers, and the new way, which is lower latency but uses web sockets. You can have increased parallelism, but it also costs a lot more. If you're going to have more than, I think, five consumers, then you need to start thinking about how you're going to spread this data out. There are a couple of different patterns that have emerged, and it really depends on what your use case is, how latency-sensitive you are, how cost-sensitive you are, all of those kinds of things.

Here's an example of how we decoupled from the MySQL databases. Our processor Lambdas need to consume reference data from those databases. We have a scheduled Lambda that runs inside the VPC, pulls that data, and dumps it out to S3 every couple of minutes, or whatever freshness interval we need for the data. Then the processor Lambdas consume that data from S3. We also publish build artifacts like our JSON files to S3, so that becomes a scalable way to do data look-ups. That's been a pretty big win, because at first we were trying to run things inside of a VPC, and then we realized pretty quickly that wasn't going to work out. We started exhausting ENIs and all kinds of stuff was coming up.
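
On the consuming side, a sketch of that pattern (the bucket, key, and refresh interval here are made up) is just a small cache that re-reads the S3 snapshot across warm invocations instead of querying MySQL directly:

```java
// Sketch: processor Lambdas read the periodically dumped reference data from S3 and cache it
// across warm invocations, keeping the hot path decoupled from the database.
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class ReferenceDataCache {
    private final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    private static final long REFRESH_MS = 5 * 60 * 1000; // roughly match the dump schedule

    private String cached;
    private long loadedAt;

    public synchronized String get(String bucket, String key) {
        long now = System.currentTimeMillis();
        if (cached == null || now - loadedAt > REFRESH_MS) {
            cached = s3.getObjectAsString(bucket, key); // e.g. a JSON snapshot of the table
            loadedAt = now;
        }
        return cached;
    }
}
```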

Not NoOps, Just DiffOps

Just because someone else is running your servers doesn't mean you don't have any operational concerns. You just have different operational concerns now. One of the biggest things when we do see issues is figuring out upfront whether it's an application issue or a service provider issue. You need to be able to quickly figure out, "Ok, this is a null pointer. I'm not processing these messages, I need to fix this," or, "My iterator is backing up, I can't quite figure it out. I need to put in a support ticket to get some support engineers looking into this thing."

Then also be aware of platform limits. Even though your system's horizontally scalable, your service provider is going to impose limits. You can only have a certain number of shards open in Kinesis at a time, or a certain number of concurrent executions for Lambda. Generally, those are adjustable. There are very few hard limits in AWS, but you need to be proactive about getting those limits increased. Just a couple of days ago, they actually released a tool where you can now see all your account-level limits in one pane. For the last couple of years we've been asking for that, because it was a pain to go and find all the various limits and figure out which ones you're fixing to run into.

Logs - now you're not SSHing somewhere and just grepping through logs to figure out what's going on. You need to pipe these things through somewhere. Since your applications are decomposed into several different components, it's harder to get a single view of the world. A lot of people like to use tools like Logz.io, a log aggregator, to let you go through and group things by application. CloudWatch Insights is another tool that AWS recently released that works pretty well for doing this too.

How you collect metrics - you get a lot of metrics out of the box from AWS about how these services are performing, but application metrics you need to be able to publish yourself. This was another area where we quickly ran into API limits when trying to publish stuff to CloudWatch. There's a good pattern called metric filters where you log your metrics and then set up a filter on the logs in CloudWatch, and it ingests those metrics into CloudWatch metrics for you. My former colleagues, who now have a consulting company called Symphonia, have a good blog post about that. Then have a dashboard so you have a view of what's going on within your system. When you do run into issues, you can see things at a component level and quickly pinpoint where your issues are occurring, which is super helpful.
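
The application side of the metric-filter pattern is just structured logging; the extraction into CloudWatch metrics happens in a metric filter configured separately on the log group. A sketch, where the log format and implied filter pattern are illustrative rather than Intent's actual convention:

```java
// Sketch of the metric-filter pattern: instead of calling the CloudWatch API per metric
// (and hitting API limits), log a structured line and let a CloudWatch Logs metric filter
// turn it into a metric.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Metrics {
    private static final Logger log = LoggerFactory.getLogger(Metrics.class);

    public static void count(String name, long value) {
        // A metric filter pattern such as [marker=METRIC, name, value] could match this line.
        log.info("METRIC {} {}", name, value);
    }
}
```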

Then alerting: thinking about, "Ok, what are my actionable metrics? What are the things I need to keep my eyes on?" We've settled on iterator age for Lambdas, and then the number of executions and error rates for Lambdas. The thing that we page on is if all of a sudden we stop getting data on any of these processed streams; that indicates that something's gone horribly awry upstream and we need to figure out what it is. Most things are warning emails, which are like, "Something's going on, there's smoke. Go see what it is." If we stop getting data, then that's like, "Stop the world and figure out what's going on," which, again, rarely ever happens. I think we've had a couple of times, with the thing where the events couldn't be deserialized, and then there was a partial S3 outage a while back that had a stop-the-world effect.

One thing that's different is, since you aren't running your own infrastructure, sometimes it's harder to get a view into what's happening. Having that good relationship with your service provider is super key there. If it feels like something's going on with the network, you can poke them to look at it, but you're not going in and looking at stuff yourself, which is a positive on one side, but when you're used to being able to solve your own problems, it becomes a little frustrating. Then you remind yourself, "I'm not going in and running tcpdump to figure out what's going on. Some other engineer's doing that for me."

As much as things are different, some things still remain the same. This is the first response you get when you enter support tickets. Unfortunately, it fixes things a lot of the time. These are containers that your Lambdas execute in. If you do things like toggle the memory, it basically cycles the container where it's running. Since these are multi-tenant systems, there are noisy neighbor issues that you can run into that lead to things like iterator failures or iterator backup, like the one there with the partial S3 outage. There were some DNS lookups that were cached; by cycling this, you clear out that cache and start hitting the good S3 versus the bad S3. These issues don't pop up a whole lot. It's one of those once every six weeks, two months sort of things that something's going on. One of the first things that we do before we even start investigating is just cycle the Lambda and see if that fixes it. Also, if things are systemic, we can rerun our deploy, which effectively restarts everything as well.

Build Components

Having a component mindset is key here too. With Lambdas and all these serverless components, it's easy to get sprawl and end up with that picture of all the balls bouncing around. Having components and encapsulation helps break things up. It helps you reason about what's going on in your system. It also makes it easier to do the right thing when you have these components set up in your infrastructure as code, with all your best practices encoded there. It's also easier to extend when all you're thinking about is, "Ok, how does this component react?" versus, "I changed this Lambda and now there are 30 things that were interacting with this Lambda. What's this change going to do?"

As I mentioned, infrastructure as code is key with all these things. Since there are a lot of components and a lot of pieces of infrastructure to manage, it's good to have everything defined in code. Here's an example where we have a module in Terraform. All the heavy lifting is really inside of this event processor module, which wraps a couple of other modules. By defining this block, we now have this part of our pipeline for our conversion data type where all of our event sources are wired together, all of the Lambdas are named using our naming convention, and they're tagged appropriately. CloudWatch alerts are set up to send to the right group. We have our PagerDuty alert, and then it has sensible defaults for things like memory and execution limits. All these things we can override via configuration; this is the minimal set. Having this makes it easier for other teams to come in and say, "I need to add this event type." It doesn't become a huge burden for us. They can submit a pull request and we can work with them to get everything set up. By hiding a lot of these nitty-gritty concerns, it allows people to come in and understand what's going on more easily.

CI/CD

Another aspect that's different in the serverless world compared to your traditional Java server world is how you test and deploy, and what those cycles look like. It's a bit of a step backwards from being able to spin up a Vagrant image with your database and everything you need to develop against, doing it all locally and iterating quickly. If you want to test out a change to a Lambda, you have to deploy that thing to Amazon and test it out. Their tooling is getting better around being able to do that and attach to a running Lambda - that's one area where Microsoft seems to be well ahead of AWS - but it's still a pain to run through that. Since there's a lot of collaboration amongst different systems, you don't really get that when you're testing something in isolation either.

Unit tests have always been important, but they're even more important now, because you want to make sure you have pretty exhaustive test coverage so that you don't run into a null pointer or whatever and have to go through your deploy cycle again. Integration tests are also super important. Once you have your components built out, having good integration tests makes sure that things inside of your components are operating as you expect them to. Then end-to-end tests, as you're gluing your components together, make sure that all of that looks good.
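
For instance, the fingerprint-framing sketch from earlier can be covered by a plain unit test that runs locally in the normal build, with no AWS dependencies and no deploy (the class names are the hypothetical ones from that sketch):

```java
// Sketch of a unit test around the framing logic, kept free of AWS so it runs locally.
import static org.junit.Assert.assertEquals;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaNormalization;
import org.junit.Test;
import java.util.Base64;
import java.util.Map;

public class SchemaFramingTest {
    @Test
    public void framedEventResolvesToItsSchema() throws Exception {
        Schema schema = SchemaBuilder.record("AdCall").fields()
                .requiredString("eventId").endRecord();
        byte[] framed = SchemaFraming.frame(schema, new byte[] {1, 2, 3});

        String fp = Base64.getEncoder().encodeToString(
                SchemaNormalization.parsingFingerprint("MD5", schema));
        Map<String, Schema> registry = Map.of(fp, schema);

        assertEquals(schema, SchemaFraming.resolve(framed, registry));
    }
}
```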

We use a staging account and a production account to segregate these things as part of our flow. Since a lot of these limits are account-specific, that's a good way to make sure that if someone's test goes haywire, it's not going to take resources away from your production code. Or if you're running some Terraform change and you're not quite certain about it, you don't feel as bad testing it out in staging as you would in production, like, "I just deleted this security group that everybody needs."

Since we do have a fair amount of integration tests, the end-to-end tests tend to take a while, because there isn't really great tooling; it's a lot of polling to make sure this event reaches this place and that event reaches that place, and that kind of stuff, and so our deploys tend to take a while. I think that's another issue - I think we could break things up even more. Right now it's a pain point, but it's not horrible friction.

Leverage the Community

I think you should also leverage the community. Since this is a pretty new field, everybody is trying to build out their patterns, figure out how things work, and figure out their best practices. We've found that people are super excited to talk about these things and share what they've discovered. Good places for that are the Serverless Forum Slack and the OG-AWS Slack for AWS-specific stuff. Then there are blogs: as I mentioned earlier, Symphonia is my former colleagues', Mike Roberts and John Chapin, who are now doing serverless consulting and have done a lot of good writing around serverless. Charity Majors was a big influence when we were trying to figure out how to set things up from an ops perspective, do our testing, and figure out what all is required there. Then Jeremy Daly is an AWS Serverless Hero, I believe it's called, and he does a lot of good writing and has a newsletter that wraps up serverless stuff each week. People are pretty active on Twitter talking about these things, and also meetup events and conferences like this are a great place to talk to people who are doing new cool stuff.

Questions & Answers

Participant 1: You mentioned that you use two different formats, AVRO and also Parquet. It sounded like AVRO was used more in-flight than Parquet as a final at rest. Why not Parquet all the way, and why did you make this choice?

Norman: We have Parquet in addition to AVRO at rest. AVRO data is our data of record. Spark Jobs and all that were consuming AVRO already, and so it's a little bit of path dependency but, yes, I think Parquet at rest is a great option. It was more of an evolutionary thing.

Participant 1: Would you be able to use Parquet in flight also?

Norman: Not really, just because producing Parquet is more of a batch-level thing. It doesn't necessarily make sense to have as an individual event since it's a columnar data format. You lose a lot of the advantage of it.

Participant 1: You mentioned that you're using tagging of the files to figure out which files were processed or not processed. Do you ever move the files to a separate folder?

Norman: We have lifecycle rules set up, so after a certain amount of time things just age out.

Participant 1: Ok, so tagging is a better option to start things off?

Norman: Yes. The tagging is pretty much free. If you need to move a file, it takes a little bit longer.

Participant 1: You also mentioned that you have some kinds of dashboards. Can you elaborate which dashboards you use to monitor things?

Norman: We use the CloudWatch metrics that we gather to build the dashboards. We basically have a view for each of the components. There's our data source view, the processing view, the partitioning view, and then the concatenation view. We have a dashboard for each one and it's basically event-level metrics. There are metrics for each event type, and monitoring for things like total throughput, number of events, number of executions for Lambdas, and duration of execution for the batch jobs. It depends on the component what we want to look at, but it's set up like that. Then we have alerts and events set up so that if something like a batch job runs for too long, we get notified about it.

Participant 2: In your experience, is there any difference or is it possible at all to optimize the costs of the serverless functions by changing the payload? For example, having lots of small payloads versus having a few large payloads?

Norman: You pay for execution with Lambda, so we definitely batch messages as they flow out of our servers. We use the Kinesis Producer Library, which is something that Amazon provides, which batches messages. That's another one of those latency-versus-processing trade-offs. Then the lever you have for Lambdas is how much memory is allocated, which also maps to how much compute you have. There's an optimization sweet spot, and people have actually published some patterns for finding what that sweet spot is. It's an effort we haven't really gone through, just because the cost isn't enough for us, at least, to do all that optimization. We can control things like that.

Participant 3: You mentioned that you have the monitors when putting the data on the Kinesis stream when it basically stops completely. Do you have any monitors where instead of just completely stopping, the message flow decreases or increases significantly?

Norman: We don't monitor for that, but we also have business process monitoring, and so other teams can notice that. We haven't run into that. That's usually an indication that our traffic has changed dramatically and it's usually by publisher. That's where we have pretty sophisticated anomaly detection that some of our data scientists built that is looking for that, just because we have some smaller publishers, so if they lose traffic, then it's not necessarily noticeable system-wide, but it's noticeable for them.

Participant 3: You said you use a schema registry for publishing new schemas and for versioning. What kind of schema validation do you do? Do you do any validation that schemas are compatible?

Norman: It's more by process. We have a centralized schema registry, and so people submit pull requests. For most things, it's just adding a field, but if there's a destructive change then it triggers a conversation. One of the things we do in our AVRO schema is pretty much all the fields are optional, which allows for removing them, but that also leads to issues with making sure some things aren't optional, like event IDs. It's this trade-off of data quality versus flexibility.

Participant 4: I know that in one of your slides you mentioned data validation. What are some strategies you use to ensure the quality of your data?

Norman: Right now, that happens at the ad server, processor level. We validate input. A lot of it is controlled by us or through integration with our partners. We have a helping hand, but if for some reason we get a mangled URL, then the ad servers catch that and kick that data out. There is a little bit of a dance there, and so we usually discover data issues in our Spark jobs or when we try to load the serialized data into a data warehouse, and it's like, "Let's come back and revisit the validation on that field." We don't really run into those issues too frequently.

Participant 5: You mentioned the just-turn-it-off-and-turn-it-back-on model of fixing things, but what do you mean by that in terms of Lambdas?

Norman: If you change the memory configuration for a Lambda - so you go from 1,500 megs to 1,528 megs - then it basically creates a new container that the Lambda executes inside of, on different infrastructure. There's a really great talk from re:Invent, I think last year, where they talked about Firecracker, which is the small VM that Lambda runs inside of, and how that works. You can get a feel for what goes on when you make these toggles. You can also change, I think, the name or description.

Participant 5: If you change anything, the structure will rebuild it. I thought maybe you had some switch that we didn't know about.

Norman: No. For the S3 thing, we've had conversations with them, "Maybe there's some heuristic way that they could just do this behind the scene." Hopefully, that's an improvement coming down the pipe. We don't really run into those issues super frequently. It's an every-couple-of-months kind of thing.

Participant 6: The schema registry and validation layer that you built around Kinesis, is that worth open-sourcing and something we can look for?

Norman: Yes, possibly.

Participant 7: The limitation you chose was 12 hours?

Norman: For the batch jobs?

Participant 7: Yes.

Norman: It was just a back-of-the-envelope thing. Most of the time, if there are issues, then we can resolve them within 12 hours. If that's not the case, then we can always change that look-back. That felt like the right balance between not having to read too much data out of S3 and giving us enough of a window. On the flip side of that, how much data you keep in your Kinesis streams also matters; if you have some massive outage, you can increase the retention on your streams to be up to seven days, I think. That allows you to have some time to fix things if things go horribly awry for whatever reason.

Participant 8: You mentioned three years ago that you were woken up by a server failing and then today you're presenting about the new design. Can you talk about the timeframe that you went through to migrate this? Did you get together and come up with that structure, and then did you do it in a year? Was this several iterations?

Norman: I think it's more of a strategic decision. At first, it was, "Let's push as much as we can to manage services." That's around the time that serverless came along. Different groups had this same mandate or desire to use managed services. It was really a time and fit thing. For example, at Vertica it was a two-step thing of moving our aggregation data out of Vertica into Redshift and then moving our event-level data to Snowflake, but yes, for the data platform piece, it was just a brittle system. We knew we wanted to have something a little bit more robust and scalable, and so that's where the migration came there. Each individual piece moved slightly different, but we had a strategy to move in the same direction.

Participant 9: I don't know exactly when you made these changes, but is there a reason you didn't go with AWS MSK as opposed to Kinesis? Was it that it just wasn't available at the time, because I know it was a recent event development?

Norman: Yes, it wasn't available at the time. We did do a proof of concept with Kafka, but it was one of those cases where it's, "Ok, the first step, spin up a Zookeeper cluster and then spin up your Kafka cluster." We knew that those were the types of things that we didn't want to have a whole team to manage. We're a team of four, and we run even more applications outside of this. We have a reporting application as well. We knew we wanted to be able to focus on our application problems.

 

Recorded at:

Oct 16, 2019
