BT

最新技術を追い求めるデベロッパのための情報コミュニティ

寄稿

Topics

地域を選ぶ

InfoQ ホームページ アーティクル Concurrency and Coordination Runtimeの利用

Concurrency and Coordination Runtimeの利用

原文(投稿日:2009/1/28)へのリンク

イントロダクション

Concurrency and Coordination Runtime (以後CCR)は.NETプラットフォーム用の非同期メッセージパッシングライブラリであり、構造化アプリケーションとは違うアプローチを実現する小さいながらもパワフルな基本要素一式を提供する。CCRを効率的に使うことで、より高い応答性、スケーラビリティ、そしてより堅牢なアプリケーションを構築できる。CCRの興味深い面はこれらの利点がスレッド(そしてスレッドで起こるエラーについても)、ロック、mutexやその他の同期のための低レベルの要素を明示的に扱う必要を減らし(ときには完全になくし)ながら実現している点だ。

もしあなたのアプリケーションが今はシングルスレッドなら、応答性を改善し利用可能なコアをもっと有効活用できる可能性がある。それも、オリジナルのコードベースの概念的なシンプルさを維持した上でだ。また、すでにマルチスレッドなアプリケーションであってもCCRはスループットをマッチさせる(ときには改善させる)ことができる。もちろん、コードベースのシンプルさはそのままだ。

具体的にはCCRは以下のものを提供する。:

  • シンプルで高性能なメッセージパッシング実装:アクター指向の世界観でオブジェクトに接続する非常に軽量でタイプセーフなチャネル。
  • スケジューリング制約のための一式の基本要素:スケジューリングはまさにCCRが目的とするものだ。タスクを起こし、プロセスの中で他のコンポーネントにメッセージをポストし、Arbiter(仲介役)に呼ばれたオブジェクトを経由して入力リクエストと同様に結果についても扱える制約を宣言する。
  • エラーを判断するためのよりよいモデル:CCRは因果関係、つまり一連の関連する非同期なサブタスクを通して効率的に文脈を伝播する手段を提供する。たとえばタスクが失敗すると(例外をスローするなど)、エラーはある場所に分離され、例外が投げられた元のスレッドとは関係なく扱われる。
  • 現在の(そして将来の)CPUパワーの有効活用:CCRのスケジューリングは、既存のスレッドプールでも、望むなら性能を改善させたカスタムの実装でも行うことができる。それでもこの仕組みへのコードへの影響は通常最小限に抑えられる。
  • 非同期I/O操作の簡単な統合:個々のプロセスの拡張性と性能を改善させるカギは、I/O関連の操作の効率化であることが多い。これらの操作は計算寄りのタスクより何桁も遅い傾向があるため、ブロッキングI/Oでは有用なリソース(この場合スレッド)を中断させ、待っている他のタスク使うことも邪魔してしまう。I/O操作を非同期にすることで、操作が完了するまでこれらのリソースを他のタスクに使うことができる。しかしながら、非同期操作はそれぞれの操作の開始と完了が離れてしまい、多用すると結果として追いづらいソースコードになってしまう。CCRはC#のイテレーターを使った目新しい実装を使用してこれらの操作をコントロールする。

「非同期メッセージパッシング」とは、コンポーネントが互いにデータを送ることによってコミュニケーションすることである。その上、データとその結果のどんな返答にも保証された時間的な関係がない。送られたメッセージはおそらく未来のある時点で処理され、返答はきっとそれより後に受け取るだろう。

実際には、CCRを純粋にプロセス内で使えばこれよりはいくらか強い保証が手に入る。また、このアプローチはどこでもエラーが起こりうる場所における、プロセス内計算の大部分のモデルで必要なものである。このことがCCRの基本要素は低レベルのI/Oだけではなく、拡張性のある分散システムを構築するためのビルディングブロックとしても使える所以である。

CCRの基本型

CCRの基本型の数は少なく、以下のもので構成されている。:

  1. Task:実行するコードの任意の要素。
  2. TaskQueue:正確にはDispatcherQueue。これは実行待ちタスクのただのリストである。スレッドは標準的なCLRスレッドプールや(Dispatcher として知られる)カスタムのCCRスレッドプールのどちらからでも、これらのキューからタスクを取り出し、実行される。
  3. Port:コンポーネントと接続するプロセス内メッセージキュー。本質的には連結リスト以外の何ものでもなく、プロデューサーはメッセージをここに置く。CCRは型安全のための総称型オーバーロードを提供する。
  4. Arbiter:CCR の基本要素で、ポートとタスクを結びつける仲介役。これは、タスクがメッセージがポートに届いたときに作るためや、タスクキューがそれを受けるための制約を規定する。CCRにはさまざまな仲介役があり、それらの多くは高レベルの構成要素を構築するために組み立てられる。

