Think Stitch
PRINCIPIA  最近の更新


イベントオブジェクトによる一括引取り可能なリングバッファ

2つのイベントオブジェクトを使ってローカル手続き呼び出しが実現できるのですから、リングバッファを作ることもできるでしょう。 ただ残念ながらセマフォによる解と違って、イベントオブジェクトの他にミューテックスが必要になります(生産者1、消費者1でもという意味)。 でもその代わりに、セマフォ解ではできないことができます。 消費者側がリングバッファに溜まっているデータを一括で引き取ることです。 (セマフォによってはできる場合もあります。) このモデルを作ってみることにします。

プロセス構成

プロセスの構成は以下のとおりです。2つのイベントオブジェクトとミューテックスを使います。

今回はバッファ部分の抽象度を少し上げることにします。モデルを簡単にするためです。 実際にリングは作らず、溜まっているデータの個数だけをカウントすることにします。

プロセス定義

生産者プロセス P

生産者プロセス P は下図のとおりです。

  1. まずロックをかけてバッファの空きを確認します。
  2. 空きがなければアンロックし、引き取り通知 ack.wait を待ちます(上側のパス)。
  3. 空きがある場合にはデータを1個追加します。このモデルでは変数の値をインクリメントすることになります。ミューテックスをアンロックしたあと、データがあることを msg.signal で通知します(下側のパス)。

消費者プロセス Q

消費者プロセス Q は下図のとおりです。

  1. まずロックをかけてデータがあるかどうかを確認します。
  2. データがない場合は、ミューテックをアンロックしたあと、データ追加の通知 msg.wait を待ちます。(上側のパス)。
  3. データがある場合にはすべて一括で引き取ります。このモデルでは変数に 0 を書き込むことになります。ミューテックスがあるおかげでこのようなことができます。データを引き取ったらミューテックスをアンロックして、データを引き取ったことを ack.signal で通知します(下側のパス)。

後で仕様と比較できるように、今回もプロセス Q 用の読み書きチャネルを P と分けておきます。

MUTEX

再掲

