コネヒト開発者ブログ

コネヒト開発者ブログ

Kotlin1.3でリリースされたCoroutineを試してみた

2017年11月にAndroidエンジニアとしてjoinした関根です。 最近はiOSアプリの開発も担当しております。 こちらはコネヒト Advent Calendar 2018 - Qiitaの10日目の記事です。

さて、先日Kotlin1.3の正式版がリリースされました。

下記が主要なリリースの一覧となります

  • CoroutineのStable版
  • Kotlin/Native ベータ
  • マルチプラットフォームの拡充
  • Ktor 1.0 ベータ
    • ※現在はStable版となっています

かねてからマルチプラットフォーム化を進めること目標としてきたKotlinですが、今回のリリースで、その未来へ大きく前進したように思えます。 Kotlinからますます目が離せないですね!

さて、それではマルチプラットフォームのご紹介を・・・・といきたいところですが、今回はStableとなったCoroutineの紹介をさせていただこうと思います。 下記が今回のアジェンダです。

  • Coroutineとは何者か?
  • ChannelでのCoroutine間通信
  • Coroutineを利用したPub/Subの仕組み

Coroutineとは何者か?

今回、StableとなったCoroutineはKotlinでのnon-blockingプログラミングを支える機能です。kotlinxパッケージの配下に存在します。 利用目的としては ファイルI/OやネットワークI/Oなどの非同期処理が一番に浮かびます。

// Coroutineの生成
GlobalScope.launch(Dispatchers.Main) {
    progressDialog.isVisible = true
    // asyncでnon-blocking関数化,awaitで待ち受けの開始
    // ※ここで処理は停止されるが、ブロッキングは発生しない
    val bigData = async { loadBigData() }.await()
    progressDialog.isVisible = false
    displayData(bigData)
}

上記のようにblockingが発生しうる処理を直列的な記述を維持しながら、non-blockingな処理を実現できるのがCoroutineの強みだと思っています。また下記のようにすると並行処理もお手軽に実現できます。

// Coroutineの生成
GlobalScope.launch(Dispatchers.Main) {
    progressDialog.isVisible = true

    // asyncでnon-blocking関数化,awaitで待ち受けの開始
    // ※ここで処理は停止されるが、ブロッキングは発生しない
    val bigDataInt = async { loadBigDataInt() }
    val bigDataString = async { loadBigDataString() }
    val bigDataPair = Pair(loadBigDataInt.await(), loadBigDataString.await()) // ここで並行処理が実施される

    progressDialog.isVisible = false
    displayData(bigDataPair)
}

Coroutineが発表されて以降、RxJavaと比較されることが多いですが、筆者としては、どちらを利用することにメリットが大きいかを適材適所で判断し、利用していくのが良いと捉えており、 今回のような単純な非同期処理だけであれば、Coroutineでも十分に有用であると考えております。

ChannelでのCoroutine間通信

アプリ開発においては非同期処理は様々な理由で発生します。 例えば、先ほど例に上げたようなAPI通信に関わるネットワーク I/Oもその一例です。 他にはバックグラウンド処理との同期をしたい場合や、OSからブロードキャストされてきたメッセージングのハンドリングなど。 そのようなユースケースの場合、異なるライフサイクルで存在するコンポーネント間での、メッセージ授受の仕組みが肝要になってきます。この仕組みはCoroutineのChannelという機能を利用することで実現が可能です。

// 送受信用のChannelを生成しておく
val channel = Channel<Int>()

GlobalScope.launch { // 送信側のCoroutine
    for (x in 1..5) channel.send(x * x)
}

GlobalScope.launch { // 受信側のCoroutine
    repeat(5) { println(channel.receive()) } // 5つデータを取得して表示する
}

このようにChannelはCoroutine間で値を安全に授受する仕組みです。 本例以外にもChannelを利用すると、パイプライン処理やFan-in/Fan-outパターンの実装を、楽に記述することができるようになります。こちらはKotlinのサンプルでも紹介されていますので、詳しくはそちらをお読みください。 https://kotlinlang.org/docs/reference/coroutines/channels.html#channels-experimental

なお現時点で、Channelはexperimentalとなっており、今後の開発状況によっては破壊的変更が加わる可能性が残っています。 十分ご検討の上ご利用ください!

Coroutineを利用したPub/Subの仕組み

さて、今回はChannelを利用してPub/Subの仕組みを試してみましたので紹介しようと思います。 ソースコードの全体像は下記です。

class PubSub<T> {
  // 1. Pub/Sub用のChannelを生成
    val bus = BroadcastChannel<T>(1)

    // 2. Channelへの送信
    fun send(item: T) = bus.sendBlocking(item)  

    // 3. CoroutineScopeを受け取りsubscribeを開始する
    fun subscribe(scope: CoroutineScope, onConsume: (T) -> Unit) {  
        scope.async(Dispatchers.IO) { // 4. IOスレッドを利用する指定
            Timber.d("async:"+Thread.currentThread().name)
            bus.openSubscription().consumeEach { // 5. 受信の開始
                onConsume(it)
            }
        }
    }
}
・・・

// 受信
val pubSub = PubSub<Int>()
pubSub.subscribe(this) {
    launch(Dispatchers.Main) { // 6. UIスレッドで実行
      // 何かUIに関わる処理
    }
}

// 送信
pubSub.send(1)

全体的な流れを見てみると以下のようになります。

1. Pub/Sub用のChannelを生成

BroadcastChannelは1対多でのデータ授受を目的としたChannelです。 EventBusのように一つのEventを複数の画面やクラスから購読する際に利用します。

2. Channelへの送信

今回の例ではsendBlockingで値を送信していますが、 呼び出し元がCoroutineの場合はsendメソッドを利用しましょう。

SendChannel.send - kotlinx-coroutines-core

3. CoroutineScopeを受け取りsubscribeを開始する

CoroutineScopeはCoroutine自身とその子Coroutineのスコープを決めるものです。 スコープを決めることにより、呼び出し元のLyfecycleに応じて中断処理を行うことが可能となります。 今回のサンプルではActivity単位のスコープとすることを想定しています。

4. IOスレッドを利用する指定

受信待ち受け処理は、I/Oスレッドで走らせています。

5. 受信の開始

openSubscriptionを呼び出すことにより、受信用のChannelが新たに生成されます。 これにより1対多のメッセージ授受を実現できます。 1対1で事足りる場合には通常のChannelを利用すると良いと思われます。

6. UIスレッドで実行

メッセージ受信後の処理をUIスレッドで行うように指定しています。 ここをDispatchers.IOのように指定するとワーカースレッドで処理が行われます。

実装は以上となります。 kotlinとkotlinxのパッケージのみで、Pub/Subが実現できると夢が広がりますね。

終わりに

今回はCoroutineとChannelについての紹介とそれらを利用したPub/Sub実装の紹介をさせて頂きました。 弊社ではRxJavaを利用した実装が多く存在するのですが、Coroutineで代用できる部分がないか、探って行きたいと思います! CoroutineとChannelの良き実装アプローチがあれば、教えてください!!

それでは!!