これらの基本的な概念を念頭に置いて簡単なCCRのコードを見ていこう。最初は、すべての例をホストするのに使う単純なC#のコンソールアプリケーションを書いてみる。このアプリケーションで注目すべきは、カスタムのCCRスレッドプール(Dispacher)を使いタスクキューにそれを結びつけていることだ。つまりこのキューからのタスクは、このカスタムプールに属するスレッドを越えて実行される。

static void Main(string[] args)
{
    using (var dr = new Dispatcher())
    {
        using (var taskQueue = new DispatcherQueue("samples", dr))
        {
            // 例はここに...


// アプリケーションが終わらないようにブロック
// exiting.
Console.ReadLine(); } } }
ここのサンプルはシングルタスクキューでのみ実行されているが、複数のタスクキューの利用が推奨される。CCRはタスクの取り出し時に登録されたタスクキューを総当たり式に使う。シングルタスクキューでは他を阻止してしまう。

最初にタスクを直接キューに入れてみよう。これはCCRでタスクを実行するもっとも単純な方法であり、ポートを必要としない。Arbiterクラスはたくさんの便利なメソッドを持っている。FromHandler()はデリゲートからタスクを作るもので、この場合は無名でよい。次のようにタスクキューに置くことで、ディスパッチャーによって実行される。

    // タスクを直接キューに入れる
    taskQueue.Enqueue(Arbiter.FromHandler(() =>
        Console.WriteLine("Hello, world.")));

これはタスクを直接キューに入れる一般的なやり方ではない。普通はどこかに関係するポートがある。次のコード片では、ポート、仲介役を宣言し、ポートにポストしている。このケースでは、String型のポートとStringをとるシグネチャのハンドラーを使っている。

// タスクをスケジューリングしたポートにメッセージをポストする
var port = new Port<string>();
Arbiter.Activate(taskQueue, port.Receive(Console.WriteLine));
port.Post("Hello (again), world");

この例はたくさんのことを含んでいるのでしっかり理解してほしい。port.Receive()はもっとも簡単な仲介役、すなわちレシーバを作成する。これはポートにメッセージが1つ届くとすぐに実行される。メッセージが到着すると、そのメッセージとともにハンドラーを呼ぶタスクが作られる。 Arbiter.Activate()は作成されたタスクと指定したタスクキューを結びつけている。

CCRの仲介役を理解するためのもっとも重要なことは、これらがスレッドをブロックしないことである。一度活性化されたレシーバーはほんの数バイトのメモリしか消費せず、スレッドは待ち状態の他のタスクのために利用できる。

CCRの概念で重要なのは、仲介役をどこでも宣言できることだ。次に示すように、上の例の最後の2行を入れ替えても結果は同じである。

// タスクをスケジューリングしたポートにメッセージをポストする
var port = new Port();
port.Post("Hello (again), world");
Arbiter.Activate(taskQueue, port.Receive(Console.WriteLine));

では、サンプルを少しだけ変えてみよう。今度はレシーバーを活性化する前に2つのメッセージをポートに送ってみよう。何か起こるかな...

// タスクをスケジューリングしたポートにメッセージをポストする
var port = new Port();
port.Post("Hello (again), world");
port.Post("Hello (thrice), world");
Arbiter.Activate(taskQueue, port.Receive(Console.WriteLine));

さて、これを実行したら最初のメッセージだけが出力されたはずだ。その訳はport.Receive()が次の構文と同等の、単純化するために拡張したメソッドだからである。:

    Arbiter.Activate(
        taskQueue,
        Arbiter.Receive(false, port, Console.WriteLine));

重要なパラメーターはArbiter.Receive()で渡される最初のBoolean引数である。これはレシーバーが一時的なのか、つまり1つのメッセージを処理した後に破棄されるかどうかを示している。ポートに届いたすべてのメッセージを処理させたければ、このパラメーターの値を逆にすればいい。

    // タスクをスケジューリングしたポートにメッセージをポストする
    var port = new Port();
	port.Post("Hello (again), world");
	port.Post("Hello (thrice), world");
    Arbiter.Activate(
        taskQueue,
        Arbiter.Receive(true, port, Console.WriteLine));

