热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

python多线程输出_Python多线程

多线程基础概念并行与并发并行:同时处理多个任务,必须在多核环境下一段时间内同时处理多个任务,单核也可以并发并发手段线程:内核

多线程基础概念

并行与并发

并行:同时处理多个任务,必须在多核环境下

一段时间内同时处理多个任务,单核也可以并发

并发手段

线程:内核空间的调度

进程:内核空间的调度

协程:用户空间的调度

线程可以允许程序在同一进程空间中并发运行多个操作。本次主要介绍Python标准库中的多线程模块threading。

threading模块

线程初始化

使用threading模块的Thread类初始化对象然后调用start方法启动线程。

import threading

import time

def worker(num):

time.sleep(1)

print('worker-{}'.format(num))

# 创建线程对象 target参数是一个函数, 这个函数即线程要执行的逻辑

threads = [threading.Thread(target=worker, args=(i, ))for i in range(5)]

for t in threads:

t.start()

# start 方法启动一个线程, 当这个线程的逻辑执行完毕的时候,线程自动退出, Python 没有提供主动退出线程的方法

# 输出以下结果

worker-0worker-1worker-2worker-3

worker-4

初始化的五个线程的执行逻辑中的print方法打印字符串及换行符出现了随机分布,即出现了资源竞争。

给线程传递参数

import threading

import time

def worker(*args, **kwargs):

time.sleep(1)

print(args)

print(kwargs)

threads = threading.Thread(target=worker, args=(1, 2, 3), kwargs={'a':'b'}).start()

# 输出

(1, 2, 3)

{'a': 'b'}

args传递位置参数,kwargs传递关键字参数。

Thread常用参数和方法

>>> help(threading.Thread)

可以看到Thread函数的初始化方法中的参数如下:

| __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)

| This constructor should always be called with keyword arguments. Arguments are:

|

| *group* should be None; reserved for future extension when a ThreadGroup

| class is implemented.

|

| *target* is the callable object to be invoked by the run()

| method. Defaults to None, meaning nothing is called.

|

| *name* is the thread name. By default, a unique name is constructed of

| the form "Thread-N" where N is a small decimal number.

|

| *args* is the argument tuple for the target invocation. Defaults to ().

|

| *kwargs* is a dictionary of keyword arguments for the target

| invocation. Defaults to {}.

name

表示线程名称,默认情况下,线程名称是Thread-N,N是一个较小的十进制数。我们可以传递name参数,控制线程名称。

以下会导入logging模块来显示线程的名称等详细信息

import threading

import time

import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(num):

time.sleep(1)

logging.info('worker-{}'.format(num))

threads = [threading.Thread(target=worker, args=(i, ), name='workerthread-{}'.format(i)) for i in range(5)]

for t in threads:

t.start()

# 输出

2017-03-20 21:39:29,339 INFO [workerthread-0] worker-0

2017-03-20 21:39:29,340 INFO [workerthread-1] worker-1

2017-03-20 21:39:29,340 INFO [workerthread-2] worker-2

2017-03-20 21:39:29,340 INFO [workerthread-3] worker-3

2017-03-20 21:39:29,346 INFO [workerthread-4] worker-4

其中logging模块的basicConfig函数的format中的%(threadName)s就是用来输出当前线程的名称的。

线程可以重名, 线程名并不是线程的唯一标识,但是通常应该避免线程重名,通常的处理手段是加前缀

daemon

Daemon:守护

和Daemon线程相对应的还有Non-Daemon线程,在此Thread初始化函数中的daemon参数即表示线程是否是Daemon线程。

Daemon线程:会伴随主线程结束而结束(可以理解为主线程结束,守护线程结束)

Non-Daemon线程:不会随着主线程结束而结束,主线程需要等待Non-Daemon结束

import logging

import time

import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker():

logging.info('starting')

time.sleep(2)

logging.info('stopping')

if __name__ == '__main__':

logging.info('starting')

t1 = threading.Thread(target=worker, name='worker1', daemon=False)

t1.start()

time.sleep(1)

t2 = threading.Thread(target=worker, name='worker2', daemon=True)

t2.start()

logging.info('stopping')

# 输出

2017-03-20 23:28:06,404 INFO [MainThread] starting

2017-03-20 23:28:06,436 INFO [worker1] starting

2017-03-20 23:28:07,492 INFO [worker2] starting

2017-03-20 23:28:07,492 INFO [MainThread] stopping # 主线程执行完成

2017-03-20 23:28:08,439 INFO [worker1] stopping # 主线程执行完成之后会等Non-Daemon线程执行完成,但是并不会等Daemon线程执行完成,即Daemon线程会随着主线程执行完成而释放

Thread.join()

如果想等Daemon线程执行完成之后主线程再退出,可以使用线程对象的join()方法

import logging

import time

import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker():

logging.info('starting')

time.sleep(2)

logging.info('stopping')

if __name__ == '__main__':

logging.info('starting')

t1 = threading.Thread(target=worker, name='worker1', daemon=False)

t1.start()

time.sleep(1)

t2 = threading.Thread(target=worker, name='worker2', daemon=True)

t2.start()

