BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Articles Applied Probability - Counting Large Set of Unstructured Events with Theta Sketches

Applied Probability - Counting Large Set of Unstructured Events with Theta Sketches

Bookmarks

Key Takeaways

  • Model dynamic multi-dimensional data as Theta Sketches in such a way as to allow for millisecond-latency queries.
  • Downstream pipelines of services consume the user activity events both directly from Kafka as well as from an Amazon S3 raw data lake which stores the data in Parquet files.
  • HBase NoSQL database unique features were utilized to solve this problem.
  • Apache Spark is used to read the events from the data lake and pre-aggregate them into HBase.

AppsFlyer is a commercial SaaS attribution platform. Its clients, some of the largest mobile app companies in the world, send a large amount of events daily made up of the installs, uninstalls, sessions, in-app events, clicks and impressions performed by their user base.

In this article, I will discuss a system AppsFlyer built for the purpose of quickly and accurately finding the approximate sizes of sets of unique users (represented by a non-PII user ID), segmented by any combination of criteria over the various dimensions of these events. This system (later referred to as “Audiences”) is used by AppsFlyer’s user segmentation product for supplying interactive feedback to its users while they are defining criteria in the UI. Every action in the UI queries this system to find the approximate size of a unique set of users which meet the criteria, allowing users to fine-tune their criteria until they reach a number that they are happy with.

As a brief example, advertisers of an e-commerce application might want to know how many of their unique users installed the app in the last month, and also purchased products A and B, but DID NOT purchase product C; or how many unique users in the US added more than X products to their shopping cart in the past week but never checked out.

One of the challenges faced was that the events that reach AppsFlyer are schemaless: AppsFlyer clients are free to send any number of dimensions (i.e, “product_name” or “level_completed_num”) as part of the payload of their events. This leads to a very high number of different dimensions the multi-tenant system would need to make sense of.

This article will discuss how this system was designed and engineered to provide this approximation, with the following considerations in mind:

  • Latency: every user action in the browser should update the number in sub-second latency.
  • Accuracy: to provide a user with an estimated number that is accurate enough to confidently use.
  • Multi-tenancy: the system would need to host and serve data across all of AppsFlyer’s users, requiring it to tackle the open-ended dimensional cardinality that the data inherently contains.

The core technologies used to build this system are Theta Sketches and HBase, both of which will be discussed with an overview of how they fit into the system’s architecture, and why they fit the specific problem at hand.

High Level Overview of AppsFlyer

When engineering such a system, one of the most critical steps in the design is actually defining the data model. However, before we get into this, a brief aside about the kind of data that AppsFlyer ingests in its main pipeline.

The traffic that flows into AppsFlyer can be generally categorized as installs, uninstalls, clicks (a user clicked an ad), impressions (a user viewed an ad) and in-app events. These messages originate from clients’ mobile apps installed by their user base around the world, and reach one of AppsFlyer’s web-handlers which forward them into respective internal Kafka topics. 

Downstream, many pipelines of services consume this data for various purposes, both directly from Kafka as well as from an Amazon S3 raw data lake which stores this data in Parquet files. As will be described shortly, Spark is used by our system to read these events from the data lake and pre-aggregate them into HBase.

Figure 1: Application architecture of Theta Sketches Event Framework

The data model

We will quickly describe the general shape of events that are sent to AppsFlyer, and how our model was built to handle the complexity of their high cardinality. The data model is at the core of the design of this project, and it defined some of the major technology choices such as HBase.

Events sent to AppsFlyer carry common information such as the event’s timestamp, the internal app-id, the non-PII user ID, etc.

Of particular interest are in-app events, since they represent an interaction by the user with the app - for example, a product purchase.

To give more context concerning this interaction, in-app events may carry additional two fields that users can set:

  • event_name - to denote the type of event (“add_to_cart”, “advance_level”, etc).
  • event_attributes - a dictionary of key-value pairs that provide even greater granularity and context ("product_type" = "shirt" or "level_number" = 3, etc).

