Implementing Aggregation Functions in MongoDB

As the amount of data that organizations generate grows from gigabytes to terabytes to petabytes, traditional databases struggle to scale up to manage such big data sets, and the cost of storing and processing data with them increases significantly as the data grows. As a result, organizations are looking for more economical solutions such as NoSQL databases, which provide the required data storage and processing capabilities, scalability and cost effectiveness. NoSQL databases do not use SQL as the query language. There are different types of these databases, such as document stores, key-value stores, graph databases and object databases.

Typical use cases for NoSQL databases include archiving old logs, event logging, e-commerce application logs, gaming data and social data, owing to their fast read-write capability. The stored data then needs to be processed to gain useful insights into customers and their usage of the applications.

The NoSQL database we use in this article is MongoDB, an open source document-oriented NoSQL database system written in C++. It provides high performance document-oriented storage as well as support for writing MapReduce programs to process data stored in MongoDB documents. It is easily scalable and supports auto partitioning. MapReduce can be used for aggregation of data through batch processing. MongoDB stores data in BSON (Binary JSON) format, supports a dynamic schema and allows for dynamic queries. The Mongo Query Language is expressed as JSON and is different from the SQL queries used in an RDBMS. MongoDB's basic aggregation commands provide utility functions such as count, distinct and group. With these commands alone, more advanced aggregation functions such as sum, average, max, min, variance and standard deviation need to be implemented using MapReduce. (The newer Aggregation Framework has built-in sum, average, min and max operators.)

This article describes the method of implementing common aggregation functions like sum, average, max, min, variance and standard deviation on a MongoDB document using its MapReduce functionality. Typical applications of aggregations include business reporting of sales data such as calculation of total sales by grouping data across geographical locations, financial reporting, etc.

Let's start with installing the required software for running the sample application discussed in this article.

Software Setup

We first install and set up the MongoDB server on a local machine.

  • Download MongoDB from the official MongoDB website and unzip the files to a preferred directory on the local machine.
            For example, C:\Mongo
  • Create a Data directory in the same folder.
            For example, C:\Mongo\Data
    • If data files are stored elsewhere, the --dbpath command line parameter needs to be specified when starting the MongoDB server with mongod.exe.
  • Starting up the server
    • MongoDB provides two executables for this purpose.
      mongod.exe is the database server daemon and mongo.exe is the administrative shell. Both executables are located in the Mongo\bin folder.
    • Change the directory to the bin folder of the Mongo home
      For example, C:\> cd Mongo\bin
    • There are two ways of starting the server, as shown below.
      mongod.exe --dbpath C:\Mongo\Data
      or
      mongod.exe --config mongodb.config
      where mongodb.config is a configuration file located in the Mongo\bin folder. We specify the location of the data folder (i.e. dbpath=C:\Mongo\Data) in this configuration file.
  • Connecting to the server
              At this point, the MongoDB server is running and listens on localhost:27017 by default. This port speaks MongoDB's own wire protocol rather than HTTP, so connect to it with the mongo.exe shell or a MongoDB driver.

Now that MongoDB is up and running, let's look at the aggregation functions.

Implementing Aggregation Functions

In a relational database, we can execute SQL queries with pre-defined aggregation functions such as SUM(), COUNT(), MAX() or MIN() on a numerical column. But in MongoDB, MapReduce functionality is used for aggregation and batch processing of data. It is similar to the GROUP BY clause that is used for aggregating data in SQL. The next section describes the SQL way of performing aggregations in a relational database and the corresponding implementation using the MapReduce functionality provided by MongoDB.

For this discussion, let's consider a Sales table represented as shown below, in de-normalized form in MongoDB.

Sales Table

#    Column Name           Data Type
1    OrderId               INTEGER
2    OrderDate             STRING
3    Quantity              INTEGER
4    SalesAmt              DOUBLE
5    Profit                DOUBLE
6    CustomerName          STRING
7    City                  STRING
8    State                 STRING
9    ZipCode               STRING
10   Region                STRING
11   ProductId             INTEGER
12   ProductCategory       STRING
13   ProductSubCategory    STRING
14   ProductName           STRING
15   ShipDate              STRING

Mapping SQL & Map Reduce based Implementations

We have provided a sample set of queries that use aggregation functions, filter criteria and grouping clauses, along with their equivalent MapReduce implementations; MapReduce is the MongoDB equivalent of performing a GROUP BY in SQL. This is very useful for running aggregation operations on MongoDB documents. A limitation of this approach is that aggregation functions such as SUM, AVG, MIN or MAX have to be custom implemented in the mapper and reducer functions.

MongoDB does not support user defined functions (UDFs) out-of-the-box. But it allows creating and saving JavaScript functions using the db.system.js.save command. The JavaScript functions thus created can then be reused in the MapReduce functions. The table below shows the implementations of some commonly used aggregation functions. Later, we will discuss the usage of these functions in MapReduce jobs.

 

Aggregation Function

Javascript Function

SUM

db.system.js.save( { _id : "Sum" ,
value : function(key,values)
{
    var total = 0;
    for(var i = 0; i < values.length; i++)
        total += values[i];
    return total;
}});

AVERAGE

db.system.js.save( { _id : "Avg" ,
value : function(key,values)
{
    var total = Sum(key,values);
    var mean = total/values.length;
    return mean;
}});

MAX

db.system.js.save( { _id : "Max" ,
value : function(key,values)
{
    var maxValue=values[0];
    for(var i=1;i<values.length;i++)
    {
        if(values[i]>maxValue)
        {
            maxValue=values[i];
        }
    }
    return maxValue;
}});

MIN

db.system.js.save( { _id : "Min" ,
value : function(key,values)
{
    var minValue=values[0];
    for(var i=1;i<values.length;i++)
    {
        if(values[i]<minValue)
        {
            minValue=values[i];
        }
    }
    return minValue;
}});

VARIANCE

db.system.js.save( { _id : "Variance" ,
value : function(key,values)
{
    var squared_Diff = 0;
    var mean = Avg(key,values);
    for(var i = 0; i < values.length; i++)
    {
        var deviation = values[i] - mean;
        squared_Diff += deviation * deviation;
    }
    var variance = squared_Diff/(values.length);
    return variance;
}});

STD DEVIATION

db.system.js.save( { _id : "Standard_Deviation"
, value : function(key,values)
{
    var variance = Variance(key,values);
    return Math.sqrt(variance);
}});
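
Because the stored functions are ordinary JavaScript, their arithmetic can be checked outside MongoDB before they are saved. The sketch below is a minimal standalone version, assuming a plain JavaScript runtime such as Node.js; the functions are declared locally instead of being saved with db.system.js.save, and the sample values are made up for illustration. Note that Variance divides by the number of values, so it computes the population variance.

```javascript
// Standalone copies of the stored functions, declared locally so that
// they can run without a MongoDB server (an assumption of this sketch).
function Sum(key, values) {
    var total = 0;
    for (var i = 0; i < values.length; i++)
        total += values[i];
    return total;
}

function Avg(key, values) {
    return Sum(key, values) / values.length;
}

function Variance(key, values) {
    var mean = Avg(key, values);
    var squaredDiff = 0;
    for (var i = 0; i < values.length; i++) {
        var deviation = values[i] - mean;
        squaredDiff += deviation * deviation;
    }
    // Dividing by the count gives the population variance.
    return squaredDiff / values.length;
}

function Standard_Deviation(key, values) {
    return Math.sqrt(Variance(key, values));
}

// Hypothetical sample values; the key argument is unused by these helpers.
var sample = [20, 30, 40, 30, 50];
console.log(Sum(null, sample));                // 170
console.log(Avg(null, sample));                // 34
console.log(Variance(null, sample));           // 104
console.log(Standard_Deviation(null, sample)); // about 10.198
```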

The code snippets for SQL and MapReduce scripts for implementing the aggregation functions in four different use case scenarios are shown in the table below:

1. Average order quantity across geo locations

The following query is used to fetch the average order quantity across different geographic locations.

SQL Query

SELECT City, State, Region, AVG(Quantity)
FROM sales
GROUP BY City, State, Region

MapReduce Functions

db.sales.runCommand(
{
    mapreduce : "sales" ,
    map : function()
    {
        // The emit function handles the GROUP BY
        emit( {
            // Key
            city : this.City,
            state : this.State,
            region : this.Region },
            // Value
            this.Quantity);
    },
    reduce : function(key, values)
    {
        // Caution: Avg is only correct if reduce is called once per key;
        // MongoDB may invoke reduce more than once for the same key.
        var result = Avg(key, values);
        return result;
    },
    // GROUP BY is handled by the emit(key, value) call in the map() function above
    out : { inline : 1 } });
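
The MongoDB documentation states that the reduce function might be invoked more than once for the same key, with earlier reduce outputs fed back in as inputs. Averaging inside reduce is therefore only safe when all values for a key arrive in a single call. A common way around this is to carry a running sum and count through reduce and divide once in a finalize function. The sketch below simulates that in plain JavaScript so that it runs without a server; the function names (naiveReduce, mapValue, safeReduce, finalizeAvg) are made up for illustration, and in MongoDB the last three would correspond to the value emitted by map and to the reduce and finalize options of the command.

```javascript
// Naive reducer: averages whatever it is handed.
function naiveReduce(key, values) {
    var total = 0;
    for (var i = 0; i < values.length; i++) total += values[i];
    return total / values.length;
}

// Re-reduce-safe variant: each emitted value carries a sum and a count.
function mapValue(quantity) {
    return { sum: quantity, count: 1 };
}

function safeReduce(key, values) {
    var result = { sum: 0, count: 0 };
    values.forEach(function (value) {
        result.sum += value.sum;
        result.count += value.count;
    });
    return result; // same shape as the emitted values
}

function finalizeAvg(key, reduced) {
    return reduced.sum / reduced.count;
}

var quantities = [2, 3, 4];

// Single pass: all values reduced at once.
var onePass = naiveReduce("k", quantities);                          // 3

// Two passes: [2, 3] reduced first, then that result reduced with 4.
var naiveTwoPass = naiveReduce("k", [naiveReduce("k", [2, 3]), 4]);  // 3.25

// Same two passes with the sum/count values, divided once at the end.
var emitted = quantities.map(mapValue);
var partial = safeReduce("k", emitted.slice(0, 2));
var safeTwoPass = finalizeAvg("k", safeReduce("k", [partial, emitted[2]])); // 3

console.log(onePass, naiveTwoPass, safeTwoPass);
```

Sum, Max and Min do not need this treatment, because reducing partial results gives the same answer as reducing all values at once.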

2. Total product sales across product categories

The following query is used to fetch the total sales amount grouped by multiple levels of product categories. The different product categories used in the following example as individual dimensions could also be defined as a complex hierarchy based dimension.

SQL Query

SELECT ProductCategory, ProductSubCategory, ProductName, SUM(SalesAmt)
FROM sales
GROUP BY ProductCategory, ProductSubCategory, ProductName

MapReduce Functions

db.sales.runCommand(
{
    mapreduce : "sales" ,
    map : function()
    {
        emit(
            // Key
            { key0 : this.ProductCategory,
              key1 : this.ProductSubCategory,
              key2 : this.ProductName },
            // Value
            this.SalesAmt);
    },
    reduce : function(key, values)
    {
        var result = Sum(key, values);
        return result;
    },
    // GROUP BY is handled by the emit(key, value) call in the map() function above
    out : { inline : 1 } });

 

3. Maximum profit for a product

The following query is used to fetch the maximum profit for a given product based on the filter criteria.

SQL Query

SELECT ProductId, ProductName, MAX(Profit)
FROM sales
WHERE ProductId = 1
GROUP BY ProductId, ProductName

MapReduce Functions

db.sales.runCommand(
{
    mapreduce : "sales" ,
    map : function()
    {
        // The WHERE condition is implemented in the map() function
        if (this.ProductId == 1)
            emit( {
                key0 : this.ProductId,
                key1 : this.ProductName },
                this.Profit);
    },
    reduce : function(key, values)
    {
        var maxValue = Max(key, values);
        return maxValue;
    },
    // GROUP BY is handled by the emit(key, value) call in the map() function above
    out : { inline : 1 } });

4. Total quantity, total sales and average profit

The requirement for this scenario is to calculate the total quantity, total sales and average profit for orders whose IDs are in the range 1 to 10 and whose ShipDate is between Jan 1 and Dec 31 of 2011. The following query performs these aggregations for the specified year and order range, grouped by region and product category.

SQL Query

SELECT Region, ProductCategory, ProductId,
       SUM(Quantity), SUM(SalesAmt), AVG(Profit)
FROM sales
WHERE OrderId BETWEEN 1 AND 10
  AND ShipDate BETWEEN '01/01/2011' AND '12/31/2011'
GROUP BY Region, ProductCategory, ProductId
LIMIT 3;

MapReduce Functions

db.sales.runCommand(
{
    mapreduce : "sales" ,
    map : function()
    {
        emit( {
            // Keys
            region : this.Region,
            productCategory : this.ProductCategory,
            productid : this.ProductId },
            // Values
            { quantSum : this.Quantity,
              salesSum : this.SalesAmt,
              avgProfit : this.Profit } );
    },
    reduce : function(key, values)
    {
        var result = { quantSum : 0, salesSum : 0, avgProfit : 0 };
        var count = 0;
        values.forEach(function(value)
        {
            // Calculation of Sum(Quantity)
            result.quantSum += value.quantSum;
            // Calculation of Sum(Sales)
            result.salesSum += value.salesSum;
            result.avgProfit += value.avgProfit;
            count++;
        });
        // Calculation of Avg(Profit); this is only correct if reduce
        // is called once per key
        result.avgProfit = result.avgProfit / count;
        return result;
    },
    // WHERE: both bounds of a range go in one object per field, because
    // repeating a key in an object literal keeps only the last entry.
    // ShipDate is stored as a string, so its bounds are compared as strings.
    query : {
        "OrderId" : { "$gte" : 1, "$lte" : 10 },
        "ShipDate" : { "$gte" : "01/01/2011", "$lte" : "31/12/2011" }
    },
    // GROUP BY is handled by the emit(key, value) call in the map() function above
    // LIMIT 3 (limit caps the number of input documents passed to map())
    limit : 3,
    out : { inline : 1 } });
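
Two details of the query filter are easy to get wrong, and both can be seen in plain JavaScript without a server. First, an object literal keeps only the last entry when a key is repeated, so the two bounds of a range must share one object per field. Second, comparison operators on strings compare character by character, so dates stored as dd/mm/yyyy strings do not sort chronologically. The sketch below illustrates both points; inRange and toSortable are hypothetical helpers, and storing dates as yyyy-mm-dd strings is an assumption about how the data could be remodelled, not something the sample collection does.

```javascript
// 1. A repeated key silently overwrites the earlier entry.
var repeated = {
    "OrderId": { "$gte": 1 },
    "OrderId": { "$lte": 10 }
};
console.log(Object.keys(repeated).length); // 1
console.log(repeated.OrderId);             // only the $lte bound survives

// Both bounds in one object per field express the full range.
var combined = { "OrderId": { "$gte": 1, "$lte": 10 } };

// 2. dd/mm/yyyy strings compare character by character.
function inRange(value, low, high) {
    return value >= low && value <= high;
}
// A 2012 date wrongly appears to fall inside the 2011 range.
var naive = inRange("05/01/2012", "01/01/2011", "31/12/2011"); // true

// Hypothetical helper: rewrite dd/mm/yyyy as yyyy-mm-dd, which sorts
// chronologically when compared as a string.
function toSortable(ddmmyyyy) {
    var parts = ddmmyyyy.split("/");
    return parts[2] + "-" + parts[1] + "-" + parts[0];
}
var fixed = inRange(toSortable("05/01/2012"),
                    toSortable("01/01/2011"),
                    toSortable("31/12/2011")); // false

console.log(naive, fixed);
```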

Now that we have looked at the code examples of aggregation functions for different business scenarios, we are ready to test these functions.

Testing the Aggregation Functions

MapReduce functionality in MongoDB is invoked using a database command. The map and reduce functions are written in JavaScript, as described in the previous section. The following syntax is used to execute MapReduce functions on the server.

db.runCommand(
    { mapreduce : <collection>,
        map : <mapfunction>,
        reduce : <reducefunction>
        [, query : <query filter object>]
        [, sort : <sorts the input objects using this key. Useful for optimization, like sorting by the emit key for fewer reduces>]
        [, limit : <number of objects to return from collection>]
        [, out : <see output options below>]
        [, keeptemp: <true|false>]
        [, finalize : <finalizefunction>]
        [, scope : <object where fields go into javascript global scope >]
        [, jsMode : true]
        [, verbose : true]
    }
)

Where the Output Options include:

{ replace : "collectionName" }
{ merge : "collectionName" }
{ reduce : "collectionName" }
{ inline : 1 }

 

Shown below are the commands needed to save an aggregation function and use it in a MapReduce function.

Start Mongo Shell and set up the table

  • Ensure that the Mongo server is running. Then start the Mongo shell by running mongo.exe from the Mongo\bin folder.
  • Switch the database by running the command:
     > use mydb
  • View the contents of the Sales table using the following command:
     > db.sales.find()

Here is the output of the find command. Note that MongoDB field names are case-sensitive, so the field references in the map functions must match the names actually stored in the documents.

{ "_id" : ObjectId("4f7be0d3e37b457077c4b13e"), "_class" : "com.infosys.mongo.Sales", "orderId" : 1, "orderDate" : "26/03/2011",
"quantity" : 20, "salesAmt" : 200, "profit" : 150, "customerName" : "CUST1", "productCategory" : "IT", "productSubCategory" : "software", 
"productName" : "Grad", "productId" : 1 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b13f"), "_class" : "com.infosys.mongo.Sales", "orderId" : 2, "orderDate" : "23/05/2011", 
"quantity" : 30, "salesAmt" : 200, "profit" : 40, "customerName" : "CUST2", "productCategory" : "IT", "productSubCategory" : "hardware",
 "productName" : "HIM", "productId" : 1 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b140"), "_class" : "com.infosys.mongo.Sales", "orderId" : 3, "orderDate" : "22/09/2011",
 "quantity" : 40, "salesAmt" : 200, "profit" : 80, "customerName" : "CUST1", "productCategory" : "BT", "productSubCategory" : "services",
 "productName" : "VOCI", "productId" : 2 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b141"), "_class" : "com.infosys.mongo.Sales", "orderId" : 4, "orderDate" : "21/10/2011", 
"quantity" : 30, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", "productCategory" : "BT", "productSubCategory" : "hardware", 
"productName" : "CRUD", "productId" : 2 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b142"), "_class" : "com.infosys.mongo.Sales", "orderId" : 5, "orderDate" : "21/06/2011", 
"quantity" : 50, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", "productCategory" : "BT", "productSubCategory" : "hardware", 
"productName" : "CRUD", "productId" : 1 }

Create and store the aggregation functions

  • From the MongoDB shell prompt, run the following command.
> db.system.js.save( { _id : "Sum" ,
value : function(key,values)
{
    var total = 0;
    for(var i = 0; i < values.length; i++)
        total += values[i];
    return total;
}});
  • Now execute the MapReduce program on the sample table "Sales"
> db.sales.runCommand(
{
mapreduce : "sales" ,
map:function()
{
emit(
{key0:this.ProductCategory,
key1:this.ProductSubCategory,
key2:this.ProductName},
this.SalesAmt);
},
reduce:function(key,values)
{
    var result = Sum(key, values);
    return result;
},
out : { inline : 1 } });

This would display the following output:

"results" : [
        {
                "_id" : {
                        "key0" : "BT",
                        "key1" : "hardware",
                        "key2" : "CRUD"
                },
                "value" : 400
        },
        {
                "_id" : {
                        "key0" : "BT",
                        "key1" : "services",
                        "key2" : "VOCI"
                },
                "value" : 200
        },
        {
                "_id" : {
                        "key0" : "IT",
                        "key1" : "hardware",
                        "key2" : "HIM"
                },
                "value" : 200
        },

        {
                "_id" : {
                        "key0" : "IT",
                        "key1" : "software",
                        "key2" : "Grad"
                },
                "value" : 200
        }
],
"timeMillis" : 1,
"timing" : {
        "mapTime" : NumberLong(1),
        "emitLoop" : 1,
        "total" : 1
},
"counts" : {
        "input" : 5,
        "emit" : 5,
        "output" : 4
},
"ok" : 1

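To see where these numbers come from, the same grouping can be reproduced in memory. The sketch below is not MongoDB's implementation; it is a small plain-JavaScript simulation (the simulateMapReduce helper is made up for illustration) that runs a map and a reduce function over the five sample documents, using the camelCase field names and values from the find() output above. For brevity it calls reduce exactly once per key.

```javascript
// The relevant fields of the five sample documents.
var sales = [
    { productCategory: "IT", productSubCategory: "software", productName: "Grad", salesAmt: 200 },
    { productCategory: "IT", productSubCategory: "hardware", productName: "HIM",  salesAmt: 200 },
    { productCategory: "BT", productSubCategory: "services", productName: "VOCI", salesAmt: 200 },
    { productCategory: "BT", productSubCategory: "hardware", productName: "CRUD", salesAmt: 200 },
    { productCategory: "BT", productSubCategory: "hardware", productName: "CRUD", salesAmt: 200 }
];

// Hypothetical helper: groups emitted values by key, then reduces each group.
function simulateMapReduce(docs, map, reduce) {
    var groups = {};
    var emitCount = 0;
    function emit(key, value) {
        var id = JSON.stringify(key);
        if (!groups[id]) groups[id] = { key: key, values: [] };
        groups[id].values.push(value);
        emitCount++;
    }
    docs.forEach(function (doc) { map.call(doc, emit); });
    // Sort by serialized key so the output order is deterministic.
    var results = Object.keys(groups).sort().map(function (id) {
        var group = groups[id];
        return { _id: group.key, value: reduce(group.key, group.values) };
    });
    return { results: results,
             counts: { input: docs.length, emit: emitCount, output: results.length } };
}

var output = simulateMapReduce(
    sales,
    // Unlike in MongoDB, emit is passed in as an argument here.
    function (emit) {
        emit({ key0: this.productCategory,
               key1: this.productSubCategory,
               key2: this.productName },
             this.salesAmt);
    },
    function (key, values) {
        var total = 0;
        for (var i = 0; i < values.length; i++) total += values[i];
        return total;
    });

console.log(JSON.stringify(output, null, 2));
```

The simulated results agree with the results and counts shown above: five input documents, five emits and four output groups, with the BT/hardware/CRUD group summing to 400.
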
Conclusion

MongoDB provides document-oriented storage that can easily scale to terabytes of data. It also provides MapReduce functionality that can be used for aggregation of data using SQL-like functions through batch processing. In this article, we described the process for setting up MongoDB and implementing aggregation functions using the MapReduce feature. We also provided a few sample MapReduce implementations for simple SQL-based aggregation queries. Using the MapReduce functionality, more complex aggregation functions can be implemented on the data stored in MongoDB documents.

About the Authors

Arun Viswanathan works as a Technology Architect with Cloud Center of Excellence (CoE) in Infosys Ltd, a global leader in IT & Business Consulting Services. Arun has around 9.5 years of experience in Java, Java EE, Cloud and Big Data application architecture definition and implementation. He is currently involved in design, development and consulting for Big Data solutions. He can be reached at Arun_Viswanathan01@infosys.com.

Shruthi Kumar works as a Technology Analyst with Cloud Center of Excellence (CoE) in Infosys Ltd, a global leader in IT & Business Consulting Services. Shruthi has 5 years of experience in Java, Grid Computing, Cloud and Big Data application architecture. She is currently involved in development and consulting for Big Data solutions. She can be reached at Shruthi_Kumar01@infosys.com.

 


Community comments

  • MapReduce is inherently slow in MongoDB

    by Shailendra Pandey,

    MapReduce is slow in MongoDB . They have not switched to V8 javascript engine.

  • I think you're wrong

    by Tomasz Adamski,

    In "Average order quantity across geo locations" mapreduce example you assume that reduce function will be called only once per key. But official documentation says(www.mongodb.org/display/DOCS/MapReduce#MapReduc...) that:
    "...reduce function might be invoked more than once for the same key..."

    If all values for a given key aren't aggregated at once, it may result in a wrong avg calculation.
    (2+3+4)/3 != (((2+3)/2)+4)/2

    In the reduce function you should collect the total sum of the values and their count, and pass a "finalize" function to calculate the average at the end (guaranteed to be called only once)
    www.mongodb.org/display/DOCS/MapReduce#MapReduc...

    I didn't read whole document so maybe you need to rethink other examples too.

  • Your information's not quite right.

    by Chris Westin,

    count, distinct, and group are the older aggregation functions. The new Aggregation Framework, which you link to, has sum, average, min, and max functions built-in. There are outstanding tickets to add statistical functions such as standard deviation to it. If the functions you need are available in the Aggregation Framework, it should be much faster than using Map/Reduce, because the AF is a native C++ implementation.

  • Re: Your information's not quite right.

    by Arun V,

    You are right. The article is written with reference to the older aggregation functions as shown in www.mongodb.org/display/DOCS/Aggregation. I guess the aggregation link has now been changed to show the newer aggregation framework.
    For our requirements we started defining the custom aggregation functions using MapReduce when the new aggregation framework was still not widely used...
    We haven't done performance benchmarking of MR vs. the Aggregation Framework operators. But I feel MapReduce would provide better performance for larger data sets.

  • Re: I think you're wrong

    by Arun V,

    Though it is mentioned in the site, we did not face this issue during our testing. We'll try out the finalize option as well.

  • Cycle Time, Lead Time and Cumulative Flow Diagram using MongoDB

    by Balaji Muniraja,

    Hi,

    I am planning to have a Offline Kanban Board application for Projects to use using MongoDB, please can you let me know if you have come across such implementations. If not, let me know by using MongoDB, can we create Cumulative Flow Diagrams.
    Thanks and Regards
    Balaji.M

  • Re: I think you're wrong

    by sreenath venkataramanappa,

    Yes in Map-Reduce, during reduce state, same key can be passed multiple times depending on the amount of data.

    www.techiesinfo.com/code-snippet
