BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Articles Lambda Architecture: Design Simpler, Resilient, Maintainable and Scalable Big Data Solutions

Lambda Architecture: Design Simpler, Resilient, Maintainable and Scalable Big Data Solutions

Bookmarks

A data system is designed to store data. It is also designed to derive information from stored data. The desired information may be the data itself, or it can be computed from stored data. Data systems are very important and often outlive applications that are built around them. The same Oracle or SQL Server data store that powered rich client applications in the ’90s, stores data for today’s state-of-the-art single page applications. Data systems do not change as often as the surrounding application stack. It is therefore important to design data systems for the long term.

Recently, there has been a virtual explosion in the amount of data being produced, stored, and analyzed. As a consequence, data systems are subject to much more stress and are becoming more complex.

Lambda Architecture proposes a simpler, elegant paradigm that is designed to tame complexity while being able to store and effectively process large amounts of data. The Lambda Architecture was originally presented by Nathan Marz, who is well known in the big data community for his work on the Storm project.

In this article, we will present the motivation behind the Lambda Architecture, review its structure, and end with a working sample. For further details on the Lambda Architecture, readers are advised to refer to Nathan Marz’s upcoming book Big Data.

What ails current data systems?

For decades, data stores have been more or less synonymous with relational database systems (RDBMS). In recent years, there has been an increased adoption of NoSQL databases, primarily because of their advantage in scaling.

Relational and NoSQL databases have their advantages and indeed power most business applications today. That said, we contend that there are certain fundamental flaws with the current implementation of data systems. Some of these flaws include the following:

Current systems are not resilient: Current database systems (relational and NoSQL) are not designed to be resilient. Most current data systems support create, read, update, and delete (CRUD) operations. Of these, update and delete have immense potential to cause data corruption. With current data systems, it is too easy to delete an entire table of data when we intended to simply delete a single row. It is also easy for a software bug or hardware failure to corrupt data.

Conflation of queries and data: With current database systems, query processing is closely tied to data storage. While data storage best practices call for normalization of data, querying often benefits from de-normalization. This leads to an uneasy trade-off that must be made in every system—should one optimize for query performance or data storage?

Consequences of scaling—the CAP theorem: The CAP theorem states that it is impossible for a distributed computer system to simultaneously provide all three of the following guarantees:

  • Consistency - all nodes see the same data at the same time.
  • Availability - a guarantee that every request receives a response about whether it was successful or not.
  • Partition tolerance - the system continues to operate despite arbitrary message loss or partial system failure.

Another way of looking at the CAP theorem is to simply assume that partition tolerance is required (given the need to store large amounts of data) and non-negotiable for most modern data systems. Once we fix partition tolerance, we can have only one of the other two aspects: availability or consistency.

Relational databases favor consistency over availability, while NoSQL databases often favor availability over consistency, with some offering a tunable system that can fine-tune behavior based on application needs. This is a difficult trade-off and one that poses practical difficulties that affect the entire system. If we favor availability, we risk having different nodes with different data. If we favor consistency there could be serious performance penalties, especially in cases where nodes are connected over high-latency connections.

Lambda Architecture

Lambda Architecture is designed to perform better in all of the problem areas that we have outlined. The Lambda Architecture specifies a data store that is immutable. An immutable data store essentially eliminates the update and delete aspects of CRUD, allowing only the creation and reading of data records.

At first glance, this seems like a major hurdle. How can we have a functional data store without the ability to update and delete data? On deeper analysis it will be clear that the changes are not as problematic. An immutable model will simply track and record each fact in sequence. This allows the same information to be gathered from an immutable data store as from a mutable data store, simply by aggregating facts stored over a period of time.

Consider the following example, implemented with a mutable data store and an immutable data store. Both are shown without normalization for easier review.

Mutable data store—initial scenario

Customer ID Customer Name Preferred Shipper ID Shipper Name
1 Alfred 1001 UPS
2 Annie 1008 Federal Express

Mutable data store—after update

Customer 2 now prefers DHL.

