From 77dff639f03d1348072b48eb5921d45417b8c5d8 Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Mon, 28 Jun 2021 01:00:20 +0600 Subject: [PATCH] updates --- CHANGELOG.md | 2 +- gradle.properties | 2 +- .../updateshandlers/FlowsUpdatesFilter.kt | 8 +++-- .../behaviour_builder/BehaviourContext.kt | 20 +++++++------ .../behaviour_builder/expectations/Base.kt | 9 +++--- .../utils/formatting/EntitiesBuilder.kt | 26 +++++++++++++--- .../utils/updates/retrieving/LongPolling.kt | 30 +++++++++++++++---- 7 files changed, 69 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5313b3487..542d7294ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ * `Common`: * `Version`: * `Kotlin`: `1.5.10` -> `1.5.20` - * `MicroUtils`: `0.5.6` -> `0.5.14` + * `MicroUtils`: `0.5.6` -> `0.5.15` * `Behaviour Builder`: * New extensions `telegramBotWithBehaviour` * All triggers (`on*` extensions) have been modified to work in parallel by default (new parameter diff --git a/gradle.properties b/gradle.properties index 3e0f971c34..ce77349519 100644 --- a/gradle.properties +++ b/gradle.properties @@ -12,7 +12,7 @@ klock_version=2.1.2 uuid_version=0.3.0 ktor_version=1.6.0 -micro_utils_version=0.5.14 +micro_utils_version=0.5.15 javax_activation_version=1.1.1 diff --git a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/updateshandlers/FlowsUpdatesFilter.kt b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/updateshandlers/FlowsUpdatesFilter.kt index 9b86454801..35e565c1d1 100644 --- a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/updateshandlers/FlowsUpdatesFilter.kt +++ b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/updateshandlers/FlowsUpdatesFilter.kt @@ -5,6 +5,7 @@ import dev.inmo.tgbotapi.types.update.* import dev.inmo.tgbotapi.types.update.MediaGroupUpdates.* import dev.inmo.tgbotapi.types.update.abstracts.UnknownUpdate import dev.inmo.tgbotapi.types.update.abstracts.Update +import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.* interface FlowsUpdatesFilter : UpdatesFilter { @@ -43,9 +44,10 @@ fun FlowsUpdatesFilter( @Suppress("EXPERIMENTAL_API_USAGE", "unused") class DefaultFlowsUpdatesFilter( - broadcastChannelsSize: Int = 100 + broadcastChannelsSize: Int = 100, + onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): FlowsUpdatesFilter { - private val updatesSharedFlow = MutableSharedFlow(extraBufferCapacity = broadcastChannelsSize) + private val updatesSharedFlow = MutableSharedFlow(extraBufferCapacity = broadcastChannelsSize, onBufferOverflow = onBufferOverflow) @Suppress("MemberVisibilityCanBePrivate") override val allUpdatesFlow: Flow = updatesSharedFlow.asSharedFlow() @Suppress("MemberVisibilityCanBePrivate") @@ -79,4 +81,4 @@ class DefaultFlowsUpdatesFilter( override val chatMemberUpdatedFlow: Flow = allUpdatesFlow.filterIsInstance() override val myChatMemberUpdatedFlow: Flow = allUpdatesFlow.filterIsInstance() override val unknownUpdateTypeFlow: Flow = allUpdatesFlow.filterIsInstance() -} \ No newline at end of file +} diff --git a/tgbotapi.extensions.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/BehaviourContext.kt b/tgbotapi.extensions.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/BehaviourContext.kt index 6ee5b6821e..5180e25055 100644 --- a/tgbotapi.extensions.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/BehaviourContext.kt +++ b/tgbotapi.extensions.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/BehaviourContext.kt @@ -1,7 +1,6 @@ package dev.inmo.tgbotapi.extensions.behaviour_builder -import dev.inmo.micro_utils.coroutines.subscribeSafelyWithoutExceptions -import dev.inmo.micro_utils.coroutines.weakLaunch +import dev.inmo.micro_utils.coroutines.* import dev.inmo.tgbotapi.bot.TelegramBot import dev.inmo.tgbotapi.types.update.abstracts.Update import dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter @@ -37,15 +36,18 @@ suspend fun BehaviourContext.doInSubContextWithFlowsUpdatesFilterSetup( newFlowsUpdatesFilterSetUp: BehaviourContextAndTypeReceiver?, stopOnCompletion: Boolean = true, behaviourContextReceiver: BehaviourContextReceiver -): T = supervisorScope { - val newContext = copy( +): T { + return copy( flowsUpdatesFilter = FlowsUpdatesFilter(), - scope = this - ) - newFlowsUpdatesFilterSetUp ?.let { - it.apply { invoke(newContext, this@doInSubContextWithFlowsUpdatesFilterSetup.flowsUpdatesFilter) } + scope = LinkedSupervisorScope() + ).run { + newFlowsUpdatesFilterSetUp ?.let { + it.apply { invoke(this@run, this@doInSubContextWithFlowsUpdatesFilterSetup.flowsUpdatesFilter) } + } + withContext(coroutineContext) { + behaviourContextReceiver().also { if (stopOnCompletion) stop() } + } } - newContext.behaviourContextReceiver().also { if (stopOnCompletion) stop() } } /** diff --git a/tgbotapi.extensions.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/expectations/Base.kt b/tgbotapi.extensions.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/expectations/Base.kt index 2e95e776fb..822785234a 100644 --- a/tgbotapi.extensions.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/expectations/Base.kt +++ b/tgbotapi.extensions.behaviour_builder/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/behaviour_builder/expectations/Base.kt @@ -1,5 +1,6 @@ package dev.inmo.tgbotapi.extensions.behaviour_builder.expectations +import dev.inmo.micro_utils.coroutines.safelyWithResult import dev.inmo.micro_utils.coroutines.safelyWithoutExceptions import dev.inmo.tgbotapi.bot.TelegramBot import dev.inmo.tgbotapi.extensions.behaviour_builder.BehaviourContext @@ -38,11 +39,11 @@ suspend fun FlowsUpdatesFilter.expectFlow( filter: suspend (Update) -> List ): Flow { val flow = allUpdatesFlow.flatMapConcat { - val result = safelyWithoutExceptions { filter(it) } - (if (result == null || result.isEmpty()) { + val result = safelyWithResult { filter(it) } + (if (result.isFailure || result.getOrThrow().isEmpty()) { if (cancelTrigger(it)) { cancelRequestFactory(it) ?.also { - safelyWithoutExceptions { bot.execute(it) } + safelyWithResult { bot.execute(it) } throw cancelledByFilterException } } @@ -51,7 +52,7 @@ suspend fun FlowsUpdatesFilter.expectFlow( } emptyList() } else { - result + result.getOrThrow() }).asFlow() } val result = if (count == null) { diff --git a/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/formatting/EntitiesBuilder.kt b/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/formatting/EntitiesBuilder.kt index c0f09e5d9a..1898f62f3f 100644 --- a/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/formatting/EntitiesBuilder.kt +++ b/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/formatting/EntitiesBuilder.kt @@ -28,16 +28,34 @@ class EntitiesBuilder internal constructor( */ fun build(): TextSourcesList = entities.toList() - fun add(source: TextSource) { + fun add(source: TextSource): EntitiesBuilder { entitiesList.add(source) + return this + } + + fun addAll(sources: Iterable): EntitiesBuilder { + entitiesList.addAll(sources) + return this } operator fun TextSource.unaryPlus() = add(this) - operator fun TextSourcesList.unaryPlus() = entitiesList.addAll(this) - operator fun invoke(vararg source: TextSource) = entitiesList.addAll(source) + operator fun TextSourcesList.unaryPlus() = addAll(this) + operator fun invoke(vararg source: TextSource) = addAll(source.toList()) - operator fun String.unaryPlus() { + operator fun String.unaryPlus(): EntitiesBuilder { add(dev.inmo.tgbotapi.types.MessageEntity.textsources.regular(this)) + return this@EntitiesBuilder + } + + operator fun plus(text: String) = text.unaryPlus() + operator fun plus(source: TextSource) = add(source) + operator fun plus(sources: Iterable) = addAll(sources) + + operator fun plus(other: EntitiesBuilder) = if (other == this) { + // do nothing; assume user + this + } else { + addAll(other.entitiesList) } } diff --git a/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt b/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt index 6cb849335e..1584f94c9d 100644 --- a/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt +++ b/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/updates/retrieving/LongPolling.kt @@ -17,13 +17,11 @@ import io.ktor.client.features.HttpRequestTimeoutException import kotlinx.coroutines.* import kotlinx.coroutines.flow.* -fun TelegramBot.startGettingOfUpdatesByLongPolling( +fun TelegramBot.longPollingFlow( timeoutSeconds: Seconds = 30, - scope: CoroutineScope = CoroutineScope(Dispatchers.Default), exceptionsHandler: (ExceptionHandler)? = null, allowedUpdates: List? = null, - updatesReceiver: UpdateReceiver -): Job = scope.launch { +): Flow = channelFlow { var lastUpdateIdentifier: UpdateIdentifier? = null while (isActive) { @@ -43,6 +41,12 @@ fun TelegramBot.startGettingOfUpdatesByLongPolling( ) ).let { originalUpdates -> val converted = originalUpdates.convertWithMediaGroupUpdates() + /** + * Dirty hack for cases when the media group was retrieved not fully: + * + * We are throw out the last media group and will reretrieve it again in the next get updates + * and it will guarantee that it is full + */ /** * Dirty hack for cases when the media group was retrieved not fully: * @@ -56,17 +60,31 @@ fun TelegramBot.startGettingOfUpdatesByLongPolling( } } - safely { + safelyWithResult { for (update in updates) { - updatesReceiver(update) + send(update) lastUpdateIdentifier = update.lastUpdateIdentifier() } + }.onFailure { + cancel(it as? CancellationException ?: return@onFailure) } } } } +fun TelegramBot.startGettingOfUpdatesByLongPolling( + timeoutSeconds: Seconds = 30, + scope: CoroutineScope = CoroutineScope(Dispatchers.Default), + exceptionsHandler: (ExceptionHandler)? = null, + allowedUpdates: List? = null, + updatesReceiver: UpdateReceiver +): Job = longPollingFlow(timeoutSeconds, exceptionsHandler, allowedUpdates).subscribeSafely( + scope, + exceptionsHandler ?: defaultSafelyExceptionHandler, + updatesReceiver +) + fun TelegramBot.retrieveAccumulatedUpdates( avoidInlineQueries: Boolean = false, avoidCallbackQueries: Boolean = false,