热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ消息队列安装及在Myeclipse下的开发实例

RabbitMQ安装教程RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的

RabbitMQ安装教程
RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。

第一步:安装Erlang环境
下载地址:http://www.erlang.org/download
otp_win64_19.1.exe
下载exe执行文件直接安装,设置环境变量。
这里写图片描述

第二步:安装RabbitMQ
下载地址:http://www.rabbitmq.com/
rabbitmq-server-3.6.8.exe
下载exe执行文件直接安装即可。
这里写图片描述

执行命令:rabbitmq-plugins enable rabbitmq_management
开启网页支持 http://localhost:15672 默认用户:guest/guest
执行命令:rabbitmq-server start 开启服务
这里写图片描述
这里写图片描述

第三步:Myeclipse开发RabbitMQ实例—Topic Exchange模式
所需jar包在www.rabbitmq.com上下载的rabbitmq-java-client-bin-3.6.4

  RabbitMQ提供了四种Exchange模式:fanout,direct,topic,header 。 本文使用topic作为示例。
 这里写图片描述
所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“key.#”能够匹配到“key.1.2”,但是“key.” 只会匹配到“key.1”。所以,Topic Exchange 使用非常灵活。

生产者:

package com;import com.rabbitmq.client.*;public class Sender {private final static String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] argv) throws Exception{//获取到连接以及mq通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");//factory.setPort(5672);//factory.setUsername("geust");//factory.setPassword("geust");Connection connection =factory.newConnection();Channel channel = connection.createChannel(); // 声明exchange channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息内容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "key.1.2", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close();}
}

消费者1:

package com;
import com.rabbitmq.client.*;public class RecvTest1 {private final static String QUEUE_NAME = "test_queue_topic_work1"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { System.out.println(" recive 1 start.");// 获取到连接以及mq通道 ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");//factory.setPort(5672);//factory.setUsername("geust");//factory.setPassword("geust");Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }
}

消费者2:

package comm;
import com.rabbitmq.client.*;public class RecvTest2 {private final static String QUEUE_NAME = "test_queue_topic_work2"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { System.out.println(" recive 2 start.");// 获取到连接以及mq通道 ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");//factory.setPort(5672);//factory.setUsername("geust");//factory.setPassword("geust"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.#"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }
}

注:要引入rabbitmq-client.jar的包
代码结构如下:
这里写图片描述
先启动生产者进程产生消息,再启动消费者进程,可在网页上看到路由test_exchange_topic,有两个待消费的队列test_queue_topic_work1,test_queue_topic_work2
这里写图片描述
这里写图片描述
此时先自动生产者在启动两个消费者:可以看到消费者1接收不到消息,消费者2可以接收到消息。当把生产者的key.1.2改为key.1时可以发现两个消费者都能接收到消息。
这里写图片描述
这里写图片描述

至此,RabbitMQ消息队列安装及在Myeclipse下的开发完成,相比UNIX的IPC的单机局限性,RabbitMQ的接口简单及在分布式上应用很大。

推荐阅读
  • 如何实现JDK版本的切换功能,解决开发环境冲突问题
    本文介绍了在开发过程中遇到JDK版本冲突的情况,以及如何通过修改环境变量实现JDK版本的切换功能,解决开发环境冲突的问题。通过合理的切换环境,可以更好地进行项目开发。同时,提醒读者注意不仅限于1.7和1.8版本的转换,还要适应不同项目和个人开发习惯的需求。 ... [详细]
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • Jmeter对RabbitMQ压力测试
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Jmeter对RabbitMQ压力测试相关的知识,希望对你有一定的参考价值。Jm ... [详细]
  • 五、RabbitMQ Java Client基本使用详解
    JavaClient的5.x版本系列需要JDK8,用于编译和运行。在Android上,仅支持Android7.0或更高版本。4.x版本系列支持7.0之前 ... [详细]
  • Skywalking系列博客1安装单机版 Skywalking的快速安装方法
    本文介绍了如何快速安装单机版的Skywalking,包括下载、环境需求和端口检查等步骤。同时提供了百度盘下载地址和查询端口是否被占用的命令。 ... [详细]
  • Nginx使用(server参数配置)
    本文介绍了Nginx的使用,重点讲解了server参数配置,包括端口号、主机名、根目录等内容。同时,还介绍了Nginx的反向代理功能。 ... [详细]
  • Mac OS 升级到11.2.2 Eclipse打不开了,报错Failed to create the Java Virtual Machine
    本文介绍了在Mac OS升级到11.2.2版本后,使用Eclipse打开时出现报错Failed to create the Java Virtual Machine的问题,并提供了解决方法。 ... [详细]
  • 本文介绍了使用postman进行接口测试的方法,以测试用户管理模块为例。首先需要下载并安装postman,然后创建基本的请求并填写用户名密码进行登录测试。接下来可以进行用户查询和新增的测试。在新增时,可以进行异常测试,包括用户名超长和输入特殊字符的情况。通过测试发现后台没有对参数长度和特殊字符进行检查和过滤。 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • 在CentOS/RHEL 7/6,Fedora 27/26/25上安装JAVA 9的步骤和方法
    本文介绍了在CentOS/RHEL 7/6,Fedora 27/26/25上安装JAVA 9的详细步骤和方法。首先需要下载最新的Java SE Development Kit 9发行版,然后按照给出的Shell命令行方式进行安装。详细的步骤和方法请参考正文内容。 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 基于Axis、XFire、CXF的webservice客户端调用示例
    本文介绍了如何使用Axis、XFire、CXF等工具来实现webservice客户端的调用,以及提供了使用Java代码进行调用的示例。示例代码中设置了服务接口类、地址,并调用了sayHello方法。 ... [详细]
  • Hibernate延迟加载深入分析-集合属性的延迟加载策略
    本文深入分析了Hibernate延迟加载的机制,特别是集合属性的延迟加载策略。通过延迟加载,可以降低系统的内存开销,提高Hibernate的运行性能。对于集合属性,推荐使用延迟加载策略,即在系统需要使用集合属性时才从数据库装载关联的数据,避免一次加载所有集合属性导致性能下降。 ... [详细]
  • 开发笔记:spring boot项目打成war包部署到服务器的步骤与注意事项
    本文介绍了将spring boot项目打成war包并部署到服务器的步骤与注意事项。通过本文的学习,读者可以了解到如何将spring boot项目打包成war包,并成功地部署到服务器上。 ... [详细]
  • 本文介绍了Java后台Jsonp处理方法及其应用场景。首先解释了Jsonp是一个非官方的协议,它允许在服务器端通过Script tags返回至客户端,并通过javascript callback的形式实现跨域访问。然后介绍了JSON系统开发方法,它是一种面向数据结构的分析和设计方法,以活动为中心,将一连串的活动顺序组合成一个完整的工作进程。接着给出了一个客户端示例代码,使用了jQuery的ajax方法请求一个Jsonp数据。 ... [详细]
author-avatar
钟罄石
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有