热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

(转)RabbitMQ学习之远程过程调用(RPC)(java)

http:blog.csdn.netzhu_tianweiarticledetails40887885在一般使用RabbitMQ做RPC很容易。客户端发送一个请求消息然后服务器回复

http://blog.csdn.net/zhu_tianwei/article/details/40887885

在一般使用RabbitMQ做RPC很容易。客户端发送一个请求消息然后服务器回复一个响应消息。为了收到一个响应,我们需要发送一个'回调'的请求的队列地址。我们可以使用默认队列(在Java客户端除外)。

AMQP协议给消息定义了14个属性。大部分的属性很少使用,除了下面几个:
  deliveryMode: 将消息标记为持久(值为2)或瞬态(任何其他值)。你可能记得在第二个教程中使用了这个属性。
  contentType:用来设置mime类型。例如经常使用的JSON格式数据,就需要将此属性设置为:application/json。
  replyTo: 通常用来命名一个回调队列.
  correlationId: 用来关联RPC请求的响应.

RPC工作流程:

1)、客户端启动时,创建了一个匿名的回调队列。
2)、在一个RPC请求中,客户端发送一个消息,它有两个属性:1.REPLYTO,用来设置回调队列名;2.correlationId,对于每个请求都被设置成唯一的值。
3)、请求被发送到rpc_queue队列.
4)、RPC工作者(又名:服务器)等待接收该队列的请求。当收到一个请求,它就会处理并把结果发送给客户端,使用的队列是replyTo字段指定的。
5)、客户端等待接收回调队列中的数据。当接到一个消息,它会检查它的correlationId属性。如果它和设置的相匹配,就会把响应返回给应用程序。

1、RPC服务器的RPCServer.java,接收消息调用rpc并返回结果

 

[java] view plaincopy
print?
  1. package cn.slimsmart.rabbitmq.demo.rpc;  
  2.   
  3. import java.security.MessageDigest;  
  4.   
  5. import com.rabbitmq.client.AMQP;  
  6. import com.rabbitmq.client.AMQP.BasicProperties;  
  7. import com.rabbitmq.client.Channel;  
  8. import com.rabbitmq.client.Connection;  
  9. import com.rabbitmq.client.ConnectionFactory;  
  10. import com.rabbitmq.client.QueueingConsumer;  
  11. //RPC调用服务端  
  12. public class RPCServer {  
  13.     private static final String RPC_QUEUE_NAME = "rpc_queue";  
  14.     public static void main(String[] args) throws Exception {  
  15.         //• 先建立连接、通道,并声明队列  
  16.         ConnectionFactory factory = new ConnectionFactory();  
  17.         factory.setHost("192.168.36.217");  
  18.         factory.setUsername("admin");  
  19.         factory.setPassword("admin");  
  20.         factory.setPort(AMQP.PROTOCOL.PORT);  
  21.         Connection connection = factory.newConnection();  
  22.         Channel channel = connection.createChannel();  
  23.         channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);  
  24.         //•可以运行多个服务器进程。通过channel.basicQos设置prefetchCount属性可将负载平均分配到多台服务器上。  
  25.         channel.basicQos(1);  
  26.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  27.         //打开应答机制autoAck=false  
  28.         channel.basicConsume(RPC_QUEUE_NAME, false, consumer);  
  29.         System.out.println(" [x] Awaiting RPC requests");  
  30.         while (true) {  
  31.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  32.             BasicProperties props = delivery.getProperties();  
  33.             BasicProperties replyProps = new BasicProperties.Builder()  
  34.                     .correlationId(props.getCorrelationId()).build();  
  35.             String message = new String(delivery.getBody());  
  36.             System.out.println(" [.] getMd5String(" + message + ")");  
  37.             String response = getMd5String(message);  
  38.             //返回处理结果队列  
  39.             channel.basicPublish("", props.getReplyTo(), replyProps,  
  40.                     response.getBytes());  
  41.             //发送应答   
  42.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
  43.         }  
  44.     }  
  45.     // 模拟RPC方法 获取MD5字符串  
  46.     public static String getMd5String(String str) {  
  47.         MessageDigest md5 = null;  
  48.         try {  
  49.             md5 = MessageDigest.getInstance("MD5");  
  50.         } catch (Exception e) {  
  51.             System.out.println(e.toString());  
  52.             e.printStackTrace();  
  53.             return "";  
  54.         }  
  55.         char[] charArray = str.toCharArray();  
  56.         byte[] byteArray = new byte[charArray.length];  
  57.   
  58.         for (int i &#61; 0; i < charArray.length; i&#43;&#43;)  
  59.             byteArray[i] &#61; (byte) charArray[i];  
  60.         byte[] md5Bytes &#61; md5.digest(byteArray);  
  61.         StringBuffer hexValue &#61; new StringBuffer();  
  62.         for (int i &#61; 0; i < md5Bytes.length; i&#43;&#43;) {  
  63.             int val &#61; ((int) md5Bytes[i]) & 0xff;  
  64.             if (val < 16)  
  65.                 hexValue.append("0");  
  66.             hexValue.append(Integer.toHexString(val));  
  67.         }  
  68.         return hexValue.toString();  
  69.     }  
  70. }  


