求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
  
 
 
     
   
分享到
在MongoDB中实现聚合函数
 

作者:Arun Viswanathan, Shruthi Kumar,发布于2012-8-22,来源:InfoQ

 

随着组织产生的数据爆炸性增长,从GB到TB,从TB到PB,传统的数据库已经无法通过垂直扩展来管理如此之大数据。传统方法存储和处理数据的成本将会随着数据量增长而显著增加。这使得很多组织都在寻找一种经济的解决方案,比如NoSQL数据库,它提供了所需的数据存储和处理能力、扩展性和成本效率。NoSQL数据库不使用SQL作为查询语言。这种数据库有多种不同的类型,比如文档结构存储、键值结构存储、图结构、对象数据库等等。

由于NoSQL数据库具有高速的读写能力,因此它的的典型应用包括归档历史日志、事件日志、电子商务日志、游戏数据、社交数据等,存储下来的数据是为了进行后续处理,以得到关于用户以及他们使用情况的有用信息。

我们在本文中使用的NoSQL是MongoDB,它是一种开源的文档数据库系统,开发语言为C++。它提供了一种高效的面向文档的存储结构,同时支持通过MapReduce程序来处理所存储的文档;它的扩展性很好,而且支持自动分区。Mapreduce可以用来实现数据聚合。它的数据以BSON(二进制JSON)格式存储,在存储结构上支持动态schema,并且允许动态查询。和RDBMS的SQL查询不同,Mongo查询语言以JSON表示。

MongoDB提供了一个聚合框架,其中包括常用功能,比如count、distinct和group。然而更多的高级聚合函数,比如sum、average、max、min、variance(方差)和standard deviation(标准差)等需要通过MapReduce来实现。

这篇文章描述了在MongoDB存储的文档上使用MapReduce来实现通用的聚合函数,如sum、average、max、min、variance和standard deviation;聚合的典型应用包括销售数据的业务报表,比如将各地区的数据分组后计算销售总和、财务报表等。

我们从本文示例应用所需软件的安装开始。

软件安装

首先在本地机器上安装并设置MongoDB服务。

  • 从Mongo网站上下载MongoDB,解压到本地目录,比如C:>Mongo
  • 在上一个文件夹内创建数据目录。比如:C:\Mongo\Data ?如果数据文件存放在其他地方,那么在用mongod.exe命令启动MongoDB时,需要在命令行加参数—-dbpath
  • 启动服务 ?MongoDB提供了两种方式:mongod.exe以后台进程启动;mongo.exe启动命令行界面,可做管理操作。这两个可执行文件都位于Mongo\bin目录下;
  • 进入Mongo安装目录的bin目录下,比如:C:> cd Mongo\bin
  • 有两种启动方式,如下:
    mongod.exe –dbpath C:\Mongo\data
    或者 mongod.exe –config mongodb.config
    mongodb.config是Mongo\bin目录下的配置文件,需要在此配置文件中指定数据目录(比如,dbpath= C:\Mongo\Data)的位置。
  • 连接到MongoDB,到这一步,mongo后台服务已经启动,可以通过http://localhost:27017查看。 MongoDB启动运行后,我们接下来看它的聚合函数。

实现聚合函数

在关系数据库中,我们可以在数值型字段上执行包含预定义聚合函数的SQL语句,比如,SUM()、COUNT()、MAX()和MIN()。但是在MongoDB中,需要通过MapReduce功能来实现聚合以及批处理,它跟SQL里用来实现聚合的GROUP BY从句比较类似。下一节将描述关系数据库中SQL方式实现的聚合和相应的通过MongoDB提供的MapReduce实现的聚合。

为了讨论这个主题,我们考虑如下所示的Sales表,它以MongoDB中的反范式形式呈现。

Sales表

基于SQL和MapReduce的实现

