热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:3.6.7RabbitMQ教程六–RPC

篇首语:本文由编程笔记#小编为大家整理,主要介绍了3.6.7RabbitMQ教程六–RPC相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了3.6.7 RabbitMQ教程六 – RPC相关的知识,希望对你有一定的参考价值。



Remote procedure call (RPC)

What This Tutorial Focuses On

In the second tutorial we learned how to use Work Queues to distribute time-consuming tasks among multiple workers.

在第二篇教程中我们学到了如何使用Work Queues在多个工作端之间分配耗时任务。

But what if we need to run a function on a remote computer and wait for the result? Well, that‘s a different story. This pattern is commonly known as Remote Procedure Call or RPC.

但如果我们需要在一个远程计算机上运行一个函数并等待结果呢?嗯,这事儿就不一样了。这个模式通常被称为远程过程调用或RPC

In this tutorial we‘re going to use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don‘t have any time-consuming tasks that are worth distributing, we‘re going to create a dummy RPC service that returns Fibonacci numbers.

这篇教程,我们会使用RabbitMQ来构建一个RPC系统:一个客户端和一个可伸缩的RPC服务端。由于我们没有任何值得分配的耗时任务,所以我们会创建一个无脑的RPC服务,它返回Fibonacci数列。

Client interface

To illustrate how an RPC service could be used we‘re going to create a simple client class. It‘s going to expose a method named call which sends an RPC request and blocks until the answer is received:

为了演示一个RPC服务如何使用,我们会创建一个简单的客户端的类。它将公开一个名为call的方法,该方法发送一个RPC请求并阻塞直到收到应答

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)

A note on RPC

Although RPC is a pretty common pattern in computing, it‘s often criticised. The problems arise when a programmer is not aware whether a function call is local or if it‘s a slow RPC. Confusions like that result in an unpredictable system and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.

尽管RPC是计算中非常常见的模式,但它经常为人所诟病。当程序员不知道一个函数调用是本地的还是一个慢的RPC服务时,问题就出现了。像这样的混乱导致了不可预测的系统,并增加了不必要的调试的复杂性。错误使用RPC不会简化软件,反而会导致无法维护的意大利面一般的(大概意思就是一团混乱,缠在一起了)代码

Bearing that in mind, consider the following advice:

  • Make sure it‘s obvious which function call is local and which is remote.
  • Document your system. Make the dependencies between components clear.
  • Handle error cases. How should the client react when the RPC server is down for a long time?

考虑到这一点,请考虑以下建议

  • 确保让哪个函数调用是本地的,哪个是远程的这一点看上去很明显
  • 记录您的系统。明确组件之间的依赖关系
  • 处理错误案例。当RPC服务器长时间关闭时,客户端应该如何反应?

When in doubt avoid RPC. If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.

有疑问时避免使用RPC。如果可以的话,你应该使用一个异步管道 - 结果将异步推送到下一个计算阶段,而不是像RPC那样的阻塞。

Callback queue

In general doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response the client needs to send a ‘callback‘ queue address with the request. Let‘s try it:

一般来说,在RabbitMQ上执行RPC很容易。客户端发送请求消息,服务器用响应消息答复。为了接收响应,客户端需要随请求发送一个“callback”队列地址。我们来试试:

result = channel.queue_declare(queue=‘‘, exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange=‘‘,
routing_key=‘rpc_queue‘,
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)

Message properties

The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:

  • delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
  • content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
  • reply_to: Commonly used to name a callback queue.
  • correlation_id: Useful to correlate RPC responses with requests.

AMQP 0-9-1协议预定义了一个14个属性的集合,这个集合和消息一起发送。大多数的属性都用不到,以下这些是例外:

  • delivery_mode: 将消息标记为持久(值为2)或暂时(任何其他值)。你可能还记得第二个教程中的这个属性。
  • content_type: 用于描述编码的mime类型。比如经常使用的JSON编码,把这个属性设置为:application/json是一个很好的练习
  • reply_to:通常用来给一个callback队列命名
  • correlation_id: 有助于将RPC响应与请求关联起来
Correlation id

