热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

python第九周:paramiko多线程、队列

1.paramiko模块用处:连接远程服务器并执行相关操作使用方法:SSHClient:连接远程服务器并执行基本命令importparam

1.paramiko模块

用处:连接远程服务器并执行相关操作

使用方法:

SSHClient:连接远程服务器并执行基本命令

import paramiko#创建SSH对象
ssh = paramiko.SSHClient()
#允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#连接服务器
ssh.connect(hostname="cl.salt.com",port=22,username="Mr Wu",password=123456)
#执行命令
stdin,stdout,stderr = ssh.exec_command("df") #标准输入、标准输出、标准错误
#
获取命令结果
res,err = stdout.read(),stderr.read()
result
= res if res else err
print(result)
#关闭连接
ssh.close()

SSHFtp:连接远程服务器并执行上传下载功能

import paramiko
#建立连接
transport = paramiko.Transport(("hostname",22))
transport.connect(username
="Mr Wu",password="123")
#TCP/IP等协议实在SFTPClient中定义的
sftp = paramiko.SFTPClient.from_transport(transport)
#将本地文件location.py上传至服务器 /tmp/test.py
sftp.put("/tmp/location.py","/tmp/test.py")
#将远程文件remove_path 下载到本地 local_path
sftp.get("remove_path","local_path")
transport.close()

SSH_RSA:基于公钥密钥进行连接

RSA:非对称密钥验证

公钥:保存在要连接的服务器

私钥:保存在本地机器

import paramikoprivate_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')#创建SSH对象
ssh = paramiko.SSHClient()
#允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#连接服务器
ssh.connect(hostname="cl.salt.com",port=22,username="Mr Wu",key_filename=private_key)
#执行命令
stdin,stdout,stderr = ssh.exec_command("df")
#获取命令结果
res,err = stdout.read(),stderr.read()
result
= res if res else err
print(result.decode())

2.进程:

什么是进程:程序的执行实例称为进程。
每个进程都提供执行程序所需的资源。 进程具有虚拟地址空间,可执行代码,系统对象的打开句柄,安全上下文,唯一进程标识符,环境变量,优先级类,最小和最大工作集大小以及至少一个执行线程。 每个进程都使用单个线程启动,通常称为主线程,但可以从其任何线程创建其他线程。

进程的特点:进程之间的内存是相互独立的

                     要操作CPU,必须通过线程

                     进程之间不能直接通信,必须通过中间代理

                     创建子进程就是对父进程进行一次克隆

                     进程只能操作它的子进程,不能操作它的父进程

                     对父进程的修改不会对子进程产生影响

3.线程:

什么是线程:操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程的实际运作单位,一条线程指的是进程中的一个单一顺序的控制流,一个进程可以并发多个线程,每个线程可以执行不同的任务。

线程就是一段可执行的上下文,CPU运行时就不断切换上下文来执行不同的线程,当一个线程没有执行完毕时,寄存器会保存可执行的上下文的中断的位置,当CPU又切换到这段上下文时,就从中断位置继续执行

线程的特点:是操作系统的最小的调度单位,是一串指令的集合

                     所有在同一进程里的线程共享同一块内存空间

                     一个线程可以控制和操作同一进程里的其他线程

                     对主线程的修改可能会影响其他的线程

4.python GIL(Global Interpreter Lock)

全局解释器锁:在python中,全局解释器锁是一个互斥锁,它可以防止多个本机线程同时执行python字节码,这种锁是必要的,因为python线程调用的是操作系统的原生系统,cpython的内存管理不是线程安全的。

全局解释器锁是历史遗留问题:

这是python的内存python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁。

所以由于GIL的存在,python并不能利用cpu多核的特性,在微观层面上,python并不是真正的多线程,在同一时刻只能有一个线程在运行。

python threading模块:

线程的两种调用方式:

直接调用:

实例化一个线程:t = threading.Thread()

import threading,time
#循环启动50个线程
def run(n):time.sleep(2)print("task:",n,threading.current_thread())
for i in range(50):t = threading.Thread(target=run,args=("t%s"%i,))t.start()

继承式调用:

import threading
class MyThread(threading.Thread):def __init__(self,n):super(MyThread,self).__init__()self.n = ndef run(self):print("running task",self.n)t1 = MyThread("t1")
t2
= MyThread("t2")t1.start()
t2.start()

t.Join()

阻塞:主线程必须等待线程t执行完毕后才能继续执行

