热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

python16_day11【MQ、Redis、Memcache】

一、RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循MozillaPublicLicense开源协议。MQ全称为MessageQueue,消

一、RabbitMQ

  是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

 

  1.RabbitMQ install

1 安装配置epel源
2 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
3
4 安装erlang
5 $ yum -y install erlang
6
7 安装RabbitMQ
8 $ yum -y install rabbitmq-server
  

注意:service rabbitmq-server start/stop

  2. Python API install  

1 pip install pika
2 or
3 easy_install pika
4 or
5 源码
6 https://pypi.python.org/pypi/pika

  3.基于QUEUE实现生产消费模型

1 import Queue
2 import threading
3
4
5 message = Queue.Queue(10)
6
7
8 def producer(i):
9 while True:
10 message.put(i)
11
12
13 def consumer(i):
14 while True:
15 msg = message.get()
16
17
18 for i in range(12):
19 t = threading.Thread(target=producer, args=(i,))
20 t.start()
21
22 for i in range(10):
23 t = threading.Thread(target=consumer, args=(i,))
24 t.start()

  4.基于RabbitMQ

  对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

1 import pika
2
3 # ######################### 生产者 #########################
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.queue_declare(queue='hello')
10
11 channel.basic_publish(exchange='',
12 routing_key='hello',
13 body='Hello World!')
14 print(" [x] Sent 'Hello World!'")
15 connection.close()
16
17
18
19 # ########################## 消费者 ##########################
20 import pika
21 connection = pika.BlockingConnection(pika.ConnectionParameters(
22 host='localhost'))
23 channel = connection.channel()
24
25 channel.queue_declare(queue='hello')
26
27 def callback(ch, method, properties, body):
28 print(" [x] Received %r" % body)
29
30 channel.basic_consume(callback,
31 queue='hello',
32 no_ack=True)
33
34 print(' [*] Waiting for messages. To exit press CTRL+C')
35 channel.start_consuming()

  5.消费者ack 

1 import pika
2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
3 channel = connection.channel()
4 channel.queue_declare(queue='hello')
5
6
7 def callback(ch, method, properties, body):
8 print(" [x] Received %r" % body)
9
10 channel.basic_consume(callback, queue='hello', no_ack=False)
11 # no_ack: acknowledgment 消息不丢失,MQ判读出现异常,没有消费,没有ack,则把消息放回队列.
12 channel.start_consuming()

消息ack

  6.durable消息持久化

1 import pika
2
3 connection = pika.BlockingConnection(pika.ConnectionParameters(
4 host='127.0.0.1', port=5672))
5 channel = connection.channel()
6
7 channel.queue_declare(queue='hello1', durable=True) # 创建通道, 持久化修改1:durable=True
8
9 channel.basic_publish(exchange='',
10 routing_key='hello',
11 body='Hello World!',
12 properties=pika.BasicProperties(delivery_mode=2) # 持久化修改2
13 )
14 connection.close()

生产者

1 import pika
2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
3 channel = connection.channel()
4
5
6 def callback(ch, method, properties, body):
7 print(" [x] Received %r" % body)
8 import time
9 time.sleep(10)
10 print('ok')
11 ch.basic_ack(delivery_tag=method.delivery_tag) # 持久化:修改2
12
13 channel.basic_consume(callback,
14 queue='hello',
15 no_ack=False) # 持久化:修改1
16
17 channel.start_consuming()

消费者

  7.消息获取顺序

1 import pika
2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
3 channel = connection.channel()
4
5
6 def callback(ch, method, properties, body):
7 print(" [x] Received %r" % body)
8 import time
9 time.sleep(10)
10 print('ok')
11 ch.basic_ack(delivery_tag=method.delivery_tag)
12 channel.basic_qos(prefetch_count=1) # 默认消息队列里的数据是按照顺序被消费者拿走,
13 # 例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
14 # 表示谁来谁取,不再按照奇偶数排列
15 channel.basic_consume(callback,
16 queue='hello',
17 no_ack=False)
18
19 channel.start_consuming()

消费者

  8.发布订阅

  exchange type = fanout

1 import pika
2 import sys
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='logs',
9 type='fanout')
10
11 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
12 channel.basic_publish(exchange='logs',
13 routing_key='',
14 body=message)
15 print(" [x] Sent %r" % message)
16 connection.close()

发布者

1 import pika
2
3 connection = pika.BlockingConnection(pika.ConnectionParameters(
4 host='localhost'))
5 channel = connection.channel()
6
7 channel.exchange_declare(exchange='logs',
8 type='fanout')
9
10 result = channel.queue_declare(exclusive=True)
11 queue_name = result.method.queue
12
13 channel.queue_bind(exchange='logs',
14 queue=queue_name)
15
16 print(' [*] Waiting for logs. To exit press CTRL+C')
17
18
19 def callback(ch, method, properties, body):
20 print(" [x] %r" % body)
21
22 channel.basic_consume(callback,
23 queue=queue_name,
24 no_ack=True)
25
26 channel.start_consuming()

