热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

高性能RabbitMQ

1,什么是RabbitMqRabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和

1,什么是RabbitMq

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

百度百科 ,RabbitMQ 官网  AMQP 协议

2,几种MQ对比

RabbitMQ 是用Erlang 语言进行开发的,一款设计之初就是抗高并发的语言


3,RabbitMQ 安装

1.下载并安装erlang,下载地址:http://www.erlang.org/download
2.配置erlang环境变量信息
新增环境变量ERLANG_HOME
=erlang的安装地址
%ERLANG_HOME%\bin加入到path中
3.下载并安装RabbitMQ,下载地址:http://www.rabbitmq.com/download.html

注意: RabbitMQ 它依赖于Erlang,需要先安装Erlang。

  RabbitMQ 管理平台地址 http://127.0.0.1:15672

  默认账号:guest/guest 用户可以自己创建新的账号

 

 https://blog.csdn.net/qq_35098526/article/details/80009424 安装之后启动不了,可以在sbin 里面:

 输入:rabbitmq-plugins enable rabbitmq_management  (先定位到rabbitmq安装目录)命令,出现plugins安装成功的提示。

 

过程:

Microsoft Windows [Version 10.0.17134.950]
C:\Program Files\RabbitMQ Server>
C:\Program Files\RabbitMQ Server
>cd rabbitmq_server-3.7.8
C:\Program Files\RabbitMQ Server\rabbitmq_server
-3.7.8>cd sbin
C:\Program Files\RabbitMQ Server\rabbitmq_server
-3.7.8\sbin>rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node [email protected]
-2MDM24J:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to [email protected]
-2MDM24J...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
started
3 plugins.

 

 

 

4,RabbitMQ 五种队列形式

    1.点对点队列,也可以叫做简单队列

      生产者投递的消息,每次只准一个消费者来消费,如果消费者集群的话,消息会被均摊。

      例如:50 个消息,2个消费者,消费者1会消费奇数,消费者2会消费偶数,两个消费者不受影响,各自消费各自的消息

      producer:

public class Producer {
private static final String QUEUE_NAME = "rabbitmq_simple_queue_one";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection
= MQConnectionUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i <50; i++) {
String msg
= "Hello, World :" + i;
System.out.println(msg);
channel.basicPublish(
"", QUEUE_NAME, null, msg.getBytes());
}
channel.close();
connection.close();
}
}

consumer1:

public class Consumer {
private static final String QUEUE_NAME = "rabbitmq_simple_queue_one";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println(
"consumer1");
Connection connection
= MQConnectionUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer
= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString
= new String(body, "UTF-8");
System.out.println(
"消费者获取消息:" + msgString);
}
};
// 3.监听队列
channel.basicConsume(QUEUE_NAME, true, consumer); //true 代表采用自动签收的应答模式
}
}

consumer2:

public class Consumer2 {
private static final String QUEUE_NAME = "rabbitmq_simple_queue_one";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println(
"consumer2");
Connection connection
= MQConnectionUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer
= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString
= new String(body, "UTF-8");
System.out.println(
"消费者获取消息:" + msgString);
try {
Thread.sleep(
3000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 3.监听队列
channel.basicConsume(QUEUE_NAME, true, consumer); //true 代表采用自动签收的应答模式
}
}

  2,工作队列模式,也可以叫做公平队列模式

       点对点简单队列弊端:消费者集群的话,消息会被均摊处理,但是不同的消费者处理消息的能力是不同的,consumer1 每秒处理1个消息,consumer2 美妙处理3个消息,如果消息均摊,consumer1的效率则被浪费。

       公平消费模式:谁处理的快,并且采用手动签收,告知RabbitMQ之后,RabbitMQ 再给分发消息。这样,谁处理的快,谁就会处理的多。

       

producer:

public class Producer {
// 公平队列名称
private static final String QUEUE_NAME = "rabbitmq_fair_queue_one";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection
= MQConnectionUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 保证消费者只能取一个/每次
channel.basicQos(1); //每次只给消费者1条消息,等消费完成,手动ack 应答之后,再给下一条
for (int i = 0; i <50; i++) {
String msg
= "Hello, World: " + i;
System.out.println(msg);
channel.basicPublish(
"", QUEUE_NAME, null, msg.getBytes());
}
channel.close();
connection.close();
}
}

consumer1:

public class Consumer {
private static final String QUEUE_NAME = "rabbitmq_fair_queue_one";
public static void main(String[] args) throws IOException, TimeoutException {

System.out.println(
"consumer01");
Connection connection
= MQConnectionUtils.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(
1);
DefaultConsumer consumer
= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString
= new String(body, "UTF-8");
System.out.println(
"消费者获取消息:" + msgString);
try {
Thread.sleep(
200);
}
catch (Exception e) {
}
finally {
channel.basicAck(envelope.getDeliveryTag(),
false);
}
}
};
// 3.监听队列
channel.basicConsume(QUEUE_NAME, false, consumer); //false 代表使用手动消息应答,需要使用channel.basicAck(envelope.getDeliveryTag(),false) 告知消息中间件
}
}

