MicroUtils/ktor/client/src/commonMain/kotlin/dev/inmo/micro_utils/ktor/client/NewFlowsWebsockets.kt

104 lines
4.1 KiB
Kotlin
Raw Normal View History

2022-06-03 06:22:51 +00:00
package dev.inmo.micro_utils.ktor.client
2022-06-14 17:48:10 +00:00
import dev.inmo.micro_utils.common.Warning
2022-06-03 06:22:51 +00:00
import dev.inmo.micro_utils.coroutines.runCatchingSafely
import dev.inmo.micro_utils.coroutines.safely
import dev.inmo.micro_utils.ktor.common.*
import io.ktor.client.HttpClient
import io.ktor.client.plugins.pluginOrNull
import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.HttpRequestBuilder
2022-06-14 17:48:10 +00:00
import io.ktor.http.URLProtocol
import kotlinx.coroutines.channels.SendChannel
2022-06-03 06:22:51 +00:00
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.isActive
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
2022-06-14 17:48:10 +00:00
@Warning("This feature is internal and should not be used directly. It is can be changed without any notification and warranty on compile-time or other guaranties")
inline fun <T : Any> openBaseWebSocketFlow(
2022-06-03 11:52:58 +00:00
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
2022-06-14 17:48:10 +00:00
noinline webSocketSessionRequest: suspend SendChannel<T>.() -> Unit
2022-06-03 06:22:51 +00:00
): Flow<T> {
return channelFlow {
do {
val reconnect = runCatchingSafely {
2022-06-14 17:48:10 +00:00
webSocketSessionRequest()
2022-06-03 06:22:51 +00:00
checkReconnection(null)
}.getOrElse { e ->
checkReconnection(e).also {
if (!it) {
close(e)
}
}
}
} while (reconnect && isActive)
if (isActive) {
safely {
close()
}
}
}
}
2022-06-14 17:48:10 +00:00
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.openWebSocketFlow(
url: String,
useSecureConnection: Boolean,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> {
pluginOrNull(WebSockets) ?: error("Plugin $WebSockets must be installed for using createStandardWebsocketFlow")
return openBaseWebSocketFlow(checkReconnection) {
2022-06-14 17:48:10 +00:00
val block: suspend DefaultClientWebSocketSession.() -> Unit = {
while (isActive) {
send(receiveDeserialized<T>())
}
}
if (useSecureConnection) {
wss(url, requestBuilder, block)
} else {
ws(url, requestBuilder, block)
}
}
}
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.openWebSocketFlow(
url: String,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> = openWebSocketFlow(url, false, checkReconnection, requestBuilder)
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.openSecureWebSocketFlow(
url: String,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> = openWebSocketFlow(url, true, checkReconnection, requestBuilder)
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.createStandardWebsocketFlow(
url: String,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> = openWebSocketFlow(url, checkReconnection, requestBuilder)