Think Stitch
PRINCIPIA  最近の更新


条件変数のモデル
“pthread_cond_wait はなぜミューテックスを引数にとるのか?”

条件変数のモデルを作ります。 ミューテックスのときと同じようにシステムコールをシミュレートしてモデルを作ります。

こちらにある資料にも条件変数とミューテックスに関する詳しい説明とモデルがあります.

はじめて pthead を勉強したとき、条件変数を待つ関数 pthread_cond_wait がなぜ条件変数の他にミューテックスを引数にとるのか不思議でした。 pthead を知っている人であれば、理由を知っている人が多いでしょう。 でも、出会いの運よく、疑問に思っていたけど理由は知らないという人がいたら、ここに来たのはラッキーです。 その理由を「お見せ」しましょう。 「本にそう書いてあったから」とか、「自分で考えてみて、なんとなくはわかるけど・・・」という人にも楽しんでもらえると思います。 「そんなことはよくわかっている」というベテランの方には、あとでもう少し難易度の高い検査も用意してありますので、よかったお付き合いください。

条件変数モデルの検査を行う状況設定

説明の都合上、先に条件変数モデルの検査を行う状況を説明します。 図のように条件変数 CONDVAR と共有メモリ上の変数、それからプロセス P と、あとプロセス Q のインスタンスを複数用意します。

プロセス Q のインスタンスの個数を N とします。 プロセス P はロックをかけて共有変数の値をインクリメントし、それを条件変数を使って通知します。 共有変数の初期値は0です。 プロセス P はこれを N 回繰り返して終了します。

一方、プロセス Q はロックをかけて共有変数の値を読み出し、もし0でなかったらデクリメントしたあと終了します。 実際の応用では値を読み込んだ後で何か仕事をするでしょうけれども、ここでは簡単のために省略します。 もし共有変数の値が0だった場合は、条件変数を待ちます。 通知がなされたら、再度共有変数の値をチェックするところから繰り返します。

プロセス Q のインスタンスが N 個あって、プロセス P は N 回インクリメントしますから、Q の各インスタンスがすべてデクリメントを行った時点で P と Q の すべてのインスタンスが終了することになります。 そうしたら条件変数と共有変数のプロセスも終了させて、システム全体を正常終了させることにします。

条件変数のモデル

リターンイベント

システムコールのシミュレートを行うために、プロセス P とプロセス Q の各インスタンスのリターンイベントを定義します。

(define-event p)
(define-event-list qs N "q")

プロセス P のリターンイベントは p, プロセス Q のリターンイベントは q.0, q.1, ..., q.N-1 です。

システムコールの呼び出しチャネル

次に条件変数とミューテックスのインターフェイスとなるチャネルを定義します。 システムコールの呼び出しチャネルです。 条件変数のチャネルを wait, signal, ミューテックスのチャネルを lock, unlock とします。 今回、broadcast は使わないので省略します。

(define SD (map list (cons p qs)))

(define-channel lock (r) SD)
(define-channel unlock (r) SD)

(define-channel wait (r) SD)
(define-channel signal (r) SD)

ここでちょっとひと工夫、というか新しい小技を紹介します。 ミューテックスのときには、各プロセスに番号で ID を振って区別をしました。 ここでは各プロセスのリターンイベントそのものをプロセス ID として使うことにします。 こうするとモデルの記述がちょっときれいになります。list-ref を使わなくてよいからです。 欠点は、プロセスエクスプローラで見るときにイベントオブジェクトの表示文字列が見えてしまうので、ちょっと見にくくなることです。 たとえば #<event:q.1:8> という感じです。

念のため復習しておくと、チャネルの引数は必ずリストにしなければいけないので、いつものように map と list を使ってリストの中に入れます。 プロセス Q のインスタンス用リターンイベントはすでにリスト qs になっていますから、これにプロセス P のリターンイベント p を追加します。

条件変数とミューテックスのプロセス CM

条件変数とミューテックスを表すプロセス CM を次のように定義します。 これはオペレーティングシステムの一部だと考えることができます。

