以下の記事の続きです。
Python の multiprocessing で lock を使ってみます。
multiprocessing の際にリソースを共有した場合、lock をしておかないと変なことになるという話です。
lock
lock とは、例えばとある変数があって、その変数に複数のプログラムからアクセスする際、一つのプログラムがアクセスしているときは、他のプログラムはアクセスできないようにする仕組みです。
情報工学におけるロック (lock) とは、計算機システム内に複数の動作主体(プロセス,スレッド等)のある環境で、データやデバイスなどのリソースへのアクセス制限を課す同期機構。ロックは並行性制御ポリシーを実施する手法のひとつである。アクセス制限を課す動作を「ロックする」,「ロックを取得する」などと表現する.また対義語として,制限を解除することをunlock(アンロック,ロック解放,ロック解除)と言う.
出典: フリー百科事典『ウィキペディア(Wikipedia)』
1000+1000-1000
以下のような プログラムを実行してみます。
1000から始まって、0.01秒ごとに1を1000回足して、 0.01秒ごとに1を1000回引くプログラムです。
import time def add_1000(total): for i in range(1000): time.sleep(0.01) total += 1 return total def sub_1000(total): for i in range(1000): time.sleep(0.01) total -= 1 return total if __name__ == '__main__': start = time.time() total = 1000 print(total) total = add_1000(total) print(total) total = sub_1000(total) print(total) elapsed_time = time.time() - start print (f"elapsed_time:{elapsed_time}[sec]") # 実行結果 # 1000 # 2000 # 1000 # elapsed_time:20.980228424072266[sec]
当たり前ですが、20秒ほどかけて、1000に1000を足して1000を引くことで、1000を得ることができました。
multiprocessing を使う
add_1000 とsub_1000 を並列化することで、大体2倍速くなることが期待できるのでやってみます。
multiprocessing.Value
プロセス間で値を共有するには、multiprocessing.Value
を使います。
multiprocessing.Value
(typecode_or_type, *args, lock=True)
join
([timeout])
multiprocessing の他の解説コードを見ていると、終了時に、join() という関数があったりなかったりします。
join()
は、そのプロセスが終了するまで待つというメソッドで、プロセスが終了せずゾンビ化するのを防ぎます。
なぜ書いてあったり書いてなかったりするのかというと、Python は通常、メインプロセスの終了時に、 multiprocessing.Process
インスタンスを暗黙的にjoin()
するので、基本的には書かずとも問題なく動作するからのようです。
What exactly is Python multiprocessing Module’s .join() Method Doing?
ゾンビプロセスを join する
https://docs.python.org/ja/3/library/multiprocessing.html
Unix 上ではプロセスが終了したときに join しないと、そのプロセスはゾンビになります。新たなプロセスが開始する (またはactive_children()
が呼ばれる) ときに、join されていないすべての完了プロセスが join されるので、あまり多くにはならないでしょう。また、終了したプロセスのProcess.is_alive
はそのプロセスを join します。そうは言っても、自分で開始したすべてのプロセスを明示的に join することはおそらく良いプラクティスです。
1000+1000-1000 II
1000から始まって、0.01秒ごとに1を1000回足して、 0.01秒ごとに1を1000回引くプログラムを、maltiprocessing を使い並列化して実行してみます。
import multiprocessing import time def add_1000_no_lock(total): for i in range(1000): time.sleep(0.01) total.value += 1 def sub_1000_no_lock(total): for i in range(1000): time.sleep(0.01) total.value -= 1 if __name__ == '__main__': start = time.time() total = multiprocessing.Value('i', 1000) add_process = multiprocessing.Process(target=add_1000_no_lock, args=(total,)) sub_process = multiprocessing.Process(target=sub_1000_no_lock, args=(total,)) add_process.start() sub_process.start() add_process.join() sub_process.join() print(total.value) elapsed_time = time.time() - start print (f"elapsed_time:{elapsed_time}[sec]") # 実行結果 # 1007 # elapsed_time:10.651745319366455[sec]
大体10秒ほどで終わり想定通りの実行時間となりましたが、結果が1007という数字になってしまいました。
これは、total.value
にadd_process
とsub_process
が同時にアクセスしてしまっているために起こります。
そこで、lock
を使い、total.value
にどちらか片方がアクセスした場合、もう片方はアクセスできないようにします。
lock を使う
class multiprocessing.Lock
lockするためには、 multiprocessing.Lock()
をインスタンス化して multiprocessing.Process
の引数に渡し、実際の処理の中で、 lock する対象を lock.acquire()
と lock.release()
で囲むことで実現できます。
では実際にコードを書いて実行してみます。
1000+1000-1000 III
import multiprocessing import time def add_1000_lock(total, lock): for i in range(1000): time.sleep(0.01) lock.acquire() total.value += 1 lock.release() def sub_1000_lock(total, lock): for i in range(1000): time.sleep(0.01) lock.acquire() total.value -= 1 lock.release() if __name__ == '__main__': start = time.time() total = multiprocessing.Value('i', 1000) lock = multiprocessing.Lock() add_process = multiprocessing.Process(target=add_1000_lock, args=(total, lock)) sub_process = multiprocessing.Process(target=sub_1000_lock, args=(total, lock)) add_process.start() sub_process.start() add_process.join() sub_process.join() print(total.value) elapsed_time = time.time() - start print (f"elapsed_time:{elapsed_time}[sec]") # 実行結果 # 1000 # elapsed_time:10.7224280834198[sec]
想定通り、10秒ほどの時間で、1000を得ることができました。