<< Prev | - Up - | Next >> |
これまで、私達は1スレッドでの実行しか見てきませんでした。今こそ並行性を導入する時です。Oz では制御の新しい並行スレッドは次によって生み出されます:
thread
Send
この文の実行により、現在のスレッドと並行して走るスレッドが分岐(fork)します。現在のスレッドは次の文によって再開します。それぞれの終了していないかつブロックされていないスレッドは、プロセッサの時分割で最終的に割り当てられます。これはスレッドは公平に実行される事を意味します。
しかしながら、3つの優先度レベルがあります: high, medium, そして low です。これらは時分割でどの程度スレッドが割り当てられるかを決定します。Oz では、高優先度のスレッドは低優先度のスレッドを飢え(starve)させません。優先度はスレッドが得る事の出来るプロセッサの資源がどの程度の割合の大きさなのかを指定するだけです。
それぞれのスレッドはユニークな名前を持っています。現在のスレッドの名前を得るには、手続き Thread.this/1
を呼び出します。名前を使ってスレッドへの参照を持つ事は、スレッドにおいてスレッドの終了やスレッドでの例外発生の操作を行う事を可能にします。スレッド操作は基本モジュール Thread
で定義されています。
私達がスレッドについて何が出来るかを見てみましょう。最初に、各スレッドはデータの依存によりブロックされるデータフロースレッドである事を思い出して下さい。以下のプログラムについて考えましょう:
declare X0 X1 X2 X3 in
thread
local Y0 Y1 Y2 Y3 in
{Browse [Y0 Y1 Y2 Y3]}
Y0 = X0+1
Y1 = X1+Y0
Y2 = X2+Y1
Y3 = X3+Y2
{Browse completed}
end
end
{Browse [X0 X1 X2 X3]}
このプログラムを入力して Browser で見ると、変数が未束縛な事が見えるでしょう。今、以下の文を一度に一つずつ入力して何が起こるかを観察しましょう:
X0 = 0
X1 = 1
X2 = 2
X3 = 3
あなたはどの様にスレッドが再開し再び一時停止するかを見るでしょう。最初に X0
が束縛されるとスレッドは Y0 = X0+1
を実行し、次の Y1 = X1+Y0
の実行中に X1
の値を必要とするので再び一時停止します、等々。
fun {Map Xs F}
case Xs
of nil then nil
[] X|Xr then thread {F X} end |{Map Xr F}
end
end
Figure 8.1 で示されるプログラムは並行 Map
関数を定義しています。thread ... end
がここでは一つの式として使われている事に注視して下さい。このプログラムの振る舞いについて論じましょう。私達が以下の文に入った時:
declare
F X Y Z
{Browse thread {Map X F} end}
スレッド実行 Map
が作られます。それは X
が未束縛なためにcase文で即時に一時停止するでしょう。その後、私達は以下の文に入ります:
X = 1|2|Y
fun {F X} X*X end
メインのスレッドはリストの最初の2つの引数 thread {F 1} end
と thread {F 2} end
のために2つのスレッドを作るリストを走査し、リストの尾部 Y
で再び一時停止するでしょう。最終的に、
Y = 3|Z
Z = nil
はメインのスレッドと新しく生成されたスレッド thread {F 3} end
の計算を完了し、最終的なリスト [1 4 9]
として結果が出ます。
Figure 8.2 で示されるプログラムは並行分割統治プログラムで、それは Fibonacci 関数を計算するかなり非効率的なやり方です。このプログラムは指数的な数のスレッドを作ります!並行スレッドを生成するのがいかに容易かを見ましょう。あなたはこのプログラムをあなたの Oz の環境がどの程度多くのスレッドを生成出来るかのテストに使う事が出来ます。次を試して見て下さい
{Browse {Fib 25}}
Oz メニューで panel プログラムを使ってスレッドを見る事が出来ます。うまく動いたら、もっと大きな数で試してみて下さい。panel は Figure 8.3 に示されています。
fun {Fib X}
case X
of 0 then 1
[] 1 then 1
else thread {Fib X-1} end + {Fib X-2} end
end
Oz での明示的なスレッド生成のアイディアはプログラマに彼/彼女のアプリケーションをモジュールとして構築する事を可能にします。この点において Mozart システムはとりわけ優れています。スレッドは 100000 個を生成出来る程に安価です。Mozart 1.0 でのスレッド生成は Java JDK 1.2 に比べて約60倍高速です。並行性があなたのプログラムの構造を易しくするなら、遠慮なくそれを使いましょう。しかしながら同じ構造を持っているのであれば線形プログラムの方が並行プログラムより常により速いです。Figure 8.2 の Fib
プログラムは thread ... end
を除いた方が速いです。それゆえ、ただ並行性が楽しいからという理由ではなく、アプリケーションが必要とする時のみスレッドを生成するようにしましょう。
モジュール Time
では、いくつもの有用なソフトリアルタイム手続きを見られます。それらは:
{Alarm I ?U}
即座に自身のスレッドを作り、U
を unit
に I
ミリ秒後に束縛します。
{Delay I}
スレッドの実行を最低でも I
ミリ秒一時停止し、skip
に減らします。
local
proc {Ping N}
if N==0 then {Browse 'ping terminated'}
else {Delay 500} {Browse ping} {Ping N-1} end
end
proc {Pong N}
{For 1 N 1
proc {$ I} {Delay 600} {Browse pong} end }
{Browse 'pong terminated'}
end
in
{Browse 'game started'}
thread {Ping 50} end
thread {Pong 50} end
end
Figure 8.4 で示されるプログラムは2つのスレッドを開始し、1つは ping
を500ミリ秒後に定期的に表示し、他方は pong
を600秒後に表示します。いくつかの ping
は周期の違いのためにそれぞれの後に即座に表示されるでしょう。
Mozart でスタンドアロンなアプリケーションを作るのは簡単です。私達はこの事を Figure 8.4 のプログラムを Figure 8.5 に示されるプログラムのファンクタを作る事によってスタンドアロンにする事によって見せ、それをファイル PingPong.oz
に保存します。その後、次のコマンドを使って下さい:
ozc -x PingPong.oz
そしたら PingPong とシェルに打ち込んでプログラムを開始して下さい。1
functor
import
Browser(browse:Browse) % Browser モジュールから Browse をインポート
define
proc {Ping N}
if N==0 then {Browse 'ping terminated'}
else {Delay 500} {Browse ping} {Ping N-1} end
end
proc {Pong N}
{For 1 N 1
proc {$ I} {Delay 600} {Browse pong} end }
{Browse 'pong terminated'}
end
in
{Browse 'game started'}
thread {Ping 50} end
thread {Pong 50} end
end
Oz のデータフローの性質は生産者-消費者パターンでストリームを通したやり取りを行うスレッドを書くのを容易にしてくれます。ストリームは生産者スレッドによって漸増的に作られ、その後に消費者スレッドによって消費されるリストです。スレッドはストリームの同じ要素を消費します。例として Figure 8.6 のプログラムはストリームコミュニケーションの例で、そこでは生産者が数のリストを作り、消費者が全ての数を足し合わせています。
fun {Generator N}
if N > 0 then N|{Generator N-1}
else nil end
end
local
fun {Sum1 L A}
case L
of nil then A
[] X|Xs then {Sum1 Xs A+X}
end
end
in fun {Sum L} {Sum1 L 0} end
end
以下のプログラムを走らせて上のプログラムを試して下さい:
{Browse thread {Sum thread {Generator 150000} end } end}
11250075000
という数が出てくるでしょう。ストリームコミュニケーションの働きについて理解しましょう。以下の例でvolvoを作る時、生産者は漸増的に要素のストリーム(リスト)を作ります。これは一般に性急(eager)な流儀です。
fun {Producer ...} ... volvo|{Producer ...} ... end
消費者はストリームをアイテムが到着するまで待ち、到着するとアイテムは次の様に消費されます:
proc {Consumer Ls ...}
'Consume volvo'
case Ls of volvo|Lr then... end
{Consumer Lr}
end
case 文のデータフローの振る舞いは消費者をストリームの次のアイテムが到着するまで一時停止させます。再帰呼び出しは消費者に動作の繰り返しを可能にします。以下のパターンは反復を代わりに使う事により再帰の使用を避けています:
proc {Consumer Ls ...}
Consume volvo
{ForAll Ls
proc {$ Item}
case Item of volvo then
...
end
end}
end
Figure 8.7 はこのパターンを使った簡単な例を示しています。消費者は受け取った車(volvo)を数えます。1000
の車を受け取るごとに、Browser にメッセージを表示します。
fun {Producer N}
if N > 0 then
volvo|{Producer N-1}
else nil end
end
local
proc {ConsumerN Ls N}
case Ls of nil then skip
[] volvo|Lr then
if N mod 1000 == 0 then
{Browse 'riding a new volvo'}
end
{ConsumerN Lr N+1}
else
{ConsumerN {List.drop Ls 1} N}
end
end
in
proc {Consumer Ls} {ConsumerN Ls 1} end
end
このプログラムは以下を使って走らせます:
{Consumer thread {Producer 10000} end}
注意:文を emulator にフィードすると、それは自身のスレッドで実行されます。それゆえ、上の文をフィードすると2つのスレッドが作られます。メインは消費者のためのもので、フォークしたスレッドは生産者のためのものです。
消費者は再帰パターンを使って書かれている事に注目して下さい。私達はこのプログラムを繰り返しの ForAll/2
構造を使ってプログラム出来るでしょうか?消費者が結果を次の再帰に受け渡すための追加の引数 N を伴っているのでこれは不可能です(訳注:この様な引数はaccumulaterと呼ばれ関数型プログラミングでは一般的に使われます)。この引数は状態の一種に対応します。一般的に、2つの解法があります。Section 9.4で行われるようにステートフル(mutable 代入可能)なデータ構造を導入するか、状態を持ち回る別のイテレータ(iterator 反復子)を定義するかです。私達の場合、目的に適するいくつかの反復子はモジュール List
にあります。最初に、私達は volvo 以外をフィルターして除去する反復子を必要とします。Ys
に Boolean 関数として使われる手続き P/2
を満たす全要素を出力する {Filter Xs P ?Ys}
を使う事が出来ます。2番目の構築は {List.forAllInd Xs P}
で、それは ForAll
に似ていますが P/2
は最初の引数としてリストの現在の要素の 1
から始まるインデックスを取り、2番目の引数としてリストの要素を取ります。これがプログラムです:
proc {Consumer Ls}
fun {IsVolvo X} X == volvo end
Ls1
in
thread Ls1 = {Filter Ls IsVolvo} end
{List.forAllInd Ls1
proc {$ N X}
if N mod 1000 == 0 then
{Browse 'riding a new volvo'}
end
end}
end
以下の文を使ってプログラムを走らせてみましょう:
{Consumer thread {Producer 5000000} end}
panel に切り替えてプログラムのメモリの振る舞いを観察しましょう。あなたはすぐにこのプログラムが良い振る舞いをしていない事に気付くでしょう。その理由は非同期メッセージパッシング(asynchronous message passing)です。生産者がメッセージを送った時、すなわちストリームに新しい要素を生成した時、それが消費者が消費するより速いレートの場合、システムが壊れ始めるまでより多いバッファを必要とし続けます。2 この問題を解決するためのいくつかの方法があります。一つは生産者と消費者の間で有界バッファ(bounded buffer)を作る事で、これは後で論じます。他の方法はスレッドの実行スピードをスレッド優先度の変更によって変える事で、消費者が生産者より多くの時分割を得るようにする事です。
モジュール Thread
と Property
はスレッドへのいくつもの適切な操作を提供します。これらのうちいくつかは Table 8.1 で分類されます。
手続き | 記述 |
---|---|
|
|
|
|
|
|
|
|
|
|
| 現在のスレッド |
|
|
| 現在のスレッドの優先度を設定する |
| システムの優先度の比を取得する |
| システムの優先度の比を設定する |
Oz は3つの優先度レベルを持っています。システム手続き
{Property.put priorities(high:X medium:Y)}
はプロセッサ時間の比を高優先度(high)のスレッド:中優先度(medium)のスレッドで X:1
と設定します。これは同時にプロセッサ時間の比を中優先度(medium):低優先度(low)のスレッドで Y:1
と設定します。X
と Y
は整数です。そのため、もし私達が次を実行すると
{Property.put priorities(high:10 medium:10)}
走る事の出来る高優先度スレッドの 10
回の時分割の割り当てごとにシステムは中優先度のスレッドを1回割り当て、同様の事が中優先度と低優先度の間で行われます。同じ優先度レベルでは、スケジューリングは公平でラウンドロビンです。さあ、私達の生産者-消費者プログラムを動くようにしましょう。私達は生産者に低い優先度を与え、消費者に高い優先度を与えます。優先度の比を 10:1
と 10:1
に設定します。
local L in
{Property.put threads priorities(high:10 medium:10)}
thread
{Thread.setThisPriority low}
L = {Producer 5000000}
end
thread
{Thread.setThisPriority high}
{Consumer L}
end
end
やや過激な代替の解法は、生産者を遅延実行(lazy)にし、消費者が要求した時のみアイテムを生産する事です。消費者はこの場合、未束縛の変数(空の箱)によるストリームを構築します。生産者は未束縛の変数(空の箱)がストリームに見えるまで待ちます。それから変数を束縛します(箱を埋めます)。生産者の一般的なパターンは以下になります。
proc {Producer Xs}
case Xs of X|Xr then
I in 'Produce I'
X=I ...
{Producer Xr}
end
end
消費者の一般的なパターンは以下になります。
proc {Consumer ... Xs}
X Xr in
...
Xs = X|Xr
'Consume X'
... {Consumer ... Xr}
end
Figure 8.8 で見るプログラムは Figure 8.7 のプログラムの要求駆動版です。あなたはこれを大量のvolvoで走らせられます!
local
proc {Producer Xs}
case Xs of X|Xr then X = volvo {Producer Xr}
[] nil then {Browse 'end of line'}
end
end
proc {Consumer N Xs}
if N=<0 then Xs=nil
else X|Xr = Xs in
if X == volvo then
if N mod 1000 == 0 then
{Browse 'riding a new volvo'}
end
{Consumer N-1 Xr}
else
{Consumer N Xr}
end
end
end
in
{Consumer 10000000 thread {Producer $} end}
end
プログラムを要求駆動計算にするのに他の方法があります。future の記法と ByNeed
プリミティブ操作を使う事です。フューチャ(future)は論理変数のリードオンリー版です。例として変数 X
のフューチャを作るのに、私達は操作 !!
でフューチャ Y
を作ります。
Y = !!X
スレッドがフューチャの値を使おうとする時、例えば Y
を使うときは X
が束縛されるまで一時停止するでしょう。
手続きを遅延的に実行しようとする一つの方法、つまり、要求駆動の様式で行う事とは、操作 {ByNeed +P ?F}
を使う事です。ByNeed
は1引数の手続き P
を取り、フューチャ F
を返します。スレッドが F
の値にアクセスしようとする時、手続き {P X}
が呼び出され、結果の値 X
が F
に束縛されます。これは私達に直接的な様式での要求駆動計算を可能にします。例として次をフィードして下さい
declare Y
{ByNeed proc {$ X} X=1 end Y}
{Browse Y}
私達は Y
がフューチャになる事を観察するでしょう、つまり私達は Y<Future>
と Browser で見るでしょう。もし私達が Y
の値にアクセスしようとしれば、それは 1
に束縛されるでしょう。Y
にアクセスする一つの方法は、手続きの生成を発火(trigger)する操作 {Wait Y}
を行う事です。
今、私達は Figure 8.8 のプログラムを Figure 8.9 の様に書き直せます。これは Figure 8.7 とよく似て見えます。
local
proc {Producer Xs}
Xr in
Xs = volvo|{ByNeed {Producer Xr} $}
end
proc {Consumer N Xs}
if N>0 then
case Xs of X|Xr then
if X==volvo then
if N mod 1000 == 0 then
{Browse 'riding a new volvo'}
end
{Consumer N-1 Xr}
else {Consume N Xr} end
end
end
end
in
{Consumer 10000000 thread {Producer $} end}
end
私達はスレッドが文 thread
S end
を使ってどの様に分岐するかを見ました。ここで思いつく自然な質問は、どうやって分岐したスレッドを元の制御のスレッドに合流させて戻すかです。実際には、これは複数のスレッドの終了検知の特別な場合で、他のスレッドをそのイベントを待つようにします。Oz はデータフロー言語なので、一般的な枠組みはとても簡単です。
thread
T1X1
=unit end
T2
threadX2
=
X1end
TN
...
threadXN
=
XN-1end
XN
{Wait}
MainThread
全てのスレッドが終了する時、変数 X1 ...
XN は一緒に併合され unit
に束縛されます。{Wait
XN}
sは XN が束縛されるまでメインのスレッドを一時停止させます。
Figure 8.10 で私達は、上の筋書きに沿った制御構造の並行組み合わせを実装する、高階の構造(コンビネータ)を定義します。それは引数無しの手続きのリストを単一の引数として取ります。それが実行される時、手続きは並行に分岐します。次の文はリスト中の全ての手続きが終了した時のみ実行されます。
local
proc {Conc1 Ps I O}
case Ps of P|Pr then M in
thread {P} M = I end
{Conc1 Pr M O}
[] nil then O = I
end
end
in
proc {Conc Ps} {Wait {Conc1 Ps unit $}} end
end
プログラム Figure 8.5 は Ping
と Pong
スレッドが終了した時には適切に終了しません。このプログラムに今手を打つ事が出来ます。私達が Application.exit/1
を使えば、スタンドアロンのアプリケーションは残ったスレッドをアボートして終了します。Ping
と Pong
のスレッドが終了した時のみメインのスレッドが終了するように変更する事も出来ます。これは Figure 8.11 で示されます。
functor
import
Browser(browse:Browse) % Browser モジュールから Browse をインポート
Application
define
proc {Ping N}
if N==0 then {Browse 'ping terminated'}
else {Delay 500} {Browse ping} {Ping N-1} end
end
proc {Pong N}
{For 1 N 1
proc {$ I} {Delay 600} {Browse pong} end }
{Browse 'pong terminated'}
end
X1 X2
in
{Browse 'game started'}
thread {Ping 50} X1=unit end
thread {Pong 50} X2=X1 end
{Wait X2}
{Application.exit 0}
end
<< Prev | - Up - | Next >> |