Think Stitch
PRINCIPIA  最近の更新


生産者・消費者問題

リングバッファのモデルを拡張して、複数の生産者と複数の消費者が通信しあうモデルを作ります。 いわゆる生産者・消費者問題(Producers-Consumers problem)です。

これもマルチスレッドプログラミングの本であればたいてい出ている基本的なパターンです。 それをわざわざモデルにする理由は、教科書がすべての疑問に答えてくれるわけではないからです。

「なぜこのような作りになっているのか、ここはこうじゃだめなのか?」という疑問を持ったとしても、書かれていなければ自分で考えるしかありません (それを狙ってあえて書いてなかったり、問題にしてあったりする場合もあるでしょうけど)。 頭の中で考えてもすっきりしなかったり、結果に自信が持てなかったりすることもあるでしょう。

そこで実験です。 実際にプログラムを書いて、あちこち変えて実験するといろいろなことがわかります。 特に正しくない動きを発見すれば、コードが正しくないことははっきりします(逆はいえませんが)。 しかしマルチスレッドのプログラムには、潜在的に問題があるのになかなか表面化しないというやっかいな性質があります。 実験では反例すら見逃すかもしれないのです。

そこでモデル化するわけです。 一度モデルを作ってしまえば、あちこち直して検査できます。 実験と異なり、問題があれば必ず発見できるからです(適切な問いを発すれば、という条件が付きますけど)。 実験よりも少ない手間で変更と検査ができるので、効率よくいろいろな場合を調べることができます。

実際の開発で、教科書に出ている例に似ているのだけど少しだけ違うという状況に出会うことがあります。 こんなときは「この場合についても書いておいてくれればいいのに」と思うのですが、しかたありません。 自分で修正を考えます。 しかし、やっかいなことに、マルチスレッドのアルゴリズムはちょっと変えただけで動かなくなったりするのです。 そんなときにもモデルを持っていれば修正して検査することができます。

うんちくが長くなってしまいました。 楽しい設計に移りましょう。

プロセス構成

リングバッファのときと同じように、問題の設定はオペレーティングシステムが提供する要素を使ってシステムを構築することです。 セマフォと共有メモリ上のバッファについては以前と同じです。 加えて、複数の生産者、消費者がいますから、バッファ上での読み書き位置を表すポインタを共有する必要があります。 そのために、ポインタと、排他制御のためのミューテックスを2組用意します。 リングバッファのときは RECEIVER, SENDER がそれぞれローカルに持っていたポインタを、グローバルにして共有するということです。 生産者は RECEIVER プロセスと入力チャネル、消費者は SENDER プロセスと出力チャネルで表すことも同様です。 ただし、チャネルはプロセス毎に用意します。

RECEIVER, SENDER を囲っている点線枠は、インターリーブによる並行合成を表しています。 各サービスプロセスとの通信はすべてインターリーブによる共有です。 すべての生産者・消費者から矢印を引くと図がわかりにくくなるので、このような省略をしました。

プロセス定義

各プロセスを定義していきます。 イベント・チャネル定義についてはリングバッファの延長でわかると思いますので、省略します。

プロセス RECEIVER

各 RECEIVER プロセスには番号でプロセス ID を振ります。 プロセス定義は1つだけで、プロセス ID の異なる複数のインスタンスを生成します。

左上の点線枠で示された状態は抽象状態です。 この抽象状態はプロセス ID を保持する状態変数 id を定義しています。 すべての状態はこの抽象状態を継承していて、状態変数 id を持つことになります。 抽象状態と継承に関する詳細は後で説明することにして、まず動作を説明します。

  1. 水色で示された状態 R が初期状態です。まず slot.wait で空きスロットを確保します(リングバッファの反省を生かして、先にスロットを確保します)。
  2. プロセス ID に対応する入力チャネルからデータを受信します。(in k) = in.k という関数を定義しておきます。
  3. バッファへの書き込み位置 PUTP を取得するために、まず putp.lock でミューテックス PUTP MUTEX をロックします。
  4. 書き込み位置 PUTP を putp.rd で取得します。
  5. バッファにデータを書き込みます。
  6. 書き込み位置 PUTP を更新します。
  7. putp.unlock でミューテックス PUTP MUTEX をアンロックします。
  8. 最後に MESSAGE SEMAPHORE に msg.signal を送り、データがあることを通知します。

継承と抽象状態

