1
0
mirror of https://github.com/InsanusMokrassar/TelegramBotAPI.git synced 2024-11-25 19:48:43 +00:00
This commit is contained in:
InsanusMokrassar 2021-06-28 01:00:20 +06:00
parent 47fe048b10
commit 77dff639f0
7 changed files with 69 additions and 28 deletions

View File

@ -5,7 +5,7 @@
* `Common`: * `Common`:
* `Version`: * `Version`:
* `Kotlin`: `1.5.10` -> `1.5.20` * `Kotlin`: `1.5.10` -> `1.5.20`
* `MicroUtils`: `0.5.6` -> `0.5.14` * `MicroUtils`: `0.5.6` -> `0.5.15`
* `Behaviour Builder`: * `Behaviour Builder`:
* New extensions `telegramBotWithBehaviour` * New extensions `telegramBotWithBehaviour`
* All triggers (`on*` extensions) have been modified to work in parallel by default (new parameter * All triggers (`on*` extensions) have been modified to work in parallel by default (new parameter

View File

@ -12,7 +12,7 @@ klock_version=2.1.2
uuid_version=0.3.0 uuid_version=0.3.0
ktor_version=1.6.0 ktor_version=1.6.0
micro_utils_version=0.5.14 micro_utils_version=0.5.15
javax_activation_version=1.1.1 javax_activation_version=1.1.1

View File

@ -5,6 +5,7 @@ import dev.inmo.tgbotapi.types.update.*
import dev.inmo.tgbotapi.types.update.MediaGroupUpdates.* import dev.inmo.tgbotapi.types.update.MediaGroupUpdates.*
import dev.inmo.tgbotapi.types.update.abstracts.UnknownUpdate import dev.inmo.tgbotapi.types.update.abstracts.UnknownUpdate
import dev.inmo.tgbotapi.types.update.abstracts.Update import dev.inmo.tgbotapi.types.update.abstracts.Update
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
interface FlowsUpdatesFilter : UpdatesFilter { interface FlowsUpdatesFilter : UpdatesFilter {
@ -43,9 +44,10 @@ fun FlowsUpdatesFilter(
@Suppress("EXPERIMENTAL_API_USAGE", "unused") @Suppress("EXPERIMENTAL_API_USAGE", "unused")
class DefaultFlowsUpdatesFilter( class DefaultFlowsUpdatesFilter(
broadcastChannelsSize: Int = 100 broadcastChannelsSize: Int = 100,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): FlowsUpdatesFilter { ): FlowsUpdatesFilter {
private val updatesSharedFlow = MutableSharedFlow<Update>(extraBufferCapacity = broadcastChannelsSize) private val updatesSharedFlow = MutableSharedFlow<Update>(extraBufferCapacity = broadcastChannelsSize, onBufferOverflow = onBufferOverflow)
@Suppress("MemberVisibilityCanBePrivate") @Suppress("MemberVisibilityCanBePrivate")
override val allUpdatesFlow: Flow<Update> = updatesSharedFlow.asSharedFlow() override val allUpdatesFlow: Flow<Update> = updatesSharedFlow.asSharedFlow()
@Suppress("MemberVisibilityCanBePrivate") @Suppress("MemberVisibilityCanBePrivate")

View File

@ -1,7 +1,6 @@
package dev.inmo.tgbotapi.extensions.behaviour_builder package dev.inmo.tgbotapi.extensions.behaviour_builder
import dev.inmo.micro_utils.coroutines.subscribeSafelyWithoutExceptions import dev.inmo.micro_utils.coroutines.*
import dev.inmo.micro_utils.coroutines.weakLaunch
import dev.inmo.tgbotapi.bot.TelegramBot import dev.inmo.tgbotapi.bot.TelegramBot
import dev.inmo.tgbotapi.types.update.abstracts.Update import dev.inmo.tgbotapi.types.update.abstracts.Update
import dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter import dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter
@ -37,15 +36,18 @@ suspend fun <T> BehaviourContext.doInSubContextWithFlowsUpdatesFilterSetup(
newFlowsUpdatesFilterSetUp: BehaviourContextAndTypeReceiver<Unit, FlowsUpdatesFilter>?, newFlowsUpdatesFilterSetUp: BehaviourContextAndTypeReceiver<Unit, FlowsUpdatesFilter>?,
stopOnCompletion: Boolean = true, stopOnCompletion: Boolean = true,
behaviourContextReceiver: BehaviourContextReceiver<T> behaviourContextReceiver: BehaviourContextReceiver<T>
): T = supervisorScope { ): T {
val newContext = copy( return copy(
flowsUpdatesFilter = FlowsUpdatesFilter(), flowsUpdatesFilter = FlowsUpdatesFilter(),
scope = this scope = LinkedSupervisorScope()
) ).run {
newFlowsUpdatesFilterSetUp ?.let { newFlowsUpdatesFilterSetUp ?.let {
it.apply { invoke(newContext, this@doInSubContextWithFlowsUpdatesFilterSetup.flowsUpdatesFilter) } it.apply { invoke(this@run, this@doInSubContextWithFlowsUpdatesFilterSetup.flowsUpdatesFilter) }
}
withContext(coroutineContext) {
behaviourContextReceiver().also { if (stopOnCompletion) stop() }
}
} }
newContext.behaviourContextReceiver().also { if (stopOnCompletion) stop() }
} }
/** /**

View File

@ -1,5 +1,6 @@
package dev.inmo.tgbotapi.extensions.behaviour_builder.expectations package dev.inmo.tgbotapi.extensions.behaviour_builder.expectations
import dev.inmo.micro_utils.coroutines.safelyWithResult
import dev.inmo.micro_utils.coroutines.safelyWithoutExceptions import dev.inmo.micro_utils.coroutines.safelyWithoutExceptions
import dev.inmo.tgbotapi.bot.TelegramBot import dev.inmo.tgbotapi.bot.TelegramBot
import dev.inmo.tgbotapi.extensions.behaviour_builder.BehaviourContext import dev.inmo.tgbotapi.extensions.behaviour_builder.BehaviourContext
@ -38,11 +39,11 @@ suspend fun <T> FlowsUpdatesFilter.expectFlow(
filter: suspend (Update) -> List<T> filter: suspend (Update) -> List<T>
): Flow<T> { ): Flow<T> {
val flow = allUpdatesFlow.flatMapConcat { val flow = allUpdatesFlow.flatMapConcat {
val result = safelyWithoutExceptions { filter(it) } val result = safelyWithResult { filter(it) }
(if (result == null || result.isEmpty()) { (if (result.isFailure || result.getOrThrow().isEmpty()) {
if (cancelTrigger(it)) { if (cancelTrigger(it)) {
cancelRequestFactory(it) ?.also { cancelRequestFactory(it) ?.also {
safelyWithoutExceptions { bot.execute(it) } safelyWithResult { bot.execute(it) }
throw cancelledByFilterException throw cancelledByFilterException
} }
} }
@ -51,7 +52,7 @@ suspend fun <T> FlowsUpdatesFilter.expectFlow(
} }
emptyList() emptyList()
} else { } else {
result result.getOrThrow()
}).asFlow() }).asFlow()
} }
val result = if (count == null) { val result = if (count == null) {

View File

@ -28,16 +28,34 @@ class EntitiesBuilder internal constructor(
*/ */
fun build(): TextSourcesList = entities.toList() fun build(): TextSourcesList = entities.toList()
fun add(source: TextSource) { fun add(source: TextSource): EntitiesBuilder {
entitiesList.add(source) entitiesList.add(source)
return this
}
fun addAll(sources: Iterable<TextSource>): EntitiesBuilder {
entitiesList.addAll(sources)
return this
} }
operator fun TextSource.unaryPlus() = add(this) operator fun TextSource.unaryPlus() = add(this)
operator fun TextSourcesList.unaryPlus() = entitiesList.addAll(this) operator fun TextSourcesList.unaryPlus() = addAll(this)
operator fun invoke(vararg source: TextSource) = entitiesList.addAll(source) operator fun invoke(vararg source: TextSource) = addAll(source.toList())
operator fun String.unaryPlus() { operator fun String.unaryPlus(): EntitiesBuilder {
add(dev.inmo.tgbotapi.types.MessageEntity.textsources.regular(this)) add(dev.inmo.tgbotapi.types.MessageEntity.textsources.regular(this))
return this@EntitiesBuilder
}
operator fun plus(text: String) = text.unaryPlus()
operator fun plus(source: TextSource) = add(source)
operator fun plus(sources: Iterable<TextSource>) = addAll(sources)
operator fun plus(other: EntitiesBuilder) = if (other == this) {
// do nothing; assume user
this
} else {
addAll(other.entitiesList)
} }
} }

View File

@ -17,13 +17,11 @@ import io.ktor.client.features.HttpRequestTimeoutException
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
fun TelegramBot.startGettingOfUpdatesByLongPolling( fun TelegramBot.longPollingFlow(
timeoutSeconds: Seconds = 30, timeoutSeconds: Seconds = 30,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
exceptionsHandler: (ExceptionHandler<Unit>)? = null, exceptionsHandler: (ExceptionHandler<Unit>)? = null,
allowedUpdates: List<String>? = null, allowedUpdates: List<String>? = null,
updatesReceiver: UpdateReceiver<Update> ): Flow<Update> = channelFlow {
): Job = scope.launch {
var lastUpdateIdentifier: UpdateIdentifier? = null var lastUpdateIdentifier: UpdateIdentifier? = null
while (isActive) { while (isActive) {
@ -43,6 +41,12 @@ fun TelegramBot.startGettingOfUpdatesByLongPolling(
) )
).let { originalUpdates -> ).let { originalUpdates ->
val converted = originalUpdates.convertWithMediaGroupUpdates() val converted = originalUpdates.convertWithMediaGroupUpdates()
/**
* Dirty hack for cases when the media group was retrieved not fully:
*
* We are throw out the last media group and will reretrieve it again in the next get updates
* and it will guarantee that it is full
*/
/** /**
* Dirty hack for cases when the media group was retrieved not fully: * Dirty hack for cases when the media group was retrieved not fully:
* *
@ -56,17 +60,31 @@ fun TelegramBot.startGettingOfUpdatesByLongPolling(
} }
} }
safely { safelyWithResult {
for (update in updates) { for (update in updates) {
updatesReceiver(update) send(update)
lastUpdateIdentifier = update.lastUpdateIdentifier() lastUpdateIdentifier = update.lastUpdateIdentifier()
} }
}.onFailure {
cancel(it as? CancellationException ?: return@onFailure)
} }
} }
} }
} }
fun TelegramBot.startGettingOfUpdatesByLongPolling(
timeoutSeconds: Seconds = 30,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
exceptionsHandler: (ExceptionHandler<Unit>)? = null,
allowedUpdates: List<String>? = null,
updatesReceiver: UpdateReceiver<Update>
): Job = longPollingFlow(timeoutSeconds, exceptionsHandler, allowedUpdates).subscribeSafely(
scope,
exceptionsHandler ?: defaultSafelyExceptionHandler,
updatesReceiver
)
fun TelegramBot.retrieveAccumulatedUpdates( fun TelegramBot.retrieveAccumulatedUpdates(
avoidInlineQueries: Boolean = false, avoidInlineQueries: Boolean = false,
avoidCallbackQueries: Boolean = false, avoidCallbackQueries: Boolean = false,