热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ、RabbitMQ+SpringBoot笔记整理

RabbitMQ、RabbitMQ+SpringBoot笔记整理,Go语言社区,Golang程序员人脉社


参考

官网
轻松搞定RabbitMQ
RabbitMQ的应用场景以及基本原理介绍
springboot+rabbitmq整合示例程

简介

MQMessage Queue,消息队列)是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据。是通过读写出入队列的消息来通信(RPC则是通过直接调用彼此来通信的)。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

AMQP协议,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
在这里插入图片描述

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息的载体,每个消息都会被投到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
Producer:消息生产者,就是投递消息的程序。
Consumer:消息消费者,就是接受消息的程序。
Channel:消息通道,在客户端的每个连接里,可建立多个channel。

在这里插入图片描述

启动服务

需要预先安装erlang
这里写图片描述

http://localhost:15672
这里写图片描述

概念记录

生产者和消费者都能可以创建队列,在关注队列的时候需要有一个明确的可监听队列。
生产者和消费者关注的都是队列,都是在队列上定义的操作。
循环分发
消息确认
消息持久化
公平分发
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue.
MQ消息模型中的核心思想是生产者从不直接发送消息到队列中。

Java 实例

源码:https://download.csdn.net/download/peng_hong_fu/10423174

1.HelloWorld

这里写图片描述

Producer sends messages to the “hello” queue. The consumer receives messages from that queue.

<dependency> <groupId>com.rabbitmqgroupId> <artifactId>amqp-clientartifactId> <version>3.6.5version> dependency>

