From 10f4817283aff3f94e59197a392c2c711d7ddb05 Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Mon, 8 Feb 2021 19:35:32 +0600 Subject: [PATCH] solution of #293 --- CHANGELOG.md | 3 + .../Ktor/base/AbstractRequestCallFactory.kt | 7 ++ .../utils/updates/retrieving/LongPolling.kt | 68 ++++++++++++++++++- 3 files changed, 76 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ae55ef7b3..d3e828c00e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ * `Core`: * Add `mention` variants for user ids and receiver variants ([#294](https://github.com/InsanusMokrassar/TelegramBotAPI/issues/294)) + * Now `AbstractRequestCallFactory` will set up one-second delay for zero timeouts in `GetUpdate` requests + * Several extensions for `TelegramBotAPI` like `retrieveAccumulatedUpdates` have been added as a solution for + [#293](https://github.com/InsanusMokrassar/TelegramBotAPI/issues/293) ## 0.32.4 diff --git a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/Ktor/base/AbstractRequestCallFactory.kt b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/Ktor/base/AbstractRequestCallFactory.kt index bedbc96283..b6d31e8875 100644 --- a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/Ktor/base/AbstractRequestCallFactory.kt +++ b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/Ktor/base/AbstractRequestCallFactory.kt @@ -16,6 +16,8 @@ import io.ktor.http.ContentType import kotlinx.serialization.json.Json import kotlin.collections.set +var defaultUpdateTimeoutForZeroDelay = 1000L + abstract class AbstractRequestCallFactory : KtorCallFactory { private val methodsCache: MutableMap = mutableMapOf() override suspend fun makeCall( @@ -41,6 +43,11 @@ abstract class AbstractRequestCallFactory : KtorCallFactory { requestTimeoutMillis = customTimeoutMillis socketTimeoutMillis = customTimeoutMillis } + } else { + timeout { + requestTimeoutMillis = defaultUpdateTimeoutForZeroDelay + socketTimeoutMillis = defaultUpdateTimeoutForZeroDelay + } } } } diff --git a/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt b/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt index 7519638811..c5ac93a0d4 100644 --- a/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt +++ b/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt @@ -1,7 +1,6 @@ package dev.inmo.tgbotapi.extensions.utils.updates.retrieving -import dev.inmo.micro_utils.coroutines.ExceptionHandler -import dev.inmo.micro_utils.coroutines.safely +import dev.inmo.micro_utils.coroutines.* import dev.inmo.tgbotapi.bot.RequestsExecutor import dev.inmo.tgbotapi.bot.TelegramBot import dev.inmo.tgbotapi.bot.exceptions.RequestException @@ -14,7 +13,10 @@ import dev.inmo.tgbotapi.types.update.MediaGroupUpdates.* import dev.inmo.tgbotapi.types.update.abstracts.Update import dev.inmo.tgbotapi.updateshandlers.* import dev.inmo.tgbotapi.utils.* +import io.ktor.client.features.HttpRequestTimeoutException import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlin.coroutines.coroutineContext fun TelegramBot.startGettingOfUpdatesByLongPolling( timeoutSeconds: Seconds = 30, @@ -66,6 +68,68 @@ fun TelegramBot.startGettingOfUpdatesByLongPolling( } } +fun TelegramBot.retrieveAccumulatedUpdates( + avoidInlineQueries: Boolean = false, + avoidCallbackQueries: Boolean = false, + scope: CoroutineScope = CoroutineScope(Dispatchers.Default), + exceptionsHandler: (ExceptionHandler)? = null, + allowedUpdates: List? = null, + updatesReceiver: UpdateReceiver +): Job = scope.launch { + safelyWithoutExceptions { + startGettingOfUpdatesByLongPolling( + 0, + CoroutineScope(coroutineContext + SupervisorJob()), + { + if (it is HttpRequestTimeoutException) { + throw CancellationException("Cancel due to absence of new updates") + } else { + exceptionsHandler ?.invoke(it) + } + }, + allowedUpdates + ) { + when { + it is InlineQueryUpdate && avoidInlineQueries || + it is CallbackQueryUpdate && avoidCallbackQueries -> return@startGettingOfUpdatesByLongPolling + else -> updatesReceiver(it) + } + }.join() + } +} + +/** + * @return [kotlinx.coroutines.flow.Flow] which will emit updates to the collector while they will be accumulated. Works + * the same as [retrieveAccumulatedUpdates], but pass [kotlinx.coroutines.flow.FlowCollector.emit] as a callback + */ +fun TelegramBot.createAccumulatedUpdatesRetrieverFlow( + avoidInlineQueries: Boolean = false, + avoidCallbackQueries: Boolean = false, + exceptionsHandler: ExceptionHandler? = null, + allowedUpdates: List? = null +): Flow = flow { + val parentContext = kotlin.coroutines.coroutineContext + retrieveAccumulatedUpdates( + avoidInlineQueries, + avoidCallbackQueries, + CoroutineScope(parentContext), + exceptionsHandler, + allowedUpdates + ) { + withContext(parentContext) { emit(it) } + }.join() +} + +fun TelegramBot.retrieveAccumulatedUpdates( + flowsUpdatesFilter: FlowsUpdatesFilter, + avoidInlineQueries: Boolean = false, + avoidCallbackQueries: Boolean = false, + scope: CoroutineScope = CoroutineScope(Dispatchers.Default), + exceptionsHandler: ExceptionHandler? = null +) = flowsUpdatesFilter.run { + retrieveAccumulatedUpdates(avoidInlineQueries, avoidCallbackQueries, scope, exceptionsHandler, allowedUpdates, asUpdateReceiver) +} + /** * Will [startGettingOfUpdatesByLongPolling] using incoming [flowsUpdatesFilter]. It is assumed that you ALREADY CONFIGURE * all updates receivers, because this method will trigger getting of updates and.