来源:python中国网 时间:2019-06-06

  前面我们创建进程都是直接利用multiprocessing中的Process动态成生多个进程,但如果需要成百上千个进程手动的去创建就不科学了,此时就可以用到multiprocessing模块提供的Pool方法。它的特点如下:

  1)初始化Pool时,可以指定一个最大进程数;

  2)有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;

  3)但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行。

  4)进程池实现了上下文管理协议,程序可以使用 with 子句来管理进程池,这样就可以避免程序主动关闭进程池。

  请看下面的实例:

# ‐*‐ coding: utf‐8 ‐*‐

from multiprocessing import Pool
import os,time,random


def worker(msg):
    t_start = time.time()
    print("(%s)开始执行,进程号为%d"%(msg,os.getpid()))
    time.sleep(random.randint(1,2))
    t_stop = time.time()
    print("(%s)执行完毕,耗时%d秒"% (msg,t_stop-t_start))


if __name__ == "__main__":

    p = Pool(3)  # 定义一个进程池,最大进程数3
    for i in ['小王','中王','老王']:
        # Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
        p.apply_async(worker,(i,))
    p.close()  # 关闭进程池,关闭后p不再接收新的请求
    p.join()  # 等待p中所有子进程执行完成,必须放在close语句之后

D:installpython3python.exe D:/pyscript/test/test2.py
(小王)开始执行,进程号为6640
(中王)开始执行,进程号为2080
(老王)开始执行,进程号为1100
(小王)执行完毕,耗时1秒
(中王)执行完毕,耗时1秒
(老王)执行完毕,耗时1秒

Process finished with exit code 0


  multiprocessing.Pool常用函数解析:

  apply_async(func[,args[,kwds]]):使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;

  apply(func[,args[,kwds]]):使用阻塞方式调用func,该方法会被阻塞直到func函数执行完成。

  close():关闭Pool,使其不再接受新的任务,它会把当前进程池中的所有任务执行完成后再关闭自己。

  terminate():不管任务是否完成,立即终止进程池;

  join():主进程阻塞,等待子进程的完成,必须在close或terminate之后使用;

# ‐*‐ coding: utf‐8 ‐*‐

from multiprocessing import Pool
import os,time,random


def worker(msg):
    t_start = time.time()
    print("(%s)开始执行,进程号为%d"%(msg,os.getpid()))
    time.sleep(random.randint(1,2))
    t_stop = time.time()
    print("(%s)执行完毕,耗时%d秒"% (msg,t_stop-t_start))


if __name__ == "__main__":

    p = Pool(3)  # 定义一个进程池,最大进程数3
    for i in ['小王','中王','老王']:
        # Pool.apply 阻塞的方式
        p.apply(worker,(i,))
    p.close()  # 关闭进程池,关闭后p不再接收新的请求
    p.join()  # 等待p中所有子进程执行完成,必须放在close语句之后

D:installpython3python.exe D:/pyscript/test/test2.py
(小王)开始执行,进程号为10480
(小王)执行完毕,耗时1秒
(中王)开始执行,进程号为9812
(中王)执行完毕,耗时2秒
(老王)开始执行,进程号为9564
(老王)执行完毕,耗时2秒

Process finished with exit code 0


  multiprocessing.Pool其他函数解析:

  map(func,iterable[,chunksize]):类似于Python的map()全局函数,只不过此处使用新进程对iterable的每一个元素执行func函数。

  map_async(func,iterable[,chunksize[,callback[,error_callback]]]):这是map()方法的异步版本,该方法不会被阻塞。其中callback指定func函数完成后的回调函数,error_callback指定func函数出错后的回调函数。

  imap(func,iterable[,chunksize]):这是map()方法的延迟版本。

  imap_unordered(func,iterable[,chunksize]):功能类似于imap()方法,但该方法不能保证所生成的结果(包含多个元素)与原iterable中的元素顺序一致。

  starmap(func,iterable[,chunksize]):功能类似于map()方法,但该方法要求iterable的元素也是iterable对象,程序会将每一个元素解包之后作为func函数的参数。

# ‐*‐ coding: utf‐8 ‐*‐

from multiprocessing import Pool
import os,time,random


def worker(msg):
    t_start = time.time()
    print("(%s)开始执行,进程号为%d"%(msg,os.getpid()))
    time.sleep(random.randint(1,2))
    t_stop = time.time()
    print("(%s)执行完毕,耗时%d秒"% (msg,t_stop-t_start))


if __name__ == "__main__":

    p = Pool(3)  # 定义一个进程池,最大进程数3
    for i in ['小王','中王','老王']:
        # Pool.apply 阻塞的方式
        p.apply(worker,(i,))
    p.close()  # 关闭进程池,关闭后po不再接收新的请求
    p.join()  # 等待p中所有子进程执行完成,必须放在close语句之后

D:installpython3python.exe D:/pyscript/test/test2.py
(小王)开始执行,进程号为10480
(小王)执行完毕,耗时1秒
(中王)开始执行,进程号为9812
(中王)执行完毕,耗时2秒
(老王)开始执行,进程号为9564
(老王)执行完毕,耗时2秒

Process finished with exit code 0