consumer2:

public class Consumer2 {

private static final String QUEUE_NAME = "rabbitmq_fair_queue_one";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println(
"consumer02");
Connection connection
= MQConnectionUtils.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(
1);
DefaultConsumer consumer
= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString
= new String(body, "UTF-8");
System.out.println(
"消费者获取消息:" + msgString);
try {
Thread.sleep(
1000); //让这个消费者处理消息的能力更差一点
}
catch (Exception e) {
}
finally {
channel.basicAck(envelope.getDeliveryTag(),
false);
}
}
};
// 3.监听队列
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

   3,发布订阅模式,采用fanout 扇形交换机,

        高级队列模式中,有交换机,生产者将消息发给交换机,在根据交换机的类型,发给定的的队列,然后发给指定的消费者消费

  producer:

public class Producer {
// 定义交换机名称
private static final String EXCHANGE_NAME = "rabbitmq_pubsub_exchanger_one";
// 定义交换机类型
private static final String EXCHANGE_TYPE = "fanout";
public static void main(String[] args) throws IOException, TimeoutException {
// 和rabbitmq 建立连接
Connection cOnnection= MQConnectionUtils.getConnection();
// 创建channel
Channel channel = connection.createChannel();
// 创建交换机
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

String message
= "pub/sub";

channel.basicPublish(EXCHANGE_NAME,
"", null, message.getBytes());

channel.close();

connection.close();
}
}

邮件消费者:

ublic class EmailConsumer {
private static final String QUEUE_NAME = "rabbitmq_pubsub_email_queue_one";
private static final String EXCHANGE_NAME = "rabbitmq_pubsub_exchanger_one";
public static void main(String[] args) throws IOException, TimeoutException {

System.out.println(
"邮件消费者。。。");
Connection connection
= MQConnectionUtils.getConnection();
Channel channel
= connection.createChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 将队列和交换机进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer
= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg
= new String(body, "UTF-8");
System.out.println(
"消费者获取生产者消息 :" + msg);
}
};
// 消费者监听队列消息 true 代表自动签收
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

短信消费者:

// 信息消费者
public class TextConsumer {
private static final String QUEUE_NAME = "rabbitmq_pubsub_text_queue_one";
private static final String EXCHANGE_NAME = "rabbitmq_pubsub_exchanger_one";
public static void main(String[] args) throws IOException, TimeoutException {

System.out.println(
"短信消费者。。。");
Connection connection
= MQConnectionUtils.getConnection();
Channel channel
= connection.createChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 将队列和交换机进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer
= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg
= new String(body, "UTF-8");
System.out.println(
"消费者获取生产者消息 :" + msg);
}
};
// 消费者监听队列消息 true 代表自动签收
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

4,路由模式:采用direct 交换机

producer:

public class Producer {
// 定义交换机名称
private static final String EXCHANGE_NAME = "rabbitmq_direct_exchanger_one";
// 定义交换机类型
private static final String EXCHANGE_TYPE = "direct";
// 定义路由
private static final String ROUTINGKEY = "info";
public static void main(String[] args) throws IOException, TimeoutException {
// 和rabbitmq 建立连接
Connection cOnnection= MQConnectionUtils.getConnection();
// 创建channel
Channel channel = connection.createChannel();
// 创建交换机
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

String message
= "pub/sub";

channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY,
null, message.getBytes());

channel.close();

connection.close();
}
}

邮件消费者:

public class EmailConsumer {
private static final String QUEUE_NAME = "rabbitmq_direct_email_queue_one";
private static final String EXCHANGE_NAME = "rabbitmq_direct_exchanger_one";
private static final String ROUTINGKEY_INFO = "info";
public static void main(String[] args) throws IOException, TimeoutException {

System.out.println(
"邮件消费者。。。");
Connection connection
= MQConnectionUtils.getConnection();
Channel channel
= connection.createChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 将队列和交换机进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY_INFO);
DefaultConsumer consumer
= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg
= new String(body, "UTF-8");
System.out.println(
"消费者获取生产者消息 :" + msg);
}
};
// 消费者监听队列消息 true 代表自动签收
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

短信消费者:

public class TextConsumer {
private static final String QUEUE_NAME = "rabbitmq_direct_text_queue_one";
private static final String EXCHANGE_NAME = "rabbitmq_direct_exchanger_one";
// 设定路由
private static final String ROUTINGKEY_INFO = "info";
private static final String ROUTINGKEY_WARN = "warn";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println(
"短信消费者。。。");
Connection connection
= MQConnectionUtils.getConnection();
Channel channel
= connection.createChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 将队列和交换机进行绑定 绑定路由
//info 和 warn 路由的都能接收到
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY_INFO);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY_WARN);

DefaultConsumer consumer
= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg
= new String(body, "UTF-8");
System.out.println(
"消费者获取生产者消息 :" + msg);
}
};
// 消费者监听队列消息 true 代表自动签收
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

