热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

关于MongoDB数据库中mapreduce的研究

声明:本文是学习Mongodb过程中的副产品,因为接触时间并不长,难免有理解上的偏差,希望借此文与感兴趣的朋友讨论切磋,呵呵。去年年底,开始接触并学习Mapreduce模型。因为工作上的关系,最近开始研究Mongodb,其中对其新特性(2010年四月)reduce模型实现产
       声明:本文是学习Mongodb过程中的副产品,因为接触时间并不长,难免有理解上的偏差,希望借此文与感兴趣的朋友讨论切磋,呵呵。
       去年年底,开始接触并学习Mapreduce模型。因为工作上的关系,最近开始研究Mongodb,其中对其新特性(2010年四月)reduce模型实现产生的兴趣,因为特别留意了一下。当然网上关于该方面的内容并不是很多,且多为EN文,所以我想有必要将学习使用过程中的一些问题作一下记录并加以整理,因为就有了此文。
       废话不多说了,开始正文吧!

       目前支持Mongodb的C#客户端应该就是Samuel Corder 开源的这个项目了,链接:http://github.com/samus/mongodb-csharp。
       其中在它的源码包中的MongoDB.Net-Tests目录下有对TestMapReduce和TestMapReduceBuilder相应测试用例,因为我本地没安装NUnit,所以接下来的内容我是在一个新建的web项目中直接Copy其中的部分代码做的测试(注:有关Mapreduce模型的内容请查阅相关资料)。

      首先我们要先加载测试数据,这里我们以DNT中的在线用户列表的(结构)作为依据,批量倒入10条记录,
Mongo db = new Mongo("Servers=10.0.4.66:27017;COnnectTimeout=300000;COnnectionLifetime=300000;MinimumPoolSize=25;MaximumPoolSize=25;Pooled=true");          db.Connect(); 
Database test = db.GetDatabase("test");
IMongoCollection things = test["things"];
for (int i = 1; i <= 10;i++)
          {    
              Document record = new Document();
              record["_id"] = i;               
              record["userid"] = i;   
              record["ip"] = "10.0.7." + i;
              record["username"] = "用户" + i;
              record["nickname"] = "用户" + i;
              record["password"] = "";
              record["groupid"] = i;//下面将就该字段使用MAPREDUCE方式进行分组统计
              record["olimg"] = "";
              record["adminid"] = 0;
              record["invisible"] = 0;
              record["action"] = 0;
              record["lastactivity"] = 1;
              record["lastposttime"] = DateTime.Now.ToString();
              record["lastpostpmtime"] = DateTime.Now.ToString();
              record["lastsearchtime"] = DateTime.Now.ToString();
              record["lastupdatetime"] = "1212313221231231213321";
              record["forumid"] = 0;
              record["forumname"] = "";
              record["titleid"] = 0;
              record["title"] = "";
              record["verifycode"] = "";
              record["newpms"] = 0;
              record["newnotices"] = 0;
              things.Insert(record);             
          } 
      db.Disconnect(); 
        假定目前我们有这样一个需求,就是找出该表中用户组(groupid)字段为5的用户数,当然这里我们不会使用普通的查询方法,而是使用MAPREDUCE方式,其工作过程分为两个阶段:map阶段和reduce阶段。每个阶段都有键/值对作为输入和输出,并且它们的类型可由程序员选择。下面是其实现方式:

首先是map方法:    
 string mapfunction = "function() {  if(this.groupid==5) {emit({groupid : 5}, 1);} }";        

然后是reduce方法:    
 string reducefunction = "function(key, current ){" +
                                "   var count = 0;" +
                                "   for(var i in current) {" +
                                "       count+=current[i];" +
                                "   }" +
                                "   return count;" +
                              "};";          

最后我们使用下面代码实现对上面MAP,REDUCE的相应代码绑定和MapReduce类的声明:    
MapReduce mr = mrcol.MapReduce();
    mr.Map = new Code(mapfunction);
    mr.Reduce = new Code(reducefunction4);
    mr.Execute();
    foreach (Document doc in mr.Documents)
    {
           int groupCount = Convert.ToInt32(doc["value"]);
    }     mr.Dispose();
       运行上面代码,显示结果如下: 
    

      当前上面监视窗口中的"id:"{"groupid":5},即是mapfunction中的定义,当然如果要统计所有用户组(10个用户组)中各自的用户数,只把将mapfunction改写成:
      string mapfunction = "function() { emit(this.groupid, 1); }";
    这样,它就会按当前用户所属的groupid来作为键(确保不重复),凡是同一组的用户就作为输出进行发送(emit),emit可以理解为调用reduce方法,这里参数为1[即累加1操作])。
     目前我在网上打到mongodb示例基本上都是围绕分组统计功能展开的。
     当然就其传参和返回值都可以使用类似元组的方式,记得上面的“emit({groupid : 5}, 1)”代码吗?返回值这里也可以使用下面的方式:  
