以下の記事の続きです。
Python の multiprocessing で lock を使ってみます。
multiprocessing の際にリソースを共有した場合、lock をしておかないと変なことになるという話です。
lock
lock とは、例えばとある変数があって、その変数に複数のプログラムからアクセスする際、一つのプログラムがアクセスしているときは、他のプログラムはアクセスできないようにする仕組みです。
情報工学におけるロック (lock) とは、計算機システム内に複数の動作主体(プロセス,スレッド等)のある環境で、データやデバイスなどのリソースへのアクセス制限を課す同期機構。ロックは並行性制御ポリシーを実施する手法のひとつである。アクセス制限を課す動作を「ロックする」,「ロックを取得する」などと表現する.また対義語として,制限を解除することをunlock(アンロック,ロック解放,ロック解除)と言う.
出典: フリー百科事典『ウィキペディア(Wikipedia)』
1000+1000-1000
以下のような プログラムを実行してみます。
1000から始まって、0.01秒ごとに1を1000回足して、 0.01秒ごとに1を1000回引くプログラムです。
import time
def add_1000(total):
for i in range(1000):
time.sleep(0.01)
total += 1
return total
def sub_1000(total):
for i in range(1000):
time.sleep(0.01)
total -= 1
return total
if __name__ == '__main__':
start = time.time()
total = 1000
print(total)
total = add_1000(total)
print(total)
total = sub_1000(total)
print(total)
elapsed_time = time.time() - start
print (f"elapsed_time:{elapsed_time}[sec]")
# 実行結果
# 1000
# 2000
# 1000
# elapsed_time:20.980228424072266[sec]
当たり前ですが、20秒ほどかけて、1000に1000を足して1000を引くことで、1000を得ることができました。
multiprocessing を使う
add_1000 とsub_1000 を並列化することで、大体2倍速くなることが期待できるのでやってみます。
multiprocessing.Value
プロセス間で値を共有するには、multiprocessing.Value を使います。
multiprocessing.Value(typecode_or_type, *args, lock=True)
join([timeout])
multiprocessing の他の解説コードを見ていると、終了時に、join() という関数があったりなかったりします。
join()は、そのプロセスが終了するまで待つというメソッドで、プロセスが終了せずゾンビ化するのを防ぎます。
なぜ書いてあったり書いてなかったりするのかというと、Python は通常、メインプロセスの終了時に、 multiprocessing.Process インスタンスを暗黙的にjoin()するので、基本的には書かずとも問題なく動作するからのようです。
What exactly is Python multiprocessing Module’s .join() Method Doing?
ゾンビプロセスを join する
https://docs.python.org/ja/3/library/multiprocessing.html
Unix 上ではプロセスが終了したときに join しないと、そのプロセスはゾンビになります。新たなプロセスが開始する (またはactive_children()が呼ばれる) ときに、join されていないすべての完了プロセスが join されるので、あまり多くにはならないでしょう。また、終了したプロセスのProcess.is_aliveはそのプロセスを join します。そうは言っても、自分で開始したすべてのプロセスを明示的に join することはおそらく良いプラクティスです。
1000+1000-1000 II
1000から始まって、0.01秒ごとに1を1000回足して、 0.01秒ごとに1を1000回引くプログラムを、maltiprocessing を使い並列化して実行してみます。
import multiprocessing
import time
def add_1000_no_lock(total):
for i in range(1000):
time.sleep(0.01)
total.value += 1
def sub_1000_no_lock(total):
for i in range(1000):
time.sleep(0.01)
total.value -= 1
if __name__ == '__main__':
start = time.time()
total = multiprocessing.Value('i', 1000)
add_process = multiprocessing.Process(target=add_1000_no_lock, args=(total,))
sub_process = multiprocessing.Process(target=sub_1000_no_lock, args=(total,))
add_process.start()
sub_process.start()
add_process.join()
sub_process.join()
print(total.value)
elapsed_time = time.time() - start
print (f"elapsed_time:{elapsed_time}[sec]")
# 実行結果
# 1007
# elapsed_time:10.651745319366455[sec]
大体10秒ほどで終わり想定通りの実行時間となりましたが、結果が1007という数字になってしまいました。
これは、total.valueにadd_processとsub_processが同時にアクセスしてしまっているために起こります。
そこで、lockを使い、total.valueにどちらか片方がアクセスした場合、もう片方はアクセスできないようにします。
lock を使う
class multiprocessing.Lock
lockするためには、 multiprocessing.Lock()をインスタンス化して multiprocessing.Process の引数に渡し、実際の処理の中で、 lock する対象を lock.acquire() と lock.release() で囲むことで実現できます。
では実際にコードを書いて実行してみます。
1000+1000-1000 III
import multiprocessing
import time
def add_1000_lock(total, lock):
for i in range(1000):
time.sleep(0.01)
lock.acquire()
total.value += 1
lock.release()
def sub_1000_lock(total, lock):
for i in range(1000):
time.sleep(0.01)
lock.acquire()
total.value -= 1
lock.release()
if __name__ == '__main__':
start = time.time()
total = multiprocessing.Value('i', 1000)
lock = multiprocessing.Lock()
add_process = multiprocessing.Process(target=add_1000_lock, args=(total, lock))
sub_process = multiprocessing.Process(target=sub_1000_lock, args=(total, lock))
add_process.start()
sub_process.start()
add_process.join()
sub_process.join()
print(total.value)
elapsed_time = time.time() - start
print (f"elapsed_time:{elapsed_time}[sec]")
# 実行結果
# 1000
# elapsed_time:10.7224280834198[sec]
想定通り、10秒ほどの時間で、1000を得ることができました。