热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ协议基础及C++和Java混合开发

目前面对大多数的需要在异构系统间进行消息传递技术路线,大多会选择socket或webservice。这两种技术的共同特点是耦合紧,调试依赖双方同步,但是效率高。除此以外,使用消息队

目前面对大多数的需要在异构系统间进行消息传递技术路线,大多会选择socket或webservice。这两种技术的共同特点是耦合紧,调试依赖双方同步,但是效率高。除此以外,使用消息队列(MQ)的应用场景也偶尔能遇到。本文就将要从AMQP协议说起,重点介绍利用RabbitMQ实现C++和Java跨系统开发的实践。


一、AMQP是什么

AMQP又称为高级消息队列协议,是一种进程间进行异步消息的网络协议。它的出现是为了让各类消息中间件提供统一服务,以降低系统集成的开销。目前,完全准寻AMQP协议的消息中间件只有RabbitMQ。虽然各大中间件产品也都针对不同的语言推出了客户端。但是,无论是从业务适应性还是集成通用性上来说,比较推荐的还是RabbitMQ。不同的消息中间件在性能上的差异网上资料很多,这里不再赘述。

amqp协议和http协议一样都是建立在TCP/IP协议簇之上的应用层协议。不同于http协议的,它是一个二进制协议,具有多信道,异步,高效等特点。amqp协议规定了从消息发布者到消息接收者之间的消息传递方式,并且提出了交换机(Exchange)队列(Queue)以及他们之间的路由(Routing)。

作为一套标准协议,使用者甚至可以完全根据amqp的协议规范定制化的开发出客户端和RabbitMQ通信,这一特点也让RabbitMQ在业务通用性上具备了得天独厚的优势。标准的amqp协议格式如下:


amqp://:@:/

username: 用户名

password: 登录密码

host: 服务所在主机地址

port: 服务端口号

virtual: 虚拟路径


AMQP协议最值得学习的地方在于,它定义了消息的发送和投递过程:

  交换机(Exchange)负责接收消息,并根据提前指定的规则(Routing)投送消息到特定队列(Queue)。消费者监听队列,并处理消息。如果多个消费者监听同一个队列,消息一般会轮流的发送给它们。以实现负载均衡。此外,通过虚拟路径约束还允许在不同的虚拟路径下建立同命队列。

AMQP协议默认提供了四种类型的交换机:

直接交换机(Direct Exchange):根据路由键的不同将消息直接发送到不同队列,未匹配路由键的消息会被丢弃。

技术分享图片

技术分享图片

扇形交换机(Funout Exchange):扇形交换机是实现广播的基础,它能够同时将消息推送给多个队列。

技术分享图片

主题交换机(Topic Exchange):交换机会根据路由键进行模糊匹配,从而完成消息投送。

技术分享图片

头交换机(Header Exchange):它不依赖特定路由键,而是将投送目标写在消息头,支持字典类型,配置更加灵活。


二、C++开发指南

官网提供了其它常见语言的开发向导,对于C++个人推荐使用AMQP-CPP这套库。另外还需要一套网络库支持,个人也推荐libevent。编译方法可以参考github上的说明。发送方式区别于传统的socket,你不应该将一条消息分多个部分发送。因此推荐使用对象序列化模型直接转换为字节数组,同样受到tcp/ip传输的制约,你应该选择高效的序列化工具来进行。个人推荐使用protobuf,同样作为一种跨平台的支持。

下面以一套RPC调用为例进行说明:


