作者:单纯只是一2502904797 | 来源:互联网 | 2023-10-17 20:16
第七课主要内容:
- 多进程
- 多线程
- FIFO,LIFO,优先队列
- 线程局部变量
- 进程与线程的选择
- 线程池
- 异步IO概念及twisted案例
- 股票数据抓取
Linux下实例:
import os
# Runs in the original process, before the fork.
print('Process (%s) start...' % os.getpid())
# os.fork() returns 0 in the child and a non-zero value in the parent, so the
# two processes take different branches below.
pid = os.fork()
if pid == 0:
    print('Child process (%s), ppid is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process.' % os.getpid())
Windows下实例(Windows没有fork调用,需要导入跨平台的multiprocessing库):
from multiprocessing import Process
import os
def run_proc(name):
    """Print ``name`` together with the pid of the process executing this call.

    Used as the ``target`` of a ``multiprocessing.Process``.

    Args:
        name: Label interpolated into the printed message.
    """
    print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__ == '__main__':
    # Only the process that runs this file as a script starts a child; a
    # process that merely imports the module skips this block.
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    p.start()
    # Wait for the child to exit before printing the final message.
    p.join()
    print('End')
多线程运行实例:
如果想深入了解多线程,请参考书籍《UNIX环境高级编程》
import time, threading
def loop():
    """Print a start line, five numbered progress lines and an end line.

    Every line is tagged with the name of the thread that executes the call.
    """
    thread_name = threading.current_thread().name
    print('Thread %s is running...' % thread_name)
    # Count 1..5, one line per step.
    for n in range(1, 6):
        print('Thread %s >>> %d' % (thread_name, n))
    print('Thread %s ends.' % thread_name)
# Report from the calling thread, run loop() in a worker named 'loopThread',
# wait for it, then report again from the calling thread.
thread_name = threading.current_thread().name
print('Thread %s is running...' % thread_name)

t = threading.Thread(name='loopThread', target=loop)
t.start()
t.join()  # block until the worker has finished

print('Thread %s ends.' % thread_name)
多线程竞争实例(操作银行存款):
线程必须先获得锁,才能修改共享变量,修改完成后再释放锁。多个线程同时修改共享数据时必须加锁,否则结果可能出错。
import threading
import time
# Shared balance that every worker thread reads and writes.
balance = 0
# Lock that callers hold while updating ``balance``.
lock = threading.Lock()


def change_it(n):
    """Add ``n`` to the global ``balance`` and then subtract it again.

    Executed on its own, the call leaves ``balance`` unchanged. The function
    takes no lock itself; it performs two separate read-modify-write steps on
    the shared global.

    Args:
        n: Amount to deposit and immediately withdraw.
    """
    global balance
    balance = balance + n
    balance = balance - n
# Unlocked variant, kept for comparison only (it calls change_it() without
# holding the lock):
#
# def run_thread(n):
#     for i in range(10000):
#         change_it(n)


def run_thread(n):
    """Call ``change_it(n)`` 10000 times, holding ``lock`` around each call.

    Args:
        n: Amount passed through to ``change_it``.
    """
    for _ in range(10000):
        # ``with`` acquires the lock and releases it even if change_it raises.
        with lock:
            change_it(n)
# Two workers apply different amounts concurrently; both are joined before the
# final balance is printed.
t1 = threading.Thread(args=(5,), target=run_thread)
t2 = threading.Thread(args=(8,), target=run_thread)
for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()
print(balance)
有三种队列:FIFO:先进先出(first in first out);LIFO:后进先出(last in first out,相当于栈);优先队列:元素入队时按优先级排序,出队时保证按优先级顺序取出。
实例:
# FIFO: First in first out
# LIFO: Last in first out
# Priority Queue
import queue
import threading
# FIFO: items are printed in the order they were put (0, 1, 2, 3, 4).
q = queue.Queue()
for i in range(5):
    q.put(i)
while not q.empty():
    print(q.get())

# LIFO: items are printed in reverse insertion order (4, 3, 2, 1, 0).
q = queue.LifoQueue()
for i in range(5):
    q.put(i)
while not q.empty():
    print(q.get())
class Task:
    """A described unit of work that sorts by its ``priority`` value.

    Instances define ``<`` so they can be placed directly into a
    ``queue.PriorityQueue``, which hands back the smallest item first.
    """

    def __init__(self, priority, description):
        """Store the sort key ``priority`` and a free-form ``description``."""
        self.priority = priority
        self.description = description

    def __repr__(self):
        return 'Task(%r, %r)' % (self.priority, self.description)

    def __lt__(self, other):  # Python 2.7 would implement __cmp__ instead
        """Return True when this task's priority is lower than ``other``'s."""
        if not isinstance(other, Task):
            # Let Python try the reflected operation or raise TypeError.
            return NotImplemented
        return self.priority < other.priority
