篇首语:本文由编程笔记#小编为大家整理,主要介绍了3.6.7 RabbitMQ教程六 – RPC相关的知识,希望对你有一定的参考价值。
In the second tutorial we learned how to use Work Queues to distribute time-consuming tasks among multiple workers.
在第二篇教程中我们学到了如何使用Work Queues在多个工作端之间分配耗时任务。
But what if we need to run a function on a remote computer and wait for the result? Well, that‘s a different story. This pattern is commonly known as Remote Procedure Call or RPC.
但如果我们需要在一个远程计算机上运行一个函数并等待结果呢?嗯,这事儿就不一样了。这个模式通常被称为远程过程调用或RPC
In this tutorial we‘re going to use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don‘t have any time-consuming tasks that are worth distributing, we‘re going to create a dummy RPC service that returns Fibonacci numbers.
这篇教程,我们会使用RabbitMQ来构建一个RPC系统:一个客户端和一个可伸缩的RPC服务端。由于我们没有任何值得分配的耗时任务,所以我们会创建一个无脑的RPC服务,它返回Fibonacci数列。
To illustrate how an RPC service could be used we‘re going to create a simple client class. It‘s going to expose a method named call which sends an RPC request and blocks until the answer is received:
为了演示一个RPC服务如何使用,我们会创建一个简单的客户端的类。它将公开一个名为call的方法,该方法发送一个RPC请求并阻塞直到收到应答
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)
A note on RPC
Although RPC is a pretty common pattern in computing, it‘s often criticised. The problems arise when a programmer is not aware whether a function call is local or if it‘s a slow RPC. Confusions like that result in an unpredictable system and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.
尽管RPC是计算中非常常见的模式,但它经常为人所诟病。当程序员不知道一个函数调用是本地的还是一个慢的RPC服务时,问题就出现了。像这样的混乱导致了不可预测的系统,并增加了不必要的调试的复杂性。错误使用RPC不会简化软件,反而会导致无法维护的意大利面一般的(大概意思就是一团混乱,缠在一起了)代码
Bearing that in mind, consider the following advice:
- Make sure it‘s obvious which function call is local and which is remote.
- Document your system. Make the dependencies between components clear.
- Handle error cases. How should the client react when the RPC server is down for a long time?
考虑到这一点,请考虑以下建议
- 确保让哪个函数调用是本地的,哪个是远程的这一点看上去很明显
- 记录您的系统。明确组件之间的依赖关系
- 处理错误案例。当RPC服务器长时间关闭时,客户端应该如何反应?
When in doubt avoid RPC. If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.
有疑问时避免使用RPC。如果可以的话,你应该使用一个异步管道 - 结果将异步推送到下一个计算阶段,而不是像RPC那样的阻塞。
In general doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response the client needs to send a ‘callback‘ queue address with the request. Let‘s try it:
一般来说,在RabbitMQ上执行RPC很容易。客户端发送请求消息,服务器用响应消息答复。为了接收响应,客户端需要随请求发送一个“callback”队列地址。我们来试试:
result = channel.queue_declare(queue=‘‘, exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange=‘‘,
routing_key=‘rpc_queue‘,
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)
Message properties
The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
- delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
- content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
- reply_to: Commonly used to name a callback queue.
- correlation_id: Useful to correlate RPC responses with requests.
AMQP 0-9-1协议预定义了一个14个属性的集合,这个集合和消息一起发送。大多数的属性都用不到,以下这些是例外:
- delivery_mode: 将消息标记为持久(值为2)或暂时(任何其他值)。你可能还记得第二个教程中的这个属性。
- content_type: 用于描述编码的mime类型。比如经常使用的JSON编码,把这个属性设置为:application/json是一个很好的练习
- reply_to:通常用来给一个callback队列命名
- correlation_id: 有助于将RPC响应与请求关联起来
In the method presented above we suggest creating a callback queue for every RPC request. That‘s pretty inefficient, but fortunately there is a better way - let‘s create a single callback queue per client.
在上述方法中,我们建议为每个RPC请求创建callback队列。这相当低效,但幸运的是有更好的方法 - 让我们为每一个客户端创建一条单独的callback队列。
That raises a new issue, having received a response in that queue it‘s not clear to which request the response belongs. That‘s when the correlation_id property is used. We‘re going to set it to a unique value for every request. Later, when we receive a message in the callback queue we‘ll look at this property, and based on that we‘ll be able to match a response with a request. If we see an unknown correlation_id value, we may safely discard the message - it doesn‘t belong to our requests.
这又出现一个新问题,在队列中收到响应后,不清楚响应属于哪个请求。那是因为当correlation_id属性被使用时。我们会为每个请求设置一个唯一的值。稍后,当我们在callback队列中接收到一条消息时我们会查看这个属性,并基于此属性我们能够将一个回复与一个请求匹配。如果我们看到一个未知的correlation_id值,我们就能安全的丢弃该消息 - 它不属于我们的请求。
You may ask, why should we ignore unknown messages in the callback queue, rather than failing with an error? It‘s due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment message for the request. If that happens, the restarted RPC server will process the request again. That‘s why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.
你也许会问,为什么我们要忽略那些callback队列中的未知消息,而不是报错运行失败?这是因为服务器端可能存在竞争情况。虽然不太可能,但RPC服务端可能就在给我们发送应答之后,但在给请求发送一条确认消息之前就会死掉。如果这个发生了,重启的RPC服务端会再次处理该请求。这就是为什么在客户端我们必须优雅地处理重复的响应,而RPC从理想上来说应该是等幂的。
Our RPC will work like this:
我们的RPC会像这样工作:
rpc_server.py
import pika
cOnnection= pika.BlockingConnection(
pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘rpc_queue‘)
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
respOnse= fib(n)
ch.basic_publish(exchange=‘‘,
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=‘rpc_queue‘, on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
The server code is rather straightforward:
服务器代码相当简单:
rpc_client.py
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.cOnnection= pika.BlockingConnection(
pika.ConnectionParameters(host=‘localhost‘))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue=‘‘, exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.respOnse= body
def call(self, n):
self.respOnse= None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange=‘‘,
routing_key=‘rpc_queue‘,
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
respOnse= fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
The client code is slightly more involved:
客户机代码稍微复杂一些:
Our RPC service is now ready. We can start the server:
我们的rpc服务端现在就绪。我们可以启动服务端:
python rpc_server.py
To request a fibonacci number run the client:
运行客户端,请求fibonacci数列:
python rpc_client.py
The presented design is not the only possible implementation of a RPC service, but it has some important advantages:
已提出的设计不是RPC服务的唯一可能实现,但它有一些重要的优点: