【Python】マルチスレッドの使い方2パターン

251, 2019-11-21

目次

マルチスレッド

Pythonでマルチスレッドを使うには大きく分けて2つのパターンがあります。
それは以下の通りです。

  • ワーカーを使う
  • クラスを使う

これらの使い方を簡単に紹介します。

ワーカーを使う場合

ワーカーとは何なのかと言う、スレッドで動作させるエントリーポイントとなる関数のことです。
スレッドを作る時にこの関数をスレッドに指定しておくと、スレッドを起動したときその関数から処理がスタートします。
そして関数が終了すると起動したスレッドも終了します。

ワーカーと別のスレッドでデータを共有したい時の1つの手段としてqueueモジュールを使う方法があります。
queue.Queueは同期キュークラスと呼ばれ、こういった並行処理でデータをやり取りするときに使われます。
↓がサンプルスクリプトです。

import threading
from queue import Queue
import time


print_lock = threading.Lock()
data_q = Queue()


def safe_print(*args, **kwargs):
    with print_lock:
        print(*args, **kwargs)


def worker():
    name = threading.current_thread().name

    while True:
        time.sleep(0.5)

        data = data_q.get()
        safe_print(name, 'eat', data)

        data_q.task_done()


def main():
    nthreads = 4  # number of threads
    ndatas = 20  # number of datas

    # init data queue
    for data in range(ndatas):
        data_q.put(data)

    # start threads with worker
    for x in range(nthreads):
        t = threading.Thread(target=worker, daemon=True)
        t.start()

    # get starting time
    start = time.time()

    # waiting for done thread works
    data_q.join()
    safe_print('Entire job took:', time.time() - start)


if __name__ == '__main__':
    main()

↑のコードを実行すると↓のような結果になります。

Thread-1 eat 0
Thread-4 eat 1
Thread-3 eat 2
Thread-2 eat 3
Thread-1 eat 4
Thread-4 eat 5
Thread-2 eat 6
Thread-3 eat 7
Thread-1 eat 8
Thread-4 eat 9
Thread-3 eat 10
Thread-2 eat 11
Thread-1 eat 12
Thread-4 eat 13
Thread-2 eat 14
Thread-3 eat 15
Thread-1 eat 16
Thread-4 eat 17
Thread-2 eat 18
Thread-3 eat 19
Entire job took: 2.5040030479431152

↑の例ではまずmain関数内でndatasを使ってdata_qを初期化しています。このdata_qQueueです。
それからnthreadsを使ってworker関数をターゲットにしてスレッドを複数作成し、各スレッドをスタートさせています。
この各スレッドはdata_qのデータをそれぞれ処理していきます。ここが並行処理になってますね。
それからdata_q.join()でキューの処理が完了するのを待ちます。各スレッドがそれぞれの仕事を処理して、キューの仕事が空になったら処理が完了します。
このとき、スレッドの作成時にスレッドのdaemon属性をTrueにしておくのを忘れないようにしましょう。こうすることでスレッドはデーモンスレッドになります。
デーモンスレッドとはユーザースレッドがすべて終了すると、自動的に終了する性質を持ったスレッドです。このdaemon属性はデフォルトではFalseですが、Falseのときスレッドはユーザースレッドになります。
つまり、メインスレッドが終了したら起動したデーモンスレッドも勝手に終了してくれるということですね。

デーモンスレッドを使わない場合

もちろんデーモンスレッドを使わずに書くことも出来ます。

import threading
from queue import Queue, Empty
import time


print_lock = threading.Lock()
data_q = Queue()


def safe_print(*args, **kwargs):
    with print_lock:
        print(*args, **kwargs)


def worker():
    name = threading.current_thread().name

    while not data_q.empty():
        time.sleep(0.5)

        try:
            data = data_q.get(timeout=3)
        except Empty:
            break

        safe_print(name, 'eat', data)
        data_q.task_done()

    safe_print(name, 'done')


def main():
    nthreads = 4  # number of threads
    ndatas = 20  # number of datas

    # init data queue
    for data in range(ndatas):
        data_q.put(data)

    # start threads with worker
    threads = []
    for x in range(nthreads):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)

    # waiting for done thread works
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()

↑のコードを実行すると↓のような結果になります。

Thread-3 eat 0
Thread-1 eat 1
Thread-2 eat 2
Thread-4 eat 3
Thread-1 eat 4
Thread-2 eat 5
Thread-3 eat 6
Thread-4 eat 7
Thread-3 eat 8
Thread-2 eat 9
Thread-1 eat 10
Thread-4 eat 11
Thread-2 eat 12
Thread-3 eat 13
Thread-1 eat 14
Thread-4 eat 15
Thread-3 eat 16
Thread-2 eat 17
Thread-1 eat 18
Thread-1 done
Thread-4 eat 19
Thread-4 done
Thread-3 done
Thread-2 done

デーモンスレッドを使った場合はスレッドを起動しっぱなしでしたが、スレッドのdaemon属性をTrueにしない場合は↑のようにスレッドをjoinして、スレッドの終了を待機するのがマナーです。
Queuegetメソッドは内部でスレッドをロックしているので、timeout引数にそのロック時間を指定します。↑の例では3秒間ロックして、データを取得できなければEmpty例外を発生させてスレッドを終了するようにしています。
キューが空になる又はEmpty例外が発生した場合、スレッドは終了します。

デーモンスレッドは起動しっぱなしでプログラム終了時に勝手に回収されるのを期待するスレッドなので、ゲームなどではこっちのユーザースレッドのほうが需要が高いかもしれません。

キューを使わない場合

