Think Stitch
PRINCIPIA  最近の更新


並行プログラミング言語 8:逐次合成 2

前回は逐次合成 seq を追加しましたが,話を簡単にするために,改名 rename と隠蔽 hide を追加する前の版を土台にしました. 今回は両方合わせたものを考えることにします.

過去の記事は以下のとおりです。

改名・隠蔽と逐次合成を合わせる

まずは2つのコードを単純にマージします. マクロ define-process のためのヘルパ関数 expand-process と,各システムコール csp-rename, csp-hide, csp-seq の追加,および csp-skip の修正は独立しているので,特に問題はありません. プロセスコントロールブロック(PCB)には rename-map と seq-stack の両方が追加されることになります.

(define-record-type process make-process #t
  (state)
  parent
  (children)
  (sync)
  (cont)
  (rename-map)      ; 改名・隠蔽マップ
  (seq-stack))      ; 逐次合成用スタック

PCB を作成している start-process と csp-par の該当箇所には初期値の指定が必要です.

(define (start-process thunk)
  (let ((p (make-process
            'running                    ; state
            #f                          ; parent (root: #f)
            #f                          ; children (leaf: #f)
            '()                         ; sync
            thunk                       ; cont
            '()                         ; rename-map 
            '() )))                     ; seq-stack 
    (set! root-process p)
    (set! current-process p)
    (set! ready-queue '())
    (set! trace '())
    (thunk)))
(define (csp-par sync-list thunk-list)
  (let ((rename-map (process-rename-map current-process)))
    (let ((ps
           (map (lambda (thunk)
                  (make-process
                   'ready               ; state
                   current-process      ; parent
                   #f                   ; children (leaf: #f)
                   '()                  ; sync
                   thunk                ; cont
                   rename-map           ; rename-map ★親から継承
                   '() ))               ; seq-stack 
                thunk-list)))
      (set! ready-queue (append ps ready-queue))
      (process-state-set! current-process 'wait-for-children)
      (process-children-set! current-process ps)
      (process-sync-set! current-process
                         (map (lambda (e)
                                (effective-event rename-map e))
                              sync-list))
      (sched))))

逐次実行と改名の同時使用

逐次実行と改名を同時に使ってみます.

(define-event a)
(define-event b)
(define-event u)
(define-event w)

(define-process P
  (seq
    (rename `((,a . ,u))
      (! a (! b SKIP)))
    (! a (! b SKIP))))

実行してみると次のようになります.

(begin
  (start-process P)
  (map de (reverse trace)))
=> (u b u b)

これは間違っています.正しいトレースは (u b a b) のはずです. rename は seq の1つ目のプロセスにだけ影響すべきなのに,2つ目のプロセスにも影響してしまっています.

なぜこのようなことになってしまったかというと,1つ目のプロセスが設定した rename-map がプロセス終了後も残ってしまっているからです. 本来であれば,2つ目のプロセスの rename-map は,rename を実行する前の初期状態に戻らなければなりません. より正確にいうと,seq を実行した時点でのプロセス P の rename-map に戻らなければならないということです. 例えば次のようなプロセスを考えます:

(define-process P
  (rename `((,a . ,u))
    (seq
     (! a (! b SKIP))
     (rename `((,a . ,w))
       (! a (! b SKIP)))
     (! a (! b SKIP)))))

このプロセスのトレースは (u b w b u b) となるべきだということです. しかし実際には (u b w b w b) となってしまいます.

修正

この問題を解決するのは簡単です. 逐次合成 seq の引数である1つのプロセスが終了したら,改名マップを seq 開始時点のものに戻せばよいわけです. そこで,逐次合成のシステムコール csp-seq でプロセスの thunk を seq-stack に積むときに, 一緒に rename-map も積んでおくことにします. 具体的には csp-seq が呼ばれたときの rename-map を取り出し(★1),各 thunk とこの rename-map をペアにして seq-stack に積みます(★2).

(define (csp-seq thunk-list)
  (if (null? thunk-list)
      (csp-skip)
      (let ((rename-map (process-rename-map current-process))) ; ★1
        (process-seq-stack-set!
         current-process
         (append (map (lambda (thunk)
                        (cons thunk rename-map))   ; ★2
                      (cdr thunk-list))
                 (process-seq-stack current-process)))
        (process-cont-set!
         current-process
         (car thunk-list))
        (set! ready-queue (cons current-process ready-queue))
        (sched))))

システムコール csp-skip でプロセスが終了したら seq-stack を調べ,空でなければ thunk を取り出して実行を継続します(★3). このとき,rename-map も取り出して設定します(★4).

(define (csp-skip)
  (let loop ((p current-process))
    (if p
        (let ((children (process-children p)))
          (if (or (not children) (every omega? children))
              (let ((ss (process-seq-stack p)))
                (if (null? ss)
                    (begin
                      (process-state-set! p 'omega)
                      (loop (process-parent p)))
                    (begin
                      (process-seq-stack-set! p (cdr ss))
                      (process-children-set! p #f)
                      (process-sync-set! p '())
                      (process-cont-set! p (caar ss))       ; ★3
                      (process-rename-map-set! p (cdar ss)) ; ★4
                      (set! ready-queue (cons p ready-queue))
                      (sched))))
              (sched)))
        ;; root process is terminated
        'done)))

実行例

最初の例です.

(define-process P
  (seq
    (rename `((,a . ,u))
      (! a (! b SKIP)))
    (! a (! b SKIP))))

(begin
  (start-process P)
  (map de (reverse trace)))
=> (u b a b)

2番目の例もちゃんと元に戻るようになりました.

(define-process P
  (rename `((,a . ,u))
    (seq
     (! a (! b SKIP))
     (rename `((,a . ,w))
       (! a (! b SKIP)))
     (! a (! b SKIP)))))

(begin
  (start-process P)
  (map de (reverse trace)))
=> (u b w b u b)

もう少し込み入った例を2つ見てみます.

(define-process P
  (seq
    (! a (! b SKIP))
    (rename `((,a . ,u))
      (seq
        (! a (! b SKIP))
        (rename `((,a . ,w))
          (! a (! b SKIP)))
        (! a (! b SKIP))))
    (! a (! b SKIP))))

(begin
  (start-process P)
  (map de (reverse trace)))
=> (a b u b w b u b a b)
(define-process P
  (seq
    (! a (! b SKIP))
    (rename `((,a . ,u))
      (seq
        (! a (! b SKIP))
        (rename `((,b . ,w))
          (! a (! b SKIP)))
        (! a (! b SKIP))))
    (! a (! b SKIP))))

(begin
  (start-process P)
  (map de (reverse trace)))
=> (a b u b u w u b a b)

コメント

これで主要な CSP の構成要素がほぼそろいました.

改名のところで,rename がネストしたときに rename-map をマージしてしまわずにリストにしたのは,ここで見たとおり,逐次実行と組み合わせると一部だけを元に戻さなければならない場合があるからです. もっとも,高速化するには以前のものを残しつつマージしてしまう方がよいかもしれません.on-demand でキャッシュするという方法もあります.

2014/06/14

全ソースコード

(use util.match)
(use gauche.record)
(use srfi-1)
(use srfi-27)

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;  channels and channel events

;;; (define-channel channel-name domain-expr)

(define-record-type event #t #t name creator)

(define-record-type channel-event #t #t name (ch) arg)

(define ch->chev-list '())

(define (lookup-chev-list ch)
  (let ((p (assq ch ch->chev-list)))
    (if p
        (cdr p)
        (error "unknown channel" ch))))

(define (register-chev-list ch chev-list)
  (set! ch->chev-list
        (cons (cons ch chev-list)
              ch->chev-list)))

(define-macro (define-event e)
  `(define ,e (make-event ',e #f)))

;; (define-event e)

(define-macro (define-channel ch domain-expr)
  `(define ,ch
     (let ((domain ,domain-expr)
           (chev-list #f))
       (let ((ch
              (lambda (x)
                (let ((chev
                       (find (lambda (chev)
                               (equal? x (channel-event-arg chev)))
                             chev-list)))
                  (if chev
                      chev
                      (error "out of domain" ',ch x))))))
         (set! chev-list
               (map (lambda (arg)
                      (make-channel-event ',ch ch arg))
                    domain))
         (register-chev-list ch chev-list)
         ch))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;  macros

;;; (define-process (P x ...) pexpr)

(define-macro (define-process pspec pexpr)
  (if (symbol? pspec)
      `(define (,pspec) ,(expand-process pexpr))
      `(define ,pspec ,(expand-process pexpr))))

(define (expand-process pexpr)
  (match pexpr
    ('STOP '(csp-alt '()))
    ('SKIP '(csp-skip))
    (('! event pexpr2)
     (let ((e (gensym)))
       `(csp-alt
         (list (cons ,event (lambda (,e) ,(expand-process pexpr2)))))))
    (('? ch (z) pexpr2)
     (let ((e (gensym)))
       `(let ((proc
               (lambda (,e)
                 (let ((,z (channel-event-arg ,e)))
                   ,(expand-process pexpr2)))))
          (csp-alt
           (map (lambda (chev) (cons chev proc))
                (lookup-chev-list ,ch))))))
    (('? ch (z) guard pexpr2)
     (let ((e (gensym)))
       `(let ((proc
               (lambda (,e)
                 (let ((,z (channel-event-arg ,e)))
                   ,(expand-process pexpr2)))))
          (csp-alt
           (map (lambda (chev) (cons chev proc))
                (filter (lambda (,e)
                          (let ((,z (channel-event-arg ,e)))
                            ,guard))
                        (lookup-chev-list ,ch)))))))
    (('alt . process-list)
     `(csp-alt
       (append ,@(map (lambda (p) (expand-process-in-alt p))
                      process-list))))
    (('par sync-list . process-list)
     `(csp-par (make-sync-list ,sync-list)
               (list ,@(map (lambda (p)
                              `(lambda () ,(expand-process p)))
                            process-list))))
    (('rename rename-alist process)
     `(csp-rename ,rename-alist
                  (lambda ()
                    ,(expand-process process))))
    (('hide event-list process)
     `(csp-hide ,event-list
                (lambda ()
                  ,(expand-process process))))
    (('seq . process-list)
     `(csp-seq
       (list ,@(map (lambda (p)
                      `(lambda () ,(expand-process p)))
                    process-list))))
    (('if test pexpr1 pexpr2)
     `(if ,test
          ,(expand-process pexpr1)
          ,(expand-process pexpr2)))
    ((? symbol?) `(,pexpr))
    (_ pexpr)))

(define (expand-process-in-alt pexpr)
  (match pexpr
    ('STOP '())
    (('! event pexpr2)
     (let ((e (gensym)))
       `(list (cons ,event (lambda (,e) ,(expand-process pexpr2))))))
    (('? ch (z) pexpr2)
     (let ((e (gensym)))
       `(let ((proc
               (lambda (,e)
                 (let ((,z (channel-event-arg ,e)))
                   ,(expand-process pexpr2)))))
          (map (lambda (chev) (cons chev proc))
               (lookup-chev-list ,ch)))))
    (('? ch (z) guard pexpr2)
     (let ((e (gensym)))
       `(let ((proc
               (lambda (,e)
                 (let ((,z (channel-event-arg ,e)))
                   ,(expand-process pexpr2)))))
          (map (lambda (chev) (cons chev proc))
               (filter (lambda (,e)
                          (let ((,z (channel-event-arg ,e)))
                            ,guard))
                       (lookup-chev-list ,ch))))))
    (('alt . process-list)
     `(append ,@(map (lambda (p) (expand-process-in-alt p))
                     process-list)))))

(define (make-sync-list syncobj-list)
  (append-map
   (lambda (so)
     (if (procedure? so)
         (lookup-chev-list so)
         (list so)))
   syncobj-list))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;  kernel

(define-record-type process make-process #t
  (state)              ; running ready wait-for-sync wait-for-children omega
  parent
  (children)           ; leaf: #f, node: list of child processes
  (sync)               ; leaf: event-cont-alist, node: sync-list
  (cont)
  (rename-map)
  (seq-stack))

(define root-process #f)
(define current-process #f)
(define ready-queue '())
(define trace '())

(define (start-process thunk)
  (let ((p (make-process
            'running                    ; state
            #f                          ; parent (root: #f)
            #f                          ; children (leaf: #f)
            '()                         ; sync
            thunk                       ; cont
            '()                         ; rename-map
            '() )))                     ; seq-stack
    (set! root-process p)
    (set! current-process p)
    (set! ready-queue '())
    (set! trace '())
    (thunk)))

(define (csp-par sync-list thunk-list)
  (let ((rename-map (process-rename-map current-process)))
    (let ((ps
           (map (lambda (thunk)
                  (make-process
                   'ready               ; state
                   current-process      ; parent
                   #f                   ; children (leaf: #f)
                   '()                  ; sync
                   thunk                ; cont
                   rename-map           ; rename-map (inherited)
                   '() ))               ; seq-stack
                thunk-list)))
      (set! ready-queue (append ps ready-queue))
      (process-state-set! current-process 'wait-for-children)
      (process-children-set! current-process ps)
      (process-sync-set! current-process
                         (map (lambda (e)
                                (effective-event rename-map e))
                              sync-list))
      (sched))))

(define (csp-seq thunk-list)
  (if (null? thunk-list)
      (csp-skip)
      (let ((rename-map (process-rename-map current-process)))
        (process-seq-stack-set!
         current-process
         (append (map (lambda (thunk)
                        (cons thunk rename-map))
                      (cdr thunk-list))
                 (process-seq-stack current-process)))
        (process-cont-set!
         current-process
         (car thunk-list))
        (set! ready-queue (cons current-process ready-queue))
        (sched))))

(define (csp-hide event-list thunk)
  (csp-rename (map (lambda (e)
                     (cons e (make-event (event-name-symbol e) current-process)))
                   event-list)
              thunk))

(define (event-name-symbol e)
  (string->symbol
   (if (channel-event? e)
       (format #f "~A.~A" (channel-event-name e) (channel-event-arg e))
       (format #f "~A" (event-name e)))))

(define (csp-rename rename-alist thunk)
  (process-rename-map-set!
   current-process
   (extend-rename-map rename-alist
                      (process-rename-map current-process)))
  (thunk))

;;; rename-map ::= (rename-alist ...)
(define (extend-rename-map rename-alist rename-map)
  (cons rename-alist rename-map))

(define (effective-event rename-map event)
  (if (null? rename-map)
      event
      (let ((p (assq event (car rename-map))))
        (if p
            (effective-event (cdr rename-map) (cdr p))
            (effective-event (cdr rename-map) event)))))

(define (rename-sync-list rename-map sync-list)
  (map (lambda (p)
         (let ((event (car p))
               (cont (cdr p)))
           (let ((e (effective-event rename-map event)))
             (if (eq? e event)
                 p
                 (cons e
                       (lambda (dummy) (cont event)))))))
       sync-list))

(define (rename-channel ch1 ch2)
  (map (lambda (chev)
         (cons chev
               (ch2 (channel-event-arg chev))))
       (lookup-chev-list ch1)))

;;; simple
(define (sched)
  (if (null? ready-queue)
      (error 'sched "deadlock")
      (let ((p (car ready-queue)))
        (set! ready-queue (cdr ready-queue))
        (process-state-set! p 'running)
        (set! current-process p)
        ((process-cont p)))))

;;; random
(define (sched)
  (if (null? ready-queue)
      (error 'sched "deadlock")
      (let ((n (length ready-queue)))
        (let ((k (random-integer n)))
          (let ((p (list-ref ready-queue k)))
            (set! ready-queue (delete p ready-queue))
            (process-state-set! p 'running)
            (set! current-process p)
            ((process-cont p)))))))

(define (csp-skip)
  (let loop ((p current-process))
    (if p
        (let ((children (process-children p)))
          (if (or (not children) (every omega? children))
              (let ((ss (process-seq-stack p)))
                (if (null? ss)
                    (begin
                      (process-state-set! p 'omega)
                      (loop (process-parent p)))
                    (begin
                      (process-seq-stack-set! p (cdr ss))
                      (process-children-set! p #f)
                      (process-sync-set! p '())
                      (process-cont-set! p (caar ss))
                      (process-rename-map-set! p (cdar ss)) ; ###
                      (set! ready-queue (cons p ready-queue))
                      (sched))))
              (sched)))
        ;; root process is terminated
        'done)))

(define (omega? p)
  (eq? (process-state p) 'omega))

(define (csp-alt sync-list)
  (let ((rename-map (process-rename-map current-process)))
    (let loop ((xs sync-list))
      (if (pair? xs)
          (let ((event (caar xs))
                (proc (cdar xs)))
            (let ((e (effective-event rename-map event)))
              (let ((ps (find-signaling-processes e)))
                (if ps
                    (begin
                      (set! trace (cons e trace))
                      (release-processes e ps)
                      (process-state-set! current-process 'ready)
                      (process-cont-set! current-process (lambda () (proc event)))
                      (set! ready-queue (cons current-process ready-queue))
                      (sched))
                    (loop (cdr xs))))))
          (begin
            (process-state-set! current-process 'wait-for-sync)
            (process-sync-set! current-process
                               (rename-sync-list rename-map sync-list))
            (sched))))))

(define count 0)
(define flag #f)

(define (find-signaling-processes event)
  (define (traverse p)
    (set! count (+ count 1))
    (if (eq? p current-process)
        (list p)
        (let ((ps (process-children p)))
          (if ps
              ;; node
              (if (memq event (process-sync p))
                  ;; and
                  (let loop ((ps ps) (rs '()))
                    (if (null? ps)
                        rs
                        (let ((qs (traverse (car ps))))
                          (if qs
                              (loop (cdr ps) (append qs rs))
                              #f))))
                  ;; or
                  (let loop ((ps ps))
                    (if (null? ps)
                        #f
                        (let ((qs (traverse (car ps))))
                          (if qs
                              qs
                              (loop (cdr ps)))))))
              ;; leaf
              (if (assq event (process-sync p))
                  (list p)
                  #f)))))
  (if flag
      (traverse
       (if (and (event? event)
                (event-creator event))
           (event-creator event)
           root-process))
      (traverse root-process)))

(define (release-processes e ps)
  (dolist (p ps)
    (unless (eq? p current-process)
      (release-process e p))))

(define (release-process e p)
  (let ((proc (cdr (assq e (process-sync p)))))
    (process-cont-set! p (lambda () (proc e)))
    (process-state-set! p 'ready)
    (process-sync-set! p '())
    (set! ready-queue (cons p ready-queue))))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;  utils

(define (ps)
  (define (traverse p)
    (describe p)
    (let ((ps (process-children p)))
      (when ps
        (dolist (p ps)
          (traverse p)))))
  (traverse root-process))

(define (de e)
  (if (channel-event? e)
      (event-name-symbol e)
      (if (event-creator e)
          (list (event-name e))
          (event-name e))))
© 2013,2014,2015 PRINCIPIA Limited