Customer ID Customer Name Preferred Shipper ID Shipper Name
1 Alfred 1001 UPS
2 Annie 1005 DHL

Immutable data store—initial scenario

Notice that each fact is recorded with a time stamp. Each fact as recorded remains true for all time.

Customer ID Customer Name Preferred Shipper ID Shipper Name Time Stamp
1 Alfred 1001 UPS 1391123230
2 Annie 1008 Federal Express 1391423650

Immutable data store—after update

Customer 2 now prefers DHL. An additional record is appended to reflect this new fact. The new record contains an updated time stamp.

Customer ID Customer Name Preferred Shipper ID Shipper Name Time Stamp
1 Alfred 1001 UPS 1391123230
2 Annie 1008 Federal Express 1391423650
2 Annie 1005 DHL 1391423769

Notice that existing data has not changed in any manner. Now consider the following query:

Which shipper should we use for an order placed by Customer ID 2?

With a mutable data store, we can directly look up this information and provide a response.

The immutable data store cannot directly look up this information since there are multiple facts related to this request. The system does have all the facts required to provide this information though. All it needs to do at a very basic level is examine records stored for this Customer ID and obtain the shipper stored in the record with the most recent time stamp.

Queries as a function of all data

An immutable data store that records facts as shown in the example can provide the same query responses as a mutable data store. It is just the processing details that differ. In general, a query result can be seen as some function of all data stored in the system.

Query results => function (all data stored)

Immutable data store—implementation

In practice, such a storage and processing system maps quite well to the Hadoop Distributed File System (HDFS) for storage and to MapReduce for processing. With HDFS, it is possible to store an arbitrary amount of data in a scalable fashion. With MapReduce, we can process this data. MapReduce can implement any arbitrary function that takes the stored data as input and operate on the data in a scalable manner.

Need for speed—pre-computed views

The downside of an immutable data store is that batch processing is not real-time. While we expect that there will be improvements in the speed of batch processing systems, it is also true that data will continue to skyrocket in volume. It is safe to assume that batch processes are not real-time, at least for now. Applications need to be able to access data quickly. They cannot wait for a batch process to complete. Consequently, we need a layer that contains pre-computed values produced by the batch process.

Architecture Model

Pre-computed batch layer views—implementation

The data store serving pre-computed views needs to be easily writable from a batch process. It does not, however, need to support random writes. It just needs to support random reads. This makes such a data store dramatically easier to implement than a full-fledged relational data store. ElephantDB is an example.

What about the time between batches?

With pre-computed batch views, we should be able to service most application needs. However, the batch process producing these views does take some time to run. During this period of time, additional data may be coming in. This data is not included in the process we have described so far. To account for this data, we need a parallel layer that can process additional data as it comes in. The Lambda Architecture provides for this aspect, and terms it the “real-time” layer.

Real-time layer

The real-time layer is designed to calculate query results on top of an incoming stream of data. Results, once computed, should be stored in such a manner that they can be queried by applications. Just as with the batch layer, the real-time layer also stores results as they are computed into a view.

The real-time layer itself can certainly be implemented to the specific needs of a project. The Storm project, originally created by Nathan Marz, is also an excellent solution for this need.

Pre-computed real-time views

The data store that implements the real-time view needs to support random writes. Consequently, such data stores need to be significantly more complicated (since they need to support random writes) than the view layer for the batch process. NoSQL data stores such as Apache Cassandra and Redis are well suited for serving in this capacity.

Final results

The application will query both the real-time and batch views, and aggregate results.

Figure 1 below shows the Lambda architecture model.

The overall structure of the Lambda Architecture

Evaluation of the Lambda Architecture

Let us now evaluate how the Lambda Architecture performs in the three main problem areas we discussed earlier.

Resilience

The batch layer fares well in this aspect. If implemented correctly, it is difficult for human errors or hardware faults to corrupt data stored in the system since the system does not allow update or delete operations in existing data.

The real-time layer, though, is susceptible to errors. There is potential for data corruption and loss in this area since the data stores being used are mutable.