In fact, AppsFlyer is completely agnostic as to what event attributes clients choose to send. There is no predefined schema per customer and clients are free to send any event names they want along with any event attributes they wish. As will be discussed again later, this will have a major impact on the design, as building such a multi-tenant system would need to accommodate the large diversity of shapes of data that are sent to AppsFlyer.

With users able to define criteria for any type of entity, whether an install, in-app event or otherwise, it was decided to have an internal uniform abstraction for representing all of them. In the model, there is a single entity called "event", which is modeled similarly to regular in-app events: they are essentially a tagged dictionary, containing the following types of keys:

  • event_type - to denote the type of event. For in-app events, the “event_name” will be used, and for anything else (installs, click, etc), a static name will be used that corresponds to the type of message (so, “event_type” = “install”, etc).
  • event_attributes - a sub-dictionary containing the original event attributes in the message (in the case of in-app events), or just an empty dictionary for other types of events.
  • The original fields contained in the message: app_id, user_id, timestamp, etc.

Figure 2: AppsFlyer Solution Data Model

The query model

Now that there is a unified data model, let’s discuss how it’s used to represent user queries for an audience. Queries are sent from the browser via the Audiences tool, and are served by an internal service which responds with the estimated audience size.

In essence, an audience definition can be expressed as a directed tree in which internal nodes represent set operations - union, intersection or difference - and the leaves are the individual atomic criteria. In this context when the word "atomic" is used, it means the smallest possible criterion that can no longer be decomposed to sub-criteria.

Each leaf, therefore, is a dictionary comprised of the following values:

  • app ID of the application.
  • a date range for the criteria in question.
  • the event type ("install", "click", "add_to_cart", "view_product", etc).
  • the number of times the event must have happened (i.e, user added to cart more than X times).
  • an optional sub-dictionary containing further constraints over the event attributes (product_type = "bicycle", product_color = "yellow").

Figure 3: AppsFlyer Solution Query Model

A query coming in from the user to the internal API is first parsed and translated to this type of tree. Its depth and width will vary depending on the number and nesting of criteria.

To calculate the (estimated) audience size, the different sets of unique user IDs represented by each leaf need to be found, and then recursively apply the set operations to them.

Calculating the exact sets of user IDs by scanning the raw data was impractical in AppsFlyer’s case, since it could not meet the low latency requirement, or in other words - there was too much data to calculate quickly enough.

In addition, for the purpose of building this system, the desired outcome is a very close estimation and not an exact number -- making the work of calculating exact sets unnecessary.

Why Theta Sketches

After some research, Theta Sketches were selected, a probabilistic data structure used for the count-distinct problem.

Theta Sketches are similar to HyperLogLog's, and have been gaining more adoption recently, most notably in Druid. Unlike HyperLogLog, Theta Sketches offer native support for set intersections.

Theta Sketches can be thought of as a set, meaning we can add elements to them, and also ask a sketch how many distinct elements it contains, getting back an approximation within some margin of error.

Unlike “regular” sets (hash sets, for example), Theta Sketches only retain a fraction of entries added to it; all entries added to the sketch are hashed to an integer value, but only the K lowest hash values are retained by the sketch (where the ”K value” is a configurable threshold controlling the sketch’s accuracy) . These K minimal values are used to calculate a measure of the density of elements, a statistic that is used when querying for the set’s distinct size.

As a result, a single Theta Sketch can weigh in at only kilobytes while containing a large stream of data (tens of millions of messages per sketch, in this case) and still provide high accuracy. This also implies, that unlike regular sketches, Theta Sketches cannot be used to test for the existence of an element within the set; rather, they can only provide an approximation of its size.

