BT

最新技術を追い求めるデベロッパのための情報コミュニティ

寄稿

Topics

地域を選ぶ

InfoQ ホームページ アーティクル Kotlinのコルーチンを実装面から検証する

Kotlinのコルーチンを実装面から検証する

ブックマーク

原文(投稿日:2020/01/11)へのリンク

コルーチン(coroutine)は現在注目されているテーマですが、新しいものではありません。さまざまな資料に見るように、これまでにも、特に軽量なスレッディングや、あるいは"コールバック地獄"のソリューションが必要とされた場合には、何度も再評価されてきました。

最近、JVM上において、リアクティブプログラミングの代用としてのコルーチンに注目が集まるようになりました。ReJavaProject Reactorといったフレームワークでは、スロッティングやパラレリズムといった高度なサポートを備えた、入力情報のインクリメンタルな処理をクライアントで行う方法を提供していますが、リアクティブシステムでの機能操作を中心にコードを再編成する必要があるため、コストがメリットを上回るケースが少なくありません。

このような理由から、(例えば)Androidコミュニティでは、よりシンプルな代用策へのニーズがありました。Kotlin言語ではこのニーズに応えるため、実験的機能としてコルーチンが導入され、いくつかの修正を経た後、同言語のバージョン1.3で正式な機能になりました。Kotlinコルーチンの適用はUI開発のみに留まらず、サーバ側フレームワーク(Spring 5でのサポート追加など)や、さらにはArrow(Arrow Fxによる)のような機能フレームワークにまで拡張されています。

コルーチンを理解する上での課題

残念ながら、コルーチンの理解は簡単ではありません。Kotlin上級者によるコルーチンの解説はたくさんありますが、大部分は啓発的なもので、コルーチンとは何であるか(あるいは、どのように利用されるべきか)という点に関する見解の統一は難しいのが現状です。コルーチンは並列プログラミングの単子(モナド)である、と言ってよいでしょう。

問題のひとつはその実装にあります。Kotlinのコルーチンでは、コンパイラが実装しているのはsuspendキーワードのみで、その他はすべてコルーチンライブラリによって処理されます。Kotlinのコルーチンは極めて強力かつ柔軟ですが、漠然としたものでもあります。この点が、確立したガイドラインや厳格な思想を必要とする初学者にとって障壁となっているのです。この記事ではコルーチンをボトムアップに解説することで、その理解のための基礎を提供できればと思っています。

サンプルアプリケーション(サーバ側)

今回のアプリケーションでは、RESTfulサービスを複数回コールする際の安全性と効率性に関する、ごく一般的な問題を取り上げています。記事では"Where's Waldo(ウォーリーを探せ)"のテキスト版 — "Waldo"を見つけるまで、名前のチェーンを追いかける処理を開発します。

ここにHttp4kを使って記述した、完全なRESTfulサービスがあります。Http4kは、Marius Eriksen氏の有名な論文で述べられている関数サーバアーキテクチャのKotlin版で、Kotlin以外にもScala(Http4s)やJava 8以降(Http4j)など、さまざまな言語で実装されています。

今回のサービスには、Mapによって名前のチェーンを実装した単一のエンドポイントがあり、名前を与えることで、マッチした値とステータスコード200か、あるいはエラーメッセージと404のいずれかが返されます。

fun main() {
   val names = mapOf(
       "Jane" to "Dave",
       "Dave" to "Mary",
       "Mary" to "Pete",
       "Pete" to "Lucy",
       "Lucy" to "Waldo"
   )

   val lookupName = { request: Request ->
       val name = request.path("name")
       val headers = listOf("Content-Type" to "text/plain")
       val result = names[name]
       if (result != null) {
           Response(OK)
               .headers(headers)
               .body(result)
       } else {
           Response(NOT_FOUND)
               .headers(headers)
               .body("No match for $name")
       }
   }

   routes(
       "/wheresWaldo" bind routes(
           "/{name:.*}" bind Method.GET to lookupName
       )
   ).asServer(Netty(8080))
       .start()
}

基本的にクライアントでは、次のような要求チェーンを生成したいと思っています。


サンプルアプリケーション(クライアント側)

