声明:本文是学习Mongodb过程中的副产品,因为接触时间并不长,难免有理解上的偏差,希望借此文与感兴趣的朋友讨论切磋,呵呵。去年年底,开始接触并学习Mapreduce模型。因为工作上的关系,最近开始研究Mongodb,其中对其新特性(2010年四月)reduce模型实现产
声明:本文是学习Mongodb过程中的副产品,因为接触时间并不长,难免有理解上的偏差,希望借此文与感兴趣的朋友讨论切磋,呵呵。
去年年底,开始接触并学习Mapreduce模型。因为工作上的关系,最近开始研究Mongodb,其中对其新特性(2010年四月)reduce模型实现产生的兴趣,因为特别留意了一下。当然网上关于该方面的内容并不是很多,且多为EN文,所以我想有必要将学习使用过程中的一些问题作一下记录并加以整理,因为就有了此文。
废话不多说了,开始正文吧!
目前支持Mongodb的C#客户端应该就是Samuel
Corder 开源的这个项目了,链接:http://github.com/samus/
mongodb-csharp。
其中在它的源码包中的MongoDB.Net-Tests目录下有对TestMapReduce和TestMapReduceBuilder相应测试用例,因为我本地没安装NUnit,所以接下来的内容我是在一个新建的web项目中直接Copy其中的部分代码做的测试(注:有关Mapreduce模型的内容请查阅相关资料)。
首先我们要先加载测试数据,这里我们以DNT中的在线用户列表的(结构)作为依据,批量倒入10条记录,
Mongo db = new Mongo("Servers=10.0.4.66:27017;COnnectTimeout=300000;COnnectionLifetime=300000;MinimumPoolSize=25;MaximumPoolSize=25;Pooled=true");
db.Connect();
Database test = db.GetDatabase("test");
IMongoCollection things = test["things"];
for (int i = 1; i <= 10;i++)
{
Document record = new Document();
record["_id"] = i;
record["userid"] = i;
record["ip"] = "10.0.7." + i;
record["username"] = "用户" + i;
record["nickname"] = "用户" + i;
record["password"] = "";
record["groupid"] = i;//下面将就该字段使用MAPREDUCE方式进行分组统计
record["olimg"] = "";
record["adminid"] = 0;
record["invisible"] = 0;
record["action"] = 0;
record["lastactivity"] = 1;
record["lastposttime"] = DateTime.Now.ToString();
record["lastpostpmtime"] = DateTime.Now.ToString();
record["lastsearchtime"] = DateTime.Now.ToString();
record["lastupdatetime"] = "1212313221231231213321";
record["forumid"] = 0;
record["forumname"] = "";
record["titleid"] = 0;
record["title"] = "";
record["verifycode"] = "";
record["newpms"] = 0;
record["newnotices"] = 0;
things.Insert(record);
}
db.Disconnect();
假定目前我们有这样一个需求,就是找出该表中用户组(groupid)字段为5的用户数,当然这里我们不会使用普通的查询方法,而是使用MAPREDUCE方式,其工作过程分为两个阶段:map阶段和reduce阶段。每个阶段都有键/值对作为输入和输出,并且它们的类型可由程序员选择。下面是其实现方式:
首先是map方法:
string mapfunction = "function() { if(this.groupid==5) {emit({groupid : 5}, 1);} }";
然后是reduce方法:
string reducefunction = "function(key, current ){" +
" var count = 0;" +
" for(var i in current) {" +
" count+=current[i];" +
" }" +
" return count;" +
"};";
最后我们使用下面代码实现对上面MAP,REDUCE的相应代码绑定和MapReduce类的声明:
MapReduce mr = mrcol.MapReduce();
mr.Map = new Code(mapfunction);
mr.Reduce = new Code(reducefunction4);
mr.Execute();
foreach (Document doc in mr.Documents)
{
int groupCount = Convert.ToInt32(doc["value"]);
}
mr.Dispose();
运行上面代码,显示结果如下:
当前上面监视窗口中的
"id:"{"groupid":5},即是mapfunction中的定义,当然如果要统计所有用户组(10个用户组)中各自的用户数,只把将mapfunction改写成:
string mapfunction = "function()
{ emit(this.groupid, 1); }";
这样,它就会按当前用户所属的groupid来作为键(确保不重复),凡是同一组的用户就作为输出进行发送(emit),emit可以理解为调用reduce方法,这里参数为1[即累加1操作])。
目前我在网上打到mongodb示例基本上都是围绕分组统计功能展开的。
当然就其传参和返回值都可以使用类似元组的方式,记得上面的“emit({groupid : 5},
1)”代码吗?返回值这里也可以使用下面的方式:
string reducefunction = "function(key, current ){" +
" var count = 0;" +
" for(var i in current) {" +
" count+=current[i];" +
" }" +
" return { groupcount : count };" + //注意这里的返回方式
"};";
返回类型变了,取值的方式也要发生变成:
int groupCount = int.Parse(((Document)doc["value"])["groupcount"].ToString());
当然,上面的MapReduce
类的声明使用方式过于拘谨,下面使用链式调用的方式:
using (MapReduceBuilder mrb = mrcol.MapReduceBuilder().Map(mapfunction).Reduce(reducefunction))
{
using (MapReduce mr = mrb.Execute())
{
foreach (Document doc in mr.Documents)
{
int groupCount = int.Parse(((Document)doc["value"])["groupcount"].ToString());
}
}
}
返回的结果与之前的一样,呵呵。
另外,mongodb还支持更加复杂的数据结构,比如官司方给的下面这个数据结构示例:
mrcol.Insert(new Document().Append("_id", 1).Append("tags", new String[]{"dog", "cat"}));
mrcol.Insert(new Document().Append("_id", 2).Append("tags", new String[]{"dog"}));
mrcol.Insert(new Document().Append("_id", 3).Append("tags", new String[]{"mouse", "cat", "dog"}));
mrcol.Insert(new Document().Append("_id", 4).Append("tags", new String[]{}));
可以看出tags字段(这里暂且这么说,呵呵),就是一个字符串数组,而下面的mapreduce方法将会统计里面单词dog,cat,mouse的出现次数:
string mapfunction = "function(){\n" +
" this.tags.forEach(\n" +
" function(z){\n" +
" emit( z , { count : 1 } );\n" +
" });\n" +
"};";
string reducefunction = "function( key , values ){\n" +
" var total = 0;\n" +
" for ( var i=0; i
" total += values[i].count;\n" +
" return { count : total };\n" +
"};";
对于如何对(含)日期型数据的键进行分组统计,下面的这个链接中有详细说明(统计每天用户的访问量):
Counting Unique Items with Map-Reduce
下面这个链接就是官方给出示例的文档链接页面,其中包括更加复杂的mapreduce示例:
http://www.mongodb.org/display/DOCS/MapReduce
当然目前对于Mapreduce模式,Mongodb使用一个单独的进程来跑的,这主要是因为Javascript
引擎的限制。目前开发团队正在设计解决这一问题。原文:
As of right now, MapReduce
jobs on a single mongod process are single threaded. This is due to
a design limitation in current Javascript engines. We are looking
into alternatives to solve this issue, but for now if you want to
parallelize your MapReduce jobs, you will need to either use
sharding or do the aggregation client-side in your code.
另外就是到现在对于MONGODB那一端是如果把输入数据划分成等长的小数据发送到MapReduce(Hadoop把这一操作称为input
split,即输入切片),因为这一点对于并发运行的作业进行负载平衡很重要,而在
Hadoop中一个理想的切片大小往往是一个HDFS块的大小,默认是64 MB(Hadoop权威指南(中文版))。
除了上面所提到了,在MONGODB的mapreduce模型中,还支持map输出的临时结果集的持久化,而这一特色还在文档中专门作了如下说明:
Note on Permanent Collections Even when a permanent collection name is specified, a temporary
collection name will be used during processing. At map/reduce
completion, the temporary collection will be renamed to the
permanent name atomically. Thus, one can perform a map/reduce job
periodically with the same target collection name without worrying
about a temporary state of incomplete data. This is very useful
when generating statistical output collections on a regular
basis.
而如果想要持久化该临时集合,只要将mapreduce实例的Keptemp属性设为true,同时使用Out属性(方法)指定输出的集合名称即可。
当然就目前我测试时结果来看,在单台机器上做这种模型测试就效率上是得不尝失的(执行周期太长),特别是数据量特别大(比如3000w以上),所以应用(或运行)场景的选择很重要。
上面所说的示例比较简单,都是在单一reduce任务中的执行场景,如下图:
而实际的生产环境要比上图复杂许多,比如多reduce任务情况,在Hadoop中,如果运行多个reduce任务,map任务会对其输出进行分区,为每个reduce任务创建一个分区(partition)。每个分区包含许多键(及其关联的值),但每个键的记录都在同一个分区中。分区可以通过用户定义的partitioner来控制。如下图:
鉴于目前网上mongodb相关文档内容并不多,所以这里暂不多做讨论了。