我们提供了一个查询的样例集,这些查询使用聚合函数、过滤条件和分组从句,及其等效的MapReduce实现,即MongoDB实现SQL中GROUP BY的等效方式。在MongoDB存储的文档上执行聚合操作非常有用,这种方式的一个限制是聚合函数(比如,SUM、AVG、MIN、MAX)需要通过mapper和reducer函数来定制化实现。

MongoDB没有原生态的用户自定义函数(UDFs)支持。但是它允许使用db.system.js.save命令来创建并保存JavaScript函数,JavaScript函数可以在MapReduce中复用。下表是一些常用的聚合函数的实现。稍后,我们会讨论这些函数在MapReduce任务中的使用。

聚合函数
Javascript 函数
SUM
db.system.js.save( { _id : "Sum" ,
value : function(key,values)
{
    var total = 0;
    for(var i = 0; i < values.length; i++)
        total += values[i];
    return total;
}});
AVERAGE
db.system.js.save( { _id : "Avg" ,
value : function(key,values)
{
    var total = Sum(key,values);
    var mean = total/values.length;
    return mean;
}});
MAX
db.system.js.save( { _id : "Max" ,
value : function(key,values)
{
    var maxValue=values[0];
    for(var i=1;i
MIN
db.system.js.save( { _id : "Min" ,
value : function(key,values)
{
    var minValue=values[0];
    for(var i=1;i
VARIANCE

db.system.js.save( { _id : "Variance" ,
value : function(key,values)
{
    var squared_Diff = 0;
    var mean = Avg(key,values);
    for(var i = 0; i < values.length; i++)
    {
        var deviation = values[i] - mean;
        squared_Diff += deviation * deviation;
    }
    var variance = squared_Diff/(values.length);
    return variance;
}}); 
STD DEVIATION
db.system.js.save( { _id : "Standard_Deviation"
, value : function(key,values)
{
    var variance = Variance(key,values);
    return Math.sqrt(variance);
}});

SQL和MapReduce脚本在四种不同的用例场景中实现聚合函数的代码片段如下表所示。

1.各地区的平均订单量

下面的查询是用来获取不同地区的平均订单量。

SQL Query MapReduce Functions
SELECT
db.sales.runCommand(
{
mapreduce : "sales" ,
City,
State,
Region,
map:function()
{ // emit function handles the group by
        emit( {
        // Key
        city:this.City,
        state:this.State,
        region:this.Region},
        // Values
        this.Quantity);
},
AVG(Quantity)
reduce:function(key,values)
{
    var result = Avg(key, values);
    return result;
}
FROM sales
trx_undo_t*    insert_undo;
GROUP BY City, State, Region
// Group By is handled by the emit(keys, values)
 line in the map() function above
out : { inline : 1 } }); 

2.产品的分类销售总额

下面的查询是用来获取产品的分类销售额,根据产品类别的层级分组。在下面例子中,不同的产品类别作为个体维度,它们也可以被称为更复杂的基于层次的维度。

SQL 查询 MapReduce 函数
SELECT
db.sales.runCommand(
{
mapreduce : "sales" ,
ProductCategory, ProductSubCategory, ProductName,
map:function()
{
        emit(
        // Key
        {key0:this.ProductCategory,
        key1:this.ProductSubCategory,
        key2:this.ProductName},
        // Values
        this.SalesAmt);
},
SUM(SalesAmt)
reduce:function(key,values)
{
    var result = Sum(key, values);
    return result;
} 
FROM sales
 
GROUP BY ProductCategory, ProductSubCategory, ProductName
// Group By is handled by the emit(keys, values) 
line in the map() function above
out : { inline : 1 } }); 

3. 一种产品的最大利润

下面的查询是用来获取一个给定产品基于过滤条件的最大利润。

SQL查询 MapReduce 函数
SELECT
db.sales.runCommand(
{
mapreduce : "sales" ,
ProductId, ProductName,
map:function()
{
    if(this.ProductId==1)
        emit( {
            key0:this.ProductId,
            key1:this.ProductName},
            this.Profit);
},
MAX(SalesAmt)
reduce:function(key,values)
{
    var maxValue=Max(key,values);
    return maxValue;
}
FROM sales
 
WHERE ProductId=’1’
// WHERE condition implementation is provided in 
map() function
GROUP BY ProductId, ProductName
// Group By is handled by the emit(keys, values) 
line in the map() function above 
out : { inline : 1 } }); 

4. 总量、总销售额、平均利润

这个场景的需求是计算订单的总数、总销售额和平均利润,订单ID在1到10之间,发货时间在2011年的1月1日到12月31日之间。下面的查询是用来执行多个聚合,比如,在指定年份以及指定的不同区域和产品类别范围里订单的总数、总销售额和平均利润。

SQL查询 MapReduce 函数
SELECT
db.sales.runCommand(
{ mapreduce : "sales" ,
Region,
ProductCategory,
ProductId,
map:function()
{
    emit( {
        // Keys
        region:this.Region,
        productCategory:this.ProductCategory,
        productid:this.ProductId},

        // Values
        {quantSum:this.Quantity,
        salesSum:this.SalesAmt,
        avgProfit:this.Profit} );
}
Sum(Quantity),
Sum(Sales),
Avg(Profit)
reduce:function(key,values)
{
    var result=
{quantSum:0,salesSum:0,avgProfit:0};
    var count = 0;
    values.forEach(function(value)
    {
        // Calculation of Sum(Quantity)
        result.quantSum += values[i].quantSum;
        // Calculation of Sum(Sales)
        result.salesSum += values[i].salesSum;
        result.avgProfit += values[i].avgProfit;
        count++;
    }
    // Calculation of Avg(Profit)
    result.avgProfit = result.avgProfit / count;
    return result;
},
FROM Sales
 
WHERE  
Orderid between 1 and 10
AND Shipdate BETWEEN
‘01/01/2011’ and
‘12/31/2011’
query : {
        "OrderId" : { "$gt" : 1 },
        "OrderId" : { "$lt" : 10 },
        "ShipDate" : { "$gt" : "01/01/2011" },
        "ShipDate" : { "$lt" : "31/12/2011" },
},
GROUP BY
Region, ProductCategory, ProductId
// Group By is handled by the emit(keys, values) 
line in the map() function above 
LIMIT 3;
limit : 3,
out : { inline : 1 } }); 

既然我们已经看了在不同业务场景下的聚合函数的代码示例,接下来我们准备来测试这些函数。

测试聚合函数

MongoDB的MapReduce功能通过数据库命令来调用。Map和Reduce函数在前面章节里已经使用JavaScript实现。下面是执行MapReduce函数的语法。

db.runCommand(

    { mapreduce : <collection>,

        map : <mapfunction>,

        reduce : <reducefunction>

        [, query : <query filter object>]

        [, sort : <sorts the input objects using this key. Useful for 
 optimization, like sorting by the emit key for fewer reduces>]

        [, limit : <number of objects to return from collection>]

        [, out : <see output options below>]

        [, keeptemp: <true|false>]

        [, finalize : <finalizefunction>]

        [, scope : <object where fields go into javascript global scope >]

        [, jsMode : true]

        [, verbose : true]

    }

)


Where the Output Options include:

{ replace : "collectionName" }

{ merge : "collectionName"

{ reduce : "collectionName" }

{ inline : 1} 

下面是用来保存聚合函数并在MapReduce中使用的命令。

启动Mongo命令行并设置表

  • 确保Mongo后台进程在运行,然后执行mongo.exe启动Mongo命令行。
  • 使用命令切换数据库:use mydb
  • 使用命令查看Sales表的内容:db.sales.find()

find命令的输出如下:

{ "_id" : ObjectId("4f7be0d3e37b457077c4b13e"), "_class" : "com.infosys.mongo.Sales", "orderId" : 1, 
"orderDate" : "26/03/2011","quantity" : 20, "salesAmt" : 200, "profit" : 150, "customerName" : "CUST1",
 "productCategory" : "IT", "productSubCategory" : "software", "productName" : "Grad", "productId" : 1 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b13f"), "_class" : "com.infosys.mongo.Sales", "orderId" : 2, 
"orderDate" : "23/05/2011", "quantity" : 30, "salesAmt" : 200, "profit" : 40, "customerName" : "CUST2",
"productCategory" : "IT", "productSubCategory" : "hardware", "productName" : "HIM", "productId" : 1 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b140"), "_class" : "com.infosys.mongo.Sales", "orderId" : 3, 
"orderDate" : "22/09/2011","quantity" : 40, "salesAmt" : 200, "profit" : 80, "customerName" : "CUST1", 
"productCategory" : "BT", "productSubCategory" : "services", "productName" : "VOCI", "productId" : 2 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b141"), "_class" : "com.infosys.mongo.Sales", "orderId" : 4, 
"orderDate" : "21/10/2011", "quantity" : 30, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", 
"productCategory" : "BT", "productSubCategory" : "hardware", "productName" : "CRUD", "productId" : 2 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b142"), "_class" : "com.infosys.mongo.Sales", "orderId" : 5, 
"orderDate" : "21/06/2011", "quantity" : 50, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", 
"productCategory" : "BT", "productSubCategory" : "hardware", "productName" : "CRUD", "productId" : 1 }

创建并保存聚合函数

通过MongoDB命令行窗口执行如下命令:

> db.system.js.save( { _id : "Sum" ,
value : function(key,values)
{
    var total = 0;
    for(var i = 0; i < values.length; i++)
        total += values[i];
    return total;
}}); 

在示例表Sales表上执行MapReduce程序

> db.sales.runCommand(
{
mapreduce : "sales" ,
map:function()
{
emit(
{key0:this.ProductCategory,
key1:this.ProductSubCategory,
key2:this.ProductName},
this.SalesAmt);
},
reduce:function(key,values)
{
    var result = Sum(key, values);
    return result;
}
out : { inline : 1 } });

输出如下:

"results" : [
        {
                "_id" : {
                        "key0" : "BT",
                        "key1" : "hardware",
                        "key2" : "CRUD"
                },
                "value" : 400
        },
        {
                "_id" : {
                        "key0" : "BT",
                        "key1" : "services",
                        "key2" : "VOCI"
                },
                "value" : 200
        },
        {
                "_id" : {
                        "key0" : "IT",
                        "key1" : "hardware",
                        "key2" : "HIM"
                },
                "value" : 200
        },

        {
                "_id" : {
                        "key0" : "IT",
                        "key1" : "software",
                        "key2" : "Grad"
                },
                "value" : 200
        }
],
"timeMillis" : 1,
"timing" : {
        "mapTime" : NumberLong(1),
        "emitLoop" : 1,
        "total" : 1
},
"counts" : {
        "input" : 5,
        "emit" : 5,
        "output" : 4
},
"ok" : 1 

总结

MongoDB提供了面向文档的存储结构,可以很容易扩展支持TB级数据。同时也提供了Map Reduce功能,可以通过批处理方式使用类SQL函数来实现数据聚合。在这篇文章中,我们描述了安装MongoDB并使用MapReduce特性执行聚合函数的过程,也提供了简单SQL聚合的MapReduce示例实现。在MongoDB中,更复杂的聚合函数也可以通过使用MapReduce功能实现。


相关文章 相关文档 相关视频



我们该如何设计数据库
数据库设计经验谈
数据库设计过程
数据库编程总结
数据库性能调优技巧
数据库性能调整
数据库性能优化讲座
数据库系统性能调优系列
高性能数据库设计与优化
高级数据库架构师
数据仓库和数据挖掘技术
Hadoop原理、部署与性能调优

 
分享到
 
 
     


MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   


并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理


GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...