クライアントアプリケーションは、JavaFXライブラリで作成したデスクトップユーザインターフェースが中心です。ただし、不必要な部分を省略して作業を簡単にするため、JavaFX上のKotlin DSLであるTornadoFXを使用します。

クライアントの完全な定義は、次のようになっています。

class HelloWorldView: View("Coroutines Client UI") {
   private val finder: HttpWaldoFinder by inject()
   private val inputText = SimpleStringProperty("Jane")
   private val resultText = SimpleStringProperty("")

   override val root = form {
       fieldset("Lets Find Waldo") {
           field("First Name:") {
               textfield().bind(inputText)
               button("Search") {
                   action {
                       println("Running event handler".addThreadId())
                       searchForWaldo()
                   }
               }
           }
           field("Result:") {
               label(resultText)
           }
       }
   }
   private fun searchForWaldo() {
       GlobalScope.launch(Dispatchers.Main) {
           println("Doing Coroutines".addThreadId())
           val input = inputText.value
           val output = finder.wheresWaldo(input)
           resultText.value = output
       }
   }
}

String型の拡張として、次のようなヘルパ関数も使用します。

fun String.addThreadId() = "$this on thread ${Thread.currentThread().id}"

実行時のUIはこのようになります。

ボタンをクリックするとコルーチンが起動されて、'HttpWaldoFinder'型のサービスオブジェクト経由でRESTfulエンドポイントにアクセスします。

Kotlinのコルーチンは'CoroutineScope'内にあり、下位の並行動作モデルを表現する特定のDispatcherに関連付けられています。並行動作モデルは一般的にはスレッドプールですが、その他の場合もあります。

利用可能なDispatcherが何であるかは、Kotlinコードを実行する環境に依存しています。Main DispatcherはUIライブラリのイベント処理スレッドであり、従って(JVMでは)Android、JavaFX、Swingのみで利用可能です。Kotlin Nativeでは当初、コルーチンでのマルチスレッドはまったくサポートされていませんでしたが、現在は変更されています。サーバ側では自身の選択でコルーチンを導入できますが、Spring 5など、デフォルトで使用可能なものも増えています。

今回の例では、サスペンドするメソッドのコールを開始する前に、コルーチンと'CoroutineScope'と'Dispatcher'を用意しておく必要があります。これが最初のコール(上記コードのように)であれば、'launch'や'async'などの'コルーチンビルダ'関数を使用してプロセスを起動することができます。

いずれかのコルーチンビルダ関数か、あるいは'withContext'のようなスコープ関数をコールすることで、新たな'CoroutineScope'が常に生成されます。このスコープ内では、タスクは'Job'インスタンスのひとつの階層として表されます。

ここでは次のように、いくつかの興味深い特徴があります。

  • ジョブは完了前、すべてのコルーチンが完了するまで、自身をブロックして待機する。
  • ジョブをキャンセルすると、その子供もすべてキャンセルされる。
  • 子供のキャセルの失敗は親に伝搬する。  

この設計は、親ジョブの停止時に子供が終了しなくなるというような、並列プログラミングで一般的な問題を回避するためのものです。

RESTエンドポイントにアクセスするサービス

以下にあげたのは、今回使用するHttpWaldoFinderサービスの全コードです。

class HttpWaldoFinder : Controller(), WaldoFinder {
   override suspend fun wheresWaldo(starterName: String): String {
       val firstName = fetchNewName(starterName)
       println("Found $firstName name".addThreadId())

       val secondName = fetchNewName(firstName)
       println("Found $secondName name".addThreadId())

       val thirdName = fetchNewName(secondName)
       println("Found $thirdName name".addThreadId())

       val fourthName = fetchNewName(thirdName)
       println("Found $fourthName name".addThreadId())

       return fetchNewName(fourthName)
   }
   private suspend fun fetchNewName(inputName: String): String {
       val url = URI("http://localhost:8080/wheresWaldo/$inputName")
       val client = HttpClient.newBuilder().build()
       val handler = HttpResponse.BodyHandlers.ofString()
       val request = HttpRequest.newBuilder().uri(url).build()

       return withContext<String>(Dispatchers.IO) {
           println("Sending HTTP Request for $inputName".addThreadId())
           client
               .send(request, handler)
               .body()
       }
   }

}

