热门标签 | HotTags
当前位置:  开发笔记 > 后端 > 正文

RabbitMQHelloWorld

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:publisher:消息发布者,将消息发送到队列queuequeue:消息队列,负责接受并缓存消息co

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:



  • publisher:消息发布者,将消息发送到队列queue

  • queue:消息队列,负责接受并缓存消息

  • consumer:订阅队列,处理队列中的消息


工程搭建

创建父工程(依赖版本管理、添加子工程共用依赖)

添加依赖(spring-boot为父级)

<parent>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-parentartifactId>
<version>2.3.9.RELEASEversion>
<relativePath/>
parent>
<dependencies>

<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-testartifactId>
dependency>
dependencies>

View Code


创建子工程(publisher、consumer)


publisher(消息发送)

public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.223.128");
factory.setPort(
5672);
factory.setVirtualHost(
"/");
factory.setUsername(
"guest");
factory.setPassword(
"guest");
// 1.2.建立连接
Connection cOnnection= factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName,
false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish(
"", queueName, null, message.getBytes());
System.out.println(
"发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}

consumer(消息获取)

public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.223.128");
factory.setPort(
5672);
factory.setVirtualHost(
"/");
factory.setUsername(
"guest");
factory.setPassword(
"guest");
// 1.2.建立连接
Connection cOnnection= factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName,
false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println(
"接收到消息:【" + message + "】");
}
});
System.out.println(
"等待接收消息。。。。");
}
}

基本消息队列的消息发送流程:



  • 建立connection

  • 创建channel

  • 利用channel声明队列

  • 利用channel向队列发送消息

基本消息队列的消息接收流程:



  • 建立connection

  • 创建channel

  • 利用channel声明队列

  • 定义consumer的消费行为handleDelivery()

  • 利用channel将消费者与队列绑定

 



推荐阅读
author-avatar
此号我已不再用
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有