前回からの続きです。
プロセス間の通信を行ってみます。
multiprocessing.Pipe([duplex])
Pipe を使うことで、2つのプロセスの通信を行うことができます。
multiprocessing.Pipe([duplex])
実用的な意味は全くないですが、「与えられたリストの数字を一つずつ二乗してそれを表示する関数」をmultiprocessing で実装してみます。
import multiprocessing
def square_pipe(nums):
for i in nums:
print('square_pipe:', i**2)
if __name__ == '__main__':
nums = range(5)
square_process = multiprocessing.Process(target=square_pipe, args=(nums, ))
square_process.start()
square_process.join()
# 実行結果
# square_pipe: 0
# square_pipe: 1
# square_pipe: 4
# square_pipe: 9
# square_pipe: 16
sub process で処理した数字 main process に送り、main process 側で受け取って表示してみます。
multiprocessing.Pipe([duplex]) によって返される2つのConnection オブジェクトを使い通信を行います。
import multiprocessing
def square_pipe(nums, child_conn):
for i in nums:
# child_conn から parent_conn に送信
child_conn.send(i ** 2)
# child_conn 終了
child_conn.close()
if __name__ == '__main__':
nums = range(5)
# connection object の生成
parent_conn, child_conn = multiprocessing.Pipe()
square_process = multiprocessing.Process(target=square_pipe, args=(nums, child_conn, ))
square_process.start()
# parent_conn は何か送られてきているか1秒待つ。
while parent_conn.poll(1):
# parent_conn が受け取ったものを表示する。
print('mainprocess:', parent_conn.recv())
square_process.join()
parent_conn.close()
# 実行結果
# mainprocess: 0
# mainprocess: 1
# mainprocess: 4
# mainprocess: 9
# mainprocess: 16
multiprocessing.Queue([maxsize])
Queue を使うことで、複数のプロセスで通信を行うことができます。
multiprocessing.Queue([maxsize])
通常の queue のイメージそのままに、FIFO で情報のやり取りが行えます。
import multiprocessing
# queue.Empty を except するために import
import queue
def square_queue(nums, queue):
for i in nums:
queue.put(i ** 2)
def cube_queue(nums, queue):
for i in nums:
queue.put(i ** 3)
if __name__ == '__main__':
nums = range(5)
queue_multiprocessing = multiprocessing.Queue()
square_process = multiprocessing.Process(target=square_queue, args=(nums, queue_multiprocessing))
cube_process = multiprocessing.Process(target=cube_queue, args=(nums, queue_multiprocessing))
square_process.start()
cube_process.start()
while True:
try:
# 一秒待ってあれば取り出す。ないときは例外 queue.Empty が発生。
num = queue_multiprocessing.get(True, 1)
print(num)
except queue.Empty:
break
square_process.join()
cube_process.join()
queue_multiprocessing.close()
また、処理が終わってから取り出すこともできます。
import multiprocessing
def square_queue(nums, queue):
for i in nums:
queue.put(i ** 2)
def cube_queue(nums, queue):
for i in nums:
queue.put(i ** 3)
if __name__ == '__main__':
nums = range(5)
queue_multiprocessing = multiprocessing.Queue()
square_process = multiprocessing.Process(target=square_queue, args=(nums, queue_multiprocessing))
cube_process = multiprocessing.Process(target=cube_queue, args=(nums, queue_multiprocessing))
square_process.start()
cube_process.start()
square_process.join()
cube_process.join()
while not queue_multiprocessing.empty():
print(queue_multiprocessing.get())