Think Stitch
PRINCIPIA  最近の更新


リングバッファ

2つのプロセスがリングバッファを介して通信するモデルを考えます。 ここでの問題設定は、オペレーティングシステムが提供する共有メモリとセマフォを使ってリングバッファを実装するというものです。

プロセス構成

オペレーティングシステムやマルチスレッドプログラミングなどの教科書でよく見かけるリングバッファの典型的な実装方法の1つは、2つのセマフォを使うものです。 1つは、リングバッファとして使用する共有メモリ上の配列の中に、どれだけのデータ(メッセージ)が格納されているかを表します。 もう1つは逆に、配列にどれだけの空きスロットがあるかを表します。 したがってセマフォのカウンタの合計値はいつでも配列の大きさになります。 前者を MESSAGE SEMAPHORE、後者を SLOT SEMAPHORE と呼ぶことにします。

ここでは、後での正当性検査の都合上、データを生産してリングバッファに格納するプロセスと、リングバッファからデータを取り出して消費するプロセスを、それぞれ RECEIVER, SENDER とし、データの生成および消費は、外部との通信チャネル in, out を通じて行うことにします。 実際にリングバッファを使用する場面では、RECEIVER がデータの生産者自身、SENDER が消費者自身と考えることになります。 (そもそもチャネルが実装の構成要素として使えるのであれば、リングバッファなど作らずに済むわけですから。)

BUFFER は共有メモリ上に置かれた配列を表すプロセスです。 純粋な CSP ではプロセス間でメモリを共有しませんから、共有メモリもプロセスとしてモデル化します。 配列は RECEIVER, SENDER 双方から読み書きできます。 配列の読み書きもチャネル通信を使ってモデル化します。

このモデルではインターリーブによるサービスの共有を使います。 バッファおよび2つのセマフォは、インターリーブによって RECEIVER と SENDER から共有されます。

イベント・チャネル定義

共有メモリ上に置く配列の大きさを N とします。

(define N 5)

やり取りされるデータを集めたリストを DD とします。 ここでは2つのデータ A, B だけがあるとします。