状態遷移図の中で、ある一群の状態が共通の変数を持つということがあります。 例えば、受信したデータを持ち運ぶような場合です。 各状態で状態変数を定義してもよいのですが、変更があった場合に直すのが手間です。 このようなときには状態の継承を使うことができます。

状態を継承すると、継承された状態が持つすべての状態変数が継承されます。 これにより、変更の必要が生じた場合、継承された状態の状態変数を変えるだけで済ませることができます。 target 式と合わせて使うと、より修正が容易になります。

継承の対象とする状態はふつうの状態でよいのですが、継承したい変数以外の変数がある場合などは困ります。 そういうときには抽象状態を使うことができます。 抽象状態は状態変数を定義して継承してもらうためにだけ作る状態です。 プロセスの実の状態にはなりません。 共通する変数だけを持つ抽象状態を作って継承し、あとはそれぞれが個別の状態変数を追加定義すればいいわけです。

RECEIVER での使用例を見てみます。 まず抽象状態 RA を作ります。 ふつうに状態を作ってから、abstract state プロパティを Yes にすると抽象状態になります。 そして状態変数を定義します。 ここではプロセス ID を格納する変数 id を定義します。

継承をしたい場合は base state プロパティに継承する状態の名前を入れます。 すると inherited vars に継承した変数の名前が表示されます。

遷移のプロパティでも、継承した変数が表示されます。

継承した状態でも状態変数を定義すると、継承された状態変数は、その状態で定義した状態変数の前に追加されます。

継承された状態がさらに別の状態を継承している場合は、その経路上で定義されたすべての状態変数が継承されます。 指定した状態名の状態が存在しなかったり、継承の経路が循環している場合にはエラーになります。

プロセス SENDER

SENDER プロセスの動作もリングバッファの拡張なので、詳細な解説は不要でしょう。

セマフォ

セマフォはリングバッファのときと同じです。

(define-process (SEM c)
  (alt
    (if (= c 0)
        STOP
        (! wait (SEM (- c 1))))
    (if (= c N)
        STOP
        (! signal (SEM (+ c 1))))))

バッファ

バッファもリングバッファのときと同じです。

(define-process (BUFFER m)
  (alt
    (? rd (k x) (eq? x (list-ref m k))
       (BUFFER m))
    (? wr (k x)
       (BUFFER (list-update m k x)))))

ポインタプロセス

ポインタプロセスは単なる1変数を表すプロセスです。 バッファと異なりインデックスはないので、読み出しに関しても特別なことはありません。

(define-process (PTR p)
  (alt
    (! p.rd (p) (PTR p))
    (? p.wr (q) (PTR q))))

ポインタは書き込み用 PUTP と読み出し用 GETP の2つが必要なので、改名によって2つのインスタンスを作ります。

ミューテックス

ミューテックスも以前のものと同じです。 改名によって2つのインスタンスを作ります。

システムプロセス

すべてのプロセスを並行合成して、システムプロセスを作ります。 RECEIVER, SENDER は複数あるので、xpar を使ってインターリーブ結合します。 NR, NS はそれぞれ RECEIVER, SENDER の数です。 ここでは NR = NS = 2 としておきます。 (xpar を使っているので 1 は指定できません。)

