热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ服务之入门篇

RabbitMQ是一种消息中间件,能够很好的处理来自客户端的异步消息发送及请求,将消息发送放入到服务端的队列池中,而接收端可以根据RabbitMQ配置的转发机制接收和过滤服务端转发

RabbitMQ是一种消息中间件,能够很好的处理来自客户端的异步消息发送及请求,将消息发送放入到服务端的队列池中,而接收端可以根据RabbitMQ配置的转发机制接收和过滤服务端转发来的消息。RabbitMQ可以根据指定的消息转发规则进行消息的转发、缓冲和持久化操作,这也是其根身立命的地方,但是其诞生的主要目的是为了均衡线程耗时操作的压力,前提是这些操作要满足没有要求即时反应,因为其不适合用在要求即时反应的需求,此时可以考虑使用缓存中间件Redis、Memcache等,另外,RabbitMQ主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

版权声明:原创不易,请尊重作者劳作成果,转载请标明出处:

http://blog.csdn.net/why_2012_gogo/article/details/53639702

 

 

 

l   如何安装

l   基本原理

l   入门例子

 

 

一、如何安装

在Mac中,安装RabbitMQ服务环境,需要到官网下载最新的.tar.xz文件并解压,若要启动RabbitMQ服务或是使用相关的便捷命令,我们需要进入sbin目录下,那么启动服务如下:

$sudo ./rabbitmq-server start

 

启动服务之后,等待便是RabbitMQ队列接收和转发消息了。如果要停止RabbitMQ服务,可以使用如下命令操作:

$sudo ./rabbitmqctl stop_app  ---停止服务

$sudo ./rabbitmqctl start_app  ---开始服务

 

 

二、基本原理

RabbitMQ的原理是根据其转发器或交换机规则的特性来说的,目前RabbitMQ有四种转发器类型:fanout、direct、topic及headers,前三种类型转发器比较常用,而headers用的比较少,原因有两个,一个是它的用法比较麻烦,另外就是前三者基本可以满足所有实际需求,可以取代它的存在,不管怎样,在这里都会介绍下这四种转发器的原理,具体如下:

1、fanout  exchange

RabbitMQ服务之入门篇

 

说明:

fanout转发器,是几种转发器中转发消息最快的一种,其路由规则会将消息转发给与转发器绑定的每一个队列中,也就是轮循转发相同消息给队列。

 

 

2、direct  exchange

RabbitMQ服务之入门篇

 

说明:

direct转发器,会根据当前发送和接受端协商的统一的routing key来完全匹配转发消息,也就是转发器发送标有routing key标志的路由信息,只有接收端的binding key与routing key与之相同,才会接收到信息。

 

 

3、topic  exchange

RabbitMQ服务之入门篇

 

说明:

topic转发器,相对于direct转发器,topic可以转发符合多个条件的消息,也就是发送端发送消息,而接受端可以灵活配置接收消息的路由规则,例如:msg.#和msg.*,前者能够接收msg.log.info和msg.log类型消息,而后者则能接收到msg.*类型消息,所以#号代表一个或多个单词匹配,而*则代表一个单词匹配了,实际上就是正常的规则过滤机制。

 

4、headers  exchange

 

RabbitMQ服务之入门篇

 

说明:

headers转发器,也是用的比较少的转发器,原因请查看第一部分介绍。此种转发器,忽略了路由routing key规则,使用了健-值对形式匹配规则,此种转发器规定,在接受端必须使用x-match,它目前有两种类型:all和any,前者代表所有的键-值都满足后,才能收到信息,而后者则满足任意个就可以收到消,这个会在后续文章介绍,这里只需了解即可。

 

 

三、入门例子

这里我们以HelloWorld!程序为例子,来介绍RabbitMQ的使用。项目类型为maven管理的java项目,在介绍RabbitMQ使用时,会封装一个通用的功能,随着陆续文章的介绍,会完善该封装工具的功能,后续不再说明。

 

1、准备

因为接下来介绍的发送端和接收消息端的服务链接是相同的,所以我们建议封装为BaseConnector.java,供接收端和发送端继承使用;发送和接收消息的载体MessageInfo.java也是需要的,并且是需要被序列化,另外,在启动程序前,需要先启动机器的RabbitMQ服务,否则拒绝访问链接,具体如下:

 

A、BaseConnector.java

publicclass BaseConnector {

    protectedChannel channel;

protected Connection connection;

protected   String queueName;

   

public BaseConnector(String queueName)throwsIOException, TimeoutException {

this. queueName = queueName;

 

       ConnectionFactory factory = new ConnectionFactory();

       factory.setHost("127.0.0.1 ");

       cOnnection= factory.newConnection();  //创建连接 

       channel = connection.createChannel();  //创建频道

       channel.queueDeclare(queueName, false,false,false,null);//声明队列

    }

   

    protectedvoid close() {  //关闭频道及链接

        try {

           channel.close();

           connection.close();

       }catch (IOException e) {

           e.printStackTrace();

       }catch (TimeoutException e) {

           e.printStackTrace();

       }

    }

}

 

B、MessageInfo.java

