mirror of
https://github.com/InsanusMokrassar/MicroUtils.git
synced 2024-11-29 21:48:45 +00:00
Compare commits
No commits in common. "9b9e7dd88f9ae9bf17884992dd714f5f420ec8f9" and "fba84c8ac87b518dc0b8f473e6fd1c533ad00a07" have entirely different histories.
9b9e7dd88f
...
fba84c8ac8
@ -1,18 +1,14 @@
|
||||
package dev.inmo.micro_utils.ktor.client
|
||||
|
||||
import dev.inmo.micro_utils.coroutines.runCatchingSafely
|
||||
import dev.inmo.micro_utils.coroutines.safely
|
||||
import dev.inmo.micro_utils.ktor.common.*
|
||||
import io.ktor.client.HttpClient
|
||||
import io.ktor.client.plugins.pluginOrNull
|
||||
import io.ktor.client.plugins.websocket.WebSockets
|
||||
import io.ktor.client.plugins.websocket.ws
|
||||
import io.ktor.client.request.HttpRequestBuilder
|
||||
import io.ktor.websocket.Frame
|
||||
import io.ktor.websocket.readBytes
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.channelFlow
|
||||
import kotlinx.coroutines.isActive
|
||||
import kotlinx.serialization.DeserializationStrategy
|
||||
|
||||
/**
|
||||
@ -21,41 +17,43 @@ import kotlinx.serialization.DeserializationStrategy
|
||||
*/
|
||||
inline fun <T> HttpClient.createStandardWebsocketFlow(
|
||||
url: String,
|
||||
crossinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
|
||||
crossinline checkReconnection: (Throwable?) -> Boolean = { true },
|
||||
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {},
|
||||
crossinline conversation: suspend (StandardKtorSerialInputData) -> T
|
||||
): Flow<T> {
|
||||
pluginOrNull(WebSockets) ?: error("Plugin $WebSockets must be installed for using createStandardWebsocketFlow")
|
||||
|
||||
val correctedUrl = url.asCorrectWebSocketUrl
|
||||
|
||||
return channelFlow {
|
||||
val producerScope = this@channelFlow
|
||||
do {
|
||||
val reconnect = runCatchingSafely {
|
||||
val reconnect = try {
|
||||
safely {
|
||||
ws(correctedUrl, requestBuilder) {
|
||||
for (received in incoming) {
|
||||
when (received) {
|
||||
is Frame.Binary -> send(conversation(received.data))
|
||||
is Frame.Binary -> producerScope.send(conversation(received.readBytes()))
|
||||
else -> {
|
||||
close()
|
||||
producerScope.close()
|
||||
return@ws
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
checkReconnection(null)
|
||||
}.getOrElse { e ->
|
||||
} catch (e: Throwable) {
|
||||
checkReconnection(e).also {
|
||||
if (!it) {
|
||||
close(e)
|
||||
producerScope.close(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (reconnect && isActive)
|
||||
|
||||
if (isActive) {
|
||||
safely {
|
||||
close()
|
||||
} while (reconnect)
|
||||
if (!producerScope.isClosedForSend) {
|
||||
safely(
|
||||
{ it.printStackTrace() }
|
||||
) {
|
||||
producerScope.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -67,8 +65,8 @@ inline fun <T> HttpClient.createStandardWebsocketFlow(
|
||||
*/
|
||||
inline fun <T> HttpClient.createStandardWebsocketFlow(
|
||||
url: String,
|
||||
crossinline checkReconnection: (Throwable?) -> Boolean = { true },
|
||||
deserializer: DeserializationStrategy<T>,
|
||||
crossinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
|
||||
serialFormat: StandardKtorSerialFormat = standardKtorSerialFormat,
|
||||
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {},
|
||||
) = createStandardWebsocketFlow(
|
||||
|
@ -87,10 +87,10 @@ class UnifiedRequester(
|
||||
|
||||
fun <T> createStandardWebsocketFlow(
|
||||
url: String,
|
||||
checkReconnection: suspend (Throwable?) -> Boolean,
|
||||
checkReconnection: (Throwable?) -> Boolean,
|
||||
deserializer: DeserializationStrategy<T>,
|
||||
requestBuilder: HttpRequestBuilder.() -> Unit = {},
|
||||
) = client.createStandardWebsocketFlow(url, deserializer, checkReconnection, serialFormat, requestBuilder)
|
||||
) = client.createStandardWebsocketFlow(url, checkReconnection, deserializer, serialFormat, requestBuilder)
|
||||
|
||||
fun <T> createStandardWebsocketFlow(
|
||||
url: String,
|
||||
|
@ -15,13 +15,13 @@ import kotlinx.serialization.SerializationStrategy
|
||||
fun <T> Route.includeWebsocketHandling(
|
||||
suburl: String,
|
||||
flow: Flow<T>,
|
||||
protocol: URLProtocol? = null,
|
||||
protocol: URLProtocol = URLProtocol.WS,
|
||||
converter: suspend WebSocketServerSession.(T) -> StandardKtorSerialInputData?
|
||||
) {
|
||||
application.apply {
|
||||
pluginOrNull(WebSockets) ?: install(WebSockets)
|
||||
}
|
||||
webSocket(suburl, protocol ?.name) {
|
||||
webSocket(suburl, protocol.name) {
|
||||
safely {
|
||||
flow.collect {
|
||||
converter(it) ?.let { data ->
|
||||
@ -37,7 +37,7 @@ fun <T> Route.includeWebsocketHandling(
|
||||
flow: Flow<T>,
|
||||
serializer: SerializationStrategy<T>,
|
||||
serialFormat: StandardKtorSerialFormat = standardKtorSerialFormat,
|
||||
protocol: URLProtocol? = null,
|
||||
protocol: URLProtocol = URLProtocol.WS,
|
||||
filter: (suspend WebSocketServerSession.(T) -> Boolean)? = null
|
||||
) = includeWebsocketHandling(
|
||||
suburl,
|
||||
|
@ -27,7 +27,7 @@ class UnifiedRouter(
|
||||
suburl: String,
|
||||
flow: Flow<T>,
|
||||
serializer: SerializationStrategy<T>,
|
||||
protocol: URLProtocol? = null,
|
||||
protocol: URLProtocol = URLProtocol.WS,
|
||||
filter: (suspend WebSocketServerSession.(T) -> Boolean)? = null
|
||||
) = includeWebsocketHandling(suburl, flow, serializer, serialFormat, protocol, filter)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user