(define-process (MUTEX m ms)
  (alt
    (? lock (r) (not (memq r ms))
       (if (eq? r m)
           STOP
           (if m
               (MUTEX m (append ms (list r)))
               (! r (MUTEX r ms)))))
    (? unlock (r) (not (memq r ms))
       (if (and m (eqv? r m))
           (! r
              (if (null? ms)
                  (MUTEX #f '())
                  (! (car ms)
                     (MUTEX (car ms) (cdr ms)))))
           STOP))
    (! terminate SKIP)))

共有メモリ上のバッファ MEM

再掲

(define-process (MEM m)
  (alt
    (! rd (m) (MEM m))
    (! q.rd (m) (MEM m))
    (? wr (x) (MEM x))
    (? q.wr (x) (MEM x))
    (! terminate SKIP)))

自動リセットイベントオブジェクト EVENTOBJ

再掲

(define-process (EVENTOBJ s ps)
  (alt
    (? wait (r) (not (memq r ps))
       (if s
           (! r (EVENTOBJ #f '()))
           (EVENTOBJ #f (cons r ps))))
    (? signal (r) (not (memq r ps))
       (! r
          (if (null? ps)
              (EVENTOBJ #t '())
              (! (car ps)
                 (EVENTOBJ #f (cdr ps))))))
    (! terminate SKIP)))

システムプロセス SYS

プロセス構成図のとおりにプロセスを並行合成します。

(define-process SYS
  (par (list lock unlock
             msg.wait msg.signal
             ack.wait ack.signal
             rd wr q.rd q.wr
             p q
             terminate)
    (par '() P Q)
    (par '()
      (MEM 0)
      (MUTEX #f '())
      (rename `((,wait . ,msg.wait)
                (,signal . ,msg.signal))
        (EVENTOBJ #f '()))
      (rename `((,wait . ,ack.wait)
                (,signal . ,ack.signal))
        (EVENTOBJ #f '())))))

検査

仕様

正しいメモリの読み書きシーケンスを表す仕様を作りましょう。 まずプロセス P の読み出し rd はすべて無視します。 wr だけを見れば動きがわかるからです。 加えてプロセス Q の読み出しの中で、値 0 を読んだ場合、つまりイベント q.rd.0 を無視します。 この場合は処理が前に進まないからです。

一般に、バッファの中に k 個のデータが置かれている場合を考えます。 k > 0 であれば、プロセス Q は読み出しを行うことができます。その結果、q.rd.k, q.wr.0 が順に発生します。 これはロックの中で排他的に行われるので、間に別のイベントが割り込むことはないはずです。

また k < M (M はバッファの大きさ)であれば、プロセス P はデータを置くことができます。 したがって wr.(k+1) というイベントが発生します。

0 < k < M の場合は q.rd.k と wr.(k+1) が非決定的に発生するので、非決定的選択 ndc で結合します。 仕様を表すプロセス SPEC は次のようになります。 alt と異なり、STOP と if を使ったガード表現は使えないことに注意してください(実は1度まちがえた)。

(define-process SPEC
  (SPEC-LOOP 0))

(define-process (SPEC-LOOP k)
  (if (= k 0)
      (! wr ((+ k 1))
         (SPEC-LOOP (+ k 1)))
      (if (= k M)
          (! q.rd (k)
             (! q.wr(0)
                (SPEC-LOOP 0)))
          (ndc
            (! q.rd (k)
               (! q.wr(0)
                  (SPEC-LOOP 0)))
            (! wr ((+ k 1))
               (SPEC-LOOP (+ k 1)))))))

仕様 SPEC の計算木は次のようになります。(クリックで拡大、別ウィンドウ)

抽象化したシステムプロセス HSYS

仕様と比較するために、読み書き以外のイベントを隠蔽したプロセス HSYS を定義しておきます。 先に説明したとおり、プロセス P の読み出し rd と、プロセス Q の読み出し q.rd.0 も隠蔽します。

(define-process HSYS
  (hide (list lock unlock
              msg.wait msg.signal
              ack.wait ack.signal
              rd ($ q.rd 0)
              p q
              terminate)
    SYS))

検査

検査を実行すると、問題ないことがわかります。

効率化 1

このモデルにはまだ改良の余地があります。2つだけやることにします。

生産者はデータを追加するたびに msg.signal を発行していますけど、これは最初の1個目だけでいいんじゃないかと思います。 ほんとですか? 確信、持てます? 答えを予想しておいてください :P 。

改良した生産者プロセス Pb

生産者プロセス P を以下のように改造しました。 見てのとおり、msg.signal を発行するのはデータがなかった場合だけにしました。 検査は後にします :P 。

効率化 2

このモデルを見た人があるときいいました。

「生産者も消費者もロックの中でデータのあるなしを確認しているのだから、イベントオブジェクトは1つでいいんじゃない?。起こされて空振りするかもしれないから、ちょっと効率は落ちるかもしれないけど、イベントオブジェクトが節約できるよ。」

さてどうでしょうか。これも答えを予想しておいてください。

変更点

プロセス P と Q をそれぞれ以下のように修正します。

システムプロセスを以下のように修正します。 イベントオブジェクトは1つだけにして、改名は取り除きます。

(define-process SYS2
  (par (list lock unlock
             wait signal
             rd wr q.rd q.wr
             p q
             terminate)
    (par '() Px Qx)
    (par '()
      (MEM 0)
      (MUTEX #f '())
      (EVENTOBJ #f '()))))

検査 2

検査結果は次のようになりました。

改良 1 は問題ないようですね。しかし改良 2 にはデッドロックがあります。 こうやって検査ですぐに確認できるのはとても便利です。 3ヶ月プログラムを書いてからデッドロックが発見されたら悲しいですから。 悲しいじゃ済まないか。

分析

デッドロックに至るパスは次のとおりです。

実はこれ、デッドロックに至る最短のパスなんです。 その意味はお分かりでしょう。 実際のコードだったら数倍に膨れ上がるでしょうから、考えて見つけるのは大変です。 これから詳細を見ますけど、実システムだったらそれほど頻繁には発生しないでしょうから、再現させるのもたいへんです。 (ちなみにモデルと同じようにエッセンスだけのコードを書くと容易に再現します。) このモデルの全状態数は 415 状態なので、多い方ではありませんが、それはツールを使っているからわかることであって、実システムで全状態を調べることは不可能でしょう。 ツールの威力がよくわかる例だと思います。

では詳細を見ていきます。 解説をパスの横に書けるといいんですけど、私の力量ではそんな難しいレイアウトはできないので、すみませんがウィンドウを2枚開くとかして見てください。

レアなケースであることは確かです。 実システムではオペレーティングシステムのスケジューリングが公平であればあるほど起こりにくいと思います。 しかしだからといって、無視してもよいということにはならないでしょう。

実に奥の深い、パスの長い問題でした。

2013/09/08
© 2013,2014,2015 PRINCIPIA Limited