热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

MQ实际场景解决方案参考

一、基础概念1、MQ持久化机制如果我们生产者每发一条消息,都要MQ持久化到磁盘中,然后再发起ack或nack的回调。这样的话是不是我们MQ的吞吐量很
一、基础概念

1、MQ持久化机制

如果我们生产者每发一条消息,都要MQ持久化到磁盘中,然后再发起ack或nack的回调。这样的话是不是我们MQ的吞吐量很不高,因为每次都要把消息持久化到磁盘中。写入磁盘这个动作是很慢的,在高并发场景下是不能够接受的,吞吐量太低了。

所以MQ持久化磁盘真实的实现,是通过异步调用处理的,他是有一定的机制,如:等到有几千条消息的时候,会一次性的刷盘到磁盘上面。而不是每来一条消息,就刷盘一次。


2、confirm机制的原理

(1)消息生产者把消息发送给MQ,如果接收成功,MQ会返回一个ack消息给生产者;

(2)如果消息接收不成功,MQ会返回一个nack消息给生产者;


3、消息幂等

幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用,数据库的结果都是唯一的,不可变的。
只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样,需要业务端来实现。 


4、消息堆积

消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力,消息堆积分以下两种情况: 

  1. 消息堆积在内存Buffer:一旦超过内存Buffer,可以根据一定的丢弃策略来丢弃消息,如CORBA Notification规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存Buffer大小,而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限。
  2. 消息堆积到持久化存储系统中:例如DB,KV存储,文件记录形式。 当消息不能在内存Cache命中时,要不可避免的访问磁盘,会产生大量读IO,读IO的吞吐量直接决定了消息堆积后的访问能力。

 

 

二、如何避免消息丢失

1、丢失原因?

第一种情况:一般MQ中间件为了提高系统的吞吐量会把消息保存在内存中,如果不作其他处理,MQ服务器一旦宕机,消息将全部丢失。这个是业务不允许的,造成很大的影响。

第二种情况: 就是消息刚刚保存到MQ内存中,但还没有来得及更新到磁盘文件中,突然宕机了。这个场景在持续的大量消息投递的过程中,会很常见。


2、避免消息丢失

持久化:将MQ消息持久化;

confirm机制:消息发送端接受MQ的confirm通知;

通过持久化confirm机制,这样是不是就可以保障100%消息不丢失了呢?

comfirm机制其实是一个异步监听的机制,是为了保证系统的高吞吐量,这样就导致了还是不能够100%保障消息不丢失,因为即使加上了confirm机制,消息在MQ内存中还没有刷盘到磁盘就宕机了,还是没法处理。



3、辅助手段

生产者在投递消息之前,可以在本地数据库建一张消息表,先把消息持久化到Redis或DB中,这样就可以利用本地数据库的事务机制。事务提交成功后,将消息表中的消息转移到消息队列中。

confirm机制监听消息是否发送成功?如ack成功消息,删除DB中此消息或者修改消息表状态

如果nack不成功的消息,这个可以根据自身的业务选择是否重发此消息。也可以删除此消息,由自己的业务决定。

 

 

三、如何避免消息重复消费

1、重复消费的问题?

导致重复消费的原因可能出现在生产者,也可能出现在 MQ 消费者

这里说的重复消费问题是指同一个数据被执行了两次,不单单指 MQ 中一条消息被消费了两次,也可能是 MQ 中存在两条一模一样的消费

  • 生产者:生产者可能会重复推送一条数据到 MQ 中,为什么会出现这种情况呢?也许是一个 Controller 接口被重复调用了 2 次,没有做接口幂等性导致的;也可能是推送消息到 MQ 时响应比较慢,生产者的重试机制导致再次推送了一次消息。
  • MQ:在消费者消费完一条数据响应 ack 信号消费成功时,MQ 突然挂了,导致 MQ 以为消费者还未消费该条数据,MQ 恢复后再次推送了该条消息,导致了重复消费。
  • 消费者:消费者已经消费完了一条消息,正准备但是还未给 MQ 发送 ack 信号时,此时消费者挂了,服务重启后 MQ 以为消费者还没有消费该消息,再次推送了该条消息。

