作者:黄秋华1219 | 来源:互联网 | 2014-05-28 16:53
MongoDBTailableCursors是一个类似于UNIXtail-f命令的功能,它使用在MongoDB的cappedcollection上。利用这一特性可以轻松的实现消息队列系统(简介看这里)。而官网上对于TailableCursors的讲解却非常简略,下面推荐的是一篇通过多种方式的性能测试,
MongoDB Tailable Cursors 是一个类似于UNIX tail -f
命令的功能,它使用在MongoDB的capped
collection上。利用这一特性可以轻松的实现消息队列系统(简介看这里)。而官网上对于 Tailable Cursors
的讲解却非常简略,下面推荐的是一篇通过多种方式的性能测试,对 Tailable Cursors 进行深入剖析的文章。10gen
的CEO Dwight Merriman 在看完此文后也表示官方的文档需要完善了。
下面是测试的第一步,作者的步骤是先让数据接收者对测试的目标colleciton进行读取,然后再用python脚本写入10w条数据到对应的capped
collection里。写入的数据都带有写入时的时间戳信息,读取端在读到数据后对比时间戳即可计算数据延迟。
下面是写数据的python脚本:
#!/usr/bin/python
import time
from pymongo import Connection
conn = Connection()
db = conn.queues
coll = db.messages
start = time.time()
count = 100000
for i in range(0, count):
coll.insert({ "time": time.time()})
#This generates messages at a rate of about
3,900 msg/s
time.sleep(0.0001)
end = time.time()
print("total: ", count)
print("msg/s: ", count/(end - start))
下面是读数据的C++脚本,根据官方文档上的脚本改写
#include
#include
using namespace
mongo;
int main(int argc, char* argv[])
{
DBClientConnection conn;
conn.connect("localhost");
// minKey is smaller than any other possible
value
BSONElement lastId =
minKey.firstElement();
// { $natural : 1 } means in forward capped
collection insertion order
Query query =
Query().sort("$natural");
cout << "loc,val" <<
std::endl;
uint32_t i = 0;
struct timeval tv;
while( true ) {
auto_ptr c =
conn.query("queues.messages", query, 0, 0, 0,
QueryOption_CursorTailable);
while( true )
{
if( !c->more() ) {
if( c->isDead() )
break;
sleepsecs(1);
continue; // we will try more() again
}
const BSONObj& o = c->next();
lastId = o["_id"];
const double time = o["time"].Double();
gettimeofday(&tv, NULL);
const double curr = tv.tv_sec + tv.tv_usec / 1000000.0;
cout << i++ << ", " << curr - time <<
endl;
}
// prepare to requery
from where we left off
query = QUERY( "_id"
<< GT << lastId ).sort("$natural");
}
return 0;
}
测试结果如下,下面是由这10w个 document
的延迟画出的线状图,可以看到图呈锯齿形。也就是说,如果你使用官方的例子,那么其读取性能是呈锯齿形的。
然后作者猜测可能是由于程序里在每次数据读尽后进行了一秒的sleep导致的(数据读尽的原因应该是写的速度赶不上读的速度,导致每过一段时间就会出现一次数据读尽的情况)。于是作者在已经填充了10w条数据的情况下再跑了一次上面的C++程序。这次数据都已经写好了,不会出现读尽的情况,于是得到下面的图:
此图基本验证了作者的猜想,为了进一步验证其猜想,他去掉了程序里的sleep
1秒代码,再次进行了第一次测试(先开着读进程,再进行写入)。效果如下:
可以看出,效果非常好,但是很明显,如果我们不再sleep了,那么在数据读尽时实际上程序是在跑死循环,CPU负载可想而知。
其实MongoDB是提供了一个方法来解决这种问题的,那就是QueryOption_AwaitData选项,在官方的程序里是加了此选项的,此选项的解释如下:
Use with TailableCursor. If we are at the end of the data, block
for a while rather than returning no data. After a timeout period,
we do return as normal.
在使用TailableCursor时,此参数会在数据读尽时先阻塞一小段时间后再读取一次并进行返回。
于是按官方文档中例子的做法,作者加上了QueryOption_AwaitData选项,当然我们还需要去掉程序里的sleep。而同时,在写脚本还没有开始写时,我们的collection里是没有数据里,在一个capped
collection里没有数据时,实际上这个cursor是会被关闭的,这时候会再次导致死循环,所以这个sleep又需要移到isDead判断里,具体进行两处修改后的代码片断如下:
while( true ) {
if( !c->more()
) {
if( c->isDead() )
{
// 这个sleep是为了在collection里没有数据时防止死循环而写的
sleepsecs(1);
break;
}
continue;
}
执行脚本后的延迟曲线如下:
好吧,除了刚开始有一段锯齿外,基本上都很理想。那刚开始的一段锯齿是从何而来呢。其实就是因为我们之前collection里没数据导致了在第一次读取前sleep了一秒,堆积了一些数据,而后来一直是在query方法里通过QueryOption_AwaitData参数block一小段时间,所以后续的延迟时间都一直比较小。
为了解决这个没有数据时情况,作者做了一个小技巧,预先在collection里写入一条特殊数据,使这个cursor不会被服务端杀掉。代码段如下:
while( true )
{
auto_ptr c = conn.query("queues.messages",
query, 0, 0, 0,
QueryOption_CursorTailable | QueryOption_AwaitData);
while( true ) {
if( !c->more()
) {
if( c->isDead() )
{
// this sleep is important for collections that start out with no
data
sleepsecs(1);
break;
}
continue;
}
const
BSONObj& o = c->next();
lastId =
o["_id"];
const double time
= o["time"].Double();
//
处理这条特殊数据: {time: 0.0}
if (time == 0)
continue;
gettimeofday(&tv, NULL);
const double curr
= tv.tv_sec + tv.tv_usec / 1000000.0;
cout << i++
<< ", " << curr - time << endl;
}
// prepare to requery from where we
left off
query = QUERY( "_id" << GT <<
lastId ).sort("$natural");
}
如上面红色标示部分,对于这条特殊数据不进行处理。这样改动后得到了一个比较完美的最终版性能图:
好吧,解释就到这里,如果您打算使用或者正在使用Tailable
Cursors特性做消息队列,不知道是否注意到了这些细节。如果还没有,恭喜你能看到这篇文章。:)