From 798128256eccec3474fbe6ced1d7bcd13960351e Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Tue, 14 Jun 2022 23:48:10 +0600 Subject: [PATCH] fixes in websockets --- CHANGELOG.md | 5 ++ .../ktor/client/NewFlowsWebsockets.kt | 80 +++++++++++++++---- 2 files changed, 69 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc265555044..1c3f89fb99d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ ## 0.11.3 +* `Ktor`: + * Support of `WebSockets` has been improved + * `Client`: + * New extensions: `HttpClient#openBaseWebSocketFlow`, `HttpClient#openWebSocketFlow`, `HttpClient#openSecureWebSocketFlow` + ## 0.11.2 * `Ktor`: diff --git a/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/NewFlowsWebsockets.kt b/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/NewFlowsWebsockets.kt index e7adf61ba99..95fc92cde3c 100644 --- a/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/NewFlowsWebsockets.kt +++ b/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/NewFlowsWebsockets.kt @@ -1,5 +1,6 @@ package dev.inmo.micro_utils.ktor.client +import dev.inmo.micro_utils.common.Warning import dev.inmo.micro_utils.coroutines.runCatchingSafely import dev.inmo.micro_utils.coroutines.safely import dev.inmo.micro_utils.ktor.common.* @@ -7,35 +8,25 @@ import io.ktor.client.HttpClient import io.ktor.client.plugins.pluginOrNull import io.ktor.client.plugins.websocket.* import io.ktor.client.request.HttpRequestBuilder -import io.ktor.websocket.Frame -import io.ktor.websocket.readBytes -import io.ktor.websocket.serialization.sendSerializedBase +import io.ktor.http.URLProtocol +import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.isActive -import kotlinx.serialization.DeserializationStrategy /** * @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish * connection. Must return true in case if must be reconnected. By default always reconnecting */ -inline fun HttpClient.createStandardWebsocketFlow( - url: String, +@Warning("This feature is internal and should not be used directly. It is can be changed without any notification and warranty on compile-time or other guaranties") +inline fun openBaseWebSocketFlow( noinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, - noinline requestBuilder: HttpRequestBuilder.() -> Unit = {} + noinline webSocketSessionRequest: suspend SendChannel.() -> Unit ): Flow { - pluginOrNull(WebSockets) ?: error("Plugin $WebSockets must be installed for using createStandardWebsocketFlow") - - val correctedUrl = url.asCorrectWebSocketUrl - return channelFlow { do { val reconnect = runCatchingSafely { - ws(correctedUrl, requestBuilder) { - while (isActive) { - send(receiveDeserialized()) - } - } + webSocketSessionRequest() checkReconnection(null) }.getOrElse { e -> checkReconnection(e).also { @@ -53,3 +44,60 @@ inline fun HttpClient.createStandardWebsocketFlow( } } } + +/** + * @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish + * connection. Must return true in case if must be reconnected. By default always reconnecting + */ +inline fun HttpClient.openWebSocketFlow( + url: String, + useSecureConnection: Boolean, + noinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, + noinline requestBuilder: HttpRequestBuilder.() -> Unit = {} +): Flow { + pluginOrNull(WebSockets) ?: error("Plugin $WebSockets must be installed for using createStandardWebsocketFlow") + + return openBaseWebSocketFlow(checkReconnection) { + val block: suspend DefaultClientWebSocketSession.() -> Unit = { + while (isActive) { + send(receiveDeserialized()) + } + } + + if (useSecureConnection) { + wss(url, requestBuilder, block) + } else { + ws(url, requestBuilder, block) + } + } +} + +/** + * @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish + * connection. Must return true in case if must be reconnected. By default always reconnecting + */ +inline fun HttpClient.openWebSocketFlow( + url: String, + noinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, + noinline requestBuilder: HttpRequestBuilder.() -> Unit = {} +): Flow = openWebSocketFlow(url, false, checkReconnection, requestBuilder) + +/** + * @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish + * connection. Must return true in case if must be reconnected. By default always reconnecting + */ +inline fun HttpClient.openSecureWebSocketFlow( + url: String, + noinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, + noinline requestBuilder: HttpRequestBuilder.() -> Unit = {} +): Flow = openWebSocketFlow(url, true, checkReconnection, requestBuilder) + +/** + * @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish + * connection. Must return true in case if must be reconnected. By default always reconnecting + */ +inline fun HttpClient.createStandardWebsocketFlow( + url: String, + noinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, + noinline requestBuilder: HttpRequestBuilder.() -> Unit = {} +): Flow = openWebSocketFlow(url, checkReconnection, requestBuilder)