RabbitMQ是一种消息中间件,能够很好的处理来自客户端的异步消息发送及请求,将消息发送放入到服务端的队列池中,而接收端可以根据RabbitMQ配置的转发机制接收和过滤服务端转发
RabbitMQ是一种消息中间件,能够很好的处理来自客户端的异步消息发送及请求,将消息发送放入到服务端的队列池中,而接收端可以根据RabbitMQ配置的转发机制接收和过滤服务端转发来的消息。RabbitMQ可以根据指定的消息转发规则进行消息的转发、缓冲和持久化操作,这也是其根身立命的地方,但是其诞生的主要目的是为了均衡线程耗时操作的压力,前提是这些操作要满足没有要求即时反应,因为其不适合用在要求即时反应的需求,此时可以考虑使用缓存中间件Redis、Memcache等,另外,RabbitMQ主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
版权声明:原创不易,请尊重作者劳作成果,转载请标明出处:
http://blog.csdn.net/why_2012_gogo/article/details/53639702
l 如何安装
l 基本原理
l 入门例子
一、如何安装
在Mac中,安装RabbitMQ服务环境,需要到官网下载最新的.tar.xz文件并解压,若要启动RabbitMQ服务或是使用相关的便捷命令,我们需要进入sbin目录下,那么启动服务如下:
$sudo ./rabbitmq-server start
启动服务之后,等待便是RabbitMQ队列接收和转发消息了。如果要停止RabbitMQ服务,可以使用如下命令操作:
$sudo ./rabbitmqctl stop_app ---停止服务
$sudo ./rabbitmqctl start_app ---开始服务
二、基本原理
RabbitMQ的原理是根据其转发器或交换机规则的特性来说的,目前RabbitMQ有四种转发器类型:fanout、direct、topic及headers,前三种类型转发器比较常用,而headers用的比较少,原因有两个,一个是它的用法比较麻烦,另外就是前三者基本可以满足所有实际需求,可以取代它的存在,不管怎样,在这里都会介绍下这四种转发器的原理,具体如下:
1、fanout exchange
说明:
fanout转发器,是几种转发器中转发消息最快的一种,其路由规则会将消息转发给与转发器绑定的每一个队列中,也就是轮循转发相同消息给队列。
2、direct exchange
说明:
direct转发器,会根据当前发送和接受端协商的统一的routing key来完全匹配转发消息,也就是转发器发送标有routing key标志的路由信息,只有接收端的binding key与routing key与之相同,才会接收到信息。
3、topic exchange
说明:
topic转发器,相对于direct转发器,topic可以转发符合多个条件的消息,也就是发送端发送消息,而接受端可以灵活配置接收消息的路由规则,例如:msg.#和msg.*,前者能够接收msg.log.info和msg.log类型消息,而后者则能接收到msg.*类型消息,所以#号代表一个或多个单词匹配,而*则代表一个单词匹配了,实际上就是正常的规则过滤机制。
4、headers exchange
说明:
headers转发器,也是用的比较少的转发器,原因请查看第一部分介绍。此种转发器,忽略了路由routing key规则,使用了健-值对形式匹配规则,此种转发器规定,在接受端必须使用x-match,它目前有两种类型:all和any,前者代表所有的键-值都满足后,才能收到信息,而后者则满足任意个就可以收到消,这个会在后续文章介绍,这里只需了解即可。
三、入门例子
这里我们以HelloWorld!程序为例子,来介绍RabbitMQ的使用。项目类型为maven管理的java项目,在介绍RabbitMQ使用时,会封装一个通用的功能,随着陆续文章的介绍,会完善该封装工具的功能,后续不再说明。
1、准备
因为接下来介绍的发送端和接收消息端的服务链接是相同的,所以我们建议封装为BaseConnector.java,供接收端和发送端继承使用;发送和接收消息的载体MessageInfo.java也是需要的,并且是需要被序列化,另外,在启动程序前,需要先启动机器的RabbitMQ服务,否则拒绝访问链接,具体如下:
A、BaseConnector.java
publicclass BaseConnector {
protectedChannel channel;
protected Connection connection;
protected String queueName;
public BaseConnector(String queueName)throwsIOException, TimeoutException {
this. queueName = queueName;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1 ");
cOnnection= factory.newConnection(); //创建连接
channel = connection.createChannel(); //创建频道
channel.queueDeclare(queueName, false,false,false,null);//声明队列
}
protectedvoid close() { //关闭频道及链接
try {
channel.close();
connection.close();
}catch (IOException e) {
e.printStackTrace();
}catch (TimeoutException e) {
e.printStackTrace();
}
}
}
B、MessageInfo.java
publicclassMessageInfoimplements Serializable {
privatestatic final long serialVersionUID = 1L;
privateString channel; //消息渠道
privateString content; //消息内容
privateint hashCode; //异步线程标志
publicString getChannel() {
returnchannel;
}
publicvoid setChannel(String channel) {
this.channel= channel;
}
publicString getContent() {
returncontent;
}
publicvoid setContent(String content) {
this.cOntent= content;
}
publicint getHashCode() {
returnhashCode;
}
publicvoid setHashCode(int hashCode) {
this.hashCode= hashCode;
}
@Override
publicString toString() {
return"MessageInfo [channel=" + channel + ", cOntent=" + content
+", hashCode=" + hashCode
+"]";
}
}
C、启动RabbitMQ服务
具体如何启动,请查看文章第一部分介绍。
2、发送端
PublisherHandler.java:
publicclassPublisherHandlerextends BaseConnector {
publicPublisherHandler(String queueName)throws IOException,TimeoutException {
super(queueName);
}
publicvoid sendMessage(MessageInfo messageInfo) {
try{
channel.basicPublish("",queueName,null, SerializationUtils.serialize(messageInfo));
}catch (Exception e) {
System.out.println("RabbitMQSend Message Error:"+e.getMessage());
}
}
/**
* 关闭频道及链接
*/
publicvoid close() {
super.close();
}
}
3、接收端
接受端的实现,这里考虑到多线程的情况,也是实际使用中常遇到的环境,所以它的实现,我们需要实现RabbitMQ接口Consumer,以及Runnable接口,具体如下:
publicclass ReceiverHandlerextendsBaseConnectorimplements Runnable,Consumer {
privateint hashCode = 0;
publicReceiverHandler(String queueName)throws IOException, TimeoutException {
super(queueName);
}
publicvoid receiveMessage() {
hashCode = Thread.currentThread().hashCode(); //区分不同工作进程的输出
try{
System.out.println(hashCode+ " [*] Waiting for messages. To exit press CTRL+C");
Stringop_result = channel.basicConsume(queueName, true,this);
if("".equals(op_result)){
System.out.println("BasicConsumeConfig Consumer Queue Error!");
}
}catch (IOException e) {
System.out.println("Consumer Delivery Error,Msg info:" + e.getMessage());
}catch (Exception e) {
System.out.println("Error Is Opening,Msg info:" + e.getMessage());
}
}
@Override
publicvoid handleCancel(String arg0)throws IOException {
debug("===handleCancel==="+arg0);
}
@Override
publicvoid handleCancelOk(String arg0) {
debug("===handleCancelOk==="+arg0);
}
@Override
publicvoid handleConsumeOk(String arg0) {
debug("===handleCOnsumeOk==="+arg0);
}
@Override
publicvoid handleDelivery(String consumerTag, Envelope env,
BasicPropertiesprops, byte[] body) throws IOException {
MessageInfomessageInfo = (MessageInfo) SerializationUtils.deserialize(body);
messageInfo.setHashCode(hashCode);
System.out.println("message-info:"+msgInfo.toString());
}
@Override
publicvoid handleRecoverOk(String arg0) {
debug("===handleRecoverOk==="+arg0);
}
@Override
publicvoid handleShutdownSignal(String arg0, ShutdownSignalException arg1) {
debug("===handleShutdownSignal==="+arg0+"===ShutdownSignalException==="+arg1.getMessage());
}
@Override
publicvoid run() {
receiveMessage();
}
}
4、运行入口
publicstatic void main(String[] args){
PublisherHandlerpublisher = null;
ReceiverHandlerreceiver = null;
try{
receiver= new ReceiverHandler("mq_hello"); //接收者
ThreadreceiverThread = new Thread(receiver);
receiverThread.start();
publisher= new PublisherHandler("mq_hello"); //发送者
MessageInfomsgInfo = new MessageInfo();
msgInfo.setChannel("hello");
msgInfo.setContent("HelloWorld!");
publisher.sendMessage(msgInfo);
}catch (IOException | TimeoutException e) {
e.printStackTrace();
}finally {
publisher.close();
}
}
运行结果:
如上图,我们可以清楚看到,HelloWorld!程序已经成功运行,得到了我们预期的结果显示。好了,消息队列RabbitMQ入门篇就介绍到这里,由于作者水平有限,如有问题请在评论发言或QQ群(245389109(新))讨论,谢谢。