1
0
mirror of https://github.com/InsanusMokrassar/TelegramBotAPI.git synced 2024-11-22 16:23:48 +00:00

fix of updates funnel in subsequent behaviour contexts

This commit is contained in:
InsanusMokrassar 2022-05-21 21:44:27 +06:00
parent 562e8c7429
commit 27a24a18a6
3 changed files with 65 additions and 29 deletions

View File

@ -49,6 +49,19 @@ interface BehaviourContext : FlowsUpdatesFilter, TelegramBot, CoroutineScope {
val triggersHolder: TriggersHolder val triggersHolder: TriggersHolder
fun copy(
bot: TelegramBot = this.bot,
scope: CoroutineScope = this.scope,
broadcastChannelsSize: Int = 100,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
upstreamUpdatesFlow: Flow<Update>? = null,
triggersHolder: TriggersHolder = TriggersHolder()
): BehaviourContext
/**
* @param updatesFilter unused
*/
@Deprecated("Do not use this method")
fun copy( fun copy(
bot: TelegramBot = this.bot, bot: TelegramBot = this.bot,
scope: CoroutineScope = this.scope, scope: CoroutineScope = this.scope,
@ -57,7 +70,7 @@ interface BehaviourContext : FlowsUpdatesFilter, TelegramBot, CoroutineScope {
upstreamUpdatesFlow: Flow<Update>? = null, upstreamUpdatesFlow: Flow<Update>? = null,
triggersHolder: TriggersHolder = TriggersHolder(), triggersHolder: TriggersHolder = TriggersHolder(),
updatesFilter: BehaviourContextAndTypeReceiver<Boolean, Update>? = null updatesFilter: BehaviourContextAndTypeReceiver<Boolean, Update>? = null
): BehaviourContext ): BehaviourContext = copy(bot, scope, broadcastChannelsSize, onBufferOverflow, upstreamUpdatesFlow, triggersHolder)
} }
class DefaultBehaviourContext( class DefaultBehaviourContext(
@ -67,6 +80,7 @@ class DefaultBehaviourContext(
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
private val upstreamUpdatesFlow: Flow<Update>? = null, private val upstreamUpdatesFlow: Flow<Update>? = null,
override val triggersHolder: TriggersHolder = TriggersHolder(), override val triggersHolder: TriggersHolder = TriggersHolder(),
@Deprecated("This method is not used anymore")
private val updatesFilter: BehaviourContextAndTypeReceiver<Boolean, Update>? = null private val updatesFilter: BehaviourContextAndTypeReceiver<Boolean, Update>? = null
) : AbstractFlowsUpdatesFilter(), TelegramBot by bot, CoroutineScope by scope, BehaviourContext { ) : AbstractFlowsUpdatesFilter(), TelegramBot by bot, CoroutineScope by scope, BehaviourContext {
@ -80,13 +94,6 @@ class DefaultBehaviourContext(
} else { } else {
it it
} }
}.let {
val updatesFilter = updatesFilter
if (updatesFilter != null) {
it.filter { updatesFilter(it) }
} else {
it
}
}.accumulatorFlow(scope) }.accumulatorFlow(scope)
override val asUpdateReceiver: UpdateReceiver<Update> = additionalUpdatesSharedFlow::emit override val asUpdateReceiver: UpdateReceiver<Update> = additionalUpdatesSharedFlow::emit
@ -96,9 +103,8 @@ class DefaultBehaviourContext(
broadcastChannelsSize: Int, broadcastChannelsSize: Int,
onBufferOverflow: BufferOverflow, onBufferOverflow: BufferOverflow,
upstreamUpdatesFlow: Flow<Update>?, upstreamUpdatesFlow: Flow<Update>?,
triggersHolder: TriggersHolder, triggersHolder: TriggersHolder
updatesFilter: BehaviourContextAndTypeReceiver<Boolean, Update>? ): DefaultBehaviourContext = DefaultBehaviourContext(bot, scope, broadcastChannelsSize, onBufferOverflow, upstreamUpdatesFlow, triggersHolder)
): DefaultBehaviourContext = DefaultBehaviourContext(bot, scope, broadcastChannelsSize, onBufferOverflow, upstreamUpdatesFlow, triggersHolder, updatesFilter)
} }
fun BehaviourContext( fun BehaviourContext(
@ -116,24 +122,34 @@ inline fun <T> BehaviourContext(
crossinline block: BehaviourContext.() -> T crossinline block: BehaviourContext.() -> T
) = DefaultBehaviourContext(bot, scope, upstreamUpdatesFlow = flowsUpdatesFilter.allUpdatesFlow, triggersHolder = triggersHolder).run(block) ) = DefaultBehaviourContext(bot, scope, upstreamUpdatesFlow = flowsUpdatesFilter.allUpdatesFlow, triggersHolder = triggersHolder).run(block)
/**
* Creates new [BehaviourContext] using its [BehaviourContext.copy] method
*
* @param updatesFilter This param will not be used anymore
*/
fun <BC : BehaviourContext> BC.createSubContext( fun <BC : BehaviourContext> BC.createSubContext(
scope: CoroutineScope = LinkedSupervisorScope(), scope: CoroutineScope = LinkedSupervisorScope(),
triggersHolder: TriggersHolder = this.triggersHolder, triggersHolder: TriggersHolder = this.triggersHolder,
updatesUpstreamFlow: Flow<Update> = allUpdatesFlow, updatesUpstreamFlow: Flow<Update> = allUpdatesFlow,
updatesFilter: CustomBehaviourContextAndTypeReceiver<BC, Boolean, Update>? = null,
) = copy( ) = copy(
scope = scope, scope = scope,
updatesFilter = updatesFilter ?.let { _ ->
{
(this as? BC) ?.run {
updatesFilter(it)
} ?: true
}
},
upstreamUpdatesFlow = updatesUpstreamFlow, upstreamUpdatesFlow = updatesUpstreamFlow,
triggersHolder = triggersHolder triggersHolder = triggersHolder
) as BC ) as BC
/**
* Creates new [BehaviourContext] using its [BehaviourContext.copy] method
*
* @param updatesFilter This param will not be used anymore
*/
@Deprecated("It is not recommended to use updates filter anymore")
fun <BC : BehaviourContext> BC.createSubContext(
scope: CoroutineScope = LinkedSupervisorScope(),
triggersHolder: TriggersHolder = this.triggersHolder,
updatesUpstreamFlow: Flow<Update> = allUpdatesFlow,
updatesFilter: CustomBehaviourContextAndTypeReceiver<BC, Boolean, Update>?,
) = createSubContext(scope, triggersHolder, updatesUpstreamFlow)
/** /**
* Launch [behaviourContextReceiver] in context of [this] as [BehaviourContext] and as [kotlin.coroutines.CoroutineContext] * Launch [behaviourContextReceiver] in context of [this] as [BehaviourContext] and as [kotlin.coroutines.CoroutineContext]
* *
@ -158,21 +174,40 @@ suspend fun <T, BC : BehaviourContext> BC.createSubContextAndDoWithUpdatesFilter
scope: CoroutineScope = LinkedSupervisorScope(), scope: CoroutineScope = LinkedSupervisorScope(),
triggersHolder: TriggersHolder = this.triggersHolder, triggersHolder: TriggersHolder = this.triggersHolder,
updatesUpstreamFlow: Flow<Update> = allUpdatesFlow, updatesUpstreamFlow: Flow<Update> = allUpdatesFlow,
updatesFilter: CustomBehaviourContextAndTypeReceiver<BC, Boolean, Update>? = null,
stopOnCompletion: Boolean = true, stopOnCompletion: Boolean = true,
behaviourContextReceiver: CustomBehaviourContextReceiver<BC, T> behaviourContextReceiver: CustomBehaviourContextReceiver<BC, T>
): T { ): T {
return createSubContext( return createSubContext(
scope, scope,
triggersHolder, triggersHolder,
updatesUpstreamFlow, updatesUpstreamFlow
updatesFilter
).doInContext( ).doInContext(
stopOnCompletion, stopOnCompletion,
behaviourContextReceiver behaviourContextReceiver
) )
} }
/**
* Creates new one [BehaviourContext] using [createSubContext] and launches [behaviourContextReceiver] in a new context
* using [doInContext]
*
* @param stopOnCompletion ___TRUE BY DEFAULT___
* @param updatesFilter Is not used anymore
*/
@Deprecated("It is not recommended to use updates filter anymore")
suspend fun <T, BC : BehaviourContext> BC.createSubContextAndDoWithUpdatesFilter(
scope: CoroutineScope = LinkedSupervisorScope(),
triggersHolder: TriggersHolder = this.triggersHolder,
updatesUpstreamFlow: Flow<Update> = allUpdatesFlow,
updatesFilter: CustomBehaviourContextAndTypeReceiver<BC, Boolean, Update>?,
stopOnCompletion: Boolean = true,
behaviourContextReceiver: CustomBehaviourContextReceiver<BC, T>
): T {
return createSubContextAndDoWithUpdatesFilter(
scope, triggersHolder, updatesUpstreamFlow, stopOnCompletion, behaviourContextReceiver
)
}
/** /**
* Creates new one [BehaviourContext] using [createSubContext] and launches [behaviourContextReceiver] in a new context * Creates new one [BehaviourContext] using [createSubContext] and launches [behaviourContextReceiver] in a new context
* using [doInContext] * using [doInContext]

View File

@ -22,8 +22,8 @@ private suspend fun <O> BehaviourContext.waitCallbackQueries(
count, count,
errorFactory errorFactory
) { ) {
val data = it.asCallbackQueryUpdate() ?.data val data = it.asCallbackQueryUpdate() ?.data ?: return@expectFlow emptyList()
if (data != null && (filter == null || filter(data))) { if (filter == null || filter(data)) {
data.mapper().let(::listOfNotNull) data.mapper().let(::listOfNotNull)
} else { } else {
emptyList() emptyList()

View File

@ -15,16 +15,17 @@ internal suspend inline fun <BC : BehaviourContext, reified T> BC.on(
noinline updateToData: (Update) -> List<T>? noinline updateToData: (Update) -> List<T>?
) = flowsUpdatesFilter.expectFlow(bot) { ) = flowsUpdatesFilter.expectFlow(bot) {
updateToData(it) ?.mapNotNull { data -> updateToData(it) ?.mapNotNull { data ->
if (initialFilter ?.invoke(data) != false) data else null if (initialFilter ?.invoke(data) != false) it to data else null
} ?: emptyList() } ?: emptyList()
}.subscribeSafelyWithoutExceptionsAsync( }.subscribeSafelyWithoutExceptionsAsync(
scope, scope,
markerFactory::invoke { markerFactory(it.second) }
) { triggerData -> ) { (update, triggerData) ->
createSubContextAndDoWithUpdatesFilter( createSubContextAndDoWithUpdatesFilter(
updatesFilter = subcontextUpdatesFilter ?.toOneType(triggerData),
stopOnCompletion = false stopOnCompletion = false
) { ) {
if (subcontextUpdatesFilter ?.invoke(this, triggerData, update) != false) {
scenarioReceiver(triggerData) scenarioReceiver(triggerData)
} }
} }
}