作者:mobiledu2502858037 | 来源:互联网 | 2023-09-06 20:28
问题业务反馈线上队列无消费者,但是management管理端发现有Unacked量基本原理Unacked消息指的是服务端已经投递给消费者,但还没有收到消费者响应Ack这么一个中间状
问题
业务反馈线上队列无消费者,但是management管理端发现有Unacked量
基本原理
- Unacked消息指的是服务端已经投递给消费者,但还没有收到消费者响应Ack这么一个中间状态。
- 消费者假如在消费过程中(尚未给Broker响应Ack)退出,那么消费中的这部分Unacked消息会被requeue到队列,进而投递到其余消费者(假如有Consumers在线);假如没有Consumers,那这部分Unacked消息会转换为Ready状态。
- Unacked消息状态被Broker维护在内存中。
而上面截图的问题:队列有Unacked量,但是没有Consumers信息,似乎很矛盾。
定位
阅读服务端消费相关板块代码,发现 rabbit_queue_consumers 板块:
计算Unacked消息量
unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
读进程字典
all_ch_record() -> [C || {{ch, _}, C} <- get()].
写进程字典
store_ch_record(C = #cr{ch_pid = ChPid}) -> put({ch, ChPid}, C), ok.
rabbit_queue_consumers 板块实际是被 rabbit_amqqueue_process 调用。不难发现队列进程的进程字典维护消息消费过程中
的消费Channel进程等信息。
验证
- 启动Erlang shell
erl -name sj@test -setCOOKIE COOKIE名
- 获取队列amqqueue记录信息,其中包含进程pid等信息。
rpc:call(N,erlang,apply,[fun()->rabbit_amqqueue:lookup(rabbit_misc:r(<<"vhost名字">>,queue,<<"queue名字">>)) end,[]]).
- 获取队列进程的进程字典信息。Pid为步骤2获取到的进程Pid
rpc:call(node(Pid), erlang, process_info, [Pid,dictionary]).
最终发现了相似如下6条关键信息,与问题中的Unacked量对应:
{{ch,<8232.16445.2870>}, {cr,<8232.16445.2870>,#Ref<8251.0.972816396.42542>, {[{3,<<"amq.ctag-RkzjrIA60GwE9ED8opKK7w">>}],[]}, 0, {queue,[],[],0}, {qstate,<8232.16442.2870>,dormant,{0,nil}}, 1}}
处理
已经能找到上述Unacked状态的6条消息对应的Channel进程,如何让这些消息重入队(成为Ready状态)?
阅读服务端代码,发现 rabbit_amqqueue_process 板块 handle_ch_down/2 方法,该方法内部调用 rabbit_queue_consumers 板块 erase_ch/2 方法,
最终调用 erase/2 将 {ch, ChPid} 这个key从进程字典中移除。
因而,可以通过触发channel进程DOWN
exit(ChPid,normal).
最终队列进程感知Channel进程退出,进而将Unacked状态requeue,最终消息转变为Ready状态~
拓展
如何获取队列Unacked状态消息?
sys:get_state(Pid).
有兴趣可以调试下