さて、上のコードはしばしば意外な結果を返してしまう。バラバラの順序で出力されることがあるのだ!どうしてだろう?

CCR では、仲介役(この場合はReceive)が実行されるとすぐに、関連するメッセージを処理するタスクが生成される。仲介役がより大きな構造の中のネストされた一部であるなら、これらのタスクは実行のためにスケジュールされたタスクキューに配置される。上の例では、永続的なレシーバーがすぐに2つのタスクを各アイテムに1つずつ生じさせる。もし十分なスレッドが利用可能ならばこれらのタスクは同時に実行され、結果として順不同になるのだ。

CCR スレッドプールの実装はCLRスレッドプールとは多くの点で異なる。一番重要なのは、CCRスレッドプールは固定数のスレッドを持ち、それは構築時に決定するということだ。非ブロッキング操作をスレッドで実行すると予想するならこれは問題ないのだが、ブロッキング呼び出しをするのであれば、動的にサイズを増減できるCLRスレッドプールにタスクキューをスケジュールした方がいい。そのようなタスクキューはデフォルトのDispatcherQueueコンストラクタを呼ぶことで作ることができる。

この振る舞いを抑制し順番を保存するための方法は複数ある。おそらくもっとも簡単なのは、ループの中に一時的なレシーバを置き一度だけ処理することだ。幸いなことにCCRは反復タスク(Iterative Task)と呼ばれるとてもパワフルな仕組みを持っており、これによってC#のイテレーター機能を使った極めて自然な方法でこれを表現できる。

まず、既存のArbiter.Activate()呼び出しを次のように置き換えてみよう。:

Arbiter.Activate(
	taskQueue,
	new Arbiter<Arbiter<string>>(port, ProcessMessages));

これでPrcessMessagesという新しい反復タスクがスケジュールされる。ProcessMessagesは次のように定義される。:

static IEnumerator<ITask> ProcessMessages(Port<string> port)
{
        while (true)
		yield return port.Receive(Console.WriteLine);
}

このメソッドは永久に繰り返し、レシーバーが実行されるのを(ブロックすることなく)待っている。レシーバーがメッセージを受けるとハンドラーが発動されるがループは継続される。もし空文字列がポートに着くまでループさせたいのなら、次のように書けばよい(ラムダ式を使った無名デリゲートでメッセージを処理させていることに注目)。:

    static IEnumerator> ProcessMessages(Port port)
{
bool fDone = false;
while (!fDone)
{
yield return port.Receive(message =>
{
if (String.IsNullOrEmpty(message))
fDone = true;
else
Console.WriteLine(message);
});
}
Console.WriteLine("Finished");
}

イテレーターはCCRの道具箱の中でも極めてパワフルな道具だ。これによって同期処理とよく似た方法で、非同期で非ブロッキング操作の順番を記述することができる。反復タスクはさらにネストさせることもできる。たとえば次のように、高レベルのタスクがProcessMessages()をyieldすることができる。:

static IEnumerator<ITask> TopLevelTask(Port<static> port)
{
   // Yield to the nested task
   yield return new IterativeTask<string>>(port, ProcessMessages);

   Console.WriteLine("Finished nested task.");
}

これまでのところ、私たちは簡単なレシーバーしか見てきていない。つまり、1つのアイテムを1つのポートにポストする際にタスクをスケジュールする仲介役だけである。そろそろより高度な仲介役を持ち込むときだろう。それらはレシーバーをよりパワフルな構造へとネストさせるための糊を提供する。

これのもっとも一般的なのはChoiceだ。これは単純にn個のレシーバーから1つだけを選んでアクティブにするものだ。たとえば次の例の反復タスクは続行する前に文字列とシグナルの両方を待っている。:

static IEnumerator<ITask> ProcessChoice(PortSet<string, EmptyValue> portSet)
{
     bool fDone = false;
     while (!fDone)
     {
         yield return portSet.Choice(
             message => Console.WriteLine(message),
             signal => fDone = true);
     }
     Console.WriteLine("Finished");
}

Choiceは非同期処理が成功したか失敗したかを決めるのによく利用されるものだ。しかし、任意の数のレシーバーから1つを選び出すのにも利用できる。