Producer.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 生产者 * Created by PengHongfu 2018-05-14 11:24 */ public class Producer { private final static String QUEUE_NAME = "test_queue";//队列名称 public static void main(String args[]) throws IOException, TimeoutException { // 1.创建连接连接到RabbitMQ ConnectionFactory factory = new ConnectionFactory(); // 2.设置地址、端口、账号、密码 factory.setHost("localhost"); // 3.获取连接 Connection conn = factory.newConnection(); // 4.获取通道 Channel channel = conn.createChannel(); // 5.创建队列 1-队列名称 2-队列是否持久化 3-队列是否是独占 4-使用完之后是否删除此队列 5-其他属性 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 6.发送消息 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [Producer] Sent '" + message + "'"); // 7.关闭资源 channel.close(); conn.close(); } }

Consumer.java

import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 消费者 * Created by PengHongfu 2018-05-14 11:50 */ public class Consumer { private final static String QUEUE_NAME = "helloWorld_queue";//队列名称 public static void main(String args[]) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [消费者] Waiting for messages. To exit press CTRL+C"); //消费消息 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [消费者] Received '" + message + "'"); } }; /* * 监听队列 * 参数1:队列名称 * 参数2:是否发送ack包,不发送ack消息会持续在服务端保存,直到收到ack。 可以通过channel.basicAck手动回复ack * 参数3:消费者 */ channel.basicConsume(QUEUE_NAME, true, consumer); } }

依次启动消费者和生产者
这里写图片描述
这里写图片描述
这里写图片描述

2.Work Queues

工作队列

这里写图片描述
Producer.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * work模式 * Created by PengHongfu 2018-05-14 14:27 */ public class Producer { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 10; i++) { // 消息内容 String message = "" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [生产者] Sent '" + message + "'"); //发送的消息间隔越来越长 Thread.sleep(i * 10); } channel.close(); connection.close(); } }

消费者处理消息时,利用休眠时间长短来模拟工作任务的轻重

Consumer1.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * 消费者1 * Created by PengHongfu 2018-05-14 14:29 */ public class Consumer1 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一时刻服务器只会发一条消息给消费者(能者多劳模式),空闲多的消费者,消费更多的消息 //channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [消费者1] Received '" + message + "'"); //休眠 Thread.sleep(10); // 手动返回ack包确认状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //channel.basicReject(); channel.basicNack(); //可以通过这两个函数拒绝消息,可以指定消息在服务器删除还是继续投递给其他消费者 } } }

Consumer2.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * 消费者2 * Created by PengHongfu 2018-05-14 14:29 */ public class Consumer2 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一时刻服务器只会发一条消息给消费者(能者多劳模式),空闲多的消费者,消费更多的消息 //channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [消费者2] Received '" + message + "'"); // 休眠1秒 Thread.sleep(1000); //反馈消息的消费状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }

依次启动Consumer1,Consumer2,Producer,结果如下:
这里写图片描述
这里写图片描述

平均每个消费者获得相同数量的消息。这是由于分发消息机制—Round-Robin(轮询),合理的方案是能者多劳,休眠多的消费者,工作任务重的消费者少分发消息任务。下面的代码正是处理这个问题的,使得分配公正。

channel.basicQos(1);

每个消费者在处理完一个消息之后,服务器才会发一条消息给消费者,这使得空闲的消费者,会分配到更多的消息。

这里写图片描述

这里写图片描述

3.Publish/Subscribe

发布/订阅
we'll deliver a message to multiple consumers.

这里写图片描述

There are a few exchange types available: direct, topic, headers and fanout

Direct Exchange:直接匹配,通过Exchange名称+RountingKey来发送与接收消息.
Fanout Exchange:广播订阅,向所有的消费者发布消息,但是只有消费者将队列绑定到该Exchange路由器才能收到消息,忽略Routing Key.
Topic Exchange:主题匹配订阅,这里的主题指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.来分隔多个词,只有消息这将队列绑定到该路由器且指定RoutingKey符合匹配规则时才能收到消息;
Headers Exchange:消息头订阅,消息发布前,为消息定义一个或多个键值对的消息头,然后消费者接收消息同时需要定义类似的键值对请求头:(如:x-mactch=all或者x_match=any),只有请求头与消息头匹配,才能接收消息,忽略RoutingKey.
默认的exchange:如果用空字符串去声明一个exchange,那么系统就会使用”amq.direct”这个exchange,我们创建一个queue时,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去

channel.BasicPublish("", "TaskQueue", properties, bytes);

这里写图片描述

Producer.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 订阅模式 * Created by PengHongfu 2018-05-14 16:27 */ public class Producer { //交换机的名称 private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /* * 声明exchange(交换机) * 参数1:交换机名称 * 参数2:交换机类型 */ channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); for (int i = 0; i < 5; i++) { String message = "订阅消息-"+i; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [生产者] Sent '" + message + "'"); Thread.sleep(100); } channel.close(); connection.close(); } }

Consumer1.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import java.io.File; import java.io.FileOutputStream; import java.text.SimpleDateFormat; import java.util.Date; /** * Created by PengHongfu 2018-05-14 16:29 */ public class Consumer1 { private final static String QUEUE_NAME = "test_queue_exchange_1";//可以是任意的队列名 private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); /* * 绑定队列到交换机(这个交换机的名称一定要和上面的生产者交换机名称相同) * 参数1:队列的名称 * 参数2:交换机的名称 * 参数3:Routing Key * */ channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,自动返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); System.out.println(" [消费者1] Waiting for messages. To exit press CTRL+C"); // 获取消息 while (true) { String dir = Consumer1.class.getClassLoader().getResource("").getPath(); QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String fileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); File file = new File(dir, fileName + ".log"); FileOutputStream outputStream = new FileOutputStream(file, true); outputStream.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())) + "-" + message + "rn").getBytes()); outputStream.flush(); outputStream.close(); } } }

Consumer2.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * Created by PengHongfu 2018-05-14 16:30 */ public class Consumer2 { private final static String QUEUE_NAME = "test_queue_exchange_2"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); c

推荐阅读
  • CentOs 7.3中搭建RabbitMQ 3.6单机多实例服务的步骤与使用
    CentOs7.3中搭建RabbitMQ3.6单机多实例服务的步骤与使用-RabbitMQ简介RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户 ... [详细]
  • elixirerlang通过AyandaDubeFIPS(联邦信息处理标准)[1]是由NIST(美国国家标准技术研究院)定义 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • rust编程这篇文章是关于我通过解决Twitch上尚未解决的所有CtCI问题来学习Rust的经验。英国科学博物馆集团AdaLovelace的肖像Rust徽标,由Moz ... [详细]
  • 分布式消息_58分布式消息队列WMB设计与实践
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了58分布式消息队列WMB设计与实践相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • Mono为何能跨平台
    概念JIT编译(JITcompilation),运行时需要代码时,将Microsoft中间语言(MSIL)转换为机器码的编译。CLR(CommonLa ... [详细]
  • 一面自我介绍对象相等的判断,equals方法实现。可以简单描述挫折,并说明自己如何克服,最终有哪些收获。职业规划表明自己决心,首先自己不准备继续求学了,必须招工作了。希望去哪 ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • java布尔字段用is前缀_POJO类中布尔类型的变量都不要加is前缀详解
    前言对应阿里巴巴开发手册第一章的命名风格的第八条。【强制】POJO类中布尔类型的变量都不要加is前缀,否则部分框架解析会引起序列化错误。反例:定义为基本 ... [详细]
  • Java开发面试问题,2021网易Java高级面试题及答案,实战案例
    前言大厂面试真题向来都是各大求职者的最佳练兵场,而今天小编带来的便是“HUAWEI”面经!这是一次真实的面试经历,虽然不是我自己亲身经历 ... [详细]
  • 阿里首席架构师科普RPC框架
    RPC概念及分类RPC全称为RemoteProcedureCall,翻译过来为“远程过程调用”。目前,主流的平台中都支持各种远程调用技术,以满足分布式系统架构中不同的系统之间的远程 ... [详细]
  • 招聘 | 涂鸦智能招聘IoT安全人才
    招聘 | 涂鸦智能招聘IoT安全人才 ... [详细]
  • Linux(阿里云服务器)安装RabbitMQ
    目录安装前的说明安装Socat安装Erlang安装RabbitMQ配置阿里云开放端口启动访问添加用户安装前的说明安装RabbitMQ前需要先安装Socat、Er ... [详细]
  • 随着分布式系统的规模和复杂度提高,往往会出现如下问题:(1)系统间同步通信,客户端发出调用后,必 ... [详细]
author-avatar
一克拉白勺的眼泪
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有