Rabbitmq priority 优先级
顾名思义,具有更高优先级的队列具有较高的优先权,优先级高的消息具备优先被消费的特权。
RabbitMQ has priority queue implementation in the core as of version 3.5.0.
在系统应用中会根据业务的优先级来决定哪些内容优先被解决,那么在RabbitMQ 3.5+版本中支持了队列优先级和消息优先级,那么这里就尝试下该功能。
在rabbitmq webUI界面中,add queue是通过添加Arguments或者是在代码中添加效果是一样的,取值范围在0~255之间:
The message priority field is defined as an unsigned byte, so in practice priorities should be between 0 and 255.
具体代码如下:
生产者:
public class NewTask {private final static String hostname &#61; "192.168.1.137";private final static String QUEUE_NAME &#61; "workqueue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory &#61; new ConnectionFactory();factory.setHost(hostname);Connection connection &#61; factory.newConnection();Channel channel &#61; connection.createChannel();try {Map param &#61; new HashMap();param.put("x-max-priority", 10);channel.queueDeclare(QUEUE_NAME, false, false, true, param);for(int i&#61;0;i<10;i&#43;&#43;) {AMQP.BasicProperties.Builder builder &#61; new AMQP.BasicProperties.Builder();if(i&#61;&#61;9 || i&#61;&#61;3 || i&#61;&#61;5)builder.priority(5);AMQP.BasicProperties properties &#61; builder.build();channel.basicPublish("",QUEUE_NAME,properties,("messages-"&#43;i).getBytes());}}catch (Exception e){e.printStackTrace();}finally {channel.close();connection.close();}}
}
消费者&#xff1a;
public class Worker1 {private final static String hostname &#61; "192.168.1.137";private final static String QUEUE_NAME &#61; "workqueue";public static void main(String[] argv) throws java.io.IOException,java.lang.InterruptedException, TimeoutException {ConnectionFactory factory &#61; new ConnectionFactory();factory.setHost(hostname);Connection connection &#61; factory.newConnection();Channel channel &#61; connection.createChannel();QueueingConsumer consumer &#61; new QueueingConsumer(channel);channel.basicConsume(QUEUE_NAME, false, consumer);while (true) {QueueingConsumer.Delivery delivery &#61; consumer.nextDelivery();String msg &#61; new String(delivery.getBody());System.out.println(msg);channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}}}
代码中判断i&#61;&#61;3,i&#61;&#61;5,i&#61;&#61;9这三个消息的权重相等5且高于其他消息&#xff0c;理应提前被处理&#xff0c;代码运行结果为&#xff1a;
测试结果&#xff1a;
查看运行结果可以验证优先级队列的作用。
当然&#xff0c;在消费端速度大于生产端速度&#xff0c;且broker中没有消息堆积的话&#xff0c;对发送的消息设置优先级也没什么实际意义&#xff0c;因为发送端刚发送完一条消息就被消费端消费了&#xff0c;那么就相当于broker至多只有一条消息&#xff0c;那么对于单条消息来说优先级是没有什么意义的。
参考链接
1、https://www.rabbitmq.com/priority.html
2、http://zkread.com/article/1246857.html