订阅者

  9.发布订阅(关键字)

  exchange type = direct

1 import pika
2 import sys
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='direct_logs',
9 type='direct')
10
11 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
13 channel.basic_publish(exchange='direct_logs',
14 routing_key=severity,
15 body=message)
16 print(" [x] Sent %r:%r" % (severity, message))
17 connection.close()

发布者

1 import pika
2 import sys
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='direct_logs',
9 type='direct')
10
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13
14 severities = sys.argv[1:]
15 if not severities:
16 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
17 sys.exit(1)
18
19 for severity in severities:
20 channel.queue_bind(exchange='direct_logs',
21 queue=queue_name,
22 routing_key=severity)
23
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25
26 def callback(ch, method, properties, body):
27 print(" [x] %r:%r" % (method.routing_key, body))
28
29 channel.basic_consume(callback,
30 queue=queue_name,
31 no_ack=True)
32
33 channel.start_consuming()

订阅者

  10.发布订阅(模糊匹配)

  exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • *  表示只能匹配 一个 单词

发送者路由值 队列中
old.boy.python old.* -- 不匹配
old.boy.python old.# -- 匹配

1 import pika
2 import sys
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='topic_logs',
9 type='topic')
10
11 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
13 channel.basic_publish(exchange='topic_logs',
14 routing_key=routing_key,
15 body=message)
16 print(" [x] Sent %r:%r" % (routing_key, message))
17 connection.close()

发布者

1 import pika
2 import sys
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='topic_logs',
9 type='topic')
10
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
17 sys.exit(1)
18
19 for binding_key in binding_keys:
20 channel.queue_bind(exchange='topic_logs',
21 queue=queue_name,
22 routing_key=binding_key)
23
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25
26 def callback(ch, method, properties, body):
27 print(" [x] %r:%r" % (method.routing_key, body))
28
29 channel.basic_consume(callback,
30 queue=queue_name,
31 no_ack=True)
32
33 channel.start_consuming()

订阅者

 

二、Memcached

  1.安装API

    pip3 install python-memcached

  2.基本使用

import memcachemc = memcache.Client(['10.211.55.4:12000'], debug=True)
mc.set("foo", "bar")
ret = mc.get('foo')
print ret

  3.支持集群

  • 根据算法将 k1 转换成一个数字
  • 将数字和主机列表长度求余数&#xff0c;得到一个值 N&#xff08; 0 <&#61; N <列表长度 &#xff09;
  • 在主机列表中根据 第2步得到的值为索引获取主机&#xff0c;例如&#xff1a;host_list[N]
  • 连接 将第3步中获取的主机&#xff0c;将 k1 &#61; "v1" 放置在该服务器的内存中