PortSetは1つ以上の独立したポートのラッパーで、複数のポートを単独のエンティティとして順に回すのに便利なものである。CCRにある一般的な例としてはPortSetから派生したSuccessFailurePortがある。

それ以外の一般的な仲介役といえばJoinだ。これはネストしたレシーバーがすべて実行されてはじめて作用する。次の例はその原則を表している。:

var port1 = new Port<int>();
var port2 = new Port<int>();
port1.Post(1);
port2.Post(3);
Arbiter.Activate(
    taskQueue,
    Arbiter.JoinedReceive(false, port1, port2, (x, y) =>
    {
        Console.WriteLine(x + y);
    }));

Joinは制限されたリソースへのアクセスを絞るのにとても便利だ。最初のポートは入力要求をあるリソースで扱い、2つ目のポートは他の利用可能なリソースで扱う。Joinを使うことで、利用できるリソースがあるときだけ実行するように要求を抑制することができる。

それ以外の高レベルの仲介役でで一般的なのはInterleaveだ。これは概念的には非同期非ブロッキング動作にのみ使う読み書きロックと同じものである。読み込みタスクは他の読み込みタスクと同時に実行できるが、書き込みタスク(これは読み込みタスクより優先される)は他のタスクが実行していなときしか実行できない。次の例は、概念的な「キャッシュ」をインターリーブに保護する宣言である。

var updatePort = new Port<UpdateCache>();
var queryPort = new Port<QueryCache>();
var stopPort = new Port<Shutdown>();


var interleave = new Interleave(
   new TeardownReceiverGroup(
         Arbiter.Receive(false, stopPort, ClearDownCache)),
   new ExclusiveReceiverGroup(
         Arbiter.Receive(true, updatePort, OnUpdateCache)),
   new ConcurrentReceiverGroup(
         Arbiter.Receive(true, queryPort, OnQueryCache)));
Arbiter.Activate(taskQueue, interleave);

ここで永続的なレシーバーはインターリーブできることが示された適切なレシーバーグループに配置される。 ConcurrentReceiverGroup()に置かれたどんなレシーバーも関連するタスクを互いに同時に実行させることができる。逆に、 ExclusiveReceiverGroupに置かれたレシーバーはすべて独立して実行される。さらに、このグループに置かれたレシーバーを使うと、タスクの実行順をポストした順番に制約することができる。TeardownReceiverGroupに置かれたレシーバーはどれもインターリーブにシャットダウンし、最後に実行されるハンドラーとなる。これらのレシーバーが永続化されないためである。

Interleave は公平なスケジューリングを確実にするために可能なレシーバーの間で総当たり的に振る舞う。さらに実行順はExclusiveReceiverGroup の中でさえポートごとにしか制御されない。独立したポートに2つのメッセージが届いても、必ずしもそれぞれがポストされた順番と同じようには実行されない。

先に述べた通り、CCRの反復タスクは非同期の非ブロッキング操作の論理的な流れを同期ブロッキング操作の単純な記述法に近いスタイルで書くことができる。これらの非同期操作はたいていI/O関連であり、Webのリクエストであったりデータベース操作や普通のファイルI/Oであったりする。これらの操作をより簡単に私たちのコントロール化に置けるようになれば、非同期I/Oをより効果的に使う機会もでき、アプリケーションのスループットや拡張性も大きく向上できる。

APMのBeginXXX/EndXXXとCCRの世界を橋渡しする重要なパターンは、次のAsyncCallbackデリゲートによるオブザーバーをベースにしている。:

    public delegate void AsyncCallback(IAsyncResult ar);

これはPortのポスト操作によって起動される。このパターンを使って次のIterativeTaskのように非同期のファイルコピーを書くことができる。

static IEnumerator<ITask> Copy(FileStream source, FileStream target)
    {
        var buffer = new byte byte[128 * 1024];
        var port = new Port>();
        var bytesRead = 0;


        do
        {
            source.BeginRead(buffer, 0, buffer.Length, port.Post, null);
            yield return Arbiter.Receive(
                false, port, iar => bytesRead = source.EndRead(iar));
            target.BeginWrite(buffer, 0, bytesRead, port.Post, null);
            yield return Arbiter.Receive(
                false, port, iar => target.EndWrite(iar));
        } while (bytesRead > 0);
    }

