RabbitMQ3.7快速入门
按照官方教程(http://www.rabbitmq.com/getstarted.html)测试hello world:
搭建环境
1)java client
生产者和消费者都属于客户端,rabbitMQ的java客户端如下:
我们先用 rabbitMQ官方提供的java client测试,目的是对RabbitMQ的交互过程有个清晰的认识。
参考 :https://github.com/rabbitmq/rabbitmq-java-client/
2)创建maven工程
创建生产者工程和消费者工程,分别加入RabbitMQ java client的依赖。
test-rabbitmq-producer:生产者工程
test-rabbitmq-consumer:消费者工程
com.rabbitmqamqp-client4.0.3
org.springframework.bootspring-boot-starter-logging
生产者
生产者操作流程如下:
1)创建连接
2)创建通道
3)声明队列
4)发送消息
在生产者工程下的test包中创建测试类如下:
public class Producer01 {//队列名称private static final String QUEUE = "helloworld";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = null;Channel channel = null;try{ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建与RabbitMQ服务的TCP连接connection = factory.newConnection();//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务channel = connection.createChannel();/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:是否持久化* param3:队列是否独占此连接* param4:队列不再使用时是否自动删除此队列* param5:队列参数*/channel.queueDeclare(QUEUE, true, false, false, null);String message = "helloworld小明"+System.currentTimeMillis();/*** 消息发布方法* param1:Exchange的名称,如果没有指定,则使用Default Exchange* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列* param3:消息包含的属性* param4:消息体*//*** 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定* 默认的交换机,routingKey等于队列名称*/channel.basicPublish("", QUEUE, null, message.getBytes());System.out.println("Send Message is:'" + message + "'");}catch(Exception ex){ex.printStackTrace();}finally{if(channel != null){channel.close();}if(connection != null){connection.close();}}}
}
消费者
消费者操作流程如下:
1)创建连接
2)创建通道
3)声明队列
4)监听队列
5)接收消息
6)ack回复
在消费者工程下的test包中创建测试类如下:
public class Consumer01 {private static final String QUEUE = "helloworld";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();//设置MabbitMQ所在服务器的ip和端口factory.setHost("127.0.0.1");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE, true, false, false, null);//定义消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume()去指定* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();//消息idlong deliveryTag = envelope.getDeliveryTag();//消息内容String msg = new String(body,"utf-8");System.out.println("receive message.." + msg);}};/*** 监听队列String queue, boolean autoAck,Consumer callback* 参数明细* 1、队列名称* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复* 3、消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE, true, consumer);}
}