Another useful property of Theta Sketches is that they support set operations - the same types of operations needed when traversing the tree of criteria. That is, given a number of Theta Sketches, we can ask for their intersection, union and difference. It is important to also realize on the other hand that combining sketches via “intersect” or “difference” may result in accuracy loss. This can happen for example when intersecting two sketches with different K values, or when one sketch is significantly larger than the other. The latter is possible in our case if different leaves result in sketches of different magnitudes, but it was found that even so, the accuracy loss is within the acceptable limits defined for this system.

All of this made Theta Sketches a good candidate as the data structure of choice for us.

However, to be able to use Theta Sketches in our model, given that a leaf represents an atomic criterion, it would need to be able to quickly fetch a leaf’s corresponding Theta Sketch. In other words -- a Theta Sketch for every possible atomic criterion a user could feasibly define would need to be precomputed.

The question then becomes: how to precompute so many Theta Sketches in advance, and where to persist them?

The Persistence System

With this assumption in mind, understanding that all of the possible Theta Sketches had to be precomputed, it was agreed that it’s viable to sacrifice data freshness by one day in order to achieve higher accuracy and lower latency. For this, a daily Spark job which precomputes all of yesterday's possible Theta Sketches was settled on. It scans the data-lake (stored as Parquet files), and all events belonging to the same combination of app_id, event type and event attributes are grouped together and their user IDs are aggregated into a single Theta Sketch.

Figure 4: Events are grouped by event_type, app_id, date, event_attributes and their users are aggregated together.

The next question that arose was, what do we need from a persistence layer for storing the sketches?

  • High write throughput
    Writes are done once a day, bulk loading yesterday's pre-computed Theta Sketches.
  • Low read latency
    The access pattern is read-heavy and low read latency is very important for this use case in order to be able to answer user queries in near real-time.
  • Scale to billions of rows:
    Across all of its clients, AppsFlyer ingests 100 billion events per day. Even though the data is aggregated (according to app_id, date, event name and attributes), this can still generate millions of rows per day; over time, data keeps accumulating. Since the objective was a multi-tenant solution in which a single database is maintained for data across all apps, it would have to be able to scale together with the consistently growing traffic.

High column cardinality
As discussed, the event data model is both schema-less and contains a high cardinality of dimensions, since AppsFlyer users are free to add any set of event attributes they wish when sending an in-app event message. This means a schemaless database would be needed, and since the objective was multi-tenancy, to also host millions of possible different "columns" (as AppsFlyer is used by tens of thousands of applications). 

Based on this set of criteria, the eventual decision was to use HBase, a distributed database aimed at hosting tables made up of billions of rows and millions and columns of sparse data, which fits the use case well. HBase is also part of the Hadoop project, and uses HDFS for distributed storage, which fits nicely into the existing tech stack at AppsFlyer - where Spark is used to run jobs across 25+ Hadoop clusters.

  • Rows in an HBase table always contain a unique key and any number of columns.
  • Columns are further divided into "column families", which is both a logical and physical subdivision. 
  • Column families are physically stored separately, and columns that are queried together often can be grouped together in the same column family. This achieves better locality and means a query will only read the column families it requires.

HBase clusters are made up of a master node and region servers, which store and serve data. Tables are sharded between region servers; within each shard (or "region" in HBase terminology) the rows are sorted according to the lexicographical order (byte-by-byte) of the keys.

This allows HBase to support random reads -- given a row's key, it can efficiently find the matching row. Given any two keys, it can efficiently scan the range of rows between them.

HBase is also schemaless -- new rows can be added with any number of columns, and values inside columns are just byte arrays -- which fits the schemaless nature of event attributes in our data.

Figure 5: Logical representation of an HBase table of user ratings for different films Alt: a logical representation of an HBase table of user ratings for different films. Rows are sorted by the row key, and each column represents a different movie. Data for a particular column (movie) will only exist in rows of users that actually rated that movie, and gray cells indicate missing data. This way, we can have a high number of columns at little cost due to the sparseness of the overall data.

The next question was how to model our data as HBase tables.

The HBase model

