热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ、RabbitMQ+SpringBoot笔记整理

RabbitMQ、RabbitMQ+SpringBoot笔记整理,Go语言社区,Golang程序员人脉社


参考

官网
轻松搞定RabbitMQ
RabbitMQ的应用场景以及基本原理介绍
springboot+rabbitmq整合示例程

简介

MQMessage Queue,消息队列)是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据。是通过读写出入队列的消息来通信(RPC则是通过直接调用彼此来通信的)。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

AMQP协议,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
在这里插入图片描述

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息的载体,每个消息都会被投到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
Producer:消息生产者,就是投递消息的程序。
Consumer:消息消费者,就是接受消息的程序。
Channel:消息通道,在客户端的每个连接里,可建立多个channel。

在这里插入图片描述

启动服务

需要预先安装erlang
这里写图片描述

http://localhost:15672
这里写图片描述

概念记录

生产者和消费者都能可以创建队列,在关注队列的时候需要有一个明确的可监听队列。
生产者和消费者关注的都是队列,都是在队列上定义的操作。
循环分发
消息确认
消息持久化
公平分发
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue.
MQ消息模型中的核心思想是生产者从不直接发送消息到队列中。

Java 实例

源码:https://download.csdn.net/download/peng_hong_fu/10423174

1.HelloWorld

这里写图片描述

Producer sends messages to the “hello” queue. The consumer receives messages from that queue.

<dependency> <groupId>com.rabbitmqgroupId> <artifactId>amqp-clientartifactId> <version>3.6.5version> dependency>