2.客户端RPCClient.java&#xff0c;发送rpc调用消息&#xff0c;接收结果

 

 

[java] view plaincopy
print?
  1. package cn.slimsmart.rabbitmq.demo.rpc;  
  2.   
  3. import com.rabbitmq.client.AMQP;  
  4. import com.rabbitmq.client.Channel;  
  5. import com.rabbitmq.client.Connection;  
  6. import com.rabbitmq.client.ConnectionFactory;  
  7. import com.rabbitmq.client.QueueingConsumer;  
  8. import com.rabbitmq.client.AMQP.BasicProperties;  
  9.   
  10. //RPC调用客户端  
  11. public class RPCClient {  
  12.     private Connection connection;  
  13.     private Channel channel;  
  14.     private String requestQueueName &#61; "rpc_queue";  
  15.     private String replyQueueName;  
  16.     private QueueingConsumer consumer;  
  17.   
  18.     public RPCClient() throws Exception {  
  19.         //• 先建立一个连接和一个通道&#xff0c;并为回调声明一个唯一的&#39;回调&#39;队列  
  20.         ConnectionFactory factory &#61; new ConnectionFactory();  
  21.         factory.setHost("192.168.36.217");  
  22.         factory.setUsername("admin");  
  23.         factory.setPassword("admin");  
  24.         factory.setPort(AMQP.PROTOCOL.PORT);  
  25.         connection &#61; factory.newConnection();  
  26.         channel &#61; connection.createChannel();  
  27.         //• 注册&#39;回调&#39;队列&#xff0c;这样就可以收到RPC响应  
  28.         replyQueueName &#61; channel.queueDeclare().getQueue();  
  29.         consumer &#61; new QueueingConsumer(channel);  
  30.         channel.basicConsume(replyQueueName, true, consumer);  
  31.     }  
  32.   
  33.     //发送RPC请求  
  34.     public String call(String message) throws Exception {  
  35.         String response &#61; null;  
  36.         String corrId &#61; java.util.UUID.randomUUID().toString();  
  37.         //发送请求消息&#xff0c;消息使用了两个属性&#xff1a;replyto和correlationId  
  38.         BasicProperties props &#61; new BasicProperties.Builder()  
  39.                 .correlationId(corrId).replyTo(replyQueueName).build();  
  40.         channel.basicPublish("", requestQueueName, props, message.getBytes());  
  41.         //等待接收结果  
  42.         while (true) {  
  43.             QueueingConsumer.Delivery delivery &#61; consumer.nextDelivery();  
  44.             //检查它的correlationId是否是我们所要找的那个  
  45.             if (delivery.getProperties().getCorrelationId().equals(corrId)) {  
  46.                 response &#61; new String(delivery.getBody());  
  47.                 break;  
  48.             }  
  49.         }  
  50.         return response;  
  51.     }  
  52.     public void close() throws Exception {  
  53.         connection.close();  
  54.     }  
  55. }  

3、运行client主函数RPCMain.java

 

 

