Think Stitch
  Updates


Another Neat Thing to Implement a Concurrent Programming Language in Scheme

This article will explain a way to implement a concurrent programming language based on CSP in Scheme.

ここまでのおはなし

過去に興味がない前向きな方は次の節に進んでください.経緯を知らなくても本文は読めます.

2013年の Lisp Advent Calendar で「マクロとクロージャで作る並行プログラミング言語」という文書を書きました.並行プログラミング言語を定義して,マクロを使って Scheme に変換し実行できるようにしました.実行のために小さなオペレーティングシステムに相当するカーネルも作りました.並行プログラミング言語が簡単にできたのは,マクロのおかげに加えてクロージャと継続渡しスタイル (Continuation-Passing-Style, CPS) を使ったからです.カーネルのシステムコールは継続としてクロージャを渡すようになっていました.

これらはとても便利である一方,少し課題もあります:

前者については継続に相当する手続きをグローバルに定義しておいて,毎回それを渡すようにすれば解決できます.環境はあらわに渡せばよいでしょう.いわゆる lambda-lifting と同じことになります.

後者についても単純な解決方法があります.継続を渡す代わりに返すというものです.システムコールのエントリポイントを呼び出すのではなく,戻り値でシステムコールを指定します.プロセスに実行を割り当てたカーネルがその戻り値を見てシステムコールへ振り分けるようにします.

以上の2つの案を使って並行プログラミング言語を再実装するというのがこの文書の趣旨です.加えてもう1つ,前作ではチャネルの送受信を理論通りイベントに展開して実装しました.そのためおもちゃの域を出ることができず,実用的なプログラムを書くことはできませんでした.今回はチャネルの送受信で任意の値を渡せるという実用的なレベルで実装したいと思います.

並行プログラミング言語の構文

並行システムの理論 CSP (Communicating Sequential Processes) に基づいて,並行プログラミング言語の構文を次のように決めます:

\[\begin{split}P ::= & \quad & \mathtt{SKIP} \\| & & \mathtt{(! }\ \ e\ \ P\mathtt{)} \\| & & \mathtt{(? }\ \ ch\ \ \mathtt{(}x \ldots\mathtt{)}\ \ [guard]\ \ P\mathtt{)} \\| & & \mathtt{(alt }\ \ Q\ \ldots\mathtt{)} \\| & & \mathtt{(par }\ \ X\ \ P\ \ P\ \ P\ \ldots\mathtt{)} \\| & & \mathtt{(if }\ \ b\ \ P\ \ P\mathtt{)} \\| & & \mathtt{(let\ \ ((}x\ \ a\mathtt{)}\ \ldots\mathtt{)}\ \ P\mathtt{)} \\| & & N \\| & & \mathtt{(}N\ \ a\ \ldots\mathtt{)} \\& & \\Q ::= & & \mathtt{STOP} \\| & & \mathtt{(! }\ \ e\ \ P\mathtt{)} \\| & & \mathtt{(? }\ \ ch\ \ \mathtt{(}x\ \ldots\mathtt{)}\ \ [guard]\ \ P\mathtt{)} \\| & & \mathtt{(if }\ \ b\ \ Q\ \ Q\mathtt{)}\end{split}\]

チャネル送信はイベント同期の構文を使います.チャネルは引数を与えると対応するイベントを返す関数として実装することにします.したがって,チャネル \(ch\) を通じて \(a, b\) という2つの値を送信したい場合は \(\mathtt{(!\ \ (}ch\ \ a\ \ b\mathtt{)}\ \ P\mathtt{)}\) と書きます.

選択の中に書けるプロセスは \(\mathtt{STOP}\), イベント同期,チャネル受信と \(\mathtt{if}\) に限定されています.選択を行うためには提示されるイベントを知る必要があるためです.

イベント定義

イベントの定義は define-event で行います.イベントの名前を指定します.

(define-event  name)

チャネル定義

チャネルの定義は define-channel で行います.チャネルの名前と仮パラメータリストを指定します.

(define-channel  name  (x  ...))

プロセス定義

プロセスの定義は define-process で行います.

(define-process (name  x  ...)  P)

パラメータを持たないプロセスの場合は,次の省略形が使えます.

(define-process  name  P)

記述例

キューの記述例です.

(define-channel in (x))
(define-channel out (x))

(define-process (QUEUE s)
  (alt
    (? in (x) (QUEUE (append s (list x))))
    (if (null? s)
        STOP
        (! (out (car s)) (QUEUE (cdr s))))))

Scheme への変換

プロセスを実行可能にするために,Scheme の手続きに変換します.まず例を使って変換の方法を説明してから,実際の変換コードを見ていくことにします.

コマンドベクタ cmd

内部的な計算や判断を除くと,プロセスができることは次の3つです:

そこでプロセスをこれらの情報を返す Scheme の手続きに変換します.返す値は上記の情報を含むベクタとします.これを cmd と呼ぶことにします.

cmd の0番目のスロットには,上記3つのいずれであるかを示すシンボル SKIP, SYNC, PAR を入れることにします.1番目以降のスロットの値は0番目の値によります.

例えば次のように選択を行うプロセスを考えます:

(define-process P
  (alt
    (! e P)
    (? ch (x) Q)))

これを次のように変換します:

(define (P cmd ...)
  (vector-set! cmd 0 'SYNC)
  (vector-set! cmd 1 2)      ; num of branches
  ;; (! e P)
  (vector-set! cmd 2 e)
  (vector-set! cmd 3 #f)     ; omit
  (vector-set! cmd 4 P)
  ;; (? ch (x) Q)
  (vector-set! cmd 5 ch)
  (vector-set! cmd 6 guard-true)
  (vector-set! cmd 7 Q)
  ;; return cmd
  cmd)

ベクタ cmd はカーネルからもらえるものとしました.受信ではガードが省略されているので,いつでも #t を返す手続き guard-true を用意しておいて使います.

尚,イベント同期の2番目の値 #f は使いません.単に3個にそろえて同期判定で楽をしたかっただけです.ですので実際には vector-set! を省略することにします.

継続手続き

通信をしたあとのプロセスはいつでも名前のついたプロセスとは限りません.たとえば次の例を考えます:

(define-process P
  (! a (! b Q)))

この場合,イベント a で同期した後に移行するプロセスは (! b Q) です.したがってこれも Scheme の手続きに変換する必要があります.そこで一意な名前を gensym で作って,手続きの定義を生成します.

(define (#:Gnnn cmd ...)
  (vector-set! cmd 0 'SYNC)
  (vector-set! cmd 1 1)
  (vector-set! cmd 2 b)
  (vector-set! cmd 4 Q)
  cmd)

プロセス P の変換では継続としてこの手続きを指定します:

(define (P cmd ...)
  (vector-set! cmd 0 'SYNC)
  (vector-set! cmd 1 1)
  (vector-set! cmd 2 a)
  (vector-set! cmd 4 #:Gnnn)
  cmd)

変数の扱いと環境ベクタ env

次に考えなければならないのは変数の扱いです.たとえば次のようなプロセスを考えます:

(define-process (P x)
  (! (out x) Q))

これを変換した Scheme の手続きでもパラメータ x を参照できる必要があります.変換した手続きにパラメータを持たせればいいのですが,パラメータの数はプロセスごとに異なるので,変換した手続きもそれぞれ異なる数のパラメータを持つことになります.Scheme では問題になりませんが,他の言語で実装する場合のことを考えて,より低レベルな表現に変換することにします.

どうするかというと,パラメータはすべて1つのベクタに入れて渡すことにします.このベクタを env と呼ぶことにします.env のスロット0には変数の数が入っています.スロット1以降に変数の値が入っています.変数の参照は対応するインデックスでのベクタ参照 (vector-ref env k) に置き換えます.

上の例を変換すると次のようになります:

(define (P env cmd)
  (vector-set! cmd 0 'SYNC)
  (vector-set! cmd 1 1)
  (vector-set! cmd 2 (out (vector-ref env 1)))
  (vector-set! cmd 4 Q)
  cmd)

このようにプロセスに対応するすべての Scheme 手続きは2つのパラメータ env と cmd を持つ形に統一できます.

チャネル受信のパラメータ

変数にはプロセスパラメータの他にあと2つ,チャネル受信のパラメータと let で導入される局所変数があります.まず受信パラメータについてです.次のプロセスを考えます:

(define-process (P x)
  (? in (y) (! (out (+ x y)) Q)))

in から受信したあとのプロセス (! (out (+ x y)) Q) では,プロセスパラメータ x に加えて受信パラメータ y を参照できます.このプロセスに対応する Scheme 手続きでは env が受信した y の値で拡大されているものとします.カーネルが呼び出す前に拡大してくれるということです.

(define (#:Gnnn env cmd)
  (vector-set! cmd 0 'SYNC)
  (vector-set! cmd 1 1)
  (vector-set! cmd 2
    (out (+ (vector-ref env 1)        ; x
            (vector-ref env 2))))     ; y
  (vector-set! cmd 4 Q)
  cmd)

let 式

let 式による局所変数の導入はプロセスの内部で行われるので,カーネルは助けてくれません.そこで自分で env を拡大します.

(define-process (P x)
  (let ((y (+ x 1)))
    ...))

局所変数に束縛する値を計算して env に追加します.env に入っている値の数はスロット0にありますから,そのあとに追加します.話をかんたんにするために,env は十分な大きさを持っていると仮定します.スロット0の数も更新しておきます.

(define (P env cmd)
  (let ((tmp (+ (vector-ref env 1) 1)))   ; (+ x 1)
    (let ((n (vector-ref env 0)))         ; num of vars in env
      (vector-set! env 0 (+ n 1))         ; update num of vars
      (vector-set! env (+ n 1) tmp)       ; extend env
      ...)))

プロセス呼び出し

プロセス呼び出しをしている箇所でも env の変換が必要になります.たとえば次のプロセスを考えます:

(define-process (P x)
  (? ch (y) (Q (+ x y) (- x y))))

プロセス Q に対応する Scheme 手続きは2つの値が格納された env を期待しているので,それを作って渡す必要があります.これを行うのはチャネル ch からの受信の継続です.したがってプロセス呼び出しの場合には間接的な呼び出しが1つ入ることになります.

中間に入る継続手続きでは実引数の値を計算して一時変数においてから env を破壊的に更新し,プロセスに対応する手続きを呼び出します.

(define (#:Gnnn env cmd)
  (let ((tmp0 (+ (vector-ref env 1) (vector-ref env 2)))
        (tmp1 (- (vector-ref env 1) (vector-ref env 2))))
    (vector-set! env 0 2)
    (vector-set! env 1 tmp0)
    (vector-set! env 2 tmp1)
    (Q env cmd)))

変換手続き translate-process

以上で変換の考え方をだいたい説明したので,実際の変換コードを見ていきたいと思います.

変換手続きを translate-process とします.translate-process はパラメータを5つとります:

translate-process は累積した定義 acc と変換したコードの2つの値を返します.

まず構文にしたがって場合分けします:

(define (translate-process acc bs env cmd p)
  (match p
    ('SKIP             (translate-process-skip acc cmd))
    (('! e q)          (translate-process-sync acc bs env cmd (list p)))
    (('? ch ps z . r)  (translate-process-sync acc bs env cmd (list p)))
    (('alt . ps)       (translate-process-sync acc bs env cmd ps))
    (('par x . ps)     (translate-process-par acc bs env cmd x ps))
    (('if test p q)    (translate-process-if acc bs env cmd test p q))
    (('let bindings p) (translate-process-let acc bs env cmd bindings p))
    ((q . args)
     (if (symbol? q)
         (translate-process-call acc bs env cmd q args)
         (error "invalid process" p)))
    (q
     (if (symbol? q)
         (translate-process-call acc bs env cmd q '())
         (error "invalid process" p)))))

終了 SKIP

SKIP はコマンドベクタ cmd のスロット0にシンボル SKIP を入れるだけです.

(define (translate-process-skip acc cmd)
  (values acc
          `(begin
             (vector-set! ,cmd 0 'SKIP)
             ,cmd)))

選択と通信

選択 alt の場合は,選択の対象となるイベント同期やチャネル受信を集めてコマンドベクタを作ります.if がありうるので選択肢の数は可変です.すべて集めたら式全体の外枠を作ります:

(define (translate-process-sync acc bs env cmd ps)
  (let ((s (make-env-subst bs)))
    (with-gensyms (k)
      (let loop ((xs ps) (rs '()) (acc acc))
        (if (null? xs)
            (values acc
                    `(let ((,k 2))
                       ,@(reverse rs)
                       (vector-set! ,cmd 0 'SYNC)
                       (vector-set! ,cmd 1 (div (- ,k 2) 3))
                       ,cmd))
            (let-values
                (((acc expr)
                  (translate-process-sync-term acc bs env cmd s k (car xs))))
              (loop (cdr xs) (cons expr rs) acc)))))))

各選択肢で場合分けをします.オプションの受信ガードはここで処理してしまいます:

(define (translate-process-sync-term acc bs env cmd s k p)
  (match p
    ('STOP
     (values acc #f))
    (('! e q)
     (translate-process-sync-prefix acc bs env cmd s k e q))
    (('? ch ys z . r)                   ; (? ch ys [g] q), guard is option
     (let-values (((g q)
                   (if (null? r)
                       (values #t z)
                       (values z (car r)))))
       (translate-process-sync-receive acc bs env cmd s k ch ys g q)))
    (('if test p q)
     (translate-process-sync-if acc bs env cmd s k test p q))
    (_ (error "invalid form in alt" p))))

イベント同期の3つ組(2番目は省略)を作ります.イベント部分は変数を含む式ですから,変数参照をベクタ env への参照に置き換えます.

(define (translate-process-sync-prefix acc bs env cmd s k e p)
  (let-values (((acc cont) (generate-cont-def acc bs env cmd p)))
    (values acc
            `(begin
               (vector-set! ,cmd ,k ,(vsubst env s e))
               (vector-set! ,cmd (+ ,k 2) ,cont)
               (set! ,k (+ ,k 3))))))

この置き換えを行うのが手続き vsubst です.v は環境ベクタ env の名前,s は変数名とベクタインデックスの連想リストです.

式は一般にマクロを含んでいる可能性がありますから macroexpand で展開しています.展開結果は手続き呼び出しまたは Gauche の組み込み構文になります.束縛構文がある場合はスコープを切らなければいけないので対応が必要ですが,勇気を出して省略します.Hygienic マクロを使っている場合は identifier についても対応する必要があります.

(define (vsubst v s expr)
  (let ((e (macroexpand expr)))
    (match e
      (('quote x) e)
      (('set! var x)
       (let ((p (assq var s))
             (y (vsubst v s x)))
         (if p
             `(vector-set! ,v ,(cdr p) ,y)
             `(set! ,var ,y))))
      ((or ('if _ _)
           ('if _ _ _)
           ('begin _ . _))
       `(,(car e) ,@(map (lambda (x) (vsubst v s x)) (cdr e))))
      (('lambda ps x . xs)
       (let ((s-u (shadow-vars s ps)))
         `(lambda ,ps
            ,@(map (lambda (x) (vsubst v s-u x)) (cons x xs)))))

      ...

      ((f . args)
       (map (lambda (x) (vsubst v s x)) e))
      (_
       (if (symbol? e)
           (let ((p (assq e s)))
             (if p
                 `(vector-ref ,v ,(cdr p))
                 e))
           e)))))

継続プロセスに対応する手続きはグローバルに定義する必要があります.そこで再帰的に呼び出した translate-process の結果を define でくるんで acc に追加します.

(define (generate-cont-def acc bs env cmd p)
  (let-values
      (((acc expr)
        (translate-process acc bs env cmd p)))
    (with-gensyms (name)
      (values (cons `(define (,name ,env ,cmd) ,expr) acc)
              name))))

チャネル受信の場合は,ガードおよび継続プロセスで受信パラメータによる環境拡大が必要です.

(define (translate-process-sync-receive acc bs env cmd s k ch xs g p)
  (let ((bs-x (append bs xs)))
    (let ((s-x (make-env-subst bs-x)))
      (let-values (((acc gf) (make-guard-fun acc g env s-x)))
        (let-values (((acc cont) (generate-cont-def acc bs-x env cmd p)))
          (values acc
                  `(begin
                     (vector-set! ,cmd ,k ,(vsubst env s ch))
                     (vector-set! ,cmd (+ ,k 1) ,gf)
                     (vector-set! ,cmd (+ ,k 2) ,cont)
                     (set! ,k (+ ,k 3)))))))))

ガードは独立した手続きとして定義します.受信のたびにクロージャを生成することを避けるためです.

(define (make-guard-fun acc guard env s)
  (if (eq? guard #t)
      (values acc 'guard-true)
      (with-gensyms (g)
        (values (cons `(define (,g ,env) ,(vsubst env s guard))
                      acc)
                g))))

選択肢の中に if があった場合は,再帰的に式を構成して Scheme の if で選ぶようにします:

(define (translate-process-sync-if acc bs env cmd s k test p q)
  (let*-values
      (((acc p-expr) (translate-process-sync-term acc bs env cmd s k p))
       ((acc q-expr) (translate-process-sync-term acc bs env cmd s k q)))
    (values acc
            `(if ,(vsubst env s test)
                 ,p-expr
                 ,q-expr))))

プロセス生成 par

プロセス生成の展開例は示しませんでしたが,選択の場合と同様にできます.まず生成する各プロセスに対応する手続きの定義を作ります.それらをコマンドベクタに登録して返すコードを生成します:

(define (translate-process-par acc bs env cmd x ps)
  (let ((s (make-env-subst bs)))
    (let loop ((xs ps) (k 3)
               (rs `((vector-set! ,cmd 2 ,(length ps))
                     (vector-set! ,cmd 1 ,(vsubst env s x))
                     (vector-set! ,cmd 0 'PAR)))
               (acc acc))
      (if (null? xs)
          (values acc `(begin ,@(reverse rs) ,cmd))
          (let-values (((acc cont)
                        (generate-cont-def acc bs env cmd (car xs))))
            (loop (cdr xs) (+ k 1)
                  (cons* `(vector-set! ,cmd ,k ,cont)
                         rs)
                  acc))))))

分岐 if

alt の外に現れる if の扱いも中の場合と同様です:

(define (translate-process-if acc bs env cmd test p q)
  (let*-values
      (((acc p-expr) (translate-process acc bs env cmd p))
       ((acc q-expr) (translate-process acc bs env cmd q)))
    (let ((s (make-env-subst bs)))
      (values acc
              `(if ,(vsubst env s test)
                   ,p-expr
                   ,q-expr)))))

局所変数の導入 let

局所変数に束縛する値を Scheme の一時変数においてから環境ベクタ env を拡大します:

(define (translate-process-let acc bs env cmd bindings p)
  (let ((ts (map (lambda (b) (gensym)) bindings))
        (s (make-env-subst bs)))
    (let ((tmps (map (lambda (t b) (list t (vsubst env s (cadr b)))) ts bindings)))
      (let-values
          (((acc expr)
            (translate-process acc (append bs (map car bindings)) env cmd p)))
        (with-gensyms (num)
          (values acc
                  `(let ,tmps
                     (let ((,num (vector-ref ,env 0)))
                       (vector-set! ,env 0 (+ ,num ,(length bindings)))
                       ,@(map/index (lambda (k t) `(vector-set! ,env (+ ,num ,(+ k 1)) ,t)) ts)
                       ,expr))))))))

プロセス呼び出し

呼び出すプロセスの実引数を計算して Scheme の一時変数においてから,環境ベクタ env を更新します:

(define (translate-process-call acc bs env cmd q args)
  (let ((ts (map (lambda (a) (gensym)) args))
        (s (make-env-subst bs)))
    (let ((tmps (map (lambda (t a) (list t (vsubst env s a))) ts args)))
      (values acc
              `(let ,tmps
                 (vector-set! ,env 0 ,(length args))
                 ,@(map/index (lambda (k t) `(vector-set! ,env ,(+ k 1) ,t)) ts)
                 (,q ,env ,cmd))))))

プロセス変換マクロ

最後にプロセス定義全体を変換するマクロを用意します.プロセスを個別に変換するのではなく,一度に全体をまとめて変換するようにしました.

(define-macro (csp-compile . defs)
  (let loop ((xs defs) (rs '()))
    (if (null? xs)
        `(begin ,@(reverse rs))
        (match (car xs)
          (('define-event name)
           (loop (cdr xs)
                 (cons 
                  `(define ,name (make-event ',name))
                  rs)))
          (('define-channel name ps)
           (loop (cdr xs)
                 (cons 
                  `(define ,name (make-channel ',name ',ps))
                  rs)))
          (('define-process pdecl e . es)
           (let-values
               (((name ps)
                 (if (symbol? pdecl)
                     (values pdecl '())
                     (values (car pdecl) (cdr pdecl))))
                ((imps pexpr) (front-n-last (cons e es))))
             (with-gensyms (env cmd)
               (let-values
                   (((rs q) (translate-process rs ps env cmd pexpr)))
                 (let ((s (make-env-subst ps)))
                   (loop (cdr xs)
                         (cons
                          `(define (,name ,env ,cmd)
                             ,@(map (lambda (expr) (vsubst env s expr)) imps)
                             ,q)
                          rs)))))))
          (_ (error "invalid form" (car xs)))))))

イベントとチャネル

イベントとチャネルの実装にはレコードを使います:

(define-record-type event #t #t name)
(define-record-type channel %make-channel #t name ps)

チャネルを引数に適用した結果はチャネルイベントというイベントにします:

(define-record-type channel-event #t #t ch args)

すでに説明したとおり,ユーザから見たチャネルは,引数を受け取り対応するチャネルイベントを返す関数にします.チャネルを表す関数からチャネルの情報にアクセスするためにちょっとトリックを使います.関数に特別な値 channel-info-command を渡すと,レコードとしてのチャネルが返ってくるようにしておきます.表引きを使ってもいいのですが,ちょっとわけがあってこうしています,ごにょごにょ.

(define channel-info-command (cons #f #f))

(define (make-channel name ps)
  (let ((ch (%make-channel name ps)))
    (lambda x
      (if (and (pair? x)
               (eq? (car x) channel-info-command))
          ch
          (let ((n (length x)))
            (if (= n (length ps))
                (make-channel-event ch x)
                (error "wrong num of args" ch x)))))))

以上で変換はおわりです.

カーネル

つづいてカーネルの実装に移ります.

プロセスコントロールブロック PCB

プロセスの状態をレコードで表します.いわゆるプロセスコントロールブロック (PCB) です.

(define-record-type process #t #t
  (state)        ; running ready wait-for-sync wait-for-children omega
  parent
  (children)     ; leaf: #f, node: list of child processes
  (sync)
  (cont)
  (env)
  (cmd))

カーネルの状態変数

カーネルの状態は4つの変数で表します.

(define root-process #f)
(define current-process #f)
(define ready-queue '())
(define trace '())

ready-queue の操作

ready-queue からプロセスを取り出す手続きと格納する手続きを用意しておきます.これを変えるとスケジューリングポリシーを一部調整できます.

(define (ready-queue-get!)
  (pop! ready-queue))
(define (ready-queue-put! p)
  (push! ready-queue p))

スケジューラ

スケジューラは ready-queue から実行可能プロセスを1つ取り出して,その継続手続きを呼び出します.継続手続きの戻り値はコマンドベクタ cmd です.このスロット0を見て,対応するシステムコールに振り分けます.

実行可能なプロセスがなければデッドロックです.

(define (sched)
  (if (null? ready-queue)
      (error "deadlock")
      (let ((p (ready-queue-get!)))
        (process-state-set! p 'running)
        (set! current-process p)
        (let ((cmd ((process-cont p) (process-env p) (process-cmd p))))
          (case (vector-ref cmd 0)
            ((SKIP) (csp-skip p))
            ((PAR)  (csp-par p cmd))
            ((SYNC) (csp-sync p cmd))
            (else
             (error "invalid command" p cmd)))))))

カーネルの起動 start-process

初期プロセスの継続手続きを指定して,カーネルを起動します.

(define (start-process cont)
  (let ((env (make-vector vsize-env)))
    (vector-set! env 0 0)
    (set! root-process
          (make-process
           'ready                       ; state
           #f                           ; parent (root: #f)
           #f                           ; children (leaf: #f)
           (make-hash-table)            ; sync
           cont                         ; cont
           env                          ; env
           (make-vector vsize-cmd)))    ; cmd
    (set! ready-queue (list root-process))
    (set! trace '())
    (sched)))

プロセスの終了 csp-skip

システムコール csp-skip は呼び出したプロセスを終了させます.加えてもう少し仕事があります.終了するカレントプロセスが,親プロセスの子供たち,つまり兄弟姉妹の中で最後のプロセスだった場合は,親プロセスも終了することになるからです.これは必要なだけプロセスツリーの根に向かって伝播します.もし根まで達した場合は root-process が終了したということですから,カーネル自体を終了させます.

(define (csp-skip p)
  (let loop ((p p))
    (if p
        (let ((children (process-children p)))
          (if (or (not children) (every omega? children))
              (begin
                (process-state-set! p 'omega)
                (loop (process-parent p)))
              (sched)))
        'done)))

プロセスが終了しているかどうかは state が omega であるかどうかで判定します.

(define (omega? p)
  (eq? (process-state p) 'omega))

プロセスの生成 csp-par

指定された継続手続きをもとに,新しいプロセスの PCB を作って ready-queue に登録します.その後自分自身は待ち状態 wait-for-children になります.

#|
   cmd
    0   PAR
    1   sync-list
    2   num-processes
    3.. cont
|#
(define (csp-par p cmd)
  (let loop ((n (vector-ref cmd 2)) (k 3) (ps '()))
    (if (> n 0)
        (let ((q (make-process
                  'ready                               ; state
                  p                                    ; parent
                  #f                                   ; children (leaf: #f)
                  (make-hash-table)                    ; sync
                  (vector-ref cmd k)                   ; cont
                  (vector-copy (process-env p))        ; env
                  (make-vector (vector-length cmd))))) ; cmd
          (ready-queue-put! q)
          (loop (- n 1) (+ k 1) (cons q ps)))
        (begin
          (process-state-set! p 'wait-for-children)
          (process-children-set! p ps)
          (set-sync-list p (vector-ref cmd 1))
          (sched)))))

同期するイベント・チャネルのリストはハッシュテーブルに変換して sync に入れておきます.同期判定を高速に行うためです.

(define (set-sync-list p sync-list)
  (let ((syncht (process-sync p)))
    (for-each
      (lambda (so)
        (if (event? so)
            (hash-table-put! syncht so #t)
            (hash-table-put! syncht (so channel-info-command) #t)))
      sync-list)))

選択と通信 csp-sync

指定された3つ組を順に調べて,通信が成立するものがあれば相手を起こます.成立するものがなかった場合はカレントプロセスを待ち状態にします.いずれの場合も再スケジュールします.

#|
   cmd
    0        SYNC
    1        num-syncs
    2 + 3k   event #f    cont       event
             chev  #f    cont       send
             ch    guard cont       receive
|#
(define (csp-sync p cmd)
  (let loop ((n (vector-ref cmd 1)) (k 2))
    (if (> n 0)
        (let ((syncobj (vector-ref cmd k))
              (guard   (vector-ref cmd (+ k 1)))
              (cont    (vector-ref cmd (+ k 2))))
          (if (release-if-signal p syncobj guard cont)
              (sched)
              (loop (- n 1) (+ k 3))))
        (begin
          (register-sync p cmd)
          (process-state-set! p 'wait-for-sync)
          (sched)))))

指定された3つ組について,イベント同期,チャネル送信,チャネル受信で場合分けします.

(define (release-if-signal p syncobj guard cont)
  (cond ((event? syncobj)
         (release-if-signal-event p syncobj cont))
        ((procedure? syncobj)           ; channel
         (release-if-signal-receive p (syncobj channel-info-command) guard cont))
        ((channel-event? syncobj)
         (let ((ch (channel-event-ch syncobj))
               (args (channel-event-args syncobj)))
           (release-if-signal-send p ch args cont)))
        (else
         (error "invalid syncobj" syncobj))))

イベント同期の場合です.手続き collect-signal-processes-event によって同期するプロセスのリストを取得します.もし同期するプロセスがある場合はすべて ready 状態にします.

(define (release-if-signal-event p event cont)
  (let ((ps (collect-signal-processes-event p event)))
    (if ps
        (begin
          (push! trace (event-name event))
          (release-processes-event p event cont ps)
          #t)
        #f)))

チャネル送受信の場合も同じ構造ですが,同期するプロセスの情報が異なります.送信の場合は PCB と sync-term のペアのリストが返ってきます.sync-term とは (SEND  args . cont) または (RECEIVE  guard . cont) の形で,相手が送信か受信かわかるようになっています.

(define (release-if-signal-send p ch args cont)
  (let ((q (collect-signal-processes-send p ch args cont)))
    ;; q = ((process . sync-term) ...) or #f
    (if q
        (begin
          (push! trace (cons (channel-name ch) args))
          (release-processes-send p ch args q)
          #t)
        #f)))

受信の場合は同期が成立した引数リストと,それから送受信別に PCB と継続手続きのペアが返ってきます.

(define (release-if-signal-receive p ch guard cont)
  (let ((q (collect-signal-processes-receive p ch guard cont)))
    ;; q = (args (sender ...) . (receiver ...)) or #f
    ;; sender = receiver = (process . cont)
    (if q
        (begin
          (push! trace (cons (channel-name ch) (car q)))
          (release-processes-receive p (car q) (cadr q) (cddr q))
          #t)
        #f)))

同期するプロセスの集積: 概要

つづいて同期判定と同期に参加するプロセスの集積について説明します.

まずプロセス全体は木を構成することに注意します.初めに start-process で生成されたプロセスはプロセス木の根になります.下図の A です.これは変数 root-process に格納されています.

プロセスが par を実行すると,子プロセスが生成されて自分自身は休眠状態に入ります.これを繰り返すと木が形成されるわけです.したがって実行可能なプロセスはすべて葉の位置にあるプロセスです.節の位置にあるプロセスには同期するイベントおよびチャネルの集合が付随しています.これは PCB の sync フィールドにハッシュテーブルとして格納することはすでに述べました.

A B R P Q sync sync

実行のある断面において,カレントプロセスを除く葉のプロセスは実行可能 (ready) であるかあるいは同期待ち (wait-for-sync) をしています.同期待ちをしているプロセスがカレントプロセスの参加なしに同期することはありません.もしそうならば,同期可能な状態にした最後のプロセスが来た時に同期していたはずだからです.

このような状況でカレントプロセスが同期を求めてやってきます.同期判定をするにはプロセス木を探索する必要があります.例えば上の図において P がカレントプロセスであるとして,イベント e で同期するかどうかを考えます.もし B の sync に e が登録されている場合は Q も e を提示していなければ同期できません.つまり sync に登録されているイベント・チャネルについては AND 条件になります.登録されていない場合は OR 条件なので,Q の関与なしに同期できます.

この判定は根に向かって繰り返す必要があり,最終的に根プロセス A に達した時点で同期するプロセスの集合が得られます.

同期するプロセスの集積: イベント同期

イベント同期の場合に同期するプロセスリストを求めます.葉のプロセスにおける sync の値は,そのイベントで同期した場合に移行する継続手続きです.1つのイベントに対して複数の継続先が指定されていた場合は,登録のときに非決定的に1つ選んでしまうことにします.

同期するプロセスの集積は木を再帰的に探索することで行います.

最後のケースでも非決定的選択の一部を捨てていることになります.

(define (collect-signal-processes-event cur-process event)
  (define (collect p)
    (if (eq? p cur-process)
        (list p)
        (let ((ps (process-children p))
              (sync (process-sync p)))
          (let ((ent (hash-table-get sync event #f)))
            (if ps
                ;; node
                (if ent
                    ;; and
                    (let loop ((ps ps) (r '()))
                      (if (pair? ps)
                          (let ((q (collect (car ps))))
                            (if q
                                (loop (cdr ps) (append q r))
                                #f))
                          r))
                    ;; or
                    (let loop ((ps ps))
                      (if (pair? ps)
                          (let ((q (collect (car ps))))
                            (if q
                                q
                                (loop (cdr ps))))
                          #f)))
                ;; leaf
                (if (procedure? ent)
                    (list p)
                    #f))))))
  (collect root-process))

同期するプロセスの集積: チャネル送信

次はチャネル送信です.

まず確認として,チャネル通信の場合でも2個以上のプロセスが同期する可能性があります.いわゆるマルチ同期です.加えて,各プロセスが複数の送信値と継続,複数のガードと継続を指定する可能性があります.そこでチャネル通信の場合は sync の値を sync-term のリストにします.sync-term とは (SEND  args . cont) または (RECEIVE  guard . cont) のことでした.

同期するプロセスの集積はイベント同期と同様にプロセス木をたどって行いますが,結果の表現は PCB (process control block) と sync-term のペアのリストにします.複数ある sync-term のうち,どれで同期したのかわかるようにするためです.

葉のプロセスでは sync に登録されている sync-term を順に調べて初めに同期するものを選びます.ここでも非決定性を捨てています.

送信の場合は送信値が一致すれば同期します.送信値の比較には equal? を使いました.受信の場合はガードの判定が必要です.そこで引数の値で環境ベクタ env を一時的に拡大してガードに対応する手続きを呼び出します.

(define (collect-signal-processes-send cur-process ch args cont)
  (define (collect p)
    (if (eq? p cur-process)
        (list (cons* p 'SEND args cont))
        (let ((ps (process-children p))
              (sync (process-sync p)))
          (let ((ent (hash-table-get sync ch '())))
            (if ps
                ;; node
                (if (null? ent)
                    ;; or
                    (let loop ((ps ps))
                      (if (pair? ps)
                          (let ((q (collect (car ps))))
                            (if q
                                q
                                (loop (cdr ps))))
                          #f))
                    ;; and
                    (let loop ((ps ps) (r '()))
                      (if (pair? ps)
                          (let ((q (collect (car ps))))
                            (if q
                                (loop (cdr ps) (append q r))
                                #f))
                          r)))
                ;; leaf
                (let loop ((ss ent))
                  (if (null? ss)
                      #f
                      (let ((sync-term (car ss)))
                        (match sync-term
                          (('SEND args2 . cont2)
                           (if (equal? args args2) ; ### EQUIVALENCE
                               (list (cons p sync-term))
                               (loop (cdr ss))))
                          (('RECEIVE guard . cont2)
                           (let ((env (process-env p)))
                             (let ((n (vector-ref env 0))) ; save num vars
                               (extend-env p args) ; temporarily
                               (let ((b (guard env)))
                                 (vector-set! env 0 n) ; resume
                                 (if b
                                     (list (cons p sync-term))
                                     (loop (cdr ss))))))))))))))))
  ;; body
  (let ((q (collect root-process)))
    (if (null? q)
        #f
        q)))

同期するプロセスの集積: チャネル受信

最後はチャネル受信です.今回の実装ではチャネルは任意の値を渡しうるので,受信だけで同期するかどうかを判定することはできず,かならず送信が一人必要です.探索の途中で送信が見つかり,そこまでに見つかったすべての受信ガードを成立させたとしても,最終的に同期する保証はありません.そこで探索では可能性のあるすべての候補を計算することにします.

探索のある断面において,少なくとも送信が1つある場合と受信だけの場合で候補の表現を分けます:

葉が送信の場合はカレントプロセスの受信ガードを満たせば候補になります.受信の場合は無条件に候補にします.

節で同期しない場合は,子プロセスの候補をすべて計算して連結します.同期する場合は,各子プロセスの候補について総当たりで調べる必要があります.

(define (collect-signal-processes-receive cur-process ch guard cont)
  (define (collect p)
    (if (eq? p cur-process)
        (list (list #f (cons* p guard cont)))
        (let ((ps (process-children p))
              (sync (process-sync p)))
          (let ((ent (hash-table-get sync ch '())))
            (if ps
                ;; node
                (if (null? ent)
                    ;; or
                    (append-map collect ps)
                    ;; and
                    (let loop ((ps (cdr ps)) (r (collect (car ps))))
                      (if (or (null? ps) (null? r))
                          r
                          (loop (cdr ps)
                                (product-map-filter merge-sync-candidates
                                                    (collect (car ps)) r)))))
                ;; leaf
                (let loop ((ss ent) (rs '()))
                  (if (null? ss)
                      rs
                      (let ((sync-term (car ss)))
                        (match sync-term
                          (('SEND args . cont2)
                           (let ((env (process-env cur-process)))
                             (let ((n (vector-ref env 0)))
                               (extend-env cur-process args)
                               (let ((b (guard env)))
                                 (vector-set! env 0 n)
                                 (if b
                                     (loop (cdr ss)
                                           (cons (cons* args
                                                        (list (cons p cont2))
                                                        '())
                                                 rs))
                                     (loop (cdr ss) rs))))))
                          (('RECEIVE guard2 . cont2)
                           (loop (cdr ss)
                                 (cons (list #f (cons p (cdr sync-term)))
                                       rs)))))))))))) ; collect receives
  (let ((q (collect root-process)))
    (find car q))) ; at least one sender is needed

総当たりを調べるのに高階関数 product-map-filter を用意します.

(define (product-map-filter f xs ys)
  (let loop-x ((xs xs) (r '()))
    (if (null? xs)
        r
        (let loop-y ((ys ys) (r r))
          (if (null? ys)
              (loop-x (cdr xs) r)
              (let ((z (f (car xs) (car ys))))
                (if z
                    (loop-y (cdr ys) (cons z r))
                    (loop-y (cdr ys) r))))))))

候補の組み合わせは merge-sync-candidates で計算します.3つの可能性があります:

(define (merge-sync-candidates x y)
  (let ((ax (car x))    ; args or #f
        (ay (car y)))
    (if ax
        (if ay
            ;; send-send:
            ;; x = (args (sx ...) . (rx ...)), y = (args (sy ...) . (ry ...))
            ;; r = (args (sx ... sy ...) . (rx ... ry ...))
            (if (equal? ax ay)          ; ### EQUIVALENCE
                (cons* ax (append (cadr x) (cadr y)) (append (cddr x) (cddr y)))
                #f)
            ;; send-receive:
            ;; x = (args (sx ...) . (rx ...)), y = (#f ry ...)
            ;; r = (args (sx ...) . (ry ... rx ...))
            (if (every (lambda (r)      ; r = (process guard . cont)
                         (let ((p (car r))
                               (guard (cadr r)))
                           (let ((env (process-env p)))
                             (let ((n (vector-ref env 0)))
                               (extend-env p ax)
                               (let ((b (guard env)))
                                 (vector-set! env 0 n)
                                 b)))))
                       (cdr y))
                (cons* ax 
                       (cadr x)
                       (append (map (lambda (p) (cons (car p) (cddr p))) (cdr y))
                               (cddr x)))
                #f))
        (if ay
            (merge-sync-candidates y x)
            ;; receive-receive:
            ;; x = (#f rx ...), y = (#f ry ...)
            ;; r = (#f rx ... ry ...)
            (cons #f (append (cdr x) (cdr y)))))))

同期するプロセスの解放

同期するプロセスを解放する,すなわち ready 状態にして ready-queue に追加します.

まずイベント同期の場合です.この場合は PCB しか集めてこなかったので,改めて sync のハッシュテーブルを引いて継続手続きを求めます.

(define (release-processes-event cur-process event cont ps)
  (dolist (p ps)
    (if (eq? p cur-process)
        (process-cont-set! p cont)
        (let ((sync (process-sync p)))
          (process-cont-set! p (hash-table-get sync event))
          (hash-table-clear! sync)))
    (process-state-set! p 'ready)
    (ready-queue-put! p)))

チャネル送受信の場合は同期するプロセスを集積する際に継続手続きも求めてあるので,それを使えます.送信の場合と受信の場合で解放の処理が異なるので場合分けします.

;;; q = ((process . sync-term) ...)
(define (release-processes-send cur-process ch args q)
  (dolist (x q)
    (match x
      ((p 'SEND args . cont)
       (release-sender p cont (eq? p cur-process)))
      ((p 'RECEIVE guard . cont)
       (release-receiver p cont args #f)))))
;;; ss = rs = ((process . cont) ...)
(define (release-processes-receive cur-process args ss rs)
  (dolist (p.cont ss)
    (release-sender (car p.cont) (cdr p.cont) #f))
  (dolist (p.cont rs)
    (release-receiver (car p.cont) (cdr p.cont) args (eq? (car p.cont) cur-process))))

送信者の解放はイベント同期と同じで,継続手続きをセットするだけです.

(define (release-sender p cont current?)
  (process-cont-set! p cont)
  (unless current?
    (hash-table-clear! (process-sync p)))
  (process-state-set! p 'ready)
  (ready-queue-put! p))

受信者を解放する場合は,受信した値で環境ベクタ env を拡大する必要があります.

(define (release-receiver p cont args current?)
  (process-cont-set! p cont)
  (unless current?
    (hash-table-clear! (process-sync p)))
  (process-state-set! p 'ready)
  (extend-env p args)
  (ready-queue-put! p))
(define (extend-env p args)
  (let ((env (process-env p)))
    (let loop ((xs args) (k (vector-ref env 0)))
      (if (null? xs)
          (vector-set! env 0 k)
          (begin
             (vector-set! env (+ k 1) (car xs))
             (loop (cdr xs) (+ k 1)))))))

同期待ち登録

だいぶ距離が離れてしまいましたが,csp-sync で指定された3つ組を順に調べた結果,どれも同期しないことがわかったらプロセスを待ち状態にします.この際,待っているイベントとチャネルを sync に登録しておきます.

イベントの場合は継続手続きを登録します.もし1つのイベントに対して複数の指定があった場合は上書きしてしまうので最後のものが登録されることになります.ここでも非決定性を捨てていることになります.

チャネル送受信の場合は sync-term のリストを登録します.

(define (register-sync cur-process cmd)
  (let ((sync (process-sync cur-process)))
    (let loop ((n (vector-ref cmd 1)) (k 2))
      (let ((syncobj (vector-ref cmd k))
            (guard   (vector-ref cmd (+ k 1)))
            (cont    (vector-ref cmd (+ k 2))))
        (when (> n 0)
          (cond ((event? syncobj)
                 (hash-table-put! sync syncobj cont)) ; overwrite if dup
                ((procedure? syncobj)                 ; channel
                 (hash-table-push! sync
                                   (syncobj channel-info-command)
                                   (cons* 'RECEIVE guard cont)))
                (else                   ; channel-event (send)
                 (let ((ch (channel-event-ch syncobj))
                       (args (channel-event-args syncobj)))
                   (hash-table-push! sync ch (cons* 'SEND args cont)))))
          (loop (- n 1) (+ k 3)))))))

以上で実装はおわりです.

実行例

(csp-compile
 (define-event e)
 (define-channel in (x))
 (define-channel out (x))

 (define-process (QUEUE s)
   (alt
     (? in (x) (QUEUE (append s (list x))))
     (if (null? s)
         STOP
         (! (out (car s)) (QUEUE (cdr s))))
     (! e SKIP)))

 (define-process (GENERATOR n)
   (if (= n 0)
       (! e SKIP)
       (! (in n) (GENERATOR (- n 1)))))

 (define-process SYSTEM
   (par (list e in)
     (QUEUE '())
     (GENERATOR 7))))
(start-process SYSTEM)
==> done
(reverse trace)
==> ((in 7) (out 7) (in 6) (in 5) (in 4) (in 3) (in 2) (in 1)
     (out 6) (out 5) (out 4) (out 3) (out 2) (out 1) e)

コメント

2016/12/08

© 2013-2016 PRINCIPIA Limited