The benefit is that even if the real-time layer fails, no data will be lost. As long as incoming writes are being propagated to the batch storage layer, the results will eventually catch up when the next batch job runs. So, while results may be out of date if the real-time layer fails, the data of records in the batch layer will not be corrupted. Results will eventually get back in sync when the real-time layer is back online.

Conflation of queries and data

Under the Lambda Architecture, we can store data in a completely normalized manner in the batch data store. We can also de-normalize as needed for the real-time and batch views. Data that services application queries is not related to the long-term storage structure in the batch layer. This allows us to fine-tune each layer as needed.

Consequences of scaling—the CAP theorem

Under the Lambda Architecture, results from the batch layer are always eventually consistent. As soon as a fresh batch update is completed, results from the batch layer are consistent. We must choose whether the real-time layer will be consistent or available. As stated previously, this is where most of the system complexity is isolated, and it is best to choose a system that is well suited for your specific needs.

Other aspects to consider

These topics are not related to the core architecture, but are important to consider.

Batch layer—well suited for analytics

It is worth noting that the immutable data store is very valuable for analytics. Such a data store can be seen as a complete record of everything that happens in a system. When data is stored in a mutable data store, potentially valuable information that can be used for analytics is lost each time a record is updated or deleted.

Complexity of algorithms

We should note that with some use cases, the complexity of algorithms that deal with the real-time layer is likely to be significantly higher than those in the batch layer. Readers can review web material on incremental and approximate algorithms for a better understanding of the issues involved.

Schemas

Schemas are valuable. Much like compile-time checking in statically typed languages, they are used to ensure that data being stored is valid for the context.

The implementation of schemas in real-world systems has left much to be desired. Schemas are often hard to change—think of the work involved in making schema changes to a relational model that has already been deployed. Schemas are often quite restrictive. For instance, there is no easy way to specify nested objects, nested collections, and such, though doing so will make our data model much simpler to understand and use. Schema systems also often involve substantial configuration.

These and other issues have caused many to start moving away from schemas to a schema-less core model. The Lambda Architecture strongly advocates against this. It recommends storing data in the batch layer using a schema that can adapt to changes over the life of the data system.

Schema libraries such as Apache Avro, Apache Thrift, or Google’s Protocol Buffers can be used for this purpose. These libraries offer a schema system that is simple to use and maintain. These can be used from most commonly used programming languages such as Java, C# and Python.

Sample application

I have prepared a sample application that illustrates key aspects of the Lambda Architecture. The following sections explain the structure of the sample application:

Use case

  • The application receives streaming data from a single pressure sensor named “SensorZ”.
  • A set of values is already available in the batch layer (stored in a single file).
  • The client application has to display the top ten pressure values recorded across the dataset (the batch layer and the real-time layer) at any time.

The sample is implemented in Java with the exception of a Pig Latin script used for running MapReduce on the batch layer.

System requirements

The sample application has the following dependencies:

  • Oracle JDK 1.7 (64-bit version)
  • NetBeans 7.4
  • Apache Maven—the version included with NetBeans 7.4
  • Storm 0.9
  • Apache Zookeeper (required by Storm)
  • Apache Pig 0.12
  • Redis 2.8.4
  • Apache Avro

Figure 2 below illustrates the sample application data flow diagram.

Sample application data flow

Sample source code

The complete sample code is available here.

Application description and walk-through

There are four applications in the code sample:

  • Data generator— located in generatedata folder.
  • MapReduce script and batch layer file—the batch folder.
  • Real-time processing system implemented using Storm—the realtime folder.
  • Client application—the query folder.

Batch layer data generator

The application named generatedata generates random pressure readings and stores them in a regular file. This data forms the batch layer in the application architecture. Data is serialized using the Apache Avro schema system. A pre-generated file named pressure-data.avro is stored under the batch folder.

Calculation of top ten values from batch data

We take the Avro-formatted data that is stored in the file created by the data generator and compute the top ten pressure values using a Pig Latin script (calculate-max.pig). This script can be run in local mode or on a Hadoop cluster. The script will write the top ten values to a plain CSV file. In our sample, this CSV file acts as the precomputed batch view.