基本的に非同期操作は、完了時にIAsyncResultをポートにポストするよう指示されている。いったん実行中になるとそのポートのレシーバーが受けるまでyieldし、受信したら完了処理をして次のステップに進む。これは完全に非同期を実現している(たった2,3のスレッドで何千もの操作を実行できるかもしれない)が、そのコードは意図をはっきりと示していることに注目してほしい。ブロックを読み、その後ブロックを書き、そして完了まで繰り返している。

このファイルコピーの例では、できる限りシンプルにするために例外処理を省略している。しかし、より堅牢なコードにするには読み書き双方について失敗時の処理をする必要がある。一般的なtry/catchはここでは使うことができない。なぜなら、その例外処理の手法はスレッドに強く関係しており、そしてCCRのタスクは利用可能なあらゆるスレッドのどれかで実行しうるのだ。実際、反復タスクでは、タスクの各「ステップ」は前のステップとは異なるスレッドで実行しているかもしれない。

CCRで例外処理を行うアプローチは基本的に2つある。1つは明示的なやり方で、エラーをキャッチしポートを通して伝播させる操作をコード化するものである。しかし、これは呼び出す側、呼ばれる側のどちらのコードも大きく膨れあがることになる。先の例では、ファイルコピーは明示的に失敗を扱うためにかなりの修正が必要になってしまう。次のサンプルは、読み取り側に必要な修正を施したものだ。

       static IEnumerator<ITask> Copy(
           FileStream source, FileStream target,
           SuccessFailurePort resultPort)
       {
           var buffer = new byte [128 * 1024];
           var port = new Port<IAsyncResult>();
           var bytesRead = 0;

           do
           {
               // BeginReadのエラーを処理する
               try
               {
                   source.BeginRead(buffer, 0, buffer.Length, port.Post, null);
               }
               catch (Exception e)
               {
                   resultPort.Post(e);
                   yield break;
               }

               // EndReadのエラーを処理する
               yield return Arbiter .Receive(
                   false, port, iar =>
                   {
                       try
                       {
                           bytesRead = source.EndRead(iar);
                       }
                       catch (Exception  e)
                       {
                           resultPort.Post(e);
                       }
                   });

               // エラーがあったら処理をストップする
               if (bytesRead == 0)
                   yield break;

               // 書き込み処理は省略...
           } while (bytesRead > 0);
           resultPort.Post(new SuccessResult ());
       }

明らかにこのアプローチは見苦しく、間違いやすい。例外処理ルーチンはもちろん、どのコードも望ましいものとは言えない。

幸い、CCRはよりわかりやすくパワフルな例外処理の方法をCausalityという仕組みで提供している。上のような明示的な処理からコードを解放できるだけでなく、タスクの実行パスを通る非同期操作の任意の複雑な道筋の例外処理をスレッドについて気にすることなくただ一カ所宣言することで実現できる。

Causalityを宣言し利用するための典型的なパターンは、まずインスタンス化し、例外ポートへのハンドラーをアタッチし、それをCCRに連絡する。次のようになる。:

    var exceptionPort = new Port<Exception>();
    Arbiter.Activate(taskQueue, exceptionPort.Receive(e =>
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("Caught " + e.ToString());
            Console.ResetColor();
        }));
    Dispatcher.AddCausality(new Causality("Example", exceptionPort));

これを済ませたら、そこでタスクを作ったりメッセージをポストすることができる。Causalityにより、どんな例外も後に続く実行パスのタスクで明示的にキャッチ、ハンドルすることもなく、例外ポートとそのハンドラーに流すことができる。

まとめ

まとめると、CCRが提供する機会の重要なことは、アプリケーションをその基本的なデータ依存性の観点で表現できることであり、タスクのスケジュールを記述することでランタイムが途切れなく利用可能なコアに割り当てられることだ。これによって同時に、開発者がスレッドやロックの明示的な処理から解放されることで、ソフトウェアが将来動作するであろうマルチコアのマシンのパワーの恩恵を完全に得られるのである。


注釈: ここでのすべての記載内容は著者に属するものであり、彼の所属組織の見解を反映するものではない。

本記事のすべてのサンプルは無償で利用できる「Microsoft Robotics Developer Studio 2008 Express Edition」で実行できる。詳細については使用許諾を確認のこと。

この記事に星をつける

おすすめ度
スタイル

BT