在现实生活中,如果一个人团队正在共同完成任务,那么他们之间应该有通信,以便正确完成任务。同样的比喻也适用于线程。在编程中,要减少处理器的理想时间,我们创建了多个线程,并为每个线程分配不同的子
在现实生活中,如果一个人团队正在共同完成任务,那么他们之间应该有通信,以便正确完成任务。 同样的比喻也适用于线程。 在编程中,要减少处理器的理想时间,我们创建了多个线程,并为每个线程分配不同的子任务。 因此,必须有一个通信设施,他们应该互相沟通交流,以同步的方式完成工作。
考虑以下与线程通信相关的重要问题 -
没有性能增益 - 如果无法在线程和进程之间实现适当的通信,那么并发性和并行性的性能收益是没有用的。
完成任务 - 如果线程之间没有适当的相互通信机制,分配的任务将无法正常完成。
比进程间通信更高效 - 线程间通信比进程间通信更高效且更易于使用,因为进程内的所有线程共享相同的地址空间,并且不需要使用共享内存。
线程安全通信的Python数据结构
多线程代码出现了将信息从一个线程传递到另一个线程的问题。 标准的通信原语不能解决这个问题。 因此,需要实现我们自己的组合对象,以便在线程之间共享对象以使通信线程安全。 以下是一些数据结构,它们在进行一些更改后提供线程安全通信 -
1. Set
为了以线程安全的方式使用set
数据结构,需要扩展set
类来实现我们自己的锁定机制。
以下是一个扩展类的Python示例 -
class extend_class(set):
def __init__(self, *args, **kwargs):
self._lock = Lock()
super(extend_class, self).__init__(*args, **kwargs)
def add(self, elem):
self._lock.acquire()
try:
super(extend_class, self).add(elem)
finally:
self._lock.release()
def delete(self, elem):
self._lock.acquire()
try:
super(extend_class, self).delete(elem)
finally:
self._lock.release()
在上面的例子中,定义了一个名为extend_class
的类对象,它继承自Python集合类。 在这个类的构造函数中创建一个锁对象。 现在有两个函数 - add()
和delete()
。 这些函数被定义并且是线程安全的。 它们都依赖于超类功能以及一个键异常。
修饰器
这是线程安全通信的另一个关键方法是使用装饰器。
示例
考虑一个Python示例,展示如何使用装饰器和mminus
;
def lock_decorator(method):
def new_deco_method(self, *args, **kwargs):
with self._lock:
return method(self, *args, **kwargs)
return new_deco_method
class Decorator_class(set):
def __init__(self, *args, **kwargs):
self._lock = Lock()
super(Decorator_class, self).__init__(*args, **kwargs)
@lock_decorator
def add(self, *args, **kwargs):
return super(Decorator_class, self).add(elem)
@lock_decorator
def delete(self, *args, **kwargs):
return super(Decorator_class, self).delete(elem)
在上面的例子中,已经定义了一个名为lock_decorator
的装饰器方法,该方法从Python方法类继承。 然后在这个类的构造函数中创建一个锁对象。 现在有两个函数 - add()
和delete()
。 这些函数被定义并且是线程安全的。 他们都依靠超类功能和一个键异常。
2. list
列表数据结构对于临时内存存储而言是线程安全,快速以及简单的结构。在Cpython中,GIL可以防止对它们的并发访问。当我们知道列表是线程安全的,但是数据在哪里呢。实际上,该列表的数据不受保护。例如,如果另一个线程试图做同样的事情,则L.append(x)
不保证能够返回预期的结果。这是因为尽管append()
是一个原子操作并且是线程安全的,但另一个线程试图以并发方式修改列表数据,因此可以看到竞争条件对输出的副作用。
为了解决这类问题并安全地修改数据,我们必须实现一个适当的锁定机制,这进一步确保多个线程不会潜在竞争条件。为了实现适当的锁定机制,可以像前面的例子那样扩展这个类。
列表上的其他一些原子操作如下所示 -
L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()
这里 -
L
,L1
,L2
都是列表
D
,D1
,D2
是字典
x
,y
是对象
i
,j
是整数
3. 队列
如果清单数据不受保护,我们可能不得不面对后果。 可能会得到或删除错误的数据项,竞争条件。 这就是为什么建议使用队列数据结构的原因。 一个真实世界的排队示例可以是单车道单向道路,车辆首先进入,首先退出。 售票窗口和公共汽车站的队列中可以看到更多真实世界的例子。
队列是默认的线程安全数据结构,我们不必担心实现复杂的锁定机制。 Python提供了应用程序中使用不同类型队列的模块。
队列类型
在本节中,我们将获得关于不同类型的队列的信息。 Python提供了三种从queue
模块使用的队列选项 -
- 正常队列(FIFO,先进先出)
- 后进先出,后进先出
- 优先级
我们将在随后的章节中了解不同的队列。
正常队列(FIFO,先进先出)
它是Python提供的最常用的队列实现。 在这种先排队的机制中,首先得到服务。 FIFO也被称为正常队列。 FIFO队列可以表示如下 -
FIFO队列的Python实现
在python中,FIFO队列可以用单线程和多线程来实现。
具有单线程的FIFO队列
要实现单线程的FIFO队列,Queue
类将实现一个基本的先进先出容器。 使用put()
将元素添加到序列的一个“结尾”,并使用get()
从另一端移除元素。
示例
以下是用单线程实现FIFO队列的Python程序 -
import queue
q = queue.Queue()
for i in range(8):
q.put("item-" + str(i))
while not q.empty():
print (q.get(), end = " ")
执行上面示例代码,得到以下结果 -
item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7
输出结果显示上面的程序使用单个线程来说明这些元素将按照它们插入的顺序从队列中移除。
具有多个线程的FIFO队列
为了实现多线程的FIFO,需要从queue
模块扩展来定义myqueue()
函数。 get()
和put()
方法的工作方式与上面讨论的一样,只用单线程实现FIFO队列。 然后为了使它成为多线程,我们需要声明和实例化线程。 这些线程将以FIFO方式使用队列。
示例
以下是用于实现具有多个线程的FIFO队列的Python程序 -
import threading
import queue
import random
import time
def myqueue(queue):
while not queue.empty():
item = queue.get()
if item is None:
break
print("{} removed {} from the queue".format(threading.current_thread(), item))
queue.task_done()
time.sleep(2)
q = queue.Queue()
for i in range(5):
q.put(i)
threads = []
for i in range(4):
thread = threading.Thread(target=myqueue, args=(q,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
执行上面示例代码,得到以下结果 -
removed 0 from the queue
removed 1 from the queue
removed 2 from the queue
removed 3 from the queue
removed 4 from the queue
4. LIFO,后进先出队列
队列使用与FIFO(先进先出)队列完全相反的类比。 在这个队列机制中,最后一个将首先获得服务。 这与实现堆栈数据结构相似。 LIFO队列在实施深度优先搜索时非常有用,如人工智能算法。
LIFO队列的Python实现
在python中,LIFO队列可以用单线程和多线程来实现。
单线程的LIFO队列
要用单线程实现LIFO队列,Queue类将使用结构Queue.LifoQueue
来实现基本的后进先出容器。 现在,在调用put()
时,将元素添加到容器的头部,并使用get()
从头部移除。
示例
以下是用单线程实现LIFO队列的Python程序 -
import queue
q = queue.LifoQueue()
for i in range(8):
q.put("item-" + str(i))
while not q.empty():
print (q.get(), end=" ")
执行上面示例代码,得到以下结果 -
item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0
输出显示上述程序使用单个线程来说明元素将以插入的相反顺序从队列中移除。
带有多个线程的LIFO队列
这个实现与使用多线程实现FIFO队列相似。 唯一的区别是需要使用Queue
类,该类将使用结构Queue.LifoQueue
来实现基本的后进先出容器。
示例
以下是用于实现具有多个线程的LIFO队列的Python程序 -
import threading
import queue
import random
import time
def myqueue(queue):
while not queue.empty():
item = queue.get()
if item is None:
break
print("{} removed {} from the queue".format(threading.current_thread(), item))
queue.task_done()
time.sleep(2)
q = queue.LifoQueue()
for i in range(5):
q.put(i)
threads = []
for i in range(4):
thread = threading.Thread(target=myqueue, args=(q,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
执行上面示例代码,得到以下结果 -
removed 4 from the queue
removed 3 from the queue
removed 2 from the queue
removed 1 from the queue
removed 0 from the queue
优先队列
在FIFO和LIFO队列中,项目顺序与插入顺序有关。 但是,有很多情况下优先级比插入顺序更重要。 让我们考虑一个真实世界的例子。 假设机场的安保人员正在检查不同类别的人员。 VVIP的人员,航空公司工作人员,海关人员,类别可能会优先检查,而不是像到平民那样根据到达情况进行检查。
需要考虑优先队列的另一个重要方面是如何开发任务调度器。 一种常见的设计是在队列中优先处理最具代理性的任务。 该数据结构可用于根据队列的优先级值从队列中提取项目。
优先队列的Python实现
在python中,优先级队列可以用单线程和多线程来实现。
单线程优先队列
要实现单线程优先队列,Queue
类将使用结构Queue.PriorityQueue
在优先级容器上实现任务。 现在,在调用put()
时,元素将添加一个值,其中最低值将具有最高优先级,并因此使用get()
首先检索。
示例
考虑下面的Python程序来实现单线程的优先级队列 -
import queue as Q
p_queue = Q.PriorityQueue()
p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
prio_queue.put((5, 'Important'))
while not p_queue.empty():
item = p_queue.get()
print('%s - %s' % item)
执行上面示例代码,得到以下结果 -
1 – Most Urgent
2 - Urgent
5 - Important
10 – Nothing important
在上面的输出中,可以看到队列已经存储了基于优先级的项目 - 较少的值具有高优先级。
具有多线程的优先队列
该实现类似于具有多个线程的FIFO和LIFO队列的实现。 唯一的区别是需要使用Queue
类通过使用结构Queue.PriorityQueue
来初始化优先级。 另一个区别是队列的生成方式。 在下面给出的例子中,它将生成两个相同的数据集。
示例
以下Python程序有助于实现具有多个线程的优先级队列 -
import threading
import queue
import random
import time
def myqueue(queue):
while not queue.empty():
item = queue.get()
if item is None:
break
print("{} removed {} from the queue".format(threading.current_thread(), item))
queue.task_done()
time.sleep(1)
q = queue.PriorityQueue()
for i in range(5):
q.put(i,1)
for i in range(5):
q.put(i,1)
threads = []
for i in range(2):
thread = threading.Thread(target=myqueue, args=(q,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
执行上面示例代码,得到以下结果 -
removed 0 from the queue
removed 0 from the queue
removed 1 from the queue
removed 1 from the queue
removed 2 from the queue
removed 2 from the queue
removed 3 from the queue
removed 3 from the queue
removed 4 from the queue
removed 4 from the queue