Think Stitch
PRINCIPIA  最近の更新


ロックフリーアルゴリズム:キューへの追加

ロックフリーアルゴリズムの検査に挑戦してみたいと思います。 お題はリンクリストで表したキューで、追加の操作だけ考えることにします。 CAS (Compare-And-Swap) 命令を使います。ABA 問題は扱いません。 今回はこういったキーワードを知っている人向けです。説明はなしです。 (説明できるほど知らないからです。)

追記 2013/09/20: プロセス P2 のところにコメントを追記しました。

追記 2013/09/20: プロセス P2 には問題がありました。ロックフリーアルゴリズム:キュー 2 に問題となる場合について書きました。

キューへの追加の復習

追加だけを扱うので、tail ポインタだけを考えます。 キューが空の状態でも、ダミーとして1つだけノードを持っておくことにします。

次は要素が2つ入っている状態です。

要素の追加は3ステップで行います。まず追加用の新しいノードを割り当てます。

次に tail ポインタからたどって、末尾のノードの next ポインタを新しいノードを指すように書き換えます。

最後に tail ポインタを新しいノードを指すように書き換えます。

プロセス構成

テストを行う状況を3つのプロセスで作ることにします。

プロセス QUE

メモリ空間を配列で表します。配列のインデックスがポインタ(アドレス)になります。 配列のスロット1つ1つをノードと考えます。 ノードが持つデータは省略してしまって、next ポインタだけを持つとします。 スロットに格納されている値が next ポインタの値です。 配列に格納できる値は、ポインタの他に終端を表す空列 () と、未使用ノードであることを示す #f であるとします。 未使用ノードを区別するのは、キューが正しい状態にあるかどうかを判別できるようにするためです。 スロット 0 はダミーとして使用します。したがってインデックス 0 の値が常にリストの先頭を指すポインタであるということになります。

プロセス QUE には3つのプロセスパラメータを持たせます。

tail ポインタを読み書きするチャネルを rd-tail, wr-tail とします。

メモリ空間、つまりノードの next ポインタを読み書きするチャネルを rd-next, wr-next とします。 これらはインデックスと値の2つのパラメータを取ります。 以前にリングバッファ等で作ったバッファと同じです。

プロセス定義は次のようになります。 基本的には以前に作った1変数とバッファを1つにまとめただけのものです。

(define-process (QUE c tail ns)
  (alt
    (! rd-tail (tail)
       (QUE c tail ns))
    (? wr-tail (p)
       (QUE (consistent p ns) p ns))
    (? rd-next (k p) (eqv? (ref ns k) p)
       (QUE c tail ns))
    (? wr-next (k p)
       (let ((ns-u (update ns k p)))
         (QUE (consistent tail ns-u)
              tail ns-u)))
    (! terminate
       (if (= c 1)
           SKIP
           STOP))))

関数 ref, update は、それぞれインデックスで指定されたリストの要素を参照、更新する関数です。 これも以前に作りました。 名前をちょっと短くしただけです。

(define (ref xs k) (list-ref xs k))

(define (update xs k x)
  (if (= k 0)
      (cons x (cdr xs))
      (cons (car xs)
            (update (cdr xs) (- k 1) x))))

関数 consistent はキューの状態が正しいかどうかに応じて 1, 0 を返す関数です。 キューを更新した場合は、この関数を使ってパラメータ c を更新しておきます。 こうしておくと、プロセスの実行過程を計算木上で見たときに、いつ正しい状態にあるかひとめでわかります。 わざわざ先頭においてあるのも、値を 1, 0 と短くて見やすいものにしてあるのもそのためです。 さらに、終了時にキューの状態が正しいかどうかに応じて、正常終了するか停止するかを分けておきます。 こうしておくと、デッドロック検査でアルゴリズムが正しかったかどうか判別できるからです。

キューが正しい状態にある条件は次のようになります。

  1. ノードのリストは循環しておらず、空列 () で終端していなければなりません。
  2. リストに含まれていないノードはすべて未使用のノードでなければなりません。つまり宙に浮いたノードはないということです。

1番目の条件を検査するために、補助関数 track を定義します。 先頭のインデックス 0 から next ポインタをたどりながら各ノードのポインタ値を変数 rs に集積します。 途中循環が発見された場合や、未使用ノードが紛れ込んでいた場合には #f を返します。 正しく終端 () に達した場合は、インデックスのリスト rs を返します。これはあとで使います。

