8 Concurrency

これまで、私達は1スレッドでの実行しか見てきませんでした。今こそ並行性を導入する時です。Oz では制御の新しい並行スレッドは次によって生み出されます:

thread S end

この文の実行により、現在のスレッドと並行して走るスレッドが分岐(fork)します。現在のスレッドは次の文によって再開します。それぞれの終了していないかつブロックされていないスレッドは、プロセッサの時分割で最終的に割り当てられます。これはスレッドは公平に実行される事を意味します。

しかしながら、3つの優先度レベルがあります: high, medium, そして low です。これらは時分割でどの程度スレッドが割り当てられるかを決定します。Oz では、高優先度のスレッドは低優先度のスレッドを飢え(starve)させません。優先度はスレッドが得る事の出来るプロセッサの資源がどの程度の割合の大きさなのかを指定するだけです。

それぞれのスレッドはユニークな名前を持っています。現在のスレッドの名前を得るには、手続き Thread.this/1 を呼び出します。名前を使ってスレッドへの参照を持つ事は、スレッドにおいてスレッドの終了やスレッドでの例外発生の操作を行う事を可能にします。スレッド操作は基本モジュール Thread で定義されています。

私達がスレッドについて何が出来るかを見てみましょう。最初に、各スレッドはデータの依存によりブロックされるデータフロースレッドである事を思い出して下さい。以下のプログラムについて考えましょう:

declare X0 X1 X2 X3 in 
thread 
   local Y0 Y1 Y2 Y3 in 
      {Browse [Y0 Y1 Y2 Y3]}  
      Y0 = X0+1
      Y1 = X1+Y0
      Y2 = X2+Y1
      Y3 = X3+Y2
      {Browse completed}
   end 
end 
{Browse [X0 X1 X2 X3]}

このプログラムを入力して Browser で見ると、変数が未束縛な事が見えるでしょう。今、以下の文を一度に一つずつ入力して何が起こるかを観察しましょう:

X0 = 0
X1 = 1
X2 = 2
X3 = 3

あなたはどの様にスレッドが再開し再び一時停止するかを見るでしょう。最初に X0 が束縛されるとスレッドは Y0 = X0+1 を実行し、次の Y1 = X1+Y0 の実行中に X1 の値を必要とするので再び一時停止します、等々。


fun {Map Xs F}
   case Xs
   of nil then nil
   [] X|Xr then thread {F X} end |{Map Xr F}
   end 
end

Figure 8.1: 並行 Map 関数


Figure 8.1 で示されるプログラムは並行 Map 関数を定義しています。thread ... end がここでは一つの式として使われている事に注視して下さい。このプログラムの振る舞いについて論じましょう。私達が以下の文に入った時:

declare 
F X Y Z
{Browse thread {Map X F} end}

スレッド実行 Map が作られます。それは X が未束縛なためにcase文で即時に一時停止するでしょう。その後、私達は以下の文に入ります:

X = 1|2|Y
fun {F X} X*end

メインのスレッドはリストの最初の2つの引数 thread {F 1} endthread {F 2} end のために2つのスレッドを作るリストを走査し、リストの尾部 Y で再び一時停止するでしょう。最終的に、

Y = 3|Z
Z = nil

はメインのスレッドと新しく生成されたスレッド thread {F 3} end の計算を完了し、最終的なリスト [1 4 9] として結果が出ます。

Figure 8.2 で示されるプログラムは並行分割統治プログラムで、それは Fibonacci 関数を計算するかなり非効率的なやり方です。このプログラムは指数的な数のスレッドを作ります!並行スレッドを生成するのがいかに容易かを見ましょう。あなたはこのプログラムをあなたの Oz の環境がどの程度多くのスレッドを生成出来るかのテストに使う事が出来ます。次を試して見て下さい

{Browse {Fib 25}}

Oz メニューで panel プログラムを使ってスレッドを見る事が出来ます。うまく動いたら、もっと大きな数で試してみて下さい。panel は Figure 8.3 に示されています。


fun {Fib X}
   case X
   of 0 then 1
   [] 1 then 1
   else thread {Fib X-1} end  + {Fib X-2} end 
end

Figure 8.2: 並行 Fibonacci 関数