In our model, there is a single events table. Each row in this table corresponds to some combination of app_id, event date, event name and event attributes, and contains the serialized pre-aggregated Theta Sketch for that combination in a dedicated column.

In addition to the Theta Sketch column, the event attributes are stored as HBase columns where the column name is the attribute's name and the column value is the attribute's value. As discussed, the data in this use case has a very high cardinality of event attribute names since clients may use any name they would like.

This fits well with HBase's column model because the data is sparse by nature (a single event attribute name will only appear in a small subset of rows).

Next, the row key had to be designed in such a manner that fits the read query patterns. Since HBase stores data lexicographically sorted by the row keys, the key structure has direct implications on the efficiency of scan operations, since it determines how many and which rows will be included while scanning a range of rows. In this case, a key design that would be optimized for scan operations performed by these queries was decided upon (which will be discussed in detail later).

The row keys in the events table is structured as follows: app_id|event_date|event_name|hash, where hash is a 64 bit hash of the list of event attribute key-value pairs (sorted alphabetically by the key, to yield consistent and predictable hash values).

Figure 6: Event table details

Let's quickly re-examine a typical query that a leaf in the tree generates. The static parameters in the query are app_id, event_date and event_name -- these will exist in any query. Additionally, a query might also want to filter results according to event attributes (i.e, product_type = shirt and product_color = blue).

Given this query, we want to quickly find the corresponding pre-computed Theta Sketch that is stored somewhere in HBase.

To achieve this with the events table, all rows whose prefix is app_id|event_date|event_name are scanned. This type of scan will include all rows beginning with this prefix, whatever their hash component is. If event attributes are also included in the query, the same scan also checks each row for the existence of the columns defined by the query, and that the column values match.

Figure 7: Query on Events table

As seen above, this type of scan returns multiple rows. The sketches are then unioned, to yield a single Theta Sketch representing the leaf.

In pseudo-code, this would look like this:

def scan_leaf(leaf):
    rowkey_prefix = leaf.app_id + '|' + leaf.event_date + '|' + leaf.event_type
    filters = get_scan_filters(leaf.event_attributes)

    scan_request = new_hbase_scan(table = 'events')
    scan_request.set_rowkey_prefix(rowkey_prefix)
    scan_request.set_filters(filters)

    rows_result_set = scan_request.scan()

    return rows_result_set

def get_sketch_for_leaf(leaf):
    result_set = scan_leaf(leaf)
    result_sketch = new_empty_sketch()

    for row in result_set:
        result_sketch = result_sketch.union(row.users_sketch)

    return result_sketch

What about queries where no event attribute filters were defined?

It would be silly to have to scan the entire range of rows for all possible event attribute combinations of a specific event type when we could simply pre-aggregate them to a single row during the ingestion phase. Therefore, for each event name, a single additional row containing the union of all sketches over its different combinations was created, with a hash component always set to 0.

Figure 8: Queries where no event attribute filters were defined

For a query containing no event attribute filters, it is now possible to simply issue a single get request to the aggregation row, because its entire key is known in advance.

The major downside of this strategy is that queries containing event attributes filters have to scan multiple rows to generate a union, as seen above.

This can become increasingly problematic in cases where users generate a high number of combinations of event attributes. For example, what if users send us in-app events which contain the timestamp of the event as part of the event attributes?

Figure 9: In-app events which contain the timestamp of the event as part of the event attributes

As seen in the figure above, this would completely derail the solution: the number of event attribute combinations would now be multiplied by the number of unique timestamps, which would be orders of magnitude higher than without it. In the extreme, the pre-aggregated sketches would contain a single event -- greatly amplifying the number of rows written to HBase and the ranges of rows covered by scan queries.

To overcome this, it's first important to realize that these types of attributes are uninteresting for our purposes; our users care about attributes with a much lower resolution. In other words, they want to filter by attributes such as product_name or product_color -- but never by something like: purchase_timestamp_milliseconds. That’s why it made sense to trim “noisy” attributes and retain only attributes that provide high-value to our users.