[java] view plaincopy
print?
  1. package cn.slimsmart.rabbitmq.demo.rpc;  
  2.   
  3. public class RPCMain {  
  4.   
  5.     public static void main(String[] args) throws Exception {  
  6.         RPCClient rpcClient &#61; new RPCClient();  
  7.         System.out.println(" [x] Requesting getMd5String(abc)");     
  8.         String response &#61; rpcClient.call("abc");  
  9.         System.out.println(" [.] Got &#39;" &#43; response &#43; "&#39;");  
  10.         rpcClient.close();  
  11.     }  
  12. }  


先运行服务端&#xff0c;再运行RPCMain&#xff0c;发送消息调用RPC。

 

这里介绍的是该设计不是实现RPC服务的唯一可能&#xff0c;但它有一些重要的优点:
1&#xff09;如果RPC服务器速度太慢&#xff0c;你可以通过运行多个RPC服务器。尝试在一个新的控制台上运行第二RPCServer。
2&#xff09;RPC客户端只发送和接收一个消息。不需要queueDeclare那样要求同步调用。因此&#xff0c;RPC客户端只需要在一个网络上发送和接收为一个单一的RPC请求。



推荐阅读
  • 优化后的标题:深入探讨网关安全:将微服务升级为OAuth2资源服务器的最佳实践
    本文深入探讨了如何将微服务升级为OAuth2资源服务器,以订单服务为例,详细介绍了在POM文件中添加 `spring-cloud-starter-oauth2` 依赖,并配置Spring Security以实现对微服务的保护。通过这一过程,不仅增强了系统的安全性,还提高了资源访问的可控性和灵活性。文章还讨论了最佳实践,包括如何配置OAuth2客户端和资源服务器,以及如何处理常见的安全问题和错误。 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • Python多线程编程技巧与实战应用详解 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • 在Java Web服务开发中,Apache CXF 和 Axis2 是两个广泛使用的框架。CXF 由于其与 Spring 框架的无缝集成能力,以及更简便的部署方式,成为了许多开发者的首选。本文将详细介绍如何使用 CXF 框架进行 Web 服务的开发,包括环境搭建、服务发布和客户端调用等关键步骤,为开发者提供一个全面的实践指南。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 开发日志:201521044091 《Java编程基础》第11周学习心得与总结
    开发日志:201521044091 《Java编程基础》第11周学习心得与总结 ... [详细]
  • 深入解析C#中app.config文件的配置与修改方法
    在C#开发过程中,经常需要对系统的配置文件进行读写操作,如系统初始化参数的修改或运行时参数的更新。本文将详细介绍如何在C#中正确配置和修改app.config文件,包括其结构、常见用法以及最佳实践。此外,还将探讨exe.config文件的生成机制及其在不同环境下的应用,帮助开发者更好地管理和维护应用程序的配置信息。 ... [详细]
  • 本文深入解析了通过JDBC实现ActiveMQ消息持久化的机制。JDBC能够将消息可靠地存储在多种关系型数据库中,如MySQL、SQL Server、Oracle和DB2等。采用JDBC持久化方式时,数据库会自动生成三个关键表:`activemq_msgs`、`activemq_lock`和`activemq_ACKS`,分别用于存储消息数据、锁定信息和确认状态。这种机制不仅提高了消息的可靠性,还增强了系统的可扩展性和容错能力。 ... [详细]
  • 本文详细介绍了 Java 中遍历 Map 对象的几种常见方法及其应用场景。首先,通过 `entrySet` 方法结合增强型 for 循环进行遍历是最常用的方式,适用于需要同时访问键和值的场景。此外,还探讨了使用 `keySet` 和 `values` 方法分别遍历键和值的技巧,以及使用迭代器(Iterator)进行更灵活的遍历操作。每种方法都附有示例代码和具体的应用实例,帮助开发者更好地理解和选择合适的遍历策略。 ... [详细]
  • Python 程序转换为 EXE 文件:详细解析 .py 脚本打包成独立可执行文件的方法与技巧
    在开发了几个简单的爬虫 Python 程序后,我决定将其封装成独立的可执行文件以便于分发和使用。为了实现这一目标,首先需要解决的是如何将 Python 脚本转换为 EXE 文件。在这个过程中,我选择了 Qt 作为 GUI 框架,因为之前对此并不熟悉,希望通过这个项目进一步学习和掌握 Qt 的基本用法。本文将详细介绍从 .py 脚本到 EXE 文件的整个过程,包括所需工具、具体步骤以及常见问题的解决方案。 ... [详细]
  • 利用 Python Socket 实现 ICMP 协议下的网络通信
    在计算机网络课程的2.1实验中,学生需要通过Python Socket编程实现一种基于ICMP协议的网络通信功能。与操作系统自带的Ping命令类似,该实验要求学生开发一个简化的、非标准的ICMP通信程序,以加深对ICMP协议及其在网络通信中的应用的理解。通过这一实验,学生将掌握如何使用Python Socket库来构建和解析ICMP数据包,并实现基本的网络探测功能。 ... [详细]
  • 在处理 XML 数据时,如果需要解析 `` 标签的内容,可以采用 Pull 解析方法。Pull 解析是一种高效的 XML 解析方式,适用于流式数据处理。具体实现中,可以通过 Java 的 `XmlPullParser` 或其他类似的库来逐步读取和解析 XML 文档中的 `` 元素。这样不仅能够提高解析效率,还能减少内存占用。本文将详细介绍如何使用 Pull 解析方法来提取 `` 标签的内容,并提供一个示例代码,帮助开发者快速解决问题。 ... [详细]
  • 本文探讨了如何利用Java代码获取当前本地操作系统中正在运行的进程列表及其详细信息。通过引入必要的包和类,开发者可以轻松地实现这一功能,为系统监控和管理提供有力支持。示例代码展示了具体实现方法,适用于需要了解系统进程状态的开发人员。 ... [详细]
  • 使用Maven JAR插件将单个或多个文件及其依赖项合并为一个可引用的JAR包
    本文介绍了如何利用Maven中的maven-assembly-plugin插件将单个或多个Java文件及其依赖项打包成一个可引用的JAR文件。首先,需要创建一个新的Maven项目,并将待打包的Java文件复制到该项目中。通过配置maven-assembly-plugin,可以实现将所有文件及其依赖项合并为一个独立的JAR包,方便在其他项目中引用和使用。此外,该方法还支持自定义装配描述符,以满足不同场景下的需求。 ... [详细]
author-avatar
十一
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有