diff --git a/CHANGELOG.md b/CHANGELOG.md index 457bd88e9d5..48edbd27042 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ * Ktor: * Client: * New function `UnifiedRequester#createStandardWebsocketFlow` without `checkReconnection` arg + * Server: + * Now it is possible to filter data in `Route#includeWebsocketHandling` + * Callback in `Route#includeWebsocketHandling` is `suspend` since now ## 0.9.9 diff --git a/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/FlowsWebsocket.kt b/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/FlowsWebsocket.kt index d26cd03cb08..0a47a932fc4 100644 --- a/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/FlowsWebsocket.kt +++ b/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/FlowsWebsocket.kt @@ -7,6 +7,7 @@ import io.ktor.application.install import io.ktor.http.cio.websocket.* import io.ktor.routing.Route import io.ktor.routing.application +import io.ktor.websocket.WebSocketServerSession import io.ktor.websocket.webSocket import kotlinx.coroutines.flow.Flow import kotlinx.serialization.SerializationStrategy @@ -14,7 +15,7 @@ import kotlinx.serialization.SerializationStrategy fun Route.includeWebsocketHandling( suburl: String, flow: Flow, - converter: (T) -> StandardKtorSerialInputData + converter: suspend WebSocketServerSession.(T) -> StandardKtorSerialInputData? ) { application.apply { featureOrNull(io.ktor.websocket.WebSockets) ?: install(io.ktor.websocket.WebSockets) @@ -22,7 +23,9 @@ fun Route.includeWebsocketHandling( webSocket(suburl) { safely { flow.collect { - send(converter(it)) + converter(it) ?.let { data -> + send(data) + } } } } @@ -32,10 +35,22 @@ fun Route.includeWebsocketHandling( suburl: String, flow: Flow, serializer: SerializationStrategy, - serialFormat: StandardKtorSerialFormat = standardKtorSerialFormat + serialFormat: StandardKtorSerialFormat = standardKtorSerialFormat, + filter: (suspend WebSocketServerSession.(T) -> Boolean)? = null ) = includeWebsocketHandling( suburl, - flow -) { - serialFormat.encodeDefault(serializer, it) -} + flow, + converter = if (filter == null) { + { + serialFormat.encodeDefault(serializer, it) + } + } else { + { + if (filter(it)) { + serialFormat.encodeDefault(serializer, it) + } else { + null + } + } + } +) diff --git a/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/ServerRoutingShortcuts.kt b/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/ServerRoutingShortcuts.kt index fac6c49af08..eec3976f64e 100644 --- a/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/ServerRoutingShortcuts.kt +++ b/ktor/server/src/jvmMain/kotlin/dev/inmo/micro_utils/ktor/server/ServerRoutingShortcuts.kt @@ -18,6 +18,7 @@ import io.ktor.util.asStream import io.ktor.util.cio.writeChannel import io.ktor.util.pipeline.PipelineContext import io.ktor.utils.io.core.* +import io.ktor.websocket.WebSocketServerSession import kotlinx.coroutines.flow.Flow import kotlinx.serialization.* import java.io.File @@ -30,8 +31,9 @@ class UnifiedRouter( fun Route.includeWebsocketHandling( suburl: String, flow: Flow, - serializer: SerializationStrategy - ) = includeWebsocketHandling(suburl, flow, serializer, serialFormat) + serializer: SerializationStrategy, + filter: (suspend WebSocketServerSession.(T) -> Boolean)? = null + ) = includeWebsocketHandling(suburl, flow, serializer, serialFormat, filter) suspend fun PipelineContext<*, ApplicationCall>.unianswer( answerSerializer: SerializationStrategy,