logging.info('stopping')

t1.join()

t2.join()

# 输出

2017-03-20 23:41:07,217 INFO [MainThread] starting

2017-03-20 23:41:07,243 INFO [worker1] starting

2017-03-20 23:41:08,245 INFO [worker2] starting

2017-03-20 23:41:08,246 INFO [MainThread] stopping

2017-03-20 23:41:09,243 INFO [worker1] stopping

2017-03-20 23:41:10,248 INFO [worker2] stopping

使用join函数只有主线程就需要等待Daemon线程执行完成在推出。

join函数的原型:join(self, timeout=None)

join方法会阻塞直到线程退出或者超时, timeout 是可选的,如果不设置timeout, 会一直等待线程退出。如果设置了timeout,会在超时之后退出或者线程执行完成退出。

因为join函数总是返回None,因此在超时时间到达之后如果要知道线程是否还是存活的,可以调用is_alive()方法判断线程是否存活。

threading常用方法

enumerate()

列出当前所有的存活的线程

>>> threading.enumerate()

[<_mainthread started>, , ]

local()

import logging

import threading

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

ctx &#61; threading.local()

ctx.data &#61; 5

data &#61; &#39;a&#39;

def worker():

logging.info(data)

logging.info(ctx.data)

worker()

threading.Thread(target&#61;worker).start()

# 输出

2017-03-21 00:02:08,102 INFO [MainThread] a

2017-03-21 00:02:08,113 INFO [MainThread] 5

2017-03-21 00:02:08,119 INFO [Thread-34] a

Exception in thread Thread-34:

Traceback (most recent call last):

File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 914, in _bootstrap_inner

self.run()

File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 862, in run

self._target(*self._args, **self._kwargs)

File "", line 7, in worker

logging.info(ctx.data)

AttributeError: &#39;_thread._local&#39; object has no attribute &#39;data&#39;

线程共享内存、状态和资源。但是thread模块的local类的对象的属性&#xff0c; 只在当前线程可见。

Thread类的派生

Python中可以通过继承 Thread 类并重写 run 方法来编写多线程的逻辑&#xff0c;此时逻辑函数就是run。

class mythread(threading.Thread):

def run(self):

print(&#39;mythread run&#39;)

t &#61; mythread()

t.run() # 输出mythread run

t.start() # 输出mythread run

通过继承方式派生而来的子类对象可以同时执行start方法和run方法&#xff0c;结果是一样的&#xff0c;都是执行子类的run方法。但是非继承的方式不能同时使用start方法和run方法&#xff0c;会报错。

派生时逻辑函数的参数传递

class mythread(threading.Thread):

def __init__(self, *args, **kwargs):

super().__init__() # 需要调用父类的初始化方法初始化

self.args &#61; args

self.kwargs &#61; kwargs

def run(self):

print(&#39;mythread run&#39;, self.args, self.kwargs)

t &#61; mythread(1, 2, 3, a&#61;&#39;b&#39;)

t.start() # 输出mythread run (1, 2, 3) {&#39;a&#39;: &#39;b&#39;}

Timer类

Timer类&#xff1a;Thread类的派生类&#xff0c;也在threading模块中。意为定时器&#xff0c;用作线程的延迟执行。

>>> help(threading.Timer)

Timer类的初始化方法&#xff1a;__init__(self, interval, function, args&#61;None, kwargs&#61;None)

interval&#xff1a;时间间隔&#xff0c;即几秒之后开始执行function

function&#xff1a;线程执行的逻辑函数

args&#xff1a;位置参数

kwargs&#xff1a;关键字参数

代码

import threading

import time

import logging

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

def worker():

logging.info(&#39;worker running&#39;)

t1 &#61; threading.Timer(interval&#61;3, function&#61;worker)

t2 &#61; threading.Timer(interval&#61;3, function&#61;worker)

t1.setName(&#39;t1&#39;)

t2.setName(&#39;t2&#39;)

logging.info(&#39;start&#39;)

t1.start()

t2.start()

time.sleep(2)

logging.info(&#39;canceling {}&#39;.format(t1.name))

t1.cancel() # 2s之后仍然可以取消t1

logging.info(&#39;end&#39;)

# 输出

2017-03-21 19:28:52,801 INFO [MainThread] start

2017-03-21 19:28:54,811 INFO [MainThread] canceling t1

2017-03-21 19:28:54,819 INFO [MainThread] end

2017-03-21 19:28:55,808 INFO [t2] worker running

Timer.cancel()&#xff1a;取消仍然存活的定时器&#xff0c;如果定时器已经开始执行function&#xff0c;则无法取消。

Timer.setDaemon(True)&#xff1a;设置定时器为守护线程

线程同步

当使用多个线程来访问同一个数据时&#xff0c;会经常出现资源争用等线程安全问题(比如多个线程都在操作同一数据导致数据不一致)&#xff0c;这时候我们就可以使用一些同步技术来解决这类问题。比如Event&#xff0c;Lock&#xff0c;Condition&#xff0c;Barrier&#xff0c;Semaphore等等。

Event

>>> help(threading.Event)

Event对象内置一个标志&#xff0c;这个标志可以由set()方法和clear()方法设定。线程可以使用wait()方法进行阻塞等待&#xff0c;知道Event对象内置标志被set。

clear(self)&#xff1a;设置内置标志为False

set(self)&#xff1a;设置内置标志为True

wait(self, timeout&#61;None)&#xff1a;开始阻塞&#xff0c;直到内置标志被设置为True(即wait会阻塞线程直到set方法被调用或者超时)

is_set(self)&#xff1a;当且仅当内置标志为True的时候返回True

代码

以下代码实现的逻辑是&#xff1a;一个boss和五个睡觉工人&#xff0c;只要有一个工人完成了睡觉任务&#xff0c;那么就唤醒boss和其他工人。

import datetime

import threading

import logging

import random

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

def worker(event: threading.Event):

s &#61; random.randint(1, 5)

event.wait(s) # wait方法而不使用sleep方法&#xff0c;可以让其他工人收到通知后不再等待

logging.info(&#39;sleep {}&#39;.format(s))

event.set()

def boss(event:threading.Event):

start &#61; datetime.datetime.now()

event.wait()

end &#61; datetime.datetime.now()

logging.info(&#39;that boss exit takes {}s&#39;.format(end - start))

def start():

event &#61; threading.Event()

b &#61; threading.Thread(target&#61;boss, args&#61;(event, ), name&#61;&#39;boss&#39;)

b.start()

for i in range(5):

t &#61; threading.Thread(target&#61;worker, args&#61;(event, ), name&#61;&#39;worker-{}&#39;.format(i))

t.start()

执行start()方法&#xff0c;测试结果

>>> start()

2017-03-21 21:20:17,195 INFO [worker-2] sleep 1

2017-03-21 21:20:17,198 INFO [boss] that boss exit takes 0:00:01.004954s

2017-03-21 21:20:17,199 INFO [worker-0] sleep 2

2017-03-21 21:20:17,199 INFO [worker-1] sleep 3

2017-03-21 21:20:17,199 INFO [worker-3] sleep 2

2017-03-21 21:20:17,198 INFO [worker-4] sleep 1

可以看到&#xff1a;worker-2退出之后&#xff0c;boss和另外四个worker也瞬间就退出了。所以event对象的内置状态被set之后&#xff0c;相关线程就不再wait了。

event&#xff1a;在线程之间发送信号&#xff0c;通常用于某个线程需要等待其他线程处理完成某些动作之后才能启动

wait()方法的timeout参数

def worker(event: threading.Event):

while not event.wait(3):

logging.info(&#39;run run run&#39;)

event &#61; threading.Event()

threading.Thread(target&#61;worker, args&#61;(event, )).start()

# 输出

2017-03-21 21:32:47,275 INFO [Thread-8] run run run

2017-03-21 21:32:50,277 INFO [Thread-8] run run run

2017-03-21 21:32:53,281 INFO [Thread-8] run run run

2017-03-21 21:32:56,284 INFO [Thread-8] run run run

...

程序每隔3s就会输出一次结果&#xff0c;直到执行set()方法才会停止。因此我们可以写一个定时器(类似于Thread类的派生类Timer)。

代码

class Timer:

def __init__(self, interval, function, *args, **kwargs):

self.interval &#61; interval

self.function &#61; function

self.args &#61; args

self.kwargs &#61; kwargs

self.event &#61; threading.Event()

self.thread &#61; threading.Thread(target&#61;self.__target(), args&#61;args, kwargs&#61;kwargs)

def __target(self):

if not self.event.wait(self.interval):

return self.function

def start(self):

self.thread.start()

def cancel(self):

self.event.set()

def worker(act):

logging.info(&#39;run-{}&#39;.format(act))

t &#61; Timer(5, worker, &#39;hahaha&#39;)

t.start() # 输出2017-03-21 22:14:59,645 INFO [Thread-20] run-hahaha

延迟5s之后执行了逻辑函数&#xff0c;也可以使用cancel函数取消。(要注意参数的传递&#xff0c;此处Timer初始化不能使用关键字参数)

Lock

event是用来同步线程之间的操作的&#xff0c;但是如果要控制共享资源的访问那就需要用到锁机制了&#xff0c;在Python标准库中的实现就是内置的lock类。

>>> help(threading.Lock)

threading.Lock()函数会创建一个lock类的对象。

>>> help(threading.Lock())

锁对象是一个同步原语(synchronization primitive)&#xff0c;lock对象主要有以下三个方法&#xff1a;

acquire()&#xff1a; acquire(blocking&#61;True, timeout&#61;-1) -> bool 获得锁(即锁定锁)。成功获得锁返回True&#xff0c;没有获得锁则返回False。

release()&#xff1a; release() 释放锁

locked()&#xff1a; locked() -> bool 检查锁是否被锁住

代码

以下代码实现了在多个进程同时对资源进行访问时&#xff0c;进行加锁和解锁的操作&#xff0c;保证加减操作和赋值操作组合之后的原子性。

class Counter: # 计时器有加减方法&#xff0c;都会修改value值&#xff0c;因此都需要加锁处理

def __init__(self, start&#61;0):

self.value &#61; start

self.lock &#61; threading.Lock()

def inc(self):

self.lock.acquire()

try:

self.value &#43;&#61; 1

finally:

self.lock.release() # 需要用finally语句保证锁一定会被释放&#xff0c;否则资源永远不可访问

def dec(self):

self.lock.acquire()

try:

self.value -&#61; 1

finally:

self.lock.release()

def inc_worker(c: Counter):

pause &#61; random.random()

logging.info(&#39;sleeping-{}&#39;.format(pause))

time.sleep(pause)

c.inc()

logging.info(&#39;cur_value:{}&#39;.format(c.value))

def dec_worker(c: Counter):

pause &#61; random.random()

logging.info(&#39;sleeping-{}&#39;.format(pause))

time.sleep(pause)

c.dec()

logging.info(&#39;cur_value:{}&#39;.format(c.value))

c &#61; Counter()

for i in range(2):

threading.Thread(target&#61;inc_worker, args&#61;(c, ), name&#61;&#39;inc_worker-{}&#39;.format(i)).start()

for i in range(3):

threading.Thread(target&#61;dec_worker, args&#61;(c, ), name&#61;&#39;dec_worker-{}&#39;.format(i)).start()

测试输出

2017-03-21 23:17:44,761 INFO [inc_worker-0] sleeping-0.6542416949220327

2017-03-21 23:17:44,766 INFO [inc_worker-1] sleeping-0.48615543229897873

2017-03-21 23:17:44,771 INFO [dec_worker-0] sleeping-0.12355589507242459

2017-03-21 23:17:44,776 INFO [dec_worker-1] sleeping-0.5276710391905681

2017-03-21 23:17:44,784 INFO [dec_worker-2] sleeping-0.5546251407611247

2017-03-21 23:17:44,900 INFO [dec_worker-0] cur_value:-1

2017-03-21 23:17:45,258 INFO [inc_worker-1] cur_value:0

2017-03-21 23:17:45,312 INFO [dec_worker-1] cur_value:-1

2017-03-21 23:17:45,351 INFO [dec_worker-2] cur_value:-2

2017-03-21 23:17:45,421 INFO [inc_worker-0] cur_value:-1

可见&#xff0c;各项操作之间保持相互原子性&#xff0c;没有出现干扰。

因为lock类实现了__enter__和__exit__两个魔术方法&#xff0c;因此支持上下文管理器&#xff0c;可以修改以上Counter类的实现方法如下&#xff1a;

class Counter:

def __init__(self, start&#61;0):

self.value &#61; start

self.lock &#61; threading.Lock()

def inc(self):

self.lock.acquire()

with self.lock:

self.value &#43;&#61; 1

def dec(self):

self.lock.acquire()

with self.lock:

self.value -&#61; 1

即使用上下文管理器来代替try...finally...语句&#xff0c;测试输出应该以以上结果一致。

acquire方法的blocking参数

当blocking&#61;True时&#xff0c;A线程中执行了lock.acquire()方法之后并且没有执行到lock.release()方法&#xff0c;如果在B线程中再次执行lock.acquire()方法&#xff0c;则B线程阻塞。

正如以上代码实现&#xff0c;当有n个线程需要修改一个共享资源的时候&#xff0c;其他线程在获取锁之前都处于阻塞状态。(python的阻塞都会让出cpu的时间片&#xff0c;因此不是忙等待)

当blocking&#61;Fasle时&#xff0c;A线程中执行了lock.acquire()方法之后并且没有执行到lock.release()方法&#xff0c;如果在B线程中再次执行lock.acquire()方法&#xff0c;则B线程不会阻塞&#xff0c;并且acquire函数返回False。

acquire方法的timeout参数

当blocking&#61;True并且timeout>0时&#xff0c;acquire会一直阻塞到超时或者锁被释放。

acquire(0)的参数传递

模拟acquire方法的默认参数&#xff0c;编写一下函数进行模拟参数传递的过程&#xff1a;

def print1(blocking&#61;True, timeout&#61;-1):

print(blocking, timeout)

print1(0) # 输出0 -1

print1(10) # 输出10 -1

可见第一个位置参数&#xff0c;替代了blocking。也就是说lock.acquire(0)等效于lock.acquire(blocking&#61;False)

RLock

正常的lock对象是不能多次调用acquire的&#xff0c;但是可重用锁RLock可以多次调用 acquire 而不阻塞&#xff0c;而且 release 时也要执行和 acquire 一样的次数。

Condition

除了Event对象之外&#xff0c;线程同步还可以使用条件同步机制Condition。一类线程等待特定条件&#xff0c;而另一类线程发出特定条件满足的信号。

>>> help(threading.Condition)

在Condition的帮助中有以下几个方法&#xff1a;

初始化方法&#xff1a;init(self, lock&#61;None)。如果给定了lock参数&#xff0c;那么必须是Lock或者Rlock对象&#xff0c;并且被当做底层锁来使用。如果没有指定&#xff0c;那么会创建一个RLock对象的锁&#xff0c;也被当做底层锁来使用。

实现了__enter__和__exit__方法&#xff0c;支持上下文管理器。

notify(self, n&#61;1)&#xff1a;唤醒一个或多个在当前Condition上等待的其他线程&#xff0c;如果此方法的调用线程没有获得锁&#xff0c;那么在调用的时候就会报错RuntimeError

notify_all(self)&#xff1a;唤醒所有线程

wait(self, timeout&#61;None)&#xff1a;一直等待着知道被notifyed或者发生超时

实例代码

以下代码实现的是&#xff1a;有一个生产者线程&#xff0c;会生产若干次&#xff0c;每次生产结束后需要通知所有的消费者线程来消费&#xff0c;因此下面代码使用的是notify_all方法。

import threading

import time

import logging

import random

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

class Producer_Consumer_Model:

def __init__(self):

self.data &#61; None

self.event &#61; threading.Event() # 用来控制消费者退出

self.condition &#61; threading.Condition()

def Consumer(self):

while not self.event.is_set():

with self.condition:

self.condition.wait() # 一直等待直到收到生产者通知notify_all

logging.info(self.data) # 收到通知之后&#xff0c;开始执行消费者的业务逻辑部分

def Producer(self):

for _ in range(4): # 每个生产者生产4次

data &#61; random.randint(0, 100)

logging.info(data)

with self.condition:

self.data &#61; data # 写入成功就表示生产成功&#xff0c;因此需要在此加锁并且能够通知消费者线程去消费&#xff0c;因此选择使用condition来处理

self.condition.notify_all() # 生产成功之后通知所有的消费者去消费

self.event.wait(1) # 没生产一次等待1s

self.event.set() # 所有的生产完成之后通知消费者退出

m &#61; Producer_Consumer_Model()

for i in range(3):

threading.Thread(target&#61;m.Consumer, name&#61;&#39;Consumer-{}&#39;.format(i)).start()

p &#61; threading.Thread(target&#61;m.Producer, name&#61;&#39;Producer&#39;)

p.start()

测试结果(一个生产者&#xff0c;三个消费者)

2017-03-22 22:07:42,875 INFO [Producer] 16

2017-03-22 22:07:42,883 INFO [Consumer-0] 16

2017-03-22 22:07:42,890 INFO [Consumer-2] 16

2017-03-22 22:07:42,894 INFO [Consumer-1] 16

2017-03-22 22:07:43,884 INFO [Producer] 76

2017-03-22 22:07:43,888 INFO [Consumer-0] 76

2017-03-22 22:07:43,895 INFO [Consumer-2] 76

2017-03-22 22:07:43,898 INFO [Consumer-1] 76

2017-03-22 22:07:44,889 INFO [Producer] 31

2017-03-22 22:07:44,891 INFO [Consumer-0] 31

2017-03-22 22:07:44,911 INFO [Consumer-2] 31

2017-03-22 22:07:44,913 INFO [Consumer-1] 31

2017-03-22 22:07:45,892 INFO [Producer] 17

2017-03-22 22:07:45,894 INFO [Consumer-0] 17

2017-03-22 22:07:45,907 INFO [Consumer-2] 17

2017-03-22 22:07:45,910 INFO [Consumer-1] 17

可见&#xff0c;生产者每生产一次&#xff0c;所有的消费者就会去消费。如果想控制每次生产之后通知几个消费者来消费&#xff0c;那么就可以使用notify方法&#xff0c;指定消费者线程个数。

代码如下

import threading

import time

import logging

import random

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

class Producer_Consumer_Model:

def __init__(self):

self.data &#61; None

self.event &#61; threading.Event() # 用来控制消费者退出

self.condition &#61; threading.Condition()

def Consumer(self):

while not self.event.is_set():

with self.condition:

self.condition.wait() # 一直等待直到收到生产者通知notify_all

logging.info(self.data) # 收到通知之后&#xff0c;开始执行消费者的业务逻辑部分

def Producer(self):

for _ in range(4): # 每个生产者生产4次

data &#61; random.randint(0, 100)

logging.info(data)

with self.condition:

self.data &#61; data # 写入成功就表示生产成功&#xff0c;因此需要在此加锁并且能够通知消费者线程去消费&#xff0c;因此选择使用condition来处理

self.condition.notify(1) # 生产成功之后通知所有的消费者去消费

self.event.wait(1) # 没生产一次等待1s

self.event.set() # 所有的生产完成之后通知消费者退出

m &#61; Producer_Consumer_Model()

for i in range(3):

threading.Thread(target&#61;m.Consumer, name&#61;&#39;Consumer-{}&#39;.format(i)).start()

p &#61; threading.Thread(target&#61;m.Producer, name&#61;&#39;Producer&#39;)

p.start()

测试结果(一个生产者&#xff0c;三个消费者&#xff0c;每次生产之后只通知一个消费者去消费)

2017-03-22 22:24:52,933 INFO [Producer] 11

2017-03-22 22:24:52,948 INFO [Consumer-0] 11

2017-03-22 22:24:53,949 INFO [Producer] 47

2017-03-22 22:24:53,967 INFO [Consumer-1] 47

2017-03-22 22:24:54,967 INFO [Producer] 14

2017-03-22 22:24:54,983 INFO [Consumer-2] 14

2017-03-22 22:24:55,986 INFO [Producer] 54

2017-03-22 22:24:55,993 INFO [Consumer-0] 54

Condition 通常用于生产者消费者模式&#xff0c; 生产者生产消息之后&#xff0c; 使用notify 或者 notify_all 通知消费者消费。

消费者使用wait方法阻塞等待生产者通知

notify通知指定个wait的线程&#xff0c; notify_all通知所有的wait线程

无论notify/notify_all还是wait 都必须先acqurie&#xff0c; 完成后必须确保release&#xff0c; 通常使用with语法

Barrier

Barrier类存在于threading模块中&#xff0c;中文可以翻译成栅栏

>>> help(threading.Barrier)

可以看到Barrier的主要方法和属性&#xff1a;

__init__(self, parties, action&#61;None, timeout&#61;None)&#xff1a;初始化方法&#xff0c;创建一个Barrier

parties&#xff1a;所有参与的线程的数量

action&#xff1a;所有的线程都wait之后并且在线程释放之前就会执行这个action函数&#xff0c;相当于集结之后要做的事情。

timeout&#xff1a;相当于给需要等待的每个线程的wait方法加上timeout参数&#xff0c;超时则barrier不再生效

abort(self)&#xff1a;将Barrier设置成broken状态

reset(self)&#xff1a;将Barrier重置为最初状态

wait(self, timeout&#61;None)&#xff1a;在Barrier前等待&#xff0c;返回在Barrier前等待的下标&#xff0c;从0到parties-1

broken&#xff1a;如果Barrier处于broken状态则返回True

n_waiting&#xff1a;当前已经在Barrier处等待的线程的数量

parties&#xff1a;需要在Barrier处等待的线程的数量

示例代码

import threading

import logging

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

barrier &#61; threading.Barrier(parties&#61;3)

def worker(barrier: threading.Barrier):

logging.info(&#39;waiting for barrier with {} others&#39;.format(barrier.n_waiting))

worker_id &#61; barrier.wait()

logging.info(&#39;after barrier {}&#39;.format(worker_id))

for i in range(3):

threading.Thread(target&#61;worker, args&#61;(barrier, ), name&#61;&#39;worker-{}&#39;.format(i)).start()

测试结果

2017-03-22 23:25:03,992 INFO [worker-0] waiting for barrier with 0 others

2017-03-22 23:25:03,995 INFO [worker-1] waiting for barrier with 1 others

2017-03-22 23:25:03,998 INFO [worker-2] waiting for barrier with 2 others

2017-03-22 23:25:04,001 INFO [worker-2] after barrier 2

2017-03-22 23:25:04,001 INFO [worker-0] after barrier 0

2017-03-22 23:25:04,001 INFO [worker-1] after barrier 1

可见&#xff0c;所有的线程都会一直等待&#xff0c;知道所有的线程都到期了&#xff0c;然后就通过barrier&#xff0c;继续执行后续操作。

Barrier会建立一个控制点&#xff0c;所有参与的线程都会阻塞&#xff0c;直到所有参与的“各方”达到这一点。 它让线程分开启动&#xff0c;然后暂停&#xff0c;直到它们都准备好再继续。因此&#xff0c;这一点可以理解为各个线程的一个集结点。

abort函数的使用

将Barrier设置成broken状态。所有线程在参与集结过程中&#xff0c;只要执行了barrier.abort方法&#xff0c;那么正在等待的线程都会抛出threading.BrokenBarrierError异常。可以理解为&#xff0c;只要有一个线程确定已经到不了Barrier并且通知了Barrier&#xff0c;那么Barrier就会执行abort方法&#xff0c;通知所有正在wait的线程放弃集结。

实例代码

import threading

import logging

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

def worker(barrier: threading.Barrier):

logging.info(&#39;waiting for barrier with {} others&#39;.format(barrier.n_waiting))

try:

worker_id &#61; barrier.wait()

except threading.BrokenBarrierError:

logging.info(&#39;aborting&#39;)

else:

logging.info(&#39;after barrier {}&#39;.format(worker_id))

barrier &#61; threading.Barrier(4) # 需要等待4个线程

for i in range(3):

threading.Thread(target&#61;worker, args&#61;(barrier, ), name&#61;&#39;worker-{}&#39;.format(i)).start() # 3个线程都开始wait

barrier.abort() # 还有一个线程没有到wait&#xff0c;此时执行abort方法&#xff0c;则所有正在wait的线程都抛出异常

测试结果

2017-03-22 23:47:43,184 INFO [worker-0] waiting for barrier with 0 others

2017-03-22 23:47:43,192 INFO [worker-1] waiting for barrier with 1 others

2017-03-22 23:47:43,201 INFO [worker-2] waiting for barrier with 2 others

2017-03-22 23:47:43,211 INFO [worker-2] aborting

2017-03-22 23:47:43,207 INFO [worker-1] aborting

2017-03-22 23:47:43,207 INFO [worker-0] aborting

Semaphore

Semaphore类存在于threading模块中

help(threading.Semaphore)

信号量内部管理者一个计数器&#xff0c;这个计数器的值等于release()方法调用的次数减去acquire()方法调用的次数然后再加上初始值value&#xff0c;value默认为1。

可以看到Semaphore的主要方法&#xff1a;

__init__(self, value&#61;1)&#xff1a;初始化一个信号量&#xff0c;value为内部计数器赋初值&#xff0c;默认为1

acquire(self, blocking&#61;True, timeout&#61;None)&#xff1a;获取信号量&#xff0c;内部计数器减一

release(self)&#xff1a;释放信号量&#xff0c;内部计数器加一

示例代码

import threading

import time

import logging

import random

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

class Pool:

def __init__(self, num):

self.num &#61; num

self.conns &#61; [self._make_connect(i) for i in range(num)] # 存放连接

self.sem &#61; threading.Semaphore(num) # 信号量内部计数器初始为连接数

def _make_connect(self, name): # 根据连接名称创建连接

conn &#61; &#39;connect-{}&#39;.format(name)

return conn

def get_connect(self): # 从连接池获取连接

self.sem.acquire()

return self.conns.pop()

def return_connect(self, conn): # 将连接conn返还到连接池中

self.conns.insert(0, conn)

self.sem.release()

def worker(pool: Pool):

logging.info(&#39;starting&#39;)

conn &#61; pool.get_connect() # 如果获取不到则会阻塞在acquire处

logging.info(&#39;get connect {}&#39;.format(conn))

t &#61; random.randint(1, 3)

time.sleep(t)

logging.info(&#39;takes {}s&#39;.format(t))

pool.return_connect(conn)

logging.info(&#39;return connect {}&#39;.format(conn))

pool &#61; Pool(2) # 连接池中有两个连接可以使用

for i in range(3): # 三个线程使用两个连接开始任务

threading.Thread(target&#61;worker, args&#61;(pool, ), name&#61;&#39;worker-{}&#39;.format(i)).start()

测试结果

2017-03-23 00:54:36,056 INFO [worker-0] starting

2017-03-23 00:54:36,062 INFO [worker-0] get connect connect-1

2017-03-23 00:54:36,074 INFO [worker-1] starting

2017-03-23 00:54:36,079 INFO [worker-1] get connect connect-0

2017-03-23 00:54:36,089 INFO [worker-2] starting

2017-03-23 00:54:39,074 INFO [worker-0] takes 3s

2017-03-23 00:54:39,076 INFO [worker-0] return connect connect-1

2017-03-23 00:54:39,076 INFO [worker-2] get connect connect-1

2017-03-23 00:54:39,093 INFO [worker-1] takes 3s

2017-03-23 00:54:39,097 INFO [worker-1] return connect connect-0

2017-03-23 00:54:40,093 INFO [worker-2] takes 1s

2017-03-23 00:54:40,107 INFO [worker-2] return connect connect-1

这个测试结果显示&#xff1a;三个线程获取连接池中的两个连接&#xff0c;结果出现了一个线程等待其他线程执行完成之后再获取连接的过程。

Queue

Condition线程同步部分用来传递数据的是一个封装在生产者消费者模型中的元素data(正常使用情况下一般封装的都是一个列表&#xff0c;类似与Barrier部分的连接池中的conns列表)。

Python的queue模块中提供了同步的、线程安全的队列类&#xff0c;包括三种队列&#xff1a;

FIFO(先入先出)队列Queue

LIFO(后入先出)队列LifoQueue

优先级队列PriorityQueue

这些队列都实现了锁原语&#xff0c;能够在多线程中直接使用。可以使用队列来实现线程间的同步。因此我们可以使用queue模块来替换掉生产者消费者中的全局元素&#xff0c;代码如下&#xff1a;

import random

import queue

import threading

class Producer_Consumer_Model:

def __init__(self):

self.q &#61; queue.Queue()

self.event &#61; threading.Event()

def Consumer(self):

while not self.event.is_set():

logging.info(self.q.get())

def Producer(self):

while not self.event.wait(3):

data &#61; random.randint(1, 100)

logging.info(data)

self.q.put(data)

m &#61; Producer_Consumer_Model()

threading.Thread(target&#61;m.Consumer, name&#61;&#39;Consumer&#39;).start()

threading.Thread(target&#61;m.Producer, name&#61;&#39;Producer&#39;).start()

测试结果

2017-03-23 10:11:22,990 INFO [Producer] 26

2017-03-23 10:11:22,993 INFO [Consumer] 26

2017-03-23 10:11:25,993 INFO [Producer] 89

2017-03-23 10:11:26,003 INFO [Consumer] 89

2017-03-23 10:11:29,004 INFO [Producer] 14

2017-03-23 10:11:29,006 INFO [Consumer] 14

2017-03-23 10:11:32,007 INFO [Producer] 17

2017-03-23 10:11:32,009 INFO [Consumer] 17

每生产一次&#xff0c;消费者就会消费一次。当消费者线程&#xff0c;读取Queue则调用Queue.get()方法&#xff0c;若Queue为空时消费者线程获取不到内容&#xff0c;就会阻塞在这里&#xff0c;直到成功获取内容。

线程同步总结

Event&#xff1a;主要用于线程之间的事件通知

Lock,Rlock&#xff1a;主要用于保护共享资源

Condition&#xff1a;主要用于生产者消费者模型&#xff0c;可以理解为Event和Lock的结合体

Barrier&#xff1a;同步指定个等待的线程

Semaphore&#xff1a;主要用于保护资源&#xff0c;和Lock的区别在于可以多个线程访问共享资源&#xff0c;而锁一次只能一个线程访问到共享资源&#xff0c;即锁是value&#61;1的信号量

Queue&#xff1a;使用FIFO队列进行同步&#xff0c;适用于生产者消费者模型

GIL

GIL(Global Interpreter Lock)&#xff1a;全局解释器锁

Python代码的执行由Python 主循环来控制&#xff0c;Python 在设计之初就考虑到要在解释器的主循环中&#xff0c;同时只有一个线程在执行&#xff0c;即在任意时刻&#xff0c;只有一个线程在解释器中运行。对Python 主循环的访问由全局解释器锁(GIL)来控制&#xff0c;正是这个锁能保证同一时刻只有一个线程在运行。

因此Python多线程程序的执行顺序如下&#xff1a;

设置GIL

切换到一个线程去运行

运行

结束线程

解锁GIL

重复以上步骤

因此&#xff0c;Python的多线程并没有实现并行&#xff0c;只是实现了并发而已。如果要实现真正的并行&#xff0c;那就需要使用Python的多进程模块multiprocessing(multiprocessing模块的宗旨是像管理线程一样来管理进程)。

参考资料

记得帮我点赞哦&#xff01;

精心整理了计算机各个方向的从入门、进阶、实战的视频课程和电子书&#xff0c;按照目录合理分类&#xff0c;总能找到你需要的学习资料&#xff0c;还在等什么&#xff1f;快去关注下载吧&#xff01;&#xff01;&#xff01;

念念不忘&#xff0c;必有回响&#xff0c;小伙伴们帮我点个赞吧&#xff0c;非常感谢。

我是职场亮哥&#xff0c;YY高级软件工程师、四年工作经验&#xff0c;拒绝咸鱼争当龙头的斜杠程序员。

听我说&#xff0c;进步多&#xff0c;程序人生一把梭

如果有幸能帮到你&#xff0c;请帮我点个【赞】&#xff0c;给个关注&#xff0c;如果能顺带评论给个鼓励&#xff0c;将不胜感激。

职场亮哥文章列表&#xff1a;更多文章

本人所有文章、回答都与版权保护平台有合作&#xff0c;著作权归职场亮哥所有&#xff0c;未经授权&#xff0c;转载必究&#xff01;



推荐阅读
  • 本文介绍了如何使用Python的Paramiko库批量更新多台服务器的登录密码。通过示例代码展示了具体实现方法,确保了操作的高效性和安全性。Paramiko库提供了强大的SSH2协议支持,使得远程服务器管理变得更加便捷。此外,文章还详细说明了代码的各个部分,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 属性类 `Properties` 是 `Hashtable` 类的子类,用于存储键值对形式的数据。该类在 Java 中广泛应用于配置文件的读取与写入,支持字符串类型的键和值。通过 `Properties` 类,开发者可以方便地进行配置信息的管理,确保应用程序的灵活性和可维护性。此外,`Properties` 类还提供了加载和保存属性文件的方法,使其在实际开发中具有较高的实用价值。 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • 本文详细介绍了在 CentOS 7 系统中配置 fstab 文件以实现开机自动挂载 NFS 共享目录的方法,并解决了常见的配置失败问题。 ... [详细]
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • com.hazelcast.config.MapConfig.isStatisticsEnabled()方法的使用及代码示例 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • 本文将详细介绍Python中*args和**kwargs的用法,包括它们的基本概念、应用场景以及注意事项。 ... [详细]
  • 本文介绍了 .NET 中用于线程间通信的工具 WaitHandle 及其子类 ManualResetEvent 和 AutoResetEvent,并详细解释了线程池的概念及其在优化资源利用方面的优势。 ... [详细]
  • 本文总结了一些开发中常见的问题及其解决方案,包括特性过滤器的使用、NuGet程序集版本冲突、线程存储、溢出检查、ThreadPool的最大线程数设置、Redis使用中的问题以及Task.Result和Task.GetAwaiter().GetResult()的区别。 ... [详细]
  • PTArchiver工作原理详解与应用分析
    PTArchiver工作原理及其应用分析本文详细解析了PTArchiver的工作机制,探讨了其在数据归档和管理中的应用。PTArchiver通过高效的压缩算法和灵活的存储策略,实现了对大规模数据的高效管理和长期保存。文章还介绍了其在企业级数据备份、历史数据迁移等场景中的实际应用案例,为用户提供了实用的操作建议和技术支持。 ... [详细]
  • 本文介绍了如何在 Spring 3.0.5 中使用 JdbcTemplate 插入数据并获取 MySQL 表中的自增主键。 ... [详细]
  • poj 3352 Road Construction ... [详细]
author-avatar
烧饼来一个则_815
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有