1
0
mirror of https://github.com/InsanusMokrassar/TelegramBotAPI.git synced 2025-09-03 23:29:33 +00:00
This commit is contained in:
2023-02-06 13:28:38 +06:00
parent f81d28dd5f
commit e3acdf1802
6 changed files with 136 additions and 55 deletions

View File

@@ -5,7 +5,6 @@ import dev.inmo.tgbotapi.bot.RequestsExecutor
import dev.inmo.tgbotapi.bot.TelegramBot
import dev.inmo.tgbotapi.bot.exceptions.*
import dev.inmo.tgbotapi.extensions.utils.updates.convertWithMediaGroupUpdates
import dev.inmo.tgbotapi.extensions.utils.updates.lastUpdateIdentifier
import dev.inmo.tgbotapi.requests.GetUpdates
import dev.inmo.tgbotapi.requests.webhook.DeleteWebhook
import dev.inmo.tgbotapi.types.*
@@ -24,7 +23,8 @@ fun TelegramBot.longPollingFlow(
timeoutSeconds: Seconds = 30,
exceptionsHandler: (ExceptionHandler<Unit>)? = null,
allowedUpdates: List<String>? = ALL_UPDATES_LIST,
autoDisableWebhooks: Boolean = true
autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true
): Flow<Update> = channelFlow {
if (autoDisableWebhooks) {
runCatchingSafely {
@@ -32,58 +32,77 @@ fun TelegramBot.longPollingFlow(
}
}
val contextSafelyExceptionHandler = coroutineContext[ContextSafelyExceptionHandlerKey]
val contextToWork = if (contextSafelyExceptionHandler == null || !autoSkipTimeoutExceptions) {
coroutineContext
} else {
coroutineContext + ContextSafelyExceptionHandler { e ->
if (e is HttpRequestTimeoutException || (e is CommonBotException && e.cause is HttpRequestTimeoutException)) {
return@ContextSafelyExceptionHandler
} else {
contextSafelyExceptionHandler.handler(e)
}
}
}
var lastUpdateIdentifier: UpdateIdentifier? = null
while (isActive) {
safely(
{ e ->
exceptionsHandler ?.invoke(e)
if (e is RequestException) {
delay(1000L)
withContext(contextToWork) {
while (isActive) {
safely(
{ e ->
val isHttpRequestTimeoutException = e is HttpRequestTimeoutException || (e is CommonBotException && e.cause is HttpRequestTimeoutException)
if (isHttpRequestTimeoutException && autoSkipTimeoutExceptions) {
return@safely
}
exceptionsHandler ?.invoke(e)
if (e is RequestException) {
delay(1000L)
}
if (e is GetUpdatesConflict && (exceptionsHandler == null || exceptionsHandler == defaultSafelyExceptionHandler)) {
println("Warning!!! Other bot with the same bot token requests updates with getUpdate in parallel")
}
}
if (e is GetUpdatesConflict && (exceptionsHandler == null || exceptionsHandler == defaultSafelyExceptionHandler)) {
println("Warning!!! Other bot with the same bot token requests updates with getUpdate in parallel")
) {
val updates = execute(
GetUpdates(
offset = lastUpdateIdentifier?.plus(1),
timeout = timeoutSeconds,
allowed_updates = allowedUpdates
)
).let { originalUpdates ->
val converted = originalUpdates.convertWithMediaGroupUpdates()
/**
* Dirty hack for cases when the media group was retrieved not fully:
*
* We are throw out the last media group and will reretrieve it again in the next get updates
* and it will guarantee that it is full
*/
/**
* Dirty hack for cases when the media group was retrieved not fully:
*
* We are throw out the last media group and will reretrieve it again in the next get updates
* and it will guarantee that it is full
*/
if (
originalUpdates.size == getUpdatesLimit.last
&& ((converted.last() as? BaseSentMessageUpdate) ?.data as? CommonMessage<*>) ?.content is MediaGroupContent<*>
) {
converted - converted.last()
} else {
converted
}
}
}
) {
val updates = execute(
GetUpdates(
offset = lastUpdateIdentifier?.plus(1),
timeout = timeoutSeconds,
allowed_updates = allowedUpdates
)
).let { originalUpdates ->
val converted = originalUpdates.convertWithMediaGroupUpdates()
/**
* Dirty hack for cases when the media group was retrieved not fully:
*
* We are throw out the last media group and will reretrieve it again in the next get updates
* and it will guarantee that it is full
*/
/**
* Dirty hack for cases when the media group was retrieved not fully:
*
* We are throw out the last media group and will reretrieve it again in the next get updates
* and it will guarantee that it is full
*/
if (
originalUpdates.size == getUpdatesLimit.last
&& ((converted.last() as? BaseSentMessageUpdate) ?.data as? CommonMessage<*>) ?.content is MediaGroupContent<*>
) {
converted - converted.last()
} else {
converted
}
}
safelyWithResult {
for (update in updates) {
send(update)
safelyWithResult {
for (update in updates) {
send(update)
lastUpdateIdentifier = update.updateId
lastUpdateIdentifier = update.updateId
}
}.onFailure {
cancel(it as? CancellationException ?: return@onFailure)
}
}.onFailure {
cancel(it as? CancellationException ?: return@onFailure)
}
}
}
@@ -95,8 +114,15 @@ fun TelegramBot.startGettingOfUpdatesByLongPolling(
exceptionsHandler: (ExceptionHandler<Unit>)? = null,
allowedUpdates: List<String>? = ALL_UPDATES_LIST,
autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true,
updatesReceiver: UpdateReceiver<Update>
): Job = longPollingFlow(timeoutSeconds, exceptionsHandler, allowedUpdates, autoDisableWebhooks).subscribeSafely(
): Job = longPollingFlow(
timeoutSeconds = timeoutSeconds,
exceptionsHandler = exceptionsHandler,
allowedUpdates = allowedUpdates,
autoDisableWebhooks = autoDisableWebhooks,
autoSkipTimeoutExceptions = autoSkipTimeoutExceptions
).subscribeSafely(
scope,
exceptionsHandler ?: defaultSafelyExceptionHandler,
updatesReceiver
@@ -111,7 +137,7 @@ fun TelegramBot.createAccumulatedUpdatesRetrieverFlow(
avoidCallbackQueries: Boolean = false,
exceptionsHandler: ExceptionHandler<Unit>? = null,
allowedUpdates: List<String>? = ALL_UPDATES_LIST,
autoDisableWebhooks: Boolean = true,
autoDisableWebhooks: Boolean = true
): Flow<Update> = longPollingFlow(
timeoutSeconds = 0,
exceptionsHandler = {
@@ -122,7 +148,8 @@ fun TelegramBot.createAccumulatedUpdatesRetrieverFlow(
}
},
allowedUpdates = allowedUpdates,
autoDisableWebhooks = autoDisableWebhooks
autoDisableWebhooks = autoDisableWebhooks,
autoSkipTimeoutExceptions = false
).filter {
!(it is InlineQueryUpdate && avoidInlineQueries || it is CallbackQueryUpdate && avoidCallbackQueries)
}
@@ -191,9 +218,18 @@ fun TelegramBot.longPolling(
timeoutSeconds: Seconds = 30,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true,
exceptionsHandler: ExceptionHandler<Unit>? = null
): Job = updatesFilter.run {
startGettingOfUpdatesByLongPolling(timeoutSeconds, scope, exceptionsHandler, allowedUpdates, autoDisableWebhooks, asUpdateReceiver)
startGettingOfUpdatesByLongPolling(
timeoutSeconds = timeoutSeconds,
scope = scope,
exceptionsHandler = exceptionsHandler,
allowedUpdates = allowedUpdates,
autoDisableWebhooks = autoDisableWebhooks,
autoSkipTimeoutExceptions = autoSkipTimeoutExceptions,
updatesReceiver = asUpdateReceiver
)
}
/**
@@ -208,8 +244,9 @@ fun TelegramBot.longPolling(
exceptionsHandler: ExceptionHandler<Unit>? = null,
flowsUpdatesFilterUpdatesKeeperCount: Int = 100,
autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true,
flowUpdatesPreset: FlowsUpdatesFilter.() -> Unit
): Job = longPolling(FlowsUpdatesFilter(flowsUpdatesFilterUpdatesKeeperCount).apply(flowUpdatesPreset), timeoutSeconds, scope, autoDisableWebhooks, exceptionsHandler)
): Job = longPolling(FlowsUpdatesFilter(flowsUpdatesFilterUpdatesKeeperCount).apply(flowUpdatesPreset), timeoutSeconds, scope, autoDisableWebhooks, autoSkipTimeoutExceptions, exceptionsHandler)
fun RequestsExecutor.startGettingOfUpdatesByLongPolling(
updatesFilter: UpdatesFilter,
@@ -217,11 +254,13 @@ fun RequestsExecutor.startGettingOfUpdatesByLongPolling(
exceptionsHandler: ExceptionHandler<Unit>? = null,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
autoDisableWebhooks: Boolean = true,
autoSkipTimeoutExceptions: Boolean = true,
): Job = startGettingOfUpdatesByLongPolling(
timeoutSeconds,
scope,
exceptionsHandler,
updatesFilter.allowedUpdates,
autoDisableWebhooks,
autoSkipTimeoutExceptions,
updatesFilter.asUpdateReceiver
)