Process batch layer using MapReduce

          values = load 'pressure-data.avro' using AvroStorage('{
          "namespace":"com.syncfusion.stats",
          "type": "record",
          "name":"PressureReading",
          "fields":[
          {"name": "name", "type":"string"},
          {"name": "value", "type":"double"},
          {"name": "date", "type":"long"}
          ]
          }');
          sortedValues = ORDER values by value DESC;
          top10Values = LIMIT sortedValues 10; 
          STORE top10Values INTO 'output' USING PigStorage(','); 

Real-time layer

The sample includes code for deploying a Storm cluster that generates and processes pressure readings in real-time. The Storm cluster will monitor readings in real-time, and maintain the top ten values for the real-time data. These values are periodically stored in Redis, which acts as the storage system for the real-time view.

The Storm code has three essential components:

The spout component is responsible for servicing a stream of data and providing a stream of tuples for further processing. In real-world scenarios, a spout may read data from a queue or similar system. In our application, we simply simulate random data as shown in the following sample code:

Spout producing random pressure readings

public void nextTuple() {
          Utils.sleep(100);
          _collector.emit(new Values(Configuration.SENSOR_NAME, Common.getrandomDouble(), new Date().getTime()));
}

In a Storm system, tuples produced by spouts can be processed downstream by bolts. Bolts can then choose to emit further streams of tuples which can then be processed, and so on.

In our case, we have a single bolt downstream (MaxTrackerBolt). For each incoming tuple, the execute method is called on the bolt that is configured to receive the tuple. This bolt stores incoming data into a sorted set. The sorted set is implemented using a queue that stores ten values at a time and removes the lowest value with each add operation.

Process data and maintain top ten values

