Compare commits

..

6 Commits

Author SHA1 Message Date
798128256e fixes in websockets 2022-06-14 23:48:10 +06:00
72c2df47fd start 0.11.3 2022-06-14 19:35:17 +06:00
c9c6d4c0c1 Merge pull request #163 from InsanusMokrassar/0.11.2
0.11.2
2022-06-13 23:21:14 +06:00
2f4f9f3003 fixes 2022-06-13 23:20:25 +06:00
22e8f8e5d6 start 0.11.2 2022-06-13 22:19:33 +06:00
04cf8c3d9a Merge pull request #162 from InsanusMokrassar/0.11.1
0.11.1
2022-06-12 16:32:05 +06:00
5 changed files with 82 additions and 20 deletions

View File

@@ -1,5 +1,17 @@
# Changelog # Changelog
## 0.11.3
* `Ktor`:
* Support of `WebSockets` has been improved
* `Client`:
* New extensions: `HttpClient#openBaseWebSocketFlow`, `HttpClient#openWebSocketFlow`, `HttpClient#openSecureWebSocketFlow`
## 0.11.2
* `Ktor`:
* Support of `WebSockets` has been improved and added fixes inside of clients
## 0.11.1 ## 0.11.1
* `Repos` * `Repos`

View File

@@ -14,5 +14,5 @@ crypto_js_version=4.1.1
# Project data # Project data
group=dev.inmo group=dev.inmo
version=0.11.1 version=0.11.3
android_code_version=125 android_code_version=127

View File

@@ -1,5 +1,6 @@
package dev.inmo.micro_utils.ktor.client package dev.inmo.micro_utils.ktor.client
import dev.inmo.micro_utils.common.Warning
import dev.inmo.micro_utils.coroutines.runCatchingSafely import dev.inmo.micro_utils.coroutines.runCatchingSafely
import dev.inmo.micro_utils.coroutines.safely import dev.inmo.micro_utils.coroutines.safely
import dev.inmo.micro_utils.ktor.common.* import dev.inmo.micro_utils.ktor.common.*
@@ -7,35 +8,25 @@ import io.ktor.client.HttpClient
import io.ktor.client.plugins.pluginOrNull import io.ktor.client.plugins.pluginOrNull
import io.ktor.client.plugins.websocket.* import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.HttpRequestBuilder import io.ktor.client.request.HttpRequestBuilder
import io.ktor.websocket.Frame import io.ktor.http.URLProtocol
import io.ktor.websocket.readBytes import kotlinx.coroutines.channels.SendChannel
import io.ktor.websocket.serialization.sendSerializedBase
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.serialization.DeserializationStrategy
/** /**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish * @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting * connection. Must return true in case if must be reconnected. By default always reconnecting
*/ */
inline fun <reified T> HttpClient.createStandardWebsocketFlow( @Warning("This feature is internal and should not be used directly. It is can be changed without any notification and warranty on compile-time or other guaranties")
url: String, inline fun <reified T : Any> openBaseWebSocketFlow(
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true }, noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {} noinline webSocketSessionRequest: suspend SendChannel<T>.() -> Unit
): Flow<T> { ): Flow<T> {
pluginOrNull(WebSockets) ?: error("Plugin $WebSockets must be installed for using createStandardWebsocketFlow")
val correctedUrl = url.asCorrectWebSocketUrl
return channelFlow { return channelFlow {
do { do {
val reconnect = runCatchingSafely { val reconnect = runCatchingSafely {
ws(correctedUrl, requestBuilder) { webSocketSessionRequest()
for (received in incoming) {
sendSerialized(received.data)
}
}
checkReconnection(null) checkReconnection(null)
}.getOrElse { e -> }.getOrElse { e ->
checkReconnection(e).also { checkReconnection(e).also {
@@ -53,3 +44,60 @@ inline fun <reified T> HttpClient.createStandardWebsocketFlow(
} }
} }
} }
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.openWebSocketFlow(
url: String,
useSecureConnection: Boolean,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> {
pluginOrNull(WebSockets) ?: error("Plugin $WebSockets must be installed for using createStandardWebsocketFlow")
return openBaseWebSocketFlow<T>(checkReconnection) {
val block: suspend DefaultClientWebSocketSession.() -> Unit = {
while (isActive) {
send(receiveDeserialized<T>())
}
}
if (useSecureConnection) {
wss(url, requestBuilder, block)
} else {
ws(url, requestBuilder, block)
}
}
}
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.openWebSocketFlow(
url: String,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> = openWebSocketFlow(url, false, checkReconnection, requestBuilder)
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.openSecureWebSocketFlow(
url: String,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> = openWebSocketFlow(url, true, checkReconnection, requestBuilder)
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T : Any> HttpClient.createStandardWebsocketFlow(
url: String,
noinline checkReconnection: suspend (Throwable?) -> Boolean = { true },
noinline requestBuilder: HttpRequestBuilder.() -> Unit = {}
): Flow<T> = openWebSocketFlow(url, checkReconnection, requestBuilder)

View File

@@ -12,6 +12,7 @@ import io.ktor.websocket.send
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.SerializationStrategy import kotlinx.serialization.SerializationStrategy
@Deprecated("This method will be removed soon")
fun <T> Route.includeWebsocketHandling( fun <T> Route.includeWebsocketHandling(
suburl: String, suburl: String,
flow: Flow<T>, flow: Flow<T>,

View File

@@ -15,7 +15,8 @@ import kotlinx.serialization.SerializationStrategy
inline fun <reified T : Any> Route.includeWebsocketHandling( inline fun <reified T : Any> Route.includeWebsocketHandling(
suburl: String, suburl: String,
flow: Flow<T>, flow: Flow<T>,
protocol: URLProtocol? = null protocol: URLProtocol? = null,
noinline dataMapper: suspend WebSocketServerSession.(T) -> T? = { it }
) { ) {
application.apply { application.apply {
pluginOrNull(WebSockets) ?: install(WebSockets) pluginOrNull(WebSockets) ?: install(WebSockets)
@@ -23,7 +24,7 @@ inline fun <reified T : Any> Route.includeWebsocketHandling(
webSocket(suburl, protocol ?.name) { webSocket(suburl, protocol ?.name) {
safely { safely {
flow.collect { flow.collect {
sendSerialized(it) sendSerialized(dataMapper(it) ?: return@collect)
} }
} }
} }