mirror of
https://github.com/InsanusMokrassar/TelegramBotAPI.git
synced 2025-12-14 18:25:44 +00:00
fix of #1019 and improve longPollingFlow
This commit is contained in:
@@ -3705,7 +3705,9 @@ public final class dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPol
|
||||
public static synthetic fun longPolling$default (Ldev/inmo/tgbotapi/bot/RequestsExecutor;ILkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function2;IZZLjava/lang/Long;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/Job;
|
||||
public static synthetic fun longPolling$default (Ldev/inmo/tgbotapi/bot/RequestsExecutor;Ldev/inmo/tgbotapi/updateshandlers/UpdatesFilter;ILkotlinx/coroutines/CoroutineScope;ZZLjava/lang/Long;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/Job;
|
||||
public static final fun longPollingFlow (Ldev/inmo/tgbotapi/bot/RequestsExecutor;ILkotlin/jvm/functions/Function2;Ljava/util/List;ZZLjava/lang/Long;)Lkotlinx/coroutines/flow/Flow;
|
||||
public static final fun longPollingFlow (Ldev/inmo/tgbotapi/bot/RequestsExecutor;Lkotlin/jvm/functions/Function2;ZZLjava/lang/Long;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
|
||||
public static synthetic fun longPollingFlow$default (Ldev/inmo/tgbotapi/bot/RequestsExecutor;ILkotlin/jvm/functions/Function2;Ljava/util/List;ZZLjava/lang/Long;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
|
||||
public static synthetic fun longPollingFlow$default (Ldev/inmo/tgbotapi/bot/RequestsExecutor;Lkotlin/jvm/functions/Function2;ZZLjava/lang/Long;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
|
||||
public static final fun retrieveAccumulatedUpdates (Ldev/inmo/tgbotapi/bot/RequestsExecutor;Ldev/inmo/tgbotapi/updateshandlers/FlowsUpdatesFilter;ZZLkotlinx/coroutines/CoroutineScope;ZLjava/lang/Long;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/Job;
|
||||
public static final fun retrieveAccumulatedUpdates (Ldev/inmo/tgbotapi/bot/RequestsExecutor;ZZLkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function2;Ljava/util/List;ZLjava/lang/Long;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/Job;
|
||||
public static synthetic fun retrieveAccumulatedUpdates$default (Ldev/inmo/tgbotapi/bot/RequestsExecutor;Ldev/inmo/tgbotapi/updateshandlers/FlowsUpdatesFilter;ZZLkotlinx/coroutines/CoroutineScope;ZLjava/lang/Long;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/Job;
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package dev.inmo.tgbotapi.extensions.utils.updates.retrieving
|
||||
|
||||
import dev.inmo.kslog.common.logger
|
||||
import dev.inmo.micro_utils.coroutines.*
|
||||
import dev.inmo.tgbotapi.bot.RequestsExecutor
|
||||
import dev.inmo.tgbotapi.bot.TelegramBot
|
||||
@@ -15,30 +14,36 @@ import dev.inmo.tgbotapi.types.update.*
|
||||
import dev.inmo.tgbotapi.types.update.abstracts.BaseSentMessageUpdate
|
||||
import dev.inmo.tgbotapi.types.update.abstracts.Update
|
||||
import dev.inmo.tgbotapi.updateshandlers.*
|
||||
import dev.inmo.tgbotapi.utils.DefaultKTgBotAPIKSLog
|
||||
import dev.inmo.tgbotapi.utils.causedCancellationException
|
||||
import dev.inmo.tgbotapi.utils.isCausedByCancellation
|
||||
import dev.inmo.tgbotapi.utils.isCausedUnresolvedAddressException
|
||||
import dev.inmo.tgbotapi.utils.subscribeWithBotLogger
|
||||
import io.ktor.client.network.sockets.ConnectTimeoutException
|
||||
import io.ktor.client.plugins.HttpRequestTimeoutException
|
||||
import io.ktor.util.network.UnresolvedAddressException
|
||||
import io.ktor.utils.io.CancellationException
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.flow.*
|
||||
|
||||
/**
|
||||
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
|
||||
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
|
||||
* retrieved in different updates
|
||||
* Starts a long polling flow to receive updates continuously from the Telegram Bot API.
|
||||
*
|
||||
* This method retrieves updates from the bot, processes them, and emits them as a flow of `Update` objects.
|
||||
* It allows handling of updates with features like automatic webhook disabling, timeout exception skipping,
|
||||
* and media group handling with debounce time.
|
||||
*
|
||||
* @param exceptionsHandler Optional exception handler to manage exceptions that occur during the polling process.
|
||||
* @param autoDisableWebhooks Specifies whether to automatically disable existing webhooks before starting the long polling flow (default: `true`).
|
||||
* @param autoSkipTimeoutExceptions Defines if timeout-related exceptions should automatically be skipped during the polling (default: `true`).
|
||||
* @param mediaGroupsDebounceTimeMillis The debounce time in milliseconds for processing media group updates.
|
||||
* If set to `null`, media group handling is disabled (default: `1000L`).
|
||||
* @param getUpdatesRequestCreator A function that creates a `GetUpdates` request for retrieving updates.
|
||||
* This function accepts the identifier of the most recent update as an input and returns the new request.
|
||||
* @return A [Flow] of [Update] objects that represents the continuous stream of updates received.
|
||||
*/
|
||||
fun TelegramBot.longPollingFlow(
|
||||
timeoutSeconds: Seconds = 30,
|
||||
exceptionsHandler: (ExceptionHandler<Unit>)? = null,
|
||||
allowedUpdates: List<String>? = ALL_UPDATES_LIST,
|
||||
autoDisableWebhooks: Boolean = true,
|
||||
autoSkipTimeoutExceptions: Boolean = true,
|
||||
mediaGroupsDebounceTimeMillis: Long? = 1000L,
|
||||
getUpdatesRequestCreator: (sinceUpdate: UpdateId?) -> GetUpdates
|
||||
): Flow<Update> = channelFlow {
|
||||
if (autoDisableWebhooks) {
|
||||
runCatchingLogging(logger = Log) {
|
||||
@@ -119,11 +124,7 @@ fun TelegramBot.longPollingFlow(
|
||||
while (isActive) {
|
||||
runCatchingLogging(logger = Log) {
|
||||
execute(
|
||||
GetUpdates(
|
||||
offset = lastUpdateIdentifier?.plus(1),
|
||||
timeout = timeoutSeconds,
|
||||
allowed_updates = allowedUpdates
|
||||
)
|
||||
getUpdatesRequestCreator(lastUpdateIdentifier ?.plus(1))
|
||||
).let { originalUpdates ->
|
||||
updatesHandler(originalUpdates)
|
||||
}
|
||||
@@ -148,6 +149,45 @@ fun TelegramBot.longPollingFlow(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiates a long polling flow for receiving updates continuously from the Telegram Bot API.
|
||||
* This method provides a customized way to handle the retrieval of updates with options
|
||||
* to configure timeouts, update types, exception handling, and media group processing.
|
||||
*
|
||||
* @param timeoutSeconds The maximum time in seconds for the server to wait for available updates
|
||||
* before responding (default: `30`).
|
||||
* @param exceptionsHandler An optional exception handler for managing exceptions that occur during the
|
||||
* long polling process. If not provided, exceptions will pass through unhandled.
|
||||
* @param allowedUpdates A list of the update types to retrieve. By default, retrieves all possible
|
||||
* update types as defined in `ALL_UPDATES_LIST`.
|
||||
* @param autoDisableWebhooks Whether the current webhook should be disabled automatically before starting the
|
||||
* long polling flow (default: `true`).
|
||||
* @param autoSkipTimeoutExceptions Determines if timeout-related exceptions should be automatically skipped
|
||||
* during the polling process (default: `true`).
|
||||
* @param mediaGroupsDebounceTimeMillis The debounce time in milliseconds for processing updates containing
|
||||
* media groups. If set to `null`, media group handling is disabled (default: `1000L`).
|
||||
* @return A Flow that emits Update objects representing the updates fetched from the Telegram Bot API.
|
||||
*/
|
||||
fun TelegramBot.longPollingFlow(
|
||||
timeoutSeconds: Seconds = 30,
|
||||
exceptionsHandler: (ExceptionHandler<Unit>)? = null,
|
||||
allowedUpdates: List<String>? = ALL_UPDATES_LIST,
|
||||
autoDisableWebhooks: Boolean = true,
|
||||
autoSkipTimeoutExceptions: Boolean = true,
|
||||
mediaGroupsDebounceTimeMillis: Long? = 1000L,
|
||||
): Flow<Update> = longPollingFlow(
|
||||
exceptionsHandler = exceptionsHandler,
|
||||
autoDisableWebhooks = autoDisableWebhooks,
|
||||
autoSkipTimeoutExceptions = autoSkipTimeoutExceptions,
|
||||
mediaGroupsDebounceTimeMillis = mediaGroupsDebounceTimeMillis,
|
||||
) {
|
||||
GetUpdates(
|
||||
offset = it,
|
||||
timeout = timeoutSeconds,
|
||||
allowed_updates = allowedUpdates
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param mediaGroupsDebounceTimeMillis Will be used for calling of [updateHandlerWithMediaGroupsAdaptation]. Pass null
|
||||
* in case you wish to enable classic way of updates handling, but in that mode some media group messages can be
|
||||
|
||||
Reference in New Issue
Block a user