1
0
mirror of https://github.com/InsanusMokrassar/TelegramBotAPI.git synced 2024-11-22 16:23:48 +00:00

add opportunity to collect media groups with debounce

This commit is contained in:
InsanusMokrassar 2023-03-01 11:55:17 +06:00
parent 4182d8f3fe
commit 5a456bcdbf
5 changed files with 82 additions and 40 deletions

View File

@ -58,6 +58,7 @@ suspend fun <T : State> TelegramBot.buildBehaviourWithFSMAndStartLongPolling(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit> block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit>
): Pair<DefaultBehaviourContextWithFSM<T>, Job> = buildBehaviourWithFSM( ): Pair<DefaultBehaviourContextWithFSM<T>, Job> = buildBehaviourWithFSM(
upstreamUpdatesFlow, upstreamUpdatesFlow,
@ -70,7 +71,7 @@ suspend fun <T : State> TelegramBot.buildBehaviourWithFSMAndStartLongPolling(
).run { ).run {
this to scope.launch { this to scope.launch {
start() start()
longPolling(flowsUpdatesFilter, timeoutSeconds, scope, autoDisableWebhooks, autoSkipTimeoutExceptions, defaultExceptionsHandler) longPolling(flowsUpdatesFilter, timeoutSeconds, scope, autoDisableWebhooks, autoSkipTimeoutExceptions, mediaGroupsDebounceTimeMillis, defaultExceptionsHandler)
} }
} }
@ -131,6 +132,7 @@ suspend fun <T : State> TelegramBot.buildBehaviourWithFSMAndStartLongPolling(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit> block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit>
) = FlowsUpdatesFilter().let { ) = FlowsUpdatesFilter().let {
buildBehaviourWithFSM( buildBehaviourWithFSM(
@ -149,6 +151,7 @@ suspend fun <T : State> TelegramBot.buildBehaviourWithFSMAndStartLongPolling(
scope, scope,
autoDisableWebhooks, autoDisableWebhooks,
autoSkipTimeoutExceptions, autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis,
defaultExceptionsHandler defaultExceptionsHandler
) )
} }

View File

@ -46,6 +46,7 @@ suspend fun <T : State> telegramBotWithBehaviourAndFSM(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit> block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit>
): TelegramBot = telegramBot( ): TelegramBot = telegramBot(
token, token,
@ -63,6 +64,7 @@ suspend fun <T : State> telegramBotWithBehaviourAndFSM(
timeoutSeconds, timeoutSeconds,
autoDisableWebhooks, autoDisableWebhooks,
autoSkipTimeoutExceptions, autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis,
block block
) )
} }
@ -91,6 +93,7 @@ suspend fun <T : State> telegramBotWithBehaviourAndFSMAndStartLongPolling(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit> block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit>
): Pair<TelegramBot, Job> { ): Pair<TelegramBot, Job> {
return telegramBot( return telegramBot(
@ -108,6 +111,7 @@ suspend fun <T : State> telegramBotWithBehaviourAndFSMAndStartLongPolling(
timeoutSeconds, timeoutSeconds,
autoDisableWebhooks, autoDisableWebhooks,
autoSkipTimeoutExceptions, autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis,
block block
) )
} }

View File

@ -57,6 +57,7 @@ suspend fun TelegramBot.buildBehaviourWithLongPolling(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
block: BehaviourContextReceiver<Unit> block: BehaviourContextReceiver<Unit>
): Job { ): Job {
val behaviourContext = buildBehaviour( val behaviourContext = buildBehaviour(
@ -69,6 +70,7 @@ suspend fun TelegramBot.buildBehaviourWithLongPolling(
scope = behaviourContext, scope = behaviourContext,
timeoutSeconds = timeoutSeconds, timeoutSeconds = timeoutSeconds,
autoDisableWebhooks = autoDisableWebhooks, autoDisableWebhooks = autoDisableWebhooks,
autoSkipTimeoutExceptions = autoSkipTimeoutExceptions autoSkipTimeoutExceptions = autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis = mediaGroupsDebounceTimeMillis
) )
} }

View File

