Think Stitch
  最近の更新


生産者・消費者問題 1

Pthread Model Checker で生産者・消費者問題を扱ってみます.

0  生産者・消費者問題のモデル

まずは教科書通りにモデルを作ります.実際に値を生成してバッファに格納する部分は省略して,バッファに置いてある数だけ数えることにします.抽象的なモデルということです.データの内容で振る舞いが変わらないようなアルゴリズムの場合は,このようにして状態を小さくして,検査できる規模を大きくすることができます.

pthread_cond_wait もループで囲って,起きてきたら必ず条件を再チェックするようにしました.

検査をしてみるともちろん問題はありません.spurious wakeups オプションをつけても問題はありません.

#include <stdio.h>
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include <assert.h>

#define N   3
#define NP  3
#define NC  3

pthread_t pthp[NP];
pthread_t pthc[NC];
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
pthread_cond_t cv2 = PTHREAD_COND_INITIALIZER;
int count;

void *producer(void *arg)
{
    while (true) {
        pthread_mutex_lock(&mutex);
        while (count == N) {                    // *A
            pthread_cond_wait(&cv, &mutex);
        }
        count++;
        pthread_cond_signal(&cv2);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

void *consumer(void *arg)
{
    while (true) {
        pthread_mutex_lock(&mutex);
        while (count == 0) {                    // *B
            pthread_cond_wait(&cv2, &mutex);
        }
        count--;
        pthread_cond_signal(&cv);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main(void)
{
    int i;
    for (i = 0; i < NP; ++i) {
        pthread_create(&pthp[i], NULL, &producer, NULL);
    }
    for (i = 0; i < NC; ++i) {
        pthread_create(&pthc[i], NULL, &consumer, NULL);
    }
    for (i = 0; i < NP; ++i) {
        pthread_join(pthp[i], NULL);
    }
    for (i = 0; i < NC; ++i) {
        pthread_join(pthc[i], NULL);
    }
    return 0;
}

1  条件を再テストする理由

pthread_cond_wait をループで囲ってあるのはもちろん spurious wakeups 対策のためもあるのですが,それだけではありません.ちょっと基本に立ち返ってこの点を調べてみたいと思います.

spurious wakeups はないと仮定します.本当にループが必要なのかどうか確かめるために,生産者の部分を次のように変えてみました. while ループをやめて,if による一度だけの検査にします.そして pthread_cond_wait から抜けてきたときに条件が成立しているかどうか assert でチェックします.「消費者に起こされたのだからバッファに空きができているはずだろう」というわけです.

void *producer(void *arg)
{
    while (true) {
        pthread_mutex_lock(&mutex);
        if (count == N) {                       // *C
            pthread_cond_wait(&cv, &mutex);
        }
        assert(count < N);                      // *D
        count++;
        pthread_cond_signal(&cv2);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

モデル検査のこつの1つは小さなパラメータから順に試すことです.まず生産者と消費者をそれぞれ1個だけでやってみると問題はありません.生産者1個,消費者2個でも問題ありません.ところが生産者2個,消費者1個だと assert 違反が見つかります.

レポートは次のとおりです.

------------------------
global vars:
    count = 0
mutex:
    mutex <unlocked>
thread:
0 main 1
------------------------
#1 main tau
50: for (i = 0; i < NP; ++i) {
------------------------
global vars:
    count = 0
mutex:
    mutex <unlocked>
thread:
0 main 2
    i = 0
------------------------
#2 main tau
50: for (i = 0; i < NP; ++i) {
------------------------
global vars:
    count = 0
mutex:
    mutex <unlocked>
thread:
0 main 3
    i = 0
------------------------
#3 main create pthp[0]
51: pthread_create(&pthp[i], NULL, &producer, NULL);
------------------------
global vars:
    count = 0
mutex:
    mutex <unlocked>
thread:
0 main 5
    i = 0
1 pthp[0] 28
    arg = 0
------------------------
#4 main tau
50: for (i = 0; i < NP; ++i) {
------------------------
global vars:
    count = 0
mutex:
    mutex <unlocked>
thread:
0 main 2
    i = 1
1 pthp[0] 28
    arg = 0
------------------------
#5 main tau
50: for (i = 0; i < NP; ++i) {
------------------------
global vars:
    count = 0
mutex:
    mutex <unlocked>
thread:
0 main 3
    i = 1
1 pthp[0] 28
    arg = 0
------------------------
#6 main create pthp[1]
51: pthread_create(&pthp[i], NULL, &producer, NULL);
------------------------
global vars:
    count = 0
mutex:
    mutex <unlocked>
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 28
    arg = 0
------------------------
#7 pthp[1] tau
20: while (true) {
------------------------
global vars:
    count = 0
mutex:
    mutex <unlocked>
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 29
------------------------
#8 pthp[1] lock mutex
21: pthread_mutex_lock(&mutex);
------------------------
global vars:
    count = 0
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 30
------------------------
#9 pthp[1] load count
22: if (count == N) {
------------------------
global vars:
    count = 0
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 31
    t15 = 0
------------------------
#10 pthp[1] tau
22: if (count == N) {
------------------------
global vars:
    count = 0
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 33
------------------------
#11 pthp[1] load count
25: assert(count < N);
------------------------
global vars:
    count = 0
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 35
    t16 = 0
------------------------
#12 pthp[1] assert
25: assert(count < N);
------------------------
global vars:
    count = 0
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 36
------------------------
#13 pthp[1] load count
26: count++;
------------------------
global vars:
    count = 0
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 37
    t17 = 0
------------------------
#14 pthp[1] store count
26: count++;
------------------------
global vars:
    count = 1
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 38
------------------------
#15 pthp[1] signal cv2
27: pthread_cond_signal(&cv2);
------------------------
global vars:
    count = 1
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 39
------------------------
#16 pthp[1] unlock mutex
28: pthread_mutex_unlock(&mutex);
------------------------
global vars:
    count = 1
mutex:
    mutex <unlocked>
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 29
------------------------
#17 pthp[1] lock mutex
21: pthread_mutex_lock(&mutex);
------------------------
global vars:
    count = 1
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 30
------------------------
#18 pthp[1] load count
22: if (count == N) {
------------------------
global vars:
    count = 1
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 31
    t15 = 1
------------------------
#19 pthp[1] tau
22: if (count == N) {
------------------------
global vars:
    count = 1
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 33
------------------------
#20 pthp[1] load count
25: assert(count < N);
------------------------
global vars:
    count = 1
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 35
    t16 = 1
------------------------
#21 pthp[1] assert
25: assert(count < N);
------------------------
global vars:
    count = 1
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 36
------------------------
#22 pthp[1] load count
26: count++;
------------------------
global vars:
    count = 1
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 37
    t17 = 1
------------------------
#23 pthp[1] store count
26: count++;
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 38
------------------------
#24 pthp[1] signal cv2
27: pthread_cond_signal(&cv2);
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 39
------------------------
#25 pthp[1] unlock mutex
28: pthread_mutex_unlock(&mutex);
------------------------
global vars:
    count = 2
mutex:
    mutex <unlocked>
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 29
------------------------
#26 pthp[1] lock mutex
21: pthread_mutex_lock(&mutex);
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 30
------------------------
#27 pthp[1] load count
22: if (count == N) {
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 31
    t15 = 2
------------------------
#28 pthp[1] tau
22: if (count == N) {
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 33
------------------------
#29 pthp[1] load count
25: assert(count < N);
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 35
    t16 = 2
------------------------
#30 pthp[1] assert
25: assert(count < N);
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 36
------------------------
#31 pthp[1] load count
26: count++;
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 37
    t17 = 2
------------------------
#32 pthp[1] store count
26: count++;
------------------------
global vars:
    count = 3
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 38
------------------------
#33 pthp[1] signal cv2
27: pthread_cond_signal(&cv2);
------------------------
global vars:
    count = 3
mutex:
    mutex pthp[1]
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 39
------------------------
#34 pthp[1] unlock mutex
28: pthread_mutex_unlock(&mutex);
------------------------
global vars:
    count = 3
mutex:
    mutex <unlocked>
thread:
0 main 5
    i = 1
1 pthp[0] 28
    arg = 0
2 pthp[1] 29
------------------------
#35 main tau
50: for (i = 0; i < NP; ++i) {
------------------------
global vars:
    count = 3
mutex:
    mutex <unlocked>
thread:
0 main 2
    i = 2
1 pthp[0] 28
    arg = 0
2 pthp[1] 29
------------------------
#36 main tau
50: for (i = 0; i < NP; ++i) {
------------------------
global vars:
    count = 3
mutex:
    mutex <unlocked>
thread:
0 main 4
1 pthp[0] 28
    arg = 0
2 pthp[1] 29
------------------------
#37 pthp[1] lock mutex
21: pthread_mutex_lock(&mutex);
------------------------
global vars:
    count = 3
mutex:
    mutex pthp[1]
thread:
0 main 4
1 pthp[0] 28
    arg = 0
2 pthp[1] 30
------------------------
#38 main tau
53: for (i = 0; i < NC; ++i) {
------------------------
global vars:
    count = 3
mutex:
    mutex pthp[1]
thread:
0 main 6
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 30
------------------------
#39 pthp[1] load count
22: if (count == N) {
------------------------
global vars:
    count = 3
mutex:
    mutex pthp[1]
thread:
0 main 6
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 31
    t15 = 3
------------------------
#40 main tau
53: for (i = 0; i < NC; ++i) {
------------------------
global vars:
    count = 3
mutex:
    mutex pthp[1]
thread:
0 main 7
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 31
    t15 = 3
------------------------
#41 pthp[1] tau
22: if (count == N) {
------------------------
global vars:
    count = 3
mutex:
    mutex pthp[1]
thread:
0 main 7
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 32
------------------------
#42 main create pthc[0]
54: pthread_create(&pthc[i], NULL, &consumer, NULL);
------------------------
global vars:
    count = 3
mutex:
    mutex pthp[1]
thread:
0 main 9
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 32
3 pthc[0] 18
    arg = 0
------------------------
#43 pthp[1] wait cv mutex
23: pthread_cond_wait(&cv, &mutex);
------------------------
global vars:
    count = 3
mutex:
    mutex <unlocked>
thread:
0 main 9
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 34 wait cv
3 pthc[0] 18
    arg = 0
------------------------
#44 pthc[0] tau
35: while (true) {
------------------------
global vars:
    count = 3
mutex:
    mutex <unlocked>
thread:
0 main 9
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 34 wait cv
3 pthc[0] 19
------------------------
#45 pthc[0] lock mutex
36: pthread_mutex_lock(&mutex);
------------------------
global vars:
    count = 3
mutex:
    mutex pthc[0]
thread:
0 main 9
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 34 wait cv
3 pthc[0] 20
------------------------
#46 pthc[0] load count
37: while (count == 0) {
------------------------
global vars:
    count = 3
mutex:
    mutex pthc[0]
thread:
0 main 9
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 34 wait cv
3 pthc[0] 21
    t13 = 3
------------------------
#47 pthc[0] tau
37: while (count == 0) {
------------------------
global vars:
    count = 3
mutex:
    mutex pthc[0]
thread:
0 main 9
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 34 wait cv
3 pthc[0] 23
------------------------
#48 pthc[0] load count
40: count--;
------------------------
global vars:
    count = 3
mutex:
    mutex pthc[0]
thread:
0 main 9
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 34 wait cv
3 pthc[0] 25
    t14 = 3
------------------------
#49 pthc[0] store count
40: count--;
------------------------
global vars:
    count = 2
mutex:
    mutex pthc[0]
thread:
0 main 9
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 34 wait cv
3 pthc[0] 26
------------------------
#50 pthc[0] signal cv
41: pthread_cond_signal(&cv);
------------------------
global vars:
    count = 2
mutex:
    mutex pthc[0]
thread:
0 main 9
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 34
3 pthc[0] 27
------------------------
#51 pthc[0] unlock mutex
42: pthread_mutex_unlock(&mutex);
------------------------
global vars:
    count = 2
mutex:
    mutex <unlocked>
thread:
0 main 9
    i = 0
1 pthp[0] 28
    arg = 0
2 pthp[1] 34
3 pthc[0] 19
------------------------
#52 pthp[0] tau
20: while (true) {
------------------------
global vars:
    count = 2
mutex:
    mutex <unlocked>
thread:
0 main 9
    i = 0
1 pthp[0] 29
2 pthp[1] 34
3 pthc[0] 19
------------------------
#53 pthp[0] lock mutex
21: pthread_mutex_lock(&mutex);
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[0]
thread:
0 main 9
    i = 0
1 pthp[0] 30
2 pthp[1] 34
3 pthc[0] 19
------------------------
#54 pthp[0] load count
22: if (count == N) {
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[0]
thread:
0 main 9
    i = 0
1 pthp[0] 31
    t15 = 2
2 pthp[1] 34
3 pthc[0] 19
------------------------
#55 pthp[0] tau
22: if (count == N) {
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[0]
thread:
0 main 9
    i = 0
1 pthp[0] 33
2 pthp[1] 34
3 pthc[0] 19
------------------------
#56 pthp[0] load count
25: assert(count < N);
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[0]
thread:
0 main 9
    i = 0
1 pthp[0] 35
    t16 = 2
2 pthp[1] 34
3 pthc[0] 19
------------------------
#57 pthp[0] assert
25: assert(count < N);
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[0]
thread:
0 main 9
    i = 0
1 pthp[0] 36
2 pthp[1] 34
3 pthc[0] 19
------------------------
#58 pthp[0] load count
26: count++;
------------------------
global vars:
    count = 2
mutex:
    mutex pthp[0]
thread:
0 main 9
    i = 0
1 pthp[0] 37
    t17 = 2
2 pthp[1] 34
3 pthc[0] 19
------------------------
#59 pthp[0] store count
26: count++;
------------------------
global vars:
    count = 3
mutex:
    mutex pthp[0]
thread:
0 main 9
    i = 0
1 pthp[0] 38
2 pthp[1] 34
3 pthc[0] 19
------------------------
#60 pthp[0] signal cv2
27: pthread_cond_signal(&cv2);
------------------------
global vars:
    count = 3
mutex:
    mutex pthp[0]
thread:
0 main 9
    i = 0
1 pthp[0] 39
2 pthp[1] 34
3 pthc[0] 19
------------------------
#61 pthp[0] unlock mutex
28: pthread_mutex_unlock(&mutex);
------------------------
global vars:
    count = 3
mutex:
    mutex <unlocked>
thread:
0 main 9
    i = 0
1 pthp[0] 29
2 pthp[1] 34
3 pthc[0] 19
------------------------
#62 pthp[1] lock mutex
23: pthread_cond_wait(&cv, &mutex);
------------------------
global vars:
    count = 3
mutex:
    mutex pthp[1]
thread:
0 main 9
    i = 0
1 pthp[0] 29
2 pthp[1] 33
3 pthc[0] 19
------------------------
#63 pthp[1] load count
25: assert(count < N);
------------------------
global vars:
    count = 3
mutex:
    mutex pthp[1]
thread:
0 main 9
    i = 0
1 pthp[0] 29
2 pthp[1] 35
    t16 = 3
3 pthc[0] 19
------------------------

長いパスを読むのはなかなかたいへんですが,それでも自分でこれを見つけるのははるかに難しいでしょう.少しでも楽になるように,まず行頭の # で grep して遷移だけを切り出し,内部的な何もしていない遷移(tau とあるものです)を除きます.残ったものを読みながら store の後の変数値を見たり,signal で起こされたのはどのスレッドなのかを確認したりしながら読み進めると,何が起こったのかわかってきます.

#8 pthp[1] lock mutex
#13 pthp[1] load count
#14 pthp[1] store count
    count = 1
#15 pthp[1] signal cv2
#16 pthp[1] unlock mutex
#17 pthp[1] lock mutex
#22 pthp[1] load count
#23 pthp[1] store count
    count = 2
#24 pthp[1] signal cv2
#25 pthp[1] unlock mutex
#26 pthp[1] lock mutex
#31 pthp[1] load count
#32 pthp[1] store count
    count = 3
#33 pthp[1] signal cv2
#34 pthp[1] unlock mutex
#37 pthp[1] lock mutex
#39 pthp[1] load count
#43 pthp[1] wait cv mutex       (1)
#45 pthc[0] lock mutex
#48 pthc[0] load count
#49 pthc[0] store count
    count = 2
    2 pthp[1] 34 wait cv
#50 pthc[0] signal cv           (2)
    2 pthp[1] 34 wait
#51 pthc[0] unlock mutex
#53 pthp[0] lock mutex
#58 pthp[0] load count
#59 pthp[0] store count         (3)
    count = 3
#60 pthp[0] signal cv2
#61 pthp[0] unlock mutex
#62 pthp[1] lock mutex
#63 pthp[1] load count
    count = 3

ストーリーはこうです:

  1. まず生産者 pth[1] が起きてきてバッファがいっぱいになるまでデータを格納し,待ちに入ります.
  2. 次に消費者が来て,データを1つ引き取り,signal を発行して生産者 pth[1] を起こします.
  3. ここでもう1つの生産者 pth[0] が割り込んできて,空いたバッファにデータを入れてしまいます.

そのあとでのこのこと起きてきた生産者 pth[1] はバッファに空きがなくて驚くというわけです.

整理していわれてみればあたりまえのことです.複数の生産者や消費者は,同種の同僚と競争になるのでこのようなことが起こるわけです.そのためには pthread_cond_wait から返ってきたときに条件を再チェックしなければならないのでした.

対称性から,消費者の方を変更して消費者を2個以上にすれば同様の問題が発生します.ぜひ試してみてください.

2  まとめ

pthread_cond_wait から返ってきたときに条件が成立していないことがあるのは spurious wakeups だけが理由ではありませんでした.要するに必ず再チェックしろということですね.

2017/10/10

© 2013-2017 PRINCIPIA Limited