With the amount of data that organizations generate exploding from gigabytes to terabytes to petabytes, traditional relational databases struggle to scale to such large data sets, and the cost of storing and processing the data rises sharply as it grows. As a result, organizations are looking at more economical alternatives such as NoSQL databases, which provide the required storage and processing capability along with scalability and cost effectiveness. NoSQL databases do not use SQL as their query language, and they come in several varieties: document stores, key-value stores, graph databases, object databases, and so on.
Typical use cases for NoSQL databases include archiving old logs, event logging, e-commerce application logs, gaming data, social data, and so on, owing to their fast read-write capability. The stored data then needs to be processed to gain useful insights into customers and how they use the applications.
The NoSQL database we use in this article is MongoDB, an open source, document-oriented NoSQL database system written in C++. It provides high-performance document-oriented storage as well as support for writing MapReduce programs to process the data stored in MongoDB documents. It is easily scalable and supports auto partitioning. MapReduce can be used for aggregation of data through batch processing. MongoDB stores data in BSON (Binary JSON) format, supports a dynamic schema and allows dynamic queries. The Mongo Query Language is expressed as JSON and is different from the SQL queries used in an RDBMS. MongoDB provides an Aggregation Framework that includes utility functions such as count, distinct and group. However, more advanced aggregation functions such as sum, average, max, min, variance and standard deviation need to be implemented using MapReduce.
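As a quick illustration, the shell commands below are a minimal sketch of a JSON-style query together with the built-in count, distinct and group helpers mentioned above. The collection name (`orders`) and field names are assumed for the example only.

```javascript
// Find documents with a JSON-style query instead of SQL
db.orders.find({ State: "CA", Quantity: { $gt: 10 } });

// Built-in aggregation helpers
db.orders.count({ State: "CA" });   // number of matching documents
db.orders.distinct("Region");       // distinct values of a field

// Simple grouping with a custom reduce callback
db.orders.group({
    key: { Region: 1 },
    initial: { totalQuantity: 0 },
    reduce: function(doc, out) { out.totalQuantity += doc.Quantity; }
});
```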
This article describes how to implement common aggregation functions such as sum, average, max, min, variance and standard deviation on a MongoDB document using its MapReduce functionality. Typical applications of such aggregations include business reporting of sales data, for example calculating total sales grouped by geographical location, financial reporting, and so on.
Let's start with installing the required software for running the sample application discussed in this article.
Software Setup
We first install and set up the MongoDB server on a local machine.
- Download MongoDB from the official Mongo website and unzip the files to a preferred directory on the local machine, for example `C:\Mongo`.
- Create a `Data` directory in the same folder, for example `C:\Mongo\Data`.
- If the data files are stored elsewhere, the `--dbpath` command line parameter needs to be specified when starting the MongoDB server with `mongod.exe`.
- Starting up the server
  - MongoDB provides a couple of executables for this purpose. `mongod.exe` is the database server daemon and `mongo.exe` is the administrative shell. These two executable files are located in the `Mongo\bin` folder.
  - Change the directory to the bin folder of the Mongo home, for example `C:\> cd Mongo\bin`.
  - There are two ways of starting the server, as shown below:
    `mongod.exe --dbpath C:\Mongo\Data`
    or
    `mongod.exe --config mongodb.config`
    where `mongodb.config` is a configuration file located in the `Mongo\bin` folder. We specify the location of the data folder (i.e. `dbpath = C:\Mongo\Data`) in this configuration file; a minimal example of this file is shown after this list.
- Connecting to the server
  At this point, the mongo server is started and can be connected to using the URL `http://localhost:27017/`.
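For reference, here is a minimal sketch of what `mongodb.config` might contain for the setup above. Only the `dbpath` value from the steps above is required; the commented port setting is an optional assumption (27017 is MongoDB's default port, as used in the connection URL above).

```
# mongodb.config - minimal sketch for the setup described above
dbpath = C:\Mongo\Data

# Optional: the port to listen on (27017 is the default)
# port = 27017
```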
Now that MongoDB is up and running, let's look at the aggregation functions.
Implementing Aggregation Functions
In a relational database, we can execute SQL queries with pre-defined aggregation functions such as SUM(), COUNT(), MAX() or MIN() on a numerical column. In MongoDB, however, MapReduce functionality is used for aggregation and batch processing of data; it plays a role similar to the GROUP BY clause that is used for aggregating data in SQL. The next section describes the SQL way of performing aggregations in a relational database and the corresponding implementation using the MapReduce functionality provided by MongoDB.
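To make the correspondence concrete, the sketch below shows how a simple SQL GROUP BY maps onto MongoDB's mapreduce command. The collection and field names are illustrative, and the summing logic is inlined here rather than using the stored functions introduced later in this article.

```javascript
// SQL equivalent (for comparison):
//   SELECT Region, SUM(SalesAmt) FROM sales GROUP BY Region

db.sales.runCommand({
    mapreduce: "sales",
    // map() decides the grouping key and the value to aggregate
    map: function() { emit(this.Region, this.SalesAmt); },
    // reduce() combines all values emitted for the same key
    reduce: function(key, values) {
        var total = 0;
        for (var i = 0; i < values.length; i++) total += values[i];
        return total;
    },
    out: { inline: 1 }
});
```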
For this discussion, let's consider a Sales table represented as shown below, in de-normalized form in MongoDB.
Sales Table

| # | Column Name | Data Type |
|---|---|---|
| 1 | OrderId | INTEGER |
| 2 | OrderDate | STRING |
| 3 | Quantity | INTEGER |
| 4 | SalesAmt | DOUBLE |
| 5 | Profit | DOUBLE |
| 6 | CustomerName | STRING |
| 7 | City | STRING |
| 8 | State | STRING |
| 9 | ZipCode | STRING |
| 10 | Region | STRING |
| 11 | ProductId | INTEGER |
| 12 | ProductCategory | STRING |
| 13 | ProductSubCategory | STRING |
| 14 | ProductName | STRING |
| 15 | ShipDate | STRING |
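As an illustration of how one row of this table might look as a de-normalized MongoDB document, the insert below is a minimal sketch; the field names come from the table above and the values are made up for the example.

```javascript
db.sales.insert({
    OrderId: 1,
    OrderDate: "26/03/2011",
    Quantity: 20,
    SalesAmt: 200.0,
    Profit: 150.0,
    CustomerName: "CUST1",
    City: "Bangalore",       // illustrative value
    State: "Karnataka",      // illustrative value
    ZipCode: "560100",       // illustrative value
    Region: "South",         // illustrative value
    ProductId: 1,
    ProductCategory: "IT",
    ProductSubCategory: "software",
    ProductName: "Grad",
    ShipDate: "30/03/2011"   // illustrative value
});
```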
Mapping SQL & Map Reduce based Implementations
We have provided a sample set of queries that use aggregation functions, filter criteria and grouping clauses, together with their equivalent MapReduce implementations, which are the MongoDB way of performing a GROUP BY in SQL. This approach is very useful for running aggregation operations on a MongoDB document. A limitation is that aggregation functions such as SUM, AVG, MIN or MAX have to be custom implemented in the mapper and reducer functions.
MongoDB does not support user defined functions (UDFs) out of the box, but it allows creating and saving JavaScript functions using the `db.system.js.save` command. The JavaScript functions created this way can then be reused in the MapReduce functions. The table below shows the implementations of some commonly used aggregation functions. Later, we will discuss the usage of these functions in MapReduce jobs.
| Aggregation Function | Javascript Function |
|---|---|
| Sum | `db.system.js.save( { _id : "Sum", value : function(key, values) { var total = 0; for(var i = 0; i < values.length; i++) total += values[i]; return total; }});` |
| Average | `db.system.js.save( { _id : "Avg", value : function(key, values) { var total = Sum(key, values); var mean = total/values.length; return mean; }});` |
| Max | `db.system.js.save( { _id : "Max", value : function(key, values) { var maxValue = values[0]; for(var i = 1; i < values.length; i++) { if(values[i] > maxValue) { maxValue = values[i]; } } return maxValue; }});` |
| Min | `db.system.js.save( { _id : "Min", value : function(key, values) { var minValue = values[0]; for(var i = 1; i < values.length; i++) { if(values[i] < minValue) { minValue = values[i]; } } return minValue; }});` |
| Variance | `db.system.js.save( { _id : "Variance", value : function(key, values) { var squared_Diff = 0; var mean = Avg(key, values); for(var i = 0; i < values.length; i++) { var deviation = values[i] - mean; squared_Diff += deviation * deviation; } var variance = squared_Diff/(values.length); return variance; }});` |
| Standard Deviation | `db.system.js.save( { _id : "Standard_Deviation", value : function(key, values) { var variance = Variance(key, values); return Math.sqrt(variance); }});` |
The code snippets for the SQL queries and the corresponding MapReduce scripts implementing the aggregation functions in four different use case scenarios are shown below:
1. Average order quantity across geo locations
The following query is used to fetch the average order quantity across different geographic locations.
SQL Query:

    SELECT City, State, Region, AVG(Quantity)
    FROM Sales
    GROUP BY City, State, Region;

MapReduce Functions:

    db.sales.runCommand( {
        mapreduce : "sales",
        map : function() {
            // The emit function handles the group by: the key defines the group, the value is aggregated
            emit( { city : this.City, state : this.State, region : this.Region },  // Key
                  this.Quantity );                                                 // Value
        },
        reduce : function(key, values) {
            var result = Avg(key, values);
            return result;
        },
        // Group By is handled by the emit(keys, values) call in the map() function above
        out : { inline : 1 }
    } );
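Note that MongoDB's mapReduce may invoke reduce() more than once for the same key, so computing an average inside reduce() can give wrong results when partial results are re-reduced. A re-reduce-safe sketch carries a running sum and count through reduce() and computes the mean in a finalize function (an option shown in the command syntax later in this article); field names here are illustrative.

```javascript
db.sales.runCommand({
    mapreduce: "sales",
    map: function() {
        emit({ city: this.City, state: this.State, region: this.Region },
             { sum: this.Quantity, count: 1 });
    },
    // reduce() only accumulates sums and counts, so it is safe to run repeatedly
    reduce: function(key, values) {
        var result = { sum: 0, count: 0 };
        values.forEach(function(v) {
            result.sum += v.sum;
            result.count += v.count;
        });
        return result;
    },
    // finalize() runs once per key, after all reduces, so the mean is computed safely here
    finalize: function(key, reduced) {
        return reduced.sum / reduced.count;
    },
    out: { inline: 1 }
});
```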
2. Total product sales across product categories
The following query is used to fetch the total sales amount grouped by multiple levels of product categories. The different product categories used in the following example as individual dimensions could also be defined as a complex hierarchy based dimension.
SQL Query:

    SELECT ProductCategory, ProductSubCategory, ProductName, SUM(SalesAmt)
    FROM Sales
    GROUP BY ProductCategory, ProductSubCategory, ProductName;

MapReduce Functions:

    db.sales.runCommand( {
        mapreduce : "sales",
        map : function() {
            emit( { key0 : this.ProductCategory, key1 : this.ProductSubCategory, key2 : this.ProductName },  // Key
                  this.SalesAmt );                                                                            // Value
        },
        reduce : function(key, values) {
            var result = Sum(key, values);
            return result;
        },
        // Group By is handled by the emit(keys, values) call in the map() function above
        out : { inline : 1 }
    } );
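As an aside, on MongoDB versions that include the newer Aggregation Framework (2.2 and later), the same total can be computed without MapReduce using a native $group stage. This is a sketch for comparison rather than part of the original example:

```javascript
db.sales.aggregate(
    { $group: {
        _id: { category: "$ProductCategory",
               subCategory: "$ProductSubCategory",
               product: "$ProductName" },
        totalSales: { $sum: "$SalesAmt" }   // built-in sum, no custom reduce needed
    } }
);
```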
3. Maximum profit for a product
The following query is used to fetch the maximum profit for a given product based on the filter criteria.
SQL Query:

    SELECT ProductId, ProductName, MAX(Profit)
    FROM Sales
    WHERE ProductId = 1
    GROUP BY ProductId, ProductName;

MapReduce Functions:

    db.sales.runCommand( {
        mapreduce : "sales",
        map : function() {
            // WHERE condition implementation is provided in the map() function
            if (this.ProductId == 1)
                emit( { key0 : this.ProductId, key1 : this.ProductName },  // Key
                      this.Profit );                                       // Value
        },
        reduce : function(key, values) {
            var maxValue = Max(key, values);
            return maxValue;
        },
        // Group By is handled by the emit(keys, values) call in the map() function above
        out : { inline : 1 }
    } );
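Instead of hard-coding the filter inside map(), the same WHERE-style condition can be pushed into the command's query option (supported by mapReduce, as the syntax in the testing section shows), so that only matching documents are fed to map(). A minimal sketch:

```javascript
db.sales.runCommand({
    mapreduce: "sales",
    map: function() {
        emit({ key0: this.ProductId, key1: this.ProductName }, this.Profit);
    },
    reduce: function(key, values) {
        return Max(key, values);
    },
    // The filter is applied before map(), like a WHERE clause
    query: { ProductId: 1 },
    out: { inline: 1 }
});
```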
4. Total quantity, total sales and average profit
The requirement for this scenario is to calculate the total quantity, total sales and average profit for orders whose IDs are in the range 1 to 10 and whose ShipDate is between Jan 1 and Dec 31 of the year 2011. The following query performs multiple aggregations, namely the total quantity, total sales and average profit, for the specified year and order range, grouped across regions and product categories.
SQL Query:

    SELECT Region, ProductCategory, ProductId, SUM(Quantity), SUM(SalesAmt), AVG(Profit)
    FROM Sales
    WHERE OrderId BETWEEN 1 AND 10
      AND ShipDate BETWEEN '01/01/2011' AND '31/12/2011'
    GROUP BY Region, ProductCategory, ProductId;

MapReduce Functions:

    db.sales.runCommand( {
        mapreduce : "sales",
        map : function() {
            emit( { region : this.Region, productCategory : this.ProductCategory, productid : this.ProductId },  // Keys
                  { quantSum : this.Quantity, salesSum : this.SalesAmt, avgProfit : this.Profit } );             // Values
        },
        reduce : function(key, values) {
            var result = { quantSum : 0, salesSum : 0, avgProfit : 0 };
            var count = 0;
            values.forEach(function(value) {
                // Calculation of Sum(Quantity)
                result.quantSum += value.quantSum;
                // Calculation of Sum(Sales)
                result.salesSum += value.salesSum;
                result.avgProfit += value.avgProfit;
                count++;
            });
            // Calculation of Avg(Profit)
            result.avgProfit = result.avgProfit / count;
            return result;
        },
        // WHERE condition implementation is provided via the query filter
        query : { "OrderId" : { "$gte" : 1, "$lte" : 10 },
                  "ShipDate" : { "$gte" : "01/01/2011", "$lte" : "31/12/2011" } },
        // Group By is handled by the emit(keys, values) call in the map() function above
        limit : 3,
        out : { inline : 1 }
    } );
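As with the earlier average example, dividing by count inside reduce() is only correct if reduce runs exactly once per key. A sketch of a re-reduce-safe variant keeps a profit sum and document count in the emitted value and computes the average in finalize; field names are illustrative.

```javascript
db.sales.runCommand({
    mapreduce: "sales",
    map: function() {
        emit({ region: this.Region, productCategory: this.ProductCategory, productid: this.ProductId },
             { quantSum: this.Quantity, salesSum: this.SalesAmt, profitSum: this.Profit, count: 1 });
    },
    reduce: function(key, values) {
        var result = { quantSum: 0, salesSum: 0, profitSum: 0, count: 0 };
        values.forEach(function(v) {
            result.quantSum += v.quantSum;
            result.salesSum += v.salesSum;
            result.profitSum += v.profitSum;
            result.count += v.count;
        });
        return result;
    },
    finalize: function(key, reduced) {
        // Average is computed once per key, after all partial reduces
        reduced.avgProfit = reduced.profitSum / reduced.count;
        return reduced;
    },
    query: { OrderId: { $gte: 1, $lte: 10 },
             ShipDate: { $gte: "01/01/2011", $lte: "31/12/2011" } },
    out: { inline: 1 }
});
```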
Now that we have looked at the code examples of aggregation functions for different business scenarios, we are ready to test these functions.
Testing the Aggregation Functions
MapReduce functionality in MongoDB is invoked using a database command. The map and reduce functions are written in JavaScript, as described in the previous section. The following is the syntax used to execute MapReduce functions on the server.
    db.runCommand( {
        mapreduce : <collection>,
        map : <mapfunction>,
        reduce : <reducefunction>
        [, query : <query filter object>]
        [, sort : <sorts the input objects using this key. Useful for optimization, like sorting by the emit key for fewer reduces>]
        [, limit : <number of objects to return from collection>]
        [, out : <see output options below>]
        [, keeptemp : <true|false>]
        [, finalize : <finalizefunction>]
        [, scope : <object where fields go into javascript global scope>]
        [, jsMode : true]
        [, verbose : true]
    } )

    Where the output options include:
        { replace : "collectionName" }
        { merge : "collectionName" }
        { reduce : "collectionName" }
        { inline : 1 }
Shown below are the commands needed to save an aggregation function and use it in a MapReduce function.
Start the Mongo shell and set up the table
- Ensure that the Mongo server is running, then start the Mongo shell by running `mongo.exe` from the Mongo home folder.
- Switch the database by running the command: `> use mydb`
- View the contents of the Sales table using the following command: `> db.sales.find()`
Here is the output of the find command.
{ "_id" : ObjectId("4f7be0d3e37b457077c4b13e"), "_class" : "com.infosys.mongo.Sales", "orderId" : 1, "orderDate" : "26/03/2011", "quantity" : 20, "salesAmt" : 200, "profit" : 150, "customerName" : "CUST1", "productCategory" : "IT", "productSubCategory" : "software", "productName" : "Grad", "productId" : 1 } { "_id" : ObjectId("4f7be0d3e37b457077c4b13f"), "_class" : "com.infosys.mongo.Sales", "orderId" : 2, "orderDate" : "23/05/2011", "quantity" : 30, "salesAmt" : 200, "profit" : 40, "customerName" : "CUST2", "productCategory" : "IT", "productSubCategory" : "hardware", "productName" : "HIM", "productId" : 1 } { "_id" : ObjectId("4f7be0d3e37b457077c4b140"), "_class" : "com.infosys.mongo.Sales", "orderId" : 3, "orderDate" : "22/09/2011", "quantity" : 40, "salesAmt" : 200, "profit" : 80, "customerName" : "CUST1", "productCategory" : "BT", "productSubCategory" : "services", "productName" : "VOCI", "productId" : 2 } { "_id" : ObjectId("4f7be0d3e37b457077c4b141"), "_class" : "com.infosys.mongo.Sales", "orderId" : 4, "orderDate" : "21/10/2011", "quantity" : 30, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", "productCategory" : "BT", "productSubCategory" : "hardware", "productName" : "CRUD", "productId" : 2 } { "_id" : ObjectId("4f7be0d3e37b457077c4b142"), "_class" : "com.infosys.mongo.Sales", "orderId" : 5, "orderDate" : "21/06/2011", "quantity" : 50, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", "productCategory" : "BT", "productSubCategory" : "hardware", "productName" : "CRUD", "productId" : 1 }
Create and store the aggregation functions
- From the MongoDB shell prompt, run the following command.
> db.system.js.save( { _id : "Sum" , value : function(key,values) { var total = 0; for(var i = 0; i < values.length; i++) total += values[i]; return total; }});
- Now execute the MapReduce program on the sample table "Sales":

    > db.sales.runCommand( {
          mapreduce : "sales",
          map : function() {
              emit( { key0 : this.ProductCategory, key1 : this.ProductSubCategory, key2 : this.ProductName },
                    this.SalesAmt );
          },
          reduce : function(key, values) {
              var result = Sum(key, values);
              return result;
          },
          out : { inline : 1 }
      } );
This would display the following output:
"results" : [ { "_id" : { "key0" : "BT", "key1" : "hardware", "key2" : "CRUD" }, "value" : 400 }, { "_id" : { "key0" : "BT", "key1" : "services", "key2" : "VOCI" }, "value" : 200 }, { "_id" : { "key0" : "IT", "key1" : "hardware", "key2" : "HIM" }, "value" : 200 }, { "_id" : { "key0" : "IT", "key1" : "software", "key2" : "Grad" }, "value" : 200 } ], "timeMillis" : 1, "timing" : { "mapTime" : NumberLong(1), "emitLoop" : 1, "total" : 1 }, "counts" : { "input" : 5, "emit" : 5, "output" : 4 }, "ok" : 1
Conclusion
MongoDB provides document-oriented storage that can easily scale to terabytes of data. It also provides MapReduce functionality that can be used to aggregate data with SQL-like functions through batch processing. In this article, we described the process of setting up MongoDB and performing aggregations using its MapReduce feature. We also provided sample MapReduce implementations for simple SQL-based aggregation queries. Using MapReduce functionality, more complex aggregation functions can be implemented on the data stored in MongoDB documents.
About the Authors
Arun Viswanathan works as a Technology Architect with the Cloud Center of Excellence (CoE) in Infosys Ltd, a global leader in IT & Business Consulting Services. Arun has around 9.5 years of experience in Java, Java EE, Cloud and Big Data application architecture definition and implementation. He is currently involved in design, development and consulting for Big Data solutions. He can be reached at Arun_Viswanathan01@infosys.com.

Shruthi Kumar works as a Technology Analyst with the Cloud Center of Excellence (CoE) in Infosys Ltd, a global leader in IT & Business Consulting Services. Shruthi has 5 years of experience in Java, Grid Computing, Cloud and Big Data application architecture. She is currently involved in development and consulting for Big Data solutions. She can be reached at Shruthi_Kumar01@infosys.com.
Community comments
MapReduce is inherently slow in MongoDB
by Shailendra Pandey,
MapReduce is slow in MongoDB. They have not switched to the V8 JavaScript engine.
I think you're wrong
by Tomasz Adamski,
In "Average order quantity across geo locations" mapreduce example you assume that reduce function will be called only once per key. But official documentation says(www.mongodb.org/display/DOCS/MapReduce#MapReduc...) that:
"...reduce function might be invoked more than once for the same key..."
If all values for a given key aren't aggregated at once, it may result in a wrong avg calculation.
(2+3+4)/3 != (((2+3)/2)+4)/2
In the reduce function you should collect the total sum of the values and their count, and pass a "finalize" function to calculate the average at the end (guaranteed to be called only once).
www.mongodb.org/display/DOCS/MapReduce#MapReduc...
I didn't read the whole document, so maybe you need to rethink other examples too.
Your information's not quite right.
by Chris Westin,
count, distinct, and group are the older aggregation functions. The new Aggregation Framework, which you link to, has sum, average, min, and max functions built-in. There are outstanding tickets to add statistical functions such as standard deviation to it. If the functions you need are available in the Aggregation Framework, it should be much faster than using Map/Reduce, because the AF is a native C++ implementation.
Re: Your information's not quite right.
by Arun V,
You are right. The article is written with reference to the older aggregation functions as shown in www.mongodb.org/display/DOCS/Aggregation. I guess the aggregation link has now been changed to show the newer aggregation framework.
For our requirements we started defining the custom aggregation functions using MapReduce when the new aggregation framework was still not widely used...
We haven't done performance benchmarking for the MR vs Aggregation Framework operators, but I feel MapReduce would provide better performance for larger data sets.
Re: I think you're wrong
by Arun V,
Though it is mentioned on the site, we did not face this issue during our testing. We'll try out the finalize option as well.
Cycle Time, Lead Time and Cumulative Flow Diagram using MongoDB
by Balaji Muniraja,
Hi,
I am planning to build an offline Kanban board application for projects using MongoDB. Please can you let me know if you have come across such implementations? If not, let me know whether we can create Cumulative Flow Diagrams using MongoDB.
Thanks and Regards
Balaji.M
Re: I think you're wrong
by sreenath venkataramanappa,
Yes, in MapReduce, during the reduce stage, the same key can be passed multiple times depending on the amount of data.
www.techiesinfo.com/code-snippet