Compare commits

...

3 Commits

Author SHA1 Message Date
1c52e04cdb
Merge pull request #164 from InsanusMokrassar/0.11.3
0.11.3
2022-06-14 23:59:49 +06:00
798128256e fixes in websockets 2022-06-14 23:48:10 +06:00
72c2df47fd start 0.11.3 2022-06-14 19:35:17 +06:00
3 changed files with 73 additions and 18 deletions

View File

@ -1,5 +1,12 @@
# Changelog
## 0.11.3
* `Ktor`:
* Support of `WebSockets` has been improved
* `Client`:
* New extensions: `HttpClient#openBaseWebSocketFlow`, `HttpClient#openWebSocketFlow`, `HttpClient#openSecureWebSocketFlow`
## 0.11.2
* `Ktor`:

View File

@ -14,5 +14,5 @@ crypto_js_version=4.1.1
# Project data
group=dev.inmo
version=0.11.2
android_code_version=126
version=0.11.3
android_code_version=127

View File

@ -1,5 +1,6 @@
package dev.inmo.micro_utils.ktor.client
import dev.inmo.micro_utils.common.Warning
import dev.inmo.micro_utils.coroutines.runCatchingSafely
import dev.inmo.micro_utils.coroutines.safely
import dev.inmo.micro_utils.ktor.common.*
@ -7,35 +8,25 @@ import io.ktor.client.HttpClient
import io.ktor.client.plugins.pluginOrNull
import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.HttpRequestBuilder
import io.ktor.websocket.Frame
import io.ktor.websocket.readBytes
import io.ktor.websocket.serialization.sendSerializedBase
import io.ktor.http.URLProtocol
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.isActive
import kotlinx.serialization.DeserializationStrategy
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.createStandardWebsocketFlow(
url: String,
@Warning("This feature is internal and should not be used directly. It is can be changed without any notification and warranty on compile-time or other guaranties")
inline fun <reified T : Any> openBaseWebSocketFlow(
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
noinline webSocketSessionRequest: suspend SendChannel<T>.() -> Unit
): Flow<T> {
pluginOrNull(WebSockets) ?: error("Plugin $WebSockets must be installed for using createStandardWebsocketFlow")
val correctedUrl = url.asCorrectWebSocketUrl
return channelFlow {
do {
val reconnect = runCatchingSafely {
ws(correctedUrl, requestBuilder) {
while (isActive) {
send(receiveDeserialized<T>())
}
}
webSocketSessionRequest()
checkReconnection(null)
}.getOrElse { e ->
checkReconnection(e).also {
@ -53,3 +44,60 @@ inline fun <reified T : Any> HttpClient.createStandardWebsocketFlow(
}
}
}
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.openWebSocketFlow(
url: String,
useSecureConnection: Boolean,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> {
pluginOrNull(WebSockets) ?: error("Plugin $WebSockets must be installed for using createStandardWebsocketFlow")
return openBaseWebSocketFlow<T>(checkReconnection) {
val block: suspend DefaultClientWebSocketSession.() -> Unit = {
while (isActive) {
send(receiveDeserialized<T>())
}
}
if (useSecureConnection) {
wss(url, requestBuilder, block)
} else {
ws(url, requestBuilder, block)
}
}
}
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.openWebSocketFlow(
url: String,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> = openWebSocketFlow(url, false, checkReconnection, requestBuilder)
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.openSecureWebSocketFlow(
url: String,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> = openWebSocketFlow(url, true, checkReconnection, requestBuilder)
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.createStandardWebsocketFlow(
url: String,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> = openWebSocketFlow(url, checkReconnection, requestBuilder)