作者:天宣建欣振萍 | 来源:互联网 | 2023-08-21 19:18
本文开始学习rabbitmq内置的集中exchanges类型;基本翻译http:www.rabbitmq.comgetstarted.html第一种,也是最简单的一种方式:只有
本文开始学习rabbitmq内置的集中exchanges类型;基本翻译http://www.rabbitmq.com/getstarted.html
第一种,也是最简单的一种方式:只有一个生产者一个消费者;
rabbitmq的队列基本上是无界的,生产者不停的生产消息放入队列,消费者阻塞式的获取消息,获得后去处理(最好是启动新的线程去处理,以免任务执行比较耗时,影响消息及时的读取)
生产者:发布消息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");//此处可以设置多个属性,包括IP地址以及认证账号等信息
Connection cOnnection= factory.newConnection(); //Connection代表了一个socket链接,具有协议协商和认证的功能;
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//声明一个队列(如果不存在则创建,如果存在且属性设置一样则不变,如果属性设置不同,则报错
//队列名称+是否需要持久化(服务重启后可恢复)+是否是排他性的(只属于本connection)+是否自动删除(不再使用时服务器自动删除)+其他属性设置
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// exchange名称+路由关键字(exchange名字为""时代表队列名称)+消息的其他设置信息+消息内容字节数组
System.out.println(" [x] Sent '" + message + "'");
//关闭链接,如果不关闭,connection会以配置的时间间隔心跳保持链接的可用性(如果空闲时间过长,有的系统会强制关闭链接)
channel.close(); connection.close();
消费者:取消息
如生产者一样创建链接和通道;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection cOnnection= factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null)QueueingConsumer cOnsumer= new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();//会阻塞,直到有消息返回
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}