mc &#61; memcache.Client([(&#39;1.1.1.1:12000&#39;, 1), (&#39;1.1.1.2:12000&#39;, 2), (&#39;1.1.1.3:12000&#39;, 1)], debug&#61;True)mc.set(&#39;k1&#39;, &#39;v1&#39;)

  4.add命令

  添加一条键值对&#xff0c;如果已经存在的 key&#xff0c;重复执行add操作异常

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)
mc.add(&#39;k1&#39;, &#39;v1&#39;)
# mc.add(&#39;k1&#39;, &#39;v2&#39;) # 报错&#xff0c;对已经存在的key重复添加&#xff0c;失败&#xff01;&#xff01;&#xff01;

  5.replace命令

  replace 修改某个key的值&#xff0c;如果key不存在&#xff0c;则异常

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)
# 如果memcache中存在kkkk&#xff0c;则替换成功&#xff0c;否则一场
mc.replace(&#39;kkkk&#39;,&#39;999&#39;)

  6.set 和 set_multi

  set            设置一个键值对&#xff0c;如果key不存在&#xff0c;则创建&#xff0c;如果key存在&#xff0c;则修改&#xff01;
  set_multi   设置多个键值对&#xff0c;如果key不存在&#xff0c;则创建&#xff0c;如果key存在&#xff0c;则修改&#xff01;

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)mc.set(&#39;key0&#39;, &#39;wupeiqi&#39;)mc.set_multi({&#39;key1&#39;: &#39;val1&#39;, &#39;key2&#39;: &#39;val2&#39;})

  7.delete 和 delete_multi

  delete             在Memcached中删除指定的一个键值对
  delete_multi    在Memcached中删除指定的多个键值对

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)mc.delete(&#39;key0&#39;)
mc.delete_multi([&#39;key1&#39;, &#39;key2&#39;])

  8.get 和 get_multi

  get            获取一个键值对
  get_multi   获取多一个键值对

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)val &#61; mc.get(&#39;key0&#39;)
item_dict &#61; mc.get_multi(["key1", "key2", "key3"])

  9.append 和 prepend

  append    修改指定key的值&#xff0c;在该值 后面 追加内容
  prepend   修改指定key的值&#xff0c;在该值 前面 插入内容

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)
# k1 &#61; "v1"mc.append(&#39;k1&#39;, &#39;after&#39;)
# k1 &#61; "v1after"mc.prepend(&#39;k1&#39;, &#39;before&#39;)
# k1 &#61; "beforev1after"

  10.decr 和 incr

  incr  自增&#xff0c;将Memcached中的某一个值增加 N &#xff08; N默认为1 &#xff09;
  decr 自减&#xff0c;将Memcached中的某一个值减少 N &#xff08; N默认为1 &#xff09;

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)
mc.set(&#39;k1&#39;, &#39;777&#39;)mc.incr(&#39;k1&#39;)
# k1 &#61; 778mc.incr(&#39;k1&#39;, 10)
# k1 &#61; 788mc.decr(&#39;k1&#39;)
# k1 &#61; 787mc.decr(&#39;k1&#39;, 10)
# k1 &#61; 777

  11.gets 和 cas

   如商城商品剩余个数&#xff0c;假设改值保存在memcache中&#xff0c;product_count &#61; 900

  A用户刷新页面从memcache中读取到product_count &#61; 900
  B用户刷新页面从memcache中读取到product_count &#61; 900

  如果A、B用户均购买商品

  A用户修改商品剩余个数 product_count&#xff1d;899
  B用户修改商品剩余个数 product_count&#xff1d;899

  如此一来缓存内的数据便不在正确&#xff0c;两个用户购买商品后&#xff0c;商品剩余还是 899
  如果使用python的set和get来操作以上过程&#xff0c;那么程序就会如上述所示情况&#xff01;

  如果想要避免此情况的发生&#xff0c;只要使用 gets 和 cas 即可&#xff0c;如&#xff1a;

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
mc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True, cache_cas&#61;True)v &#61; mc.gets(&#39;product_count&#39;)
# ...
# 如果有人在gets之后和cas之前修改了product_count&#xff0c;那么&#xff0c;下面的设置将会执行失败&#xff0c;剖出异常&#xff0c;从而避免非正常数据的产生
mc.cas(&#39;product_count&#39;, "899")

  Ps&#xff1a;本质上每次执行gets时&#xff0c;会从memcache中获取一个自增的数字&#xff0c;通过cas去修改gets的值时&#xff0c;会携带之前获取的自增值和memcache中的自增值进行比较&#xff0c;如果相等&#xff0c;则可以提交&#xff0c;如果不想等&#xff0c;那表示在gets和cas执行之间&#xff0c;又有其他人执行了gets&#xff08;获取了缓冲的指定值&#xff09;&#xff0c; 如此一来有可能出现非正常数据&#xff0c;则不允许修改。

 

三、Redis

  1.安装API

    pip3 install redis

  2.功能介绍

  • 连接方式
  • 连接池
  • 操作
    • String 操作
    • Hash 操作
    • List 操作
    • Set 操作
    • Sort Set 操作
  • 管道
  • 发布订阅

  3.基本操作

import redisr &#61; redis.Redis(host&#61;&#39;10.211.55.4&#39;, port&#61;6379)
r.set(&#39;foo&#39;, &#39;Bar&#39;)
print r.get(&#39;foo&#39;)

  4.连接池

  redis-py使用connection pool来管理对一个redis server的所有连接&#xff0c;避免每次建立、释放连接的开销。默认&#xff0c;每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池&#xff0c;然后作为参数Redis&#xff0c;这样就可以实现多个Redis实例共享一个连接池。

import redispool &#61; redis.ConnectionPool(host&#61;&#39;10.211.55.4&#39;, port&#61;6379)r &#61; redis.Redis(connection_pool&#61;pool)
r.set(&#39;foo&#39;, &#39;Bar&#39;)
print r.get(&#39;foo&#39;)

  5.操作

    参考&#xff1a;http://www.cnblogs.com/wupeiqi/articles/5132791.html

 

转:https://www.cnblogs.com/weibiao/p/6664553.html



推荐阅读
  • celery 爬虫使用
    简介celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。它由三部分组成,消息中间件, ... [详细]
  • python中安装并使用redis相关的知识
    本文介绍了在python中安装并使用redis的相关知识,包括redis的数据缓存系统和支持的数据类型,以及在pycharm中安装redis模块和常用的字符串操作。 ... [详细]
  • 由PHP转让Memcahce,首先,需要在server安装Memcache,如何安装Memcache这不是本文的重点,大约m ... [详细]
  • 基于PgpoolII的PostgreSQL集群安装与配置教程
    本文介绍了基于PgpoolII的PostgreSQL集群的安装与配置教程。Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据库客户端之间的中间件,提供了连接池、复制、负载均衡、缓存、看门狗、限制链接等功能,可以用于搭建高可用的PostgreSQL集群。文章详细介绍了通过yum安装Pgpool-II的步骤,并提供了相关的官方参考地址。 ... [详细]
  • Python实现变声器功能(萝莉音御姐音)的方法及步骤
    本文介绍了使用Python实现变声器功能(萝莉音御姐音)的方法及步骤。首先登录百度AL开发平台,选择语音合成,创建应用并填写应用信息,获取Appid、API Key和Secret Key。然后安装pythonsdk,可以通过pip install baidu-aip或python setup.py install进行安装。最后,书写代码实现变声器功能,使用AipSpeech库进行语音合成,可以设置音量等参数。 ... [详细]
  • Centos7.6安装Gitlab教程及注意事项
    本文介绍了在Centos7.6系统下安装Gitlab的详细教程,并提供了一些注意事项。教程包括查看系统版本、安装必要的软件包、配置防火墙等步骤。同时,还强调了使用阿里云服务器时的特殊配置需求,以及建议至少4GB的可用RAM来运行GitLab。 ... [详细]
  • 基于dlib的人脸68特征点提取(眨眼张嘴检测)python版本
    文章目录引言开发环境和库流程设计张嘴和闭眼的检测引言(1)利用Dlib官方训练好的模型“shape_predictor_68_face_landmarks.dat”进行68个点标定 ... [详细]
  • 网络请求模块选择——axios框架的基本使用和封装
    本文介绍了选择网络请求模块axios的原因,以及axios框架的基本使用和封装方法。包括发送并发请求的演示,全局配置的设置,创建axios实例的方法,拦截器的使用,以及如何封装和请求响应劫持等内容。 ... [详细]
  • Python操作MySQL(pymysql模块)详解及示例代码
    本文介绍了使用Python操作MySQL数据库的方法,详细讲解了pymysql模块的安装和连接MySQL数据库的步骤,并提供了示例代码。内容涵盖了创建表、插入数据、查询数据等操作,帮助读者快速掌握Python操作MySQL的技巧。 ... [详细]
  • 开源Keras Faster RCNN模型介绍及代码结构解析
    本文介绍了开源Keras Faster RCNN模型的环境需求和代码结构,包括FasterRCNN源码解析、RPN与classifier定义、data_generators.py文件的功能以及损失计算。同时提供了该模型的开源地址和安装所需的库。 ... [详细]
  • Python使用Pillow包生成验证码图片的方法
    本文介绍了使用Python中的Pillow包生成验证码图片的方法。通过随机生成数字和符号,并添加干扰象素,生成一幅验证码图片。需要配置好Python环境,并安装Pillow库。代码实现包括导入Pillow包和随机模块,定义随机生成字母、数字和字体颜色的函数。 ... [详细]
  • 本文介绍了在Windows系统下安装Python、setuptools、pip和virtualenv的步骤,以及安装过程中需要注意的事项。详细介绍了Python2.7.4和Python3.3.2的安装路径,以及如何使用easy_install安装setuptools。同时提醒用户在安装完setuptools后,需要继续安装pip,并注意不要将Python的目录添加到系统的环境变量中。最后,还介绍了通过下载ez_setup.py来安装setuptools的方法。 ... [详细]
  • Python已成为全球最受欢迎的编程语言之一,然而Python程序的安全运行存在一定的风险。本文介绍了Python程序安全运行需要满足的三个条件,即系统路径上的每个条目都处于安全的位置、"主脚本"所在的目录始终位于系统路径中、若python命令使用-c和-m选项,调用程序的目录也必须是安全的。同时,文章还提出了一些预防措施,如避免将下载文件夹作为当前工作目录、使用pip所在路径而不是直接使用python命令等。对于初学Python的读者来说,这些内容将有所帮助。 ... [详细]
  • 通过Anaconda安装tensorflow,并安装运行spyder编译器的完整教程
    本文提供了一个完整的教程,介绍了如何通过Anaconda安装tensorflow,并安装运行spyder编译器。文章详细介绍了安装Anaconda、创建tensorflow环境、安装GPU版本tensorflow、安装和运行Spyder编译器以及安装OpenCV等步骤。该教程适用于Windows 8操作系统,并提供了相关的网址供参考。通过本教程,读者可以轻松地安装和配置tensorflow环境,以及运行spyder编译器进行开发。 ... [详细]
  • 源码包安装RabbitMQ3.6
    先安装erlang依赖,也是门编程语言,下载源码包地址:https:www.erlang.orgdownloads20.1首先先安装个依赖 ... [详细]
author-avatar
mobiledu2502857147
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有