Producer.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 生产者 * Created by PengHongfu 2018-05-14 11:24 */ public class Producer { private final static String QUEUE_NAME = "test_queue";//队列名称 public static void main(String args[]) throws IOException, TimeoutException { // 1.创建连接连接到RabbitMQ ConnectionFactory factory = new ConnectionFactory(); // 2.设置地址、端口、账号、密码 factory.setHost("localhost"); // 3.获取连接 Connection conn = factory.newConnection(); // 4.获取通道 Channel channel = conn.createChannel(); // 5.创建队列 1-队列名称 2-队列是否持久化 3-队列是否是独占 4-使用完之后是否删除此队列 5-其他属性 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 6.发送消息 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [Producer] Sent '" + message + "'"); // 7.关闭资源 channel.close(); conn.close(); } }

Consumer.java

import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 消费者 * Created by PengHongfu 2018-05-14 11:50 */ public class Consumer { private final static String QUEUE_NAME = "helloWorld_queue";//队列名称 public static void main(String args[]) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [消费者] Waiting for messages. To exit press CTRL+C"); //消费消息 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [消费者] Received '" + message + "'"); } }; /* * 监听队列 * 参数1:队列名称 * 参数2:是否发送ack包,不发送ack消息会持续在服务端保存,直到收到ack。 可以通过channel.basicAck手动回复ack * 参数3:消费者 */ channel.basicConsume(QUEUE_NAME, true, consumer); } }

依次启动消费者和生产者
这里写图片描述
这里写图片描述
这里写图片描述

2.Work Queues

工作队列

这里写图片描述
Producer.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * work模式 * Created by PengHongfu 2018-05-14 14:27 */ public class Producer { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 10; i++) { // 消息内容 String message = "" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [生产者] Sent '" + message + "'"); //发送的消息间隔越来越长 Thread.sleep(i * 10); } channel.close(); connection.close(); } }

消费者处理消息时,利用休眠时间长短来模拟工作任务的轻重

Consumer1.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * 消费者1 * Created by PengHongfu 2018-05-14 14:29 */ public class Consumer1 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一时刻服务器只会发一条消息给消费者(能者多劳模式),空闲多的消费者,消费更多的消息 //channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [消费者1] Received '" + message + "'"); //休眠 Thread.sleep(10); // 手动返回ack包确认状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //channel.basicReject(); channel.basicNack(); //可以通过这两个函数拒绝消息,可以指定消息在服务器删除还是继续投递给其他消费者 } } }

Consumer2.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * 消费者2 * Created by PengHongfu 2018-05-14 14:29 */ public class Consumer2 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一时刻服务器只会发一条消息给消费者(能者多劳模式),空闲多的消费者,消费更多的消息 //channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [消费者2] Received '" + message + "'"); // 休眠1秒 Thread.sleep(1000); //反馈消息的消费状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }

依次启动Consumer1,Consumer2,Producer,结果如下:
这里写图片描述
这里写图片描述

平均每个消费者获得相同数量的消息。这是由于分发消息机制—Round-Robin(轮询),合理的方案是能者多劳,休眠多的消费者,工作任务重的消费者少分发消息任务。下面的代码正是处理这个问题的,使得分配公正。

channel.basicQos(1);

每个消费者在处理完一个消息之后,服务器才会发一条消息给消费者,这使得空闲的消费者,会分配到更多的消息。

这里写图片描述

这里写图片描述

3.Publish/Subscribe

发布/订阅
we'll deliver a message to multiple consumers.

这里写图片描述

There are a few exchange types available: direct, topic, headers and fanout

Direct Exchange:直接匹配,通过Exchange名称+RountingKey来发送与接收消息.
Fanout Exchange:广播订阅,向所有的消费者发布消息,但是只有消费者将队列绑定到该Exchange路由器才能收到消息,忽略Routing Key.
Topic Exchange:主题匹配订阅,这里的主题指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.来分隔多个词,只有消息这将队列绑定到该路由器且指定RoutingKey符合匹配规则时才能收到消息;
Headers Exchange:消息头订阅,消息发布前,为消息定义一个或多个键值对的消息头,然后消费者接收消息同时需要定义类似的键值对请求头:(如:x-mactch=all或者x_match=any),只有请求头与消息头匹配,才能接收消息,忽略RoutingKey.
默认的exchange:如果用空字符串去声明一个exchange,那么系统就会使用”amq.direct”这个exchange,我们创建一个queue时,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去

channel.BasicPublish("", "TaskQueue", properties, bytes);

这里写图片描述

Producer.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 订阅模式 * Created by PengHongfu 2018-05-14 16:27 */ public class Producer { //交换机的名称 private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /* * 声明exchange(交换机) * 参数1:交换机名称 * 参数2:交换机类型 */ channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); for (int i = 0; i < 5; i++) { String message = "订阅消息-"+i; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [生产者] Sent '" + message + "'"); Thread.sleep(100); } channel.close(); connection.close(); } }

Consumer1.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import java.io.File; import java.io.FileOutputStream; import java.text.SimpleDateFormat; import java.util.Date; /** * Created by PengHongfu 2018-05-14 16:29 */ public class Consumer1 { private final static String QUEUE_NAME = "test_queue_exchange_1";//可以是任意的队列名 private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); /* * 绑定队列到交换机(这个交换机的名称一定要和上面的生产者交换机名称相同) * 参数1:队列的名称 * 参数2:交换机的名称 * 参数3:Routing Key * */ channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,自动返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); System.out.println(" [消费者1] Waiting for messages. To exit press CTRL+C"); // 获取消息 while (true) { String dir = Consumer1.class.getClassLoader().getResource("").getPath(); QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String fileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); File file = new File(dir, fileName + ".log"); FileOutputStream outputStream = new FileOutputStream(file, true); outputStream.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())) + "-" + message + "rn").getBytes()); outputStream.flush(); outputStream.close(); } } }

Consumer2.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * Created by PengHongfu 2018-05-14 16:30 */ public class Consumer2 { private final static String QUEUE_NAME = "test_queue_exchange_2"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); c

推荐阅读
  • 本文详细介绍了Java代码分层的基本概念和常见分层模式,特别是MVC模式。同时探讨了不同项目需求下的分层策略,帮助读者更好地理解和应用Java分层思想。 ... [详细]
  • Java高并发与多线程(二):线程的实现方式详解
    本文将深入探讨Java中线程的三种主要实现方式,包括继承Thread类、实现Runnable接口和实现Callable接口,并分析它们之间的异同及其应用场景。 ... [详细]
  • B站服务器故障影响豆瓣评分?别担心,阿里巴巴架构师分享预防策略与技术方案
    13日晚上,在视频观看高峰时段,B站出现了服务器故障,引发网友在各大平台上的广泛吐槽。这一事件导致了连锁反应,大量用户纷纷涌入A站、豆瓣和晋江等平台,给这些网站带来了突如其来的流量压力。为了防止类似问题的发生,阿里巴巴架构师分享了一系列预防策略和技术方案,包括负载均衡、弹性伸缩和容灾备份等措施,以确保系统的稳定性和可靠性。 ... [详细]
  • Web开发框架概览:Java与JavaScript技术及框架综述
    Web开发涉及服务器端和客户端的协同工作。在服务器端,Java是一种优秀的编程语言,适用于构建各种功能模块,如通过Servlet实现特定服务。客户端则主要依赖HTML进行内容展示,同时借助JavaScript增强交互性和动态效果。此外,现代Web开发还广泛使用各种框架和库,如Spring Boot、React和Vue.js,以提高开发效率和应用性能。 ... [详细]
  • 如何利用Java 5 Executor框架高效构建和管理线程池
    Java 5 引入了 Executor 框架,为开发人员提供了一种高效管理和构建线程池的方法。该框架通过将任务提交与任务执行分离,简化了多线程编程的复杂性。利用 Executor 框架,开发人员可以更灵活地控制线程的创建、分配和管理,从而提高服务器端应用的性能和响应能力。此外,该框架还提供了多种线程池实现,如固定线程池、缓存线程池和单线程池,以适应不同的应用场景和需求。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 一文了解消息中间件RabbitMQ
    消息中间件---RabbitMQ1消息中间件的作用2.常用的消息中间件3消息中间件RabbitMQ3.1RabbitMQ介绍3.3RabbitMQ的队列模式3.3RabbitMQ的 ... [详细]
  • 启动activemq_「Java」SpringBoot amp; ActiveMQ
    一、消息队列消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构, ... [详细]
  • Java自学知乎!阿里高级算法专家公开10份资料,涨姿势!
    接口概述:接口是Java语言中的一种引用类型,是方法的集合,所以接口的内部主要就是定义方法,包含常量,抽象方法(JDK ... [详细]
  • C++ 异步编程中获取线程执行结果的方法与技巧及其在前端开发中的应用探讨
    本文探讨了C++异步编程中获取线程执行结果的方法与技巧,并深入分析了这些技术在前端开发中的应用。通过对比不同的异步编程模型,本文详细介绍了如何高效地处理多线程任务,确保程序的稳定性和性能。同时,文章还结合实际案例,展示了这些方法在前端异步编程中的具体实现和优化策略。 ... [详细]
  • 2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
    2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ... [详细]
  • 本文详细介绍了如何安全地手动卸载Exchange Server 2003,以确保系统的稳定性和数据的完整性。根据微软官方支持文档(https://support.microsoft.com/kb833396/zh-cn),在进行卸载操作前,需要特别注意备份重要数据,并遵循一系列严格的步骤,以避免对现有网络环境造成不利影响。此外,文章还提供了详细的故障排除指南,帮助管理员在遇到问题时能够迅速解决,确保整个卸载过程顺利进行。 ... [详细]
  • 可查看rabbitmq官方集群方案架构图Thisguidecoversmirroring(queuecontentsreplication)ofclassicqueues--摘自 ... [详细]
  • RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP,SMTP,STOMP,也 ... [详细]
author-avatar
一克拉白勺的眼泪
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有