improvements in includeWebsocketHandling

This commit is contained in:
InsanusMokrassar 2022-03-04 15:31:32 +06:00
parent 771aed0f0f
commit 14ebe01fc6
3 changed files with 29 additions and 9 deletions

View File

@ -5,6 +5,9 @@
* Ktor: * Ktor:
* Client: * Client:
* New function `UnifiedRequester#createStandardWebsocketFlow` without `checkReconnection` arg * New function `UnifiedRequester#createStandardWebsocketFlow` without `checkReconnection` arg
* Server:
* Now it is possible to filter data in `Route#includeWebsocketHandling`
* Callback in `Route#includeWebsocketHandling` is `suspend` since now
## 0.9.9 ## 0.9.9

View File

@ -7,6 +7,7 @@ import io.ktor.application.install
import io.ktor.http.cio.websocket.* import io.ktor.http.cio.websocket.*
import io.ktor.routing.Route import io.ktor.routing.Route
import io.ktor.routing.application import io.ktor.routing.application
import io.ktor.websocket.WebSocketServerSession
import io.ktor.websocket.webSocket import io.ktor.websocket.webSocket
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.SerializationStrategy import kotlinx.serialization.SerializationStrategy
@ -14,7 +15,7 @@ import kotlinx.serialization.SerializationStrategy
fun <T> Route.includeWebsocketHandling( fun <T> Route.includeWebsocketHandling(
suburl: String, suburl: String,
flow: Flow<T>, flow: Flow<T>,
converter: (T) -> StandardKtorSerialInputData converter: suspend WebSocketServerSession.(T) -> StandardKtorSerialInputData?
) { ) {
application.apply { application.apply {
featureOrNull(io.ktor.websocket.WebSockets) ?: install(io.ktor.websocket.WebSockets) featureOrNull(io.ktor.websocket.WebSockets) ?: install(io.ktor.websocket.WebSockets)
@ -22,7 +23,9 @@ fun <T> Route.includeWebsocketHandling(
webSocket(suburl) { webSocket(suburl) {
safely { safely {
flow.collect { flow.collect {
send(converter(it)) converter(it) ?.let { data ->
send(data)
}
} }
} }
} }
@ -32,10 +35,22 @@ fun <T> Route.includeWebsocketHandling(
suburl: String, suburl: String,
flow: Flow<T>, flow: Flow<T>,
serializer: SerializationStrategy<T>, serializer: SerializationStrategy<T>,
serialFormat: StandardKtorSerialFormat = standardKtorSerialFormat serialFormat: StandardKtorSerialFormat = standardKtorSerialFormat,
filter: (suspend WebSocketServerSession.(T) -> Boolean)? = null
) = includeWebsocketHandling( ) = includeWebsocketHandling(
suburl, suburl,
flow flow,
) { converter = if (filter == null) {
serialFormat.encodeDefault(serializer, it) {
} serialFormat.encodeDefault(serializer, it)
}
} else {
{
if (filter(it)) {
serialFormat.encodeDefault(serializer, it)
} else {
null
}
}
}
)

View File

@ -18,6 +18,7 @@ import io.ktor.util.asStream
import io.ktor.util.cio.writeChannel import io.ktor.util.cio.writeChannel
import io.ktor.util.pipeline.PipelineContext import io.ktor.util.pipeline.PipelineContext
import io.ktor.utils.io.core.* import io.ktor.utils.io.core.*
import io.ktor.websocket.WebSocketServerSession
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.* import kotlinx.serialization.*
import java.io.File import java.io.File
@ -30,8 +31,9 @@ class UnifiedRouter(
fun <T> Route.includeWebsocketHandling( fun <T> Route.includeWebsocketHandling(
suburl: String, suburl: String,
flow: Flow<T>, flow: Flow<T>,
serializer: SerializationStrategy<T> serializer: SerializationStrategy<T>,
) = includeWebsocketHandling(suburl, flow, serializer, serialFormat) filter: (suspend WebSocketServerSession.(T) -> Boolean)? = null
) = includeWebsocketHandling(suburl, flow, serializer, serialFormat, filter)
suspend fun <T> PipelineContext<*, ApplicationCall>.unianswer( suspend fun <T> PipelineContext<*, ApplicationCall>.unianswer(
answerSerializer: SerializationStrategy<T>, answerSerializer: SerializationStrategy<T>,