[Python] multiprocessingを試す (2)

以下の記事の続きです。

multiprocessing multiprocessing とは、プロセスベースで並列処理を行う python の bulid-in モジュールです。 処理を並列に行えるので、処理が速くなります。 multiprocessing--- プロセスベースの並列...

Python の multiprocessing で lock を使ってみます。

multiprocessing の際にリソースを共有した場合、lock をしておかないと変なことになるという話です。

lock

lock とは、例えばとある変数があって、その変数に複数のプログラムからアクセスする際、一つのプログラムがアクセスしているときは、他のプログラムはアクセスできないようにする仕組みです。

情報工学におけるロック (lock) とは、計算機システム内に複数の動作主体(プロセススレッド等)のある環境で、データやデバイスなどのリソースへのアクセス制限を課す同期機構。ロックは並行性制御ポリシーを実施する手法のひとつである。アクセス制限を課す動作を「ロックする」,「ロックを取得する」などと表現する.また対義語として,制限を解除することをunlock(アンロック,ロック解放,ロック解除)と言う.

出典: フリー百科事典『ウィキペディア(Wikipedia)』

1000+1000-1000

以下のような プログラムを実行してみます。

1000から始まって、0.01秒ごとに1を1000回足して、 0.01秒ごとに1を1000回引くプログラムです。

import time

def add_1000(total):
    for i in range(1000):
        time.sleep(0.01)
        total += 1
    return total

def sub_1000(total):
    for i in range(1000):
        time.sleep(0.01)
        total -= 1
    return total

if __name__ == '__main__':
    start = time.time()

    total = 1000
    print(total)
    total = add_1000(total)
    print(total)
    total = sub_1000(total)
    print(total)

    elapsed_time = time.time() - start
    print (f"elapsed_time:{elapsed_time}[sec]")

# 実行結果
# 1000
# 2000
# 1000
# elapsed_time:20.980228424072266[sec]

当たり前ですが、20秒ほどかけて、1000に1000を足して1000を引くことで、1000を得ることができました。

multiprocessing を使う

add_1000 とsub_1000 を並列化することで、大体2倍速くなることが期待できるのでやってみます。

multiprocessing.Value

プロセス間で値を共有するには、multiprocessing.Value を使います。

multiprocessing.Value(typecode_or_type, *args, lock=True)

Pythonでプロセス間の値の共有

join([timeout])

multiprocessing の他の解説コードを見ていると、終了時に、join() という関数があったりなかったりします。

join([timeout])

join()は、そのプロセスが終了するまで待つというメソッドで、プロセスが終了せずゾンビ化するのを防ぎます。

なぜ書いてあったり書いてなかったりするのかというと、Python は通常、メインプロセスの終了時に、 multiprocessing.Process インスタンスを暗黙的にjoin()するので、基本的には書かずとも問題なく動作するからのようです。

What exactly is Python multiprocessing Module’s .join() Method Doing?

ゾンビプロセスを join する
Unix 上ではプロセスが終了したときに join しないと、そのプロセスはゾンビになります。新たなプロセスが開始する (または active_children() が呼ばれる) ときに、join されていないすべての完了プロセスが join されるので、あまり多くにはならないでしょう。また、終了したプロセスの Process.is_alive はそのプロセスを join します。そうは言っても、自分で開始したすべてのプロセスを明示的に join することはおそらく良いプラクティスです。

https://docs.python.org/ja/3/library/multiprocessing.html

1000+1000-1000 II

1000から始まって、0.01秒ごとに1を1000回足して、 0.01秒ごとに1を1000回引くプログラムを、maltiprocessing を使い並列化して実行してみます。

import multiprocessing
import time

def add_1000_no_lock(total):
    for i in range(1000):
        time.sleep(0.01)
        total.value += 1

def sub_1000_no_lock(total):
    for i in range(1000):
        time.sleep(0.01)
        total.value -= 1

if __name__ == '__main__':
    start = time.time()
    total = multiprocessing.Value('i', 1000)

    add_process = multiprocessing.Process(target=add_1000_no_lock, args=(total,))
    sub_process = multiprocessing.Process(target=sub_1000_no_lock, args=(total,)) 

    add_process.start()
    sub_process.start()

    add_process.join()
    sub_process.join()

    print(total.value)

    elapsed_time = time.time() - start
    print (f"elapsed_time:{elapsed_time}[sec]")

# 実行結果
# 1007
# elapsed_time:10.651745319366455[sec]

大体10秒ほどで終わり想定通りの実行時間となりましたが、結果が1007という数字になってしまいました。

これは、total.valueadd_processsub_processが同時にアクセスしてしまっているために起こります。

そこで、lockを使い、total.valueにどちらか片方がアクセスした場合、もう片方はアクセスできないようにします。

lock を使う

class multiprocessing.Lock

lockするためには、 multiprocessing.Lock()をインスタンス化して multiprocessing.Process の引数に渡し、実際の処理の中で、 lock する対象を lock.acquire()lock.release() で囲むことで実現できます。

では実際にコードを書いて実行してみます。

1000+1000-1000 III

import multiprocessing
import time

def add_1000_lock(total, lock):
    for i in range(1000):
        time.sleep(0.01)
        lock.acquire()
        total.value += 1
        lock.release()

def sub_1000_lock(total, lock):
    for i in range(1000):
        time.sleep(0.01)
        lock.acquire()
        total.value -= 1
        lock.release()

if __name__ == '__main__':
    start = time.time()

    total = multiprocessing.Value('i', 1000)
    lock = multiprocessing.Lock()

    add_process = multiprocessing.Process(target=add_1000_lock, args=(total, lock))
    sub_process = multiprocessing.Process(target=sub_1000_lock, args=(total, lock)) 

    add_process.start()
    sub_process.start()

    add_process.join()
    sub_process.join()

    print(total.value)

    elapsed_time = time.time() - start
    print (f"elapsed_time:{elapsed_time}[sec]")

# 実行結果
# 1000
# elapsed_time:10.7224280834198[sec]

想定通り、10秒ほどの時間で、1000を得ることができました。