作者:手机用户2502860565 | 来源:互联网 | 2023-01-31 13:51
原文链接:https://blog.csdn.net/qq_39694935/article/details/84552076 【Python】multiprocessing Pool 进程间通信共享 1. tqdm模块的简洁使用
https://blog.csdn.net/qq_39694935/article/details/84552076
【Python】multiprocessing Pool 进程间通信共享
1. tqdm模块的简洁使用
直接上代码:
from tqdm import tqdm
from multiprocessing import Pool
import functools
from pymongo import MongoClient
# Module-level MongoDB client used by main_deal below; the host and
# credentials shown here are redacted placeholders.
mdb = MongoClient('120.xx.26.xx:20002', username='xx', password='xxxxx')
# Three alternative ways of writing main_deal follow; only one of them is needed.
def create_data(image):
    """Process one image record in a worker process.

    Args:
        image: The item handed out by the pool (one element of the iterable
            passed to ``imap`` / ``imap_unordered``).

    Returns:
        ``str(image)``, which the caller writes to the result file.
    """
    # TODO: the real per-image processing logic goes here.
    print(image)
    return str(image)
def main_deal():
    """Run ``create_data`` over every image document, in order, with progress.

    Documents from ``mdb['db_name']['image']`` are streamed through a pool of
    20 worker processes using ``Pool.imap`` (results come back in input
    order), a tqdm progress bar with a known total is shown, and each result
    is written as one line of ``result.txt``.
    """
    num_processor = 20
    # The pool is created before any query is issued, as in the original
    # order of operations. Leaving the ``with`` block terminates the pool,
    # so an exception does not leave worker processes behind.
    with Pool(num_processor) as p:
        collection = mdb['db_name']['image']
        # Cursor.count() was removed in PyMongo 4; count on the collection.
        total = collection.count_documents({})
        images = collection.find(no_cursor_timeout=True).batch_size(200)
        try:
            with open('result.txt', 'w+') as fw:
                for result in tqdm(p.imap(create_data, images), total=total):
                    fw.write(result + '\n')
            # NOTE(review): `images` has already been consumed by the loop
            # above, so this second pass should receive no documents unless
            # the cursor is rewound first. It is kept to show the
            # "results not needed" form of imap_unordered -- confirm intent.
            for _ in tqdm(p.imap_unordered(create_data, images)):
                pass
        finally:
            # A cursor opened with no_cursor_timeout=True is not reaped by
            # the server automatically, so close it explicitly.
            images.close()
        p.close()
        p.join()
def main_deal():
    """Run ``create_data`` over every generated-image document, unordered.

    Documents from ``mdb['goodlook']['image_generated_data']`` are streamed
    through a pool of 20 worker processes using ``Pool.imap_unordered``
    (results are yielded as soon as they finish, not in input order) and each
    result is written as one line of ``result.txt``. No total is given to
    tqdm, so it shows a running count rather than a percentage.
    """
    num_processor = 20
    # Leaving the ``with`` block terminates the pool, so an exception does
    # not leave worker processes behind.
    with Pool(num_processor) as p:
        images = mdb['goodlook']['image_generated_data'].find(
            no_cursor_timeout=True).batch_size(200)
        try:
            with open('result.txt', 'w+') as fw:
                for result in tqdm(p.imap_unordered(create_data, images)):
                    fw.write(result + '\n')
        finally:
            # A cursor opened with no_cursor_timeout=True is not reaped by
            # the server automatically, so close it explicitly.
            images.close()
        p.close()
        p.join()
def main_deal():
    """Run a ``functools.partial`` of ``create_data`` over every document.

    Same pipeline as an ``imap_unordered`` run over
    ``mdb['goodlook']['image_generated_data']`` with 20 worker processes and
    one result per line in ``result.txt``, except that the worker callable is
    wrapped in ``functools.partial``. No arguments are bound here; the
    partial is the place to pre-bind extra fixed arguments for the worker.
    """
    num_processor = 20
    # Leaving the ``with`` block terminates the pool, so an exception does
    # not leave worker processes behind.
    with Pool(num_processor) as p:
        images = mdb['goodlook']['image_generated_data'].find(
            no_cursor_timeout=True).batch_size(200)
        pt = functools.partial(create_data)
        try:
            with open('result.txt', 'w+') as fw:
                for result in tqdm(p.imap_unordered(pt, images)):
                    fw.write(result + '\n')
        finally:
            # A cursor opened with no_cursor_timeout=True is not reaped by
            # the server automatically, so close it explicitly.
            images.close()
        p.close()
        p.join()