publicclassMessageInfoimplements Serializable {

    privatestatic final long serialVersionUID = 1L;

   

    privateString channel; //消息渠道

    privateString content; //消息内容

   

    privateint hashCode;  //异步线程标志

   

    publicString getChannel() {

       returnchannel;

    }

 

    publicvoid setChannel(String channel) {

       this.channel= channel;

    }

 

    publicString getContent() {

       returncontent;

    }

 

    publicvoid setContent(String content) {

       this.cOntent= content;

    }

 

    publicint getHashCode() {

       returnhashCode;

    }

 

    publicvoid setHashCode(int hashCode) {

       this.hashCode= hashCode;

    }

 

    @Override

    publicString toString() {

       return"MessageInfo [channel=" + channel + ", cOntent=" + content

              +", hashCode=" + hashCode

              +"]";

    }

}

 

C、启动RabbitMQ服务

具体如何启动,请查看文章第一部分介绍。

 

 

2、发送端

PublisherHandler.java:

publicclassPublisherHandlerextends BaseConnector {

 

    publicPublisherHandler(String queueName)throws IOException,TimeoutException {

       super(queueName);

    }

 

    publicvoid sendMessage(MessageInfo messageInfo) {

       try{

              channel.basicPublish("",queueName,null, SerializationUtils.serialize(messageInfo));

       }catch (Exception e) {

           System.out.println("RabbitMQSend Message Error:"+e.getMessage());

       }

    }

   

    /**

     * 关闭频道及链接

     */

    publicvoid close() {

       super.close();

    }

}

 

3、接收端

接受端的实现,这里考虑到多线程的情况,也是实际使用中常遇到的环境,所以它的实现,我们需要实现RabbitMQ接口Consumer,以及Runnable接口,具体如下:

publicclass ReceiverHandlerextendsBaseConnectorimplements Runnable,Consumer {

    privateint hashCode = 0;

   

    publicReceiverHandler(String queueName)throws IOException, TimeoutException {

       super(queueName);

    }

   

    publicvoid receiveMessage() {

       hashCode = Thread.currentThread().hashCode(); //区分不同工作进程的输出

      

       try{

           System.out.println(hashCode+ " [*] Waiting for messages. To exit press CTRL+C");

          

           Stringop_result = channel.basicConsume(queueName, true,this);  

           if("".equals(op_result)){

              System.out.println("BasicConsumeConfig Consumer Queue Error!");

           }

       }catch (IOException e) {

           System.out.println("Consumer Delivery Error,Msg info:" + e.getMessage());

       }catch (Exception e) {

           System.out.println("Error Is Opening,Msg info:" + e.getMessage());

       }

    }

   

    @Override

    publicvoid handleCancel(String arg0)throws IOException {

       debug("===handleCancel==="+arg0);

    }

 

    @Override

    publicvoid handleCancelOk(String arg0) {

       debug("===handleCancelOk==="+arg0);

    }

 

    @Override

    publicvoid handleConsumeOk(String arg0) {

       debug("===handleCOnsumeOk==="+arg0);

    }

 

    @Override

    publicvoid handleDelivery(String consumerTag, Envelope env,

           BasicPropertiesprops, byte[] body) throws IOException {

       MessageInfomessageInfo = (MessageInfo) SerializationUtils.deserialize(body);

       messageInfo.setHashCode(hashCode);

       System.out.println("message-info:"+msgInfo.toString());

    }

 

    @Override

    publicvoid handleRecoverOk(String arg0) {

       debug("===handleRecoverOk==="+arg0);

    }

 

    @Override

    publicvoid handleShutdownSignal(String arg0, ShutdownSignalException arg1) {

       debug("===handleShutdownSignal==="+arg0+"===ShutdownSignalException==="+arg1.getMessage());

    }

 

    @Override

    publicvoid run() {

       receiveMessage();

    }

}

 

4、运行入口

publicstatic void main(String[] args){  

       PublisherHandlerpublisher = null;

       ReceiverHandlerreceiver = null;

       try{

           receiver= new ReceiverHandler("mq_hello");  //接收者

           ThreadreceiverThread = new Thread(receiver);

           receiverThread.start();

          

           publisher= new PublisherHandler("mq_hello");    //发送者

           MessageInfomsgInfo = new MessageInfo();

           msgInfo.setChannel("hello");

           msgInfo.setContent("HelloWorld!");

           publisher.sendMessage(msgInfo);

       }catch (IOException | TimeoutException e) {

           e.printStackTrace();

       }finally {

           publisher.close();

       }

}

 

 

 

运行结果:

RabbitMQ服务之入门篇

 

 

 

 

 

如上图,我们可以清楚看到,HelloWorld!程序已经成功运行,得到了我们预期的结果显示。好了,消息队列RabbitMQ入门篇就介绍到这里,由于作者水平有限,如有问题请在评论发言或QQ群(245389109(新))讨论,谢谢。


