solution of #293

This commit is contained in:
InsanusMokrassar 2021-02-08 19:35:32 +06:00
parent 4449893608
commit 10f4817283
3 changed files with 76 additions and 2 deletions

View File

@ -4,6 +4,9 @@
* `Core`:
* Add `mention` variants for user ids and receiver variants ([#294](https://github.com/InsanusMokrassar/TelegramBotAPI/issues/294))
* Now `AbstractRequestCallFactory` will set up one-second delay for zero timeouts in `GetUpdate` requests
* Several extensions for `TelegramBotAPI` like `retrieveAccumulatedUpdates` have been added as a solution for
[#293](https://github.com/InsanusMokrassar/TelegramBotAPI/issues/293)
## 0.32.4

View File

@ -16,6 +16,8 @@ import io.ktor.http.ContentType
import kotlinx.serialization.json.Json
import kotlin.collections.set
var defaultUpdateTimeoutForZeroDelay = 1000L
abstract class AbstractRequestCallFactory : KtorCallFactory {
private val methodsCache: MutableMap<String, String> = mutableMapOf()
override suspend fun <T : Any> makeCall(
@ -41,6 +43,11 @@ abstract class AbstractRequestCallFactory : KtorCallFactory {
requestTimeoutMillis = customTimeoutMillis
socketTimeoutMillis = customTimeoutMillis
}
} else {
timeout {
requestTimeoutMillis = defaultUpdateTimeoutForZeroDelay
socketTimeoutMillis = defaultUpdateTimeoutForZeroDelay
}
}
}
}

View File

@ -1,7 +1,6 @@
package dev.inmo.tgbotapi.extensions.utils.updates.retrieving
import dev.inmo.micro_utils.coroutines.ExceptionHandler
import dev.inmo.micro_utils.coroutines.safely
import dev.inmo.micro_utils.coroutines.*
import dev.inmo.tgbotapi.bot.RequestsExecutor
import dev.inmo.tgbotapi.bot.TelegramBot
import dev.inmo.tgbotapi.bot.exceptions.RequestException
@ -14,7 +13,10 @@ import dev.inmo.tgbotapi.types.update.MediaGroupUpdates.*
import dev.inmo.tgbotapi.types.update.abstracts.Update
import dev.inmo.tgbotapi.updateshandlers.*
import dev.inmo.tgbotapi.utils.*
import io.ktor.client.features.HttpRequestTimeoutException
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.coroutineContext
fun TelegramBot.startGettingOfUpdatesByLongPolling(
timeoutSeconds: Seconds = 30,
@ -66,6 +68,68 @@ fun TelegramBot.startGettingOfUpdatesByLongPolling(
}
}
fun TelegramBot.retrieveAccumulatedUpdates(
avoidInlineQueries: Boolean = false,
avoidCallbackQueries: Boolean = false,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
exceptionsHandler: (ExceptionHandler<Unit>)? = null,
allowedUpdates: List<String>? = null,
updatesReceiver: UpdateReceiver<Update>
): Job = scope.launch {
safelyWithoutExceptions {
startGettingOfUpdatesByLongPolling(
0,
CoroutineScope(coroutineContext + SupervisorJob()),
{
if (it is HttpRequestTimeoutException) {
throw CancellationException("Cancel due to absence of new updates")
} else {
exceptionsHandler ?.invoke(it)
}
},
allowedUpdates
) {
when {
it is InlineQueryUpdate && avoidInlineQueries ||
it is CallbackQueryUpdate && avoidCallbackQueries -> return@startGettingOfUpdatesByLongPolling
else -> updatesReceiver(it)
}
}.join()
}
}
/**
* @return [kotlinx.coroutines.flow.Flow] which will emit updates to the collector while they will be accumulated. Works
* the same as [retrieveAccumulatedUpdates], but pass [kotlinx.coroutines.flow.FlowCollector.emit] as a callback
*/
fun TelegramBot.createAccumulatedUpdatesRetrieverFlow(
avoidInlineQueries: Boolean = false,
avoidCallbackQueries: Boolean = false,
exceptionsHandler: ExceptionHandler<Unit>? = null,
allowedUpdates: List<String>? = null
): Flow<Update> = flow {
val parentContext = kotlin.coroutines.coroutineContext
retrieveAccumulatedUpdates(
avoidInlineQueries,
avoidCallbackQueries,
CoroutineScope(parentContext),
exceptionsHandler,
allowedUpdates
) {
withContext(parentContext) { emit(it) }
}.join()
}
fun TelegramBot.retrieveAccumulatedUpdates(
flowsUpdatesFilter: FlowsUpdatesFilter,
avoidInlineQueries: Boolean = false,
avoidCallbackQueries: Boolean = false,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
exceptionsHandler: ExceptionHandler<Unit>? = null
) = flowsUpdatesFilter.run {
retrieveAccumulatedUpdates(avoidInlineQueries, avoidCallbackQueries, scope, exceptionsHandler, allowedUpdates, asUpdateReceiver)
}
/**
* Will [startGettingOfUpdatesByLongPolling] using incoming [flowsUpdatesFilter]. It is assumed that you ALREADY CONFIGURE
* all updates receivers, because this method will trigger getting of updates and.