diff --git a/CHANGELOG.md b/CHANGELOG.md index b76d61bcdca..1c3f89fb99d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 0.11.3 + +* `Ktor`: + * Support of `WebSockets` has been improved + * `Client`: + * New extensions: `HttpClient#openBaseWebSocketFlow`, `HttpClient#openWebSocketFlow`, `HttpClient#openSecureWebSocketFlow` + ## 0.11.2 * `Ktor`: diff --git a/gradle.properties b/gradle.properties index 660b85a88f5..8ff6592186e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -14,5 +14,5 @@ crypto_js_version=4.1.1 # Project data group=dev.inmo -version=0.11.2 -android_code_version=126 +version=0.11.3 +android_code_version=127 diff --git a/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/NewFlowsWebsockets.kt b/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/NewFlowsWebsockets.kt index e7adf61ba99..95fc92cde3c 100644 --- a/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/NewFlowsWebsockets.kt +++ b/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/NewFlowsWebsockets.kt @@ -1,5 +1,6 @@ package dev.inmo.micro_utils.ktor.client +import dev.inmo.micro_utils.common.Warning import dev.inmo.micro_utils.coroutines.runCatchingSafely import dev.inmo.micro_utils.coroutines.safely import dev.inmo.micro_utils.ktor.common.* @@ -7,35 +8,25 @@ import io.ktor.client.HttpClient import io.ktor.client.plugins.pluginOrNull import io.ktor.client.plugins.websocket.* import io.ktor.client.request.HttpRequestBuilder -import io.ktor.websocket.Frame -import io.ktor.websocket.readBytes -import io.ktor.websocket.serialization.sendSerializedBase +import io.ktor.http.URLProtocol +import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.isActive -import kotlinx.serialization.DeserializationStrategy /** * @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish * connection. Must return true in case if must be reconnected. By default always reconnecting */ -inline fun HttpClient.createStandardWebsocketFlow( - url: String, +@Warning("This feature is internal and should not be used directly. It is can be changed without any notification and warranty on compile-time or other guaranties") +inline fun openBaseWebSocketFlow( noinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, - noinline requestBuilder: HttpRequestBuilder.() -> Unit = {} + noinline webSocketSessionRequest: suspend SendChannel.() -> Unit ): Flow { - pluginOrNull(WebSockets) ?: error("Plugin $WebSockets must be installed for using createStandardWebsocketFlow") - - val correctedUrl = url.asCorrectWebSocketUrl - return channelFlow { do { val reconnect = runCatchingSafely { - ws(correctedUrl, requestBuilder) { - while (isActive) { - send(receiveDeserialized()) - } - } + webSocketSessionRequest() checkReconnection(null) }.getOrElse { e -> checkReconnection(e).also { @@ -53,3 +44,60 @@ inline fun HttpClient.createStandardWebsocketFlow( } } } + +/** + * @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish + * connection. Must return true in case if must be reconnected. By default always reconnecting + */ +inline fun HttpClient.openWebSocketFlow( + url: String, + useSecureConnection: Boolean, + noinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, + noinline requestBuilder: HttpRequestBuilder.() -> Unit = {} +): Flow { + pluginOrNull(WebSockets) ?: error("Plugin $WebSockets must be installed for using createStandardWebsocketFlow") + + return openBaseWebSocketFlow(checkReconnection) { + val block: suspend DefaultClientWebSocketSession.() -> Unit = { + while (isActive) { + send(receiveDeserialized()) + } + } + + if (useSecureConnection) { + wss(url, requestBuilder, block) + } else { + ws(url, requestBuilder, block) + } + } +} + +/** + * @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish + * connection. Must return true in case if must be reconnected. By default always reconnecting + */ +inline fun HttpClient.openWebSocketFlow( + url: String, + noinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, + noinline requestBuilder: HttpRequestBuilder.() -> Unit = {} +): Flow = openWebSocketFlow(url, false, checkReconnection, requestBuilder) + +/** + * @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish + * connection. Must return true in case if must be reconnected. By default always reconnecting + */ +inline fun HttpClient.openSecureWebSocketFlow( + url: String, + noinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, + noinline requestBuilder: HttpRequestBuilder.() -> Unit = {} +): Flow = openWebSocketFlow(url, true, checkReconnection, requestBuilder) + +/** + * @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish + * connection. Must return true in case if must be reconnected. By default always reconnecting + */ +inline fun HttpClient.createStandardWebsocketFlow( + url: String, + noinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, + noinline requestBuilder: HttpRequestBuilder.() -> Unit = {} +): Flow = openWebSocketFlow(url, checkReconnection, requestBuilder)