'fetctNewName'関数は既知の名前を取得して、エンドポイントに関連する名前を問い合わせます。この処理は、Java 11から標準になった'HttpClient'型を使用して行っています。実際のHTTP GETは、IO Dispatcherを使用する新たな子供のコルーチンが実行します。この方法はスレッドプールを構成するもので、ネットワークコールなどの長時間実行されるアクティビティには最適です。

'wheresWaldo'関数は名前のチェーンに沿って5回実行され、(うまくいけば)ウォーリーを見つけ出します。後で生成されたバイトコードを逆アセンブルする予定なので、実装は可能な限り単純なものにしています。ここで注目したいのは、'fetchNewName'の各呼び出しが現在のコルーチンをサスペンドしても、子供のコルーチンが動き続けることです。子がIO Dispatcherの実行中であっても、親はMain Dispatcherを実行することが可能であるため、子がHTTPリクエストを行っている間も、UIイベント処理スレッドがユーザによるビュー操作を処理することができるのです。下の図はその様子を表しています。

IntelliJには、サスペンドを伴う呼び出しが実行されたために、コルーチン間でコントロールの転送が行われたことが表示されます。Dispatcherをスイッチしなければ、呼び出しによって新たなコルーチンが生成されない場合もあることに注意してください。あるサスペンド関数が別のサスペンド関数を呼び出せば、同じコルーチン内で継続することが可能になります。同じスレッド内に留まりたい場合には、このような動作が必要です。

クライアントを実行すると、コンソールには次のように出力されます。

ここからは、Main DispatcherとUI Event Handlerがスレッド17で、IO Dispatcherがスレッド24と26を含むスレッドプール上で、それぞれ実行されていることが分かります。

調査の開始

IntelliJに付属のバイトコード逆アセンブルツールを使えば、実際に何が起きているのかを確認することができます。JDKで提供されている、標準的な'javap'ツールを使うことももちろん可能です。

'HttpWaldoFinder'のメソッドのシグネチャが、Continuationオブジェクトを追加パラメータとして受け入れて、汎用的なObjectを返すように変更されていることが分かります。

public final class HttpWaldoFinder extends Controller implements WaldoFinder {

  public Object wheresWaldo(String a, Continuation b)
  final synthetic Object fetchNewName(String a, Continuation b)
}

それでは、これらのメソッドに追加されたコードを調べて、'Continuation'が何であるか、何が返されているのかを確認することにしましょう。

Continuation Passing Style (CPS)

Kotlinの標準化プロセス(別名:KEEP)"proposal for coroutines"で説明されているように、コルーチンの実装はCPS(Continuation Passing Style)を基本にしています。Continuationオブジェクトは、関数がサスペンドされている間に必要となる状態の保存に使用されます。

基本的には、サスペンドされる関数のすべてのローカル変数がCotinuationのフィールドになります。パラメータと現在のオブジェクト(関数がメソッドである場合)にもフィールドを生成する必要があります。従って、4つのパラメータと5つのローカル変数を使用するメソッドには、少なくともContinuationに10個のフィールドがあることになります。

私たちの'HttpWaldoFinder'の'wheresWaldo'メソッドの場合、1つのパラメータと4つのローカル変数がありますから、Cotinuation実装型には6つのフィールドがあるはずです。Kotlinコンパイラが生成したバイトコードをJavaソースに逆アセンブルすれば、これが事実であることを確認できます。

$continuation = new ContinuationImpl($completion) {
  Object result;
  int label;
  Object L$0;
  Object L$1;
  Object L$2;
  Object L$3;
  Object L$4;
  Object L$5;

  @Nullable
  public final Object invokeSuspend(@NotNull Object $result) {
     this.result = $result;
     this.label |= Integer.MIN_VALUE;
     return HttpWaldoFinder.this.wheresWaldo((String)null, this);
  }
};

フィールドはすべてObject型なので、どのように使用されるのかはすぐには分かりませんが、調べていけば、次のようなことが分かります。

  • 'L$0'は'HttpWaldoFinder'インスタンスの参照を保持します。これは常に存在します。
  • 'L$1'は'startName'パラメータを保持します。これは常に存在します。
  • 'L$2'と'L$5'は、ローカル変数の値を保持します。これらには、コードの進行に従って値が設定されます。'L$2'も同じように、'firstName'の値を保持します。