2、如何保证幂等性?


去重原则:使用业务端逻辑保持幂等性

去重策略:保证每条消息都有唯一编号(比如唯一流水号),且保证消息处理成功与去重表的日志同时出现。
建立一个消息表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。

消费者怎么解决重复消费问题呢?这里提供两种方法:

  1. 状态判断法:消费者消费数据后把消费数据记录在 redis 中,下次消费时先到 redis 中查看是否存在该消息,存在则表示消息已经消费过,直接丢弃消息。
  2. 业务判断法:通常数据消费后都需要插入到数据库中,使用数据库的唯一性约束防止重复消费。每次消费直接尝试插入数据,如果提示唯一性字段重复,则直接丢失消息。一般都是通过这个业务判断的方法就可以简单高效地避免消息的重复处理了。

 

 

四、如何解决消息积压

1、消息积压处理办法

首要考虑临时紧急扩容,比如:利用K8s弹性扩容,增加消费者数量;

先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。

新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。

然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。

接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。

等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的consumer 机器来消费消息。


2、MQ中消息失效

假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq里,而是大量的数据会直接搞丢。

我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如:

大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。

假设 1 万个订单积压在 mq 里面,没有处理,其中 1000个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。


3、MQ消息队列快满了

如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?

没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧 。

 

 

五、如何保证消息的顺序性

1、为什么要保证顺序 

消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。举例:

Producer先后发送了2条消息,一条insert,一条update,分别分配到2台Queue中,消费者组中的两台机器分别处理这两个Queue中的消息,这时候顺序是无法保证的; 


2、出现顺序错乱的场景 

(1)RabbtiMQ

  • a、一个queue,有多个consumer去消费,这样就会造成顺序的错误。consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
  • b、一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。 

(2)RocketMQ

  • a、RocketMQ一个topic,一个queue,一个consumer,但是consumer内部进行多线程消费,这样数据也会出现顺序错乱问题。
  • b、具有顺序的数据写入到了不同的partition里面,不同的消费者去消费,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。

3、保证消息的消费顺序 

(1)RabbtiMQ

  • a、拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。
  • b、或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理。

(2)RocketMQ/Kafka

RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。 之所以出现你这个场景看起来不是顺序的,是因为发送消息的时候,消息发送默认是会采用轮询的方式发送到不通的queue(分区)。

  • a、分区顺序:拆分多个queue,然后N个线程分别消费一个queue。
  • b、全局顺序:确保同一个消息发送到同一个partition,一个topic,一个分区(queue),一个consumer,内部单线程消费。 

 

 

六、延时消费

启动一个cron定时任务,每隔一段时间执行一次,比如30分钟,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。如果数据量比较大,需要分页查询,分页update,这将是一个for循环更新操作。

cron方案是很常见的一种方案,但是常见的不一定是最好的,主要有以下几个问题:

  • 当数据量大的时候轮询效率低;
  • 时效性不够好,如果每小时轮询一次,最差的情况时间误差会达到1小时;
  • 如果通过增加cron轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题;

(1)RabbtiMQ

RabbitMQ延迟队列的核心思路是:TTL消息/队列 + DLX死信队列

TTL是Time To Live的缩写,也就是生存时间的意思;

DLX死信队列虽然叫队列,但其实指的是Exchange,或者说指的Exchange和它所属的Queue,他俩一块构成了死信队列。

(2)RocketMQ

RocketMQ延迟队列的核心思路是:所有的延迟消息由producer发出之后,都会存放到同一个topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。

注意:RocketMQ不支持任意时间的延时,只支持以下几个固定的延时等级
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

 

 

七、回溯消费 

回溯消费是指Consumer已经消费成功的消息,由于业务上的需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度。

例如:由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。

RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

感觉从生产端回溯更加合理,需要生产端提供可回溯的业务机制。


