Think Stitch
PRINCIPIA  最近の更新


マクロとクロージャで作る並行プログラミング言語

Lisp Advent Calendar 2013の参加エントリです.

Lisp の特徴の1つとして,マクロを使って拡張言語(埋め込み言語)がかんたんに作れるという点があります. また Lisp はファーストクラスオブジェクトとしてのクロージャを持っており,継続渡しスタイル(Continuation-Passing Style, CPS)と組み合わせると,軽量プロセスがかんたんに作れるということもよく知られていると思います. そこで Lisp が持つこの2つの特徴を生かして,小さな並行プログラミング言語を作ってみたいと思います.

処理系は Gauche を使います.以下のモジュールを使います.

(use util.match)
(use gauche.record)
(use srfi-1)
(use srfi-27)

CSP について

ここで作る並行プログラミング言語は,CSP という理論を基礎にします. CSP とは Communicating Sequential Processes の略で,並行に動作するプロセスの振る舞いを記述し,その性質を調べるための理論です. クイックソートやプログラムの正当性証明のための Hoare 論理などで有名な Tony Hoare 先生によって作られました. 歴史のある理論で,オペレーティングシステムやプログラミング言語の設計に影響を与えたといわれています.比較的最近だと,プログラミング言語 Go の通信部分は CSP を基礎としています(http://golang.org/doc/faq)

ここでは CSP の知識は必要ありませんが,興味のある方は,Hoare 先生の本をご覧ください.以下のページからダウンロードすることができます.

並行プログラミング言語の仕様

並行プログラミング言語の構文を以下のように定めます. ここでは CSP に習って,並行に動作する実行主体のことをプロセスと呼ぶことにします. プロセスは define-process で定義することにします.

(define-process プロセス名 プロセス式)

プロセス式はプロセスの動作を表す式です.以下のような構成要素を持つものとします.

プロセス式 ::=
      SKIP
    | (! イベント プロセス式)
    | (alt プロセス式 ...)
    | (par イベントリスト プロセス式1 プロセス式2 プロセス式3 ...)
    | プロセス名

より厳密な意味についてはこちらを見てください.この言語の操作的意味論と表示的意味論を書きました.

プロセス定義の例

例えばイベント a, b を順に発生させて終了するプロセス P は次のように定義できます.

(define-process P (! a (! b SKIP)))

次は,相手のプロセスが a を出してきたら x を返し,b を出してきたら y を返すプロセスです. 実際には同期は対称なので,どちらがどちらに出すという方向性はないですけど,そういうふうに意味付けすることはできます.

(define-process Q
  (alt
    (! a (! x SKIP))
    (! b (! y SKIP))))

もう1つ例を書きます.イベント a を受けたら,2つの子プロセスを生成する例です.イベントリストに c が指定されていますから,子プロセスの間ではイベント c で同期します.それ以外のイベントはそれぞれが独立して発生できます.

(define-process R
  (! a
    (par (c)
      (! b (! c SKIP))
      (! c (! d SKIP)))))

イベント a は仕事の開始依頼だとしましょう.これを受けてワーカープロセスを2つ作るわけです. 最初の子プロセスが外からイベント b を受け取ります.これが処理すべきデータを意味しているとします. 担当分の処理が済んだら,それを次の子プロセスに渡します.これがイベント c による同期の意味です. 2番目の子プロセスはさらに自分の処理を行って,外部に結果を出力します.これがイベント d の意味です. 処理が終わったらすべてのプロセスが終了します.

文法の制限

文法をそのまま適用すると,例えば次のようなプロセスを定義することができます.

(alt
  (! a SKIP)
  (par (c)
    (! c SKIP)
    (! c SKIP)))

理論上はこのようなプロセスを考えることもできるのですが,実装上はちょっと困ったことになります. イベント c が発生可能であることを知るためには,2つの子プロセスを作って同期することを確かめなければなりません. しかし,外部のプロセスがイベント a を出してきた場合は,プロセス生成がなかったことにしなければなりません. これを実装で行うのは難しいので,ここでは alt の引数として指定できるのは (! e P) と (alt P ...) だけと制限しておくことにします.

カーネルのシステムコール

この言語を実際に動かすためには,プロセスを生成したり同期の面倒を見たりするオペレーティングシステムに相当するものが必要になります.ここでは単にカーネルと呼ぶことにします.カーネルは次の3つのシステムコールを持っているとします.

csp-par と csp-alt はどちらも CPS になっています.継続は必ず thunk で指定し,システムコールからリターンすることはありません(というか,そういう約束で使います).同じく csp-skip も末尾位置に置かなければならないとします.

マクロによる変換

並行プログラミング言語のコードを,カーネルのシステムコールを呼び出す Lisp 式に変換します. 変換はマクロで行います.プロセスの定義構文 define-process をマクロとして実装します. ここでは伝統的なマクロを使うことにします. プロセス式の展開(変換)はヘルパ関数 expand-process で行います.

プロセスは引数のない手続き(クロージャ,thunk)として表すことにします.

(define-macro (define-process name pexpr)
  `(define ,name (lambda () ,(expand-process pexpr))))

ヘルパ関数 expand-process は各プロセス式を対応するシステムコールに変換します. 式の場合分けを行うために,パターンマッチングのためのマクロ match を使いました.これを書いていたら,ちょうど athos さんがClojure Contrib Library Advent Calendar 2013 のエントリーで match の解説をされていました.Clojure と Scheme で言語は違いますが,考え方は同じだと思います.match を使うと場合分けを見通しよく書くことができます.

(define (expand-process pexpr)
  (match pexpr
    ('SKIP '(csp-skip))
    (('! event pexpr2)
     `(csp-alt
       (list (cons ',event
                   (lambda ()
                     ,(expand-process pexpr2))))))
    (('alt . process-list)
     `(csp-alt
       (append ,@(map (lambda (p)
                        (expand-process-in-alt p))
                      process-list))))
    (('par sync-list . process-list)
     `(csp-par ',sync-list
               (list ,@(map (lambda (p)
                              `(lambda ()
                                 ,(expand-process p)))
                            process-list))))
    ((? symbol? p) `(,p)) ))

SKIP と par はそのままシステムコール csp-skip, csp-par に対応します. 単独で現れるイベント発生の式 (! e P) は csp-alt に変換します.イベントと thunk のペアは1つだけです.

alt では,引数として並んでいるイベント発生の式やネストした alt を処理するために,専用のヘルパ関数 expand-process-in-alt を呼び出します.

expand-process-in-alt は alt 式の引数となっているプロセス式からイベントと thunk を集めてきます.これらを連結してまとめ,csp-alt を呼び出すようにします.

プロセス名の場合は,プロセスをクロージャで表しているので,そのまま呼び出すだけです.

(define (expand-process-in-alt pexpr)
  (match pexpr
    (('! event pexpr2)
     `(list (cons ',event
                  (lambda ()
                   ,(expand-process pexpr2)))))
    (('alt . process-list)
     `(append ,@(map (lambda (p)
                       (expand-process-in-alt p))
                     process-list)))))

展開例

展開の例を2つ示します.展開の結果にはかなり冗長な部分がありますけど,それでも元の記述の方がずっとシンプルでわかりやすいと思います. 原理的には展開後のコードを書けばマクロは要らないわけですが,マクロのおかげで専用言語としてわかりやすさが高まっていることは明らかだと思います.

(macroexpand
  '(define-process P
     (alt (! a SKIP)
          (! b SKIP))))
=>
(define P
  (lambda ()
    (csp-alt
     (append (list (cons 'a (lambda () (csp-skip))))
             (list (cons 'b (lambda () (csp-skip))))))))
(macroexpand
  '(define-process P
     (par (c)
       (! c (! a SKIP))
       (! c (! b SKIP)))))
=>
(define P
  (lambda ()
    (csp-par '(c)
      (list (lambda ()
              (csp-alt
               (list (cons 'c
                           (lambda ()
                             (csp-alt
                              (list (cons 'a
                                          (lambda ()
                                            (csp-skip))))))))))
            (lambda ()
              (csp-alt
               (list (cons 'c
                           (lambda ()
                             (csp-alt
                              (list (cons 'b
                                          (lambda ()
                                            (csp-skip))))))))))))))

カーネルの実装

つづいてカーネルの実装をします.オペレーティングシステムが持つ機能のうち,プロセス管理部分だけを作るという感じです.

プロセスコントロールブロック

まず,いわゆるプロセスコントロールブロック(PCB)を定義します.レコードを使います.

(define-record-type process #t #t
  (state) parent (children) (sync) (cont))

カーネルの状態を表すグローバル変数

カーネルの状態を表すグローバル変数を4つ用意します.

(define root-process #f)
(define current-process #f)
(define ready-queue '())
(define trace '())

システムの起動 start-process

続いて手続きの定義に移ります.最初は起動用の手続き start-process です. thunk を指定します.プロセスコントロールブロックを作成し,root-process と current-process に登録します. ready-queue と trace を初期化したら,thunk を呼び出します.これで制御はプロセスに移ったことになります. カーネルはシステムコールのいずれかが呼び出されるまで何もしません.

(define (start-process thunk)
  (let ((p (make-process
            'running             ; state
            #f                   ; parent (root: #f)
            #f                   ; children (leaf: #f)
            '()                  ; sync
            thunk)))             ; cont
    (set! root-process p)
    (set! current-process p)
    (set! ready-queue '())
    (set! trace '())
    (thunk)))

csp-par システムコール

システムコール csp-par は指定された各 thunk に対応する子プロセスを作ります. これらをすべて実行可能状態として ready-queue に登録します. 次に,csp-par を呼び出したカレントプロセスを待ち状態にします. プロセスコントロールブロックの sync に,同期するイベントのリストを格納しておきます. 最後にスケジューラを呼び出して,次に実行するプロセスを選択してもらいます.

(define (csp-par sync-list thunk-list)
  (let ((ps
         (map (lambda (thunk)
                (make-process
                 'ready               ; state
                 current-process      ; parent
                 #f                   ; children (leaf: #f)
                 '()                  ; sync
                 thunk))              ; cont
              thunk-list)))
    (set! ready-queue (append ps ready-queue))
    (process-state-set! current-process 'wait-for-children)
    (process-children-set! current-process ps)
    (process-sync-set! current-process sync-list)
    (sched)))

スケジューラ sched

スケジューラは ready-queue の中からランダムにプロセスを1つ選び,実行します. ready-queue が空の場合は,デッドロックを起こしているということになるので,エラーを報告します.

(define (sched)
  (if (null? ready-queue)
      (error 'sched "deadlock")
      (let ((n (length ready-queue)))
        (let ((k (random-integer n)))
          (let ((p (list-ref ready-queue k)))
            (set! ready-queue (delete p ready-queue))
            (process-state-set! p 'running)
            (set! current-process p)
            ((process-cont p)))))))

csp-skip システムコール

システムコール csp-skip は呼び出したプロセスを終了させるのですが,もう少し仕事があります. 終了するカレントプロセスが,親プロセスの子供たち,つまり兄弟姉妹の中で最後のプロセスだった場合は,親プロセスも終了することになるからです.これは必要なだけプロセスツリーの根に向かって伝播します.もし根まで達した場合は,root-process が終了したということですから,カーネル自体を終了させます.

兄弟姉妹がすべて終了しているかどうかは,プロセスコントロールブロックの state が omega であるかどうかで判定します.

(define (csp-skip)
  (let loop ((p current-process))
    (if p
        (let ((children (process-children p)))
          (if (or (not children) (every omega? children))
              (begin
                (process-state-set! p 'omega)
                (loop (process-parent p)))
              (sched)))
        ;; root process is terminated
        'done)))
(define (omega? p)
  (eq? (process-state p) 'omega))

csp-alt システムコール

最後は csp-alt です.ちょっと複雑です.まず大まかな流れをいうと,指定されたイベントの中に同期するものがあるかどうか調べます.もしあれば,待っていたプロセスを待ち状態から解放して実行可能状態とし,呼び出したプロセスも含めて,スケジューラに次に実行すべきプロセスを決めてもらいます. もし同期するイベントがない場合は待ち状態にします.プロセスコントロールブロックの sync にイベントと thunk のペアリストを保存しておきます.この場合は ready-queue から次のプロセスを選んでもらいます.

(define (csp-alt sync-list)
  (let loop ((xs sync-list))
    (if (pair? xs)
        (let ((event (caar xs)))
          (let ((ps (find-signaling-processes event)))
            (if ps
                (begin
                  (set! trace (cons event trace))
                  (release-processes event ps)
                  (process-state-set! current-process 'ready)
                  (process-cont-set! current-process (cdar xs))
                  (set! ready-queue
                        (cons current-process ready-queue))
                  (sched))
                (loop (cdr xs)))))
        (begin
          (process-state-set! current-process 'wait-for-sync)
          (process-sync-set! current-process sync-list)
          (sched)))))

関数 find-signaling-processes は指定されたイベントで同期するプロセスのリストを求めます. 単独で発生できる場合は,そのプロセスだけからなるリストを返します. もし同期しない場合は #f を返します.

ここがいちばんの難所です.イベントが発生するかどうかを知るには,プロセスツリーを根からたどって調べる必要があります.もし指定されたイベントがある節において同期するイベントのリストに含まれている場合,子供たちの枝すべてが発生させなければ同期しません.逆にイベントリストに含まれていない場合は,子供たちの誰か1人が発生させれば,発生します.これらはプロセスツリー上で再帰的に調べる必要があります.

カレントプロセスは当然同期に参加するので,特別扱いします.

(define (find-signaling-processes event)
  (define (traverse p)
    (if (eq? p current-process)
        (list p)
        (let ((ps (process-children p)))
          (if ps
              ;; node
              (if (memq event (process-sync p))
                  ;; and
                  (let loop ((ps ps) (rs '()))
                    (if (null? ps)
                        rs
                        (let ((qs (traverse (car ps))))
                          (if qs
                              (loop (cdr ps) (append qs rs))
                              #f))))
                  ;; or
                  (let loop ((ps ps))
                    (if (null? ps)
                        #f
                        (let ((qs (traverse (car ps))))
                          (if qs
                              qs
                              (loop (cdr ps)))))))
              ;; leaf
              (if (assq event (process-sync p))
                  (list p)
                  #f)))))
  (traverse root-process))

イベント発生に関与するプロセスのリストが求められたら,これらを解放します. 指定されたイベントに対応する継続(thunk)をプロセスコントロールブロックの cont に設定し,実行可能状態に変えて ready-queue に登録します.

(define (release-processes e ps)
  (dolist (p ps)
    (unless (eq? p current-process)
      (release-process e p))))
(define (release-process e p)
  (process-cont-set! p (cdr (assq e (process-sync p))))
  (process-state-set! p 'ready)
  (process-sync-set! p '())
  (set! ready-queue (cons p ready-queue)))

カーネルのコードは以上です.

実行例

実際にプロセスを定義して,実行してみます.

単一プロセスの例

イベント a, b を順に発生させて終了するプロセスです.

(define-process P (! a (! b SKIP)))

(start-process P)
=> done

(reverse trace)
=> (a b)

並行実行の例

2つのプロセスを同時に実行します.イベント a は同期します.b と c はそれぞれが単独で発生します. 結果として,b, c の発生する順序は不定(非決定的)になります.何度も実行してみてください.

(define-process P (! a (! b SKIP)))
(define-process Q (! a (! c SKIP)))
(define-process R (par (a) P Q))

(begin
  (start-process R)
  (reverse trace))
=> (a b c) または (a c b)

通信の例 その1

プロセス P は外部から受け取ったイベント in に対応してイベント x を発行します. これを受けたプロセス Q は外部へイベント out を発行します.

(define-process P (! in (! x SKIP)))
(define-process Q (! x (! out SKIP)))
(define-process R (par (x) P Q))

(start-process R)
(reverse trace)
=> (in x out)

通信の例 その2

ちょっと拡張して,入力を3回処理するようにします. 3回実行したら終了するように,制御用のプロセス R を用意します. イベント z は終了指令です.

(define-process P
  (alt
    (! in (! x P))
    (! z SKIP)))

(define-process Q
  (alt
    (! x (! out Q))
    (! z SKIP)))

(define-process R
  (! in (! in (! in (! z SKIP)))))

(define-process S
  (par (in z)
    R
    (par (x z) P Q)))

(begin
  (start-process S)
  (reverse trace))
=> (in x out in x out in x out z) 他

イベント x の後に out ではなく in が発生することがあります.プロセス P と Q は並行に動作できるので,かならず out が先とは限らないのですね.

2リソース問題

排他制御を必要とするリソースが2つあって,これらを同時に使いたいプロセスが2つあるとします. 排他制御のためにミューテックスを2つ用意します. どちらのプロセスも,まず2つのミューテックスをロックし,それからリソースを使い,その後ミューテックスをアンロックします.

ロックの順序を合わせておかないと,デッドロックすることがあります.

(define-process MUTEX1 (! lock1 (! unlock1 MUTEX1)))

(define-process MUTEX2 (! lock2 (! unlock2 MUTEX2)))

(define-process P
  (! lock1 (! lock2
              (! p-work
                 (! unlock2 (! unlock1 P))))))

(define-process Q
  (! lock2 (! lock1
              (! q-work
                 (! unlock1 (! unlock2 Q))))))

(define-process R
  (par (lock1 lock2 unlock1 unlock2)
    (par () MUTEX1 MUTEX2)
    (par () P Q)))

(start-process R)
=>
*** ERROR: sched "deadlock"

プロセスツリーを表示する次のようなユーティリティ関数を作ると,問題を分析できます.

(define (ps)
  (define (traverse p)
    (d p)
    (newline)
    (let ((ps (process-children p)))
      (when ps
        (dolist (p ps)
          (traverse p)))))
  (traverse root-process))

(ps)
=>
#<process 0x100735c80> is an instance of class process
slots:
  state     : wait-for-children
  parent    : #f
  children  : (#<process 0x100735c40> #<process 0x100735c00>)
  sync      : (lock1 lock2 unlock1 unlock2)
  cont      : #<closure R>

#<process 0x100735c40> is an instance of class process
slots:
  state     : wait-for-children
  parent    : #<process 0x100735c80>
  children  : (#<process 0x100735880> #<process 0x1007357c0>)
  sync      : ()
  cont      : #<closure (R #:G974)>

#<process 0x100735880> is an instance of class process
slots:
  state     : wait-for-sync
  parent    : #<process 0x100735c40>
  children  : #f
  sync      : ((unlock1 . #<closure (MUTEX1 #:G958)>))
  cont      : #<closure (MUTEX1 #:G957)>

#<process 0x1007357c0> is an instance of class process
slots:
  state     : wait-for-sync
  parent    : #<process 0x100735c40>
  children  : #f
  sync      : ((unlock2 . #<closure (MUTEX2 #:G960)>))
  cont      : #<closure (MUTEX2 #:G959)>

#<process 0x100735c00> is an instance of class process
slots:
  state     : wait-for-children
  parent    : #<process 0x100735c80>
  children  : (#<process 0x100735bc0> #<process 0x100735b80>)
  sync      : ()
  cont      : #<closure (R #:G971)>

#<process 0x100735bc0> is an instance of class process
slots:
  state     : wait-for-sync
  parent    : #<process 0x100735c00>
  children  : #f
  sync      : ((lock2 . #<closure (P #:G962)>))
  cont      : #<closure (P #:G961)>

#<process 0x100735b80> is an instance of class process
slots:
  state     : wait-for-sync
  parent    : #<process 0x100735c00>
  children  : #f
  sync      : ((lock1 . #<closure (Q #:G967)>))
  cont      : #<closure (Q #:G966)>

()

Gauche はクロージャをわかりやすく名前で表示してくれるので,各プロセスの状態がとてもよくわかると思います.中間の par に対応する節となるプロセスがあるので,プロセスは全部で7個になります.

プロセスの動的生成

プロセス P は外部から仕事の依頼を受けたらワーカプロセスを複数,動的に生成して対処します. イベント a が依頼だとします.これには仕事の規模が書いてあるとしましょう.仮にこれが3だとします. プロセス P はワーカプロセスを3つ生成します. 各ワーカに仕事を分配したあと,結果を引き取るプロセス R に移行します.

(define-process P
  (! a
     (par (b1 b2 b3 c1 c2 c3)
       (! b1 (! b2 (! b3 R)))
       (par '() Q1 Q2 Q3))))

各ワーカは受け取った仕事を処理して,結果を返します.

(define-process Q1
  (! b1 (! c1 SKIP)))

(define-process Q2
  (! b2 (! c2 SKIP)))

(define-process Q3
  (! b3 (! c3 SKIP)))

プロセス R は早く終わったワーカから順に結果を引き取ります. 固定の順序で受け取っても結果は同じですが,R でも後処理がある場合はこのようにした方が並列性を高めることができます(並列化が可能なシステムで動かした場合).

(define-process R
  (alt
    (! c1
       (alt (! c2 (! c3 (! d SKIP)))
            (! c3 (! c2 (! d SKIP)))))
    (! c2
       (alt (! c3 (! c1 (! d SKIP)))
            (! c1 (! c3 (! d SKIP)))))
    (! c3
       (alt (! c1 (! c2 (! d SKIP)))
            (! c2 (! c1 (! d SKIP)))))))

(begin
  (start-process P)
  (reverse trace))

何度も実行してみると,順序が変化することがわかります.

補足

おもちゃのような言語ですけど,拡張すれば実用的に使えます. 実は拡張しないと Lisp を書くところがないので,ぜんぜん埋め込み言語じゃないじゃないかという話もありますが... 次のような拡張が考えられます.

他にも CSP の演算子を増やす方向があります.こちらについては CSP の本を見てください.

今回,スケジューリングアルゴリズムは単純にランダムにしました.プロセスの優先順位もないですし,当然動的調整もありません.ready-queue の構造を変えたり,アルゴリズムを変えたりするといろいろ遊べると思います.アルゴリズムによっては,あるはずのデッドロックが出なくなったりするので面白いです.

システムコールはすべて CPS なので,末尾再帰最適化のない Lisp だともしかしたらスタックオーバーフローを起こすかもしれません.

最後にまとめとして,こういったことが簡単にできるのは,S式とそれを料理するためにあらかじめ用意された関数たち,そしてマクロとクロージャのおかげだと改めて認識することができました.

2013/12/08

追記

チャネル通信とプロセスパラメータを実現してみました。

2014/01/11
© 2013,2014,2015 PRINCIPIA Limited