来源:python中国网 时间:2019-06-06

  上一篇通过Queue队列实现了生产者消费者模型,利用进程池Pool实现生产者消费者模型思路是一样的。

  如果用管道Pipe()去实现也是一个道理,只不过如果是多个进程同时读写管道Pipe()的一端可能会发生数据混乱!官方文档有明确提示:

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).
The two connection objects returned by Pipe() represent the two ends of the pipe. 
Each connection object has send() and recv() methods (among others). 
Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. 
Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

  所以多个生产者和消费者时需要加锁,建议大家用队列去实现,队列其实就是管道加锁(保护数据的机制)的机制!

  进程池-多个生产者和消费者模型

# ‐*‐ coding: utf‐8 ‐*‐

from multiprocessing import Manager
from multiprocessing import Pool
import random
import time
import os


def produce(queue):
    for i in range(5):
        print("produce子进程%s生产 girl_%s" % (os.getpid(),str(i)))
        queue.put('girl_'+ str(i))
        time.sleep(random.randint(0,1))


def consumer(queue):
    while True:
        if not queue.empty():
            girl = queue.get()
            if girl != 'man':
                print("consumer子进程%s消费 %s" % (os.getpid(),girl))
                time.sleep(random.randint(0, 1))
            else:
                print("consumer子进程%s获取结束信号man" % os.getpid())
                queue.put('man')  # 放到队列,让其他消费者也能获取man
                break


if __name__ == "__main__":

    girls_q = Manager().Queue()

    p_pool = Pool(3)  # 生产者进程池
    c_pool = Pool(4)  # 消费者进程池

    for i in range(3):
        p_pool.apply_async(func=produce,args=(girls_q,))
    for i in range(4):
        c_pool.apply_async(func=consumer,args=(girls_q,))
    p_pool.close()
    c_pool.close()

    p_pool.join()
    girls_q.put('man')  # 生产结束信号
    c_pool.join()

    print("生产者消费者模型完毕~~~")

D:installpython3python.exe D:/pyscript/test/test1.py
produce子进程9712生产 girl_0
produce子进程9712生产 girl_1
consumer子进程11652消费 girl_0
produce子进程7140生产 girl_0
produce子进程12788生产 girl_0
produce子进程12788生产 girl_1
consumer子进程11704消费 girl_1
consumer子进程4724消费 girl_0
consumer子进程10880消费 girl_0
produce子进程9712生产 girl_2
produce子进程9712生产 girl_3
produce子进程9712生产 girl_4
consumer子进程11652消费 girl_1
produce子进程7140生产 girl_1
produce子进程7140生产 girl_2
produce子进程7140生产 girl_3
produce子进程7140生产 girl_4
produce子进程12788生产 girl_2
consumer子进程11704消费 girl_2
consumer子进程11704消费 girl_3
consumer子进程4724消费 girl_4
consumer子进程4724消费 girl_1
consumer子进程10880消费 girl_2
consumer子进程11652消费 girl_3
produce子进程12788生产 girl_3
consumer子进程11704消费 girl_4
consumer子进程11704消费 girl_2
consumer子进程4724消费 girl_3
produce子进程12788生产 girl_4
consumer子进程10880消费 girl_4
consumer子进程11652获取结束信号man
consumer子进程11704获取结束信号man
consumer子进程10880获取结束信号man
consumer子进程4724获取结束信号man
生产者消费者模型完毕~~~

Process finished with exit code 0


  管道-单个生产者和消费者模型

  管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。

# ‐*‐ coding: utf‐8 ‐*‐
"""
https://docs.python.org/3/library/multiprocessing.html
"""
import multiprocessing
import random
import time
import os


def producer(conn1):

    for i in range(5):
        item = random.randint(1, 10)
        conn1.send(item)
        print('producer进程({0}) 生产:{1}'.format(os.getpid(), item))
        time.sleep(0.2)
    conn1.close()


def consumer(conn2):

    while True:
        try:
            item = conn2.recv()
        except EOFError:
            conn2.close()
            break
        else:
            print('cusumer进程({0})消费{1}'.format(os.getpid(),item))
            time.sleep(0.2)



if __name__ == "__main__":

    conn1,conn2 = multiprocessing.Pipe()

    process_producer = multiprocessing.Process(
        target=producer, args=(conn1,))

    process_consumer = multiprocessing.Process(
        target=consumer, args=(conn2,))

    process_producer.start()
    process_consumer.start()

    process_producer.join()
    conn1.close()
    process_consumer.join()
    conn2.close()

    print("一切结束!每个进程中都要关闭conn")

D:installpython3python.exe D:/pyscript/test/test1.py
producer进程(8652) 生产:1
cusumer进程(10084)消费1
producer进程(8652) 生产:5
cusumer进程(10084)消费5
producer进程(8652) 生产:10
cusumer进程(10084)消费10
producer进程(8652) 生产:6
cusumer进程(10084)消费6
producer进程(8652) 生产:4
cusumer进程(10084)消费4
一切结束,每个进程中都要关闭conn

Process finished with exit code 0