string reducefunction = "function(key, current ){" +
                                 "   var count = 0;" +
                                 "   for(var i in current) {" +
                                 "       count+=current[i];" +
                                 "   }" +
                                 "   return { groupcount : count };" +  //注意这里的返回方式
                               "};";       

返回类型变了,取值的方式也要发生变成:
int groupCount = int.Parse(((Document)doc["value"])["groupcount"].ToString());       

当然,上面的MapReduce 类的声明使用方式过于拘谨,下面使用链式调用的方式:
using (MapReduceBuilder mrb = mrcol.MapReduceBuilder().Map(mapfunction).Reduce(reducefunction)) 
{
        using (MapReduce mr = mrb.Execute())
        {
                   foreach (Document doc in mr.Documents)
                   {
                       int groupCount = int.Parse(((Document)doc["value"])["groupcount"].ToString());
                   }
        }
}        

返回的结果与之前的一样,呵呵。
       另外,mongodb还支持更加复杂的数据结构,比如官司方给的下面这个数据结构示例:
          mrcol.Insert(new Document().Append("_id", 1).Append("tags", new String[]{"dog", "cat"}));
          mrcol.Insert(new Document().Append("_id", 2).Append("tags", new String[]{"dog"}));
          mrcol.Insert(new Document().Append("_id", 3).Append("tags", new String[]{"mouse", "cat", "dog"}));
          mrcol.Insert(new Document().Append("_id", 4).Append("tags", new String[]{}));        

可以看出tags字段(这里暂且这么说,呵呵),就是一个字符串数组,而下面的mapreduce方法将会统计里面单词dog,cat,mouse的出现次数:
string mapfunction = "function(){\n" +
                           "   this.tags.forEach(\n" +
                           "       function(z){\n" +
                           "           emit( z , { count : 1 } );\n" +
                           "       });\n" +
                           "};";
string reducefunction = "function( key , values ){\n" +
                               "    var total = 0;\n" +
                               "    for ( var i=0; i
                               "        total += values[i].count;\n" +
                               "    return { count : total };\n" +
                               "};";       

对于如何对(含)日期型数据的键进行分组统计,下面的这个链接中有详细说明(统计每天用户的访问量):
       Counting Unique Items with Map-Reduce 
      下面这个链接就是官方给出示例的文档链接页面,其中包括更加复杂的mapreduce示例:
       http://www.mongodb.org/display/DOCS/MapReduce
      当然目前对于Mapreduce模式,Mongodb使用一个单独的进程来跑的,这主要是因为Javascript 引擎的限制。目前开发团队正在设计解决这一问题。原文:
       As of right now, MapReduce jobs on a single mongod process are single threaded. This is due to a design limitation in current Javascript engines. We are looking into alternatives to solve this issue, but for now if you want to parallelize your MapReduce jobs, you will need to either use sharding or do the aggregation client-side in your code.        另外就是到现在对于MONGODB那一端是如果把输入数据划分成等长的小数据发送到MapReduce(Hadoop把这一操作称为input split,即输入切片),因为这一点对于并发运行的作业进行负载平衡很重要,而在 Hadoop中一个理想的切片大小往往是一个HDFS块的大小,默认是64 MB(Hadoop权威指南(中文版))。

       除了上面所提到了,在MONGODB的mapreduce模型中,还支持map输出的临时结果集的持久化,而这一特色还在文档中专门作了如下说明:
Note on Permanent Collections Even when a permanent collection name is specified, a temporary collection name will be used during processing. At map/reduce completion, the temporary collection will be renamed to the permanent name atomically. Thus, one can perform a map/reduce job periodically with the same target collection name without worrying about a temporary state of incomplete data. This is very useful when generating statistical output collections on a regular basis.
     而如果想要持久化该临时集合,只要将mapreduce实例的Keptemp属性设为true,同时使用Out属性(方法)指定输出的集合名称即可。
     当然就目前我测试时结果来看,在单台机器上做这种模型测试就效率上是得不尝失的(执行周期太长),特别是数据量特别大(比如3000w以上),所以应用(或运行)场景的选择很重要。
     上面所说的示例比较简单,都是在单一reduce任务中的执行场景,如下图:
  

     而实际的生产环境要比上图复杂许多,比如多reduce任务情况,在Hadoop中,如果运行多个reduce任务,map任务会对其输出进行分区,为每个reduce任务创建一个分区(partition)。每个分区包含许多键(及其关联的值),但每个键的记录都在同一个分区中。分区可以通过用户定义的partitioner来控制。如下图:
     

     鉴于目前网上mongodb相关文档内容并不多,所以这里暂不多做讨论了。

