これまでに実装したライブラリでは、生成されたスレッドは各々ばらばらに動くことしかできなかった。 スレッド間で協調して何かの処理をおこなえるようにするためには、スレッド間の同期 (synchronization) 機構をライブラリが提供してやる必要がある。
例えばスレッドを3つ作って、次の3つの関数をそれぞれ実行させたとする。
int sum1 = 0, sum2 = 0; void thread1(int* array) { int i; for (i = 0; i < N / 2; ++i) sum1 += array[i]; } void thread2(int* array) { int i; for (i = N / 2; i < N; ++i) sum2 += array[i]; } int thread3() { return sum1 + sum2; }
スレッド1は配列 array の先頭から半分 (N は偶数とする) までの和を計算して sum1 に代入し、スレッド2は配列 array の半分から最後までの和を計算して sum2 に代入する。 スレッド3は sum1 + sum2 を計算し、配列 array の各要素の和を返す。
スレッド3が配列 array の各要素の和を正しく返すためには、まずスレッド1と2が走り sum1 と sum2 の値を計算し終わった後に、スレッド3が走らなければならない。 ところが上のプログラムをそのまま走らせただけでは、3つのスレッドがそのような都合のよい順序で動くことは期待できない。 スレッドライブラリの実装によっては、偶然そのような順序で動く可能性もあるが、特定のスレッドライブラリの実装方法に依存したプログラムを書くのは好ましいやり方ではない。 上のプログラムが必ず正しい計算をおこなうようにするためには、3つのスレッドが同期 (synchronization) しながら動くように、プログラムを書き換えてやらなければならない。 まずはスレッドライブラリが同期のための機構をまったく提供していない場合の対処を示す。 この場合、プログラムを次のように書きかえればよい。
int sum1 = 0, sum2 = 0; boolean lock1 = true, lock2 = ture; void thread1(int* array) { int i; for (i = 0; i < N / 2; ++i) sum1 += array[i]; lock1 = false; } void thread2(int* array) { int i; for (i = N / 2; i < N; ++i) sum2 += array[i]; lock2 = false; } int thread3() { while (lock1 || lock2) ; return sum1 + sum2; }
書き換えたプログラムには、新たに lock1, lock2 という変数が導入された。 これらの変数ははじめ true だが、スレッド1、スレッド2の計算が終了すると、それぞれ false に変わる。 スレッド3は、計算をはじめる前に、これらの変数を調べ、両方とも false になるまで while ループで待つようにしてある。 これにより、スレッド1とスレッド2の計算が終了した後に sum1 + sum2 を計算するようになっている。
ここで使われている手法は spin wait と呼ばれるものである。 ある変数 (ここでは lock1 と lock2) の値が変化するのを、空ループをぐるぐる回りながら (spin しながら) 待つので、この名がついている。 この方法は単純であるが、プロセッサを有効利用しているとはいえない。 スレッド3は他のスレッドの計算の終了を待って、while ループを無駄にまわり続けなければならないからである。 マルチプロセッサ計算機で、スレッド1、2とスレッド3に別々のプロセッサを割り当てられるときは、スレッド1、2が終了した文字通り瞬間にスレッド3が while ループを抜けて計算を開始できるので、spin wait 方式は非常に高速なスレッド間の同期を実現するといえる。 しかしそうでなければ、spin wait 方式によるスレッド間の同期はかえって全体の実行速度を低下させる。 全てのスレッドに平等にプロセッサ時間が割り当てられるとすると、スレッド1、2の計算が終了するまでの間、スレッド3に割り当てられるプロセッサ時間は空の while ループによって、まったく無駄に消費されるだけである。 スレッド1、2の計算が終了するまで、スレッド3に切り替えないようにすれば、全体としては短い時間でプログラムを終了させることができる。
スレッド間の同期処理にともなうプロセッサ時間の無駄使いを避けるためには、スレッドライブラリが明示的な同期機構を提供する必要がある。 それによって、どのスレッドが他のスレッドの処理の完了をまっているのかを知り、 スレッド切り替えの際に、そのようなスレッドには切り替えないようにしなければならない。
スレッドの同期機構としては様々なものがあるが、ここではまずバリア同期 を実装することにする。 この同期機構は、各スレッドの処理の進みぐあいを揃えるための機構であり、barrier() という関数によって実現される。 この関数を呼んだスレッドは一時停止させられ、全てのスレッドがこの関数を呼ぶまで待たされる。 先に barrier() を呼んだスレッドは待たされ、最後のスレッドが barrier() を呼んだところで、全てのスレッドが一斉に実行を再開するので、各スレッドの処理速度のばらつきを barrier() で揃えることができる。
barrier() を使うと、先の例は次のように修正すれば正しく動くようになる。
int sum1 = 0, sum2 = 0; void thread1(int* array) { int i; for (i = 0; i < N / 2; ++i) sum1 += array[i]; barrier(); } void thread2(int* array) { int i; for (i = N / 2; i < N; ++i) sum2 += array[i]; barrier(); } int thread3() { barrier(); return sum1 + sum2; }
スレッド3はおそらく一番先に barrier() を呼ぶだろうが、いったんそこで一時停止し、スレッド1と2が計算を終了して barrier() を呼ぶのを待たされる。 これによって、スレッド1と2が計算を終了して sum1 と sum2 が正しい値になる前に、sum1 + sum2 を計算してしまうのを防ぐことができる。
先に barrier() を呼んだスレッドは一時停止させ、そのスレッドへ実行が切り替わらないようにしなければならない。 このためには Thread 構造体の status の値として、あらたに BLOCKING を導入する。 ライブラリ関数 barrier() の仕事は Thread 構造体の status の値をこの BLOCKING にして、別なスレッドに切り替えることである。 BLOCKING 状態になったスレッドの実行はけして再開されないので、barrier() を呼んだスレッドが他のスレッドを待っている間、無駄にプロセッサ時間を浪費することがなくなる。
void barrier() { currentThread->status = BLOCKING; ThreadYield(); }
いったん BLOCKING 状態になったスレッドを再び RUNNING 状態に変えるのは、ThreadYield() の仕事である。 ThreadYield() を次のように変えればよい。
void ThreadYield() { 現在実行中のスレッド currentThread 以外で、 実行可能 (RUNNING) なスレッド t を探す。 : if (t が発見された) { currentThread = t; _ContextSwitch(currentThread->context, t->context); } else if (全てのスレッドが BLOCKING か FINISH) { BLOCKING 状態にある全てのスレッドを RUNNING に変える。 t = 適当に選んだ RUNNING 状態のスレッド。 currentThread = t; _ContextSwitch(currentThread->context, t->context); } else if (残りは FINISH 状態の main() スレッドのみ) main() スレッドの実行を再開する。 }
全てのスレッドが barrier() を呼んで BLOCKING 状態になったら、各スレッドを RUNNING 状態に戻し、一斉に実行を再開するようにする。
Unix の wait システムコールは、子プロセスの終了を親プロセスが待つためのシステムコールで、プロセス間の同期を取るための機構である。 スレッドライブラリにも、wait システムコールに似せて、他のスレッドの終了を待つ同期機構を追加することができる。
他のスレッドの終了をまつための関数 ThreadWait() が使えれば、先の例は次のように書きかえることができる。
int sum1 = 0, sum2 = 0; void thread1(int* array) { int i; for (i = 0; i < N / 2; ++i) sum1 += array[i]; } void thread2(int* array) { int i; for (i = N / 2; i < N; ++i) sum2 += array[i]; } main() { int t1, t2; int array[] = { ... }; t1 = ThreadCreate(thread1, array); t2 = ThreadCreate(thread2, array); ThreadWait(t1); /* thread1() の終了をまつ */ ThreadWait(t2); /* thread2() の終了をまつ */ return sum1 + sum2; }
ThreadWait() は引数で指定されたスレッドが終了するまで、ThreadWait() を呼んだスレッドを一時停止させる。
ThreadWait() を実装するには、Thread 構造体の status の値として、あらたに WAIT_THREAD を導入する。 また Thread 構造体を手直しして、終了を待つスレッドの識別子を保存できるようにする。
void ThreadWait(int t) { currentThread->wait_thread = t; currentThread->status = WAIT_THREAD; ThreadYield(); }
WAIT_THREAD 状態になったスレッドを再び RUNNING 状態に変えるのは、ThreadExit() の仕事である。 この関数はスレッドが終了する際に呼ばれるので、そのスレッドの終了を待っている WAIT_THREAD 状態のスレッドがないか調べ、もし存在する場合には、そのスレッドを WAIT_THREAD 状態から RUNNING 状態に変更する。
しかしこれだけでは、あるスレッドの終了を待とうとして ThreadWait() を呼んだとき、そのスレッドが既に終了してしまっていると、ThreadWait() が正しく動作しない。 これを防ぐには、ThreadWait() の中で、待とうとしているスレッドが既に終了していないかどうか確かめるようにすればよい。 threadList が指すリストの中に該当するスレッドが含まれていれば、そのスレッドはまだ終了していない。 含まれていなければ既に終了している。
void ThreadWait(int t) { kernelLock = TRUE; thread_id が t であるスレッドが終了しているか否か調べる。 if (終了した) { kernelLock = FALSE; if (needContextSwitch) ThreadYield(); } else { currentThread->status = WAIT_THREAD; currentThread->wait_thread = t; kernelLock = FALSE; ThreadYield(); } }
待とうとしてるスレッドが既に終了しているときには、ThreadWait() はスレッドを切り替えず、即座に終了する。 ThreadWait() 実行中に、PreemptiveScheduler() によってスレッドが切り替えられてしまわないように、全体を critical section にしなければならない。 待とうとしているスレッドがまだ終了していないことを確認した後、status に WAIT_THREAD を代入する前に、スレッドが切り替わり、待とうとしているスレッドが終わってしまうと、ThreadWait() が正しく動かないからである。
スレッドライブラリが提供する同期機構が増えてくると、同期機構の誤った利用によりプログラムが止まってしまう可能性がでてくる。 例えばスレッドを二つ作成して、お互いがお互いの終了を ThreadWait() で待つと、プログラムはそこで止まってしまう。 このような状態を deadlock と呼ぶ。
実は deadlock が起きている状態では、これまで実装してきた ThreadYield() は正しく動作しない。 これまで、切り替える先のスレッドの候補が見つからない場合、ThreadYield() はそのまま終了して、ThreadYield() を呼び出したスレッドの実行を継続させていた。 しかし ThreadWait() を導入すると、ThreadYield() を呼び出したスレッドが THREAD_WAIT 状態になっており、実行を継続できない可能性がある。
void ThreadYield() { 現在実行中のスレッド currentThread 以外で、 実行可能 (RUNNING) なスレッド t を探す。 if (t が発見された) { Thread* cur = currentThread; currentThread = t; _ContextSwitch(cur->context, t->context); } else if (残りは FINISH 状態の main() スレッドのみ) main() スレッドの実行を再開する。 else if (currentThread が RUNNING でない) { /* 追加 */ puts("*** dead lock"); exit(1); } }
末尾に currentThread が RUNNING か否かを調べ、RUNNING でないときは deadlock 状態になっているので、エラーメッセージを出力してプログラムを強制終了させなければならない。
Unix のプロセスとプロセスをつなぐ pipe も、重要な同期機構のひとつである。 pipe を読み出すプロセスは、読み出すべきデータがまだ書き込まれていないと、データが書きこまれるまで待たされる。 一方、pipe に書きこむプロセスも、書きこまれたまま、まだ読み出されていないデータが一定量を超えると、データが読み出されて空きができるまで待たされる。
同期機構としての pipe は、しばしば bounded buffer とも呼ばれる。 pipe とは、要は最大容量の決まっているバッファであると見なせるからである。 以下では書きこめるデータは int 型のみ、最大容量は 1 の簡単化した bounded buffer (pipe) をスレッドライブラリのために実装する。 また読み出しをおこなうスレッドと、書きこみをおこなうスレッドは、それぞれひとつだけであるとする。
まずは pipe で使うデータ構造を定義する。
typedef struct { int n; int buffer[1]; Thread* reader; Thread* writer; } Pipe;
初期化のための関数は次のとおりである。
InitializePipe(Pipe* p) { n = 0; reader = writer = NULL; }
pipe に書きこみを行う関数は n の値を見て、0 であれば書き込みをおこない、そうでなければ n の値が 0 になるまでスレッドを一時停止させる。 一時停止中のスレッドの状態は WAIT_PIPE とする。
void WritePipe(Pipe* p, int i) { kernelLock = TRUE; if (p->n > 0) { p->writer = currentThread; currentThread->status = WAIT_PIPE; kernelLock = FALSE; ThreadYield(); kernelLock = TRUE; } p->buffer[0] = i; p->n = 1; 読みこみを待っているスレッドがあれば RUNNING に変更する。 kernelLock = FALSE; if (needContextSwitch) ThreadYield(); }
一方、pipe を読む関数は n の値を見て、1 であれば読みこみをおこない、そうでなければ n の値が 1 になるまでスレッドを一時停止させる。 また読みこみ後、n が 0 になってバッファに空きができるのを待っているスレッドがあるならば、そのスレッドを RUNNING 状態にする。
int ReadPipe(Pipe* p) { int i; kernelLock = TRUE; if (p->n < 1) 読みこみ可能になるまで一時停止する。 i = p->buffer[0]; p->n = 0; if (p->writer != NULL) p->writer->status = RUNNING; p->writer = NULL; } kernelLock = FALSE; if (needContextSwitch) ThreadYield(); return i; }
Pipe を使うと先の例題プログラムは次のようになる。
Pipe pipe1, pipe2; void thread1(int* array) { int sum = 0; int i; for (i = 0; i < N / 2; ++i) sum += array[i]; WritePipe(&pipe1, sum); } void thread2(int* array) { int sum = 0; int i; for (i = N / 2; i < N; ++i) sum += array[i]; WritePipe(&pipe2, sum); } main() { int array[] = { ... }; InitializePipe(&pipe1); InitializePipe(&pipe2); ThreadCreate(thread1, array); ThreadCreate(thread2, array); return ReadPipe(&pipe1) + ReadPipe(&pipe2); }
最後に紹介する同期機構 semaphore (セマフォ)は、スレッド間で critical section を実現するための同期機構である。 この同期機構はオランダの Dijkstra によって提案された。
Semaphore とは信号のことで、P 操作と V 操作からなる。 P (Passeren: 通過を許す) 操作は critical section に入るための操作で、もし信号が青なら critical section に入り、他のスレッドが続いて入ってこれないように信号を赤にする。 もし信号が既に赤であれば、青になるまでスレッドを一時停止する。 一方、V (Verhoog: 高める) 操作は critical section から出るための操作で、信号を青にして、他のスレッドが critical section に入れるようにする。
一度にひとつのスレッドにしか critical section へ入れないようにする semaphore のことをとくに binary semaphore という。 一般に semaphore といった場合には、一度に複数のスレッド (個数の上限は決まっている) が critical section に入ることを許可する semaphore を意味する。
Semaphore を使うと 0 から 199 までの和を計算するプログラムを次のように書くことが出来る。
int sem = 1; /* semaphore の初期化 */ main() { ThreadCreate(thread1, 0); ThreadCreate(thread2, 0); } int get() /* 0, 1, 2, ... と数を返す */ { static int num = 0; int i; SemaphoreP(&sem); i = num++; /* critical section */ SemaphoreV(&sem); return i; } void thread1(int dummy) { int s, i; s = 0; for (i = 0; i < 100; ++i) s += get(); printf("thread1: %d\n", s); } void thread2(int dummy) { int s, i; s = 0; for (i = 0; i < 100; ++i) s += get(); printf("thread2: %d\n", s); }
get() は呼ばれる度に 0, 1, 2, ..., 199 と順に返す。 ふたつのスレッドは get() を並行に呼ぶので、結果的に 0 から 199 までの排他的な部分和をそれぞれ計算することになる。 ふたつのスレッドが出力する値を足した数が 0 から 200 までの和となる。
Sempahore を実現する関数 SemaphoreP(), SemaphoreV() は次のようになる。
void SemaphoreP(int* sem) { kernelLock = TRUE; if (*sem <= 0) { currentThread->status = WAIT_SEMAPHORE; currentThread->semaphore = sem; /* semaphore 変数の記録 */ kernelLock = FALSE; ThreadYield(); } else { --*sem; kernelLock = FALSE; } } void SemaphoreV(int* sem) { kernelLock = TRUE; ++*sem; kernelLock = FALSE; }
Critical section に入れるスレッドの最大個数は引数 sem が指す semaphore 変数の初期値である。 1 であれば binary semaphore になる。
SemaphoreP() によって一時停止したスレッドを再び RUNNING 状態にするのは ThreadYield() である。
void ThreadYield() { 現在実行中のスレッド currentThread 以外で、 実行可能 (RUNNING) なスレッド t を探す。 if (t が発見された) { Thread* cur = currentThread; currentThread = t; _ContextSwitch(cur->context, t->context); } else if (残りは FINISH 状態の main() スレッドのみ) main() スレッドの実行を再開する。 else if (currentThread が RUNNING でない) { /* 追加 */ puts("*** dead lock"); exit(1); } }
実行可能 (RUNNING) なスレッド t を探す際に、WAIT_SEMAPHORE 状態になっているスレッドがあれば、semaphore 変数が 1 以上になっているかどうか調べ、そうであれば RUNNING 状態に変更するようにすればよい。 ただし RUNNING 状態に変更するときには、同時に semaphore 変数の値を 1 減らすのを忘れないようにしなければならない。 semaphore 変数の値が 1 以上である限りは、複数個の WAIT_SEMAPHORE 状態のスレッドを RUNNING に変更できる。
完成後、組込んだ同期機構を使うように test4.c を修正し、ライブラリが正しく動くことを確かめよ。 コンパイルするには次のようにすればよい。
% gcc -g -o test4 test4.c thread.c csw-i386.S
Copyright (C) 1999-2000 Shigeru Chiba
Email: chiba@is.tsukuba.ac.jp