diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d5b52021f3..b76d61bcdca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## 0.11.2 +* `Ktor`: + * Support of `WebSockets` has been improved and added fixes inside of clients + ## 0.11.1 * `Repos` diff --git a/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/NewFlowsWebsockets.kt b/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/NewFlowsWebsockets.kt index 4ee959eceb3..e7adf61ba99 100644 --- a/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/NewFlowsWebsockets.kt +++ b/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/NewFlowsWebsockets.kt @@ -19,7 +19,7 @@ import kotlinx.serialization.DeserializationStrategy * @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish * connection. Must return true in case if must be reconnected. By default always reconnecting */ -inline fun HttpClient.createStandardWebsocketFlow( +inline fun HttpClient.createStandardWebsocketFlow( url: String, noinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, noinline requestBuilder: HttpRequestBuilder.() -> Unit = {} @@ -32,8 +32,8 @@ inline fun HttpClient.createStandardWebsocketFlow( do { val reconnect = runCatchingSafely { ws(correctedUrl, requestBuilder) { - for (received in incoming) { - sendSerialized(received.data) + while (isActive) { + send(receiveDeserialized()) } } checkReconnection(null) diff --git a/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/FlowsWebsocket.kt b/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/FlowsWebsocket.kt index 446307ad93a..32cfce7e3b0 100644 --- a/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/FlowsWebsocket.kt +++ b/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/FlowsWebsocket.kt @@ -12,6 +12,7 @@ import io.ktor.websocket.send import kotlinx.coroutines.flow.Flow import kotlinx.serialization.SerializationStrategy +@Deprecated("This method will be removed soon") fun Route.includeWebsocketHandling( suburl: String, flow: Flow, diff --git a/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/NewFlowsWebsocket.kt b/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/NewFlowsWebsocket.kt index ae9108f9235..fa2822c39c1 100644 --- a/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/NewFlowsWebsocket.kt +++ b/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/NewFlowsWebsocket.kt @@ -15,7 +15,8 @@ import kotlinx.serialization.SerializationStrategy inline fun Route.includeWebsocketHandling( suburl: String, flow: Flow, - protocol: URLProtocol? = null + protocol: URLProtocol? = null, + noinline dataMapper: suspend WebSocketServerSession.(T) -> T? = { it } ) { application.apply { pluginOrNull(WebSockets) ?: install(WebSockets) @@ -23,7 +24,7 @@ inline fun Route.includeWebsocketHandling( webSocket(suburl, protocol ?.name) { safely { flow.collect { - sendSerialized(it) + sendSerialized(dataMapper(it) ?: return@collect) } } }