その他にも、処理結果を格納するフィールドと、'label'という不明な整数値があります。

サスペンドするか、サスペンドしないか — それが問題だ

生成されたコードを調べる際には、2つのユースケースに対処する必要があることに留意しなければなりません。サスペンドする関数が別の関数を呼び出した時、現在のコルーチンを中断(して、そのスレッドで別のコルーチンが実行されるようにする)する場合と、現在のコルーチンの実行を継続する場合があるのです。

データストアから値を読み出すサスペンド関数を例にしましょう。おそらくI/Oの発生によってサスペンドしますが、その結果はキャッシュされるかも知れません。次にコールされた時にはサスペンドせず、キャッシュされた値を同期的に返すでしょう。Kotlinコンパイラによって生成されたコードは、どちらのパスも許容できるものでなければなりません。

そのためにKotlinコンパイラは、実際の処理結果か、特殊な値であるCOROUTINE_SUSPENDEDのいずれかを返すように、サスペンドする関数の戻り型を調整します。後者の場合、現在のコルーチンはサスペンドされます。サスペンド関数の戻り値の型を'Object'に変更するのはこのためなのです。

サンプルアプリケーションでは、'wheresWaldo'は'fetchNewName'を繰り返し呼び出します。理論上、これらの呼び出しは現行のコルーチンをサスペンドする場合と、サスペンドしない場合があります。私たちは、サスペンドが常に発生するものとして'fecthNewName'を書きましたが、生成されたコードの観点からは、すべての可能性に対処する必要のあることを忘れてはなりません。

巨大なswitch文とラベル

逆アセンブルされたコードを読み進めると、複数のネストしたラベル内に埋め込まれたswitch文が見つかります。これは、wheresWaldo()メソッド内のさまざまなサスペンドポイントをコントロールするステートマシンを実装したもので、上位構造は次のようになっています。

// listing one: the generated switch statement and labels
String firstName;
String secondName;
String thirdName;
String fourthName;
Object var11;
Object var10000;
label48: {
  label47: {
     label46: {
        Object $result = $continuation.result;
        var11 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch($continuation.label) {
        case 0:
            // code omitted
        case 1:
            // code omitted
        case 2:
            // code omitted
        case 3:
            // code omitted
        case 4:
            // code omitted
        case 5:
            // code omitted
        default:
           throw new IllegalStateException(
                "call to 'resume' before 'invoke' with coroutine");
        } // end of switch
        // code omitted
    } // end of label 46
    // code omitted
  } // end of label 47
  // code omitted
} // end of label 48
// code omitted

ここまで読めば、Continuationの'label'フィールドの目的が分かるようになります。'wheresWaldo'のステージがそれぞれ完了すると、'label'の値が変更されます。ネストしたラベルブロックには、元のKotkinコード内のサスペンドポイント間のコードブロックが含まれています。この'label'の値を使用することで、このコードはリエントラント可能になり、最後に中断したポイント(適切なcaseステートメント)にジャンプしてContinuationからデータを抽出し、正しいラベル付きブロックにブレークすることができるのです。

ただし、すべてのサスペンドポイントが実際にはサスペンドしない場合には、ブロック全体が同期的に実行されなくてはなりません。生成されたコードを見ると、次のようなフラグメントがいくつも目に付きます。

// listing two — deciding if the current coroutine should suspend
if (var10000 == var11) {
  return var11;
}

前述のように、'var11'にはCONTINUATION_SUSPENDEDが設定されているのに対して、'var10000'は別のサスペンド関数からの戻り値を保持しています。従ってサスペンドが発生した場合にはリターンして(後に再入します)、サスペンドが起こらない場合は適切なラベル付きブロックにブレークすることによって、コードは関数の次の部分に続きます。

ここでもう一度、生成されたコードにおいては、すべての呼び出しがサスペンドする、あるいはすべての呼び出しが現在のスレッドで継続される、といった想定はできないことを思い出してください。考えられる、あらゆる組み合わせを処理する必要があるのです。

実行のトレース

プログラムを実行すると、Continuationの'label'の値は0にセットされます。それに対応するswitch文のブランチは、次のようになっています。

