add opportunity for reconnection for websockets

This commit is contained in:
InsanusMokrassar 2020-08-11 13:12:18 +06:00
parent 13b6dad2fb
commit 8b74e8534c
2 changed files with 41 additions and 21 deletions

View File

@ -4,41 +4,60 @@ import com.insanusmokrassar.postssystem.ktor.*
import com.insanusmokrassar.postssystem.utils.common.safely
import io.ktor.client.HttpClient
import io.ktor.client.features.websocket.ws
import io.ktor.client.request.url
import io.ktor.http.HttpMethod
import io.ktor.http.cio.websocket.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.isActive
/**
* @param checkReconnection This lambda will be called when it is required to reconnect to websocket to establish
* connection. Must return true in case if must be reconnected. By default always reconnecting
*/
inline fun <reified T> createStandardWebsocketFlow(
client: HttpClient,
url: String,
crossinline checkReconnection: (Throwable?) -> Boolean = { true },
crossinline conversation: suspend (ByteArray) -> T
): Flow<T> {
val correctedUrl = url.asCorrectWebSocketUrl
return channelFlow {
val producerScope = this
safely(
{
producerScope.close()
throw it
}
) {
client.ws(
correctedUrl
) {
while (true) {
when (val received = incoming.receive()) {
is Frame.Binary -> producerScope.send(
conversation(received.readBytes())
)
else -> {
producerScope.close()
return@ws
do {
val reconnect = try {
safely(
{
throw it
}
) {
client.ws(
correctedUrl
) {
while (true) {
when (val received = incoming.receive()) {
is Frame.Binary -> producerScope.send(
conversation(received.readBytes())
)
else -> {
producerScope.close()
return@ws
}
}
}
}
}
checkReconnection(null)
} catch (e: Throwable) {
checkReconnection(e).also {
if (!it) {
producerScope.close(e)
}
}
}
} while (reconnect)
if (!producerScope.isClosedForSend) {
safely(
{ /* do nothing */ }
) {
producerScope.close()
}
}
}

View File

@ -47,7 +47,8 @@ class WebsocketsTest {
}
val incomingWebsocketFlow = createStandardWebsocketFlow(
client,
"$serverUrl/$suburl"
"$serverUrl/$suburl",
{ false } // always skip reconnection
) {
standardKtorSerializer.load(Int.serializer(), it)
}