extend functionality of websockets

This commit is contained in:
InsanusMokrassar 2022-03-04 20:16:59 +06:00
parent 14ebe01fc6
commit 88854020ac
5 changed files with 24 additions and 14 deletions

View File

@ -7,7 +7,8 @@
* New function `UnifiedRequester#createStandardWebsocketFlow` without `checkReconnection` arg * New function `UnifiedRequester#createStandardWebsocketFlow` without `checkReconnection` arg
* Server: * Server:
* Now it is possible to filter data in `Route#includeWebsocketHandling` * Now it is possible to filter data in `Route#includeWebsocketHandling`
* Callback in `Route#includeWebsocketHandling` is `suspend` since now * Callback in `Route#includeWebsocketHandling` and dependent methods is `suspend` since now
* Add `URLProtocol` support in `Route#includeWebsocketHandling` and dependent methods
## 0.9.9 ## 0.9.9

View File

@ -4,6 +4,7 @@ import dev.inmo.micro_utils.coroutines.safely
import dev.inmo.micro_utils.ktor.common.* import dev.inmo.micro_utils.ktor.common.*
import io.ktor.client.HttpClient import io.ktor.client.HttpClient
import io.ktor.client.features.websocket.ws import io.ktor.client.features.websocket.ws
import io.ktor.client.request.HttpRequestBuilder
import io.ktor.http.cio.websocket.Frame import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.readBytes import io.ktor.http.cio.websocket.readBytes
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
@ -17,6 +18,7 @@ import kotlinx.serialization.DeserializationStrategy
inline fun <T> HttpClient.createStandardWebsocketFlow( inline fun <T> HttpClient.createStandardWebsocketFlow(
url: String, url: String,
crossinline checkReconnection: (Throwable?) -> Boolean = { true }, crossinline checkReconnection: (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {},
crossinline conversation: suspend (StandardKtorSerialInputData) -> T crossinline conversation: suspend (StandardKtorSerialInputData) -> T
): Flow<T> { ): Flow<T> {
val correctedUrl = url.asCorrectWebSocketUrl val correctedUrl = url.asCorrectWebSocketUrl
@ -26,7 +28,7 @@ inline fun <T> HttpClient.createStandardWebsocketFlow(
do { do {
val reconnect = try { val reconnect = try {
safely { safely {
ws(correctedUrl) { ws(correctedUrl, requestBuilder) {
for (received in incoming) { for (received in incoming) {
when (received) { when (received) {
is Frame.Binary -> producerScope.send(conversation(received.readBytes())) is Frame.Binary -> producerScope.send(conversation(received.readBytes()))
@ -65,10 +67,12 @@ inline fun <T> HttpClient.createStandardWebsocketFlow(
url: String, url: String,
crossinline checkReconnection: (Throwable?) -> Boolean = { true }, crossinline checkReconnection: (Throwable?) -> Boolean = { true },
deserializer: DeserializationStrategy<T>, deserializer: DeserializationStrategy<T>,
serialFormat: StandardKtorSerialFormat = standardKtorSerialFormat serialFormat: StandardKtorSerialFormat = standardKtorSerialFormat,
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {},
) = createStandardWebsocketFlow( ) = createStandardWebsocketFlow(
url, url,
checkReconnection checkReconnection,
requestBuilder
) { ) {
serialFormat.decodeDefault(deserializer, it) serialFormat.decodeDefault(deserializer, it)
} }

View File

@ -86,13 +86,15 @@ class UnifiedRequester(
fun <T> createStandardWebsocketFlow( fun <T> createStandardWebsocketFlow(
url: String, url: String,
checkReconnection: (Throwable?) -> Boolean, checkReconnection: (Throwable?) -> Boolean,
deserializer: DeserializationStrategy<T> deserializer: DeserializationStrategy<T>,
) = client.createStandardWebsocketFlow(url, checkReconnection, deserializer, serialFormat) requestBuilder: HttpRequestBuilder.() -> Unit = {},
) = client.createStandardWebsocketFlow(url, checkReconnection, deserializer, serialFormat, requestBuilder)
fun <T> createStandardWebsocketFlow( fun <T> createStandardWebsocketFlow(
url: String, url: String,
deserializer: DeserializationStrategy<T> deserializer: DeserializationStrategy<T>,
) = createStandardWebsocketFlow(url, { true }, deserializer) requestBuilder: HttpRequestBuilder.() -> Unit = {},
) = createStandardWebsocketFlow(url, { true }, deserializer, requestBuilder)
} }
val defaultRequester = UnifiedRequester() val defaultRequester = UnifiedRequester()

View File

@ -4,23 +4,24 @@ import dev.inmo.micro_utils.coroutines.safely
import dev.inmo.micro_utils.ktor.common.* import dev.inmo.micro_utils.ktor.common.*
import io.ktor.application.featureOrNull import io.ktor.application.featureOrNull
import io.ktor.application.install import io.ktor.application.install
import io.ktor.http.URLProtocol
import io.ktor.http.cio.websocket.* import io.ktor.http.cio.websocket.*
import io.ktor.routing.Route import io.ktor.routing.Route
import io.ktor.routing.application import io.ktor.routing.application
import io.ktor.websocket.WebSocketServerSession import io.ktor.websocket.*
import io.ktor.websocket.webSocket
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.SerializationStrategy import kotlinx.serialization.SerializationStrategy
fun <T> Route.includeWebsocketHandling( fun <T> Route.includeWebsocketHandling(
suburl: String, suburl: String,
flow: Flow<T>, flow: Flow<T>,
protocol: URLProtocol = URLProtocol.WS,
converter: suspend WebSocketServerSession.(T) -> StandardKtorSerialInputData? converter: suspend WebSocketServerSession.(T) -> StandardKtorSerialInputData?
) { ) {
application.apply { application.apply {
featureOrNull(io.ktor.websocket.WebSockets) ?: install(io.ktor.websocket.WebSockets) featureOrNull(io.ktor.websocket.WebSockets) ?: install(io.ktor.websocket.WebSockets)
} }
webSocket(suburl) { webSocket(suburl, protocol.name) {
safely { safely {
flow.collect { flow.collect {
converter(it) ?.let { data -> converter(it) ?.let { data ->
@ -36,10 +37,12 @@ fun <T> Route.includeWebsocketHandling(
flow: Flow<T>, flow: Flow<T>,
serializer: SerializationStrategy<T>, serializer: SerializationStrategy<T>,
serialFormat: StandardKtorSerialFormat = standardKtorSerialFormat, serialFormat: StandardKtorSerialFormat = standardKtorSerialFormat,
protocol: URLProtocol = URLProtocol.WS,
filter: (suspend WebSocketServerSession.(T) -> Boolean)? = null filter: (suspend WebSocketServerSession.(T) -> Boolean)? = null
) = includeWebsocketHandling( ) = includeWebsocketHandling(
suburl, suburl,
flow, flow,
protocol,
converter = if (filter == null) { converter = if (filter == null) {
{ {
serialFormat.encodeDefault(serializer, it) serialFormat.encodeDefault(serializer, it)

View File

@ -5,8 +5,7 @@ import dev.inmo.micro_utils.coroutines.safely
import dev.inmo.micro_utils.ktor.common.* import dev.inmo.micro_utils.ktor.common.*
import io.ktor.application.ApplicationCall import io.ktor.application.ApplicationCall
import io.ktor.application.call import io.ktor.application.call
import io.ktor.http.ContentType import io.ktor.http.*
import io.ktor.http.HttpStatusCode
import io.ktor.http.content.PartData import io.ktor.http.content.PartData
import io.ktor.http.content.forEachPart import io.ktor.http.content.forEachPart
import io.ktor.request.receive import io.ktor.request.receive
@ -32,8 +31,9 @@ class UnifiedRouter(
suburl: String, suburl: String,
flow: Flow<T>, flow: Flow<T>,
serializer: SerializationStrategy<T>, serializer: SerializationStrategy<T>,
protocol: URLProtocol = URLProtocol.WS,
filter: (suspend WebSocketServerSession.(T) -> Boolean)? = null filter: (suspend WebSocketServerSession.(T) -> Boolean)? = null
) = includeWebsocketHandling(suburl, flow, serializer, serialFormat, filter) ) = includeWebsocketHandling(suburl, flow, serializer, serialFormat, protocol, filter)
suspend fun <T> PipelineContext<*, ApplicationCall>.unianswer( suspend fun <T> PipelineContext<*, ApplicationCall>.unianswer(
answerSerializer: SerializationStrategy<T>, answerSerializer: SerializationStrategy<T>,