新SkyWayのSubscribeは順番に

新SkyWayは実装がしやすくなった

SkyWayのライブラリが一新され、弊社では旧SkyWayで構築されたシステムを新SkyWayへリプレースする開発を進めております。

新SkyWayではPubSubモデルが採用され、特に3人以上での多人数通話システムを非常に開発しやすくなりました。

また映像・音声・データをそれぞれPublish・Subscribeできるようになったことで、
「話している人の音声は全員が受信するが、映像は希望者のみ受信する」といったような柔軟な制御もしやすくなっています。しかし、、

Subscribeが終わらない?

新SkyWayのAPIにはいずれかのユーザーがメディアやデータをPublishした事を知らせるイベントがあります。
あるプロジェクトではそのイベントリスナで直接PublicationをSubscribeしていましたが、そこで行っているSubscribe処理が確率的にハングする事がありました。
その時の実装は以下のような形です。

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers

val scope = CoroutineScope(Dispatchers.IO)

room.onStreamPublishedHandler = { pub ->
    // いずれかのStreamがPublishされた

    scope.launch {
        val sub = localRoomMember.subscribe(pub.id) // ←ここでsubscribeが終わらない事がある
        // do something...
    }
}

問題点

上記のような実装では、1つのSubscribeが処理されている最中に別のSubscribeが並行稼働する可能性があります。
このパターンに陥ったとき、Subscribeが完了しなくなってしまうようでした。

改善策

複数のsubscribeが並行稼働しないよう、以下のような改修を行いました。

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.sync.Mutex

val scope = CoroutineScope(Dispatchers.IO)
val subscribingMutex = Mutex()

room.onStreamPublishedHandler = { pub ->
    // いずれかのStreamがPublishされた

    scope.launch {
        subscribingMutex.withLock {
            val sub = localRoomMember.subscribe(pub.id) // ←安定してsubscribeできるようになった
            // do something...
        }
    }
}

ポイントとなるのは12行目のMutex.withLockです。
onStreamPublishedHandlerが並行稼働したとしても、subscribingMutex.withLockのブロックを複数のスレッドが同時に実行することはできません。
もしも該当ブロックを処理している最中に後続のスレッドがそこへ到達した場合、後続のスレッドは前のスレッドが処理を完了するまで待つことになり、subscribeが一つ一つ順番に処理されます。
一見すると並行稼働するよりも全体の処理時間がかかってしまうように感じますが、これによってSubscribeがハングする確率を明らかに下げ、接続処理にかかる時間や安定性を改善できました。

まとめ

Mutexを用いることで非同期処理の順序を整理することができ、それによって処理を安定させる事ができます。
新SkyWayのsubscribe処理においては、これは非常に効果的でした。

弊社ではWebRTC開発案件も承っておりますので、問い合わせフォームよりお気軽にご相談ください。