推荐阅读
  • 本文将深入探讨MySQL与MongoDB在游戏账户服务中的应用特点及优劣。通过对比这两种数据库的性能、扩展性和数据一致性,结合实际案例,帮助开发者更好地选择适合游戏账户服务的数据库方案。同时,文章还将介绍如何利用Erlang语言进行高效的游戏服务器开发,提升系统的稳定性和并发处理能力。 ... [详细]
  • 浅析python实现布隆过滤器及Redis中的缓存穿透原理_python
    本文带你了解了位图的实现,布隆过滤器的原理及Python中的使用,以及布隆过滤器如何应对Redis中的缓存穿透,相信你对布隆过滤 ... [详细]
  • 本文详细介绍了 InfluxDB、collectd 和 Grafana 的安装与配置流程。首先,按照启动顺序依次安装并配置 InfluxDB、collectd 和 Grafana。InfluxDB 作为时序数据库,用于存储时间序列数据;collectd 负责数据的采集与传输;Grafana 则用于数据的可视化展示。文中提供了 collectd 的官方文档链接,便于用户参考和进一步了解其配置选项。通过本指南,读者可以轻松搭建一个高效的数据监控系统。 ... [详细]
  • 在《PHP应用性能优化实战指南:从理论到实践的全面解析》一文中,作者分享了一次实际的PHP应用优化经验。文章回顾了先前进行的一次优化项目,指出即使系统运行时间较长后出现的各种问题和性能瓶颈,通过采用一些通用的优化策略仍然能够有效解决。文中不仅详细阐述了优化的具体步骤和方法,还结合实例分析了优化前后的性能对比,为读者提供了宝贵的参考和借鉴。 ... [详细]
  • 掌握PHP框架开发与应用的核心知识点:构建高效PHP框架所需的技术与能力综述
    掌握PHP框架开发与应用的核心知识点对于构建高效PHP框架至关重要。本文综述了开发PHP框架所需的关键技术和能力,包括但不限于对PHP语言的深入理解、设计模式的应用、数据库操作、安全性措施以及性能优化等方面。对于初学者而言,熟悉主流框架如Laravel、Symfony等的实际应用场景,有助于更好地理解和掌握自定义框架开发的精髓。 ... [详细]
  • NoSQL数据库,即非关系型数据库,有时也被称作Not Only SQL,是一种区别于传统关系型数据库的管理系统。这类数据库设计用于处理大规模、高并发的数据存储与查询需求,特别适用于需要快速读写大量非结构化或半结构化数据的应用场景。NoSQL数据库通过牺牲部分一致性来换取更高的可扩展性和性能,支持分布式部署,能够有效应对互联网时代的海量数据挑战。 ... [详细]
  • 利用Telnet进行Memcached操作详解
    在使用Telnet对Memcached进行操作前,需确保Memcached服务已启动。可通过命令如 `ps -ef | grep memcached` 或 `netstat -l | grep memcache` 来检查服务状态。此外,建议验证端口监听情况以确保连接无误。 ... [详细]
  • 一文了解消息中间件RabbitMQ
    消息中间件---RabbitMQ1消息中间件的作用2.常用的消息中间件3消息中间件RabbitMQ3.1RabbitMQ介绍3.3RabbitMQ的队列模式3.3RabbitMQ的 ... [详细]
  • python中安装并使用redis相关的知识
    本文介绍了在python中安装并使用redis的相关知识,包括redis的数据缓存系统和支持的数据类型,以及在pycharm中安装redis模块和常用的字符串操作。 ... [详细]
  • 本文详细探讨了使用Python3编写爬虫时如何应对网站的反爬虫机制,通过实例讲解了如何模拟浏览器访问,帮助读者更好地理解和应用相关技术。 ... [详细]
  • 服务器部署中的安全策略实践与优化
    服务器部署中的安全策略实践与优化 ... [详细]
  • 本文深入探讨了NoSQL数据库的四大主要类型:键值对存储、文档存储、列式存储和图数据库。NoSQL(Not Only SQL)是指一系列非关系型数据库系统,它们不依赖于固定模式的数据存储方式,能够灵活处理大规模、高并发的数据需求。键值对存储适用于简单的数据结构;文档存储支持复杂的数据对象;列式存储优化了大数据量的读写性能;而图数据库则擅长处理复杂的关系网络。每种类型的NoSQL数据库都有其独特的优势和应用场景,本文将详细分析它们的特点及应用实例。 ... [详细]
  • 本文介绍了如何在 Windows 系统上利用 Docker 构建一个包含 NGINX、PHP、MySQL、Redis 和 Elasticsearch 的集成开发环境。通过详细的步骤说明,帮助开发者快速搭建和配置这一复杂的技术栈,提升开发效率和环境一致性。 ... [详细]
  • 1.关系型数据库永久性保存数据的仓库php的变量只是php脚本执行期间,临时性保存变量的空间【使用内存空间临时保存】关系型数据库:利用二者的关系来描述实体的信息。【利用二维表字段名 ... [详细]
  • Centos下安装memcached+memcached教程
    本文介绍了在Centos下安装memcached和使用memcached的教程,详细解释了memcached的工作原理,包括缓存数据和对象、减少数据库读取次数、提高网站速度等。同时,还对memcached的快速和高效率进行了解释,与传统的文件型数据库相比,memcached作为一个内存型数据库,具有更高的读取速度。 ... [详细]
author-avatar
NHHermit
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有