Figure 8.3: Mozart Panel で {Fib 26 X} の作るスレッドを見る


Oz での明示的なスレッド生成のアイディアはプログラマに彼/彼女のアプリケーションをモジュールとして構築する事を可能にします。この点において Mozart システムはとりわけ優れています。スレッドは 100000 個を生成出来る程に安価です。Mozart 1.0 でのスレッド生成は Java JDK 1.2 に比べて約60倍高速です。並行性があなたのプログラムの構造を易しくするなら、遠慮なくそれを使いましょう。しかしながら同じ構造を持っているのであれば線形プログラムの方が並行プログラムより常により速いです。Figure 8.2Fib プログラムは thread ... end を除いた方が速いです。それゆえ、ただ並行性が楽しいからという理由ではなく、アプリケーションが必要とする時のみスレッドを生成するようにしましょう。

8.1 Time

モジュール Time では、いくつもの有用なソフトリアルタイム手続きを見られます。それらは:


local 
   proc {Ping N}
      if N==then {Browse 'ping terminated'}
      else {Delay 500} {Browse ping} {Ping N-1} end 
   end 
   proc {Pong N}
      {For 1 N 1  
         proc {$ I} {Delay 600} {Browse pong} end }
      {Browse 'pong terminated'}
   end 
in 
   {Browse 'game started'}
   thread {Ping 50} end 
   thread {Pong 50} end 
end

Figure 8.4: 'Ping Pong' プログラム


Figure 8.4 で示されるプログラムは2つのスレッドを開始し、1つは ping を500ミリ秒後に定期的に表示し、他方は pong を600秒後に表示します。いくつかの ping は周期の違いのためにそれぞれの後に即座に表示されるでしょう。

8.1.1スタンドアロンなアプリケーションを作る

Mozart でスタンドアロンなアプリケーションを作るのは簡単です。私達はこの事を Figure 8.4 のプログラムを Figure 8.5 に示されるプログラムのファンクタを作る事によってスタンドアロンにする事によって見せ、それをファイル PingPong.oz に保存します。その後、次のコマンドを使って下さい:

 ozc -x PingPong.oz 

そしたら PingPong とシェルに打ち込んでプログラムを開始して下さい。1


functor 
import 
   Browser(browse:Browse) % Browser モジュールから Browse をインポート
define 
   proc {Ping N}
      if N==then {Browse 'ping terminated'}
      else {Delay 500} {Browse ping} {Ping N-1} end 
   end 
   proc {Pong N}
      {For 1 N 1  
         proc {$ I} {Delay 600} {Browse pong} end }
      {Browse 'pong terminated'}
   end 
in 
   {Browse 'game started'}
   thread {Ping 50} end 
   thread {Pong 50} end 
end

Figure 8.5: スタンドアロン 'Ping Pong' プログラム


8.2 Stream Communication

Oz のデータフローの性質は生産者-消費者パターンでストリームを通したやり取りを行うスレッドを書くのを容易にしてくれます。ストリームは生産者スレッドによって漸増的に作られ、その後に消費者スレッドによって消費されるリストです。スレッドはストリームの同じ要素を消費します。例として Figure 8.6 のプログラムはストリームコミュニケーションの例で、そこでは生産者が数のリストを作り、消費者が全ての数を足し合わせています。


fun {Generator N}
   if N > 0 then N|{Generator N-1}
   else nil end 
end 
local 
    fun {Sum1 L A}
      case L
      of nil then A
      [] X|Xs then {Sum1 Xs A+X}
      end 
    end 
in fun {Sum L} {Sum1 L 0} end 
end

Figure 8.6: リストの要素の足し合わせ


以下のプログラムを走らせて上のプログラムを試して下さい:

{Browse thread {Sum thread {Generator 150000} end } end}

11250075000 という数が出てくるでしょう。ストリームコミュニケーションの働きについて理解しましょう。以下の例でvolvoを作る時、生産者は漸増的に要素のストリーム(リスト)を作ります。これは一般に性急(eager)な流儀です。