// listing three — the first branch of the switch
case 0:
  ResultKt.throwOnFailure($result);
  $continuation.L$0 = this;
  $continuation.L$1 = starterName;
  $continuation.label = 1;
  var10000 = this.fetchNewName(starterName, $continuation);
  if (var10000 == var11) {
     return var11;
  }
  break;

インスタンスとパラメータをContinuationオブジェクトに設定した後、それを'fetchNewName'に渡しています。前述のように、コンパイラが生成した'fetchNewName'では、実際の処理結果とCOROUTINE SUSPEND値のどちらかを返します。

コルーチンがサスペンドされる場合は関数から戻って、再開時に'case 1'ブランチにジャンプします。現在のコルーチンで処理を継続する場合は、switchから次のようなラベル付きブロックのひとつにブレークします。

// listing four — calling ‘fetchNewName' for the second time
firstName = (String)var10000;
secondName = UtilsKt.addThreadId("Found " + firstName + " name");
boolean var13 = false;
System.out.println(secondName);
$continuation.L$0 = this;
$continuation.L$1 = starterName;
$continuation.L$2 = firstName;
$continuation.label = 2;
var10000 = this.fetchNewName(firstName, $continuation);
if (var10000 == var11) {
  return var11;
}

'var1000'には必要な戻り値が入っていることが分かっているので、適切な型にキャストした上で、ローカル変数'firstName'に格納することができます。さらに生成されたコードでは、変数'secondName'にスレッドIDの結合結果が代入された上で、それが表示されます。

次にContinuation内のフィールドを更新して、サーバから取得した値を追加します。今度は'label'の値が2になります。ここで、3回目の'fetchNewName'を実行します。

3回目の'fetchNewName'呼び出し — サスペンドしない場合

ここでもう一度、'fetchNewName'の戻り値を判断しなければなりません。戻り値がCOROUTINE_SUSPENDEDであれば、現在の関数からリターンして、次にコールされた時にswitch文の'case 2'ブランチが実行されます。

現在のコルーチンを継続する場合は、以下のブロックが実行されます。先程のものと同じように見えますが、Continuationに格納されるデータが増えています。

// listing four — calling ‘fetchNewName' for the third time
secondName = (String)var10000;
thirdName = UtilsKt.addThreadId("Found " + secondName + " name");
boolean var14 = false;
System.out.println(thirdName);
$continuation.L$0 = this;
$continuation.L$1 = starterName;
$continuation.L$2 = firstName;
$continuation.L$3 = secondName;
$continuation.label = 3;
var10000 = this.fetchNewName(secondName, (Continuation)$continuation);
if (var10000 == var11) {
  return var11;
}

このパターンが残りの呼び出し(COROUTINE_SUSPENDEDが返ることはないはずです)について、最後まで繰り返されるのです。

3回目の'fetchNewName'呼び出し — サスペンドする場合

別の場合として、コルーチンがサスペンドする時には、次のブロックが実行されます。

// listing five — the third branch of the switch
case 2:
  firstName = (String)$continuation.L$2;
  starterName = (String)$continuation.L$1;
  this = (HttpWaldoFinder)$continuation.L$0;
  ResultKt.throwOnFailure($result);
  var10000 = $result;
  break label46;

Continuationから値が取り出されて、関数のローカル変数に代入されます。次に、ラベル付きbreakを使って、4つ上のリストに実行を移動します。そして最終的に、同じ場所に到達するのです。

実行の要約

ここでコード構造のリストに戻って、各セクションで行われたことを要約してみましょう。