(define (track ns)
  (let loop ((p 0) (rs '()))
    (if (memv p rs) ; cyclic
        #f
        (let ((q (ref ns p)))
          (cond ((null? q)
                 (cons p rs))
                ((not q)
                 #f)
                (else
                 (loop q (cons p rs))))))))

関数 consistent の定義は次のとおりです。

(define (consistent tail ns)
  (let ((rs (track ns)))
    (if rs
        (if (and (eqv? tail (car rs))
                 (for-all (lambda (p) (not (ref ns p)))
                          (lset-difference eqv? I rs)))
            1 0)
        0)))

まず関数 track を呼び出してリストを構成するインデックスのリストをもらいます。 この時点で不正であれば #f が返ってくるので、不正であることを示す符号 0 を返します。

リストが正しかった場合は、未使用のノードを調べます。 全アドレス空間のうち、rs に含まれていないアドレスのノードはすべて未使用であるはずです。これを検査します。

加えて、tail ポインタはリストの末尾を指し示していなければなりません。 インデックスのリスト rs はリストの逆順なので、先頭が tail と一致するはずです。

プロセス MM

メモリのアロケータプロセス MM は、フリーなノードのポインタリストを持っていて、要求されるたびにそのうちの1つを返します。 ポインタのリストはプロセスパラメータ pool で表します。 初期値はダミー 0 を除くすべてのインデックスのリストです。

ノードをアロケートするチャネルを alloc とします。フリーノードがない場合には empty を返します。

(define-process (MM pool)
  (alt
    (if (null? pool)
        (! empty (MM '()))
        (! alloc ((car pool))
           (MM (cdr pool))))
    (! terminate SKIP)))

プロセス P0

まずはじめは、スレッドのことを考えていない、上の図で示したとおりの追加操作を行うプロセス P0 を試してみます。

(define-process P0
  (alt
    (! empty SKIP)
    (? alloc (p)               ; (1)
       (! wr-next (p '())      ; (2)
          (? rd-tail (q)       ; (3)
             (! wr-next (q p)  ; (4)
                (! wr-tail (p) ; (5)
                   P0)))))))

システムプロセス SYS

以上のプロセスを並行合成して、システムプロセスを作ります。 I はすべてのポインタ値(配列のインデックス)からなるリストで、メモリ空間の大きさを N とすると I = (interval 0 N) です。 いろいろな追加コード P とプロセス数 np を試すことができるように、パラメータ化してあります。 メモリの初期値は、インデックス 0 だけ空リスト () で、あとはすべて #f です。 MM の初期値は I からダミー 0 を除いたものです。

(define-process (SYS P np)
  (par (list rd-tail wr-tail cas-tail
             rd-next wr-next cas-next 
             empty alloc
             terminate)
    (par (list terminate)
      (QUE 1 0 (cons '() (map (lambda (p) #f) (cdr I))))
      (MM (cdr I)))
    (seq
      (if (= np 1)
          P
          (xpar k (interval 0 np) '() P))
      (! terminate SKIP))))

検査

プロセス QUE に入れた仕掛けにより、デッドロック検査でアルゴリズムの正しさを判断できます。 結果は次のとおりです。プロセスが1つの場合に問題がないのは当然です。 プロセスが2個で問題が発見されました。

問題のパスは次のとおりです。

1つ目のプロセスがノード 1 を追加したのに、2番目のプロセスがそれをノード 2 で上書きしてしまっています。

CAS (Compare-And-Swap) によるアルゴリズム

ロックフリーアルゴリズムを実装するために、プロセス QUE に CAS (Compare-And-Swap) 操作を追加します。 tail 用の cas-tail と next ポインタ用の cas-next をそれぞれ用意します。

プロセス QUE

(define-process (QUE c tail ns)
  (alt
    (! rd-tail (tail)
       (QUE c tail ns))
    (? wr-tail (p)
       (QUE (consistent p ns) p ns))
    (? rd-next (k p) (eqv? (ref ns k) p)
       (QUE c tail ns))
    (? wr-next (k p)
       (let ((ns-u (update ns k p)))
         (QUE (consistent tail ns-u)
              tail ns-u)))
    (? cas-tail (q r b)
       (eqv? (eqv? tail q) b)
       (if b
           (QUE (consistent r ns) r ns)
           (QUE c tail ns)))
    (? cas-next (p q r b)
       (eqv? (eqv? (ref ns p) q) b)
       (if b
           (let ((ns-u (update ns p r)))
             (QUE (consistent tail ns-u)
                  tail ns-u))
           (QUE c tail ns)))
    (! terminate
       (if (= c 1)
           SKIP
           STOP))))

1回の同期ですべての仕事を行うために、ちょっと凝った作りになっています。 基本は rd-next と同じく、ガードを使って入出力を同時に行う方法です。

cas-next は4つのパラメータ p, q, r, b を取ります。 p は更新の対象とするノードへのポインタ(アドレス、インデックス)、q は比較する値、r は書き込む値です。 b は更新が行われたかどうかを示す論理値です。

もしノード p の next ポインタの値が q ならば r を書き込み b=#t とします。 そうでなければ書き込みは行わず b=#f とします。 比較と更新は不可分(アトミック)に行います。

cas-tail の方は対象が tail と決まっているのでパラメータ p がありません。

プロセス P1

P0 のときは、終端ノードの next ポインタを上書きしてしまったのが問題でした。 そこで終端ノードの next ポインタを書き換えるときに、空リスト () であることを確認してから行うことにします。 確認と更新は不可分に行わなければならないので CAS を使います。

こうすると、競合するプロセスの中で誰か一人だけが更新に成功することになります。 すると後から来たプロセスは tail の指すノードの next ポインタが () でないことを発見します。 tail の更新は next ポインタの更新に成功したプロセスの責任にすれば、他のプロセスは待つだけで済みます。 こちらは排他を必要としないので、通常の更新 wr-tail でかまいません。

(define-process P1
  (alt
    (! empty SKIP)
    (? alloc (new-node)
       (! wr-next (new-node '())
          (P1A new-node)))))

(define-process (P1A new-node)
  (? rd-tail (tail)
     (? cas-next (p q r b)
        (and (eqv? p tail)
             (eqv? q '())
             (eqv? r new-node))
        (if b
            (! wr-tail (new-node) P1)
            (P1A new-node)))))

検査

検査結果は次のようになりました。少なくともこの検査の範囲では問題がないようです。

発散検査

すべてのイベントを隠蔽して発散がないかどうか調べてみます。

すべてのイベントを隠蔽したシステムプロセス HSYS

次のようにすべてのイベントを隠蔽します。

(define-process (HSYS P np)
  (hide Events
    (SYS P np)))

発散検査

発散検査の結果は次のようになりました。

P0 の場合に発散がないのはループがないのであたりまえですね。 P1 には発散があります。隠蔽をはがしてパスを求めると次のようになります。

1つ目のプロセスが終端ノードの next ポインタ更新に成功したあとでもう1つのプロセスに切り替わり、永久に空振りの cas を実行するパスがあります。 もしプロセスに優先順位があって、後者の方が高い優先順位を持っていた場合、このコードは前に進まない可能性があるということです。

発散のないアルゴリズム

ネットでロックフリーアルゴリズムを調べていたら、"Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" という論文がありました。 この中にあるコードを調べてみます。 以下は論文からの引用です。 一部間違いがありますけど、原文そのままにしてあります。 また、このコードは ABA 問題にも対処してあります。(モデルでは無視します。)

structure pointer_t { ptr: pointer to node_t,
                      count: unsigned integer g}
structure node_t { value: data type, next: pointer_t g}
structure queue_t { Head: pointer_t, Tail: pointer_t g}

initialize(Q: pointer to queue_t)
    node = new node()
    node->next.ptr = NULL
    Q->Head = Q->Tail = node

enqueue(Q: pointer to queue_t, value: data type)
E1: node = new node()
E2: node->value= value
E3: node->next.ptr = NULL
E4: loop
E5:   tail = Q->Tail
E6:   next = tail.ptr->next
E7:   if tail == Q->Tail
E8:     if next.ptr == NULL
E9:       if CAS(&tail.ptr->next, next, <node, next.count+1>)
E10:        break
E11:      endif
E12:    else
E13:      CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
E14:    endif
E15:   endif
E16: endloop
E17: CAS(&Q->Tail, tail, <node, tail.count+1>)

プロセス P2

上記のコードをほぼそのままプロセス式にしました。 ただし、Q->Tail 等の変数を複数回参照しているところは、ローカルなプロセスパラメータに引き取っているので参照を省略してあります。 (追記 2013/09/20: E7 は省略してはいけないかもしれません。取り急ぎご注意だけでご容赦ください。時間ができたらもっと追求してみます。)

(追記 2013/09/20: E7 を省略すると正しく動作しない場合を見つけました。ロックフリーアルゴリズム:キュー 2 をご覧ください。)
(define-process P2
  (alt
    (! empty SKIP)
    (? alloc (new-node)
       (! wr-next (new-node '())
          (P2A new-node)))))

(define-process (P2A new-node)
  (? rd-tail (tail)
     (? rd-next (p next) (eqv? p tail)
        (if (null? next)
            (? cas-next (p q r b)
               (and (eqv? p tail)
                    (eqv? q '())
                    (eqv? r new-node))
               (if b
                   (? cas-tail (q r b)
                      (and (eqv? q tail)
                           (eqv? r new-node))
                      P2)
                   (P2A new-node)))
            (? cas-tail (q r b)
               (and (eqv? q tail)
                    (eqv? r next))
               (P2A new-node))))))

検査

検査結果は次のようになりました。

tail が終端ノードを指していないことを最初に発見したプロセス自身が tail を更新しているので、必ず処理が前に進み、発散しないのですね。 いやはや驚きました。

2013/09/13 "プログラマの日"
© 2013,2014,2015 PRINCIPIA Limited