fun {Producer ...... volvo|{Producer ...... end

消費者はストリームをアイテムが到着するまで待ち、到着するとアイテムは次の様に消費されます:

proc {Consumer Ls ...}
   case Ls of volvo|Lr then 
'Consume volvo' ... end 
   {Consumer Lr}
end

case 文のデータフローの振る舞いは消費者をストリームの次のアイテムが到着するまで一時停止させます。再帰呼び出しは消費者に動作の繰り返しを可能にします。以下のパターンは反復を代わりに使う事により再帰の使用を避けています:

proc {Consumer Ls ...}
   {ForAll Ls
    proc {$ Item}
       case Item of volvo then  
          
Consume volvo ...  
       end 
    end}
end

Figure 8.7 はこのパターンを使った簡単な例を示しています。消費者は受け取った車(volvo)を数えます。1000 の車を受け取るごとに、Browser にメッセージを表示します。


fun {Producer N}
   if N > 0 then 
      volvo|{Producer N-1}
   else nil end 
end 
local 
   proc {ConsumerN Ls N}
      case Ls of nil then skip 
      [] volvo|Lr then 
         if N mod 1000 == 0 then 
            {Browse 'riding a new volvo'}
         end 
         {ConsumerN Lr N+1}
      else 
         {ConsumerN {List.drop Ls 1} N}
      end 
   end 
in 
   proc {Consumer Ls} {ConsumerN Ls 1} end 
end

Figure 8.7: volvo を生産


このプログラムは以下を使って走らせます:

{Consumer thread {Producer 10000} end}

注意:文を emulator にフィードすると、それは自身のスレッドで実行されます。それゆえ、上の文をフィードすると2つのスレッドが作られます。メインは消費者のためのもので、フォークしたスレッドは生産者のためのものです。

消費者は再帰パターンを使って書かれている事に注目して下さい。私達はこのプログラムを繰り返しの ForAll/2 構造を使ってプログラム出来るでしょうか?消費者が結果を次の再帰に受け渡すための追加の引数 N を伴っているのでこれは不可能です(訳注:この様な引数はaccumulaterと呼ばれ関数型プログラミングでは一般的に使われます)。この引数は状態の一種に対応します。一般的に、2つの解法があります。Section 9.4で行われるようにステートフル(mutable 代入可能)なデータ構造を導入するか、状態を持ち回る別のイテレータ(iterator 反復子)を定義するかです。私達の場合、目的に適するいくつかの反復子はモジュール List にあります。最初に、私達は volvo 以外をフィルターして除去する反復子を必要とします。Ys に Boolean 関数として使われる手続き P/2 を満たす全要素を出力する {Filter Xs P ?Ys} を使う事が出来ます。2番目の構築は {List.forAllInd Xs P} で、それは ForAll に似ていますが P/2 は最初の引数としてリストの現在の要素の 1 から始まるインデックスを取り、2番目の引数としてリストの要素を取ります。これがプログラムです:

proc {Consumer Ls}
   fun {IsVolvo X} X == volvo end 
   Ls1
in 
   thread Ls1 = {Filter Ls IsVolvo} end 
   {List.forAllInd Ls1
      proc {$ N X}
         if N mod 1000 == 0 then 
           {Browse 'riding a new volvo'}
         end 
      end}
end

8.3スレッド優先度(thread priority)とリアルタイム

以下の文を使ってプログラムを走らせてみましょう:

{Consumer thread {Producer 5000000} end}

panel に切り替えてプログラムのメモリの振る舞いを観察しましょう。あなたはすぐにこのプログラムが良い振る舞いをしていない事に気付くでしょう。その理由は非同期メッセージパッシング(asynchronous message passing)です。生産者がメッセージを送った時、すなわちストリームに新しい要素を生成した時、それが消費者が消費するより速いレートの場合、システムが壊れ始めるまでより多いバッファを必要とし続けます。2 この問題を解決するためのいくつかの方法があります。一つは生産者と消費者の間で有界バッファ(bounded buffer)を作る事で、これは後で論じます。他の方法はスレッドの実行スピードをスレッド優先度の変更によって変える事で、消費者が生産者より多くの時分割を得るようにする事です。

モジュール ThreadProperty はスレッドへのいくつもの適切な操作を提供します。これらのうちいくつかは Table 8.1 で分類されます。


手続き

記述

{Thread.state +T ?A}

T の現在の状態を返す

{Thread.suspend +T}

T を一時停止する

{Thread.resume +T}

T を再開する

{Thread.terminate +T}

T を終了する

{Thread.injectException ++E}

T で例外 E を発生させる

{Thread.this +T}

現在のスレッド T を返す

{Thread.setPriority ++P}

T の優先度を設定する

{Thread.setThisPriority +P}

現在のスレッドの優先度を設定する

{Property.get priorities ?Pr }

システムの優先度の比を取得する

{Property.put priorities(high:+X medium:+Y)}

システムの優先度の比を設定する

Table 8.1: スレッドの操作


Oz は3つの優先度レベルを持っています。システム手続き

{Property.put priorities(high:X medium:Y)}

はプロセッサ時間の比を高優先度(high)のスレッド:中優先度(medium)のスレッドで X:1 と設定します。これは同時にプロセッサ時間の比を中優先度(medium):低優先度(low)のスレッドで Y:1 と設定します。XY は整数です。そのため、もし私達が次を実行すると

{Property.put priorities(high:10 medium:10)}

走る事の出来る高優先度スレッドの 10 回の時分割の割り当てごとにシステムは中優先度のスレッドを1回割り当て、同様の事が中優先度と低優先度の間で行われます。同じ優先度レベルでは、スケジューリングは公平でラウンドロビンです。さあ、私達の生産者-消費者プログラムを動くようにしましょう。私達は生産者に低い優先度を与え、消費者に高い優先度を与えます。優先度の比を 10:110:1 に設定します。

local L in 
   {Property.put threads priorities(high:10 medium:10)}
   thread 
     {Thread.setThisPriority low}
      L = {Producer 5000000}
   end 
   thread 
       {Thread.setThisPriority high}
       {Consumer L}
   end 
end    

8.4要求駆動(demand-driven)実行

やや過激な代替の解法は、生産者を遅延実行(lazy)にし、消費者が要求した時のみアイテムを生産する事です。消費者はこの場合、未束縛の変数(空の箱)によるストリームを構築します。生産者は未束縛の変数(空の箱)がストリームに見えるまで待ちます。それから変数を束縛します(箱を埋めます)。生産者の一般的なパターンは以下になります。

proc {Producer Xs}
   case Xs of X|Xr then  
      I in 'Produce I'  
      X=I ...  
      {Producer Xr}  
   end 
end

消費者の一般的なパターンは以下になります。

proc {Consumer ... Xs}
   X Xr in  
      ...  
      Xs = X|Xr  
      'Consume X' 
      ... {Consumer ... Xr}
end  

Figure 8.8 で見るプログラムは Figure 8.7 のプログラムの要求駆動版です。あなたはこれを大量のvolvoで走らせられます!


local 
   proc {Producer Xs}
      case Xs of X|Xr then X = volvo {Producer Xr}
      [] nil then {Browse 'end of line'}
      end 
   end 
   proc {Consumer N Xs}
      if N=<then Xs=nil
      else X|Xr = Xs in 
         if X == volvo then 
            if N mod 1000 == 0 then 
               {Browse 'riding a new volvo'}
            end 
            {Consumer N-1 Xr}
         else 
            {Consumer N Xr}
         end 
      end 
   end 
in 
   {Consumer 10000000 thread {Producer $} end}
end

Figure 8.8: volvo を遅延的に生産する


8.4.1フューチャ(future)

プログラムを要求駆動計算にするのに他の方法があります。future の記法と ByNeed プリミティブ操作を使う事です。フューチャ(future)は論理変数のリードオンリー版です。例として変数 X のフューチャを作るのに、私達は操作 !! でフューチャ Y を作ります。

 Y = !!

スレッドがフューチャの値を使おうとする時、例えば Y を使うときは X が束縛されるまで一時停止するでしょう。

手続きを遅延的に実行しようとする一つの方法、つまり、要求駆動の様式で行う事とは、操作 {ByNeed +P ?F} を使う事です。ByNeed は1引数の手続き P を取り、フューチャ F を返します。スレッドが F の値にアクセスしようとする時、手続き {P X} が呼び出され、結果の値 XF に束縛されます。これは私達に直接的な様式での要求駆動計算を可能にします。例として次をフィードして下さい

declare Y
{ByNeed proc {$ X} X=1 end Y}
{Browse Y}

私達は Y がフューチャになる事を観察するでしょう、つまり私達は Y<Future> と Browser で見るでしょう。もし私達が Y の値にアクセスしようとしれば、それは 1 に束縛されるでしょう。Y にアクセスする一つの方法は、手続きの生成を発火(trigger)する操作 {Wait Y} を行う事です。

今、私達は Figure 8.8 のプログラムを Figure 8.9 の様に書き直せます。これは Figure 8.7 とよく似て見えます。


local 
   proc {Producer Xs}
      Xr in 
      Xs = volvo|{ByNeed {Producer Xr} $}
   end 
   proc {Consumer N Xs}
      if N>then 
         case Xs of X|Xr then 
            if X==volvo then 
              if N mod 1000 == 0 then 
                 {Browse 'riding a new volvo'}
              end 
              {Consumer N-1 Xr}
            else {Consume N Xr} end 
         end   
      end 
   end 
in 
   {Consumer 10000000 thread {Producer $} end}
end

Figure 8.9: volvo を ByNeed を使って生産する


8.5スレッド終了の検知

私達はスレッドが文 thread S end を使ってどの様に分岐するかを見ました。ここで思いつく自然な質問は、どうやって分岐したスレッドを元の制御のスレッドに合流させて戻すかです。実際には、これは複数のスレッドの終了検知の特別な場合で、他のスレッドをそのイベントを待つようにします。Oz はデータフロー言語なので、一般的な枠組みはとても簡単です。

thread T1 X1=unit end 
thread 
T2 X2=X1 end 
... 
thread 
TN XN=XN-1 end 
{Wait 
XN}
MainThread

全てのスレッドが終了する時、変数 X1 ... XN は一緒に併合され unit に束縛されます。{Wait XN} sは XN が束縛されるまでメインのスレッドを一時停止させます。

Figure 8.10 で私達は、上の筋書きに沿った制御構造の並行組み合わせを実装する、高階の構造(コンビネータ)を定義します。それは引数無しの手続きのリストを単一の引数として取ります。それが実行される時、手続きは並行に分岐します。次の文はリスト中の全ての手続きが終了した時のみ実行されます。


local 
    proc {Conc1 Ps I O}
      case Ps of P|Pr then M in  
         thread {P} M = I end 
         {Conc1 Pr M O}
      [] nil then O = I
      end 
    end 
in 
   proc {Conc Ps} {Wait {Conc1 Ps unit $}} end 
end

Figure 8.10: 並行組み合わせ


プログラム Figure 8.5PingPong スレッドが終了した時には適切に終了しません。このプログラムに今手を打つ事が出来ます。私達が Application.exit/1 を使えば、スタンドアロンのアプリケーションは残ったスレッドをアボートして終了します。PingPong のスレッドが終了した時のみメインのスレッドが終了するように変更する事も出来ます。これは Figure 8.11 で示されます。


functor 
import 
   Browser(browse:Browse) % Browser モジュールから Browse をインポート
   Application
define 
   proc {Ping N}
      if N==then {Browse 'ping terminated'}
      else {Delay 500} {Browse ping} {Ping N-1} end 
   end 
   proc {Pong N}
      {For 1 N 1  
         proc {$ I} {Delay 600} {Browse pong} end }
      {Browse 'pong terminated'}
   end 
   X1 X2
in 
   {Browse 'game started'}
   thread {Ping 50} X1=unit end 
   thread {Pong 50} X2=X1 end 
   {Wait X2}
   {Application.exit 0}
end

Figure 8.11: スタンドアロンの 'Ping Pong' プログラム



1. このプログラムを終了させるにはOSのシェルでCONTROL-Cを実行して下さい。後で適切に終了させる方法を見ます。
2. 皮肉な事に Mozart システムで分散プログラミングの機能を使う時には、サイト間でのストリームコミュニケーションの方がよく働きます、これはデザインされたフロー制御の機構がネットワークバッファが一杯の時に手続きを一時停止させるためです。

Seif Haridi and Nils Franz�n
Version 1.4.0 (20080704)