多线程基础概念
并行与并发
并行:同时处理多个任务,必须在多核环境下
一段时间内同时处理多个任务,单核也可以并发
并发手段
线程:内核空间的调度
进程:内核空间的调度
协程:用户空间的调度
线程可以允许程序在同一进程空间中并发运行多个操作。本次主要介绍Python标准库中的多线程模块threading。
threading模块
线程初始化
使用threading模块的Thread类初始化对象然后调用start方法启动线程。
import threading
import time
def worker(num):
time.sleep(1)
print('worker-{}'.format(num))
# 创建线程对象 target参数是一个函数, 这个函数即线程要执行的逻辑
threads = [threading.Thread(target=worker, args=(i, ))for i in range(5)]
for t in threads:
t.start()
# start 方法启动一个线程, 当这个线程的逻辑执行完毕的时候,线程自动退出, Python 没有提供主动退出线程的方法
# 输出以下结果
worker-0worker-1worker-2worker-3
worker-4
初始化的五个线程的执行逻辑中的print方法打印字符串及换行符出现了随机分布,即出现了资源竞争。
给线程传递参数
import threading
import time
def worker(*args, **kwargs):
time.sleep(1)
print(args)
print(kwargs)
threads = threading.Thread(target=worker, args=(1, 2, 3), kwargs={'a':'b'}).start()
# 输出
(1, 2, 3)
{'a': 'b'}
args传递位置参数,kwargs传递关键字参数。
Thread常用参数和方法
>>> help(threading.Thread)
可以看到Thread函数的初始化方法中的参数如下:
| __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
| This constructor should always be called with keyword arguments. Arguments are:
|
| *group* should be None; reserved for future extension when a ThreadGroup
| class is implemented.
|
| *target* is the callable object to be invoked by the run()
| method. Defaults to None, meaning nothing is called.
|
| *name* is the thread name. By default, a unique name is constructed of
| the form "Thread-N" where N is a small decimal number.
|
| *args* is the argument tuple for the target invocation. Defaults to ().
|
| *kwargs* is a dictionary of keyword arguments for the target
| invocation. Defaults to {}.
name
表示线程名称,默认情况下,线程名称是Thread-N,N是一个较小的十进制数。我们可以传递name参数,控制线程名称。
以下会导入logging模块来显示线程的名称等详细信息
import threading
import time
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
def worker(num):
time.sleep(1)
logging.info('worker-{}'.format(num))
threads = [threading.Thread(target=worker, args=(i, ), name='workerthread-{}'.format(i)) for i in range(5)]
for t in threads:
t.start()
# 输出
2017-03-20 21:39:29,339 INFO [workerthread-0] worker-0
2017-03-20 21:39:29,340 INFO [workerthread-1] worker-1
2017-03-20 21:39:29,340 INFO [workerthread-2] worker-2
2017-03-20 21:39:29,340 INFO [workerthread-3] worker-3
2017-03-20 21:39:29,346 INFO [workerthread-4] worker-4
其中logging模块的basicConfig函数的format中的%(threadName)s就是用来输出当前线程的名称的。
线程可以重名, 线程名并不是线程的唯一标识,但是通常应该避免线程重名,通常的处理手段是加前缀
daemon
Daemon:守护
和Daemon线程相对应的还有Non-Daemon线程,在此Thread初始化函数中的daemon参数即表示线程是否是Daemon线程。
Daemon线程:会伴随主线程结束而结束(可以理解为主线程结束,守护线程结束)
Non-Daemon线程:不会随着主线程结束而结束,主线程需要等待Non-Daemon结束
import logging
import time
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
def worker():
logging.info('starting')
time.sleep(2)
logging.info('stopping')
if __name__ == '__main__':
logging.info('starting')
t1 = threading.Thread(target=worker, name='worker1', daemon=False)
t1.start()
time.sleep(1)
t2 = threading.Thread(target=worker, name='worker2', daemon=True)
t2.start()
logging.info('stopping')
# 输出
2017-03-20 23:28:06,404 INFO [MainThread] starting
2017-03-20 23:28:06,436 INFO [worker1] starting
2017-03-20 23:28:07,492 INFO [worker2] starting
2017-03-20 23:28:07,492 INFO [MainThread] stopping # 主线程执行完成
2017-03-20 23:28:08,439 INFO [worker1] stopping # 主线程执行完成之后会等Non-Daemon线程执行完成,但是并不会等Daemon线程执行完成,即Daemon线程会随着主线程执行完成而释放
Thread.join()
如果想等Daemon线程执行完成之后主线程再退出,可以使用线程对象的join()方法
import logging
import time
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
def worker():
logging.info('starting')
time.sleep(2)
logging.info('stopping')
if __name__ == '__main__':
logging.info('starting')
t1 = threading.Thread(target=worker, name='worker1', daemon=False)
t1.start()
time.sleep(1)
t2 = threading.Thread(target=worker, name='worker2', daemon=True)
t2.start()
logging.info('stopping')
t1.join()
t2.join()
# 输出
2017-03-20 23:41:07,217 INFO [MainThread] starting
2017-03-20 23:41:07,243 INFO [worker1] starting
2017-03-20 23:41:08,245 INFO [worker2] starting
2017-03-20 23:41:08,246 INFO [MainThread] stopping
2017-03-20 23:41:09,243 INFO [worker1] stopping
2017-03-20 23:41:10,248 INFO [worker2] stopping
使用join函数只有主线程就需要等待Daemon线程执行完成在推出。
join函数的原型:join(self, timeout=None)
join方法会阻塞直到线程退出或者超时, timeout 是可选的,如果不设置timeout, 会一直等待线程退出。如果设置了timeout,会在超时之后退出或者线程执行完成退出。
因为join函数总是返回None,因此在超时时间到达之后如果要知道线程是否还是存活的,可以调用is_alive()方法判断线程是否存活。
threading常用方法
enumerate()
列出当前所有的存活的线程
>>> threading.enumerate()
[<_mainthread started>, , ]
local()
import logging
import threading
logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)
ctx &#61; threading.local()
ctx.data &#61; 5
data &#61; &#39;a&#39;
def worker():
logging.info(data)
logging.info(ctx.data)
worker()
threading.Thread(target&#61;worker).start()
# 输出
2017-03-21 00:02:08,102 INFO [MainThread] a
2017-03-21 00:02:08,113 INFO [MainThread] 5
2017-03-21 00:02:08,119 INFO [Thread-34] a
Exception in thread Thread-34:
Traceback (most recent call last):
File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "", line 7, in worker
logging.info(ctx.data)
AttributeError: &#39;_thread._local&#39; object has no attribute &#39;data&#39;
线程共享内存、状态和资源。但是thread模块的local类的对象的属性&#xff0c; 只在当前线程可见。
Thread类的派生
Python中可以通过继承 Thread 类并重写 run 方法来编写多线程的逻辑&#xff0c;此时逻辑函数就是run。
class mythread(threading.Thread):
def run(self):
print(&#39;mythread run&#39;)
t &#61; mythread()
t.run() # 输出mythread run
t.start() # 输出mythread run
通过继承方式派生而来的子类对象可以同时执行start方法和run方法&#xff0c;结果是一样的&#xff0c;都是执行子类的run方法。但是非继承的方式不能同时使用start方法和run方法&#xff0c;会报错。
派生时逻辑函数的参数传递
class mythread(threading.Thread):
def __init__(self, *args, **kwargs):
super().__init__() # 需要调用父类的初始化方法初始化
self.args &#61; args
self.kwargs &#61; kwargs
def run(self):
print(&#39;mythread run&#39;, self.args, self.kwargs)
t &#61; mythread(1, 2, 3, a&#61;&#39;b&#39;)
t.start() # 输出mythread run (1, 2, 3) {&#39;a&#39;: &#39;b&#39;}
Timer类
Timer类&#xff1a;Thread类的派生类&#xff0c;也在threading模块中。意为定时器&#xff0c;用作线程的延迟执行。
>>> help(threading.Timer)
Timer类的初始化方法&#xff1a;__init__(self, interval, function, args&#61;None, kwargs&#61;None)
interval&#xff1a;时间间隔&#xff0c;即几秒之后开始执行function
function&#xff1a;线程执行的逻辑函数
args&#xff1a;位置参数
kwargs&#xff1a;关键字参数
代码
import threading
import time
import logging
logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)
def worker():
logging.info(&#39;worker running&#39;)
t1 &#61; threading.Timer(interval&#61;3, function&#61;worker)
t2 &#61; threading.Timer(interval&#61;3, function&#61;worker)
t1.setName(&#39;t1&#39;)
t2.setName(&#39;t2&#39;)
logging.info(&#39;start&#39;)
t1.start()
t2.start()
time.sleep(2)
logging.info(&#39;canceling {}&#39;.format(t1.name))
t1.cancel() # 2s之后仍然可以取消t1
logging.info(&#39;end&#39;)
# 输出
2017-03-21 19:28:52,801 INFO [MainThread] start
2017-03-21 19:28:54,811 INFO [MainThread] canceling t1
2017-03-21 19:28:54,819 INFO [MainThread] end
2017-03-21 19:28:55,808 INFO [t2] worker running
Timer.cancel()&#xff1a;取消仍然存活的定时器&#xff0c;如果定时器已经开始执行function&#xff0c;则无法取消。
Timer.setDaemon(True)&#xff1a;设置定时器为守护线程
线程同步
当使用多个线程来访问同一个数据时&#xff0c;会经常出现资源争用等线程安全问题(比如多个线程都在操作同一数据导致数据不一致)&#xff0c;这时候我们就可以使用一些同步技术来解决这类问题。比如Event&#xff0c;Lock&#xff0c;Condition&#xff0c;Barrier&#xff0c;Semaphore等等。
Event
>>> help(threading.Event)
Event对象内置一个标志&#xff0c;这个标志可以由set()方法和clear()方法设定。线程可以使用wait()方法进行阻塞等待&#xff0c;知道Event对象内置标志被set。
clear(self)&#xff1a;设置内置标志为False
set(self)&#xff1a;设置内置标志为True
wait(self, timeout&#61;None)&#xff1a;开始阻塞&#xff0c;直到内置标志被设置为True(即wait会阻塞线程直到set方法被调用或者超时)
is_set(self)&#xff1a;当且仅当内置标志为True的时候返回True
代码
以下代码实现的逻辑是&#xff1a;一个boss和五个睡觉工人&#xff0c;只要有一个工人完成了睡觉任务&#xff0c;那么就唤醒boss和其他工人。
import datetime
import threading
import logging
import random
logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)
def worker(event: threading.Event):
s &#61; random.randint(1, 5)
event.wait(s) # wait方法而不使用sleep方法&#xff0c;可以让其他工人收到通知后不再等待
logging.info(&#39;sleep {}&#39;.format(s))
event.set()
def boss(event:threading.Event):
start &#61; datetime.datetime.now()
event.wait()
end &#61; datetime.datetime.now()
logging.info(&#39;that boss exit takes {}s&#39;.format(end - start))
def start():
event &#61; threading.Event()
b &#61; threading.Thread(target&#61;boss, args&#61;(event, ), name&#61;&#39;boss&#39;)
b.start()
for i in range(5):
t &#61; threading.Thread(target&#61;worker, args&#61;(event, ), name&#61;&#39;worker-{}&#39;.format(i))
t.start()
执行start()方法&#xff0c;测试结果
>>> start()
2017-03-21 21:20:17,195 INFO [worker-2] sleep 1
2017-03-21 21:20:17,198 INFO [boss] that boss exit takes 0:00:01.004954s
2017-03-21 21:20:17,199 INFO [worker-0] sleep 2
2017-03-21 21:20:17,199 INFO [worker-1] sleep 3
2017-03-21 21:20:17,199 INFO [worker-3] sleep 2
2017-03-21 21:20:17,198 INFO [worker-4] sleep 1
可以看到&#xff1a;worker-2退出之后&#xff0c;boss和另外四个worker也瞬间就退出了。所以event对象的内置状态被set之后&#xff0c;相关线程就不再wait了。
event&#xff1a;在线程之间发送信号&#xff0c;通常用于某个线程需要等待其他线程处理完成某些动作之后才能启动
wait()方法的timeout参数
def worker(event: threading.Event):
while not event.wait(3):
logging.info(&#39;run run run&#39;)
event &#61; threading.Event()
threading.Thread(target&#61;worker, args&#61;(event, )).start()
# 输出
2017-03-21 21:32:47,275 INFO [Thread-8] run run run
2017-03-21 21:32:50,277 INFO [Thread-8] run run run
2017-03-21 21:32:53,281 INFO [Thread-8] run run run
2017-03-21 21:32:56,284 INFO [Thread-8] run run run
...
程序每隔3s就会输出一次结果&#xff0c;直到执行set()方法才会停止。因此我们可以写一个定时器(类似于Thread类的派生类Timer)。
代码
class Timer:
def __init__(self, interval, function, *args, **kwargs):
self.interval &#61; interval
self.function &#61; function
self.args &#61; args
self.kwargs &#61; kwargs
self.event &#61; threading.Event()
self.thread &#61; threading.Thread(target&#61;self.__target(), args&#61;args, kwargs&#61;kwargs)
def __target(self):
if not self.event.wait(self.interval):
return self.function
def start(self):
self.thread.start()
def cancel(self):
self.event.set()
def worker(act):
logging.info(&#39;run-{}&#39;.format(act))
t &#61; Timer(5, worker, &#39;hahaha&#39;)
t.start() # 输出2017-03-21 22:14:59,645 INFO [Thread-20] run-hahaha
延迟5s之后执行了逻辑函数&#xff0c;也可以使用cancel函数取消。(要注意参数的传递&#xff0c;此处Timer初始化不能使用关键字参数)
Lock
event是用来同步线程之间的操作的&#xff0c;但是如果要控制共享资源的访问那就需要用到锁机制了&#xff0c;在Python标准库中的实现就是内置的lock类。
>>> help(threading.Lock)
threading.Lock()函数会创建一个lock类的对象。
>>> help(threading.Lock())
锁对象是一个同步原语(synchronization primitive)&#xff0c;lock对象主要有以下三个方法&#xff1a;
acquire()&#xff1a; acquire(blocking&#61;True, timeout&#61;-1) -> bool 获得锁(即锁定锁)。成功获得锁返回True&#xff0c;没有获得锁则返回False。
release()&#xff1a; release() 释放锁
locked()&#xff1a; locked() -> bool 检查锁是否被锁住
代码
以下代码实现了在多个进程同时对资源进行访问时&#xff0c;进行加锁和解锁的操作&#xff0c;保证加减操作和赋值操作组合之后的原子性。
class Counter: # 计时器有加减方法&#xff0c;都会修改value值&#xff0c;因此都需要加锁处理
def __init__(self, start&#61;0):
self.value &#61; start
self.lock &#61; threading.Lock()
def inc(self):
self.lock.acquire()
try:
self.value &#43;&#61; 1
finally:
self.lock.release() # 需要用finally语句保证锁一定会被释放&#xff0c;否则资源永远不可访问
def dec(self):
self.lock.acquire()
try:
self.value -&#61; 1
finally:
self.lock.release()
def inc_worker(c: Counter):
pause &#61; random.random()
logging.info(&#39;sleeping-{}&#39;.format(pause))
time.sleep(pause)
c.inc()
logging.info(&#39;cur_value:{}&#39;.format(c.value))
def dec_worker(c: Counter):
pause &#61; random.random()
logging.info(&#39;sleeping-{}&#39;.format(pause))
time.sleep(pause)
c.dec()
logging.info(&#39;cur_value:{}&#39;.format(c.value))
c &#61; Counter()
for i in range(2):
threading.Thread(target&#61;inc_worker, args&#61;(c, ), name&#61;&#39;inc_worker-{}&#39;.format(i)).start()
for i in range(3):
threading.Thread(target&#61;dec_worker, args&#61;(c, ), name&#61;&#39;dec_worker-{}&#39;.format(i)).start()
测试输出
2017-03-21 23:17:44,761 INFO [inc_worker-0] sleeping-0.6542416949220327
2017-03-21 23:17:44,766 INFO [inc_worker-1] sleeping-0.48615543229897873
2017-03-21 23:17:44,771 INFO [dec_worker-0] sleeping-0.12355589507242459
2017-03-21 23:17:44,776 INFO [dec_worker-1] sleeping-0.5276710391905681
2017-03-21 23:17:44,784 INFO [dec_worker-2] sleeping-0.5546251407611247
2017-03-21 23:17:44,900 INFO [dec_worker-0] cur_value:-1
2017-03-21 23:17:45,258 INFO [inc_worker-1] cur_value:0
2017-03-21 23:17:45,312 INFO [dec_worker-1] cur_value:-1
2017-03-21 23:17:45,351 INFO [dec_worker-2] cur_value:-2
2017-03-21 23:17:45,421 INFO [inc_worker-0] cur_value:-1
可见&#xff0c;各项操作之间保持相互原子性&#xff0c;没有出现干扰。
因为lock类实现了__enter__和__exit__两个魔术方法&#xff0c;因此支持上下文管理器&#xff0c;可以修改以上Counter类的实现方法如下&#xff1a;
class Counter:
def __init__(self, start&#61;0):
self.value &#61; start
self.lock &#61; threading.Lock()
def inc(self):
self.lock.acquire()
with self.lock:
self.value &#43;&#61; 1
def dec(self):
self.lock.acquire()
with self.lock:
self.value -&#61; 1
即使用上下文管理器来代替try...finally...语句&#xff0c;测试输出应该以以上结果一致。
acquire方法的blocking参数
当blocking&#61;True时&#xff0c;A线程中执行了lock.acquire()方法之后并且没有执行到lock.release()方法&#xff0c;如果在B线程中再次执行lock.acquire()方法&#xff0c;则B线程阻塞。
正如以上代码实现&#xff0c;当有n个线程需要修改一个共享资源的时候&#xff0c;其他线程在获取锁之前都处于阻塞状态。(python的阻塞都会让出cpu的时间片&#xff0c;因此不是忙等待)
当blocking&#61;Fasle时&#xff0c;A线程中执行了lock.acquire()方法之后并且没有执行到lock.release()方法&#xff0c;如果在B线程中再次执行lock.acquire()方法&#xff0c;则B线程不会阻塞&#xff0c;并且acquire函数返回False。
acquire方法的timeout参数
当blocking&#61;True并且timeout>0时&#xff0c;acquire会一直阻塞到超时或者锁被释放。
acquire(0)的参数传递
模拟acquire方法的默认参数&#xff0c;编写一下函数进行模拟参数传递的过程&#xff1a;
def print1(blocking&#61;True, timeout&#61;-1):
print(blocking, timeout)
print1(0) # 输出0 -1
print1(10) # 输出10 -1
可见第一个位置参数&#xff0c;替代了blocking。也就是说lock.acquire(0)等效于lock.acquire(blocking&#61;False)
RLock
正常的lock对象是不能多次调用acquire的&#xff0c;但是可重用锁RLock可以多次调用 acquire 而不阻塞&#xff0c;而且 release 时也要执行和 acquire 一样的次数。
Condition
除了Event对象之外&#xff0c;线程同步还可以使用条件同步机制Condition。一类线程等待特定条件&#xff0c;而另一类线程发出特定条件满足的信号。
>>> help(threading.Condition)
在Condition的帮助中有以下几个方法&#xff1a;
初始化方法&#xff1a;init(self, lock&#61;None)。如果给定了lock参数&#xff0c;那么必须是Lock或者Rlock对象&#xff0c;并且被当做底层锁来使用。如果没有指定&#xff0c;那么会创建一个RLock对象的锁&#xff0c;也被当做底层锁来使用。
实现了__enter__和__exit__方法&#xff0c;支持上下文管理器。
notify(self, n&#61;1)&#xff1a;唤醒一个或多个在当前Condition上等待的其他线程&#xff0c;如果此方法的调用线程没有获得锁&#xff0c;那么在调用的时候就会报错RuntimeError
notify_all(self)&#xff1a;唤醒所有线程
wait(self, timeout&#61;None)&#xff1a;一直等待着知道被notifyed或者发生超时
实例代码
以下代码实现的是&#xff1a;有一个生产者线程&#xff0c;会生产若干次&#xff0c;每次生产结束后需要通知所有的消费者线程来消费&#xff0c;因此下面代码使用的是notify_all方法。
import threading
import time
import logging
import random
logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)
class Producer_Consumer_Model:
def __init__(self):
self.data &#61; None
self.event &#61; threading.Event() # 用来控制消费者退出
self.condition &#61; threading.Condition()
def Consumer(self):
while not self.event.is_set():
with self.condition:
self.condition.wait() # 一直等待直到收到生产者通知notify_all
logging.info(self.data) # 收到通知之后&#xff0c;开始执行消费者的业务逻辑部分
def Producer(self):
for _ in range(4): # 每个生产者生产4次
data &#61; random.randint(0, 100)
logging.info(data)
with self.condition:
self.data &#61; data # 写入成功就表示生产成功&#xff0c;因此需要在此加锁并且能够通知消费者线程去消费&#xff0c;因此选择使用condition来处理
self.condition.notify_all() # 生产成功之后通知所有的消费者去消费
self.event.wait(1) # 没生产一次等待1s
self.event.set() # 所有的生产完成之后通知消费者退出
m &#61; Producer_Consumer_Model()
for i in range(3):
threading.Thread(target&#61;m.Consumer, name&#61;&#39;Consumer-{}&#39;.format(i)).start()
p &#61; threading.Thread(target&#61;m.Producer, name&#61;&#39;Producer&#39;)
p.start()
测试结果(一个生产者&#xff0c;三个消费者)
2017-03-22 22:07:42,875 INFO [Producer] 16
2017-03-22 22:07:42,883 INFO [Consumer-0] 16
2017-03-22 22:07:42,890 INFO [Consumer-2] 16
2017-03-22 22:07:42,894 INFO [Consumer-1] 16
2017-03-22 22:07:43,884 INFO [Producer] 76
2017-03-22 22:07:43,888 INFO [Consumer-0] 76
2017-03-22 22:07:43,895 INFO [Consumer-2] 76
2017-03-22 22:07:43,898 INFO [Consumer-1] 76
2017-03-22 22:07:44,889 INFO [Producer] 31
2017-03-22 22:07:44,891 INFO [Consumer-0] 31
2017-03-22 22:07:44,911 INFO [Consumer-2] 31
2017-03-22 22:07:44,913 INFO [Consumer-1] 31
2017-03-22 22:07:45,892 INFO [Producer] 17
2017-03-22 22:07:45,894 INFO [Consumer-0] 17
2017-03-22 22:07:45,907 INFO [Consumer-2] 17
2017-03-22 22:07:45,910 INFO [Consumer-1] 17
可见&#xff0c;生产者每生产一次&#xff0c;所有的消费者就会去消费。如果想控制每次生产之后通知几个消费者来消费&#xff0c;那么就可以使用notify方法&#xff0c;指定消费者线程个数。
代码如下
import threading
import time
import logging
import random
logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)
class Producer_Consumer_Model:
def __init__(self):
self.data &#61; None
self.event &#61; threading.Event() # 用来控制消费者退出
self.condition &#61; threading.Condition()
def Consumer(self):
while not self.event.is_set():
with self.condition:
self.condition.wait() # 一直等待直到收到生产者通知notify_all
logging.info(self.data) # 收到通知之后&#xff0c;开始执行消费者的业务逻辑部分
def Producer(self):
for _ in range(4): # 每个生产者生产4次
data &#61; random.randint(0, 100)
logging.info(data)
with self.condition:
self.data &#61; data # 写入成功就表示生产成功&#xff0c;因此需要在此加锁并且能够通知消费者线程去消费&#xff0c;因此选择使用condition来处理
self.condition.notify(1) # 生产成功之后通知所有的消费者去消费
self.event.wait(1) # 没生产一次等待1s
self.event.set() # 所有的生产完成之后通知消费者退出
m &#61; Producer_Consumer_Model()
for i in range(3):
threading.Thread(target&#61;m.Consumer, name&#61;&#39;Consumer-{}&#39;.format(i)).start()
p &#61; threading.Thread(target&#61;m.Producer, name&#61;&#39;Producer&#39;)
p.start()
测试结果(一个生产者&#xff0c;三个消费者&#xff0c;每次生产之后只通知一个消费者去消费)
2017-03-22 22:24:52,933 INFO [Producer] 11
2017-03-22 22:24:52,948 INFO [Consumer-0] 11
2017-03-22 22:24:53,949 INFO [Producer] 47
2017-03-22 22:24:53,967 INFO [Consumer-1] 47
2017-03-22 22:24:54,967 INFO [Producer] 14
2017-03-22 22:24:54,983 INFO [Consumer-2] 14
2017-03-22 22:24:55,986 INFO [Producer] 54
2017-03-22 22:24:55,993 INFO [Consumer-0] 54
Condition 通常用于生产者消费者模式&#xff0c; 生产者生产消息之后&#xff0c; 使用notify 或者 notify_all 通知消费者消费。
消费者使用wait方法阻塞等待生产者通知
notify通知指定个wait的线程&#xff0c; notify_all通知所有的wait线程
无论notify/notify_all还是wait 都必须先acqurie&#xff0c; 完成后必须确保release&#xff0c; 通常使用with语法
Barrier
Barrier类存在于threading模块中&#xff0c;中文可以翻译成栅栏
>>> help(threading.Barrier)
可以看到Barrier的主要方法和属性&#xff1a;
__init__(self, parties, action&#61;None, timeout&#61;None)&#xff1a;初始化方法&#xff0c;创建一个Barrier
parties&#xff1a;所有参与的线程的数量
action&#xff1a;所有的线程都wait之后并且在线程释放之前就会执行这个action函数&#xff0c;相当于集结之后要做的事情。
timeout&#xff1a;相当于给需要等待的每个线程的wait方法加上timeout参数&#xff0c;超时则barrier不再生效
abort(self)&#xff1a;将Barrier设置成broken状态
reset(self)&#xff1a;将Barrier重置为最初状态
wait(self, timeout&#61;None)&#xff1a;在Barrier前等待&#xff0c;返回在Barrier前等待的下标&#xff0c;从0到parties-1
broken&#xff1a;如果Barrier处于broken状态则返回True
n_waiting&#xff1a;当前已经在Barrier处等待的线程的数量
parties&#xff1a;需要在Barrier处等待的线程的数量
示例代码
import threading
import logging
logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)
barrier &#61; threading.Barrier(parties&#61;3)
def worker(barrier: threading.Barrier):
logging.info(&#39;waiting for barrier with {} others&#39;.format(barrier.n_waiting))
worker_id &#61; barrier.wait()
logging.info(&#39;after barrier {}&#39;.format(worker_id))
for i in range(3):
threading.Thread(target&#61;worker, args&#61;(barrier, ), name&#61;&#39;worker-{}&#39;.format(i)).start()
测试结果
2017-03-22 23:25:03,992 INFO [worker-0] waiting for barrier with 0 others
2017-03-22 23:25:03,995 INFO [worker-1] waiting for barrier with 1 others
2017-03-22 23:25:03,998 INFO [worker-2] waiting for barrier with 2 others
2017-03-22 23:25:04,001 INFO [worker-2] after barrier 2
2017-03-22 23:25:04,001 INFO [worker-0] after barrier 0
2017-03-22 23:25:04,001 INFO [worker-1] after barrier 1
可见&#xff0c;所有的线程都会一直等待&#xff0c;知道所有的线程都到期了&#xff0c;然后就通过barrier&#xff0c;继续执行后续操作。
Barrier会建立一个控制点&#xff0c;所有参与的线程都会阻塞&#xff0c;直到所有参与的“各方”达到这一点。 它让线程分开启动&#xff0c;然后暂停&#xff0c;直到它们都准备好再继续。因此&#xff0c;这一点可以理解为各个线程的一个集结点。
abort函数的使用
将Barrier设置成broken状态。所有线程在参与集结过程中&#xff0c;只要执行了barrier.abort方法&#xff0c;那么正在等待的线程都会抛出threading.BrokenBarrierError异常。可以理解为&#xff0c;只要有一个线程确定已经到不了Barrier并且通知了Barrier&#xff0c;那么Barrier就会执行abort方法&#xff0c;通知所有正在wait的线程放弃集结。
实例代码
import threading
import logging
logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)
def worker(barrier: threading.Barrier):
logging.info(&#39;waiting for barrier with {} others&#39;.format(barrier.n_waiting))
try:
worker_id &#61; barrier.wait()
except threading.BrokenBarrierError:
logging.info(&#39;aborting&#39;)
else:
logging.info(&#39;after barrier {}&#39;.format(worker_id))
barrier &#61; threading.Barrier(4) # 需要等待4个线程
for i in range(3):
threading.Thread(target&#61;worker, args&#61;(barrier, ), name&#61;&#39;worker-{}&#39;.format(i)).start() # 3个线程都开始wait
barrier.abort() # 还有一个线程没有到wait&#xff0c;此时执行abort方法&#xff0c;则所有正在wait的线程都抛出异常
测试结果
2017-03-22 23:47:43,184 INFO [worker-0] waiting for barrier with 0 others
2017-03-22 23:47:43,192 INFO [worker-1] waiting for barrier with 1 others
2017-03-22 23:47:43,201 INFO [worker-2] waiting for barrier with 2 others
2017-03-22 23:47:43,211 INFO [worker-2] aborting
2017-03-22 23:47:43,207 INFO [worker-1] aborting
2017-03-22 23:47:43,207 INFO [worker-0] aborting
Semaphore
Semaphore类存在于threading模块中
help(threading.Semaphore)
信号量内部管理者一个计数器&#xff0c;这个计数器的值等于release()方法调用的次数减去acquire()方法调用的次数然后再加上初始值value&#xff0c;value默认为1。
可以看到Semaphore的主要方法&#xff1a;
__init__(self, value&#61;1)&#xff1a;初始化一个信号量&#xff0c;value为内部计数器赋初值&#xff0c;默认为1
acquire(self, blocking&#61;True, timeout&#61;None)&#xff1a;获取信号量&#xff0c;内部计数器减一
release(self)&#xff1a;释放信号量&#xff0c;内部计数器加一
示例代码
import threading
import time
import logging
import random
logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)
class Pool:
def __init__(self, num):
self.num &#61; num
self.conns &#61; [self._make_connect(i) for i in range(num)] # 存放连接
self.sem &#61; threading.Semaphore(num) # 信号量内部计数器初始为连接数
def _make_connect(self, name): # 根据连接名称创建连接
conn &#61; &#39;connect-{}&#39;.format(name)
return conn
def get_connect(self): # 从连接池获取连接
self.sem.acquire()
return self.conns.pop()
def return_connect(self, conn): # 将连接conn返还到连接池中
self.conns.insert(0, conn)
self.sem.release()
def worker(pool: Pool):
logging.info(&#39;starting&#39;)
conn &#61; pool.get_connect() # 如果获取不到则会阻塞在acquire处
logging.info(&#39;get connect {}&#39;.format(conn))
t &#61; random.randint(1, 3)
time.sleep(t)
logging.info(&#39;takes {}s&#39;.format(t))
pool.return_connect(conn)
logging.info(&#39;return connect {}&#39;.format(conn))
pool &#61; Pool(2) # 连接池中有两个连接可以使用
for i in range(3): # 三个线程使用两个连接开始任务
threading.Thread(target&#61;worker, args&#61;(pool, ), name&#61;&#39;worker-{}&#39;.format(i)).start()
测试结果
2017-03-23 00:54:36,056 INFO [worker-0] starting
2017-03-23 00:54:36,062 INFO [worker-0] get connect connect-1
2017-03-23 00:54:36,074 INFO [worker-1] starting
2017-03-23 00:54:36,079 INFO [worker-1] get connect connect-0
2017-03-23 00:54:36,089 INFO [worker-2] starting
2017-03-23 00:54:39,074 INFO [worker-0] takes 3s
2017-03-23 00:54:39,076 INFO [worker-0] return connect connect-1
2017-03-23 00:54:39,076 INFO [worker-2] get connect connect-1
2017-03-23 00:54:39,093 INFO [worker-1] takes 3s
2017-03-23 00:54:39,097 INFO [worker-1] return connect connect-0
2017-03-23 00:54:40,093 INFO [worker-2] takes 1s
2017-03-23 00:54:40,107 INFO [worker-2] return connect connect-1
这个测试结果显示&#xff1a;三个线程获取连接池中的两个连接&#xff0c;结果出现了一个线程等待其他线程执行完成之后再获取连接的过程。
Queue
Condition线程同步部分用来传递数据的是一个封装在生产者消费者模型中的元素data(正常使用情况下一般封装的都是一个列表&#xff0c;类似与Barrier部分的连接池中的conns列表)。
Python的queue模块中提供了同步的、线程安全的队列类&#xff0c;包括三种队列&#xff1a;
FIFO(先入先出)队列Queue
LIFO(后入先出)队列LifoQueue
优先级队列PriorityQueue
这些队列都实现了锁原语&#xff0c;能够在多线程中直接使用。可以使用队列来实现线程间的同步。因此我们可以使用queue模块来替换掉生产者消费者中的全局元素&#xff0c;代码如下&#xff1a;
import random
import queue
import threading
class Producer_Consumer_Model:
def __init__(self):
self.q &#61; queue.Queue()
self.event &#61; threading.Event()
def Consumer(self):
while not self.event.is_set():
logging.info(self.q.get())
def Producer(self):
while not self.event.wait(3):
data &#61; random.randint(1, 100)
logging.info(data)
self.q.put(data)
m &#61; Producer_Consumer_Model()
threading.Thread(target&#61;m.Consumer, name&#61;&#39;Consumer&#39;).start()
threading.Thread(target&#61;m.Producer, name&#61;&#39;Producer&#39;).start()
测试结果
2017-03-23 10:11:22,990 INFO [Producer] 26
2017-03-23 10:11:22,993 INFO [Consumer] 26
2017-03-23 10:11:25,993 INFO [Producer] 89
2017-03-23 10:11:26,003 INFO [Consumer] 89
2017-03-23 10:11:29,004 INFO [Producer] 14
2017-03-23 10:11:29,006 INFO [Consumer] 14
2017-03-23 10:11:32,007 INFO [Producer] 17
2017-03-23 10:11:32,009 INFO [Consumer] 17
每生产一次&#xff0c;消费者就会消费一次。当消费者线程&#xff0c;读取Queue则调用Queue.get()方法&#xff0c;若Queue为空时消费者线程获取不到内容&#xff0c;就会阻塞在这里&#xff0c;直到成功获取内容。
线程同步总结
Event&#xff1a;主要用于线程之间的事件通知
Lock,Rlock&#xff1a;主要用于保护共享资源
Condition&#xff1a;主要用于生产者消费者模型&#xff0c;可以理解为Event和Lock的结合体
Barrier&#xff1a;同步指定个等待的线程
Semaphore&#xff1a;主要用于保护资源&#xff0c;和Lock的区别在于可以多个线程访问共享资源&#xff0c;而锁一次只能一个线程访问到共享资源&#xff0c;即锁是value&#61;1的信号量
Queue&#xff1a;使用FIFO队列进行同步&#xff0c;适用于生产者消费者模型
GIL
GIL(Global Interpreter Lock)&#xff1a;全局解释器锁
Python代码的执行由Python 主循环来控制&#xff0c;Python 在设计之初就考虑到要在解释器的主循环中&#xff0c;同时只有一个线程在执行&#xff0c;即在任意时刻&#xff0c;只有一个线程在解释器中运行。对Python 主循环的访问由全局解释器锁(GIL)来控制&#xff0c;正是这个锁能保证同一时刻只有一个线程在运行。
因此Python多线程程序的执行顺序如下&#xff1a;
设置GIL
切换到一个线程去运行
运行
结束线程
解锁GIL
重复以上步骤
因此&#xff0c;Python的多线程并没有实现并行&#xff0c;只是实现了并发而已。如果要实现真正的并行&#xff0c;那就需要使用Python的多进程模块multiprocessing(multiprocessing模块的宗旨是像管理线程一样来管理进程)。
参考资料
记得帮我点赞哦&#xff01;
精心整理了计算机各个方向的从入门、进阶、实战的视频课程和电子书&#xff0c;按照目录合理分类&#xff0c;总能找到你需要的学习资料&#xff0c;还在等什么&#xff1f;快去关注下载吧&#xff01;&#xff01;&#xff01;
念念不忘&#xff0c;必有回响&#xff0c;小伙伴们帮我点个赞吧&#xff0c;非常感谢。
我是职场亮哥&#xff0c;YY高级软件工程师、四年工作经验&#xff0c;拒绝咸鱼争当龙头的斜杠程序员。
听我说&#xff0c;进步多&#xff0c;程序人生一把梭
如果有幸能帮到你&#xff0c;请帮我点个【赞】&#xff0c;给个关注&#xff0c;如果能顺带评论给个鼓励&#xff0c;将不胜感激。
职场亮哥文章列表&#xff1a;更多文章
本人所有文章、回答都与版权保护平台有合作&#xff0c;著作权归职场亮哥所有&#xff0c;未经授权&#xff0c;转载必究&#xff01;