1
0
mirror of https://github.com/InsanusMokrassar/TelegramBotAPI.git synced 2024-11-26 03:58:44 +00:00

Merge pull request #729 from InsanusMokrassar/6.0.2

6.0.2
This commit is contained in:
InsanusMokrassar 2023-03-01 15:33:43 +06:00 committed by GitHub
commit 4c4aa491cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 187 additions and 62 deletions

View File

@ -1,5 +1,10 @@
# TelegramBotAPI changelog # TelegramBotAPI changelog
## 6.0.2
* `Core`:
* Long polling now uses media groups debounce as in webhooks
## 6.0.1 ## 6.0.1
* `Versions`: * `Versions`:

View File

@ -6,4 +6,4 @@ kotlin.incremental=true
kotlin.incremental.js=true kotlin.incremental.js=true
library_group=dev.inmo library_group=dev.inmo
library_version=6.0.1 library_version=6.0.2

View File

@ -1,7 +1,8 @@
package dev.inmo.tgbotapi.extensions.api.utils package dev.inmo.tgbotapi.extensions.api.utils
import dev.inmo.micro_utils.coroutines.launchSafelyWithoutExceptions
import dev.inmo.tgbotapi.extensions.api.InternalUtils.convertWithMediaGroupUpdates import dev.inmo.tgbotapi.extensions.api.InternalUtils.convertWithMediaGroupUpdates
import dev.inmo.tgbotapi.types.message.abstracts.PossiblySentViaBotCommonMessage import dev.inmo.tgbotapi.types.message.abstracts.PossiblyMediaGroupMessage
import dev.inmo.tgbotapi.types.update.abstracts.BaseMessageUpdate import dev.inmo.tgbotapi.types.update.abstracts.BaseMessageUpdate
import dev.inmo.tgbotapi.types.update.abstracts.Update import dev.inmo.tgbotapi.types.update.abstracts.Update
import dev.inmo.tgbotapi.updateshandlers.UpdateReceiver import dev.inmo.tgbotapi.updateshandlers.UpdateReceiver
@ -28,26 +29,18 @@ fun CoroutineScope.updateHandlerWithMediaGroupsAdaptation(
) )
launch { launch {
launch { launchSafelyWithoutExceptions {
for (update in updatesChannel) { for (update in updatesChannel) {
val dataAsPossiblySentViaBotCommonMessage = update.data as? PossiblySentViaBotCommonMessage<*> val data = update.data
when {
if (dataAsPossiblySentViaBotCommonMessage == null) { data is PossiblyMediaGroupMessage<*> && data.mediaGroupId != null -> {
output(update) mediaGroupChannel.send("${data.mediaGroupId}${update::class.simpleName}" to update as BaseMessageUpdate)
continue
} }
else -> output(update)
val mediaGroupId = dataAsPossiblySentViaBotCommonMessage.mediaGroupId
if (mediaGroupId == null) {
output(update)
continue
}
mediaGroupChannel.send("${mediaGroupId}${update::class.simpleName}" to update as BaseMessageUpdate)
} }
} }
launch { }
launchSafelyWithoutExceptions {
for ((_, mediaGroup) in mediaGroupAccumulatedChannel) { for ((_, mediaGroup) in mediaGroupAccumulatedChannel) {
mediaGroup.convertWithMediaGroupUpdates().forEach { mediaGroup.convertWithMediaGroupUpdates().forEach {
output(it) output(it)

View File

@ -8,6 +8,7 @@ import dev.inmo.micro_utils.fsm.common.utils.StateHandlingErrorHandler
import dev.inmo.micro_utils.fsm.common.utils.defaultStateHandlingErrorHandler import dev.inmo.micro_utils.fsm.common.utils.defaultStateHandlingErrorHandler
import dev.inmo.tgbotapi.bot.TelegramBot import dev.inmo.tgbotapi.bot.TelegramBot
import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.longPolling import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.longPolling
import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.updateHandlerWithMediaGroupsAdaptation
import dev.inmo.tgbotapi.types.Seconds import dev.inmo.tgbotapi.types.Seconds
import dev.inmo.tgbotapi.types.update.abstracts.Update import dev.inmo.tgbotapi.types.update.abstracts.Update
import dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter import dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter
@ -47,6 +48,10 @@ suspend fun <T : State> TelegramBot.buildBehaviourWithFSM(
* Use [buildBehaviourWithFSM] to create [BehaviourContextWithFSM] and launch getting of updates * Use [buildBehaviourWithFSM] to create [BehaviourContextWithFSM] and launch getting of updates
* using [longPolling]. For [longPolling] will be used result [BehaviourContextWithFSM] for both parameters * using [longPolling]. For [longPolling] will be used result [BehaviourContextWithFSM] for both parameters
* flowsUpdatesFilter and scope * flowsUpdatesFilter and scope
*
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*/ */
suspend fun <T : State> TelegramBot.buildBehaviourWithFSMAndStartLongPolling( suspend fun <T : State> TelegramBot.buildBehaviourWithFSMAndStartLongPolling(
upstreamUpdatesFlow: Flow<Update>? = null, upstreamUpdatesFlow: Flow<Update>? = null,
@ -58,6 +63,7 @@ suspend fun <T : State> TelegramBot.buildBehaviourWithFSMAndStartLongPolling(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit> block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit>
): Pair<DefaultBehaviourContextWithFSM<T>, Job> = buildBehaviourWithFSM( ): Pair<DefaultBehaviourContextWithFSM<T>, Job> = buildBehaviourWithFSM(
upstreamUpdatesFlow, upstreamUpdatesFlow,
@ -70,7 +76,7 @@ suspend fun <T : State> TelegramBot.buildBehaviourWithFSMAndStartLongPolling(
).run { ).run {
this to scope.launch { this to scope.launch {
start() start()
longPolling(flowsUpdatesFilter, timeoutSeconds, scope, autoDisableWebhooks, autoSkipTimeoutExceptions, defaultExceptionsHandler) longPolling(flowsUpdatesFilter, timeoutSeconds, scope, autoDisableWebhooks, autoSkipTimeoutExceptions, mediaGroupsDebounceTimeMillis, defaultExceptionsHandler)
} }
} }
@ -116,6 +122,10 @@ suspend fun <T : State> TelegramBot.buildBehaviourWithFSM(
* using [longPolling]. For [longPolling] will be used result [BehaviourContextWithFSM] for both parameters * using [longPolling]. For [longPolling] will be used result [BehaviourContextWithFSM] for both parameters
* flowsUpdatesFilter and scope * flowsUpdatesFilter and scope
* *
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*
* @see buildBehaviourWithFSMAndStartLongPolling * @see buildBehaviourWithFSMAndStartLongPolling
* @see BehaviourContext * @see BehaviourContext
* @see longPolling * @see longPolling
@ -131,6 +141,7 @@ suspend fun <T : State> TelegramBot.buildBehaviourWithFSMAndStartLongPolling(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit> block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit>
) = FlowsUpdatesFilter().let { ) = FlowsUpdatesFilter().let {
buildBehaviourWithFSM( buildBehaviourWithFSM(
@ -149,6 +160,7 @@ suspend fun <T : State> TelegramBot.buildBehaviourWithFSMAndStartLongPolling(
scope, scope,
autoDisableWebhooks, autoDisableWebhooks,
autoSkipTimeoutExceptions, autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis,
defaultExceptionsHandler defaultExceptionsHandler
) )
} }

View File

@ -11,6 +11,7 @@ import dev.inmo.tgbotapi.bot.ktor.KtorRequestsExecutorBuilder
import dev.inmo.tgbotapi.bot.ktor.telegramBot import dev.inmo.tgbotapi.bot.ktor.telegramBot
import dev.inmo.tgbotapi.bot.TelegramBot import dev.inmo.tgbotapi.bot.TelegramBot
import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.startGettingOfUpdatesByLongPolling import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.startGettingOfUpdatesByLongPolling
import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.updateHandlerWithMediaGroupsAdaptation
import dev.inmo.tgbotapi.types.Seconds import dev.inmo.tgbotapi.types.Seconds
import dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter import dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter
import dev.inmo.tgbotapi.utils.telegramBotAPIDefaultUrl import dev.inmo.tgbotapi.utils.telegramBotAPIDefaultUrl
@ -26,6 +27,10 @@ import kotlin.coroutines.coroutineContext
* **WARNING** This method WILL NOT launch any listening of updates. Use something like * **WARNING** This method WILL NOT launch any listening of updates. Use something like
* [startGettingOfUpdatesByLongPolling] or tools for work with webhooks * [startGettingOfUpdatesByLongPolling] or tools for work with webhooks
* *
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*
* @return Created bot which has been used to create [BehaviourContext] via [buildBehaviourWithFSM] * @return Created bot which has been used to create [BehaviourContext] via [buildBehaviourWithFSM]
* *
* @see [BehaviourContext] * @see [BehaviourContext]
@ -46,6 +51,7 @@ suspend fun <T : State> telegramBotWithBehaviourAndFSM(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit> block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit>
): TelegramBot = telegramBot( ): TelegramBot = telegramBot(
token, token,
@ -63,6 +69,7 @@ suspend fun <T : State> telegramBotWithBehaviourAndFSM(
timeoutSeconds, timeoutSeconds,
autoDisableWebhooks, autoDisableWebhooks,
autoSkipTimeoutExceptions, autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis,
block block
) )
} }
@ -71,6 +78,10 @@ suspend fun <T : State> telegramBotWithBehaviourAndFSM(
* Create bot using [telegramBot] and start listening for updates using [buildBehaviourWithFSMAndStartLongPolling]. This * Create bot using [telegramBot] and start listening for updates using [buildBehaviourWithFSMAndStartLongPolling]. This
* method will launch updates retrieving via long polling inside of [buildBehaviourWithFSMAndStartLongPolling] * method will launch updates retrieving via long polling inside of [buildBehaviourWithFSMAndStartLongPolling]
* *
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*
* @return Pair of [TelegramBot] and [Job]. This [Job] can be used to stop listening updates in your [block] you passed * @return Pair of [TelegramBot] and [Job]. This [Job] can be used to stop listening updates in your [block] you passed
* here * here
* *
@ -91,6 +102,7 @@ suspend fun <T : State> telegramBotWithBehaviourAndFSMAndStartLongPolling(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit> block: CustomBehaviourContextReceiver<DefaultBehaviourContextWithFSM<T>, Unit>
): Pair<TelegramBot, Job> { ): Pair<TelegramBot, Job> {
return telegramBot( return telegramBot(
@ -108,6 +120,7 @@ suspend fun <T : State> telegramBotWithBehaviourAndFSMAndStartLongPolling(
timeoutSeconds, timeoutSeconds,
autoDisableWebhooks, autoDisableWebhooks,
autoSkipTimeoutExceptions, autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis,
block block
) )
} }

View File

@ -5,6 +5,7 @@ import dev.inmo.micro_utils.coroutines.ExceptionHandler
import dev.inmo.tgbotapi.bot.TelegramBot import dev.inmo.tgbotapi.bot.TelegramBot
import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.longPolling import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.longPolling
import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.startGettingOfUpdatesByLongPolling import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.startGettingOfUpdatesByLongPolling
import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.updateHandlerWithMediaGroupsAdaptation
import dev.inmo.tgbotapi.types.Seconds import dev.inmo.tgbotapi.types.Seconds
import dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter import dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter
import kotlinx.coroutines.* import kotlinx.coroutines.*
@ -47,6 +48,10 @@ suspend fun TelegramBot.buildBehaviour(
* Use this method to build bot behaviour and run it via long polling. In case you wish to get [FlowsUpdatesFilter] for * Use this method to build bot behaviour and run it via long polling. In case you wish to get [FlowsUpdatesFilter] for
* additional manipulations, you must provide external [FlowsUpdatesFilter] in other [buildBehaviour] function. * additional manipulations, you must provide external [FlowsUpdatesFilter] in other [buildBehaviour] function.
* *
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*
* @see buildBehaviour * @see buildBehaviour
* @see BehaviourContext * @see BehaviourContext
* @see startGettingOfUpdatesByLongPolling * @see startGettingOfUpdatesByLongPolling
@ -57,6 +62,7 @@ suspend fun TelegramBot.buildBehaviourWithLongPolling(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
block: BehaviourContextReceiver<Unit> block: BehaviourContextReceiver<Unit>
): Job { ): Job {
val behaviourContext = buildBehaviour( val behaviourContext = buildBehaviour(
@ -69,6 +75,7 @@ suspend fun TelegramBot.buildBehaviourWithLongPolling(
scope = behaviourContext, scope = behaviourContext,
timeoutSeconds = timeoutSeconds, timeoutSeconds = timeoutSeconds,
autoDisableWebhooks = autoDisableWebhooks, autoDisableWebhooks = autoDisableWebhooks,
autoSkipTimeoutExceptions = autoSkipTimeoutExceptions autoSkipTimeoutExceptions = autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis = mediaGroupsDebounceTimeMillis
) )
} }

View File

@ -5,6 +5,7 @@ import dev.inmo.tgbotapi.bot.TelegramBot
import dev.inmo.tgbotapi.bot.ktor.KtorRequestsExecutorBuilder import dev.inmo.tgbotapi.bot.ktor.KtorRequestsExecutorBuilder
import dev.inmo.tgbotapi.bot.ktor.telegramBot import dev.inmo.tgbotapi.bot.ktor.telegramBot
import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.startGettingOfUpdatesByLongPolling import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.startGettingOfUpdatesByLongPolling
import dev.inmo.tgbotapi.extensions.utils.updates.retrieving.updateHandlerWithMediaGroupsAdaptation
import dev.inmo.tgbotapi.types.Seconds import dev.inmo.tgbotapi.types.Seconds
import dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter import dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter
import dev.inmo.tgbotapi.utils.telegramBotAPIDefaultUrl import dev.inmo.tgbotapi.utils.telegramBotAPIDefaultUrl
@ -53,6 +54,10 @@ suspend fun telegramBotWithBehaviour(
* *
* **WARNING** This method WILL launch updates listening inside of calling [buildBehaviourWithLongPolling] * **WARNING** This method WILL launch updates listening inside of calling [buildBehaviourWithLongPolling]
* *
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*
* @return Pair of [TelegramBot] and [Job]. This [Job] can be used to stop listening updates in your [block] you passed * @return Pair of [TelegramBot] and [Job]. This [Job] can be used to stop listening updates in your [block] you passed
* here * here
* *
@ -70,6 +75,7 @@ suspend fun telegramBotWithBehaviourAndLongPolling(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
block: BehaviourContextReceiver<Unit> block: BehaviourContextReceiver<Unit>
): Pair<TelegramBot, Job> { ): Pair<TelegramBot, Job> {
return telegramBot( return telegramBot(
@ -84,6 +90,7 @@ suspend fun telegramBotWithBehaviourAndLongPolling(
timeoutSeconds, timeoutSeconds,
autoDisableWebhooks, autoDisableWebhooks,
autoSkipTimeoutExceptions, autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis,
block block
) )
} }

View File

@ -27,7 +27,8 @@ fun List<Update>.lastUpdateIdentifier(): UpdateIdentifier? {
} }
/** /**
* Will convert incoming list of updates to list with [MediaGroupUpdate]s * Will convert incoming list of [Update]s to list with [Update]s, which include [dev.inmo.tgbotapi.types.message.abstracts.ContentMessage]s
* with [dev.inmo.tgbotapi.types.message.content.MediaGroupContent]
*/ */
fun List<Update>.convertWithMediaGroupUpdates(): List<Update> { fun List<Update>.convertWithMediaGroupUpdates(): List<Update> {
val resultUpdates = mutableListOf<Update>() val resultUpdates = mutableListOf<Update>()
@ -67,4 +68,5 @@ fun List<Update>.convertWithMediaGroupUpdates(): List<Update> {
* *
* @throws IllegalStateException * @throws IllegalStateException
*/ */
@Deprecated("Redundant", ReplaceWith("this"))
fun BaseEditMessageUpdate.toEditMediaGroupUpdate() = this fun BaseEditMessageUpdate.toEditMediaGroupUpdate() = this

View File

@ -19,12 +19,18 @@ import io.ktor.utils.io.CancellationException
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
/**
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*/
fun TelegramBot.longPollingFlow( fun TelegramBot.longPollingFlow(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
exceptionsHandler: (ExceptionHandler<Unit>)? = null, exceptionsHandler: (ExceptionHandler<Unit>)? = null,
allowedUpdates: List<String>? = ALL_UPDATES_LIST, allowedUpdates: List<String>? = ALL_UPDATES_LIST,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
): Flow<Update> = channelFlow { ): Flow<Update> = channelFlow {
if (autoDisableWebhooks) { if (autoDisableWebhooks) {
runCatchingSafely { runCatchingSafely {
@ -47,6 +53,52 @@ fun TelegramBot.longPollingFlow(
var lastUpdateIdentifier: UpdateIdentifier? = null var lastUpdateIdentifier: UpdateIdentifier? = null
val updatesHandler: (suspend (List<Update>) -> Unit) = if (mediaGroupsDebounceTimeMillis != null) {
val scope = CoroutineScope(contextToWork)
val updatesReceiver = scope.updateHandlerWithMediaGroupsAdaptation(
{
withContext(contextToWork) {
send(it)
}
},
mediaGroupsDebounceTimeMillis
);
{ originalUpdates: List<Update> ->
originalUpdates.forEach {
updatesReceiver(it)
lastUpdateIdentifier = maxOf(lastUpdateIdentifier ?: it.updateId, it.updateId)
}
}
} else {
{ originalUpdates: List<Update> ->
val converted = originalUpdates.convertWithMediaGroupUpdates()
/**
* Dirty hack for cases when the media group was retrieved not fully:
*
* We are throw out the last media group and will reretrieve it again in the next get updates
* and it will guarantee that it is full
*/
val updates = if (
originalUpdates.size == getUpdatesLimit.last
&& ((converted.last() as? BaseSentMessageUpdate) ?.data as? CommonMessage<*>) ?.content is MediaGroupContent<*>
) {
converted - converted.last()
} else {
converted
}
safelyWithResult {
for (update in updates) {
send(update)
lastUpdateIdentifier = update.updateId
}
}.onFailure {
cancel(it as? CancellationException ?: return@onFailure)
}
}
}
withContext(contextToWork) { withContext(contextToWork) {
while (isActive) { while (isActive) {
safely( safely(
@ -64,50 +116,25 @@ fun TelegramBot.longPollingFlow(
} }
} }
) { ) {
val updates = execute( execute(
GetUpdates( GetUpdates(
offset = lastUpdateIdentifier?.plus(1), offset = lastUpdateIdentifier?.plus(1),
timeout = timeoutSeconds, timeout = timeoutSeconds,
allowed_updates = allowedUpdates allowed_updates = allowedUpdates
) )
).let { originalUpdates -> ).let { originalUpdates ->
val converted = originalUpdates.convertWithMediaGroupUpdates() updatesHandler(originalUpdates)
}
}
}
}
}
/** /**
* Dirty hack for cases when the media group was retrieved not fully: * @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* * in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* We are throw out the last media group and will reretrieve it again in the next get updates * retrieved in different updates
* and it will guarantee that it is full
*/ */
/**
* Dirty hack for cases when the media group was retrieved not fully:
*
* We are throw out the last media group and will reretrieve it again in the next get updates
* and it will guarantee that it is full
*/
if (
originalUpdates.size == getUpdatesLimit.last
&& ((converted.last() as? BaseSentMessageUpdate) ?.data as? CommonMessage<*>) ?.content is MediaGroupContent<*>
) {
converted - converted.last()
} else {
converted
}
}
safelyWithResult {
for (update in updates) {
send(update)
lastUpdateIdentifier = update.updateId
}
}.onFailure {
cancel(it as? CancellationException ?: return@onFailure)
}
}
}
}
}
fun TelegramBot.startGettingOfUpdatesByLongPolling( fun TelegramBot.startGettingOfUpdatesByLongPolling(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default), scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
@ -115,13 +142,15 @@ fun TelegramBot.startGettingOfUpdatesByLongPolling(
allowedUpdates: List<String>? = ALL_UPDATES_LIST, allowedUpdates: List<String>? = ALL_UPDATES_LIST,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
updatesReceiver: UpdateReceiver<Update> updatesReceiver: UpdateReceiver<Update>
): Job = longPollingFlow( ): Job = longPollingFlow(
timeoutSeconds = timeoutSeconds, timeoutSeconds = timeoutSeconds,
exceptionsHandler = exceptionsHandler, exceptionsHandler = exceptionsHandler,
allowedUpdates = allowedUpdates, allowedUpdates = allowedUpdates,
autoDisableWebhooks = autoDisableWebhooks, autoDisableWebhooks = autoDisableWebhooks,
autoSkipTimeoutExceptions = autoSkipTimeoutExceptions autoSkipTimeoutExceptions = autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis = mediaGroupsDebounceTimeMillis
).subscribeSafely( ).subscribeSafely(
scope, scope,
exceptionsHandler ?: defaultSafelyExceptionHandler, exceptionsHandler ?: defaultSafelyExceptionHandler,
@ -129,6 +158,10 @@ fun TelegramBot.startGettingOfUpdatesByLongPolling(
) )
/** /**
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*
* @return [kotlinx.coroutines.flow.Flow] which will emit updates to the collector while they will be accumulated. Works * @return [kotlinx.coroutines.flow.Flow] which will emit updates to the collector while they will be accumulated. Works
* the same as [longPollingFlow], but it will cancel the flow after the first one [HttpRequestTimeoutException] * the same as [longPollingFlow], but it will cancel the flow after the first one [HttpRequestTimeoutException]
*/ */
@ -137,7 +170,8 @@ fun TelegramBot.createAccumulatedUpdatesRetrieverFlow(
avoidCallbackQueries: Boolean = false, avoidCallbackQueries: Boolean = false,
exceptionsHandler: ExceptionHandler<Unit>? = null, exceptionsHandler: ExceptionHandler<Unit>? = null,
allowedUpdates: List<String>? = ALL_UPDATES_LIST, allowedUpdates: List<String>? = ALL_UPDATES_LIST,
autoDisableWebhooks: Boolean = true autoDisableWebhooks: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
): Flow<Update> = longPollingFlow( ): Flow<Update> = longPollingFlow(
timeoutSeconds = 0, timeoutSeconds = 0,
exceptionsHandler = { exceptionsHandler = {
@ -149,11 +183,17 @@ fun TelegramBot.createAccumulatedUpdatesRetrieverFlow(
}, },
allowedUpdates = allowedUpdates, allowedUpdates = allowedUpdates,
autoDisableWebhooks = autoDisableWebhooks, autoDisableWebhooks = autoDisableWebhooks,
autoSkipTimeoutExceptions = false autoSkipTimeoutExceptions = false,
mediaGroupsDebounceTimeMillis = mediaGroupsDebounceTimeMillis
).filter { ).filter {
!(it is InlineQueryUpdate && avoidInlineQueries || it is CallbackQueryUpdate && avoidCallbackQueries) !(it is InlineQueryUpdate && avoidInlineQueries || it is CallbackQueryUpdate && avoidCallbackQueries)
} }
/**
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*/
fun TelegramBot.retrieveAccumulatedUpdates( fun TelegramBot.retrieveAccumulatedUpdates(
avoidInlineQueries: Boolean = false, avoidInlineQueries: Boolean = false,
avoidCallbackQueries: Boolean = false, avoidCallbackQueries: Boolean = false,
@ -161,25 +201,33 @@ fun TelegramBot.retrieveAccumulatedUpdates(
exceptionsHandler: (ExceptionHandler<Unit>)? = null, exceptionsHandler: (ExceptionHandler<Unit>)? = null,
allowedUpdates: List<String>? = ALL_UPDATES_LIST, allowedUpdates: List<String>? = ALL_UPDATES_LIST,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
updatesReceiver: UpdateReceiver<Update> updatesReceiver: UpdateReceiver<Update>
): Job = createAccumulatedUpdatesRetrieverFlow( ): Job = createAccumulatedUpdatesRetrieverFlow(
avoidInlineQueries, avoidInlineQueries,
avoidCallbackQueries, avoidCallbackQueries,
exceptionsHandler, exceptionsHandler,
allowedUpdates, allowedUpdates,
autoDisableWebhooks autoDisableWebhooks,
mediaGroupsDebounceTimeMillis
).subscribeSafelyWithoutExceptions( ).subscribeSafelyWithoutExceptions(
scope.LinkedSupervisorScope() scope.LinkedSupervisorScope()
) { ) {
updatesReceiver(it) updatesReceiver(it)
} }
/**
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*/
fun TelegramBot.retrieveAccumulatedUpdates( fun TelegramBot.retrieveAccumulatedUpdates(
flowsUpdatesFilter: FlowsUpdatesFilter, flowsUpdatesFilter: FlowsUpdatesFilter,
avoidInlineQueries: Boolean = false, avoidInlineQueries: Boolean = false,
avoidCallbackQueries: Boolean = false, avoidCallbackQueries: Boolean = false,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default), scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
exceptionsHandler: ExceptionHandler<Unit>? = null exceptionsHandler: ExceptionHandler<Unit>? = null
) = retrieveAccumulatedUpdates( ) = retrieveAccumulatedUpdates(
avoidInlineQueries, avoidInlineQueries,
@ -188,9 +236,15 @@ fun TelegramBot.retrieveAccumulatedUpdates(
exceptionsHandler, exceptionsHandler,
flowsUpdatesFilter.allowedUpdates, flowsUpdatesFilter.allowedUpdates,
autoDisableWebhooks, autoDisableWebhooks,
mediaGroupsDebounceTimeMillis,
flowsUpdatesFilter.asUpdateReceiver flowsUpdatesFilter.asUpdateReceiver
) )
/**
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*/
suspend fun TelegramBot.flushAccumulatedUpdates( suspend fun TelegramBot.flushAccumulatedUpdates(
avoidInlineQueries: Boolean = false, avoidInlineQueries: Boolean = false,
avoidCallbackQueries: Boolean = false, avoidCallbackQueries: Boolean = false,
@ -198,6 +252,7 @@ suspend fun TelegramBot.flushAccumulatedUpdates(
allowedUpdates: List<String>? = ALL_UPDATES_LIST, allowedUpdates: List<String>? = ALL_UPDATES_LIST,
exceptionsHandler: ExceptionHandler<Unit>? = null, exceptionsHandler: ExceptionHandler<Unit>? = null,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
updatesReceiver: UpdateReceiver<Update> = {} updatesReceiver: UpdateReceiver<Update> = {}
) = retrieveAccumulatedUpdates( ) = retrieveAccumulatedUpdates(
avoidInlineQueries, avoidInlineQueries,
@ -206,12 +261,17 @@ suspend fun TelegramBot.flushAccumulatedUpdates(
exceptionsHandler, exceptionsHandler,
allowedUpdates, allowedUpdates,
autoDisableWebhooks, autoDisableWebhooks,
mediaGroupsDebounceTimeMillis,
updatesReceiver updatesReceiver
).join() ).join()
/** /**
* Will [startGettingOfUpdatesByLongPolling] using incoming [flowsUpdatesFilter]. It is assumed that you ALREADY CONFIGURE * Will [startGettingOfUpdatesByLongPolling] using incoming [updatesFilter]. It is assumed that you ALREADY CONFIGURE
* all updates receivers, because this method will trigger getting of updates and. * all updates receivers, because this method will trigger getting of updates and.
*
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*/ */
fun TelegramBot.longPolling( fun TelegramBot.longPolling(
updatesFilter: UpdatesFilter, updatesFilter: UpdatesFilter,
@ -219,6 +279,7 @@ fun TelegramBot.longPolling(
scope: CoroutineScope = CoroutineScope(Dispatchers.Default), scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
exceptionsHandler: ExceptionHandler<Unit>? = null exceptionsHandler: ExceptionHandler<Unit>? = null
): Job = updatesFilter.run { ): Job = updatesFilter.run {
startGettingOfUpdatesByLongPolling( startGettingOfUpdatesByLongPolling(
@ -228,6 +289,7 @@ fun TelegramBot.longPolling(
allowedUpdates = allowedUpdates, allowedUpdates = allowedUpdates,
autoDisableWebhooks = autoDisableWebhooks, autoDisableWebhooks = autoDisableWebhooks,
autoSkipTimeoutExceptions = autoSkipTimeoutExceptions, autoSkipTimeoutExceptions = autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis = mediaGroupsDebounceTimeMillis,
updatesReceiver = asUpdateReceiver updatesReceiver = asUpdateReceiver
) )
} }
@ -236,6 +298,10 @@ fun TelegramBot.longPolling(
* Will enable [longPolling] by creating [FlowsUpdatesFilter] with [flowsUpdatesFilterUpdatesKeeperCount] as an argument * Will enable [longPolling] by creating [FlowsUpdatesFilter] with [flowsUpdatesFilterUpdatesKeeperCount] as an argument
* and applied [flowUpdatesPreset]. It is assumed that you WILL CONFIGURE all updates receivers in [flowUpdatesPreset], * and applied [flowUpdatesPreset]. It is assumed that you WILL CONFIGURE all updates receivers in [flowUpdatesPreset],
* because of after [flowUpdatesPreset] method calling will be triggered getting of updates. * because of after [flowUpdatesPreset] method calling will be triggered getting of updates.
*
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*/ */
@Suppress("unused") @Suppress("unused")
fun TelegramBot.longPolling( fun TelegramBot.longPolling(
@ -245,15 +311,22 @@ fun TelegramBot.longPolling(
flowsUpdatesFilterUpdatesKeeperCount: Int = 100, flowsUpdatesFilterUpdatesKeeperCount: Int = 100,
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
flowUpdatesPreset: FlowsUpdatesFilter.() -> Unit flowUpdatesPreset: FlowsUpdatesFilter.() -> Unit
): Job = longPolling(FlowsUpdatesFilter(flowsUpdatesFilterUpdatesKeeperCount).apply(flowUpdatesPreset), timeoutSeconds, scope, autoDisableWebhooks, autoSkipTimeoutExceptions, exceptionsHandler) ): Job = longPolling(FlowsUpdatesFilter(flowsUpdatesFilterUpdatesKeeperCount).apply(flowUpdatesPreset), timeoutSeconds, scope, autoDisableWebhooks, autoSkipTimeoutExceptions, mediaGroupsDebounceTimeMillis, exceptionsHandler)
/**
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*/
fun RequestsExecutor.startGettingOfUpdatesByLongPolling( fun RequestsExecutor.startGettingOfUpdatesByLongPolling(
updatesFilter: UpdatesFilter, updatesFilter: UpdatesFilter,
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
exceptionsHandler: ExceptionHandler<Unit>? = null, exceptionsHandler: ExceptionHandler<Unit>? = null,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default), scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
autoDisableWebhooks: Boolean = true, autoDisableWebhooks: Boolean = true,
mediaGroupsDebounceTimeMillis: Long? = 1000L,
autoSkipTimeoutExceptions: Boolean = true, autoSkipTimeoutExceptions: Boolean = true,
): Job = startGettingOfUpdatesByLongPolling( ): Job = startGettingOfUpdatesByLongPolling(
timeoutSeconds, timeoutSeconds,
@ -262,5 +335,6 @@ fun RequestsExecutor.startGettingOfUpdatesByLongPolling(
updatesFilter.allowedUpdates, updatesFilter.allowedUpdates,
autoDisableWebhooks, autoDisableWebhooks,
autoSkipTimeoutExceptions, autoSkipTimeoutExceptions,
mediaGroupsDebounceTimeMillis,
updatesFilter.asUpdateReceiver updatesFilter.asUpdateReceiver
) )

View File

@ -1,6 +1,5 @@
package dev.inmo.tgbotapi.extensions.utils.updates.retrieving package dev.inmo.tgbotapi.extensions.utils.updates.retrieving
import dev.inmo.micro_utils.coroutines.launchSafely
import dev.inmo.micro_utils.coroutines.launchSafelyWithoutExceptions import dev.inmo.micro_utils.coroutines.launchSafelyWithoutExceptions
import dev.inmo.tgbotapi.extensions.utils.updates.convertWithMediaGroupUpdates import dev.inmo.tgbotapi.extensions.utils.updates.convertWithMediaGroupUpdates
import dev.inmo.tgbotapi.types.message.abstracts.PossiblyMediaGroupMessage import dev.inmo.tgbotapi.types.message.abstracts.PossiblyMediaGroupMessage
@ -12,7 +11,6 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
/** /**
* Create [UpdateReceiver] object which will correctly accumulate updates and send into output updates which INCLUDE * Create [UpdateReceiver] object which will correctly accumulate updates and send into output updates which INCLUDE
* [dev.inmo.tgbotapi.types.update.MediaGroupUpdates.MediaGroupUpdate]s. * [dev.inmo.tgbotapi.types.update.MediaGroupUpdates.MediaGroupUpdate]s.

View File

@ -26,6 +26,9 @@ import java.util.concurrent.Executors
* @param [scope] Will be used for mapping of media groups * @param [scope] Will be used for mapping of media groups
* @param [exceptionsHandler] Pass this parameter to set custom exception handler for getting updates * @param [exceptionsHandler] Pass this parameter to set custom exception handler for getting updates
* @param [block] Some receiver block like [dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter] * @param [block] Some receiver block like [dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter]
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
* *
* @see dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter * @see dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter
* @see UpdatesFilter * @see UpdatesFilter
@ -57,6 +60,11 @@ fun Route.includeWebhookHandlingInRoute(
} }
} }
/**
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
*/
fun Route.includeWebhookHandlingInRouteWithFlows( fun Route.includeWebhookHandlingInRouteWithFlows(
scope: CoroutineScope, scope: CoroutineScope,
exceptionsHandler: ExceptionHandler<Unit>? = null, exceptionsHandler: ExceptionHandler<Unit>? = null,
@ -76,6 +84,9 @@ fun Route.includeWebhookHandlingInRouteWithFlows(
* @param listenRoute address to listen by bot. If null - will be set up in root of host * @param listenRoute address to listen by bot. If null - will be set up in root of host
* @param scope Scope which will be used for * @param scope Scope which will be used for
* @param privateKeyConfig If configured - server will be created with [sslConnector]. [connector] will be used otherwise * @param privateKeyConfig If configured - server will be created with [sslConnector]. [connector] will be used otherwise
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
* *
* @see dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter * @see dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter
* @see UpdatesFilter * @see UpdatesFilter
@ -132,6 +143,9 @@ fun startListenWebhooks(
* @param listenPort port which will be listen by bot * @param listenPort port which will be listen by bot
* @param listenRoute address to listen by bot * @param listenRoute address to listen by bot
* @param scope Scope which will be used for * @param scope Scope which will be used for
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
* retrieved in different updates
* *
* @see dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter * @see dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter
* @see UpdatesFilter * @see UpdatesFilter