# The guard keeps worker processes that re-import this module from
# starting the job again.
if __name__ == '__main__':
    main_deal()
2. 进程池多进程
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "Victor"
# Date: 2020/6/18
import traceback
import multiprocessing
from multiprocessing import Pool
# Size of the worker pool; also the number of tasks submitted in the
# __main__ block.
concurrent_num = 10
def task_run(data, msg):
    """Print a greeting tagged with the name of the current process.

    Args:
        data: Payload echoed in the printed line.
        msg: Task label; it is concatenated onto the process name, so it is
            expected to be a string.

    Returns:
        Always ``None``. Any exception raised while building or printing the
        message is reported with a traceback and not propagated.
    """
    try:
        # time.sleep(random.randrange(1, 4))  # optional: simulate work
        # (needs `import time` and `import random`).
        tagged = multiprocessing.current_process().name + '-' + msg
        print(f"hello world : {data}, {tagged}")
    except Exception as e:
        # Worker-level boundary: report the failure here, because the
        # caller never reads the AsyncResult.
        traceback.print_exc()
        print("error: ", e)
    return None
if __name__ == '__main__':
    data = {}
    # Leaving the ``with`` block terminates the pool, so an error while
    # submitting does not leave worker processes behind.
    with Pool(concurrent_num) as p:
        for i in range(concurrent_num):
            msg = 'index-%d' % i
            p.apply_async(task_run, (data, msg,))
        # Stop accepting work and wait for every submitted task to finish.
        p.close()
        p.join()
3. 进程池和调度器模块的冲突
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Author: Victor
@Contact:
@Date: 2020/10/15
@function: ''
'''
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
import time
import random
import multiprocessing
class TodayCollection(object):
    """Demo collector that fans groups of data out to a process pool.

    The instance owns an apscheduler ``BlockingScheduler``. Because
    ``start`` submits the bound method ``self.execute_tasks`` to a
    ``multiprocessing.Pool``, the instance itself is pickled for every task;
    ``__getstate__`` leaves the scheduler out of that pickled copy so the
    tasks can actually be delivered to the workers.
    """

    def __init__(self):
        self.name = "今日采集类"
        self.scheduler = BlockingScheduler()

    def __getstate__(self):
        """Return the picklable instance state, without the scheduler.

        NOTE(review): apscheduler schedulers appear to hold thread locks
        internally, which pickle rejects; with the scheduler in the state,
        submitting a bound method to the pool fails and the error is lost.
        Copies unpickled in worker processes therefore have
        ``scheduler`` set to ``None``.
        """
        state = self.__dict__.copy()
        state['scheduler'] = None
        return state

    def execute_tasks(self, index, d_arr, p_lock):
        """Worker body: print the group forever together with a random number.

        Args:
            index: Position of the group in the list built by ``start``.
            d_arr: The group of values to print.
            p_lock: Manager lock shared by all tasks. It is not used here;
                it is meant for guarding shared writes (e.g. to a database)
                with ``p_lock.acquire()`` / ``p_lock.release()``.
        """
        try:
            # Deliberately endless demo loop; it only stops when the pool is
            # terminated or the process is killed.
            while True:
                print(index, d_arr, random.random())
        except Exception as ex:
            print(ex)

    def _report_task_error(self, exc):
        """Print a failure that ``apply_async`` would otherwise swallow."""
        print(f"task failed: {exc!r}")

    def start(self):
        """Submit one ``execute_tasks`` call per non-empty group and wait."""
        groups = [[1, 3, 22], [3, 4, 6, 8], [3, 3, 4, 4], [3, 5, 6, 7]]
        # The manager process is shut down when the ``with`` block exits.
        with multiprocessing.Manager() as manager:
            p_lock = manager.Lock()
            pool = multiprocessing.Pool(processes=4)
            try:
                for index, d_arr in enumerate(groups):
                    if d_arr:
                        pool.apply_async(
                            self.execute_tasks,
                            (index, d_arr, p_lock),
                            # Runs in the parent process; surfaces pickling
                            # or worker errors instead of dropping them.
                            error_callback=self._report_task_error,
                        )
                pool.close()
                pool.join()
            finally:
                # Also reached on Ctrl-C, so the endless workers are stopped.
                pool.terminate()
if __name__ == '__main__':
    # NOTE: holding an apscheduler BlockingScheduler / BackgroundScheduler on
    # the instance has been observed to make the multiprocessing tasks exit
    # abnormally. The workaround given with this example is to drop
    # `self.scheduler = BlockingScheduler()` from the class.
    TodayCollection().start()