public void execute(Tuple tuple) {
…
	  String sensor = tuple.getString(0);
          double value = tuple.getDouble(1);
          long time = tuple.getLong(2);
          PressureReading pressureReading = new PressureReading(sensor, value, time);
          this.addValue(pressureReading);

Please note that we use a Storm grouping named “Fields Grouping” that ensures all values with the same sensor name are routed to the same bolt. In our sample data, we produce only one sensor name. This system should be quite simple to extend to multiple named sensors. Also, note that in order to keep things simple, we do not use multiple layers of bolts. To scale well, it is important to consider using an intermediate layer that can aggregate tuples in parallel with a final consolidation layer aggregating all data from a sensor.

Storm supports the notion of a “tick” tuple. The system can be configured to send a tick tuple every x seconds. When a tick tuple is received, we react to it in the bolt’s execute code by storing, in Redis, sorted data values that have been gathered so far.

When we move data to Redis, we use the lpush method and store values under a predefined key. We then remove values that are left over from the last push using ltrim method, which retains data from the specified index range and removes everything else. At the end of this operation, only the top ten values last updated by the real-time layer are stored in Redis.

Update real-time view

public void execute(Tuple tuple) {
        if (isTickTuple(tuple)) {
            // write to redis when we receive a tick
            PressureReading[] pressureReadingMaxValues = new PressureReading[queue.size()];
            queue.toArray(pressureReadingMaxValues);
  
            Jedis jedis = new Jedis(Configuration.REDIS_SERVER);
            if (pressureReadingMaxValues != null && pressureReadingMaxValues.length > 0) {
                Arrays.sort(pressureReadingMaxValues);
               for (PressureReading v : pressureReadingMaxValues) {
                   try {
 
                       jedis.lpush(Configuration.REDIS_CACHE_KEY, PressureReadingHelpers.serializeToString(v));
                   } catch (IOException ex) {
                       Logger.getLogger(MaxTrackerBolt.class.getName()).log(Level.SEVERE, null, ex);
                   }
               }
        

               jedis.ltrim(Configuration.REDIS_CACHE_KEY, 0, pressureReadingMaxValues.length - 1);
          }
      }

Query

The query application reads data from the text file where the batch layer stored its results, and also reads data from Redis. It combines and then sorts the data. From the sorted data obtained by combining the real-time and batch views, it outputs the top ten values from the combined array.

Gather results from the real-time view stored in Redis

          // get near real-time data from redis
          // the running storm topology updates these results in redis
          Jedis jedis = new Jedis( Configuration.REDIS_SERVER);
  

          List<String> stored = jedis.lrange(Configuration.REDIS_CACHE_KEY, 0, Configuration.MAXVALUES - 1);
          for (String s : stored) {
              PressureReading pressureReading = PressureReadingHelpers.deserializeFromString(s);
              values.add(pressureReading);
          }

Gather results from the batch layer view stored in a text file

          // get serialized data output from the batch process
          List<String> lines = Files.readAllLines(Paths.get(BATCH_RESULTS_FILENAME),
                          Charset.defaultCharset());
          for (String line : lines) {
              PressureReading pressureReading = PressureReadingHelpers.fromCSV(line);
              values.add(pressureReading);
          }
      }

Sort combined results

private static void sortResults(ArrayList<PressureReading> values) {
        Collections.sort(values, new Comparator<PressureReading>() {
            @Override
            public int compare(PressureReading reading1, PressureReading reading2) {
                return reading2.getValue().compareTo(reading1.getValue());
            }
       });
   }
        

Display top ten results

List<PressureReading> finalValues = values.subList(0, Configuration.MAXVALUES - 1);
          SimpleDateFormat ft = new SimpleDateFormat("E yyyy.MM.dd 'at' hh:mm:ss a zzz");

          for (PressureReading reading : finalValues) {
               String text = String.format("Sensor - %s, Pressure - %f%n, Date - %s", 
                       reading.getName(), 
            
                       reading.getValue(), 
                       ft.format(reading.getDate()));

              System.out.println(text);
          }

Summary

We hope that you now have a good understanding of the Lambda Architecture. As with any system, it is important to understand the model and then tweak it for your specific needs. We strongly believe that data systems structured in accordance with the Lambda Architecture will better stand the test of time. We hope that your data systems benefit from the consideration of such an architecture.

About the author

Daniel Jebaraj is the vice president of development at Syncfusion, Inc. Syncfusion is a leading provider of enterprise software frameworks and solutions. Syncfusion’s big data solutions team helps customers deliver end-to-end, cost effective big data solutions that are designed and built for the long term. For more details, visit this site.

Rate this Article

Adoption
Style

Hello stranger!

You need to Register an InfoQ account or or login to post comments. But there's so much more behind being registered.

Get the most out of the InfoQ experience.

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Community comments

  • Datomic?

    by Nicola Rizhikov,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    Why you did not mention datomic, which follows same architecture?

  • Hype?

    by Richard Henderson,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    This is an old set of concepts. If Nathan has a better implementation, then fantastic. Bring it on. We need better implementations. What we don't need is more marketing of old dressed up as new. Please tell us how this is different to what has gone before. Architecturally this is old news.

  • My implementation 'lambda-architecture' in practice

    by Trieu Nguyen tan,

    Your message is awaiting moderation. Thank you for participating in the discussion.

  • Unexplained need for pre-computed real-time view random write support claim

    by Robert Mars,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    Your article emphatically states pre-computed real-time views need to support random writes without explanation. Earlier it states the reason pre-computed batch layer views do not need to support random writes is because only queries to large immutable processed data need to be supported. This applies equally to the real-time layer, which processes in real-time a considerably smaller immutable data store, also only needing to satisfy queries. "The application will query both the real-time and batch views, and aggregate results." Only queries are of concern.

  • Time for reruns already?

    by Angie Kong,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    Nothing new presented in the article. All of the issues, design principles, design patterns, benefits/tradeoffs, etc... are well known and have been around for many years (decades really). Putting a fancy new wrapper on it does not make it new. Your sections on batch process (not real time? really? wow!), real time layer and their corresponding pre-compute views are just rehashing what any respectable DBMS already supports natively.

    Nice marketing pitch, nothing more. Now if you've built a DBMS that has natively implemented these patterns, that would be very interesting.

  • Nice article

    by Harihara Natarajan,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    Good to see an implementation and an explanation of the concepts . Even though they are well known and around for decades

  • Implementing Lambda Architecture using other Tools

    by Srinath Perera,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    Thanks for the detailed walk though.

    Most Lambda architecture discussions uses Storm as the realtime layer and Hadoop as the batch layer, but other options are possible.

    One obvious choice is using Hive instead of Hadoop, and another is using CEP (Complex Event Processing) that also is SQL like. I personally think using SQL like languages makes things much easier to understand and deal with.

  • can I really use storm as my real time system if i have a hard limit of 20ms from request to user response?

    by Tomer Ben David,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    if it passes information between nodes for calculation i don't see how it can manage to complete the calculations in 20ms. does that means I need to build my own realtime system from scratch?! thanks

  • can I really use storm as my real time system if i have a hard limit of 20ms from request to user response?

    by Tomer Ben David,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    if it passes information between nodes for calculation i don't see how it can manage to complete the calculations in 20ms. does that means I need to build my own realtime system from scratch?! thanks

  • can I really use storm as my real time system if i have a hard limit of 20ms from request to user response?

    by Tomer Ben David,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    if it passes information between nodes for calculation i don't see how it can manage to complete the calculations in 20ms. does that means I need to build my own realtime system from scratch?! thanks

  • can I really use storm as my real time system if i have a hard limit of 20ms from request to user response?

    by Tomer Ben David,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    if it passes information between nodes for calculation i don't see how it can manage to complete the calculations in 20ms. does that means I need to build my own realtime system from scratch?! thanks

  • can I really use storm as my real time system if i have a hard limit of 20ms from request to user response?

    by Tomer Ben David,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    if it passes information between nodes for calculation i don't see how it can manage to complete the calculations in 20ms. does that means I need to build my own realtime system from scratch?! thanks

  • can I really use storm as my real time system if i have a hard limit of 20ms from request to user response?

    by Tomer Ben David,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    if it passes information between nodes for calculation i don't see how it can manage to complete the calculations in 20ms. does that means I need to build my own realtime system from scratch?! thanks

  • can I really use storm as my real time system if i have a hard limit of 20ms from request to user response?

    by Tomer Ben David,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    if it passes information between nodes for calculation i don't see how it can manage to complete the calculations in 20ms. does that means I need to build my own realtime system from scratch?! thanks

  • can I really use storm as my real time system if i have a hard limit of 20ms from request to user response?

    by Tomer Ben David,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    if it passes information between nodes for calculation i don't see how it can manage to complete the calculations in 20ms. does that means I need to build my own realtime system from scratch?! thanks

  • can I really use storm as my real time system if i have a hard limit of 20ms from request to user response?

    by Tomer Ben David,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    if it passes information between nodes for calculation i don't see how it can manage to complete the calculations in 20ms. does that means I need to build my own realtime system from scratch?! thanks

  • can I really use storm as my real time system if i have a hard limit of 20ms from request to user response?

    by Tomer Ben David,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    if it passes information between nodes for calculation i don't see how it can manage to complete the calculations in 20ms. does that means I need to build my own realtime system from scratch?! thanks

  • can I really use storm as my real time system if i have a hard limit of 20ms from request to user response?

    by Tomer Ben David,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    if it passes information between nodes for calculation i don't see how it can manage to complete the calculations in 20ms. does that means I need to build my own realtime system from scratch?! thanks

  • can I really use storm as my real time system if i have a hard limit of 20ms from request to user response?

    by Tomer Ben David,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    if it passes information between nodes for calculation i don't see how it can manage to complete the calculations in 20ms. does that means I need to build my own realtime system from scratch?! thanks

  • can I really use storm as my real time system

    by Tomer Ben David,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    if i have a hard limit of 20ms from request to user response? if it passes information between nodes for calculation i don't see how it can manage to complete the calculations in 20ms. does that means I need to build my own realtime system from scratch?! thanks

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

BT