In the method presented above we suggest creating a callback queue for every RPC request. That‘s pretty inefficient, but fortunately there is a better way - let‘s create a single callback queue per client.

在上述方法中,我们建议为每个RPC请求创建callback队列。这相当低效,但幸运的是有更好的方法 - 让我们为每一个客户端创建一条单独的callback队列。

That raises a new issue, having received a response in that queue it‘s not clear to which request the response belongs. That‘s when the correlation_id property is used. We‘re going to set it to a unique value for every request. Later, when we receive a message in the callback queue we‘ll look at this property, and based on that we‘ll be able to match a response with a request. If we see an unknown correlation_id value, we may safely discard the message - it doesn‘t belong to our requests.

这又出现一个新问题,在队列中收到响应后,不清楚响应属于哪个请求。那是因为当correlation_id属性被使用时。我们会为每个请求设置一个唯一的值。稍后,当我们在callback队列中接收到一条消息时我们会查看这个属性,并基于此属性我们能够将一个回复与一个请求匹配。如果我们看到一个未知的correlation_id值,我们就能安全的丢弃该消息 - 它不属于我们的请求。

You may ask, why should we ignore unknown messages in the callback queue, rather than failing with an error? It‘s due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment message for the request. If that happens, the restarted RPC server will process the request again. That‘s why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.

你也许会问,为什么我们要忽略那些callback队列中的未知消息,而不是报错运行失败?这是因为服务器端可能存在竞争情况。虽然不太可能,但RPC服务端可能就在给我们发送应答之后,但在给请求发送一条确认消息之前就会死掉。如果这个发生了,重启的RPC服务端会再次处理该请求。这就是为什么在客户端我们必须优雅地处理重复的响应,而RPC从理想上来说应该是等幂的。

Summary

                                   技术图片

Our RPC will work like this:

  • When the Client starts up, it creates an anonymous exclusive callback queue.
  • For an RPC request, the Client sends a message with two properties: reply_to, which is set to the callback queue and correlation_id, which is set to a unique value for every request.
  • The request is sent to an rpc_queue queue.
  • The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the reply_to field.
  • The client waits for data on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request it returns the response to the application.

我们的RPC会像这样工作:

  • 当客户端启动,它创建一个匿名的、独占的callback队列
  • 对一个RPC请求,客户端会发送一条带有两个属性的消息:reply_to,它被设置给callback队列和correlation_id,它为每个请求设置一个唯一的值
  • RPC工作端(服务端)等待队列里的每个请求。当一条请求出现时,它会干活并发回一条有结果的消息给客户端,通过使用reply_to域内的队列
  • 客户端等待callback队列中的数据。当一条消息出现时,它检查correlation_id属性。如果它与请求中的值匹配,则会将响应返回给应用程序。

Putting it all together

rpc_server.py