实例:
import multiprocessing
import threading
def loop():
    """Toggle a local bit forever; this function never returns.

    It performs no I/O and never sleeps, so the calling thread stays busy for
    as long as the process lives.
    """
    x = 0
    while True:
        x = x ^ 1
# Start one busy-looping thread per CPU reported by multiprocessing.cpu_count().
# NOTE(review): presumably meant to show how much CPU a multi-threaded,
# CPU-bound Python program can actually use -- confirm against the lesson text.
for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()
实例:
import threading
# Thread-local namespace: attributes set on it are visible only to the thread
# that set them.
local_school = threading.local()


def process_student():
    """Print a greeting for the student bound to the current thread.

    Reads ``local_school.student``, which the calling thread must have set
    beforehand, and prints it with the current thread's name.

    Raises:
        AttributeError: If ``student`` was never set in the calling thread.
    """
    std = local_school.student
    print('Hello %s (%s)\n' % (std, threading.current_thread().name))
def process_thread(name):
    """Bind ``name`` as the current thread's student, then greet that student.

    Args:
        name: Student name stored in the thread-local ``local_school``.
    """
    local_school.student = name
    process_student()
# Two named threads each bind their own student; both are joined before the
# script continues.
t1 = threading.Thread(name='TA', target=process_thread, args=('Tom',))
t2 = threading.Thread(name='TB', target=process_thread, args=('Jack',))
for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()
实例:
import time
import threadpool
def long_op(n):
    """Print ``n`` followed by a blank line, then sleep for two seconds.

    Args:
        n: Integer to print (formatted with ``%d``).
    """
    print('%d\n' % n)
    # NOTE(review): the sleep presumably stands in for a slow operation.
    time.sleep(2)
