本文转载自:众成翻译
译者:文蔺
链接:http://www.zcfy.cc/article/662
原文:http://blog.yld.io/2016/05/10/introducing-queues/
这是深切探究 Node.js 中运用事情行列(work queues)治理异步事情流的系列文章的第一篇,来自the Node Patterns series。
最先享用吧!
很罕见的是,在运用程序流中,运用有着可以异步处置惩罚的事情负载。一个罕见的例子是发送邮件。比方说,新用户注册时,可以须要给 Ta 发送一封确认邮件来确认用户方才输入的 email 地点是 Ta 本身的。这包含从模板中天生音讯,向电子邮件效劳供应商发送要求,剖析结果,处置惩罚任何可以发送的终究毛病,重试,等等…… 这个流程可以比较复杂,轻易失足,或许在 HTTP 效劳器的周期中消费太长时刻。不过也有别的一个挑选,可以向耐久化存储中插进去一个文档,该文档形貌我们有一条待发送给这个用户的音讯。另一个历程可以拿到这个文档,做一些比较重的事情:从模板天生音讯,向效劳器发送要求,剖析毛病,并在必要的状况下重排这个事情。
另外,体系须要和其他体系整合的状况也很罕见。在我曾做过的一些项目中,须要差别的体系之间的举行用户配置文件的双向同步:当用户在一个体系中更新了个人材料,这些变化须要通报给其他体系,反之亦然。如果两个体系之间不须要很强的一致性,材料同步之间有一个小的耽误也许是可接收的,那这项事情就可以运用另一个历程异步处置惩罚。
更一般地说,在全部体系中有一个事情行列将事情生产者和消费者离开,这是一种罕见的形式。生产者往事情行列中插进去事情,消费者从行列中拿到事情并实行须要的使命。
运用如许的拓扑构造有很多缘由和长处,如:
解耦事情生产者和消费者
使重试逻辑更易于完成
跨时刻分配事情负载
跨空间(nodes 节点)分配事情负载
异步事情
使外部体系更轻易整合(终究的一致性)
让我们来剖析一下个中的一些问题吧。
发送邮件是很多运用须要做的事情。一个例子是,用户修正了暗码,一些运用很友爱地发送邮件关照用户有人(最好不是其他人)修正了暗码。如今发送邮件,通常是经由过程挪用第三方邮件供应商供应的 HTTP API来完成的。如果效劳迟缓或无法访问时刻会怎样?你可不想就因为一封邮件宣布出去就把暗码给回滚了。固然,你也不想就因为在处置惩罚要求失利时碰到了事情中的一个非主要的部份,使得暗码变动要求就如许崩掉了。暗码修正后愿望可以很快就发送出这封邮件,但不能有云云的价值。
另有,修正暗码意味着,你要为这个用户在两个体系中都做变动:一个中心用户数据库和一个遗留体系(legacy system)。(我晓得这很恶心啊,不过我可不止见过一次 —— 实际就这么骨感。)如果第一个胜利了、第二个失利了,咋办?
在这些情况下,你可以想一向重试直至胜利:在遗留体系中变动暗码是一个可以屡次反复的结果雷同的操纵,而邮件也可以反复发送屡次。
举例子,如果遗留体系修正暗码了但未能胜利返回关照,如果操纵是幂等的,你可以稍后重试。
以至,非幂等操纵也可以从事情行列处置惩罚中尝到甜头。比方,你可以将一次钱银生意营业插进去到事情行列中 :给每次生意营业一个通用唯一标识符(UUID, universal unique identifier),稍后吸取生意营业要求的体系可以保证不会发作反复生意营业。
在这个例子中,你基础只须要忧郁事情行列供应的必要的耐久性保证:如果体系故障,你愿望将生意营业丧失的风险降到最低。
另一个将事情生产者和消费者解耦的缘由是,你可以想将事情集群范围化:如果使命斲丧大批资本,如果使命是重 CPU 型的或许须要大批内存或操纵体系资本,你可以将其与运用其他部份分离出来,放到事情行列中。
在任何运用中,一些操纵比其他的要重。这可以会在全部节点引入有差别的事情负载:一个不幸的节点可以因处置惩罚太多的高并发营业而负担过重,而别的节点却被闲置。运用事情行列,将详细事情平均分配,可以将影响最小化。
事情行列的另一个结果是吸取事情峰(absorb work peaks):你可以为事情集群设想给定的最大容量,并确保容量永久不会凌驾。如果事情数目在短时刻内急剧上升,事情行列完全可以处理,阔别事情峰的压力。
体系监控在这里起到主要作用:你应该延续监控事情行列的长度,事情时刻(完成一项使命的时刻),事情占用,以及容量,以确定在岑岭时刻保证令人满意的操纵时刻须要的最好、最小的资本。
如果你不须要以上任何一点东西,运用耐久化事情行列的一个理由是防备崩溃。即使是同一个历程中的内存行列也能满足你的运用需求,延续的行列使你的运用在历程重启的时刻更具弹性。
好了,理论讲得差不多了 —— 我们来看详细完成。
最简朴的案例:内存事情行列(In-Memory Work Queue)可以设想出的最简朴的事情行列是一个内存行列。完成内存行列多是个学校的演习(留给读者)。这里我们运用 Async 的 queue。
假定你在做的这个演示运用和一个掌握你的屋子的硬件单位相衔接。你的 Node.js 运用和该单位经由过程一个串行端口对话,且有线协定只能同时接收一个挂起的敕令。
这个协定被包装在我们的 domotic.js
模块中,模块暴露三个函数:
.connect()
– 衔接 domotic 模块
.command()
– 发送敕令,守候相应
.disconnect()
– 割断与模块的衔接
下面的代码模仿了如许一个模块:
domotic.js:
exports.cOnnect= connect;
exports.command = command;
exports.discOnnect= disconnect;
function connect(cb) {
setTimeout(cb, 100); // simulate connection
}
function command(cmd, options, cb) {
if (succeeds()) {
setTimeout(cb, 100); // simulate command
} else {
setTimeout(function() {
var err = Error('error connecting');
err.code = 'ECONN';
cb(err);
}, 100);
}
}
function disconnect(cb) {
if (cb) setTimeout(cb, 100); // simulate disconnection
}
function succeeds() {
return Math.random() > 0.5;
}
注重我们并没有和任何 domotic 模块交互;我们只是伪装,100 毫秒后胜利挪用回调函数。
一样,
.command
函数模仿了衔接毛病: 如果succeeds()
返回false
,衔接失利,敕令失利,这有 50% 的可以性(我们的 domotic 串行衔接很轻易失足)。这使我们可以测试在发作失利以后,我们的运用是不是会胜利重连并重试敕令。
然后我们新建另一个模块,可以在行列背面发出敕令。
domotic_queue.js:
var async = require('async');
var Backoff = require('backoff');
var domotic = require('./domotic');
var cOnnected= false;
var queue = async.queue(work, 1);
function work(item, cb) {
ensureConnected(function() {
domotic.command(item.command, item.options, callback);
});
function callback(err) {
if (err && err.code == 'ECONN') {
cOnnected= false;
work(item);
} else cb(err);
}
}
/// command
exports.command = pushCommand;
function pushCommand(command, options, cb) {
var work = {
command: command,
options: options
};
console.log('pushing command', work);
queue.push(work, cb);
}
function ensureConnected(cb) {
if (connected) {
return cb();
} else {
var backoff = Backoff.fibonacci();
backoff.on('backoff', connect);
backoff.backoff();
}
function connect() {
domotic.connect(connected);
}
function connected(err) {
if (err) {
backoff.backoff();
} else {
cOnnected= true;
cb();
}
}
}
/// disconnect
exports.discOnnect= disconnect;
function disconnect() {
if (! queue.length()) {
domotic.disconnect();
} else {
console.log('waiting for the queue to drain before disonnecting');
queue.drain = function() {
console.log('disconnecting');
domotic.disconnect();
};
}
}
做了不少事情 —— 我们来一段段地剖析。
var async = require('async');
var Backoff = require('backoff');
var domotic = require('./domotic');
这里我们引入了一些包:
async
– 供应内存行列的完成
backoff
– 让我们增添每一次失利后尝试从新衔接的时刻距离
./domotic
– 模仿 domotic 的模块
我们的模块从衔接断开状况最先启动:
`var cOnnected= false;`
竖立我们的 async 行列:
`var queue = async.queue(work, 1);`
这里供应一个叫做 worker
的事情函数(在代码中进一步定义的)和一个最大并发量 1。我们在这里强迫设置,是因为我们定义了 domotic 模块协定一次只允许一个敕令。
然后定义 worker
函数,它每次处置惩罚一个行列元素:
function work(item, cb) {
ensureConnected(function() {
domotic.command(item.command, item.options, callback);
});
function callback(err) {
if (err && err.code == 'ECONN') {
cOnnected= false;
work(item);
} else cb(err);
}
}
当我们的 async
行列到场另一个事情项目,会挪用 work
函数,通报该事情项目和一个当事情完成时刻为我们所挪用的回调函数。
对每一个事情项目来讲,我们要确认已衔接了。一旦衔接上,运用事情项目中会有的 command
和 options
属性,来用 domotic 模块来实行敕令。传的末了一次参数是一个回调函数,当敕令胜利或失利以后会马上被挪用。
回调函数中,我们明确地处置惩罚衔接毛病的状况,设置 connected
状况为 false
,并再次挪用 work
重连。
如果没有发作毛病,挪用回调函数 cb
完毕当前事情项目。
function ensureConnected(cb) {
if (connected) {
return cb();
} else {
var backoff = Backoff.fibonacci();
backoff.on('backoff', connect);
backoff.backoff();
}
function connect() {
domotic.connect(connected);
}
function connected(err) {
if (err) {
backoff.backoff();
} else {
cOnnected= true;
cb();
}
}
}
ensureConnected
函数如今担任处于衔接状况时挪用回调或相反状况下尝试衔接。尝试衔接的时刻,运用 backoff
增添每次重连的时刻距离。 每次 domotic.connect
函数带着毛病被挪用,在 backoff
事宜触发之前增添距离时刻。触发 backoff
时,尝试衔接。一旦衔接胜利,挪用 cb
回调;不然坚持重试。
这个模块暴露一个 .command
函数:
/// command
exports.command = pushCommand;
function pushCommand(command, options, cb) {
var work = {
command: command,
options: options
};
console.log('pushing command', work);
queue.push(work, cb);
}
这个敕令简朴的剖析一个事情项目并将其推入行列。
末了,这个模块一样暴露出 .disconnect
函数。
/// disconnect
exports.discOnnect= disconnect;
function disconnect() {
if (! queue.length()) {
domotic.disconnect();
} else {
console.log('waiting for the queue to drain before disonnecting');
queue.drain = function() {
console.log('disconnecting');
domotic.disconnect();
};
}
}
这里我们只是确保在挪用 domotic 模块的 disconnected
要领之前行列是空的。如果行列非空,在真正断开衔接之前会守候其耗尽(drain)。
可选:在行列未被耗尽的状况下,您可以设置一个超时时刻,然后强迫断开衔接。
然后我们来新建一个 domotic 客户端:
client.js:
var domotic = require('./domotic_queue');
for(var i = 0 ; i <20; i ++) {
domotic.command('toggle light', i, function(err) {
if (err) throw err;
console.log('command finished');
});
}
domotic.disconnect();
这里我们并行得向 domotic 模块增加了 20 个 settime
敕令,同时通报了回调函数,当敕令完成时就会被挪用。如果有敕令失足,简朴地抛出毛病并中缀实行。
增加一切敕令以后我们立时断开衔接,不过模块会守候一切敕令被实行以后才会真正将其断开。
让我们在敕令行中试一下:
$ node client.js
pushing command { command: 'toggle light', options: 0 }
pushing command { command: 'toggle light', options: 1 }
pushing command { command: 'toggle light', options: 2 }
pushing command { command: 'toggle light', options: 3 }
pushing command { command: 'toggle light', options: 4 }
pushing command { command: 'toggle light', options: 5 }
pushing command { command: 'toggle light', options: 6 }
pushing command { command: 'toggle light', options: 7 }
pushing command { command: 'toggle light', options: 8 }
pushing command { command: 'toggle light', options: 9 }
pushing command { command: 'toggle light', options: 10 }
pushing command { command: 'toggle light', options: 11 }
pushing command { command: 'toggle light', options: 12 }
pushing command { command: 'toggle light', options: 13 }
pushing command { command: 'toggle light', options: 14 }
pushing command { command: 'toggle light', options: 15 }
pushing command { command: 'toggle light', options: 16 }
pushing command { command: 'toggle light', options: 17 }
pushing command { command: 'toggle light', options: 18 }
pushing command { command: 'toggle light', options: 19 }
waiting for the queue to drain before disonnecting
command finished
command finished
command finished
command finished
command finished
command finished
command finished
command finished
command finished
command finished
command finished
command finished
command finished
command finished
command finished
command finished
command finished
command finished
command finished
command finished
disconnecting
这里我们可以看到,一切敕令被马上放到行列中,而且敕令是被一些随机时刻距离着有序完成的。末了,一切敕令完成以后衔接割断。
本系列的下一篇文章,我们将探究怎样防止崩溃以及经由过程耐久化事情项目来限定内存影响。