(define-process SYSTEM
  (hpar (list rd wr
              slot.wait slot.signal
              msg.wait msg.signal
              putp.rd putp.wr
              getp.rd getp.wr
              putp.lock putp.unlock
              getp.lock getp.unlock)
    (par '()
      (BUFFER m0)
      (rename `((,wait   . ,slot.wait)
                (,signal . ,slot.signal))
        (SEM N))
      (rename `((,wait   . ,msg.wait)
                (,signal . ,msg.signal))
        (SEM 0))
      (rename `((,p.rd . ,putp.rd)
                (,p.wr . ,putp.wr))
        (PTR 0))
      (rename `((,p.rd . ,getp.rd)
                (,p.wr . ,getp.wr))
        (PTR 0))
      (rename `((,lock . ,putp.lock)
                (,unlock . ,putp.unlock))
        MUTEX)
      (rename `((,lock . ,getp.lock)
                (,unlock . ,getp.unlock))
        MUTEX))
    (par '()
      (xpar id (interval 0 NR) '() (R id))
      (xpar id (interval 0 NS) '() (S id)))))

仕様

さて、今回も仕様を後回しにしました。 生産者・消費者問題の仕様はどのようなものでしょうか。 大まかにはもちろん、入力されたものが出てくるのですが、その振る舞いはやや複雑です。

まず、すべてのチャネルからいつでも入力できるわけではありません。 バッファがいっぱいのときは当然ですが、スロットが空くのを待っている RECEIVER は入力を受け付けないからです。 そこで、ここでは仕様をかなりゆるいものにして、バッファがいっぱいでなければ、どれかのチャネルから入力できるとしておきます。 これは非決定的選択 xndc で書くことができます。 より厳しい仕様については考えてみてください(なげやりですみません)。

もう1つややこしいのは、出力です。 これはチャネルを使っているせいなので、あまり本質的なことではないのですが、出力が行われる順序は、データを格納した順序とは異なり非決定的になります。 データの引き取り順序は、ミューテックスで排他制御していますから必ず格納順なのですが、引き取ったあとのプロセスが任意の順序で実行されるので非決定的になってしまうのです。 そこで出力に関しても xndc を使ってゆるい仕様にしてあります。 (どの SENDER が引き取るかについては当然非決定的です。) 生産者・消費者問題の仕様としては、データの引き取りイベント rd?(k x) に着目して記述・検査をした方がよいでしょう。 これもぜひやってみてください(ほんと、ごめんなさい。今日は熱があるのでそこまでやる元気がでません)。

(define-process SPEC
  (MQUE N '()))

(define-process (MQUE n q)
  (alt  
    (if (= n (length q))
        STOP
        (xndc id (interval 0 NR)
          (? (in id) (x) (MQUE n (cons x q)))))
    (if (null? q)
        STOP
        (xndc x q
          (xndc id (interval 0 NS)
            (! (out id) (x) (MQUE n (remq1 x q))))))))

仕様に関して1点補足します。 非決定的に選択したキューの要素を削除するために、関数 remq1 というものを用意しておきます。 Scheme の標準関数 remq は指定した値に一致する要素をすべて削除してしまうので、1個だけ削除するものが必要なのです。

(define (remq1 x xs)
  (cond ((null? xs) '())
        ((eq? x (car xs)) (cdr xs))
        (else
         (cons (car xs) (remq1 x (cdr xs))))))

簡単にするために、基本の再帰で書きました。 はじめて LISP を勉強したころを思い出します。 その頃は全部大文字で書いていましたが。

正当性検証

今回はすっきり OK をいただきました。 仕様がゆるいからですけどね。 (^^;

ロック外でバッファを更新した場合

こんなゆるい仕様でも、ちょっと変えるととたんに違反してしまうのです。 例えば RECEIVER は PUTP のロック中にバッファへの書き込みを行っていますが、これは必要でしょうか? (まあ、聞くからには必要なんですけどね。でもすぐに何がいけないのかわかるでしょうか。)

wr をロックの外に出してみます。RECEIVER は次のようになります。

(ちなみに、元のプロセスで wr が put.wr の前にあるのは、状態数を減らすためです。 先に書き込みをしてしまえば、書き込む値 x を捨てられるからです。 もっとも多少の最適化はしているので影響ないかもしれません。 (「最適化」という用語はなんとかならないのでしょうかね。 不適切だし、使っていてもちょっとはずかしいです。 最適じゃないし。せめて効率化とか・・・。))

正当性検査をするとトレース違反になります。

仕様の計算木は次のとおりです。 ここでは N = 2 なので、B を2回入力してバッファはいっぱいです。 あとは B を出力するしかありません。

ところがシステムの遷移リストを見ると A の出力があります!

そんなはずはないとバッファの中を見てみると、A があります!!

システムプロセスの隠蔽を剥いでトレースを求めると次のようになります。 わかりますでしょうか、どうでしょうか?

2つの RECEIVER が並行して受信と書き込みを始めます。

  1. ポインタを先に読んだのは RECEIVER.1 で、ポインタ値は 0 です(line 4)。 インターリーブなので、一般にはイベントからクライアントはわかりません。 でもここでは前に in.1.B (line 2)があるので、RECEIVER.1 だとわかります。
  2. 次に RECEIVER.0 がポインタ値 1 を取りました(line 9)。
  3. そのまま RECEIVER.0 はインデックス 1 に B を書き込み、msg.signal を発行します(line 12, 13)。
  4. これを受けて SENDER (のどちらか)がインデックス 0 (!)の値を読み出しました(line 18)。

つまり A というのは配列の初期値なんです。 wr をロックの外に出したせいで、書き込みの順序が入れ替わってしまったのでした。 (この問題はデータの種類が2種類ないと検出できません。)

まったくマルチスレッドのプログラミングは難しいものです。

2013/09/01
© 2013,2014,2015 PRINCIPIA Limited