#include
#include
"event2/event.h"
#include
"amqpcpp.h"
#include
"amqpcpp/libevent.h"
#include
"amqp_msg.pb.h"
#include
<string>
using namespace std;
using namespace amqp;
int main()
{
event_base
*base = event_base_new(); // 通过libevent启动实践循环
AMQP::LibEventHandler handler(base);
AMQP::TcpConnection connection(
&handler, AMQP::Address("localhost", 5672, AMQP::Login("guest", "guest"), "/"));
AMQP::TcpChannel channel(
&connection); // 创建一条通道
channel.setQos(1);
// 监听login.rpc队列
channel.consume("login.rpc").onReceived([&](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
{
cout
<<"login.rpc" << endl;
Login login;
login.ParseFromArray(message.body(), message.bodySize());
Response resp;
// 创建应答对象
resp.set_status(Response_RespType_OK);
resp.set_session_id(
"acd");
char data[1024] = {0};
int data_size = resp.ByteSizeLong();
resp.SerializeToArray(data, data_size);
AMQP::Envelope env(data, data_size);
env.setCorrelationID(message.correlationID());
// 获取应答ID
channel.publish("", message.replyTo(), env); // 发送给应答队列
channel.ack(deliveryTag); // 向MQ发送确认
}).onSuccess([&](const std::string &consumertag)
{
}).onError([](
const char *message)
{
event_base_loopbreak(
base); // 发送错误中断事件循环
cout < endl;
});
// 监听logout.rpc队列
channel.consume("logout.rpc")
.onReceived([
&channel](const AMQP::Message &message, uint64_t deliveryTag, bool)
{
Logout logout;
logout.ParseFromArray(message.body(), message.bodySize());
Response resp;
resp.set_status(Response_RespType_OK);
char data[1024] = {0};
int data_size = resp.ByteSizeLong();
resp.SerializeToArray(data, data_size);
AMQP::Envelope env(data, data_size);
env.setCorrelationID(message.correlationID());
channel.publish(
"", message.replyTo(), env);
channel.ack(deliveryTag);
}).onError([](
const char *message)
{
event_base_loopbreak(
base);
cout
< endl;
});
event_base_dispatch(base); // 事件循环
event_base_free(base); // 释放
return 0;
}

AMQP-CPP库直接主动连接,或者你也可以在继承相应的Handler自己完成网络连接。此外,Connection 和 Channel的创建也都支持回调函数。如:


channel.onError([&base](const char* message)
{
std::cout
<<"Channel error: " < std::endl;
event_base_loopbreak(base);
});


channel.declareQueue("queueName", AMQP::passive)
.onSuccess([
&](const string& name, uint32_t, uint32_t)
{
cout
<<"Queue Name:" < endl;
});


channel.declareExchange("logs", AMQP::fanout)
.onSuccess([
&]() {})


三、Spring AMQP开发指南

与Spring整合的技巧,官网有很详细的指导意见。这里只给出与上文C++配合的请求端如何发送以及等待应答的核心代码:


@GetMapping("login")
public String loginRpc() throws InvalidProtocolBufferException {
AmqpMsg.Login login
= AmqpMsg.Login.newBuilder()
.addParams(AmqpMsg.PairParams.newBuilder().setKey(
"username").setValue("admin").build())
.addParams(AmqpMsg.PairParams.newBuilder().setKey(
"password").setValue("admin").build())
.build();
byte[] resp = (byte[]) template.convertSendAndReceive(directExchange.getName(), "login.rpc", login.toByteArray());
AmqpMsg.Response response
= AmqpMsg.Response.parseFrom(resp);
if (response.getStatus() == AmqpMsg.Response.RespType.OK) {
String sessionID
= response.getSessionId();
System.out.println(
"登录成功 SessiOnID=" + sessionID);
return "SUCCESS";
}
return "ERROR";
}
@GetMapping(
"logout")
public String logoutRpc() throws InvalidProtocolBufferException {
AmqpMsg.Logout logout
= AmqpMsg.Logout.newBuilder()
.setSessionId(
"123456").build();
byte[] resp = (byte[]) template.convertSendAndReceive(directExchange.getName(), "logout.rpc", logout.toByteArray());
AmqpMsg.Response response
= AmqpMsg.Response.parseFrom(resp);
if(response.getStatus() == AmqpMsg.Response.RespType.OK) {
System.out.println(
"注销成功");
return "SUCCESS";
}
return "ERROR";
}


@Configuration
public class RPCRabbitConfig {
@Bean(
"simple")
public Queue simpleQueue() {
return new Queue("simple");
}
@Bean(
"login.rpc")
public Queue loginRpcQueue() {
return new Queue("login.rpc");
}
@Bean(
"logout.rpc")
public Queue logoutRpcQueue() {
return new Queue("logout.rpc");
}
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange("amq.direct");
}

@Bean
public Binding loginRpcBinding(DirectExchange exchange, @Qualifier("login.rpc") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("login.rpc");
}
@Bean
public Binding logoutRpcBind(DirectExchange exchange, @Qualifier("logout.rpc") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("logout.rpc");
}

}

 

后记:可能是由于工作上与架构的关系比较密切,目前在博客中提供的大多数解决方案都以跨平台应用为主。如果您对文章中介绍的知识点有任何的疑问也可以与我联系。


