KtorサーバーにおけるWebSockets
必須の依存関係: io.ktor:ktor-server-websockets
コード例: server-websockets
WebSocketは、単一のTCP接続を介してユーザーのブラウザとサーバー間で全二重通信セッションを提供するプロトコルです。これは、サーバーとの間でリアルタイムのデータ転送を必要とするアプリケーションの作成に特に役立ちます。
Ktorは、サーバー側とクライアント側の両方でWebSocketプロトコルをサポートしています。
Ktorを使用すると、次のことができます。
- 基本的なWebSocket設定(フレームサイズ、ping期間など)を構成できます。
- サーバーとクライアント間でメッセージを交換するためのWebSocketセッションを処理できます。
- WebSocket拡張機能を追加できます。例えば、Deflate拡張機能を使用したり、カスタム拡張機能を実装したりできます。
クライアント側でのWebSocketサポートについては、WebSocketsクライアントプラグインを参照してください。
一方向通信セッションの場合、Server-Sent Events (SSE)の使用を検討してください。SSEは、サーバーがクライアントにイベントベースの更新を送信する必要がある場合に特に役立ちます。
依存関係を追加
WebSocketsを使用するには、ビルドスクリプトにktor-server-websocketsアーティファクトを含める必要があります。
WebSocketsをインストールする
アプリケーションにWebSocketsプラグインをインストールするには、指定された
install関数に渡します。 以下のコードスニペットは、WebSocketsをインストールする方法を示しています... - ...
embeddedServer関数呼び出しの内部。 - ... 明示的に定義された
module(Applicationクラスの拡張関数)の内部。
WebSocketsを構成する
オプションで、WebSocketOptionsを渡すことで、installブロック内でプラグインを構成できます。
pingPeriodプロパティを使用して、ping間の期間を指定します。timeoutプロパティを使用して、接続が閉じられるまでのタイムアウトを設定します。maxFrameSizeプロパティを使用して、送受信できる最大フレームを設定します。maskingプロパティを使用して、マスキングが有効になっているかどうかを指定します。contentConverterプロパティを使用して、シリアライゼーション/デシリアライゼーション用のコンバーターを設定します。
install(WebSockets) {
pingPeriod = 15.seconds
timeout = 15.seconds
maxFrameSize = Long.MAX_VALUE
masking = false
}WebSocketセッションを処理する
APIの概要
WebSocketsプラグインをインストールして構成したら、WebSocketセッションを処理するエンドポイントを定義できます。サーバー上でWebSocketエンドポイントを定義するには、ルーティングブロック内でwebSocket関数を呼び出します。
routing {
webSocket("/echo") {
// Handle a WebSocket session
}
}この例では、デフォルト構成が使用されている場合、サーバーはws://localhost:8080/echoへのWebSocketリクエストを受け入れます。
webSocketブロック内では、DefaultWebSocketServerSessionクラスで表されるWebSocketセッションのハンドラーを定義します。 ブロック内で以下の関数とプロパティを使用できます。
send関数を使用して、テキストコンテンツをクライアントに送信します。incomingおよびoutgoingプロパティを使用して、WebSocketフレームを受信および送信するためのチャネルにアクセスします。フレームはFrameクラスで表されます。close関数を使用して、指定された理由でクローズフレームを送信します。
セッションを処理する際に、フレームタイプを確認できます。例えば:
Frame.Textはテキストフレームです。このフレームタイプの場合、Frame.Text.readText()を使用してその内容を読み取ることができます。Frame.Binaryはバイナリフレームです。このタイプの場合、Frame.Binary.readBytes()を使用してその内容を読み取ることができます。
incomingチャネルには、ping/pongやクローズフレームなどの制御フレームは含まれていないことに注意してください。 制御フレームを処理し、断片化されたフレームを再構築するには、webSocketRaw関数を使用してWebSocketセッションを処理します。
クライアントに関する情報(クライアントのIPアドレスなど)を取得するには、
callプロパティを使用します。一般的なリクエスト情報について学びましょう。
以下では、このAPIの使用例を見ていきます。
例: 単一セッションの処理
以下の例は、1つのクライアントとのセッションを処理するecho WebSocketエンドポイントを作成する方法を示しています。
routing {
webSocket("/echo") {
send("Please enter your name")
for (frame in incoming) {
frame as? Frame.Text ?: continue
val receivedText = frame.readText()
if (receivedText.equals("bye", ignoreCase = true)) {
close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
} else {
send(Frame.Text("Hi, $receivedText!"))
}
}
}
}完全な例については、server-websocketsを参照してください。
例: 複数セッションの処理
複数のWebSocketセッションを効率的に管理し、ブロードキャストを処理するには、KotlinのSharedFlowを利用できます。このアプローチは、WebSocket通信を管理するためのスケーラブルで並行処理に適した方法を提供します。このパターンを実装する方法を以下に示します。
- メッセージをブロードキャストするための
SharedFlowを定義します。
val messageResponseFlow = MutableSharedFlow<MessageResponse>()
val sharedFlow = messageResponseFlow.asSharedFlow()- WebSocketルートで、ブロードキャストとメッセージ処理ロジックを実装します。
webSocket("/ws") {
send("You are connected to WebSocket!")
val job = launch {
sharedFlow.collect { message ->
send(message.message)
}
}
runCatching {
incoming.consumeEach { frame ->
if (frame is Frame.Text) {
val receivedText = frame.readText()
val messageResponse = MessageResponse(receivedText)
messageResponseFlow.emit(messageResponse)
}
}
}.onFailure { exception ->
println("WebSocket exception: ${exception.localizedMessage}")
}.also {
job.cancel()
}
}runCatchingブロックは、受信メッセージを処理し、それらをSharedFlowに発行し、その後すべてのコレクターにブロードキャストします。
このパターンを使用することで、個々の接続を手動で追跡することなく、複数のWebSocketセッションを効率的に管理できます。このアプローチは、多数の同時WebSocket接続を持つアプリケーションでうまくスケールし、メッセージブロードキャストを処理するためのクリーンでリアクティブな方法を提供します。
完全な例については、server-websockets-sharedflowを参照してください。
WebSocket APIとKtor
WebSocket APIの標準イベントは、Ktorでは次のようにマッピングされます。
onConnectはブロックの開始時に発生します。onMessageは、メッセージを正常に読み取った後(例:incoming.receive()を使用した場合)、またはfor(frame in incoming)でサスペンドされたイテレーションを使用した後に発生します。onCloseは、incomingチャネルが閉じられたときに発生します。これにより、サスペンドされたイテレーションが完了するか、メッセージを受信しようとしたときにClosedReceiveChannelExceptionがスローされます。onErrorは他の例外と同等です。
onCloseとonErrorの両方で、closeReasonプロパティが設定されます。
以下の例では、無限ループは例外が発生した場合にのみ終了します(ClosedReceiveChannelExceptionまたは他の例外)。
webSocket("/echo") {
println("onConnect")
try {
for (frame in incoming){
val text = (frame as Frame.Text).readText()
println("onMessage")
received += text
outgoing.send(Frame.Text(text))
}
} catch (e: ClosedReceiveChannelException) {
println("onClose ${closeReason.await()}")
} catch (e: Throwable) {
println("onError ${closeReason.await()}")
e.printStackTrace()
}
}