diff --git a/tgbotapi.behaviour_builder.fsm/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/BehaviourContextWithFSMBuilder.kt b/tgbotapi.behaviour_builder.fsm/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/BehaviourContextWithFSMBuilder.kt index f0efd52d45..d3452123f6 100644 --- a/tgbotapi.behaviour_builder.fsm/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/BehaviourContextWithFSMBuilder.kt +++ b/tgbotapi.behaviour_builder.fsm/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/BehaviourContextWithFSMBuilder.kt @@ -58,6 +58,7 @@ suspend fun TelegramBot.buildBehaviourWithFSMAndStartLongPolling( timeoutSeconds: Seconds = 30, autoDisableWebhooks: Boolean = true, autoSkipTimeoutExceptions: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, block: CustomBehaviourContextReceiver, Unit> ): Pair, Job> = buildBehaviourWithFSM( upstreamUpdatesFlow, @@ -70,7 +71,7 @@ suspend fun TelegramBot.buildBehaviourWithFSMAndStartLongPolling( ).run { this to scope.launch { start() - longPolling(flowsUpdatesFilter, timeoutSeconds, scope, autoDisableWebhooks, autoSkipTimeoutExceptions, defaultExceptionsHandler) + longPolling(flowsUpdatesFilter, timeoutSeconds, scope, autoDisableWebhooks, autoSkipTimeoutExceptions, mediaGroupsDebounceTimeMillis, defaultExceptionsHandler) } } @@ -131,6 +132,7 @@ suspend fun TelegramBot.buildBehaviourWithFSMAndStartLongPolling( timeoutSeconds: Seconds = 30, autoDisableWebhooks: Boolean = true, autoSkipTimeoutExceptions: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, block: CustomBehaviourContextReceiver, Unit> ) = FlowsUpdatesFilter().let { buildBehaviourWithFSM( @@ -149,6 +151,7 @@ suspend fun TelegramBot.buildBehaviourWithFSMAndStartLongPolling( scope, autoDisableWebhooks, autoSkipTimeoutExceptions, + mediaGroupsDebounceTimeMillis, defaultExceptionsHandler ) } diff --git a/tgbotapi.behaviour_builder.fsm/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/TelegramBotWithFSM.kt b/tgbotapi.behaviour_builder.fsm/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/TelegramBotWithFSM.kt index 97a1cd09a6..b4dd08dca6 100644 --- a/tgbotapi.behaviour_builder.fsm/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/TelegramBotWithFSM.kt +++ b/tgbotapi.behaviour_builder.fsm/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/TelegramBotWithFSM.kt @@ -46,6 +46,7 @@ suspend fun telegramBotWithBehaviourAndFSM( timeoutSeconds: Seconds = 30, autoDisableWebhooks: Boolean = true, autoSkipTimeoutExceptions: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, block: CustomBehaviourContextReceiver, Unit> ): TelegramBot = telegramBot( token, @@ -63,6 +64,7 @@ suspend fun telegramBotWithBehaviourAndFSM( timeoutSeconds, autoDisableWebhooks, autoSkipTimeoutExceptions, + mediaGroupsDebounceTimeMillis, block ) } @@ -91,6 +93,7 @@ suspend fun telegramBotWithBehaviourAndFSMAndStartLongPolling( timeoutSeconds: Seconds = 30, autoDisableWebhooks: Boolean = true, autoSkipTimeoutExceptions: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, block: CustomBehaviourContextReceiver, Unit> ): Pair { return telegramBot( @@ -108,6 +111,7 @@ suspend fun telegramBotWithBehaviourAndFSMAndStartLongPolling( timeoutSeconds, autoDisableWebhooks, autoSkipTimeoutExceptions, + mediaGroupsDebounceTimeMillis, block ) } diff --git a/tgbotapi.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/BehaviourBuilders.kt b/tgbotapi.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/BehaviourBuilders.kt index 730122b758..9ad2c878c1 100644 --- a/tgbotapi.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/BehaviourBuilders.kt +++ b/tgbotapi.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/BehaviourBuilders.kt @@ -57,6 +57,7 @@ suspend fun TelegramBot.buildBehaviourWithLongPolling( timeoutSeconds: Seconds = 30, autoDisableWebhooks: Boolean = true, autoSkipTimeoutExceptions: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, block: BehaviourContextReceiver ): Job { val behaviourContext = buildBehaviour( @@ -69,6 +70,7 @@ suspend fun TelegramBot.buildBehaviourWithLongPolling( scope = behaviourContext, timeoutSeconds = timeoutSeconds, autoDisableWebhooks = autoDisableWebhooks, - autoSkipTimeoutExceptions = autoSkipTimeoutExceptions + autoSkipTimeoutExceptions = autoSkipTimeoutExceptions, + mediaGroupsDebounceTimeMillis = mediaGroupsDebounceTimeMillis ) } diff --git a/tgbotapi.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/TelegramBot.kt b/tgbotapi.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/TelegramBot.kt index 36c8df0ff9..0abbf6cefe 100644 --- a/tgbotapi.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/TelegramBot.kt +++ b/tgbotapi.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/TelegramBot.kt @@ -70,6 +70,7 @@ suspend fun telegramBotWithBehaviourAndLongPolling( timeoutSeconds: Seconds = 30, autoDisableWebhooks: Boolean = true, autoSkipTimeoutExceptions: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, block: BehaviourContextReceiver ): Pair { return telegramBot( @@ -84,6 +85,7 @@ suspend fun telegramBotWithBehaviourAndLongPolling( timeoutSeconds, autoDisableWebhooks, autoSkipTimeoutExceptions, + mediaGroupsDebounceTimeMillis, block ) } diff --git a/tgbotapi.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt b/tgbotapi.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt index 50a592c86f..5849c705df 100644 --- a/tgbotapi.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt +++ b/tgbotapi.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt @@ -19,12 +19,17 @@ import io.ktor.utils.io.CancellationException import kotlinx.coroutines.* import kotlinx.coroutines.flow.* +/** + * @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null + * in case you wish to enable classic way of updates handling + */ fun TelegramBot.longPollingFlow( timeoutSeconds: Seconds = 30, exceptionsHandler: (ExceptionHandler)? = null, allowedUpdates: List? = ALL_UPDATES_LIST, autoDisableWebhooks: Boolean = true, - autoSkipTimeoutExceptions: Boolean = true + autoSkipTimeoutExceptions: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, ): Flow = channelFlow { if (autoDisableWebhooks) { runCatchingSafely { @@ -47,6 +52,47 @@ fun TelegramBot.longPollingFlow( var lastUpdateIdentifier: UpdateIdentifier? = null + val updatesHandler: (suspend (List) -> Unit) = if (mediaGroupsDebounceTimeMillis != null) { + val scope = CoroutineScope(contextToWork) + val updatesReceiver = scope.updateHandlerWithMediaGroupsAdaptation( + ::send, + mediaGroupsDebounceTimeMillis + ); + { originalUpdates: List -> + originalUpdates.forEach { + updatesReceiver(it) + } + } + } else { + { originalUpdates: List -> + val converted = originalUpdates.convertWithMediaGroupUpdates() + /** + * Dirty hack for cases when the media group was retrieved not fully: + * + * We are throw out the last media group and will reretrieve it again in the next get updates + * and it will guarantee that it is full + */ + val updates = if ( + originalUpdates.size == getUpdatesLimit.last + && ((converted.last() as? BaseSentMessageUpdate) ?.data as? CommonMessage<*>) ?.content is MediaGroupContent<*> + ) { + converted - converted.last() + } else { + converted + } + + safelyWithResult { + for (update in updates) { + send(update) + + lastUpdateIdentifier = update.updateId + } + }.onFailure { + cancel(it as? CancellationException ?: return@onFailure) + } + } + } + withContext(contextToWork) { while (isActive) { safely( @@ -64,44 +110,14 @@ fun TelegramBot.longPollingFlow( } } ) { - val updates = execute( + execute( GetUpdates( offset = lastUpdateIdentifier?.plus(1), timeout = timeoutSeconds, allowed_updates = allowedUpdates ) ).let { originalUpdates -> - val converted = originalUpdates.convertWithMediaGroupUpdates() - /** - * Dirty hack for cases when the media group was retrieved not fully: - * - * We are throw out the last media group and will reretrieve it again in the next get updates - * and it will guarantee that it is full - */ - /** - * Dirty hack for cases when the media group was retrieved not fully: - * - * We are throw out the last media group and will reretrieve it again in the next get updates - * and it will guarantee that it is full - */ - if ( - originalUpdates.size == getUpdatesLimit.last - && ((converted.last() as? BaseSentMessageUpdate) ?.data as? CommonMessage<*>) ?.content is MediaGroupContent<*> - ) { - converted - converted.last() - } else { - converted - } - } - - safelyWithResult { - for (update in updates) { - send(update) - - lastUpdateIdentifier = update.updateId - } - }.onFailure { - cancel(it as? CancellationException ?: return@onFailure) + updatesHandler(originalUpdates) } } } @@ -115,13 +131,15 @@ fun TelegramBot.startGettingOfUpdatesByLongPolling( allowedUpdates: List? = ALL_UPDATES_LIST, autoDisableWebhooks: Boolean = true, autoSkipTimeoutExceptions: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, updatesReceiver: UpdateReceiver ): Job = longPollingFlow( timeoutSeconds = timeoutSeconds, exceptionsHandler = exceptionsHandler, allowedUpdates = allowedUpdates, autoDisableWebhooks = autoDisableWebhooks, - autoSkipTimeoutExceptions = autoSkipTimeoutExceptions + autoSkipTimeoutExceptions = autoSkipTimeoutExceptions, + mediaGroupsDebounceTimeMillis = mediaGroupsDebounceTimeMillis ).subscribeSafely( scope, exceptionsHandler ?: defaultSafelyExceptionHandler, @@ -137,7 +155,8 @@ fun TelegramBot.createAccumulatedUpdatesRetrieverFlow( avoidCallbackQueries: Boolean = false, exceptionsHandler: ExceptionHandler? = null, allowedUpdates: List? = ALL_UPDATES_LIST, - autoDisableWebhooks: Boolean = true + autoDisableWebhooks: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, ): Flow = longPollingFlow( timeoutSeconds = 0, exceptionsHandler = { @@ -149,7 +168,8 @@ fun TelegramBot.createAccumulatedUpdatesRetrieverFlow( }, allowedUpdates = allowedUpdates, autoDisableWebhooks = autoDisableWebhooks, - autoSkipTimeoutExceptions = false + autoSkipTimeoutExceptions = false, + mediaGroupsDebounceTimeMillis = mediaGroupsDebounceTimeMillis ).filter { !(it is InlineQueryUpdate && avoidInlineQueries || it is CallbackQueryUpdate && avoidCallbackQueries) } @@ -161,13 +181,15 @@ fun TelegramBot.retrieveAccumulatedUpdates( exceptionsHandler: (ExceptionHandler)? = null, allowedUpdates: List? = ALL_UPDATES_LIST, autoDisableWebhooks: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, updatesReceiver: UpdateReceiver ): Job = createAccumulatedUpdatesRetrieverFlow( avoidInlineQueries, avoidCallbackQueries, exceptionsHandler, allowedUpdates, - autoDisableWebhooks + autoDisableWebhooks, + mediaGroupsDebounceTimeMillis ).subscribeSafelyWithoutExceptions( scope.LinkedSupervisorScope() ) { @@ -180,6 +202,7 @@ fun TelegramBot.retrieveAccumulatedUpdates( avoidCallbackQueries: Boolean = false, scope: CoroutineScope = CoroutineScope(Dispatchers.Default), autoDisableWebhooks: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, exceptionsHandler: ExceptionHandler? = null ) = retrieveAccumulatedUpdates( avoidInlineQueries, @@ -188,6 +211,7 @@ fun TelegramBot.retrieveAccumulatedUpdates( exceptionsHandler, flowsUpdatesFilter.allowedUpdates, autoDisableWebhooks, + mediaGroupsDebounceTimeMillis, flowsUpdatesFilter.asUpdateReceiver ) @@ -198,6 +222,7 @@ suspend fun TelegramBot.flushAccumulatedUpdates( allowedUpdates: List? = ALL_UPDATES_LIST, exceptionsHandler: ExceptionHandler? = null, autoDisableWebhooks: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, updatesReceiver: UpdateReceiver = {} ) = retrieveAccumulatedUpdates( avoidInlineQueries, @@ -206,6 +231,7 @@ suspend fun TelegramBot.flushAccumulatedUpdates( exceptionsHandler, allowedUpdates, autoDisableWebhooks, + mediaGroupsDebounceTimeMillis, updatesReceiver ).join() @@ -219,6 +245,7 @@ fun TelegramBot.longPolling( scope: CoroutineScope = CoroutineScope(Dispatchers.Default), autoDisableWebhooks: Boolean = true, autoSkipTimeoutExceptions: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, exceptionsHandler: ExceptionHandler? = null ): Job = updatesFilter.run { startGettingOfUpdatesByLongPolling( @@ -228,6 +255,7 @@ fun TelegramBot.longPolling( allowedUpdates = allowedUpdates, autoDisableWebhooks = autoDisableWebhooks, autoSkipTimeoutExceptions = autoSkipTimeoutExceptions, + mediaGroupsDebounceTimeMillis = mediaGroupsDebounceTimeMillis, updatesReceiver = asUpdateReceiver ) } @@ -245,8 +273,9 @@ fun TelegramBot.longPolling( flowsUpdatesFilterUpdatesKeeperCount: Int = 100, autoDisableWebhooks: Boolean = true, autoSkipTimeoutExceptions: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, flowUpdatesPreset: FlowsUpdatesFilter.() -> Unit -): Job = longPolling(FlowsUpdatesFilter(flowsUpdatesFilterUpdatesKeeperCount).apply(flowUpdatesPreset), timeoutSeconds, scope, autoDisableWebhooks, autoSkipTimeoutExceptions, exceptionsHandler) +): Job = longPolling(FlowsUpdatesFilter(flowsUpdatesFilterUpdatesKeeperCount).apply(flowUpdatesPreset), timeoutSeconds, scope, autoDisableWebhooks, autoSkipTimeoutExceptions, mediaGroupsDebounceTimeMillis, exceptionsHandler) fun RequestsExecutor.startGettingOfUpdatesByLongPolling( updatesFilter: UpdatesFilter, @@ -254,6 +283,7 @@ fun RequestsExecutor.startGettingOfUpdatesByLongPolling( exceptionsHandler: ExceptionHandler? = null, scope: CoroutineScope = CoroutineScope(Dispatchers.Default), autoDisableWebhooks: Boolean = true, + mediaGroupsDebounceTimeMillis: Long? = 1000L, autoSkipTimeoutExceptions: Boolean = true, ): Job = startGettingOfUpdatesByLongPolling( timeoutSeconds, @@ -262,5 +292,6 @@ fun RequestsExecutor.startGettingOfUpdatesByLongPolling( updatesFilter.allowedUpdates, autoDisableWebhooks, autoSkipTimeoutExceptions, + mediaGroupsDebounceTimeMillis, updatesFilter.asUpdateReceiver )