From 4207d89c92572c07d124acf1a9b45ed5a268fc77 Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Mon, 4 Apr 2022 13:16:18 +0600 Subject: [PATCH] accumulated updates improvements --- CHANGELOG.md | 2 + .../utils/updates/retrieving/LongPolling.kt | 105 ++++++++++-------- 2 files changed, 59 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f212b39e1c..7659f30c43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ * `MediaGroupContent` and all subsequent inheritors have been replaced to the package `dev.inmo.tgbotapi.types.message.content.media` * `MediaGroupContent` Now extends `TextedMediaContent` instead of `MediaContent` * Add `reply` functions with the texted content with including of text +* `Utils`: + * Improve work with retrieving of accumulated updates ## 0.38.11 diff --git a/tgbotapi.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt b/tgbotapi.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt index e972a61121..b58806440a 100644 --- a/tgbotapi.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt +++ b/tgbotapi.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt @@ -15,6 +15,7 @@ import dev.inmo.tgbotapi.types.update.abstracts.Update import dev.inmo.tgbotapi.updateshandlers.* import dev.inmo.tgbotapi.utils.* import io.ktor.client.features.HttpRequestTimeoutException +import io.ktor.utils.io.CancellationException import kotlinx.coroutines.* import kotlinx.coroutines.flow.* @@ -89,6 +90,29 @@ fun TelegramBot.startGettingOfUpdatesByLongPolling( updatesReceiver ) +/** + * @return [kotlinx.coroutines.flow.Flow] which will emit updates to the collector while they will be accumulated. Works + * the same as [longPollingFlow], but it will cancel the flow after the first one [HttpRequestTimeoutException] + */ +fun TelegramBot.createAccumulatedUpdatesRetrieverFlow( + avoidInlineQueries: Boolean = false, + avoidCallbackQueries: Boolean = false, + exceptionsHandler: ExceptionHandler? = null, + allowedUpdates: List? = null +): Flow = longPollingFlow( + timeoutSeconds = 0, + exceptionsHandler = { + if (it is HttpRequestTimeoutException) { + throw CancellationException("Cancel due to absence of new updates") + } else { + exceptionsHandler ?.invoke(it) + } + }, + allowedUpdates = allowedUpdates +).filter { + !(it is InlineQueryUpdate && avoidInlineQueries || it is CallbackQueryUpdate && avoidCallbackQueries) +} + fun TelegramBot.retrieveAccumulatedUpdates( avoidInlineQueries: Boolean = false, avoidCallbackQueries: Boolean = false, @@ -96,51 +120,15 @@ fun TelegramBot.retrieveAccumulatedUpdates( exceptionsHandler: (ExceptionHandler)? = null, allowedUpdates: List? = null, updatesReceiver: UpdateReceiver -): Job = scope.launch { - safelyWithoutExceptions { - startGettingOfUpdatesByLongPolling( - 0, - CoroutineScope(coroutineContext + SupervisorJob()), - { - if (it is HttpRequestTimeoutException) { - throw CancellationException("Cancel due to absence of new updates") - } else { - exceptionsHandler ?.invoke(it) - } - }, - allowedUpdates - ) { - when { - it is InlineQueryUpdate && avoidInlineQueries || - it is CallbackQueryUpdate && avoidCallbackQueries -> return@startGettingOfUpdatesByLongPolling - else -> updatesReceiver(it) - } - }.join() - } -} - -/** - * @return [kotlinx.coroutines.flow.Flow] which will emit updates to the collector while they will be accumulated. Works - * the same as [retrieveAccumulatedUpdates], but pass [kotlinx.coroutines.flow.FlowCollector.emit] as a callback - */ -fun TelegramBot.createAccumulatedUpdatesRetrieverFlow( - avoidInlineQueries: Boolean = false, - avoidCallbackQueries: Boolean = false, - exceptionsHandler: ExceptionHandler? = null, - allowedUpdates: List? = null -): Flow = channelFlow { - val parentContext = kotlin.coroutines.coroutineContext - channel.apply { - retrieveAccumulatedUpdates( - avoidInlineQueries, - avoidCallbackQueries, - CoroutineScope(parentContext), - exceptionsHandler, - allowedUpdates, - ::send - ).join() - close() - } +): Job = createAccumulatedUpdatesRetrieverFlow( + avoidInlineQueries, + avoidCallbackQueries, + exceptionsHandler, + allowedUpdates +).subscribeSafelyWithoutExceptions( + scope.LinkedSupervisorScope() +) { + updatesReceiver(it) } fun TelegramBot.retrieveAccumulatedUpdates( @@ -149,9 +137,30 @@ fun TelegramBot.retrieveAccumulatedUpdates( avoidCallbackQueries: Boolean = false, scope: CoroutineScope = CoroutineScope(Dispatchers.Default), exceptionsHandler: ExceptionHandler? = null -) = flowsUpdatesFilter.run { - retrieveAccumulatedUpdates(avoidInlineQueries, avoidCallbackQueries, scope, exceptionsHandler, allowedUpdates, asUpdateReceiver) -} +) = retrieveAccumulatedUpdates( + avoidInlineQueries, + avoidCallbackQueries, + scope, + exceptionsHandler, + flowsUpdatesFilter.allowedUpdates, + flowsUpdatesFilter.asUpdateReceiver +) + +suspend fun TelegramBot.flushAccumulatedUpdates( + avoidInlineQueries: Boolean = false, + avoidCallbackQueries: Boolean = false, + scope: CoroutineScope = CoroutineScope(Dispatchers.Default), + allowedUpdates: List? = null, + exceptionsHandler: ExceptionHandler? = null, + updatesReceiver: UpdateReceiver = {} +) = retrieveAccumulatedUpdates( + avoidInlineQueries, + avoidCallbackQueries, + scope, + exceptionsHandler, + allowedUpdates, + updatesReceiver +).join() /** * Will [startGettingOfUpdatesByLongPolling] using incoming [flowsUpdatesFilter]. It is assumed that you ALREADY CONFIGURE