推荐阅读
  • MongoDB高可用架构:深入解析Replica Set机制
    MongoDB的高可用架构主要依赖于其Replica Set机制。Replica Set通过多个mongod节点的协同工作,实现了数据的冗余存储和故障自动切换,确保了系统的高可用性和数据的一致性。本文将深入解析Replica Set的工作原理及其在实际应用中的配置和优化方法,帮助读者更好地理解和实施MongoDB的高可用架构。 ... [详细]
  • H5技术实现经典游戏《贪吃蛇》
    本文将分享一个使用HTML5技术实现的经典小游戏——《贪吃蛇》。通过H5技术,我们将探讨如何构建这款游戏的两种主要玩法:积分闯关和无尽模式。 ... [详细]
  • MongoDB Aggregates.group() 方法详解与编程实例 ... [详细]
  • Python 数据分析领域不仅拥有高质量的开发环境,还提供了众多功能强大的第三方库。本文将介绍六个关键步骤,帮助读者掌握 Python 数据分析的核心技能,并深入探讨六款虽不广为人知但却极具潜力的数据处理库,如 Pandas 的替代品和新兴的可视化工具,助力数据科学家和分析师提升工作效率。 ... [详细]
  • 本文探讨了Python类型注解使用率低下的原因,主要归结于历史背景和投资回报率(ROI)的考量。文章不仅分析了类型注解的实际效用,还回顾了Python类型注解的发展历程。 ... [详细]
  • 问题描述现在,不管开发一个多大的系统(至少我现在的部门是这样的),都会带一个日志功能;在实际开发过程中 ... [详细]
  • Python 领跑!2019年2月编程语言排名更新
    根据最新的编程语言流行指数(PYPL)排行榜,Python 在2019年2月的份额达到了26.42%,稳坐榜首位置。 ... [详细]
  • 从理想主义者的内心深处萌发的技术信仰,推动了云原生技术在全球范围内的快速发展。本文将带你深入了解阿里巴巴在开源领域的贡献与成就。 ... [详细]
  • 本文详细介绍了如何正确设置Shadowsocks公共代理,包括调整超时设置、检查系统限制、防止滥用及遵守DMCA法规等关键步骤。 ... [详细]
  • 本文详细介绍了如何搭建一个高可用的MongoDB集群,包括环境准备、用户配置、目录创建、MongoDB安装、配置文件设置、集群组件部署等步骤。特别关注分片、读写分离及负载均衡的实现。 ... [详细]
  • 本文详细介绍了如何在MySQL中创建自定义函数来计算地球表面上两点之间的距离。通过经纬度数据,利用球面三角公式,可以准确计算出两地之间的直线距离。 ... [详细]
  • 实践指南:使用Express、Create React App与MongoDB搭建React开发环境
    本文详细介绍了如何利用Express、Create React App和MongoDB构建一个高效的React应用开发环境,旨在为开发者提供一套完整的解决方案,包括环境搭建、数据模拟及前后端交互。 ... [详细]
  • 本文介绍了如何在不同操作系统上安装Git,以及一些基本和高级的Git操作,包括项目初始化、文件状态检查、版本控制、分支管理、标签处理、版本回退等,并简要提及了开源许可协议的选择。 ... [详细]
  • 如何提升Python处理约1GB数据集时的运行效率?
    如何提升Python处理约1GB数据集时的运行效率?本文探讨了在后端开发中使用Python处理大规模数据集的优化方法。通过分析常见的性能瓶颈,介绍了多种提高数据处理速度的技术,包括使用高效的数据结构、并行计算、内存管理和代码优化策略。此外,文章还提供了在Ubuntu环境下配置和测试这些优化方案的具体步骤,适用于从事推荐系统等领域的开发者。 ... [详细]
  • Hadoop——实验七:MapReduce编程实践
    文章目录一.实验目的二.实验内容三.实验步骤及结果分析 1.基于ubuntukylin14.04(7)版本,安装hadoop-eclipse-kepler-plugi ... [详细]
author-avatar
昆明DVD导航
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有