热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

pythonthreading模块使用以及python多线程操作的实践(使用Queue队列模块)

今天花了近乎一天的时间研究python关于多线程的问题,查看了大量源码自己也实践了一个生产消费者模型,所以把一天的收获总结一下。由于GIL(GlobalInterpreterLoc

今天花了近乎一天的时间研究python关于多线程的问题,查看了大量源码 自己也实践了一个生产消费者模型,所以把一天的收获总结一下。

由于GIL(Global Interpreter Lock)锁的关系,纯的python代码处理一般逻辑的确无法活动性能上的极大提升,但是在处理需要等待外部资源返回或多用户的应用程序中,多线程仍然可以作为一个比较好的工具来进行使用。

 

python提供了两个模块thread和threading 来支持python的多线程操作。通俗的讲一般现在我们只使用threading模块来编程了,thread模块定义了很多原始行为,更接近底层,而threading模块抽象了thread模块可用性更好,同时提供更多特性。

现在创建线程的通用方法一般是创建一个类并且继承threading.Thread,然后重写其中的__init__和run()方法。 更多详情可以参考threading模块代码内注释以及代码。下面直接看个例子。

import time
import threading


class Test(threading.Thread):
    def __init__(self, name, delay):
        super(Test, self).__init__()
        self.name = name
        self.delay = delay

    def run(self):
        print "%s delay for %s seconds" % (self.name, self.delay)
        time.sleep(self.delay)
        c = 0
        while True:
            print "This is thread %s on line %s" % (self.name, c)
            c += 1
            if c == 3:
                print "End of thread %s" % self.name
                break


t1 = Test('Thread1', 5)
t2 = Test('Thread2', 5)

t1.start()
print 'Wait t1 to end'
t1.join()
t2.start()
t2.join()
print 'End of main'

 

注意一下这一句 :

super(Test, self).__init__()

这是按照模块要求,必须初始化父类的__init__函数 所以使用了super()

其他并没有值得多少的地方,

创建线程方便的实例化自己写的继承threading.Thread的类 然后传入对应的参数。

最后使用xxx.start()来运行线程。 使用xxx.join()来阻塞线程。

 

特别注意的是。继承自Threading类的子类还有一个daemon参数,如果这个参数适用setDaemon()方法置为True之后,主线程将不会等待子线程都结束之后才结束,而是自己运行完之后就结束,这种方式相当粗暴。 如果将daemon参数设置为False的话,主线成将会等待所有子线程结束之后再结束。daemon属性可以通过使用isDaemon()方法获取一个boolean值。

 

更进一步的,我必须介绍一下线程之间的同步和互斥问题。下面引用《计算机操作系统》中的介绍。

       当线程并发执行时,由于资源共享和线程协作,使用线程之间会存在以下两种制约关系。

     (1)间接相互制约。一个系统中的多个线程必然要共享某种系统资源,如共享CPU,共享I/O设备,所谓间接相互制约即源于这种资源共享,打印机就是最好的例子,线程A在使用打印机时,其它线程都要等待。

     (2)直接相互制约。这种制约主要是因为线程之间的合作,如有线程A将计算结果提供给线程B作进一步处理,那么线程B在线程A将数据送达之前都将处于阻塞状态。

       间接相互制约可以称为互斥,直接相互制约可以称为同步,对于互斥可以这样理解,线程A和线程B互斥访问某个资源则它们之间就会产个顺序问题——要么线程A等待线程B操作完毕,要么线程B等待线程操作完毕,这其实就是线程的同步了。因此同步包括互斥,互斥其实是一种特殊的同步

 

       在一段时间内只允许一个线程访问的资源就称为临界资源或独占资源,计算机中大多数物理设备,进程中的共享变量等待都是临界资源,它们要求被互斥的访问。每个进程中访问临界资源的代码称为临界区。

 

这里为了介绍这种稍微复杂的概念。 再列出一个生产消费者的例子 使用到了Queue队列。

# coding:utf-8
import Queue
import time
import random
import threading


# write_lock = threading.Lock()                                                 # 创建primitive锁对象用于控制输出


class Producer(threading.Thread):
    # q传递一个队列参数, con传递了一个链接, name传递了一个名字
    def __init__(self, q, con, name):
        super(Producer, self).__init__()
        self.q = q
        self.con = con
        self.name = name
        print "Producer " + self.name + "Started"

    def run(self):
        while True:
            # 锁对象常用的acquire获得锁方法和release释放锁方法
            # 这里使用的是Thread的Condition对象
            self.con.acquire()
            if self.q.full():
                print 'Queue is full, producer wait!'

                # 手动挂起,并且只能在获得Lock的情况下才可以使用 否则会触发RuntimeError
                # 调用wait()会释放Lock 直到该线程被notify(),notifyall()或超时该线程又重新获得Lock
                self.con.wait()
            else:
                value = random.randint(0, 10)
                print self.name + " put " + str(value) + "into queue"
                self.q.put((self.name+":"+str(value)))                    # 放置到队列中

                # 通知消费者,notify通知其他线程,被挂起的线程接到通知后会开始运行
                # 默认通知一个正在等待该condition的线程,最多唤醒n个线程 必须在获得Lock的情况下使用否则会报错.
                self.con.notify()
                self.con.release()                                            # 释放锁对象


class Consumer(threading.Thread):
    def __init__(self, q, con, name):
        super(Consumer, self).__init__()
        self.q = q
        self.con = con
        self.name = name
        print "Consumer " + self.name + "started\n"

    def run(self):
        while True:
            # Condition常用的acquire获得条件和release释放锁方法
            self.con.acquire()
            if self.q.empty():
                print 'queue is empty, consumer wait!'
                self.con.wait()
            else:
                value = self.q.get()                                            # 从队列中取消息
                print self.name + " get " + value + "from queue"

                # 发送消息通知生产者
                self.con.notify()
                self.con.release()                                              # 释放锁对象
                print 'queue still have ' + str(q.qsize()) + 'task\n'


if __name__ == "__main__":
    q = Queue.Queue(10)

    # 使用Condition对象可以在某些事件触发或达到特定的条件后才处理数据.
    con = threading.Condition()

    # 两个生产者
    p1 = Producer(q, con, "P1")
    p2 = Producer(q, con, "P2")
    c1 = Consumer(q, con, "C1")
    p2.start()
    p1.start()
    c1.start()

可以看到这是一个典型的生产消费者模型。 两个生产者负责往队列中添加数据,一个消费者复杂从队列里面拿出数据处理掉然后继续拿出继续处理,直到队列为空然后挂起等待生产者生产。

要解决同步和互斥其实最简单的办法就是引入一个锁的机制。在一个线程访问一个共享资源的时候,就将这个资源锁住,直到该线程访问完毕之后再释放给其他线程进行访问。这样就保证了,在一个线程处理一个共享资源的情况下,不会因为其他线程也在处理同一个资源而造成混乱。

可以看到这里我申明了con = threading.Condition() 申明了一个条件对象。而它拥有acquire()/release()/notify()/wait() 等方法。 分别是获取锁,释放锁,唤醒操作和等待操作。上面用到了这四种方法。可以看到我们在生产者Producer中,先获取了一个锁对象,然后来判断队列是否已满,如果满了则不再生产,并且挂起当前线程并且暂时释放掉获取的锁。 直到收到消费者的notify(),并且获得消费者释放的锁。

 

这样就很清楚了,通过锁的机制以及挂起的机制。我们可以更安全的进行多线程操作。 其实在这个例子中,queue本身是线程安全的。不需要额外的同步机制。所以我们可以完全不需要条件锁。也就是说我们不需要制造一个Condition对象,直接使用队列就可以轻松实现多线程的消费者和生产者模型了。

# coding:utf-8
import Queue
import time
import random
import threading


# write_lock = threading.Lock()                                                 # 创建primitive锁对象用于控制输出


class Producer(threading.Thread):
    # q传递一个队列参数, con传递了一个链接, name传递了一个名字
    def __init__(self, q, name):
        super(Producer, self).__init__()
        self.q = q
        # self.con = con
        self.name = name
        print "Producer " + self.name + "Started"

    def run(self):
        while True:
            # 锁对象常用的acquire获得锁方法和release释放锁方法
            # 这里使用的是Thread的Condition对象
            # self.con.acquire()
            if self.q.full():
                print 'Queue is full, producer wait!'

                # 手动挂起,并且只能在获得Lock的情况下才可以使用 否则会触发RuntimeError
                # 调用wait()会释放Lock 直到该线程被notify(),notifyall()或超时该线程又重新获得Lock
                # self.con.wait()
            else:
                value = random.randint(0, 10)
                print self.name + " put " + str(value) + "into queue"
                self.q.put((self.name+":"+str(value)))                    # 放置到队列中

                # 通知消费者,notify通知其他线程,被挂起的线程接到通知后会开始运行
                # 默认通知一个正在等待该condition的线程,最多唤醒n个线程 必须在获得Lock的情况下使用否则会报错.
                # self.con.notify()
                # self.con.release()                                            # 释放锁对象


class Consumer(threading.Thread):
    def __init__(self, q, name):
        super(Consumer, self).__init__()
        self.q = q
        # self.con = con
        self.name = name
        print "Consumer " + self.name + "started\n"

    def run(self):
        while True:
            # Condition常用的acquire获得锁方法和release释放锁方法
            # self.con.acquire()
            if self.q.empty():
                print 'queue is empty, consumer wait!'
                # self.con.wait()
            else:
                value = self.q.get()                                            # 从队列中取消息
                print self.name + " get " + value + "from queue"

                # 发送消息通知生产者
                # self.con.notify()
                # self.con.release()                                              # 释放锁对象
                print 'queue still have ' + str(q.qsize()) + 'task\n'


if __name__ == "__main__":
    q = Queue.Queue(10)

    # 使用Condition对象可以在某些事件触发或达到特定的条件后才处理数据.
    # con = threading.Condition()

    # 两个生产者
    p1 = Producer(q, "P1")
    p2 = Producer(q, "P2")
    c1 = Consumer(q, "C1")
    p2.start()
    p1.start()
    c1.start()

 

以上内容参考链接:

1. python threading模块文档翻译: http://my.oschina.net/lionets/blog/194577?fromerr=pbWOeveo

2. 多线程7经典线程与互斥总结:http://blog.csdn.net/dazhong159/article/details/7927034

3. 《编写高质量代码改善python程序的91个建议》第48和49建议。

 


推荐阅读
  • YOLOv7基于自己的数据集从零构建模型完整训练、推理计算超详细教程
    本文介绍了关于人工智能、神经网络和深度学习的知识点,并提供了YOLOv7基于自己的数据集从零构建模型完整训练、推理计算的详细教程。文章还提到了郑州最低生活保障的话题。对于从事目标检测任务的人来说,YOLO是一个熟悉的模型。文章还提到了yolov4和yolov6的相关内容,以及选择模型的优化思路。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • 本文介绍了南邮ctf-web的writeup,包括签到题和md5 collision。在CTF比赛和渗透测试中,可以通过查看源代码、代码注释、页面隐藏元素、超链接和HTTP响应头部来寻找flag或提示信息。利用PHP弱类型,可以发现md5('QNKCDZO')='0e830400451993494058024219903391'和md5('240610708')='0e462097431906509019562988736854'。 ... [详细]
  • 预备知识可参考我整理的博客Windows编程之线程:https:www.cnblogs.comZhuSenlinp16662075.htmlWindows编程之线程同步:https ... [详细]
  • RouterOS 5.16软路由安装图解教程
    本文介绍了如何安装RouterOS 5.16软路由系统,包括系统要求、安装步骤和登录方式。同时提供了详细的图解教程,方便读者进行操作。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 上图是InnoDB存储引擎的结构。1、缓冲池InnoDB存储引擎是基于磁盘存储的,并将其中的记录按照页的方式进行管理。因此可以看作是基于磁盘的数据库系统。在数据库系统中,由于CPU速度 ... [详细]
  • 开源Keras Faster RCNN模型介绍及代码结构解析
    本文介绍了开源Keras Faster RCNN模型的环境需求和代码结构,包括FasterRCNN源码解析、RPN与classifier定义、data_generators.py文件的功能以及损失计算。同时提供了该模型的开源地址和安装所需的库。 ... [详细]
  • 本文介绍了安全性要求高的真正密码随机数生成器的概念和原理。首先解释了统计学意义上的伪随机数和真随机数的区别,以及伪随机数在密码学安全中的应用。然后讨论了真随机数的定义和产生方法,并指出了实际情况下真随机数的不可预测性和复杂性。最后介绍了随机数生成器的概念和方法。 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  • 本文介绍了如何使用python从列表中删除所有的零,并将结果以列表形式输出,同时提供了示例格式。 ... [详细]
  • CentOS 7部署KVM虚拟化环境之一架构介绍
    本文介绍了CentOS 7部署KVM虚拟化环境的架构,详细解释了虚拟化技术的概念和原理,包括全虚拟化和半虚拟化。同时介绍了虚拟机的概念和虚拟化软件的作用。 ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
author-avatar
Ax珊
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有