推荐阅读
  • 本文详细探讨了JavaScript中的作用域链和闭包机制,解释了它们的工作原理及其在实际编程中的应用。通过具体的代码示例,帮助读者更好地理解和掌握这些概念。 ... [详细]
  • Python 内存管理机制详解
    本文深入探讨了Python的内存管理机制,涵盖了垃圾回收、引用计数和内存池机制。通过具体示例和专业解释,帮助读者理解Python如何高效地管理和释放内存资源。 ... [详细]
  • C#设计模式学习笔记:观察者模式解析
    本文将探讨观察者模式的基本概念、应用场景及其在C#中的实现方法。通过借鉴《Head First Design Patterns》和维基百科等资源,详细介绍该模式的工作原理,并提供具体代码示例。 ... [详细]
  • Appium + Java 自动化测试中处理页面空白区域点击问题
    在进行移动应用自动化测试时,有时会遇到某些页面没有返回按钮,只能通过点击空白区域返回的情况。本文将探讨如何在Appium + Java环境中有效解决此类问题,并提供详细的解决方案。 ... [详细]
  • 如何清除Chrome浏览器地址栏的特定历史记录
    在使用Chrome浏览器时,你可能会发现地址栏保存了大量浏览记录。有时你可能希望删除某些特定的历史记录而不影响其他数据。本文将详细介绍如何单独删除地址栏中的特定记录以及批量清除所有历史记录的方法。 ... [详细]
  • 利用Selenium与ChromeDriver实现豆瓣网页全屏截图
    本文介绍了一种使用Selenium和ChromeDriver结合Python代码,轻松实现对豆瓣网站进行完整页面截图的方法。该方法不仅简单易行,而且解决了新版Selenium不再支持PhantomJS的问题。 ... [详细]
  • 探索新一代API文档工具,告别Swagger的繁琐
    对于后端开发者而言,编写和维护API文档既繁琐又不可或缺。本文将介绍一款全新的API文档工具,帮助团队更高效地协作,简化API文档生成流程。 ... [详细]
  • 深入解析动态代理模式:23种设计模式之三
    在设计模式中,动态代理模式是应用最为广泛的一种代理模式。它允许我们在运行时动态创建代理对象,并在调用方法时进行增强处理。本文将详细介绍动态代理的实现机制及其应用场景。 ... [详细]
  • 深入理解ExtJS:从入门到精通
    本文详细介绍了ExtJS的功能及其在大型企业前端开发中的应用。通过实例和详细的文件结构解析,帮助初学者快速掌握ExtJS的核心概念,并提供实用技巧和最佳实践。 ... [详细]
  • 通常情况下,修改my.cnf配置文件后需要重启MySQL服务才能使新参数生效。然而,通过特定命令可以在不重启服务的情况下实现配置的即时更新。本文将详细介绍如何在线调整MySQL配置,并验证其有效性。 ... [详细]
  • 并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
    Java并发编程实践目录并发编程01——ThreadLocal并发编程02——ConcurrentHashMap并发编程03——阻塞队列和生产者-消费者模式并发编程04——闭锁Co ... [详细]
  • 深入解析 Android IPC 中的 Messenger 机制
    本文详细介绍了 Android 中基于消息传递的进程间通信(IPC)机制——Messenger。通过实例和源码分析,帮助开发者更好地理解和使用这一高效的通信工具。 ... [详细]
  • Python自动化测试入门:Selenium环境搭建
    本文详细介绍如何在Python环境中安装和配置Selenium,包括开发工具PyCharm的安装、Python环境的设置以及Selenium包的安装方法。此外,还提供了编写和运行第一个自动化测试脚本的步骤。 ... [详细]
  • 嵌入式开发环境搭建与文件传输指南
    本文详细介绍了如何为嵌入式应用开发搭建必要的软硬件环境,并提供了通过串口和网线两种方式将文件传输到开发板的具体步骤。适合Linux开发初学者参考。 ... [详细]
  • Linux中的yum安装软件
    yum俗称大黄狗作用:解决安装软件包的依赖关系当安装依赖关系的软件包时,会将依赖的软件包一起安装。本地yum:需要yum源,光驱挂载。yum源:(刚开始查看yum源中的内容就是上图 ... [详细]
author-avatar
中国脐橙在线
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有