# Pool created with an argument of 2; one work request is built per integer.
pool = threadpool.ThreadPool(2)
tasks = threadpool.makeRequests(long_op, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print(len(tasks))
# A plain loop: the calls are made for their side effect, so building a
# throwaway list of return values would only obscure the intent.
for task in tasks:
    pool.putRequest(task)
# Block until every queued request has been processed.
pool.wait()
异步IO例子:
import asyncio
async def wget(host):
    """Request ``/`` from ``host`` on port 80 and print the response headers.

    Sends an HTTP/1.0 GET, then prints each header line until the blank line
    that ends the header section (or until the peer closes the connection).
    The response body is not read.

    Written as a native coroutine because the generator-based
    ``@asyncio.coroutine`` decorator is no longer available on Python 3.11+.

    Args:
        host: Host name used both for the connection and the ``Host`` header.
    """
    reader, writer = await asyncio.open_connection(host, 80)
    try:
        header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
        print(header)
        writer.write(header.encode('utf-8'))
        await writer.drain()
        while True:
            line = await reader.readline()
            # b'\r\n' ends the headers; b'' means EOF -- without the EOF check
            # a connection closed early would make this loop spin forever.
            if line == b'\r\n' or not line:
                break
            print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    finally:
        # Close the transport even when reading or writing raised.
        writer.close()
# Create and install an explicit event loop instead of relying on
# get_event_loop() to create one implicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # asyncio.wait() rejects bare coroutine objects on Python 3.11+, so each
    # coroutine is wrapped in a Task on this loop first.
    tasks = [loop.create_task(wget(host))
             for host in ['www.sina.com', 'www.sohu.com', 'www.163.com']]
    loop.run_until_complete(asyncio.wait(tasks))
finally:
    # Always release the loop's resources, even if a fetch failed.
    loop.close()
实例:
import sys
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver
from twisted.python import log
from twisted.internet import reactor
class CmdProtocol(LineReceiver):
    """Line-oriented protocol that logs every received line.

    On connect it registers the peer's IP in ``factory.clients`` unless the
    factory already holds ``factory.clients_max`` entries, in which case the
    connection is dropped. On disconnect the IP is removed again.
    """

    # Lines end with a bare newline. LineReceiver buffers bytes on Python 3,
    # so the delimiter must be bytes as well (on Python 2, b'\n' == '\n').
    delimiter = b'\n'

    def connectionMade(self):
        """Record the peer IP and enforce the factory's connection limit."""
        # For TCP transports getPeer() returns an address object whose
        # ``host`` attribute is the peer IP; indexing it is not supported by
        # current Twisted releases.
        self.client_ip = self.transport.getPeer().host
        log.msg("Client connection from %s" % self.client_ip)
        if len(self.factory.clients) >= self.factory.clients_max:
            log.msg("Too many connections. bye !")
            # Clear the IP so connectionLost() does not try to remove an
            # entry that was never added.
            self.client_ip = None
            self.transport.loseConnection()
        else:
            self.factory.clients.append(self.client_ip)

    def connectionLost(self, reason):
        """Log the disconnect and unregister the client if it was registered."""
        log.msg('Lost client connection. Reason: %s' % reason)
        if self.client_ip:
            self.factory.clients.remove(self.client_ip)

    def lineReceived(self, line):
        """Log one received line together with the sender's IP."""
        log.msg('Cmd received from %s : %s' % (self.client_ip, line))
class MyFactory(ServerFactory):
    """Server factory that builds ``CmdProtocol`` instances and tracks clients.

    Attributes are documented inline in ``__init__``.
    """

    protocol = CmdProtocol

    def __init__(self, clients_max=10):
        """Create a factory that admits at most ``clients_max`` clients."""
        # Upper bound checked by CmdProtocol.connectionMade().
        self.clients_max = clients_max
        # IPs of currently registered clients, maintained by CmdProtocol.
        self.clients = []
# Send Twisted's log output to stdout, listen on TCP port 9999 with a factory
# created with clients_max=2, then start the reactor.
log.startLogging(sys.stdout)
factory = MyFactory(2)
reactor.listenTCP(9999, factory)
reactor.run()
import requests
import threading
def get_stock(code):
    """Fetch the quote for ``code`` from hq.sinajs.cn and print the response text.

    Args:
        code: Stock code appended to the ``list=`` query parameter,
            e.g. ``'sz000878'``.

    Raises:
        requests.exceptions.RequestException: If the request fails or does not
            complete within the timeout.
    """
    url = 'http://hq.sinajs.cn/list=' + code
    # Without a timeout a stalled server would block this worker forever.
    resp = requests.get(url, timeout=10).text
    print('%s\n' % resp)
codes = ['sz000878', 'sh600993', 'sz000002', 'sh600153', 'sz002230', 'sh600658']
# One thread per stock code; all are started before any is joined so the
# requests run concurrently.
threads = [threading.Thread(target=get_stock, args=(code,)) for code in codes]
for t in threads:
    t.start()
for t in threads:
    t.join()
import requests
import threadpool
def get_stock(code):
    """Fetch the quote for ``code`` from hq.sinajs.cn and print the response text.

    Args:
        code: Stock code appended to the ``list=`` query parameter,
            e.g. ``'sz000878'``.

    Raises:
        requests.exceptions.RequestException: If the request fails or does not
            complete within the timeout.
    """
    url = 'http://hq.sinajs.cn/list=' + code
    # Without a timeout a stalled server would block a pool worker forever.
    resp = requests.get(url, timeout=10).text
    print('%s\n' % resp)
codes = ['sz000878', 'sh600993', 'sz000002', 'sh600153', 'sz002230', 'sh600658']
# Pool created with an argument of 2; one work request is built per stock code.
pool = threadpool.ThreadPool(2)
tasks = threadpool.makeRequests(get_stock, codes)
# A plain loop: the calls are made for their side effect, so building a
# throwaway list of return values would only obscure the intent.
for task in tasks:
    pool.putRequest(task)
# Block until every queued request has been processed.
pool.wait()
import aiohttp
import asyncio
async def get_stock(code):
    """Fetch the quote for ``code`` from hq.sinajs.cn and print it.

    The raw response bytes are decoded as GB2312 before printing.

    Written as a native coroutine: ``aiohttp.request`` is used as an async
    context manager, which also releases the connection when the block exits.

    Args:
        code: Stock code appended to the ``list=`` query parameter,
            e.g. ``'sz000878'``.
    """
    url = 'http://hq.sinajs.cn/list=' + code
    async with aiohttp.request('GET', url) as resp:
        body = await resp.read()
    print(body.decode('gb2312'))
codes = ['sz000878', 'sh600993', 'sz000002', 'sh600153', 'sz002230', 'sh600658']
# Create and install an explicit event loop instead of relying on
# get_event_loop() to create one implicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # asyncio.wait() rejects bare coroutine objects on Python 3.11+, so each
    # coroutine is wrapped in a Task on this loop first.
    tasks = [loop.create_task(get_stock(code)) for code in codes]
    loop.run_until_complete(asyncio.wait(tasks))
finally:
    # Always release the loop's resources, even if a fetch failed.
    loop.close()