// listing six — the generated switch statement and labels in depth
String firstName;
String secondName;
String thirdName;
String fourthName;
Object var11;
Object var10000;
label48: {
  label47: {
     label46: {
        Object $result = $continuation.result;
        var11 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch($continuation.label) {
       case 0:
            // set label to 1 and make the first call to ‘fetchNewName'
            // if suspending return, otherwise break from the switch
        case 1:
            // extract the parameter from the continuation
            // break from the switch
        case 2:
            // extract the parameter and first result from the continuation
            // break to outside ‘label46'
        case 3:
            // extract the parameter, first and second results from the
            //   continuation
            // break to outside ‘label47'
        case 4:
            // extract the parameter, first, second and third results from
            //   the continuation
            // break to outside ‘label48'
        case 5:
            // extract the parameter, first, second, third and fourth
            //   results from the continuation
            // return the final result
        default:
           throw new IllegalStateException(
                "call to 'resume' before 'invoke' with coroutine");
        } // end of switch
        // store the parameter and first result in the continuation
        // set the label to 2 and make the second call to ‘fetchNewName'
        // if suspending return, otherwise proceed
    } // end of label 46
        // store the parameter, first and second results in the
        //   continuation
        // set the label to 3 and make the third call to ‘fetchNewName'
        // if suspending return, otherwise proceed
  } // end of label 47
        // store the parameter, first, second and third results in the
        //   continuation
        // set the label to 4 and make the fourth call to ‘fetchNewName'
        // if suspending return, otherwise proceed
} // end of label 48
// store the parameter, first, second, third and fourth results in the continuation
// set the label to 5 and make the fifth call to ‘fetchNewName'
// return either the final result or COROUTINE_SUSPENDED

結論

このコードベースを理解するのは容易ではありません。今回、Kotlinコンパイラのコードジェネレータが生成したバイトコードから逆アセンブルしたJavaコードを検証したのですが、コードジェネレータの出力は、分かりやすさよりも効率性や最小化を重視するように設計されているからです。

それでも、いくつかの有益な結論を見出すことはできました。

  1. マジックは存在しない。最初にコルーチンを学ぶ時には、すべてを結び付けることのできる特別な"マジック"があるように思いがちですが、これまで見てきたように、生成されたコードで使用されているのは、条件式やラベル付きbreakなど、手続きプログラミングの基本的なビルディングブロックに過ぎません。
  2. 実装のベースはContinuationである。オリジナルのKEEP提案書に記述されているように、関数のサスペンドや再開は、オブジェクト内に状態をキャッシュすることによって行われます。そのためにコンパイラは、N個のフィールドを持ったContinuation型をサスペンド関数毎に生成します。Nは関数のパラメータ数とフィールド数に3を加えた値です。3を加えるのは、現在のオブジェクト、最終的な結果、そしてインデクスを保持するためです。
  3. 実行は常に標準パターンに従う。サスペンド状態から実行を再開する場合には、Continuationの'label'フィールドを使って、switch文の適切なブランチへジャンプします。このブランチでは、それまでに生成したデータをContinuationオブジェクトから取得した上で、ラベル付きbreakを使用して、サスペンドが発生しなかった場合に直接実行されるはずだったコードにジャンプします。

著者について

Garth Gilmourは、Instilの教育責任者です。氏は1999年にフルタイム開発をリタイアして、最初はCプログラマにC++を、次にC++プログラマにJavaを、次にはJavaプログラマにC#を教えていました。現在はあらゆることをあらゆる人たちに教えていますが、自身はKotlinでの開発がお気に入りです。講演の数を数えていたならば、優に1,000は超えているでしょう。20以上のコースの著者であり、ミートアップでは頻繁に講演し、国内外のカンファレンスでプレゼンテーションを行っている他、開発者向けイベントであるBelfast BASHシリーズや、先日結成されたKotlin Belfast User Groupにも参画しています。ホワイトボードの前に立っていない時には、Krav Magaのコーチとして重量挙げをしています。

 

Eamonn Boyleは開発者、アーキテクト、チームリーダとして15年以上のキャリアを持っています。ここ4年ほどは、フルタイムのトレーナおよびコーチとして活動し、さまざまなトピックに関わるコースの編集や提供を幅広く行っています。取り上げるトピックには、言語の中核的スキルからフレームワーク、ツールやプロセスに至るまで、さまざまなパラダイムやテクノロジが含まれています。また、GotoやKotlinConfを始めとするさまざまなカンファレンスやイベント、ミートアップで、講演やワークショップも行っています。

この記事に星をつける

おすすめ度
スタイル

こんにちは

コメントするには InfoQアカウントの登録 または が必要です。InfoQ に登録するとさまざまなことができます。

アカウント登録をしてInfoQをお楽しみください。

HTML: a,b,br,blockquote,i,li,pre,u,ul,p

コミュニティコメント

HTML: a,b,br,blockquote,i,li,pre,u,ul,p

HTML: a,b,br,blockquote,i,li,pre,u,ul,p

BT