import pika
cOnnection= pika.BlockingConnection(
pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘rpc_queue‘)
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
respOnse= fib(n)
ch.basic_publish(exchange=‘‘,
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=‘rpc_queue‘, on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()

The server code is rather straightforward:

  • (4) As usual we start by establishing the connection and declaring the queue.
  • (11) We declare our fibonacci function. It assumes only valid positive integer input. (Don‘t expect this one to work for big numbers, it‘s probably the slowest recursive implementation possible).
  • (19) We declare a callback for basic_consume, the core of the RPC server. It‘s executed when the request is received. It does the work and sends the response back.
  • (32) We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set the prefetch_count setting.

服务器代码相当简单:

  • (4)像往常一样,我们首先建立连接并声明队列
  • (11)我们声明Fibonacci函数。它假定只有有效的正整数输入。(不要指望它一个对大数有用,它可能是最慢的递归实现)
  • (19)我们为RPC服务器的核心basic_consume声明一个callback。它在收到请求时执行。它完成工作并将响应发送回。
  • (32)我们可能需要运行多个服务器进程。为了在多个服务器上平均分配负载,我们需要设置prefetch_count设定。

rpc_client.py

import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.cOnnection= pika.BlockingConnection(
pika.ConnectionParameters(host=‘localhost‘))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue=‘‘, exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.respOnse= body
def call(self, n):
self.respOnse= None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange=‘‘,
routing_key=‘rpc_queue‘,
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
respOnse= fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

The client code is slightly more involved:

  • (7) We establish a connection, channel and declare an exclusive ‘callback‘ queue for replies.
  • (16) We subscribe to the ‘callback‘ queue, so that we can receive RPC responses.
  • (18) The ‘on_response‘ callback executed on every response is doing a very simple job, for every response message it checks if the correlation_id is the one we‘re looking for. If so, it saves the response in self.response and breaks the consuming loop.
  • (23) Next, we define our main call method - it does the actual RPC request.
  • (24) In this method, first we generate a unique correlation_id number and save it - the ‘on_response‘ callback function will use this value to catch the appropriate response.
  • (25) Next, we publish the request message, with two properties: reply_to and correlation_id.
  • (32) At this point we can sit back and wait until the proper response arrives.
  • (33) And finally we return the response back to the user.

客户机代码稍微复杂一些:

  • (7)我们建立一个连接、通道并为回复声明一个独占的“callback”队列。
  • (16)我们订阅“callback”队列,以便接收RPC响应。
  • (18)对每个响应执行的“on_response”回调操作非常简单,它会检查每个响应消息的correlation_id是否是我们要查找的correlation_id。如果是,它会将响应保存在self.response中,并断开消费循环。
  • (23)接下来,我们定义主要的调用方法-它执行实际的RPC请求。
  • (24)在这个方法中,首先我们生成一个唯一的correlation_id号并保存它 - “on_response”回调函数将使用这个值来捕获适当的响应。
  • (25)接下来,我们发布请求消息,它有两个属性:reply_to和correlation_id。
  • (32)此时,我们可以坐下来并等待直到有合适的回应。
  • (33)最后,我们将响应返回给用户。

Our RPC service is now ready. We can start the server:

我们的rpc服务端现在就绪。我们可以启动服务端:

python rpc_server.py

To request a fibonacci number run the client:

运行客户端,请求fibonacci数列:

python rpc_client.py

The presented design is not the only possible implementation of a RPC service, but it has some important advantages:

  • If the RPC server is too slow, you can scale up by just running another one. Try running a second rpc_server.py in a new console.
  • On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queue_declare are required. As a result the RPC client needs only one network round trip for a single RPC request.

已提出的设计不是RPC服务的唯一可能实现,但它有一些重要的优点:

  • 如果RPC服务器太慢,可以通过运行另一个服务器来扩展。尝试在新控制台中运行第二个rpc_server.py
  • 在客户端,RPC只需要发送和接收一条消息。不需要queue_declare之类的同步调用。因此,RPC客户机对于单个RPC请求只需要一次网络往返。

推荐阅读
  • Codeforces 1065D 解题心得与代码实现分析 ... [详细]
  • 在启用分层编译的情况下,即时编译器(JIT)的触发条件涉及多个因素,包括方法调用频率、代码复杂度和运行时性能数据。本文将详细解析这些条件,并探讨分层编译如何优化JVM的执行效率。 ... [详细]
  • 内网渗透技术详解:PTH、PTT与PTK在域控环境中的应用及猫盘内网穿透配置
    本文深入探讨了内网渗透技术,特别是PTH、PTT与PTK在域控环境中的应用,并详细介绍了猫盘内网穿透的配置方法。通过这些技术,安全研究人员可以更有效地进行内网渗透测试,解决常见的渗透测试难题。此外,文章还提供了实用的配置示例和操作步骤,帮助读者更好地理解和应用这些技术。 ... [详细]
  • BZOJ4240 Gym 102082G:贪心算法与树状数组的综合应用
    BZOJ4240 Gym 102082G 题目 "有趣的家庭菜园" 结合了贪心算法和树状数组的应用,旨在解决在有限时间和内存限制下高效处理复杂数据结构的问题。通过巧妙地运用贪心策略和树状数组,该题目能够在 10 秒的时间限制和 256MB 的内存限制内,有效处理大量输入数据,实现高性能的解决方案。提交次数为 756 次,成功解决次数为 349 次,体现了该题目的挑战性和实际应用价值。 ... [详细]
  • 在稀疏直接法视觉里程计中,通过优化特征点并采用基于光度误差最小化的灰度图像线性插值技术,提高了定位精度。该方法通过对空间点的非齐次和齐次表示进行处理,利用RGB-D传感器获取的3D坐标信息,在两帧图像之间实现精确匹配,有效减少了光度误差,提升了系统的鲁棒性和稳定性。 ... [详细]
  • 如何在Java中高效构建WebService
    本文介绍了如何利用XFire框架在Java中高效构建WebService。XFire是一个轻量级、高性能的Java SOAP框架,能够简化WebService的开发流程。通过结合MyEclipse集成开发环境,开发者可以更便捷地进行项目配置和代码编写,从而提高开发效率。此外,文章还详细探讨了XFire的关键特性和最佳实践,为读者提供了实用的参考。 ... [详细]
  • Go语言实现Redis客户端与服务器的交互机制深入解析
    在前文对Godis v1.0版本的基础功能进行了详细介绍后,本文将重点探讨如何实现客户端与服务器之间的交互机制。通过具体代码实现,使客户端与服务器能够顺利通信,赋予项目实际运行的能力。本文将详细解析Go语言在实现这一过程中的关键技术和实现细节,帮助读者深入了解Redis客户端与服务器的交互原理。 ... [详细]
  • 深入解析零拷贝技术(Zerocopy)及其应用优势
    零拷贝技术(Zero-copy)是Netty框架中的一个关键特性,其核心在于减少数据在操作系统内核与用户空间之间的传输次数。通过避免不必要的内存复制操作,零拷贝显著提高了数据传输的效率和性能。本文将深入探讨零拷贝的工作原理及其在实际应用中的优势,包括降低CPU负载、减少内存带宽消耗以及提高系统吞吐量等方面。 ... [详细]
  • 本文详细介绍了 Sublime Text 3 在 2021 年的激活密钥及其在线激活方法。用户可以通过提供的链接访问云海天教程,获取更多详细的激活码信息和操作步骤。此外,文章还提供了安全可靠的激活方案,帮助用户顺利激活软件,提升编程效率。 ... [详细]
  • Sublime Text 3 注册密钥及激活方法详解
    本文详细介绍了Sublime Text 3的注册密钥获取与激活方法,旨在帮助用户合法且高效地使用这款强大的文本编辑器。文章不仅提供了最新的注册密钥信息,还涵盖了详细的激活步骤,确保用户能够顺利激活软件,享受其带来的便捷与高效。此外,文中还简要对比了Sublime Text 3与其他主流文本编辑器的功能差异,为用户提供更多选择参考。 ... [详细]
  • [Offer收割]编程竞赛第8轮 A 小Ho的完美主义倾向
    题目链接:小Ho的完美主义倾向题目描述:小Ho在一条直线型的街道上漫步。这条街道由若干块长度为L的石板铺设而成,因此每隔L的距离就会出现一道石板间的接缝。小Ho对这些规律排列的接缝产生了浓厚的兴趣,他决定研究如何在这条街道上行走,以满足自己对完美路径的追求。本题要求在给定的约束条件下,计算出小Ho能够实现其目标的所有可能方案数。 ... [详细]
  • 深入解析 org.hibernate.event.spi.EventSource.getFactory() 方法及其应用实例 ... [详细]
  • 目录RPC是什么RPC的优点RPC的缺点RPC是什么RPC(RemoteProcedureCall)isaprotocolthatoneprogramcanusetorequest ... [详细]
  • Panabit应用层流量管理解决方案
    Panabit是一款国内领先的应用层流量管理解决方案,提供高度开放且免费的专业服务,尤其擅长P2P应用的精准识别与高效控制。截至2009年3月25日,该系统已实现对多种网络应用的全面支持,有效提升了网络资源的利用效率和安全性。 ... [详细]
  • 首次探索Serf:分布式服务协调工具的入门指南
    serf是出自Hashicorp的开源项目,实现了去中心化的gossip(八卦)协议,其中gossip协议定义了一种类似病毒感染的消息传播过程。一些著名的开源项目,如Docker和 ... [详细]
author-avatar
love28119_529_700
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有