一、RabbitMQ
是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
1.RabbitMQ install
1 安装配置epel源
2 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
3
4 安装erlang
5 $ yum -y install erlang
6
7 安装RabbitMQ
8 $ yum -y install rabbitmq-server
注意:service rabbitmq-server start/stop
2. Python API install
1 pip install pika
2 or
3 easy_install pika
4 or
5 源码
6 https://pypi.python.org/pypi/pika
3.基于QUEUE实现生产消费模型
1 import Queue
2 import threading
3
4
5 message = Queue.Queue(10)
6
7
8 def producer(i):
9 while True:
10 message.put(i)
11
12
13 def consumer(i):
14 while True:
15 msg = message.get()
16
17
18 for i in range(12):
19 t = threading.Thread(target=producer, args=(i,))
20 t.start()
21
22 for i in range(10):
23 t = threading.Thread(target=consumer, args=(i,))
24 t.start()
4.基于RabbitMQ
对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
1 import pika
2
3 # ######################### 生产者 #########################
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.queue_declare(queue='hello')
10
11 channel.basic_publish(exchange='',
12 routing_key='hello',
13 body='Hello World!')
14 print(" [x] Sent 'Hello World!'")
15 connection.close()
16
17
18
19 # ########################## 消费者 ##########################
20 import pika
21 connection = pika.BlockingConnection(pika.ConnectionParameters(
22 host='localhost'))
23 channel = connection.channel()
24
25 channel.queue_declare(queue='hello')
26
27 def callback(ch, method, properties, body):
28 print(" [x] Received %r" % body)
29
30 channel.basic_consume(callback,
31 queue='hello',
32 no_ack=True)
33
34 print(' [*] Waiting for messages. To exit press CTRL+C')
35 channel.start_consuming()
5.消费者ack
1 import pika
2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
3 channel = connection.channel()
4 channel.queue_declare(queue='hello')
5
6
7 def callback(ch, method, properties, body):
8 print(" [x] Received %r" % body)
9
10 channel.basic_consume(callback, queue='hello', no_ack=False)
11 # no_ack: acknowledgment 消息不丢失,MQ判读出现异常,没有消费,没有ack,则把消息放回队列.
12 channel.start_consuming()
6.durable消息持久化
1 import pika
2
3 connection = pika.BlockingConnection(pika.ConnectionParameters(
4 host='127.0.0.1', port=5672))
5 channel = connection.channel()
6
7 channel.queue_declare(queue='hello1', durable=True) # 创建通道, 持久化修改1:durable=True
8
9 channel.basic_publish(exchange='',
10 routing_key='hello',
11 body='Hello World!',
12 properties=pika.BasicProperties(delivery_mode=2) # 持久化修改2
13 )
14 connection.close()
1 import pika
2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
3 channel = connection.channel()
4
5
6 def callback(ch, method, properties, body):
7 print(" [x] Received %r" % body)
8 import time
9 time.sleep(10)
10 print('ok')
11 ch.basic_ack(delivery_tag=method.delivery_tag) # 持久化:修改2
12
13 channel.basic_consume(callback,
14 queue='hello',
15 no_ack=False) # 持久化:修改1
16
17 channel.start_consuming()
7.消息获取顺序
1 import pika
2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
3 channel = connection.channel()
4
5
6 def callback(ch, method, properties, body):
7 print(" [x] Received %r" % body)
8 import time
9 time.sleep(10)
10 print('ok')
11 ch.basic_ack(delivery_tag=method.delivery_tag)
12 channel.basic_qos(prefetch_count=1) # 默认消息队列里的数据是按照顺序被消费者拿走,
13 # 例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
14 # 表示谁来谁取,不再按照奇偶数排列
15 channel.basic_consume(callback,
16 queue='hello',
17 no_ack=False)
18
19 channel.start_consuming()
8.发布订阅
exchange type = fanout
1 import pika
2 import sys
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='logs',
9 type='fanout')
10
11 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
12 channel.basic_publish(exchange='logs',
13 routing_key='',
14 body=message)
15 print(" [x] Sent %r" % message)
16 connection.close()
1 import pika
2
3 connection = pika.BlockingConnection(pika.ConnectionParameters(
4 host='localhost'))
5 channel = connection.channel()
6
7 channel.exchange_declare(exchange='logs',
8 type='fanout')
9
10 result = channel.queue_declare(exclusive=True)
11 queue_name = result.method.queue
12
13 channel.queue_bind(exchange='logs',
14 queue=queue_name)
15
16 print(' [*] Waiting for logs. To exit press CTRL+C')
17
18
19 def callback(ch, method, properties, body):
20 print(" [x] %r" % body)
21
22 channel.basic_consume(callback,
23 queue=queue_name,
24 no_ack=True)
25
26 channel.start_consuming()
9.发布订阅(关键字)
exchange type = direct
1 import pika
2 import sys
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='direct_logs',
9 type='direct')
10
11 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
13 channel.basic_publish(exchange='direct_logs',
14 routing_key=severity,
15 body=message)
16 print(" [x] Sent %r:%r" % (severity, message))
17 connection.close()
1 import pika
2 import sys
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='direct_logs',
9 type='direct')
10
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13
14 severities = sys.argv[1:]
15 if not severities:
16 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
17 sys.exit(1)
18
19 for severity in severities:
20 channel.queue_bind(exchange='direct_logs',
21 queue=queue_name,
22 routing_key=severity)
23
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25
26 def callback(ch, method, properties, body):
27 print(" [x] %r:%r" % (method.routing_key, body))
28
29 channel.basic_consume(callback,
30 queue=queue_name,
31 no_ack=True)
32
33 channel.start_consuming()
10.发布订阅(模糊匹配)
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
- # 表示可以匹配 0 个 或 多个 单词
- * 表示只能匹配 一个 单词
发送者路由值 队列中
old.boy.python old.* -- 不匹配
old.boy.python old.# -- 匹配
1 import pika
2 import sys
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='topic_logs',
9 type='topic')
10
11 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
13 channel.basic_publish(exchange='topic_logs',
14 routing_key=routing_key,
15 body=message)
16 print(" [x] Sent %r:%r" % (routing_key, message))
17 connection.close()
1 import pika
2 import sys
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.exchange_declare(exchange='topic_logs',
9 type='topic')
10
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
17 sys.exit(1)
18
19 for binding_key in binding_keys:
20 channel.queue_bind(exchange='topic_logs',
21 queue=queue_name,
22 routing_key=binding_key)
23
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25
26 def callback(ch, method, properties, body):
27 print(" [x] %r:%r" % (method.routing_key, body))
28
29 channel.basic_consume(callback,
30 queue=queue_name,
31 no_ack=True)
32
33 channel.start_consuming()
二、Memcached
1.安装API
pip3 install python-memcached
2.基本使用
import memcachemc = memcache.Client(['10.211.55.4:12000'], debug=True)
mc.set("foo", "bar")
ret = mc.get('foo')
print ret
3.支持集群
- 根据算法将 k1 转换成一个数字
- 将数字和主机列表长度求余数&#xff0c;得到一个值 N&#xff08; 0 <&#61; N <列表长度 &#xff09;
- 在主机列表中根据 第2步得到的值为索引获取主机&#xff0c;例如&#xff1a;host_list[N]
- 连接 将第3步中获取的主机&#xff0c;将 k1 &#61; "v1" 放置在该服务器的内存中
mc &#61; memcache.Client([(&#39;1.1.1.1:12000&#39;, 1), (&#39;1.1.1.2:12000&#39;, 2), (&#39;1.1.1.3:12000&#39;, 1)], debug&#61;True)mc.set(&#39;k1&#39;, &#39;v1&#39;)
4.add命令
添加一条键值对&#xff0c;如果已经存在的 key&#xff0c;重复执行add操作异常
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)
mc.add(&#39;k1&#39;, &#39;v1&#39;)
# mc.add(&#39;k1&#39;, &#39;v2&#39;) # 报错&#xff0c;对已经存在的key重复添加&#xff0c;失败&#xff01;&#xff01;&#xff01;
5.replace命令
replace 修改某个key的值&#xff0c;如果key不存在&#xff0c;则异常
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)
# 如果memcache中存在kkkk&#xff0c;则替换成功&#xff0c;否则一场
mc.replace(&#39;kkkk&#39;,&#39;999&#39;)
6.set 和 set_multi
set 设置一个键值对&#xff0c;如果key不存在&#xff0c;则创建&#xff0c;如果key存在&#xff0c;则修改&#xff01;
set_multi 设置多个键值对&#xff0c;如果key不存在&#xff0c;则创建&#xff0c;如果key存在&#xff0c;则修改&#xff01;
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)mc.set(&#39;key0&#39;, &#39;wupeiqi&#39;)mc.set_multi({&#39;key1&#39;: &#39;val1&#39;, &#39;key2&#39;: &#39;val2&#39;})
7.delete 和 delete_multi
delete 在Memcached中删除指定的一个键值对
delete_multi 在Memcached中删除指定的多个键值对
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)mc.delete(&#39;key0&#39;)
mc.delete_multi([&#39;key1&#39;, &#39;key2&#39;])
8.get 和 get_multi
get 获取一个键值对
get_multi 获取多一个键值对
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)val &#61; mc.get(&#39;key0&#39;)
item_dict &#61; mc.get_multi(["key1", "key2", "key3"])
9.append 和 prepend
append 修改指定key的值&#xff0c;在该值 后面 追加内容
prepend 修改指定key的值&#xff0c;在该值 前面 插入内容
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)
# k1 &#61; "v1"mc.append(&#39;k1&#39;, &#39;after&#39;)
# k1 &#61; "v1after"mc.prepend(&#39;k1&#39;, &#39;before&#39;)
# k1 &#61; "beforev1after"
10.decr 和 incr
incr 自增&#xff0c;将Memcached中的某一个值增加 N &#xff08; N默认为1 &#xff09;
decr 自减&#xff0c;将Memcached中的某一个值减少 N &#xff08; N默认为1 &#xff09;
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcachemc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True)
mc.set(&#39;k1&#39;, &#39;777&#39;)mc.incr(&#39;k1&#39;)
# k1 &#61; 778mc.incr(&#39;k1&#39;, 10)
# k1 &#61; 788mc.decr(&#39;k1&#39;)
# k1 &#61; 787mc.decr(&#39;k1&#39;, 10)
# k1 &#61; 777
11.gets 和 cas
如商城商品剩余个数&#xff0c;假设改值保存在memcache中&#xff0c;product_count &#61; 900
A用户刷新页面从memcache中读取到product_count &#61; 900
B用户刷新页面从memcache中读取到product_count &#61; 900
如果A、B用户均购买商品
A用户修改商品剩余个数 product_count&#xff1d;899
B用户修改商品剩余个数 product_count&#xff1d;899
如此一来缓存内的数据便不在正确&#xff0c;两个用户购买商品后&#xff0c;商品剩余还是 899
如果使用python的set和get来操作以上过程&#xff0c;那么程序就会如上述所示情况&#xff01;
如果想要避免此情况的发生&#xff0c;只要使用 gets 和 cas 即可&#xff0c;如&#xff1a;
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
mc &#61; memcache.Client([&#39;10.211.55.4:12000&#39;], debug&#61;True, cache_cas&#61;True)v &#61; mc.gets(&#39;product_count&#39;)
# ...
# 如果有人在gets之后和cas之前修改了product_count&#xff0c;那么&#xff0c;下面的设置将会执行失败&#xff0c;剖出异常&#xff0c;从而避免非正常数据的产生
mc.cas(&#39;product_count&#39;, "899")
Ps&#xff1a;本质上每次执行gets时&#xff0c;会从memcache中获取一个自增的数字&#xff0c;通过cas去修改gets的值时&#xff0c;会携带之前获取的自增值和memcache中的自增值进行比较&#xff0c;如果相等&#xff0c;则可以提交&#xff0c;如果不想等&#xff0c;那表示在gets和cas执行之间&#xff0c;又有其他人执行了gets&#xff08;获取了缓冲的指定值&#xff09;&#xff0c; 如此一来有可能出现非正常数据&#xff0c;则不允许修改。
三、Redis
1.安装API
pip3 install redis
2.功能介绍
- 连接方式
- 连接池
- 操作
- String 操作
- Hash 操作
- List 操作
- Set 操作
- Sort Set 操作
- 管道
- 发布订阅
3.基本操作
import redisr &#61; redis.Redis(host&#61;&#39;10.211.55.4&#39;, port&#61;6379)
r.set(&#39;foo&#39;, &#39;Bar&#39;)
print r.get(&#39;foo&#39;)
4.连接池
redis-py使用connection pool来管理对一个redis server的所有连接&#xff0c;避免每次建立、释放连接的开销。默认&#xff0c;每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池&#xff0c;然后作为参数Redis&#xff0c;这样就可以实现多个Redis实例共享一个连接池。
import redispool &#61; redis.ConnectionPool(host&#61;&#39;10.211.55.4&#39;, port&#61;6379)r &#61; redis.Redis(connection_pool&#61;pool)
r.set(&#39;foo&#39;, &#39;Bar&#39;)
print r.get(&#39;foo&#39;)
5.操作
参考&#xff1a;http://www.cnblogs.com/wupeiqi/articles/5132791.html