accumulated updates improvements

This commit is contained in:
InsanusMokrassar 2022-04-04 13:16:18 +06:00
parent 87ee2f280b
commit 4207d89c92
2 changed files with 59 additions and 48 deletions

View File

@ -7,6 +7,8 @@
* `MediaGroupContent` and all subsequent inheritors have been replaced to the package `dev.inmo.tgbotapi.types.message.content.media` * `MediaGroupContent` and all subsequent inheritors have been replaced to the package `dev.inmo.tgbotapi.types.message.content.media`
* `MediaGroupContent` Now extends `TextedMediaContent` instead of `MediaContent` * `MediaGroupContent` Now extends `TextedMediaContent` instead of `MediaContent`
* Add `reply` functions with the texted content with including of text * Add `reply` functions with the texted content with including of text
* `Utils`:
* Improve work with retrieving of accumulated updates
## 0.38.11 ## 0.38.11

View File

@ -15,6 +15,7 @@ import dev.inmo.tgbotapi.types.update.abstracts.Update
import dev.inmo.tgbotapi.updateshandlers.* import dev.inmo.tgbotapi.updateshandlers.*
import dev.inmo.tgbotapi.utils.* import dev.inmo.tgbotapi.utils.*
import io.ktor.client.features.HttpRequestTimeoutException import io.ktor.client.features.HttpRequestTimeoutException
import io.ktor.utils.io.CancellationException
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
@ -89,6 +90,29 @@ fun TelegramBot.startGettingOfUpdatesByLongPolling(
updatesReceiver updatesReceiver
) )
/**
* @return [kotlinx.coroutines.flow.Flow] which will emit updates to the collector while they will be accumulated. Works
* the same as [longPollingFlow], but it will cancel the flow after the first one [HttpRequestTimeoutException]
*/
fun TelegramBot.createAccumulatedUpdatesRetrieverFlow(
avoidInlineQueries: Boolean = false,
avoidCallbackQueries: Boolean = false,
exceptionsHandler: ExceptionHandler<Unit>? = null,
allowedUpdates: List<String>? = null
): Flow<Update> = longPollingFlow(
timeoutSeconds = 0,
exceptionsHandler = {
if (it is HttpRequestTimeoutException) {
throw CancellationException("Cancel due to absence of new updates")
} else {
exceptionsHandler ?.invoke(it)
}
},
allowedUpdates = allowedUpdates
).filter {
!(it is InlineQueryUpdate && avoidInlineQueries || it is CallbackQueryUpdate && avoidCallbackQueries)
}
fun TelegramBot.retrieveAccumulatedUpdates( fun TelegramBot.retrieveAccumulatedUpdates(
avoidInlineQueries: Boolean = false, avoidInlineQueries: Boolean = false,
avoidCallbackQueries: Boolean = false, avoidCallbackQueries: Boolean = false,
@ -96,51 +120,15 @@ fun TelegramBot.retrieveAccumulatedUpdates(
exceptionsHandler: (ExceptionHandler<Unit>)? = null, exceptionsHandler: (ExceptionHandler<Unit>)? = null,
allowedUpdates: List<String>? = null, allowedUpdates: List<String>? = null,
updatesReceiver: UpdateReceiver<Update> updatesReceiver: UpdateReceiver<Update>
): Job = scope.launch { ): Job = createAccumulatedUpdatesRetrieverFlow(
safelyWithoutExceptions { avoidInlineQueries,
startGettingOfUpdatesByLongPolling( avoidCallbackQueries,
0, exceptionsHandler,
CoroutineScope(coroutineContext + SupervisorJob()), allowedUpdates
{ ).subscribeSafelyWithoutExceptions(
if (it is HttpRequestTimeoutException) { scope.LinkedSupervisorScope()
throw CancellationException("Cancel due to absence of new updates") ) {
} else { updatesReceiver(it)
exceptionsHandler ?.invoke(it)
}
},
allowedUpdates
) {
when {
it is InlineQueryUpdate && avoidInlineQueries ||
it is CallbackQueryUpdate && avoidCallbackQueries -> return@startGettingOfUpdatesByLongPolling
else -> updatesReceiver(it)
}
}.join()
}
}
/**
* @return [kotlinx.coroutines.flow.Flow] which will emit updates to the collector while they will be accumulated. Works
* the same as [retrieveAccumulatedUpdates], but pass [kotlinx.coroutines.flow.FlowCollector.emit] as a callback
*/
fun TelegramBot.createAccumulatedUpdatesRetrieverFlow(
avoidInlineQueries: Boolean = false,
avoidCallbackQueries: Boolean = false,
exceptionsHandler: ExceptionHandler<Unit>? = null,
allowedUpdates: List<String>? = null
): Flow<Update> = channelFlow {
val parentContext = kotlin.coroutines.coroutineContext
channel.apply {
retrieveAccumulatedUpdates(
avoidInlineQueries,
avoidCallbackQueries,
CoroutineScope(parentContext),
exceptionsHandler,
allowedUpdates,
::send
).join()
close()
}
} }
fun TelegramBot.retrieveAccumulatedUpdates( fun TelegramBot.retrieveAccumulatedUpdates(
@ -149,9 +137,30 @@ fun TelegramBot.retrieveAccumulatedUpdates(
avoidCallbackQueries: Boolean = false, avoidCallbackQueries: Boolean = false,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default), scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
exceptionsHandler: ExceptionHandler<Unit>? = null exceptionsHandler: ExceptionHandler<Unit>? = null
) = flowsUpdatesFilter.run { ) = retrieveAccumulatedUpdates(
retrieveAccumulatedUpdates(avoidInlineQueries, avoidCallbackQueries, scope, exceptionsHandler, allowedUpdates, asUpdateReceiver) avoidInlineQueries,
} avoidCallbackQueries,
scope,
exceptionsHandler,
flowsUpdatesFilter.allowedUpdates,
flowsUpdatesFilter.asUpdateReceiver
)
suspend fun TelegramBot.flushAccumulatedUpdates(
avoidInlineQueries: Boolean = false,
avoidCallbackQueries: Boolean = false,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
allowedUpdates: List<String>? = null,
exceptionsHandler: ExceptionHandler<Unit>? = null,
updatesReceiver: UpdateReceiver<Update> = {}
) = retrieveAccumulatedUpdates(
avoidInlineQueries,
avoidCallbackQueries,
scope,
exceptionsHandler,
allowedUpdates,
updatesReceiver
).join()
/** /**
* Will [startGettingOfUpdatesByLongPolling] using incoming [flowsUpdatesFilter]. It is assumed that you ALREADY CONFIGURE * Will [startGettingOfUpdatesByLongPolling] using incoming [flowsUpdatesFilter]. It is assumed that you ALREADY CONFIGURE