推荐阅读
  • 深入探讨CPU虚拟化与KVM内存管理
    本文详细介绍了现代服务器架构中的CPU虚拟化技术,包括SMP、NUMA和MPP三种多处理器结构,并深入探讨了KVM的内存虚拟化机制。通过对比不同架构的特点和应用场景,帮助读者理解如何选择最适合的架构以优化性能。 ... [详细]
  • 深入解析 Apache Shiro 安全框架架构
    本文详细介绍了 Apache Shiro,一个强大且灵活的开源安全框架。Shiro 专注于简化身份验证、授权、会话管理和加密等复杂的安全操作,使开发者能够更轻松地保护应用程序。其核心目标是提供易于使用和理解的API,同时确保高度的安全性和灵活性。 ... [详细]
  • 探讨如何真正掌握Java EE,包括所需技能、工具和实践经验。资深软件教学总监李刚分享了对毕业生简历中常见问题的看法,并提供了详尽的标准。 ... [详细]
  • PostgreSQL 10 离线安装指南
    本文详细介绍了如何在无法联网的服务器上进行 PostgreSQL 10 的离线安装,并涵盖了从下载安装包到配置远程访问的完整步骤。 ... [详细]
  • 优化局域网SSH连接延迟问题的解决方案
    本文介绍了解决局域网内SSH连接到服务器时出现长时间等待问题的方法。通过调整配置和优化网络设置,可以显著缩短SSH连接的时间。 ... [详细]
  • 深入探讨智能布线管理系统的电子配线架应用
    本文详细介绍了电子配线架智能布线系统的核心优势,包括实时监测网络连接、提高操作准确性、图形化显示连接架构、自动识别网络拓扑、增强安全性等功能。该系统不仅提升了网络管理的效率和准确性,还为资产管理、报告生成以及与其他智能系统的集成提供了强大的支持。 ... [详细]
  • 探讨架构师在项目中应如何平衡对产品的关注和对团队成员的关注,以实现最佳的开发成果。 ... [详细]
  • 本文深入探讨了 Redis 的两种持久化方式——RDB 快照和 AOF 日志。详细介绍了它们的工作原理、配置方法以及各自的优缺点,帮助读者根据具体需求选择合适的持久化方案。 ... [详细]
  • 通过Web界面管理Linux日志的解决方案
    本指南介绍了一种利用rsyslog、MariaDB和LogAnalyzer搭建集中式日志管理平台的方法,使用户可以通过Web界面查看和分析Linux系统的日志记录。此方案不仅适用于服务器环境,还提供了详细的步骤来确保系统的稳定性和安全性。 ... [详细]
  • HBase运维工具全解析
    本文深入探讨了HBase常用的运维工具,详细介绍了每种工具的功能、使用场景及操作示例。对于HBase的开发人员和运维工程师来说,这些工具是日常管理和故障排查的重要手段。 ... [详细]
  • FinOps 与 Serverless 的结合:破解云成本难题
    本文探讨了如何通过 FinOps 实践优化 Serverless 应用的成本管理,提出了首个 Serverless 函数总成本估计模型,并分享了多种有效的成本优化策略。 ... [详细]
  • 本文探讨了MariaDB在当前数据库市场中的地位和挑战,分析其可能面临的困境,并提出了对未来发展的几点看法。 ... [详细]
  • 本文探讨了如何在 PHP 的 Eloquent ORM 中实现数据表之间的关联查询,并通过具体示例详细解释了如何将关联数据嵌入到查询结果中。这不仅提高了数据查询的效率,还简化了代码逻辑。 ... [详细]
  • 随着网络安全威胁的不断演变,电子邮件系统成为攻击者频繁利用的目标。本文详细探讨了电子邮件系统中的常见漏洞及其潜在风险,并提供了专业的防护建议。 ... [详细]
  • 本文探讨了在Windows Server 2008环境下配置Tomcat使用80端口时遇到的问题,包括端口被占用、多项目访问失败等,并提供详细的解决方法和配置建议。 ... [详细]
author-avatar
夕阳隐日
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有