作者:是远航呀 | 来源:互联网 | 2023-06-18 16:02
sudo apt install erlang -y; sudo apt update && sudo apt install wget -y; sudo apt install apt-transport-https
# Install Erlang; the rabbitmq-server package installed below runs on the Erlang runtime.
sudo apt install erlang -y
# Refresh the package index, then install wget (used below to download the signing keys).
sudo apt update && sudo apt install wget -y
# Install apt's HTTPS transport so the https:// repository added below can be used.
sudo apt install apt-transport-https -y
# Download the RabbitMQ release signing key from two locations and register it with apt.
# NOTE(review): the dl.bintray.com host is believed to have been retired - confirm the URL still resolves.
# NOTE(review): apt-key is reported as deprecated on recent Debian/Ubuntu releases - confirm for the target release.
wget -O- https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc | sudo apt-key add -
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
# Register the Erlang 22.x apt repository ("focal" and "bionic" are Ubuntu release code names).
# Both commands write the same file with plain `tee` (no -a), so running both leaves only
# the bionic entry; presumably only the line matching the host's release should be run.
echo "deb https://dl.bintray.com/rabbitmq-erlang/debian focal erlang-22.x" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
echo "deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang-22.x" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
# Install the RabbitMQ broker itself.
# NOTE(review): no `apt update` is run after adding the repository above - confirm whether one is needed first.
sudo apt install rabbitmq-server -y
# Install pika, the Python client library imported by the example scripts below.
pip install pika
实际应用的一个例子,可参考
RabbitMQ 简介及例子,可以运行。
send.py
#!/usr/bin/env python
import pika
# Publish a single "Hello World!" message to the "hello" queue of the
# RabbitMQ broker running on this machine.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
try:
    channel = connection.channel()

    # Make sure the destination queue exists before publishing to it.
    channel.queue_declare(queue='hello')

    # The empty exchange name selects the default exchange, which delivers
    # the message to the queue named by routing_key.
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
    print(" [x] Sent 'Hello World!'")
finally:
    # Always release the connection, even when declaring or publishing fails.
    if connection.is_open:
        connection.close()
receive.py
#!/usr/bin/env python
import pika
# Consume messages from the "hello" queue of the local RabbitMQ broker and
# print each one until the user presses CTRL+C.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare the queue here as well so the consumer can be started before the
# sender has created it.
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    """Print the body of one delivered message.

    Args:
        ch: Channel the message was delivered on (unused).
        method: Delivery metadata (unused).
        properties: Message properties (unused).
        body: Raw message payload.
    """
    print(" [x] Received %r" % body)


# auto_ack=True: deliveries are acknowledged automatically, so the callback
# does not have to ack them itself.
channel.basic_consume(
    queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the documented way to exit: stop cleanly instead of
    # ending with a traceback.
    channel.stop_consuming()
finally:
    if connection.is_open:
        connection.close()
参考 Python使用RabbitMQ(AMQP)极简例子,可以通过网页来监视rabbitmq。
Python使用RabbitMQ实现RPC调用示例,我已经验证通过,不过rpcclient.py在sublime中正常,在命令行下有错,暂时找不到原因。
rpcserver.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pika
def fib(n):
    """Return the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.

    Computed iteratively in O(n) additions, so large requests neither hit the
    recursion limit nor take exponential time.

    Args:
        n: Non-negative integer index into the Fibonacci sequence.

    Returns:
        The n-th Fibonacci number as an int.

    Raises:
        ValueError: If n is negative.
    """
    if n < 0:
        raise ValueError("n must be non-negative")
    previous, current = 0, 1
    for _ in range(n):
        previous, current = current, previous + current
    return previous
def on_request(ch, method, props, body):
    """Handle one RPC request: compute fib(n) and publish the reply.

    Args:
        ch: Channel the request arrived on; also used to publish the reply
            and to acknowledge the request.
        method: Delivery metadata; its delivery_tag identifies the request
            being acknowledged.
        props: Request properties; reply_to names the queue to answer on and
            correlation_id is echoed back unchanged.
        body: Request payload, parsed with int().

    Raises:
        ValueError: If body cannot be parsed as an integer.
    """
    # Run the requested computation.
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)

    # Send the result back to the caller.
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,  # queue the caller is listening on
        body=str(response),
        properties=pika.BasicProperties(
            # Echoed so the caller can match this reply to its request.
            correlation_id=props.correlation_id
        )
    )
    # Acknowledge the request only after the reply has been published.
    ch.basic_ack(delivery_tag=method.delivery_tag)
if __name__ == "__main__":
    # Connect to the local broker and serve RPC requests from "rpc_queue".
    params = pika.ConnectionParameters(host='localhost')
    connection = pika.BlockingConnection(params)
    channel = connection.channel()

    channel.queue_declare(queue='rpc_queue')  # queue that clients publish requests to

    # Limit this consumer to one unacknowledged request at a time.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue='rpc_queue',
        on_message_callback=on_request
    )

    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
rpcclient.py:
# coding=utf-8
import pika
import uuid
# import settings
class RPCClient:
    """Blocking RPC client for the server listening on ``rpc_queue``.

    Each instance opens its own connection to the local broker and declares a
    private, exclusive reply queue on which responses are received.
    """

    def __init__(self):
        """Connect to the broker and start consuming from the reply queue."""
        params = pika.ConnectionParameters(host='localhost')
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()

        # Initialised up front so on_response() never touches a missing
        # attribute if a message arrives before the first call().
        self.response = None
        self.corr_id = None

        result = self.channel.queue_declare(  # queue that receives replies
            queue=str(uuid.uuid4()),
            exclusive=True
        )
        self.callback_queue = result.method.queue

        # auto_ack=True: on_response() never acknowledges deliveries itself,
        # so without it every reply would stay unacknowledged.
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

    def call(self, n):
        """Invoke the remote procedure with ``n`` and wait for its result.

        Args:
            n: Value to send; it is transmitted as ``str(n)``.

        Returns:
            The reply body converted with int().
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())  # unique identifier of this call
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',  # queue the server consumes from
            properties=pika.BasicProperties(
                correlation_id=self.corr_id,
                reply_to=self.callback_queue
            ),
            body=str(n)
        )
        # Wait for the reply. time_limit=None blocks until there is I/O to
        # process, instead of spinning the CPU in a tight polling loop.
        while self.response is None:
            self.connection.process_data_events(time_limit=None)
        return int(self.response)

    def on_response(self, ch, method, props, body):
        """Store a reply if it belongs to the call currently in flight.

        Args:
            ch: Channel the reply arrived on (unused).
            method: Delivery metadata (unused).
            props: Reply properties; correlation_id is compared with the id
                generated by call().
            body: Reply payload.
        """
        # Only a reply carrying the id generated for the pending call is the
        # result being waited for; anything else is ignored.
        if self.corr_id == props.correlation_id:
            self.response = body
if __name__ == "__main__":
    # Demo: ask the RPC server for fib(7) and print the answer.
    client = RPCClient()
    try:
        print(" [x] Requesting fib(7)")
        response = client.call(7)
        print(" [.] Got %r" % response)
    finally:
        # Release the broker connection even if the call fails.
        if client.connection.is_open:
            client.connection.close()