(define DD '(A B))  ; data domain

モデル検査では、できるだけ小さなモデルからはじめることがお勧めです。 ただし、ユーザガイドでも書きましたが、あまり小さくしすぎると消えてしまう性質というのもあります。 例えばデータの種類を A 1つだけにしてしまうと、データが書き込まれたかどうか判別できなくなってしまいます。 どの程度の大きさが適切であるかは、モデルから一般論的に考えるか、実際にやってみるしかありません。 前者は恐ろしく難しくなることがあるので、ヒントをつかむためにも、まずモデル検査をしてみるとよいのです。

入出力チャネル

RECEIVER, SENDER がそれぞれ使用する入出力チャネル in, out を定義します。

(define DDD (map list DD))    ;  = ((A) (B))

(define-channel in (x) DDD)
(define-channel out (x) DDD)

セマフォのインターフェイス用イベント

セマフォのインターリーブによるモデルはインターリーブによるサービスの共有で紹介しました。 ここではセマフォが2つ必要なので、MESSAGE 用と SLOT 用それぞれにもイベントを用意して、改名を使って再利用することにします。

(define-event wait)
(define-event signal)

(define-event slot.wait)
(define-event slot.signal)

(define-event msg.wait)
(define-event msg.signal)

バッファのインターフェイス用チャネル

バッファの読み書きを行うインターフェイス用のチャネル rd, wr を定義します。 どちらも2つの引数を取ります。 1つは読み書きするスロットのインデックス、もう1つはデータです。 バッファの大きさが N ですから、インデックス k の範囲は 0 <= k < N となります。 データの範囲は DD で定義しました。 したがってチャネル rd, wr の定義域は、これらのすべての組み合わせになります。

BUFFER-DOMAIN = ((0 A) (0 B) (1 A) (1 B) (2 A) (2 B)
                 (3 A) (3 B) (4 A) (4 B))

この組み合わせは関数 combinations で求めることができます。 関数 combinations はリストのリストを引数にとり、各リストから1つずつ選んだ要素からなるリストすべてを求めます。 ここでは (combinations '((0 1 2 3 4) (A B))) を計算していることになります。 (こういう関数の定義を考えるのはとても楽しいです。)

(define DI (interval 0 N))
(define BUFFER-DOMAIN (combinations (list DI DD)))

(define-channel rd (k x) BUFFER-DOMAIN)
(define-channel wr (k x) BUFFER-DOMAIN)

プロセス定義

つづいて各プロセスを定義しましょう。 先頭のプロセス構成図を見ながら読んでいただくといいと思います。

プロセス RECEIVER

RECEIVER はまず入力チャネル in からデータを受信します。 (応用ではここでデータが生産されたと考えます。) 次に SLOT SEMAPHORE に slot.wait を出して空きスロットを確保します。 もしなければ空きができるまでブロックします。 スロットが確保できたらデータをバッファに書き込みます。 書き込むスロットのインデックスは RECEIVER 自身がローカルに状態変数 k として持ちます。 インデックスは書き込んだ後にインクリメントして、配列の末尾に達したら先頭に戻します(リングバッファ)。 そして MESSAGE SEMAPHORE に msg.signal を送り、データがあることを通知します。

プロセス SENDER

SENDER は MESSAGE SEMAPHORE に msg.wait を出して、データを待ちます。 もしなければデータが置かれるまでブロックします。 msg.wait が発生したら、バッファからデータを読み取ります。 読み取ったデータは出力チャネル out を通じて送信します。 (応用ではここでデータを消費すると考えます。) RECEIVER 同様、SENDER もバッファのインデックスをローカルに状態変数 k として持ちます。 出力が済んだらインデックスを更新します(この前後は問題ではありません)。 最後に SLOT SEMAPHORE に slot.signal を送り、データを引き取ったのでスロットが空いたことを通知します。

1点、バッファからのデータ読み取り rd の遷移にガード [j=k] がついています。 これについてはバッファプロセスのところで説明します。

セマフォ SEM

セマフォのモデルはインターリーブによるサービスの共有で紹介したものと同じです。 今回はプロセス式で書いてみました。

(define-process (SEM c)
  (alt
    (if (= c 0)
        STOP
        (! wait (SEM (- c 1))))
    (if (= c N)
        STOP
        (! signal (SEM (+ c 1))))))

バッファ BUFFER

バッファは単なる配列なので、共有メモリ上に置く(スレッドやメモリ保護のないリアルタイム OS ならそのまま)という以外に特別な点はありません。 これに対して、モデルではちょっと工夫が必要です。 モデルでは副作用によるデータの改変ができないので、配列としてベクタを使うことはできないからです(つまり vector-set! は使えない)。 そこで配列を長さ N のリスト m で表し、要素の書き換えでは新しいリストを作ることにします。

インデックス k の要素は (list-ref m k) で得ることができます。 インデックス k の要素を x に置き換える、より正確には置き換えたあとの配列を表すリストを新たに作る関数 list-update を次のように定義します。

(define (list-update xs k x)
  (append (list-head xs k)
          (cons x (list-tail xs (+ k 1)))))

ここで (list-head xs k) はリスト xs の先頭から k 個の要素をとってできるリストを返す関数です。 逆に (list-tail xs k) はリスト xs の先頭から k 個の要素を取り除いた残りのリストを返す関数です。 k 番目の要素を x に変えるには、元のリストの先頭 k 個と、x と、元のリストの先頭から k + 1 個の要素を取り除いた残りの、3つを連結すればいいわけです。

(list-head   '(0 1 2 3 4) 2)  =>  (0 1)
(list-tail   '(0 1 2 3 4) 3)  =>  (3 4)
(list-update '(0 1 2 3 4) 2 'X)  =>  (0 1 X 3 4)

これを使うと、プロセス BUFFER を次のように定義できます。

(define-process (BUFFER m)
  (alt
    (? rd (k x) (eq? x (list-ref m k))
       (BUFFER m))
    (? wr (k x)
       (BUFFER (list-update m k x)))))

書き込みは問題ないでしょう。 利用者から見て書き込みは送信ですから、バッファから見れば受信になります。 関数 list-update を使って、指定されたインデックス k の要素を指定されたデータ x に置き換えます。

これに対して読み出しは妙です。 読み出しが受信である上に、ガードが付いています。 これらについて説明します。 読み出しでは、インデックスは利用者からバッファへの指定で、逆にデータはバッファから利用者への引渡しです。 ふつうの相互作用であれば2回に分けて行われる(あるいは関数の呼び出しと戻り)作業です。 CSP の同期型相互作用では、これを1回の同期で実現することができます。 (ユーザガイドのリーダライタ問題で出てきたカウンタプロセスと同じです。)

初期状態での遷移遷移リストは次のようになっています。

wr については定義域のすべての要素に対応する遷移が出ています。 受信ですから当然こうなります。 これに対して rd は一部しか出ていません。 ガードがあるからです。 ガードは、各インデックスに対して、実際に配列に格納されている値についてだけ遷移が出るように制限しています。 このことは次の状態を見るとよりはっきりします。

インデックス 1 のスロットに B を書き込んだ後の遷移では、確かに rd.1.B が出ています。 利用者は、読み込みを行いたいインデックスに制限した受信を行えば、対応するデータを得ることができるわけです。 ちょっと凝ったことをしましたけれども、もちろんアドレス指定とデータ引き取りを2回に分けてもかまいません。

配列を表すリスト m の初期値は、長さが N で要素が DD から取られたものであればなんでもよいでしょう。 N や DD が変更されても初期値の定義を代えずに済むようにするには、例えば次のようにします(DI はバッファのところで定義しました、(interval 0 N) です)。 長さが N で すべての要素が DD の先頭の要素であるようなリストを作っているわけです。

(define m0 (map (lambda (k) (car DD)) DI))

システムプロセス

以上のプロセスを合成して、システムを表すプロセス SYSTEM を作ります。 インターリーブによる並行合成の処方に従い、まずクライアントプロセス RECEIVER と SENDER を並行合成します。 サービス群の間には直接の相互作用はないので、これらはまとめて並行合成できます。 最後にこの2つ群を同期リストを指定して並行合成します。同期リストはそのまま隠蔽リストになるので hpar を使うことができます。

(define-process SYSTEM
  (hpar (list rd wr
              slot.wait slot.signal
              msg.wait msg.signal)
    (par '() (R 0) (S 0))
    (par '()
      (BUFFER (map (lambda (k) (car DD)) DI))
      (rename `((,wait   . ,slot.wait)
                (,signal . ,slot.signal))
        (SEM N))
      (rename `((,wait   . ,msg.wait)
                (,signal . ,msg.signal))
        (SEM 0)))))

仕様

順序が逆になってしまいましたが(わざとですけど)リングバッファの仕様を考えます。 リングバッファは長さが有限のキューになるはずです。 そこで仕様プロセスを次のように定義します。

(define-process SPEC
  (QUE N '()))

(define-process (QUE n q)
  (alt
    (if (= n (length q))
        STOP
        (? in (x) (QUE n (append q (list x)))))
    (if (null? q)
        STOP
        (! out ((car q)) (QUE n (cdr q))))))

状態遷移図によるキューの定義をプロセス式にしたものです。 汎用化のためのパラメータは落としましたが、代わりに長さ指定のパラメータを加えてあります。 ちなみに、実装で CSP が使えるのであれば、わざわざリングバッファなど作らずとも、これをそのまま使えばよいわけです(身もふたもない話です。キューだから身はあるか)。

正当性検査

では検査をします。 デッドロックと発散はありません。 しかし正当性検査で引っかかってしまいました。 (作っているときに、ほんとに引っかかったので、ちょっと焦りました。)

トレース違反で、イベントは in.B です。

仕様の計算木は次のとおりです。 B を5回入力してますからバッファはいっぱいで、次は out.B の遷移だけがあります。 これは当然の要求でしょう。

一方システムの遷移リストを見ると入力 in の遷移があります。 (out.B は tau 遷移の後にあります。)

プロセス木を見ると、MESSAGE SEMAPHORE のカウントは 0、SLOT SEMAPHORE のカウントは 5 ですから、確かにバッファはいっぱいです。 RECEIVER, SENDER はそれぞれ初期状態 (R 0), (S 0) にいます。 原因がお分かりでしょうか?

答えは、RECEIVER プロセスが入力を保持できるので、バッファ効果があるからです。 したがって、外からみると大きさ N + 1 のリングバッファに見えることになります。 実際、仕様を次のように書き換えると、検査をパスします。

(define-process SPEC
  (QUE (+ N 1) '()))

あるいは、スロットを確保してから入力を行うようにすると、バッファ効果を消すことができます。 これであれば元の仕様を満たします。

こうしてリングバッファのモデルがキューとしての仕様を満たしていることが確認できました。

2013/08/29
© 2013,2014,2015 PRINCIPIA Limited