diff --git a/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/FlowsWebsockets.kt b/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/FlowsWebsockets.kt index 176f75ee809..1b59ee4f2dd 100644 --- a/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/FlowsWebsockets.kt +++ b/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/FlowsWebsockets.kt @@ -1,5 +1,6 @@ package dev.inmo.micro_utils.ktor.client +import dev.inmo.micro_utils.coroutines.runCatchingSafely import dev.inmo.micro_utils.coroutines.safely import dev.inmo.micro_utils.ktor.common.* import io.ktor.client.HttpClient @@ -11,6 +12,7 @@ import io.ktor.websocket.Frame import io.ktor.websocket.readBytes import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.isActive import kotlinx.serialization.DeserializationStrategy /** @@ -19,7 +21,7 @@ import kotlinx.serialization.DeserializationStrategy */ inline fun HttpClient.createStandardWebsocketFlow( url: String, - crossinline checkReconnection: (Throwable?) -> Boolean = { true }, + crossinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}, crossinline conversation: suspend (StandardKtorSerialInputData) -> T ): Flow { @@ -28,36 +30,32 @@ inline fun HttpClient.createStandardWebsocketFlow( val correctedUrl = url.asCorrectWebSocketUrl return channelFlow { - val producerScope = this@channelFlow do { - val reconnect = try { - safely { - ws(correctedUrl, requestBuilder) { - for (received in incoming) { - when (received) { - is Frame.Binary -> producerScope.send(conversation(received.readBytes())) - else -> { - producerScope.close() - return@ws - } + val reconnect = runCatchingSafely { + ws(correctedUrl, requestBuilder) { + for (received in incoming) { + when (received) { + is Frame.Binary -> send(conversation(received.data)) + else -> { + close() + return@ws } } } } checkReconnection(null) - } catch (e: Throwable) { + }.getOrElse { e -> checkReconnection(e).also { if (!it) { - producerScope.close(e) + close(e) } } } - } while (reconnect) - if (!producerScope.isClosedForSend) { - safely( - { it.printStackTrace() } - ) { - producerScope.close() + } while (reconnect && isActive) + + if (isActive) { + safely { + close() } } } @@ -70,7 +68,7 @@ inline fun HttpClient.createStandardWebsocketFlow( inline fun HttpClient.createStandardWebsocketFlow( url: String, deserializer: DeserializationStrategy, - crossinline checkReconnection: (Throwable?) -> Boolean = { true }, + crossinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, serialFormat: StandardKtorSerialFormat = standardKtorSerialFormat, noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}, ) = createStandardWebsocketFlow( diff --git a/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/StandardHttpClientGetPost.kt b/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/StandardHttpClientGetPost.kt index 2e91d9acccd..21c5b28aede 100644 --- a/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/StandardHttpClientGetPost.kt +++ b/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/StandardHttpClientGetPost.kt @@ -87,7 +87,7 @@ class UnifiedRequester( fun createStandardWebsocketFlow( url: String, - checkReconnection: (Throwable?) -> Boolean, + checkReconnection: suspend (Throwable?) -> Boolean, deserializer: DeserializationStrategy, requestBuilder: HttpRequestBuilder.() -> Unit = {}, ) = client.createStandardWebsocketFlow(url, deserializer, checkReconnection, serialFormat, requestBuilder) @@ -96,7 +96,7 @@ class UnifiedRequester( url: String, deserializer: DeserializationStrategy, requestBuilder: HttpRequestBuilder.() -> Unit = {}, - ) = createStandardWebsocketFlow(url, { true }, deserializer, requestBuilder) + ) = createStandardWebsocketFlow(url, { true }, deserializer, requestBuilder) } val defaultRequester = UnifiedRequester() diff --git a/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/FlowsWebsocket.kt b/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/FlowsWebsocket.kt index 973c47bb418..446307ad93a 100644 --- a/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/FlowsWebsocket.kt +++ b/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/FlowsWebsocket.kt @@ -15,13 +15,13 @@ import kotlinx.serialization.SerializationStrategy fun Route.includeWebsocketHandling( suburl: String, flow: Flow, - protocol: URLProtocol = URLProtocol.WS, + protocol: URLProtocol? = null, converter: suspend WebSocketServerSession.(T) -> StandardKtorSerialInputData? ) { application.apply { pluginOrNull(WebSockets) ?: install(WebSockets) } - webSocket(suburl, protocol.name) { + webSocket(suburl, protocol ?.name) { safely { flow.collect { converter(it) ?.let { data -> @@ -37,7 +37,7 @@ fun Route.includeWebsocketHandling( flow: Flow, serializer: SerializationStrategy, serialFormat: StandardKtorSerialFormat = standardKtorSerialFormat, - protocol: URLProtocol = URLProtocol.WS, + protocol: URLProtocol? = null, filter: (suspend WebSocketServerSession.(T) -> Boolean)? = null ) = includeWebsocketHandling( suburl, diff --git a/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/ServerRoutingShortcuts.kt b/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/ServerRoutingShortcuts.kt index b2b4e93d751..962f72e2404 100644 --- a/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/ServerRoutingShortcuts.kt +++ b/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/ServerRoutingShortcuts.kt @@ -27,7 +27,7 @@ class UnifiedRouter( suburl: String, flow: Flow, serializer: SerializationStrategy, - protocol: URLProtocol = URLProtocol.WS, + protocol: URLProtocol? = null, filter: (suspend WebSocketServerSession.(T) -> Boolean)? = null ) = includeWebsocketHandling(suburl, flow, serializer, serialFormat, protocol, filter)