import threading,time
def run(n):time.sleep(2)print("task:",n,threading.current_thread())
start_time
= time.time()
t_list
= [] #存放线程实例
for i in range(10):t = threading.Thread(target=run,args=("t%s"%i,))t.start()t_list.append(t) #为了不阻塞后面线程的启动,不在这里join,先放到一个列表里
for t in t_list:t.join()
end_time
= time.time()
print("=========all threads has finished....")
print("cost:",end_time - start_time,threading.current_thread())
#整个程序是一个主线程,for循环内是子线程,主线程与子线程并发运行
#
output:
'''
task: t0
task: t1
task: t2
task: t3
task: t4
task: t6
task: t5
task: t8
task: t7
task: t9
=========all threads has finished....
cost: 2.0088112354278564 <_MainThread(MainThread, started 6208)>
&#39;&#39;&#39;

t.setBaemon(True)

将线程t设置为守护线程&#xff0c;整个程序在非守护线程执行结束时就结束&#xff0c;不会等待守护进程

注&#xff1a;由守护进程创建的其他进程&#xff0c;在非守护进程结束时也会结束&#xff0c;不管其是否完成任务

import threading,time
def run(n):time.sleep(2)print("task:",n,threading.current_thread())
start_time
&#61; time.time()
for i in range(50):t &#61; threading.Thread(target&#61;run,args&#61;("t%s"%i,))t.setDaemon(True)t.start()
end_time
&#61; time.time()
print("&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;all threads has finished....")
print("cost:",end_time - start_time,threading.current_thread())
#整个程序是一个主线程&#xff0c;for循环内是子线程,主线程与子线程并发运行
#
整个程序在非守护线程执行结束时就结束&#xff0c;不会等待守护进程
注&#xff1a;由守护进程创建的其他进程&#xff0c;在非守护进程结束时也会结束&#xff0c;不管其是否完成任务

#
output:
&#39;&#39;&#39;
&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;all threads has finished....
cost: 0.03975486755371094 <_MainThread(MainThread, started 10208)>
&#39;&#39;&#39;

线程锁&#xff1a;

设count &#61; 0&#xff0c;当我们启用50个线程去执行count &#43;&#61; 1时。按理来说&#xff0c;由于GIL的存在&#xff0c;这些线程其实是串行的&#xff0c;计算出的最终count的值应该等于50&#xff0c;但是在实际程序运行时&#xff0c;count的值常常会小于50。这与python GIL的运行原理有关&#xff0c;当CPU大概执行一百多条命令时python解释器就会释放全局解释器锁&#xff0c;这样就可能导致某两个线程返回的是同一个值&#xff08;其中一个线程还没有执行完毕时&#xff0c;python解释器释放了全局解释器锁&#xff0c;导致另一个线程拿到的值是数据池里面没有改变的值&#xff09;

这个时候我们可以自己设置线程锁&#xff0c;即在一个线程执行完任务count &#43;&#61; 1后&#xff0c;才会释放线程锁&#xff0c;其他线程才能申请全局解释器锁。

import threading,time
#实例化一个线程锁
lock &#61; threading.Lock()
num
&#61; 0
#当数据量不是特别大时可使用线程锁
def run(n):lock.acquire() #加锁global numnum &#43;&#61; 1lock.release() #释放锁
for t in range(50):t &#61; threading.Thread(target&#61;run,args&#61;(num,))t.start()print("num:",num)
#output: num: 50

递归锁&#xff08;Rlock&#xff09;

import threading,timenum,num2 &#61; 0,0
lock
&#61; threading.RLock() #递归锁
def run1():print("grab the first part data")lock.acquire()global numnum &#43;&#61; 1lock.release()return numdef run2():print("grab the second part data")lock.acquire()global num2num2 &#43;&#61; 1lock.release()return num2def run3():lock.acquire()res &#61; run1()print("-----between run1 and run2------")res2 &#61; run2()lock.release()print(res,res2)for i in range(2):t &#61; threading.Thread(target&#61;run3)t.start()while threading.active_count() !&#61; 1:pass
else:print("&#61;&#61;&#61;&#61;&#61;&#61;&#61;all threads done&#61;&#61;&#61;&#61;&#61;&#61;")print(num,num2)
#output:
&#39;&#39;&#39;
grab the first part data
-----between run1 and run2------
grab the second part data
1 1
grab the first part data
-----between run1 and run2------
grab the second part data
2 2
&#61;&#61;&#61;&#61;&#61;&#61;&#61;all threads done&#61;&#61;&#61;&#61;&#61;&#61;
2 2
&#39;&#39;&#39;

信号量&#xff08;semaphore&#xff09;

线程锁同时允许一个线程更改数据&#xff0c;而信号量是同时允许一定数量的线程更改数据。

注&#xff1a;在设置的信号量线程里&#xff0c;只要有一个线程完成&#xff0c;就会让新的线程进来&#xff0c;直到所有线程运行结束

semaphore &#61; threading.BoundedSemaphore(5) #设置信号量&#xff0c;最大只能设置为5

semaphore.acquire() #申请线程锁

semaphore.release() #释放线程锁

#在设置的信号量线程里&#xff0c;只要有一个线程完成&#xff0c;就会让新的线程进来&#xff0c;保持设置的线程数直到所有线程运行结束
import threading,timedef run(n):semaphore.acquire()print("run the thread: %s\n"%n)global numnum &#43;&#61; 1time.sleep(1)semaphore.release()
if __name__ &#61;&#61; &#39;__main__&#39;:num &#61; 0semaphore &#61; threading.BoundedSemaphore()#最多允许5个线程同时运行for i in range(20):t &#61; threading.Thread(target&#61;run,args&#61;(i,))t.start()while threading.active_count() !&#61; 1:pass
else:print("-----all threads done-----")print(num)

计时器&#xff08;Timer&#xff09;

此类表示仅在经过一定时间后才应运行的操作。
与线程一样&#xff0c;通过调用start&#xff08;&#xff09;方法启动计时器。 通过调用thecancel&#xff08;&#xff09;方法可以停止计时器&#xff08;在其动作开始之前&#xff09;。 计时器在执行其操作之前将等待的时间间隔可能与用户指定的时间间隔不完全相同。

import threading
def run():print("My Name Is Mr Wu!")
t
&#61; threading.Timer(10,run)
t.start()
#在10秒后输出 My Name Is Mr Wu!

事件&#xff08;event&#xff09;

事件是一个简单的同步对象;

事件代表一个内部标志和线程。

事件可以等待设置标志&#xff0c;或者自己设置或清除标志。

简单点来说&#xff0c;事件就是通过设置、清除标志位来实现两个线程之间的交互。

event &#61; threading.Event() #生成一个事件实例

event.clear() #清空标志位

event.set() #设置标志位

event.wait() #等待标志位被设置

程序示例&#xff1a;红灯停、绿灯行。红灯、绿灯的持续时间都是5&#xff0c;当汽车看到红灯的时候停止行驶&#xff0c;当看到绿灯的时候行驶。

import time,threading
event
&#61; threading.Event()#生成一个事件实例
def lighter():count &#61; 0while True:if count > 5 and count <&#61; 10:#改成红灯event.clear()#清空标志位print("\033[41;1m红灯停\033[0m")elif count > 10: #改成绿灯#event.set()count &#61; 0else:event.set()print("\033[42;1m绿灯行\033[0m")time.sleep(1)count &#43;&#61; 1def car(name):while True:if event.is_set(): #绿灯行print("[%s] running...."%name)time.sleep(1)else:print("[%s] sees red light , waiting..."%name)event.wait()print("[%s] sees green light , start going..."%name)
car1
&#61; threading.Thread(target&#61;car,args&#61;("Tesla",))
light
&#61; threading.Thread(target&#61;lighter)
light.start()
car1.start()

队列&#xff08;Queue&#xff09;

当必须在多个线程之间安全地交换信息时&#xff0c;队列在线程编程中特别有用。

简单点说&#xff0c;队列就是一个存放数据的容器&#xff0c;线程既可以把数据放进去也可以把数据取出来&#xff0c;以此达到线程交互的目的。

队列的作用&#xff1a;
1.解耦&#xff0c;使程序直接实现松耦合

2.提高处理效率

class queue.Queue(maxsize &#61; 0) #先进先出

class queue.LifoQueue(maxsize &#61; 0) #后进先出

class queue.PriorityQueue(maxsize &#61; 0) #存储数据时可设置优先级

优先级队列的构造函数。 maxsize是一个整数&#xff0c;用于设置可以放入队列的项目数的上限。 达到此大小后&#xff0c;插入将阻止&#xff0c;直到消耗队列项。 如果maxsize小于或等于零&#xff0c;则队列大小为无限大

exception queue.Bmpty

在对空的Queue对象调用非阻塞get&#xff08;&#xff09;&#xff08;或get_nowait&#xff08;&#xff09;&#xff09;时引发异常。

 

 exception queue.Full

在已满的Queue对象上调用非阻塞put&#xff08;&#xff09;&#xff08;或put_nowait&#xff08;&#xff09;&#xff09;时引发异常。

Queue.qsize() #返回队列的长度

Queue.empty() #return True if enmpty

Queue.full() #return True if full

Queue.put(item,block &#61; True,timeout &#61; None) #取出队列里的数据

当把将数据放入队列中时。 如果block&#61;True且timeout&#61;None&#xff08;默认值&#xff09;&#xff0c;则在队列为满的时候阻塞&#xff0c;直到队列有空闲区域的时候才能继续放入数据。 如果timeout是一个正数&#xff0c;则队列阻塞最多timeout秒&#xff0c;如果在该时间内队列为满&#xff0c;则会引发Full异常。如果block&#61;False&#xff0c;当队列有空闲区域时&#xff0c;则将数据放在队列中&#xff0c;否则引发Full异常&#xff08;在这种情况下忽略超时&#xff09;。

Queue.put_nowait(item)

Equivalent to put(item,False).

Queue.get(block &#61; True,timeout &#61; None)

当从队列中取出数据时。 如果block&#61;true且timeout&#61;None&#xff08;默认值&#xff09;&#xff0c;则在队列为空的时候阻塞&#xff0c;直到队列中有数据时才能继续取出数据。 如果timeout是一个正数&#xff0c;则队列阻塞最多timeout秒&#xff0c;如果在该时间内队列为空&#xff0c;则会引发Empty异常。当block&#61;False&#xff0c;当队列不为空时&#xff0c;则可取出数据&#xff0c;否则引发Empty异常&#xff08;在这种情况下忽略超时&#xff09;、

Queue.get_nowait(time)

Equivalent to get(item,False).

生产者消费者模型

为什么要使用生产者消费者模式&#xff1f;

在线程世界里&#xff0c;生产者就是生产数据的线程&#xff0c;消费者就是消费数据的线程。在多线程开发当中&#xff0c;如果生产者处理速度很快&#xff0c;而消费者处理速度很慢&#xff0c;那么生产者就必须等待消费者处理完&#xff0c;才能继续生产数据。同样的道理&#xff0c;如果消费者的处理能力大于生产者&#xff0c;那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产这消费者模式&#xff1f;

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯&#xff0c;而通过阻塞队列来进行通讯&#xff0c;所以生产者生产完数据之后不用等待消费者处理&#xff0c;直接扔给阻塞队列&#xff0c;消费者不找生产者要数据&#xff0c;而是直接从阻塞队列里取&#xff0c;阻塞队列就相当于一个缓冲区&#xff0c;平衡了生产者和消费者的处理能力。

程序示例&#xff1a;

import threading,queue,time
q
&#61; queue.Queue(maxsize&#61;10)
def producer(name):count &#61; 1while True:q.put("骨头%s"%count)print("生产了骨头",count)count &#43;&#61; 1time.sleep(2)
def consumer(name):#while q.qsize() > 0:while True:print("[%s] 取到[%s],并且吃了它...."%(name,q.get()))time.sleep(1)
p
&#61; threading.Thread(target&#61;producer,args&#61;("Alex",))
c
&#61; threading.Thread(target&#61;consumer,args&#61;("ChengRongHua",))
c1
&#61; threading.Thread(target&#61;consumer,args&#61;("王森",))
p.start()
c.start()
c1.start()

 

转:https://www.cnblogs.com/BUPT-MrWu/p/10014269.html



推荐阅读
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 在Kubernetes上部署JupyterHub的步骤和实验依赖
    本文介绍了在Kubernetes上部署JupyterHub的步骤和实验所需的依赖,包括安装Docker和K8s,使用kubeadm进行安装,以及更新下载的镜像等。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • eclipse学习(第三章:ssh中的Hibernate)——11.Hibernate的缓存(2级缓存,get和load)
    本文介绍了eclipse学习中的第三章内容,主要讲解了ssh中的Hibernate的缓存,包括2级缓存和get方法、load方法的区别。文章还涉及了项目实践和相关知识点的讲解。 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
author-avatar
毛小猫TTN
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有