fixes in websockets

This commit is contained in:
InsanusMokrassar 2022-06-14 23:48:10 +06:00
parent 72c2df47fd
commit 798128256e
2 changed files with 69 additions and 16 deletions

View File

@ -2,6 +2,11 @@
## 0.11.3 ## 0.11.3
* `Ktor`:
* Support of `WebSockets` has been improved
* `Client`:
* New extensions: `HttpClient#openBaseWebSocketFlow`, `HttpClient#openWebSocketFlow`, `HttpClient#openSecureWebSocketFlow`
## 0.11.2 ## 0.11.2
* `Ktor`: * `Ktor`:

View File

@ -1,5 +1,6 @@
package dev.inmo.micro_utils.ktor.client package dev.inmo.micro_utils.ktor.client
import dev.inmo.micro_utils.common.Warning
import dev.inmo.micro_utils.coroutines.runCatchingSafely import dev.inmo.micro_utils.coroutines.runCatchingSafely
import dev.inmo.micro_utils.coroutines.safely import dev.inmo.micro_utils.coroutines.safely
import dev.inmo.micro_utils.ktor.common.* import dev.inmo.micro_utils.ktor.common.*
@ -7,35 +8,25 @@ import io.ktor.client.HttpClient
import io.ktor.client.plugins.pluginOrNull import io.ktor.client.plugins.pluginOrNull
import io.ktor.client.plugins.websocket.* import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.HttpRequestBuilder import io.ktor.client.request.HttpRequestBuilder
import io.ktor.websocket.Frame import io.ktor.http.URLProtocol
import io.ktor.websocket.readBytes import kotlinx.coroutines.channels.SendChannel
import io.ktor.websocket.serialization.sendSerializedBase
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.serialization.DeserializationStrategy
/** /**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish * @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting * connection. Must return true in case if must be reconnected. By default always reconnecting
*/ */
inline fun <reified T : Any> HttpClient.createStandardWebsocketFlow( @Warning("This feature is internal and should not be used directly. It is can be changed without any notification and warranty on compile-time or other guaranties")
url: String, inline fun <reified T : Any> openBaseWebSocketFlow(
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {} noinline webSocketSessionRequest: suspend SendChannel<T>.() -> Unit
): Flow<T> { ): Flow<T> {
pluginOrNull(WebSockets) ?: error("Plugin $WebSockets must be installed for using createStandardWebsocketFlow")
val correctedUrl = url.asCorrectWebSocketUrl
return channelFlow { return channelFlow {
do { do {
val reconnect = runCatchingSafely { val reconnect = runCatchingSafely {
ws(correctedUrl, requestBuilder) { webSocketSessionRequest()
while (isActive) {
send(receiveDeserialized<T>())
}
}
checkReconnection(null) checkReconnection(null)
}.getOrElse { e -> }.getOrElse { e ->
checkReconnection(e).also { checkReconnection(e).also {
@ -53,3 +44,60 @@ inline fun <reified T : Any> HttpClient.createStandardWebsocketFlow(
} }
} }
} }
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.openWebSocketFlow(
url: String,
useSecureConnection: Boolean,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> {
pluginOrNull(WebSockets) ?: error("Plugin $WebSockets must be installed for using createStandardWebsocketFlow")
return openBaseWebSocketFlow<T>(checkReconnection) {
val block: suspend DefaultClientWebSocketSession.() -> Unit = {
while (isActive) {
send(receiveDeserialized<T>())
}
}
if (useSecureConnection) {
wss(url, requestBuilder, block)
} else {
ws(url, requestBuilder, block)
}
}
}
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.openWebSocketFlow(
url: String,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> = openWebSocketFlow(url, false, checkReconnection, requestBuilder)
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.openSecureWebSocketFlow(
url: String,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> = openWebSocketFlow(url, true, checkReconnection, requestBuilder)
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.createStandardWebsocketFlow(
url: String,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> = openWebSocketFlow(url, checkReconnection, requestBuilder)