add aggregation of flows

This commit is contained in:
InsanusMokrassar 2020-08-10 11:53:54 +06:00
parent ff2c70fc76
commit 63b2bd61b5
2 changed files with 36 additions and 29 deletions

View File

@ -0,0 +1,20 @@
package com.github.insanusmokrassar.TelegramBotAPI.extensions.utils
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
fun <T> aggregateFlows(
withScope: CoroutineScope,
vararg flows: Flow<T>,
internalBufferSize: Int = Channel.BUFFERED
): Flow<T> {
val bc = BroadcastChannel<T>(internalBufferSize)
flows.forEach {
it.onEach {
safely { bc.send(it) }
}.launchIn(withScope)
}
return bc.asFlow()
}

View File

@ -1,5 +1,6 @@
package com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.shortcuts
import com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.aggregateFlows
import com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.updates.asContentMessagesFlow
import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.ContentMessage
import com.github.insanusmokrassar.TelegramBotAPI.types.message.content.*
@ -8,10 +9,16 @@ import com.github.insanusmokrassar.TelegramBotAPI.types.message.content.media.*
import com.github.insanusmokrassar.TelegramBotAPI.types.message.payments.InvoiceContent
import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
inline fun <reified T : MessageContent> filterForContentMessage(): suspend (ContentMessage<*>) -> ContentMessage<T>? = {
if (it.content is T) {
it as ContentMessage<T>
} else {
null
}
}
/**
* @param scopeToIncludeChannels This parameter is required when you want to include [textMessages] for channels too.
* In this case will be created new channel which will agregate messages from [FlowsUpdatesFilter.messageFlow] and
@ -21,34 +28,14 @@ import kotlinx.coroutines.flow.*
inline fun <reified T: MessageContent> FlowsUpdatesFilter.filterContentMessages(
scopeToIncludeChannels: CoroutineScope? = null
): Flow<ContentMessage<T>> {
val filter = filterForContentMessage<T>()
return scopeToIncludeChannels ?.let { scope ->
val bc = BroadcastChannel<ContentMessage<T>>(Channel.BUFFERED)
channelPostFlow.asContentMessagesFlow().mapNotNull {
if (it.content is T) {
it as ContentMessage<T>
} else {
null
}
}.onEach {
bc.send(it)
}.launchIn(scope)
messageFlow.asContentMessagesFlow().mapNotNull {
if (it.content is T) {
it as ContentMessage<T>
} else {
null
}
}.onEach {
bc.send(it)
}.launchIn(scope)
bc.asFlow()
} ?: messageFlow.asContentMessagesFlow().mapNotNull {
if (it.content is T) {
it as ContentMessage<T>
} else {
null
}
}
aggregateFlows(
scope,
messageFlow.asContentMessagesFlow().mapNotNull(filter),
channelPostFlow.asContentMessagesFlow().mapNotNull(filter)
)
} ?: messageFlow.asContentMessagesFlow().mapNotNull(filter)
}
fun FlowsUpdatesFilter.animationMessages(