Think Stitch
  最近の更新


スレッドプール

スレッドプールを作ってみます.ここで作るスレッドプールは次のようなものです.

以上を繰り返すとします.ワーカが仕事を処理する過程で,新たな仕事が生まれることはないとします.

0  スレッドプールのモデル

まず spurious wakeups のことはいったん忘れて,問題をシンプルにします.

問題を大まかにみると main スレッドとワーカスレッドのやりとりなので,生産者・消費者問題に似ています.そこでお互いが待つための条件変数2個とm,排他制御用のミューテックスを1個用意します.

仕事は単に個数で表すことにします.抽象化です.仕事の個数はグローバル変数 works に入れておくものとします.

待機状態で全員集合するところはバリアに似ています.そこで集まっている人数を数えるために変数 count を用意します.

#define M 13
#define N 4

pthread_t pth[N];
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
pthread_cond_t cv2 = PTHREAD_COND_INITIALIZER;
int count;
int works;

0.0  ワーカー

ワーカーは初期起動したらまず待機状態に入ります.ここはバリアと同じで,count をインクリメントして最後の人を識別します.バリアと異なるのは,最後の人はワーカを起こすのではなく main スレッドを起こす点です.ワーカは全員待ち状態に入ります (*A).

main スレッドは仕事を用意した後,broadcast を発行します.するとワーカ全員が起きてきて,仕事を処理する内側のループに入ります.

もし仕事があれば (works > 0) 処理します (*B).なければ内側のループを脱出し,1周回って待機状態に戻ります.

void *thread(void *arg)
{
    while (true) {
        pthread_mutex_lock(&m);
        count++;
        if (count == N) {
            pthread_cond_signal(&cv2);
        }
        pthread_cond_wait(&cv, &m);             // *A
        pthread_mutex_unlock(&m);

        while (true) {
            pthread_mutex_lock(&m);
            if (works > 0) {
                works--;                        // *B
            } else {
                pthread_mutex_unlock(&m);
                break;
            }
            pthread_mutex_unlock(&m);
        }
    }
    return NULL;
}

0.1  main スレッド

main スレッドは count を 0 に初期化したあとワーカを起動します.起動から待機完了までを pthread_cond_wait で待ちます.

ワーカの待機が完了したら仕事を用意し (works = M),count を再度 0 に初期化してから,pthread_cond_broadcast でワーカ全員を起こし,完了待ちに入ります.初期起動時の待機完了待ちと,その後の仕事完了待ちは兼用です.

int main(void)
{
    pthread_mutex_lock(&m);
    count = 0;
    works = 0;

    for (int i = 0; i < N; ++i) {
        pthread_create(&pth[i], NULL, &thread, NULL);
    }

    while (true) {
        pthread_cond_wait(&cv2, &m);
        assert(count == N);
        assert(works == 0);
        count = 0;
        works = M;
        pthread_cond_broadcast(&cv);
    }

    return 0;
}

0.2  検査

検査を2種類用意します. 1つは main スレッドでの assert で,仕事が完了したときには count = N, workers = 0 であることを確認します.

もう1つはグローバル assert を使って,countworkers の値が適正な範囲にあることを監視します.

// @ASSERT(count >= 0 && count <= N)
// @ASSERT(works >= 0 && works <= M)

検査を実行してみると問題ないことがわかります.

1  Spurious wakeups 対策

上のモデルは明らかに spurious wakeups には対応できていません. pthread_cond_wait を裸で使っているからです.そこで次に spurious wakeups 対策をしましょう.

main スレッドの wait とワーカの wait のうち,main の方はかんたんです.全員がそろっていないのに起きてきたら count < N になっているからです.したがって次のようにすればいいでしょう.

int main(void)
{
    pthread_mutex_lock(&m);
    count = 0;
    works = 0;

    for (int i = 0; i < N; ++i) {
        pthread_create(&pth[i], NULL, &thread, NULL);
    }

    while (true) {
        while (count < N) {                     // ***
            pthread_cond_wait(&cv2, &m);
        }
        assert(count == N);
        assert(works == 0);
        count = 0;
        works = M;
        pthread_cond_broadcast(&cv);
    }

    return 0;
}

問題はワーカです.待ち合わせ中,count は任意の値をとりますから条件としては使えません.したがって状態変数を追加する必要があります.

状態変数 state を用意して,たとえば初期値は 0 にしておきます.ワーカは state が 0 である限り待つとします.つまり wait から抜けてきたときに state が 0 であったら再度 wait するということです.

これを変えるのは main の仕事で,たとえば state を 1 にします.それから broadcast を発行すればワーカ全員が起きてきます.

しかしこれではうまくいきません.なぜなら,仕事を終えたワーカが待機場所に戻ると,state は 1 のままなので抜けてきてしまうからです.セマフォのときに起こったような追い越しが起こってしまいます.

これを解決する方法の1つは,バリアをもう1つ用意して2回集合することです.ディズニーランドのホーンテッドマンションと同じです(?).そうすれば追い越しを避けることができます.これはダブルターンスタイルと呼ばれているテクニックです.

しかしバリアはコストが高いので,できれば避けたいところです.実はよい方法があります.状態変数の値と比較すべき値を1回ごとに代えればいいのです.

次のコードを見てください. flop というグローバル変数を追加しました.これが待つべき値を表しています.値は 0 か 1 です.各ワーカはローカルに turn という変数を持っていて,これと flip が一致している限り待ちます (*A).

そして main に起こされたら turn の値を反転します (*B).こうすれば flip と値が異なるので,次の待機集合をすり抜けることはありません.

int flop;

void *thread(void *arg)
{
    int turn = 0;

    while (true) {
        pthread_mutex_lock(&m);
        count++;
        if (count == N) {
            pthread_cond_signal(&cv2);
        }
        while (flop == turn) {                  // *A
            pthread_cond_wait(&cv, &m);
        }
        turn = 1 - turn;                        // *B: flip
        pthread_mutex_unlock(&m);

        while (true) {
            pthread_mutex_lock(&m);
            if (works > 0) {
                works--;
            } else {
                pthread_mutex_unlock(&m);
                break;
            }
            pthread_mutex_unlock(&m);
        }
    }
    return NULL;
}

main スレッドは flip を 0 に初期化したあと,仕事が完了するたびに flip を反転してから broadcast を発行します.

こうして1度の wait だけで spurious wakeups 対策ができました.

int main(void)
{
    pthread_mutex_lock(&m);
    count = 0;
    works = 0;
    flop = 0;

    for (int i = 0; i < N; ++i) {
        pthread_create(&pth[i], NULL, &thread, NULL);
    }

    while (true) {
        while (count < N) {
            pthread_cond_wait(&cv2, &m);
        }
        assert(count == N);
        assert(works == 0);
        count = 0;
        works = M;
        flop = 1 - flop;                        // *C: flip
        pthread_cond_broadcast(&cv);
    }

    return 0;
}

Spurious wakeups オプションをつけても問題ないことが確認できます.

あとはワーカを終了させるところが必要ですね.マルチスレッドは飛行機と同じで(?)きちんと終了させるところが難しかったりします.よかったら挑戦してみてください.

2017/10/10

© 2013-2017 PRINCIPIA Limited