プロセスパラメータ m と ms はミューテックス用で、前のモデルと同じです。 m はミューテックスの所有者、ms はミューテックスの獲得待ち行列です。 前のモデルと違う点は、プロセス ID として数の代わりにリターンイベントが入るという点です。

プロセスパラメータ cs は条件変数を待っているプロセスの待ち行列です。 ms と同じように、リターンイベントを登録します。

(define-process (CM m ms cs)
  (alt
    (? lock (r) (g r ms cs)
       (if (eq? r m)
           STOP
           (if m
               (CM m (append ms (list r)) cs)
               (! r (CM r ms cs)))))
    (? unlock (r) (g r ms cs)
       (if (and m (eqv? r m))
           (! r
              (if (null? ms)
                  (CM #f '() cs)
                  (! (car ms)
                     (CM (car ms) (cdr ms) cs))))
           STOP))
    (? wait (r) (g r ms cs)
       (if (and m (eqv? r m))                  ; (1)
           (let ((cs-u (append cs (list r))))
             (if (null? ms)                    ; (2)
                 (CM #f '() cs-u)
                 (! (car ms)
                    (CM (car ms) (cdr ms) cs-u))))
           STOP))
    (? signal (r) (g r ms cs)
       (! r                                    ; (3)
          (if (null? cs)
              (CM m ms '())
              (! (car cs)                      ; (4)
                 (CM m ms (cdr cs))))))
    (! terminate SKIP)))                       ; (5)

ミューテックスの部分は前のモデル MUTEX5 とほぼ同じです。 待ちに入るときには ms の末尾に追加して、待ち行列(FIFO)とします。 MUTEX5 と異なり、リターンの順序は固定してあります。 システムコールの引数がリターンイベントなので、

(? lock (r) ... (! r ...))

と書けるところが便利です。

では条件変数のパートを見てみます。 まず wait を呼ぶことができるのはミューテックスの所有者だけです (1)。これをチェックしておきます。 次に、ミューテックスを待っているプロセスがいるかどうかチェックします (2)。 もしいない場合は単に待ちに入るだけです。 いる場合には、待ち行列先頭のプロセスにロックを渡し、リターンさせます。 いずれの場合も待ち行列 cs への登録は末尾に行います(FIFO)。 この部分は共通なので、cs-u としてまとめてあります。

プロセス CM が wait の処理を行っている間、他のシステムコールは実行されませんから、呼び出しプロセスがミューテックスをアンロックする動作と待ちに入る動作は不可分(アトミック)に行われることになります。

次は signal です。 条件変数を待っているプロセスがいるかいないかにかかわらず呼び出しプロセスは返すので、先にリターンさせています (3)。 もし待っているプロセスがない場合は何もしません。 待っているプロセスがいる場合には、先頭のプロセスを解放します (4)。 signal なので解放するのは1個だけです。

pthread_cond_wait の場合、待ちから解放されたらそのままミューテックスの再獲得に入るのですが、ここでは省略してあります。 リターンしたあとで lock を呼び出すのと同じだからです。 (実際の応用でも再獲得がじゃまな場合があります。システムコールのコストを考えると、両方あればいいんと思うんですけど。)

最後の節 (5) はプロセスを終了させるためのしかけです。 イベント terminate を受け取ったら終了することにします。 前回のミューテックスでは簡単にするために省略しましたが、今回はきっちりやることにします。

尚、各受信についているのは、前のミューテックスモデルと同じくモデルを有限にするためのガードです。 関数 g は次のように定義しておきます。

(define (g r ms cs)
  (and (not (memq r ms)) (not (memq r cs))))

システムの設計

では条件変数を使ったシステムを作りましょう。 プロセスの構成は次のようにします。

プロセス P

プロセス P は下図のとおりです。

プロセス P は処理を N 回繰り返すので、カウンタとして状態変数 n を用意します。 状態変数は抽象状態 PA で定義して継承します。

ミューテックスをロックして変数をインクリメントし、アンロックして条件変数にシグナルを発行します。 これを N 回繰り返します。

最後が終了なので、ループからの脱出は処理の後に置きました。 このため、状態変数 n の初期値は N - 1 となります。

プロセス Q

プロセス Q は下図のとおりです。

プロセス ID としてリターンイベントを使うので、抽象状態 QA で状態変数 q を定義し、各状態で継承します。

まずミューテックスをロックして、共有変数の値を読みます。 もし値が0でなければ、デクリメントしてアンロックし、プロセスを終了します。 値が0であった場合は条件変数の待ちに入ります。 シグナルを受けて返ってきたら、ロックから再試行します。

繰り返しますが、pthread_cond_wait と異なり、wait から返ってきた時点ではミューテックスのロックを持っていないので、ロックの獲得から再開しなければなりません。 もし pthread_cond_wait と同じモデルを作った場合には、状態 Q2 に戻ればいいことになります。

共有メモリ上の変数プロセス MEM

共有メモリ上の変数を表すプロセス MEM を次のように定義します。

(define-process (MEM m)
  (alt
    (! rd (m) (MEM m))
    (? wr (x) (MEM x))
    (! terminate SKIP)))

プロセス MEM にも終了のしかけを入れておきます。

システムプロセス SYS

最後にプロセスを並行合成して、システムのプロセスを作ります。

(define-process SYS
  (par (cons* lock unlock
              wait signal
              rd wr
              terminate p qs)
    (seq
      (par '()
        (P (- N 1))
        (if (null? (cdr qs))
            (Q (car qs))
            (xpar q qs '() (Q q))))
      (! terminate SKIP))
    (par (list terminate)
      (MEM 0)
      (CM #f '() '()))))

いつもと異なる点が1つあります。クライアントプロセス P と Q のすべてのインスタンスが終了すると、インターリーブの par が終了します。 その後で、terminate イベントを発生するように逐次合成 seq を入れてあります。 イベント terminate は同期リストに入っているので、プロセス MEM, CM とも同期することになります。 これは初めて出てきたパターンです。 3つのプロセスが同期しているからです。 CSP の同期型相互作用は、2つだけでなく3つ以上のプロセスの間で同期することができます。 これをマルチ同期と呼ぶことにします。 これのおかげで、1個の終了要求イベントだけで、すべてのサービスプロセスを終了させることができます。

プロセス Q のインスタンスを生成する部分の場合わけは N = 1 の場合に対処するためです。 xpar は最低2つのプロセスを必要とするからです。 N = 1 の場合は qs が要素を1つしか持たないので、(cdr qs) は空リスト () になります。 これを見ているわけです。 もちろん (= N 1) でもかまいません(なぜそうしなかった???)。

検査

もし何か不具合があればシステムは正常終了できないはずです。 つまりデッドロックします。 そこでデッドロック検査をしてみます。

結果は次のとおり、問題ないことがわかりました。

システムプロセスのシミュレーション

これだけではちゃんと動いているのかどうかわからないでしょうから、プロセスエクスプローラで探査してみましょう。N = 2 とします。

まず初期状態では各クライアントプロセスがロックシステムコールを呼び出そうとします。

プロセス Q0 が呼び出した場合を見てみます。 ロック獲得後、共有変数を読むと値が0なので、条件変数の待ちに入ります。 cs にリターンイベントが登録されたことがわかります。

次にプロセス Q1 がシステムコールを呼び出したとします。 同じく変数の値が0なので待ちに入ります。 cs は (q.0 q.1) の順序になります。

いよいよ(?)プロセス P が入ってきました。 変数をインクリメントし、シグナルを発行します。 すると待ち行列の先頭にいたプロセス Q0 が解放されました。 cs は q.1 だけになります。

もう1度プロセス P が走って、今度はプロセス Q1 が解放されました。 cs は空になりました。 プロセス P はこれで2回回ったので、終了します。

プロセス Q0 がロックして変数を読みます。 値が2なので1に更新し、アンロックして終了します。

同じくプロセス Q1 がロックしてデクリメントし、アンロックして終了します。

これでクライアントプロセス P, Q はすべて OMEGA になりました。

プロセス木が順に畳まれて・・・

逐次合成 seq が移行し、終了要求イベント terminate が発行されようとしています。

終了要求イベント terminate が発生すると、3プロセスが同時に SKIP へ遷移します。マルチ同期の瞬間です。

最後にシステム全体が正常に終了しました。

一例に過ぎませんが、条件変数が機能している様子がわかります。 モデルを作っていろいろ歩き回ると、よりよくわかると思います。

MUTEX と 条件変数の分離

それでは、もし wait システムコールでミューテックスの解放と待ち状態への遷移が不可分ではなかったらどうなるのかを見ていきましょう。 上のモデルでは条件変数とミューテックスを1つのプロセス CM で実現していました。 このままの形で wait システムコールを修正してもいいんですけど、せっかくなのではっきりわかるように、プロセスを分離することにします。 構成を以下のように変えます。

ミューテックス MUTEX

MUTEX は CM からそのまま切り出しただけです。 ただし、プロセスパラメータ cs がなくなったので、ガードは変えてあります。

(define-process (MUTEX m ms)
  (alt
    (? lock (r) (not (memq r ms))
       (if (eq? r m)
           STOP
           (if m
               (MUTEX m (append ms (list r)))
               (! r (MUTEX r ms)))))
    (? unlock (r) (not (memq r ms))
       (if (and m (eqv? r m))
           (! r
              (if (null? ms)
                  (MUTEX #f '())
                  (! (car ms)
                     (MUTEX (car ms) (cdr ms)))))
           STOP))
    (! terminate SKIP)))

孤独な条件変数 CV

条件変数はだいぶシンプルになります。 wait システムコールでは、単に待ち状態になるだけです。

(define-process (CV cs)
  (alt
    (? wait (r) (not (memq r cs))
       (CV (append cs (list r))))
    (? signal (r) (not (memq r cs))
       (! r
          (if (null? cs)
              (CV '())
              (! (car cs) (CV (cdr cs))))))
    (! terminate SKIP)))

プロセス Qx

wait の処理を2つに分離したので、プロセス Q は合わせて変更する必要があります。 これを Qx とします。 プロセス P に影響はありません。

以前の wait 呼び出し部分を unlock と 新 wait 呼び出しの連続に代えます。

システムプロセス SYS2

各プロセスを並行合成して、システムプロセス SYS2 を作ります。

(define-process SYS2
  (par (cons* lock unlock
              wait signal
              rd wr
              terminate p qs)
    (seq
      (par '()
        (P (- N 1))
        (if (null? (cdr qs))
            (Qx (car qs))
            (xpar q qs '() (Qx q))))
      (! terminate SKIP))
    (par (list terminate)
      (MEM 0)
      (MUTEX #f '())
      (CV '()))))

terminate イベントは4者で同期することになります (^^)。

検査 2

検査をすると、デッドロックが発見されました。

N = 1 としてもデッドロックします。そこでこの状況を見てみます。

  1. まず Q0 が動きます。変数の値0を読んで、待ちに入ろうとするのですが、アンロックしたところでプロセス P が動き始めます(行 3〜5)。
  2. プロセス P は変数をインクリメントしてシグナルを発行します(行 11)。プロセス Q0 はまだ待ち状態ではありません。
  3. プロセス P が signal からリターンした直後に Q0 が待ちに入ります(行 13)。

この後プロセス P は終了してしまうので、当然のことながらプロセス Q0 を起こしてくれる人はいません。

見てのとおり、問題はプロセス Q0 の unlock と wait の間にプロセス P が割り込んだことです。 そのせいでプロセス Q0 が待ちに入る前に signal が発行されて、空振りしてしまうのです。

これを避けるには unlock と wait を不可分にする必要があります。 そのためには、wait システムコールがミューテックスにアクセスできなくてはなりません。 だから pthread_cond_wait はミューテックスを引数に取るというわけです。

はじめてこの現象を理解した人と条件変数を設計した人はすばらしいですねえ。

より精密な検査

さて、初心者の人には楽しんでもらえたかもしませんが、条件変数をよく知っている人には常識的な話で、やや冗長&退屈だったかもしれません。 そこでもっと精密な検査をしてみることにします。

リングバッファや生産者・消費者問題では、(システムの振る舞いに対して)比較的ゆるい仕様を使っていたので、気に入らない人がいたかもしれません。 検査のために、いろいろしかけを入れたり、モデルを修正したりする点が気に入らない人もいたかもしれません。 (最後までやってみせないところが気に入らないという人もいたかも。) そこで今回はもっと詰めた仕様を作ることにします。 さらに新しい技も使っちゃいます。 ちょっとたいへんですけど覚悟してください。 いや、その、いつも簡単過ぎて退屈なのかなと思って。

仕様: 何を検査する?

検査するポイントはいろいろありますけど、ここでは、生産者・消費者問題のときにちょっといった、共有変数への書き込み wr を追ってみることにします。 前と同じように、プロセス Q が読んだ値を観測用のチャネルを用意して出力させることもできますけど、モデルを修正することになるし、出力自体のインターリーブで順序関係が失われてしまうこともあったので、今回はモデルの修正なしで、直接 wr を観測します。

ところが1つ問題があります。共通変数への書き込み wr はプロセス P と Q の両方が使うので、イベントだけからはどちらが使ったのか判別できません。 共有変数のプロセス MEM を修正して、それぞれのプロセス用に p.wr と q.wr を用意すれば区別ができますけど、モデルを修正しないという約束に反します。 そこでちょっと高級な(?)テクニックを使うことにします。

イベントを外部からの観測で区別する

まず目標をはっきりさせましょう。 ここでの目標は、システムに属するプロセス、すなわち CM, MEM, P, Q を修正せずに P が発行した wr と Q が発行した wr を区別することです。 より具体的には、前者を p.wr へ、後者を q.wr へ改名します。

2つを区別するためにやってよいことは、プロセスを外から観測することだけです。 観測するだけで区別できるでしょうか。 できます。 どうするかというと、wr の前に発生した rd のうち、もっとも近いものを見るのです。 もしそれがプロセス P によるものならば、rd の引数は wr よりも 1 少ないはずです。 逆に 1 大きければプロセス Q によるものだとわかります。 そこで rd と wr を監視するプロセスを作って、どちらのプロセスが発生させたものであるかを区別します。

区別できた後はどうするかというと、p.wr と q.wr に改名します。 これは次のようにして実現できます。 まず wr を p.wr と q.wr の両方に改名します。 1つのチャネルを2つのチャネルに改名するのです。 つまり1対多の改名です。 こうすると計算木上で wr の付いているすべての遷移が、p.wr と q.wr の2本になります。 遷移先は同じです。

ここで監視プロセスにもうひと働きしてもらいます。 監視プロセスは各 wr がプロセス P, Q どちらによるものなのかを識別しているので、プロセス P の場合は p.wr だけ、Q の場合は q.wr だけを発生するように制限をかけるのです。 これは制約プロセスで説明したように、並行合成を使って実現することができます。

結果としてできるプロセスは、元のシステムプロセスと同じ動作をするけれども、wr だけが p.wr と q.wr に区別されたものになります。 こうすれば、p.wr, q.wr に対して記述した仕様と比較することができます。 逆にいえば、P と Q の振る舞いを識別する詳細な仕様を作ることができるということです。

プロセス REGULATOR

システムプロセスを外から監視し、かつ制限を加えるプロセス REGULATOR を定義します。 プロセス REGULATOR は2つの補助プロセスを使って定義します。

まずすべてのイベント Events から terminate, rd, wr, p.wr, q.wr を取り除いたイベント A を作っておきます。 これは rd と p.wr, q.wr 以外を無視するためです。 wr も使わないので除いておきます(これは必須ではありませんが)。 最初に計算しておくのは、何度も計算するのを避けるためです。 変数 Events と関数 channel-events はプロセスの中でしか使うことができないので、このようにしています。 関数 lset-difference は集合の差を求める関数です。

(define-process REGULATOR
  (let ((A (lset-difference
            eq?
            Events
            (list terminate)
            (channel-events rd)
            (channel-events wr)
            (channel-events p.wr)
            (channel-events q.wr))))
    (REGULATOR-HUNT A)))

プロセス REGULATOR は、まず rd を監視します。 もし rd を見つけたら、その引数 x を添えてプロセス REGULATOR-RESTRICT へ移行します。 rd 以外のイベントは何が発生しても無視します。 ただし terminate が発生した場合は終了します。 これがないと、制約をかけたあとのシステムが終了できなくなるからです。

(define-process (REGULATOR-HUNT A)
  (alt
    (! terminate SKIP)
    (xalt e A (! e (REGULATOR-HUNT A)))
    (? rd (x)
       (REGULATOR-RESTRICT x A))))

つづいてプロセス REGULATOR-RESTRICT は p.wr と q.wr を監視しつつ制約をかけます。 rd の引数を c として、p.wr(c+1) と q.wr(c-1) だけを許可し、あとの p.wr, q.wr は禁止します。 もし再び rd が発生した場合には、監視する引数を更新します。 連続する rd は、プロセス Q が 0 を読んで待ちに入るときに発生することがあります。 それ以外のイベントは無視します。 terminate が発生した場合はデッドロックすることになります。 最後に wr がないのは正常な動作ではないからです。

(define-process (REGULATOR-RESTRICT c A)
  (alt
    (xalt e A
      (! e (REGULATOR-RESTRICT c A)))
    (? rd (x)
       (REGULATOR-RESTRICT x A))
    (? p.wr (x) (= x (+ c 1))
       (REGULATOR-HUNT A))
    (? q.wr (x) (= x (- c 1))
       (REGULATOR-HUNT A))))

プロセス REGULATOR の計算木

N = 2 の場合の、プロセス REGULATOR の計算木は以下のようになります。 rd 後の wr の引数の値に応じて、p.wr または q.wr が出ていることがわかります。 引数が 0 の場合と N の場合は、それぞれ q.wr, p.wr が出ません。 0 のときに q.wr がないのは当然ですね。 N のときに p.wr がないのは、プロセス P が N 回しかインクリメントしないからです。

ラベルをつけた各状態での遷移リストは以下のとおりです。 初期状態で terminate を受けると終了します。 また REGULATOR-RESTRICT で rd を受けると、受信値が更新されることがわかります。

変換したシステムプロセス RR-SYS

プロセス REGULATOR を使ってシステムプロセスを変換します。 まずチャネル wr を p.wr と q.wr に改名します。 次に REGULATOR と並行合成します。 同期リストにはすべてのイベント Events を指定します。 これによって2つのプロセスはすべてのイベントで同期しなければならないので、REGULATOR が発生させるイベントだけが発生することになります。

(define-process RR-SYS
  (par Events
    (rename `((,wr . ,p.wr) (,wr . ,q.wr))
      SYS)
    REGULATOR))

変換したシステムプロセス RR-SYS のシミュレーション

変換したシステムプロセス RR-SYS を探査してみます。N=2 とします。

最初にプロセス P が動く場合を見ていくと、wr が p.wr になっていることが確認できます。

その後、再び P が動く場合と Q のどちらかが動く場合の分岐があります。

先に上の分岐、P を追うとつぎのようになります。 p.wr.2 が発生しています。 プロセス P はこれで2回目の実行なので、終了します。

次に Q0 が動くしたの分岐を見ると、確かに q.wr.0 になっています。 Q0 はこれで終了します。

プロセス P, Q すべてが終了すると、terminate の発生に来ます。

ここで REGULATOR を含む4者が同期し、システムは終了します。

共有変数への書き込みの仕様

これで p.wr, q.wr を使って書かれた仕様と比較する準備はできました。 では p.wr, q.wr を使って仕様を書きましょう。

どのようなシーケンスがありうるかすぐにわかるでしょうか。 私はだいぶ考え込んでしまいました。 P, Q それぞれが N 個あるとき、この 2N 個を並べてできるすべてのリストを考えます。 P に +1, Q に -1 を割り当てたとき、先頭から途中までの合計が1度も負にならない場合だけを選別します。 これを先頭から各位置までの合計に変換したものが、p.wr, q.wr の引数になるでしょう?

初期値0をいけにえにして、部分合計を求めながらやれば一気に求めることができます。 リストの長さについて再帰的に求めることにします。 ss はいままでに求まっているありうるリストのリスト、u, d はそれぞれ残りの P, Q の数です。

(define (f u d ss)
  (if (and (= u 0) (= d 0))
      ss
      (append
       (if (> d 0)
           (f u (- d 1)
              (map (lambda (s)
                     (cons (- (car s) 1) s))
                   (filter (lambda (s)
                             (> (car s) 0))
                           ss)))
           '())
       (if (> u 0)
           (f (- u 1) d
              (map (lambda (s)
                     (cons (+ (car s) 1) s))
                   ss))
           '()))))

ss の各要素について、もし先頭の値、つまり現在の値が正で、Q が残っているならば1つ減らした値を追加します。 一方、P が残ってる場合は、無条件に +1 した要素を追加できます。 この2つをあわせたものが、1つ長いすべてのリストのリストになります。

例えば N = 3 の場合だと次のようになります。末尾の 0 はいけにえの残りです。

(f 3 3 '((0)))
((0 1 0 1 0 1 0)
 (0 1 2 1 0 1 0)
 (0 1 0 1 2 1 0)
 (0 1 2 1 2 1 0)
 (0 1 2 3 2 1 0))

合計を先に求めてしまったので、P, Q の区別がなくなっちゃいました。 同時に埋め込むこともできますが、汚くなるので、REGULATOR と同じく増減で区別することにします。

(define ES
  (map (lambda (s)
         (cdr
          (reverse
           (maplist (lambda (s)
                      (if (null? (cdr s))
                          'dummy
                          (if (< (car s) (cadr s))
                              (list q.wr (car s))
                              (list p.wr (car s)))))
                    s))))
       (f N N '((0)))))

maplist というのは、map と異なりリストの要素ではなく、まず全体、次にその cdr、さらに cddr と渡していくものです。 定義を見た方が早いでしょう。

(define (maplist fn xs)
  (if (null? xs)
      '()
      (cons (fn xs)
            (maplist fn (cdr xs)))))

ES は次のようになります。

(((p.wr 1) (q.wr 0) (p.wr 1) (q.wr 0) (p.wr 1) (q.wr 0))
 ((p.wr 1) (q.wr 0) (p.wr 1) (p.wr 2) (q.wr 1) (q.wr 0))
 ((p.wr 1) (p.wr 2) (q.wr 1) (q.wr 0) (p.wr 1) (q.wr 0))
 ((p.wr 1) (p.wr 2) (q.wr 1) (p.wr 2) (q.wr 1) (q.wr 0))
 ((p.wr 1) (p.wr 2) (p.wr 3) (q.wr 2) (q.wr 1) (q.wr 0)))

この各要素を $ に渡せば、チャネルイベントを作ることができます。

仕様プロセス SPEC

仕様プロセスは次のように定義できます。 まず REGULATOR のときと同じように、コストの高い値を事前に作っておきます。 1つは p.wr, q.wr 以外のすべてのイベントのリスト A、もう1つは ES から作ったチャネルイベントのリストのリスト ess です。

(define-process SPEC
  (let ((A (lset-difference
            eq?
            Events
            (channel-events p.wr)
            (channel-events q.wr)))
        (ess (map (lambda (s)
                    (map (lambda (args)
                           (apply $ args))
                         s))
                  ES)))
    (SPEC-HUNT A (+ N N) ess)))

仕様の本体は次のようになります。 イベントは p.wr, q.wr それぞれ N 回ずつ、合計 2N 回発生するので、これをプロセスパラメータ n でカウントします。 n が 0 になったら終了しますが、tick が発生するまでの間に p.wr, q.wr 以外のイベントが発生する可能性があるので、これに対処します。 n > 0 の場合は発生しうるイベントのリスト ess のうち、先頭のイベントとして許可されているものだけが発生できます。 これは (map car ess) と表すことができます。 このうち発生したものを e とすると、ess の中で先頭が e と異なるものは除外します。 イベントが発生するたびに、残りの可能性を絞り込んでいくわけです。 これ以外に、A に含まれるイベントは任意に発生できます。

(define-process (SPEC-HUNT A n ess)
  (if (= n 0)
      (ndc
        SKIP
        (xndc e A (! e (SPEC-HUNT A n '()))))
      (ndc
        (xndc e (map car ess)
          (! e
             (SPEC-HUNT A (- n 1)
                        (map cdr
                             (filter (lambda (es)
                                       (eq? e (car es)))
                                     ess)))))
        (xndc e A
          (! e (SPEC-HUNT A n ess))))))

これで仕様は完成です。

仕様プロセス SPEC のシミュレーション

N = 3 の場合、プロセス SPEC の計算木は次のようになります。

(クリックで拡大、別ウィンドウ)

5通りの可能性が見て取れます。

検査 3

検査をすると、仕様どおりとなりました。

あとは、わざとあちこち壊して試してみると面白いと思います。

補足

検査って結局何?

この検査を見ていると、結局何を検査したことになっているのかわからないと思われた人がいるかもしれません。 仕様や検査のためのしかけが、対象であるシステムに期待される動作に依存しているので、検査になっていないのではないかと考える人もいるかもしれません。 でも、そもそもプログラムが期待したものかどうかを検査するってどういうことですか?

さかのぼって考えてみればわかると思うのですが、プログラムに対する要求は人間の頭の中から出てくるものです。 最初は雲のようにふわふわしていて、数学やプログラムのように厳密なものじゃありません。 でも、それをいろいろな方法で表現しているうちに、詳細が明確になっていくわけです。

システマティックにテストをしているところならば、テストケースは仕様から作るはずです。 つまり、同じものから出たものを違うルートを通して違う形式に変換して、比較しているわけです。 意地悪くいえば同じものを同じかどうか比較しているわけですよね。

それが無内容にならないのは、途中の変換で発生する誤りが検出できるからです。 違うルートで作ったもののうち、一方に間違いがあれば比較で発見できます。 テストってそういうことでしょう? 逆に両方のルートで偶然同じ間違いが発生して、比較がOKになる可能性だってあるわけです。

検査したい対象が複雑になり、厳密な1つの仕様として表現できなくなることはあります。 それでも、様々な角度から、様々な方法で欲しいものを表現してみて比較して見れば精度を上げることができるわけです。

対象の性質に依存した検査をしたって、仮に仕様の方が間違っていたとしたって、何か情報は得られるわけです。 正当性検査は相対比較なので、仕様が悪ければ検査をパスしてしまう可能性はいつでもありますから、多様な検査をした方がいいわけです。

ここでやった検査が目的のはっきりしないものであることは確かで、それはあまりいいことではありませんが、もしシステムが正しく動いているとすればこうなっているはずだという意味では検査になっているわけです。できれば、目的をはっきりいえるような検査の、排他的でできるだけ小さいセットが作れればいいのですけどね。今回の目的は、検査の目的をはっきりさせることよりも、そういういろいろな検査で使えるテクニックを紹介することが目的だったわけです :P 。

SPEC の改良

rd の後、いつまで待っても wr が発生しない場合や、最後の wr の後終了しない場合を検出するためにカウンタを入れるとよいかもしれません。

仕様の厳しさ

詰めた仕様を作るといった割には、これでも結構穴があります。 例えば、N = 3 のとき5通りの書き込みパターンがありますけど、これらは非決定的選択で指定されているので1つでもあれば OK になってしまいます。 かならずすべてのパターンが現れるという検査をするにはどうしたらいいのでしょう?

改名の分類

改名 rename は1対1、多対1、1対多などの区別ができます。 1対1は再利用で使いました。 多対1は抽象化でした。 1対多は使い道がなさそうでしたけど、こんな使い道があるんです。 多対多は・・・どうなんでしょ?

2013/09/07
© 2013,2014,2015 PRINCIPIA Limited