作者:sdfdsafgafsdf | 来源:互联网 | 2014-05-28 16:53
最近在做搜索的查询日志的统计分析,对每一条查询统计日志,我将其解析出来后以特定字段格式存在mongodb中,定时调度做些统计分析。其中有个需求是,统计某个时间段(每天、每周、每月)各个query的查询次数,展示上就是热门查询query了。考虑到处理的数据量
最近在做搜索的查询日志的统计分析,对每一条查询统计日志,我将其解析出来后以特定字段格式存在mongodb中,定时调度做些统计分析。其中有个需求是,统计某个时间段(每天、每周、每月)各个query的查询次数,展示上就是热门查询query了。考虑到处理的数据量不会很大,解决方法也可以简单来之。我现在使用的方法就是mongodb的MapReduce功能,其实这个需求也可以认为是个group操作,而mongodb的group功能就是基于MapReduce的,但group对结果集的大小是有限制的。本文就针对一个示例介绍一下mongodb
MapReduce功能。
语法介绍
MapReduce是mongodb中的一个Command,它的语法格式如下:
db.runCommand(
false>]
[, finalize :
]
[, scope : ]
[, verbose : true]
);
对于该Command,必有的3个参数我就不解释了。对于可选参数,这里简要说明如下:
(1) query是很常用的,它用来在map阶段过滤查询条件的以限定MapReduce操作的记录范围。
(2)
和query相关的还有sort和limit,我起初以为它俩是用在reduce阶段,实际上和query一起用在map阶段。
(3)
mongodb默认是创建一个临时的collection存储MapReduce结果,当客户端连接关闭或者显示使用collection.drop(),这个临时的collection会被删除掉。这也就说,默认的keeptemp是false,如果keeptemp为true,那么结果collection就是永久的。当然,生成的collection名称并不友好,所以可以指定out表明永久存储的collection的名称(这时不需要再指定keeptemp)。当指定out时,并不是将执行结果直接存储到out,而是同样到临时collection,之后如果out存在则drop掉,最后rename临时collection为out。
(4) finalize:当MapReduce完成时应用到所有结果上,通常不怎么使用。
(5) verbose:提供执行时间的统计信息。
执行结果的格式如下:
{ result : ,
counts : input : ,
emit : ,
output : ,
timeMillis : ,
ok : <1_if_ok>,
[, err : ]
}
更常用的MapReduce命令的helper是:
db.collection.mapReduce(mapfunction,reducefunction[,options]);
map函数的定义如下,map函数内使用this来操作当前行表示的对象,并且需要使用emit(key,value)方法来向reduce提供参数:
function map(void) -> void
reduce函数定义如下,reduce的key就是emit(key,value)的key,value_array是同个key对应的多个value数组:
function reduce(key, value_array) -> value
MapReduce得到的collection的格式是“_id”:key,”value”:。
应用示例
这里给出一个假想的无意义的示例,主要是为了说明mongodb
MapReduce的使用。每条记录的schema是“query”:,”cnt”:,”year”:,”month”=>。这个schema的cnt是不需要的,因为每条查询query的cnt都是1,但这里想要稍微复杂一些条件。下面是可在mongodb
shell中执行的MapReduce脚本。
map = function() emit(this.query, this.cnt);;
reduce = function(key , vals) {
var sum = 0;
for(var i in vals) sum += vals[i];
return sum;
};
res = db.log_info.mapReduce(map,reduce,{"query":"year":2010});
执行结果如下:
{
"result" : "tmp.mr.mapreduce_1284794393_2",
"timeMillis" : 72,
"counts" : "input" : 1000,
"emit" : 1000,
"output" : 113,
"ok" : 1,
}
对于”result”,它是生成的临时collection名称,这个名称的命名规则是:”tmp.mr.mapreduce_”+time(0)+”_”+(jobNumber++)
执行db[res.result].find()得到:
"_id" : "a", "value" : 521
"_id" : "aa", "value" : 128
"_id" : "aaa", "value" : 40
"_id" : "aaaa", "value" : 4
"_id" : "aaab", "value" : 9
"_id" : "aaac", "value" : 13
"_id" : "aab", "value" : 45
"_id" : "aaba", "value" : 5
"_id" : "aabb", "value" : 14
"_id" : "aabc", "value" : 20
"_id" : "aac", "value" : 39
"_id" : "aaca", "value" : 6
"_id" : "aacb", "value" : 2
"_id" : "aacc", "value" : 5
"_id" : "ab", "value" : 65
"_id" : "aba", "value" : 37
"_id" : "abaa", "value" : 12
"_id" : "abab", "value" : 13
"_id" : "abac", "value" : 10
"_id" : "abb", "value" : 42
Java客户端API使用
和JS脚本一样,mongodb Java客户端提供了两个MapReduce接口,分别是:
public MapReduceOutput mapReduce( String map , String reduce , String outputCollection , DBObject query );
public MapReduceOutput mapReduce( DBObject command );
MapReduceOutput实现如下:
public class MapReduceOutput {
?
MapReduceOutput( DBCollection from , BasicDBObject raw )_collname = raw.getString( "result" );
_coll = from._db.getCollection( _collname );
_counts = (BasicDBObject)raw.get( "counts" );
?
public DBCursor results()return _coll.find();
?
public void drop()_coll.drop();
?
public DBCollection getOutputCollection()return _coll;
?
final String _collname;
final DBCollection _coll;
final BasicDBObject _counts;
}
所以,可以调用MapReduceOutput.results()得到DBCursor做后续处理,比如在我的应用场景里,根据value值做降序排序并取limit
1000以得到最热门的一些query。
由于Javascript引擎设计上的限制,当前的mongodb
MapReduce还只是单线程执行,mongodb也在计划解决这个问题。如果需要多线程处理,可以考虑shard或者在客户端代码控制处理。