这里官方使用的Pom是4.0.2版本
12 3 7com.rabbitmq 4amqp-client 54.0.2 68 12org.slf4j 9slf4j-api 101.7.10 1113 17org.slf4j 14slf4j-log4j12 151.7.5 1618 22log4j 19log4j 201.2.17 2123 27junit 24junit 254.11 26
简单队列 hello word
P:消息的生产者
C:消息的消费者
红色:队列
生产者将消息发送到队列,消费者从队列中获取消息。
那么我们根据以上的模型,咱们抽取出 3 个对象 生产者(用户发送消息) 队列(中间件):类似于容器(存储消息) 消费者(获取队列中的消息)
JAVA 操作 获取 MQ 连接
类似于我们在操作数据库的时候的要获取到连接然后才对数据进行
1 package cn.wh.util; 2 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 import java.io.IOException; 7 import java.util.concurrent.TimeoutException; 8 9 public class RabbitMqConnectionUtil { 10 /** 11 * 获取mq的连接 12 * @return 13 */ 14 public static Connection getConnection() throws IOException, TimeoutException { 15 //定义一个连接工厂 16 ConnectionFactory factory=new ConnectionFactory(); 17 //设置服务器的地址 18 factory.setHost("192.168.152.5"); 19 //AMQP 5672 20 factory.setPort(5672); 21 //设置哪一个数据库 vhost 22 factory.setVirtualHost("/vhost_wh"); 23 //设置用户名 24 factory.setUsername("wh"); 25 factory.setPassword("123"); 26 27 return factory.newConnection(); 28 } 29 }
生产者发送数据到消息队列
1 package cn.wh.simple; 2 3 import cn.wh.util.RabbitMqConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class Send { 11 private static final String QUEVE_NAME = "test_simple_queue"; 12 13 public static void main(String[] args) throws IOException { 14 //获取一个连接 15 Connection cOnnection= null; 16 try { 17 18 cOnnection= RabbitMqConnectionUtil.getConnection(); 19 } catch (IOException e) { 20 e.printStackTrace(); 21 } catch (TimeoutException e) { 22 e.printStackTrace(); 23 } 24 25 //创建一个通道 26 Channel channel = connection.createChannel(); 27 // 创建队列声明 28 channel.queueDeclare(QUEVE_NAME, false, false, false, null); 29 30 //发送的消息 31 String msg="hello simple"; 32 channel.basicPublish("",QUEVE_NAME,null,msg.getBytes()); 33 System.out.println("发送成功==============="); 34 try { 35 channel.close(); 36 connection.close(); 37 } catch (TimeoutException e) { 38 e.printStackTrace(); 39 } 40 } 41 }
查看消消费者消费消费者消费消费者消费消费者消消费者消费消费者消费
1 package cn.wh.simple; 2 3 import cn.wh.util.RabbitMqConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.QueueingConsumer; 7 8 import java.io.IOException; 9 import java.util.concurrent.TimeoutException; 10 11 public class Accept { 12 private static final java.lang.String QUEVE_NAME = "test_simple_queue"; 13 public static void main(String[] args) throws IOException, InterruptedException { 14 15 //获取一个连接 16 Connection cOnnection=null; 17 { 18 try { 19 cOnnection= RabbitMqConnectionUtil.getConnection(); 20 } catch (IOException e) { 21 e.printStackTrace(); 22 } catch (TimeoutException e) { 23 e.printStackTrace(); 24 } 25 //定义管道 26 Channel channel = connection.createChannel(); 27 //定义队列的消费者 28 QueueingConsumer queueingCOnsumer= new QueueingConsumer(channel); 29 channel.basicConsume(QUEVE_NAME,true,queueingConsumer); 30 while (true){ 31 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); 32 String msg = new String(delivery.getBody()); 33 34 35 System.out.println("msg"+ msg); 36 } 37 } 38 } 39 }
简单队列的不足
耦合性高 生产消费一一对应(如果有多个消费者想都消费这个消息,就不行了) 队列名称变更时需要同时更改