热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

python多进程:multiprocessingPool和tqdm

https:blog.csdn.netqq_39694935articledetails84552076【Python】multiprocessingPool进程间通信共享1.tq

https://blog.csdn.net/qq_39694935/article/details/84552076

【Python】multiprocessing Pool 进程间通信共享

1. tqdm模块的简洁使用

直接上代码:

from tqdm import tqdm
from multiprocessing import Pool
import functools
from pymongo import MongoClient
mdb = MongoClient('120.xx.26.xx:20002', username='xx', password='xxxxx')

# 三种main的写法只写一种即可

def create_data(image):

    # TODO 具体处理逻辑
    print(image)
    return str(image)


def main_deal():

    num_processor = 20
    p = Pool(num_processor)
    images = mdb['db_name']['image'].find(no_cursor_timeout=True).batch_size(200)

    fw = open('result.txt', 'w+')
    for result in tqdm(p.imap(create_data, images), total=images.count()):
        fw.write(result + '\n')
    fw.close()

    for _ in tqdm(p.imap_unordered(create_data, images)):
        pass
    p.close()
    p.join()


def main_deal():

    num_processor = 20
    p = Pool(num_processor)

    images = mdb['goodlook']['image_generated_data'].find(no_cursor_timeout=True).batch_size(200)
    fw = open('result.txt', 'w+')
    for result in tqdm(p.imap_unordered(create_data, images)):
        fw.write(result + '\n')
    fw.close()

    p.close()
    p.join()


def main_deal():
    num_processor = 20
    p = Pool(num_processor)

    images = mdb['goodlook']['image_generated_data'].find(no_cursor_timeout=True).batch_size(200)
    fw = open('result.txt', 'w+')

    pt = functools.partial(create_data)
    for result in tqdm(p.imap_unordered(pt, images)):
        fw.write(result + '\n')
    fw.close()

    p.close()
    p.join()


if __name__ == '__main__':
    main_deal()

2.进程池多进程

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "Victor"
# Date: 2020/6/18

import traceback
import multiprocessing
from multiprocessing import Pool

concurrent_num = 10


def task_run(data, msg):
    try:
        # time.sleep(random.randrange(1, 4))
        msg = multiprocessing.current_process().name + '-' + msg
        print(f"hello world : {data}, {msg}")

    except Exception as e:
        traceback.print_exc()
        print("error: ", e)

    return None


if __name__ == '__main__':

    data = {}
    p = Pool(concurrent_num)

    for i in range(concurrent_num):
        msg = 'index-%d' % i
        p.apply_async(task_run, (data, msg,))

    p.close()
    p.join()

3. 进程池和调度器模块的冲突

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
''' 
@Author: Victor
@Contact: 
@Date: 2020/10/15
@function: ''
'''

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler

import time
import random
import multiprocessing


class TodayCollection(object):

    def __init__(self):
        self.name = "今日采集类"
        self.scheduler = BlockingScheduler()

    def execute_tasks(self, index, d_arr, p_lock):
        try:

            # 一般用于写数据库
            # if p_lock:
            #     p_lock.acquire()
            # print(index, d_arr)
            # if p_lock:
            #     p_lock.release()

            # p_lock.acquire()
            # print(index, d_arr)
            # p_lock.release()

            while True:
                print(index, d_arr, random.random())

        except Exception as ex:
            print(ex)

    def start(self):
        groups = [[1, 3, 22], [3, 4, 6, 8], [3, 3, 4, 4], [3, 5, 6, 7]]
        manager = multiprocessing.Manager()
        p_lock = manager.Lock()
        pool = multiprocessing.Pool(processes=4)
        for index, d_arr in enumerate(groups):
            if d_arr:
                pool.apply_async(self.execute_tasks, (index, d_arr, p_lock))

        pool.close()
        pool.join()
        pool.terminate()


if __name__ == '__main__':

    # apscheduler的BlockingScheduler和BackgroundScheduler导致多进程异常退出
    # 要想正常直接去掉self.scheduler = BlockingScheduler()
    TodayCollection().start()

  

 


推荐阅读
author-avatar
手机用户2502860565
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有