@ -70,6 +70,7 @@ suspend fun telegramBotWithBehaviourAndLongPolling(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
block: BehaviourContextReceiver<Unit> block: BehaviourContextReceiver<Unit>
): Pair<TelegramBot, Job> { ): Pair<TelegramBot, Job> {
return telegramBot( return telegramBot(
@ -84,6 +85,7 @@ suspend fun telegramBotWithBehaviourAndLongPolling(
timeoutSeconds, timeoutSeconds,
autoDisableWebhooks, autoDisableWebhooks,
autoSkipTimeoutExceptions, autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis,
block block
) )
} }

View File

@ -19,12 +19,17 @@ import io.ktor.utils.io.CancellationException
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
/**
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling
*/
fun TelegramBot.longPollingFlow( fun TelegramBot.longPollingFlow(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
exceptionsHandler: (ExceptionHandler<Unit>)? = null, exceptionsHandler: (ExceptionHandler<Unit>)? = null,
allowedUpdates: List<String>? = ALL_UPDATES_LIST, allowedUpdates: List<String>? = ALL_UPDATES_LIST,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
): Flow<Update> = channelFlow { ): Flow<Update> = channelFlow {
if (autoDisableWebhooks) { if (autoDisableWebhooks) {
runCatchingSafely { runCatchingSafely {
@ -47,6 +52,47 @@ fun TelegramBot.longPollingFlow(
var lastUpdateIdentifier: UpdateIdentifier? = null var lastUpdateIdentifier: UpdateIdentifier? = null
val updatesHandler: (suspend (List<Update>) -> Unit) = if (mediaGroupsDebounceTimeMillis != null) {
val scope = CoroutineScope(contextToWork)
val updatesReceiver = scope.updateHandlerWithMediaGroupsAdaptation(
::send,
mediaGroupsDebounceTimeMillis
);
{ originalUpdates: List<Update> ->
originalUpdates.forEach {
updatesReceiver(it)
}
}
} else {
{ originalUpdates: List<Update> ->
val converted = originalUpdates.convertWithMediaGroupUpdates()
/**
* Dirty hack for cases when the media group was retrieved not fully:
*
* We are throw out the last media group and will reretrieve it again in the next get updates
* and it will guarantee that it is full
*/
val updates = if (
originalUpdates.size == getUpdatesLimit.last
&& ((converted.last() as? BaseSentMessageUpdate) ?.data as? CommonMessage<*>) ?.content is MediaGroupContent<*>
) {
converted - converted.last()
} else {
converted
}
safelyWithResult {
for (update in updates) {
send(update)
lastUpdateIdentifier = update.updateId
}
}.onFailure {
cancel(it as? CancellationException ?: return@onFailure)
}
}
}
withContext(contextToWork) { withContext(contextToWork) {
while (isActive) { while (isActive) {
safely( safely(
@ -64,44 +110,14 @@ fun TelegramBot.longPollingFlow(
} }
} }
) { ) {
val updates = execute( execute(
GetUpdates( GetUpdates(
offset = lastUpdateIdentifier?.plus(1), offset = lastUpdateIdentifier?.plus(1),
timeout = timeoutSeconds, timeout = timeoutSeconds,
allowed_updates = allowedUpdates allowed_updates = allowedUpdates
) )
).let { originalUpdates -> ).let { originalUpdates ->
val converted = originalUpdates.convertWithMediaGroupUpdates() updatesHandler(originalUpdates)
/**
* Dirty hack for cases when the media group was retrieved not fully:
*
* We are throw out the last media group and will reretrieve it again in the next get updates
* and it will guarantee that it is full
*/
/**
* Dirty hack for cases when the media group was retrieved not fully:
*
* We are throw out the last media group and will reretrieve it again in the next get updates
* and it will guarantee that it is full
*/
if (
originalUpdates.size == getUpdatesLimit.last
&& ((converted.last() as? BaseSentMessageUpdate) ?.data as? CommonMessage<*>) ?.content is MediaGroupContent<*>
) {
converted - converted.last()
} else {
converted
}
}
safelyWithResult {
for (update in updates) {
send(update)
lastUpdateIdentifier = update.updateId
}
}.onFailure {
cancel(it as? CancellationException ?: return@onFailure)
} }
} }
} }
@ -115,13 +131,15 @@ fun TelegramBot.startGettingOfUpdatesByLongPolling(
allowedUpdates: List<String>? = ALL_UPDATES_LIST, allowedUpdates: List<String>? = ALL_UPDATES_LIST,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
updatesReceiver: UpdateReceiver<Update> updatesReceiver: UpdateReceiver<Update>
): Job = longPollingFlow( ): Job = longPollingFlow(
timeoutSeconds = timeoutSeconds, timeoutSeconds = timeoutSeconds,
exceptionsHandler = exceptionsHandler, exceptionsHandler = exceptionsHandler,
allowedUpdates = allowedUpdates, allowedUpdates = allowedUpdates,
autoDisableWebhooks = autoDisableWebhooks, autoDisableWebhooks = autoDisableWebhooks,
autoSkipTimeoutExceptions = autoSkipTimeoutExceptions autoSkipTimeoutExceptions = autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis = mediaGroupsDebounceTimeMillis
).subscribeSafely( ).subscribeSafely(
scope, scope,
exceptionsHandler ?: defaultSafelyExceptionHandler, exceptionsHandler ?: defaultSafelyExceptionHandler,
@ -137,7 +155,8 @@ fun TelegramBot.createAccumulatedUpdatesRetrieverFlow(
avoidCallbackQueries: Boolean = false, avoidCallbackQueries: Boolean = false,
exceptionsHandler: ExceptionHandler<Unit>? = null, exceptionsHandler: ExceptionHandler<Unit>? = null,
allowedUpdates: List<String>? = ALL_UPDATES_LIST, allowedUpdates: List<String>? = ALL_UPDATES_LIST,
autoDisableWebhooks: Boolean = true autoDisableWebhooks: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
): Flow<Update> = longPollingFlow( ): Flow<Update> = longPollingFlow(
timeoutSeconds = 0, timeoutSeconds = 0,
exceptionsHandler = { exceptionsHandler = {
@ -149,7 +168,8 @@ fun TelegramBot.createAccumulatedUpdatesRetrieverFlow(
}, },
allowedUpdates = allowedUpdates, allowedUpdates = allowedUpdates,
autoDisableWebhooks = autoDisableWebhooks, autoDisableWebhooks = autoDisableWebhooks,
autoSkipTimeoutExceptions = false autoSkipTimeoutExceptions = false,
mediaGroupsDebounceTimeMillis = mediaGroupsDebounceTimeMillis
).filter { ).filter {
!(it is InlineQueryUpdate && avoidInlineQueries || it is CallbackQueryUpdate && avoidCallbackQueries) !(it is InlineQueryUpdate && avoidInlineQueries || it is CallbackQueryUpdate && avoidCallbackQueries)
} }
@ -161,13 +181,15 @@ fun TelegramBot.retrieveAccumulatedUpdates(
exceptionsHandler: (ExceptionHandler<Unit>)? = null, exceptionsHandler: (ExceptionHandler<Unit>)? = null,
allowedUpdates: List<String>? = ALL_UPDATES_LIST, allowedUpdates: List<String>? = ALL_UPDATES_LIST,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
updatesReceiver: UpdateReceiver<Update> updatesReceiver: UpdateReceiver<Update>
): Job = createAccumulatedUpdatesRetrieverFlow( ): Job = createAccumulatedUpdatesRetrieverFlow(
avoidInlineQueries, avoidInlineQueries,
avoidCallbackQueries, avoidCallbackQueries,
exceptionsHandler, exceptionsHandler,
allowedUpdates, allowedUpdates,
autoDisableWebhooks autoDisableWebhooks,
mediaGroupsDebounceTimeMillis
).subscribeSafelyWithoutExceptions( ).subscribeSafelyWithoutExceptions(
scope.LinkedSupervisorScope() scope.LinkedSupervisorScope()
) { ) {
@ -180,6 +202,7 @@ fun TelegramBot.retrieveAccumulatedUpdates(
avoidCallbackQueries: Boolean = false, avoidCallbackQueries: Boolean = false,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default), scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
exceptionsHandler: ExceptionHandler<Unit>? = null exceptionsHandler: ExceptionHandler<Unit>? = null
) = retrieveAccumulatedUpdates( ) = retrieveAccumulatedUpdates(
avoidInlineQueries, avoidInlineQueries,
@ -188,6 +211,7 @@ fun TelegramBot.retrieveAccumulatedUpdates(
exceptionsHandler, exceptionsHandler,
flowsUpdatesFilter.allowedUpdates, flowsUpdatesFilter.allowedUpdates,
autoDisableWebhooks, autoDisableWebhooks,
mediaGroupsDebounceTimeMillis,
flowsUpdatesFilter.asUpdateReceiver flowsUpdatesFilter.asUpdateReceiver
) )
@ -198,6 +222,7 @@ suspend fun TelegramBot.flushAccumulatedUpdates(
allowedUpdates: List<String>? = ALL_UPDATES_LIST, allowedUpdates: List<String>? = ALL_UPDATES_LIST,
exceptionsHandler: ExceptionHandler<Unit>? = null, exceptionsHandler: ExceptionHandler<Unit>? = null,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
updatesReceiver: UpdateReceiver<Update> = {} updatesReceiver: UpdateReceiver<Update> = {}
) = retrieveAccumulatedUpdates( ) = retrieveAccumulatedUpdates(
avoidInlineQueries, avoidInlineQueries,
@ -206,6 +231,7 @@ suspend fun TelegramBot.flushAccumulatedUpdates(
exceptionsHandler, exceptionsHandler,
allowedUpdates, allowedUpdates,
autoDisableWebhooks, autoDisableWebhooks,
mediaGroupsDebounceTimeMillis,
updatesReceiver updatesReceiver
).join() ).join()
@ -219,6 +245,7 @@ fun TelegramBot.longPolling(
scope: CoroutineScope = CoroutineScope(Dispatchers.Default), scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
exceptionsHandler: ExceptionHandler<Unit>? = null exceptionsHandler: ExceptionHandler<Unit>? = null
): Job = updatesFilter.run { ): Job = updatesFilter.run {
startGettingOfUpdatesByLongPolling( startGettingOfUpdatesByLongPolling(
@ -228,6 +255,7 @@ fun TelegramBot.longPolling(
allowedUpdates = allowedUpdates, allowedUpdates = allowedUpdates,
autoDisableWebhooks = autoDisableWebhooks, autoDisableWebhooks = autoDisableWebhooks,
autoSkipTimeoutExceptions = autoSkipTimeoutExceptions, autoSkipTimeoutExceptions = autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis = mediaGroupsDebounceTimeMillis,
updatesReceiver = asUpdateReceiver updatesReceiver = asUpdateReceiver
) )
} }
@ -245,8 +273,9 @@ fun TelegramBot.longPolling(
flowsUpdatesFilterUpdatesKeeperCount: Int = 100, flowsUpdatesFilterUpdatesKeeperCount: Int = 100,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
flowUpdatesPreset: FlowsUpdatesFilter.() -> Unit flowUpdatesPreset: FlowsUpdatesFilter.() -> Unit
): Job = longPolling(FlowsUpdatesFilter(flowsUpdatesFilterUpdatesKeeperCount).apply(flowUpdatesPreset), timeoutSeconds, scope, autoDisableWebhooks, autoSkipTimeoutExceptions, exceptionsHandler) ): Job = longPolling(FlowsUpdatesFilter(flowsUpdatesFilterUpdatesKeeperCount).apply(flowUpdatesPreset), timeoutSeconds, scope, autoDisableWebhooks, autoSkipTimeoutExceptions, mediaGroupsDebounceTimeMillis, exceptionsHandler)
fun RequestsExecutor.startGettingOfUpdatesByLongPolling( fun RequestsExecutor.startGettingOfUpdatesByLongPolling(
updatesFilter: UpdatesFilter, updatesFilter: UpdatesFilter,
@ -254,6 +283,7 @@ fun RequestsExecutor.startGettingOfUpdatesByLongPolling(
exceptionsHandler: ExceptionHandler<Unit>? = null, exceptionsHandler: ExceptionHandler<Unit>? = null,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default), scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
): Job = startGettingOfUpdatesByLongPolling( ): Job = startGettingOfUpdatesByLongPolling(
timeoutSeconds, timeoutSeconds,
@ -262,5 +292,6 @@ fun RequestsExecutor.startGettingOfUpdatesByLongPolling(
updatesFilter.allowedUpdates, updatesFilter.allowedUpdates,
autoDisableWebhooks, autoDisableWebhooks,
autoSkipTimeoutExceptions, autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis,
updatesFilter.asUpdateReceiver updatesFilter.asUpdateReceiver
) )