[Python] multiprocessingを試す (5)

前回からの続きです。

前回からの続きです。 pool python で mulitprocessing を行う方法として、process 以外に pool があります。 プロセスプール process と pool の違い Python Multiproce...

プロセス間の通信を行ってみます。

multiprocessing.Pipe([duplex])

Pipe を使うことで、2つのプロセスの通信を行うことができます。

multiprocessing.Pipe([duplex])

実用的な意味は全くないですが、「与えられたリストの数字を一つずつ二乗してそれを表示する関数」をmultiprocessing で実装してみます。

import multiprocessing 

def square_pipe(nums):
    for i in nums:
        print('square_pipe:', i**2)

if __name__ == '__main__':

    nums = range(5)

    square_process = multiprocessing.Process(target=square_pipe, args=(nums, ))
    square_process.start()
    square_process.join()

# 実行結果
# square_pipe: 0
# square_pipe: 1
# square_pipe: 4
# square_pipe: 9
# square_pipe: 16

sub process で処理した数字 main process に送り、main process 側で受け取って表示してみます。

multiprocessing.Pipe([duplex]) によって返される2つのConnection オブジェクトを使い通信を行います。

import multiprocessing 

def square_pipe(nums, child_conn):
    for i in nums:
        # child_conn から parent_conn に送信
        child_conn.send(i ** 2)
    # child_conn 終了
    child_conn.close()

if __name__ == '__main__':
    
    nums = range(5)

    # connection object の生成
    parent_conn, child_conn = multiprocessing.Pipe()

    square_process = multiprocessing.Process(target=square_pipe, args=(nums, child_conn, ))

    square_process.start()
    
    # parent_conn は何か送られてきているか1秒待つ。
    while parent_conn.poll(1):
        # parent_conn が受け取ったものを表示する。
        print('mainprocess:', parent_conn.recv())

    square_process.join()
    parent_conn.close()

# 実行結果
# mainprocess: 0
# mainprocess: 1
# mainprocess: 4
# mainprocess: 9
# mainprocess: 16

multiprocessing.Queue([maxsize])

Queue を使うことで、複数のプロセスで通信を行うことができます。

multiprocessing.Queue([maxsize])  

通常の queue のイメージそのままに、FIFO で情報のやり取りが行えます。

import multiprocessing
#  queue.Empty を except するために import
import queue

def square_queue(nums, queue):
    for i in nums:
        queue.put(i ** 2)

def cube_queue(nums, queue):
    for i in nums:
        queue.put(i ** 3)

if __name__ == '__main__':
    
    nums = range(5)

    queue_multiprocessing = multiprocessing.Queue()
    square_process = multiprocessing.Process(target=square_queue, args=(nums, queue_multiprocessing))
    cube_process = multiprocessing.Process(target=cube_queue, args=(nums, queue_multiprocessing))

    square_process.start()
    cube_process.start()

    while True:
        try:
            # 一秒待ってあれば取り出す。ないときは例外 queue.Empty が発生。
            num = queue_multiprocessing.get(True, 1)
            print(num)
        except queue.Empty:
            break

    square_process.join()
    cube_process.join()

    queue_multiprocessing.close()

また、処理が終わってから取り出すこともできます。

import multiprocessing

def square_queue(nums, queue):
    for i in nums:
        queue.put(i ** 2)

def cube_queue(nums, queue):
    for i in nums:
        queue.put(i ** 3)

if __name__ == '__main__':
    
    nums = range(5)

    queue_multiprocessing = multiprocessing.Queue()
    square_process = multiprocessing.Process(target=square_queue, args=(nums, queue_multiprocessing))
    cube_process = multiprocessing.Process(target=cube_queue, args=(nums, queue_multiprocessing))

    square_process.start()
    cube_process.start()

    square_process.join()
    cube_process.join()

    while not queue_multiprocessing.empty():
        print(queue_multiprocessing.get())