作者:大佬銘銘銘銘銘 | 来源:互联网 | 2023-10-12 14:11
目录
基本概念
代码与实例
基本概念
首先有个关键:此处实验接收的数据类型为Order,这里要求发送和接收要一模一样。
包括包名和类名都要一模一样:
如下,consumerDemo
下面是productorDemo
这里,包名和类都一模一样否则接收端监听会失败!
在消费(接收订阅)端要配置一些数据:
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.acknowledge-mode=manual
这里指的是目前并发为5个,最大并发数为10个,监听确认为手动,也就是接收了数据,要给RabbitMQ给一个反馈信息
如下
这里有2个注解,是简单使用RabbitMQ的关键!
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order-queue", durable = "true"),exchange = @Exchange(name = "order-exchange", type = "topic"),key = "order.#"))@RabbitHandlerpublic void onOrderMessage(@Payload Order order,@Headers Map headers,Channel channel //手工签收需要rabbitMQ的通道) throws Exception{............
}
这里如果没有对应的交换机和队列,那么此处就会自动新建
代码与实例
发送端不停的发送消息!接收端如下:
发送端关键代码:
OderSender.java
package SpringBoot.demo.produce;import SpringBoot.demo.entity.Order;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class OderSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(Order order) throws Exception{CorrelationData correlationData = new CorrelationData();correlationData.setId(order.getMessageId());rabbitTemplate.convertAndSend("order-exchange", //exchange"order.abcd", //routingKeyorder, //消息体correlationData); //correlationData消息唯一ID}
}
DemoApplicationTests.java
package SpringBoot.demo;import SpringBoot.demo.entity.Order;
import SpringBoot.demo.produce.OderSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.UUID;@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {@Autowiredprivate OderSender oderSender;@Testpublic void contextLoads() {}@Testpublic void testSend1()throws Exception{Order order = new Order();order.setId("20180618000000000003");order.setName("测试订单3");order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());oderSender.send(order);}}
application.properties
server.port=8001
spring.rabbitmq.addresses=192.168.164.141:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000server.servlet.context-path=/
spring.http.encoding.charset=UTF-8
spring.jackson.data-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
接收端关键如下:
OrderReceiver.java
package SpringBoot.demo.consumer;import SpringBoot.demo.entity.Order;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class OrderReceiver {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order-queue", durable = "true"),exchange = @Exchange(name = "order-exchange", type = "topic"),key = "order.#"))@RabbitHandlerpublic void onOrderMessage(@Payload Order order,@Headers Map headers,Channel channel //手工签收需要rabbitMQ的通道) throws Exception{//消费者操作System.out.println("--------------收到消息,开始消费--------------");System.out.println("订单ID:" + order.getId());//告诉RabbitMQ我已经签收了long deliveryTag = (long)headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(deliveryTag, false); //false为不支持批量签收}}
application.properties
server.port=8002
spring.rabbitmq.addresses=192.168.164.141:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000server.servlet.context-path=/
spring.http.encoding.charset=UTF-8
spring.jackson.data-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL#配置关于consumer相关的
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.acknowledge-mode=manual#限流,同一时间只有一条消息消费
spring.rabbitmq.listener.simple.prefetch=1
源码下载地址:
https://github.com/fengfanchen/Java/tree/master/ProductorDemo
https://github.com/fengfanchen/Java/tree/master/consumerDemo