While what constitutes a “high-value” or “noisy” attribute is subjective, the following simple guiding definition was agreed upon. An attribute is high-value if:

  1. The number of unique values of the event attribute must not exceed a threshold of 100.
  2.  (number of unique values of the event attribute / number of times the event attribute appeared) < 0.1 -- i.e, we observed a limited number of distinct values in the specific attribute.

Therefore, the daily Spark job first determines the cardinality of unique values held by each event attribute, in order to “trim” noisy ones.

Once noisy attributes are identified they are simply ignored, in this way, many combinations that would otherwise be distinct are now merged together, and essentially ignored.

Figure 10: Trim the purchase_timestamp event attribute to aggregate events

Even after the noisy attributes are eliminated, it is still possible to potentially generate a large number of combinations per event type (capped by a non-tight upper bound of 100^(number of event attributes) that is in average orders of magnitude lower in practice).  Therefore, it works quite well in practice, by allowing us to test for noisiness in isolation per each attribute, and is able to capture only meaningful event attributes and keep the number of combinations low and range scans fast.

Counting

Finally, enabling an additional intersection of data insights was needed, and this was incremental counts.  Essentially the type of query that needed to be supported is "at least / at most querying". For example, "users who bought product A more than X times".

A possible approach, that was considered as the initial thought, was to compute N additional sketches for each row (which will be referred to as “counting sketches”). The ith counting sketch would only contain the user IDs of users who performed this exact combination at least i times. For the sake of simplicity, let's say there are only 10 such counting sketches per row (i.e, if a user performed a particular combination 11 or more times, they would be aggregated to bucket #10). Counting sketch #1 is essentially equivalent to the regular users’ sketch.

How would we then use this data to calculate a sketch of users who bought product A more than X times?

First, the rows matching the event type and attributes exactly would be scanned, as described previously. Imagine that this yielded R rows, which will be labeled as 

For each resulting row, there are 10 counting sketches. For row i, its sketches will be labeled as 

To effectively calculate the final sketch of user IDs, we must remember that a single user ID may reside in different counting sketches of disparate rows. For example, if the query is “users who bought product A more than X times”, a single user may reside in multiple relevant rows since no event attributes such as the product’s name have been specified, meaning all rows must be combined together to find the result.

This means that given the resulting set of counting sketches, we must find all possible subsets of counting sketches such that the sum of their count indices (1..10) equals X. 

Using the notation above, this means the set of all subsets S(X), that upholds:

Given this set of subsets, to find the final result we would simply have to intersect each subset and union over them:

Even with only 10 counting sketches per row, the process of finding S(X) -- the set of subsets -- becomes complex and computationally heavy (in fact, this is a version of the NP-complete "subset sum problem").

Instead, a different approach was taken which sacrifices accuracy in favor of being simple and efficient. Rather than maintaining counting sketches per every row, counting sketches are kept only for every event type’s single aggregation row.

Now, how does this help find the sketch of users who bought product A more than X times?

* Scan the rows and filter them for product A and then union their sketches.

* Load the Xth counting sketch the aggregation row (a single Get request to HBase: the aggregation’s row key is known in advance because its hash component is set to 0).

* Intersect the sketches from steps (1) and (2) to find the result.

This changes the semantics of the query: what is actually calculated are the number of users who bought product A and /also/ purchased (any) products more than X times. However, this being an approximation problem, this relaxation was deemed acceptable.

To build these counting sketches, the Spark job maintains an additional Bloom filter per every counting sketch in the aggregation row. Bloom filters are another probabilistic data structure, used for "membership" queries, allowing the testing of whether some element has already been added to the sketch previously (with some probability that the bloom filter will give a false positive; false negatives are impossible). This way, it’s possible to know which sets a specific user has been added to already; if a user already exists in counting sketch i, this will also be added to i + 1, iteratively building up these counting sketches while progressing over the stream of data.