もちろんキューを使わずに書くことも出来ます。
↓はbufferというグローバル変数の文字列にランダムな文字を並行処理的に追加していくスクリプトです。

import threading
from queue import Queue, Empty
import time
import random


print_lock = threading.Lock()
buffer_lock = threading.Lock()
buffer = ''


def safe_print(*args, **kwargs):
    with print_lock:
        print(*args, **kwargs)


def worker():
    global buffer 
    name = threading.current_thread().name

    while True:
        time.sleep(0.5)

        with buffer_lock:
            if len(buffer) >= 20:
                break

            # add random character
            buffer += chr(random.randint(ord('a'), ord('z')))

            safe_print(name, buffer)

    safe_print(name, 'done')


def main():
    nthreads = 4  # number of threads

    # start threads with worker
    threads = []
    for x in range(nthreads):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)

    # waiting for done thread works
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()

↑のコードを実行すると↓のような結果になります。

Thread-2 z
Thread-3 zq
Thread-4 zqj
Thread-1 zqjw
Thread-1 zqjwi
Thread-4 zqjwif
Thread-2 zqjwify
Thread-3 zqjwifyx
Thread-1 zqjwifyxi
Thread-2 zqjwifyxif
Thread-4 zqjwifyxify
Thread-3 zqjwifyxifyb
Thread-2 zqjwifyxifybg
Thread-4 zqjwifyxifybgj
Thread-1 zqjwifyxifybgjq
Thread-3 zqjwifyxifybgjqv
Thread-3 zqjwifyxifybgjqvw
Thread-1 zqjwifyxifybgjqvwp
Thread-2 zqjwifyxifybgjqvwps
Thread-4 zqjwifyxifybgjqvwpsr
Thread-4 done
Thread-3 done
Thread-2 done
Thread-1 done

各スレッドは文字列bufferが20字以上になったらスレッドを終了します。
buffer_lockという変数はアクセスしたい変数を競合スレッドから保護する変数です。with文にこのロック変数を渡すとwith文に入ったときにロック変数をロックします。このあいだ、競合スレッドは同じロック変数をロックしてbufferにアクセスしようとしますが、ロックされているのでアクセス待ちの状態になります。
従って1つのスレッドがbufferにアクセスしている時に、他のスレッドが破壊的な変更などを行う心配がなくなります。ピースフルですね。

Queueに比べるとロック変数を使った処理のほうが簡単かもしれませんが、人によるかもしれません。
ちなみにロック変数を使った場合、簡単にロック地獄になります。むずかしい設計になりますので気をつけましょう。

クラスを使う場合

クラスを使う場合はthreadingモジュールのThreadクラスを任意のクラスに継承させます。

import threading
import random
import time


print_lock = threading.Lock()


def safe_print(*args, **kwargs):
    with print_lock:
        print(*args, **kwargs)


class Messenger(threading.Thread):
    def run(self):
        name = threading.currentThread().getName()

        for i in range(10):
            sleep_sec = random.uniform(0.5, 2.0)

            head = f'{name:<10} {i:03d}'
            safe_print(f'{head} sleep {sleep_sec} sec')

            time.sleep(sleep_sec)


if __name__ == '__main__':
    # create threads
    x = Messenger(name='sender')
    y = Messenger(name='receiver ')

    # start threads
    x.start()
    y.start()

    # waiting for done threads
    x.join()
    y.join()

    safe_print('done')

↑のコードを実行すると↓のような結果になります。

sender     000 sleep 1.0600789368972483 sec
receiver   000 sleep 1.6554929128509275 sec
sender     001 sleep 1.7823784048292661 sec
receiver   001 sleep 0.6444646976661856 sec
receiver   002 sleep 0.8643483541514729 sec
sender     002 sleep 0.6433188507922311 sec
receiver   003 sleep 1.8143714992683768 sec
sender     003 sleep 0.6958787919611926 sec
sender     004 sleep 1.7454092529296261 sec
receiver   004 sleep 1.0955692622152697 sec
sender     005 sleep 0.9851667506755231 sec
receiver   005 sleep 0.8072348333674091 sec
receiver   006 sleep 1.7656413488232432 sec
sender     006 sleep 1.5023287669258105 sec
sender     007 sleep 1.0903801583353079 sec
receiver   007 sleep 0.9620516268573199 sec
sender     008 sleep 0.830791977471533 sec
receiver   008 sleep 0.8632263518821254 sec
sender     009 sleep 1.5165434396510986 sec
receiver   009 sleep 1.7753154881926316 sec
done

このスクリプトはスレッド間のメッセージの送受信をシミュレートしていますが、実際にはメッセージの送受信は行っておらず、一定時間スリープしてprintしているだけです。
Threadクラスを継承するとstartメソッドやjoinメソッドを使うことが出来るようになります。
startメソッドを呼び出すと内部でrunメソッドが呼ばれます。そのためrunメソッドをオーバーライドしておくとスレッド開始時の処理を定義することが出来ます。
あとはjoinメソッドを使えばスレッドの終了を待機することが出来ます。

マルチスレッドで並行処理させちゃう

このようにマルチスレッドを使うと簡単に並行処理を実現することが出来ます。
音楽聞きながら絵を描いてコーヒーを飲むという複雑な動作が、パソコンにもできるということですね。まぁこれは厳密には並列処理だと思いますが。

こまけぇこたぁいいんだよ

はい。今回は以上になります。

参照

投稿者名です。64字以内で入力してください。

必要な場合はEメールアドレスを入力してください(全体に公開されます)。

投稿する内容です。

スポンサーリンク

スポンサーリンク

スポンサーリンク

スポンサーリンク