5,通配符模式,采用topic 交换机  # 代表任意 * 代表一个

producer:

public class Producer {
// 定义交换机名称
private static final String EXCHANGE_NAME = "rabbitmq_topic_exchanger_one";
// 定义交换机类型
private static final String EXCHANGE_TYPE = "topic";
// 定义路由
private static final String ROUTINGKEY = "routingkey.info.error.warn";
public static void main(String[] args) throws IOException, TimeoutException {
// 和rabbitmq 建立连接
Connection cOnnection= MQConnectionUtils.getConnection();
// 创建channel
Channel channel = connection.createChannel();
// 创建交换机
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

String message
= "pub/sub";

channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY,
null, message.getBytes());

channel.close();

connection.close();
}
}

邮件消费者:

public class EmailConsumer {
private static final String QUEUE_NAME = "rabbitmq_topic_email_queue_one";
private static final String EXCHANGE_NAME = "rabbitmq_topic_exchanger_one";
private static final String ROUTINGKEY = "routingkey.#";
public static void main(String[] args) throws IOException, TimeoutException {

System.out.println(
"邮件消费者。。。");
Connection connection
= MQConnectionUtils.getConnection();
Channel channel
= connection.createChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 将队列和交换机进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
DefaultConsumer consumer
= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg
= new String(body, "UTF-8");
System.out.println(
"消费者获取生产者消息 :" + msg);
}
};
// 消费者监听队列消息 true 代表自动签收
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

短信消费者:

public class TextConsumer {
private static final String QUEUE_NAME = "rabbitmq_topic_text_queue_one";
private static final String EXCHANGE_NAME = "rabbitmq_topic_exchanger_one";
private static final String ROUTINGKEY = "routingkey.info.*";
// 设定路由
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println(
"短信消费者。。。");
Connection connection
= MQConnectionUtils.getConnection();
Channel channel
= connection.createChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 将队列和交换机进行绑定 绑定路由
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
DefaultConsumer consumer
= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg
= new String(body, "UTF-8");
System.out.println(
"消费者获取生产者消息 :" + msg);
}
};
// 消费者监听队列消息 true 代表自动签收
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

 

     

    

 



推荐阅读
  • Docker安装Rabbitmq(配合宝塔)
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Docker安装Rabbitmq(配合宝塔)相关的知识,希望对你有一定的参考价值。一、事前准备 ... [详细]
  • WebSocket与Socket.io的理解
    WebSocketprotocol是HTML5一种新的协议。它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送 ... [详细]
  • Nginx使用AWStats日志分析的步骤及注意事项
    本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
  • RabbitMq之发布确认高级部分1.为什么会需要发布确认高级部分?在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢 ... [详细]
  • 1、概述首先和大家一起回顾一下Java消息服务,在我之前的博客《Java消息队列-JMS概述》中,我为大家分析了:然后在另一篇博客《Java消息队列-ActiveMq实战》中 ... [详细]
  • RabbitMQ的消息持久化处理
    1、RabbitMQ的消息持久化处理,消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。2、auto ... [详细]
  • 消息中间件RabbitMQ 高级特性之消费端ACK与重回队列
    什么是消费端的ACK和重回队列?消费端的手工ACK和NACK消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿如果由于服务器宕机等严重问题 ... [详细]
  • 讨伐Java多线程与高并发——MQ篇
    本文是学习Java多线程与高并发知识时做的笔记。这部分内容比较多,按照内容分为5个部分:多线程基础篇JUC篇同步容器和并发容器篇线程池篇MQ篇本篇 ... [详细]
  • celery 爬虫使用
    简介celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。它由三部分组成,消息中间件, ... [详细]
  • 1、运行npmrundev命令在cmd上面也不算报错输出一些东西看不懂什么意思。报错页: ... [详细]
  • 搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的详细步骤
    本文详细介绍了搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的步骤,包括环境说明、相关软件下载的地址以及所需的插件下载地址。 ... [详细]
  • 本文介绍了使用AJAX的POST请求实现数据修改功能的方法。通过ajax-post技术,可以实现在输入某个id后,通过ajax技术调用post.jsp修改具有该id记录的姓名的值。文章还提到了AJAX的概念和作用,以及使用async参数和open()方法的注意事项。同时强调了不推荐使用async=false的情况,并解释了JavaScript等待服务器响应的机制。 ... [详细]
  • Webpack5内置处理图片资源的配置方法
    本文介绍了在Webpack5中处理图片资源的配置方法。在Webpack4中,我们需要使用file-loader和url-loader来处理图片资源,但是在Webpack5中,这两个Loader的功能已经被内置到Webpack中,我们只需要简单配置即可实现图片资源的处理。本文还介绍了一些常用的配置方法,如匹配不同类型的图片文件、设置输出路径等。通过本文的学习,读者可以快速掌握Webpack5处理图片资源的方法。 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
author-avatar
黄自安_725
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有