Results and retrospect

We started out with wanting to build a system that would provide our users with accurate and fast results. Let’s examine metrics from our production environment to see how the system holds up:

Latency

Plotting the latency percentiles (taken for an entire three day range of production traffic measured client-side), we find that latency is usually within the limit set for the system (1 second). Highlighted are p50 (50 milliseconds), p90 (323 milliseconds), p95 (500 milliseconds) and p99 (892 milliseconds). P999 is 3 seconds, indicating possible long network latencies, elevated load, GC pauses on either our service or HBase, or HBase scans that ran for a long time due to event attribute trimming not sufficiently cutting down on combinations.

Diagram description: Query latency (millisecods) by percentiles

Accuracy

For accuracy, we calculated the approximation error in percentage, given as 

 

where exact audience size was calculated offline, and measured over a sample of 7,000 real-world queries.

Diagram description: Accuracy error histogram: the x-axis shows the percent accuracy error, and the y-axis the percentage of queries with said error. 

In English, ~40% of queries have an approximation error of <1%, and the probability of an approximation error of up to 10% is 87% with a long tail afterwards. The average approximation error in this data is 6%. 

Accuracy deterioration increases with the variance between size magnitudes of the criteria making up the query, as a result of the way Theta Sketches work that was discussed previously.

Another measure for accuracy could be by how many orders of magnitude our estimations differ from the exact result, given as:

Using the same sample of 7,000 production queries, we see that in 99.7% of them we are within the same order of magnitude as the true audience size, with only 0.03% of queries which are 2 orders of magnitude over.

Introducing HBase to our Stack

HBase was introduced to AppsFlyer’s stack by this system, and therefore came with a large initial operational overhead. As with any new technology, operational overhead had to be invested into learning and maintaining it, providing metrics and visibility and integrating it with the existing Hadoop infrastructure in a robust and resilient manner.

At the time of writing this -- and after 3 years of uptime -- the HBase deployment currently spans a cluster of 50 nodes and stores 15TB of data (mostly made up of a lot of sketches). 

There is more that could be done in order to further optimize our usage of HBase. For example, currently each scan operation sends serialized sketches over the wire from HBase to one of our service processes, where they are deserialized in memory in order to union them. One way around it would be using HBase coprocessors in order to push this work to HBase’s side, reducing the network overhead. 

Summary

Probabilistic data structures are powerful tools for approaching problems that would otherwise be computationally prohibitive. Even so, it takes careful engineering to use them effectively: as scale increases, their volume alone may require distributed systems dedicated to hosting and serving them. 

Furthermore, even with these much more compact structures, reasoning about how to tradeoff size and accuracy requires careful thinking. The way in which data is modeled over these structures also has dramatic implications towards the accuracy of the model, and here too there was compromise between the complexity of the model and its accuracy. 

It was often helpful for us to step back and discuss the kind of accuracy guarantees we want to supply our users, and then simplify the design at the expense of possible higher accuracy. It is exciting to see wider industry adoption of Theta Sketches, for example by the Druid analytical database, and are happy we had the opportunity to be able to build a capable system using HBase and Theta Sketches, while maintaining the goals we set out with of accuracy and latency.

Special thanks to Sharone Revah Zitzman for the editing work on the article.

About the Author

Ronen Cohen is a software engineer who enjoys architecting systems for scale. He is passionate about everything software: from OS internals to distributed computing, weird data-structures to functional programming. Ronen is a tech lead at AppsFlyer, helping build systems to process the influx of petabytes ingested monthly - to help fuel its next phase of growth. He holds a BSc in Computer Science from the Technion.

Rate this Article

Adoption
Style

Hello stranger!

You need to Register an InfoQ account or or login to post comments. But there's so much more behind being registered.

Get the most out of the InfoQ experience.

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Community comments

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

BT