From 63b2bd61b5f298b6e1b01963dc9cb021d8a1ed3a Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Mon, 10 Aug 2020 11:53:54 +0600 Subject: [PATCH] add aggregation of flows --- .../extensions/utils/FlowsAggregation.kt | 20 +++++++++ .../utils/shortcuts/FlowsUpdatesFilter.kt | 45 +++++++------------ 2 files changed, 36 insertions(+), 29 deletions(-) create mode 100644 TelegramBotAPI-extensions-utils/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/utils/FlowsAggregation.kt diff --git a/TelegramBotAPI-extensions-utils/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/utils/FlowsAggregation.kt b/TelegramBotAPI-extensions-utils/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/utils/FlowsAggregation.kt new file mode 100644 index 0000000000..37333b2cec --- /dev/null +++ b/TelegramBotAPI-extensions-utils/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/utils/FlowsAggregation.kt @@ -0,0 +1,20 @@ +package com.github.insanusmokrassar.TelegramBotAPI.extensions.utils + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.* + +fun aggregateFlows( + withScope: CoroutineScope, + vararg flows: Flow, + internalBufferSize: Int = Channel.BUFFERED +): Flow { + val bc = BroadcastChannel(internalBufferSize) + flows.forEach { + it.onEach { + safely { bc.send(it) } + }.launchIn(withScope) + } + return bc.asFlow() +} diff --git a/TelegramBotAPI-extensions-utils/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/utils/shortcuts/FlowsUpdatesFilter.kt b/TelegramBotAPI-extensions-utils/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/utils/shortcuts/FlowsUpdatesFilter.kt index 5c51f06b06..8b02c3924f 100644 --- a/TelegramBotAPI-extensions-utils/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/utils/shortcuts/FlowsUpdatesFilter.kt +++ b/TelegramBotAPI-extensions-utils/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/utils/shortcuts/FlowsUpdatesFilter.kt @@ -1,5 +1,6 @@ package com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.shortcuts +import com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.aggregateFlows import com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.updates.asContentMessagesFlow import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.ContentMessage import com.github.insanusmokrassar.TelegramBotAPI.types.message.content.* @@ -8,10 +9,16 @@ import com.github.insanusmokrassar.TelegramBotAPI.types.message.content.media.* import com.github.insanusmokrassar.TelegramBotAPI.types.message.payments.InvoiceContent import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.BroadcastChannel -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.* +inline fun filterForContentMessage(): suspend (ContentMessage<*>) -> ContentMessage? = { + if (it.content is T) { + it as ContentMessage + } else { + null + } +} + /** * @param scopeToIncludeChannels This parameter is required when you want to include [textMessages] for channels too. * In this case will be created new channel which will agregate messages from [FlowsUpdatesFilter.messageFlow] and @@ -21,34 +28,14 @@ import kotlinx.coroutines.flow.* inline fun FlowsUpdatesFilter.filterContentMessages( scopeToIncludeChannels: CoroutineScope? = null ): Flow> { + val filter = filterForContentMessage() return scopeToIncludeChannels ?.let { scope -> - val bc = BroadcastChannel>(Channel.BUFFERED) - channelPostFlow.asContentMessagesFlow().mapNotNull { - if (it.content is T) { - it as ContentMessage - } else { - null - } - }.onEach { - bc.send(it) - }.launchIn(scope) - messageFlow.asContentMessagesFlow().mapNotNull { - if (it.content is T) { - it as ContentMessage - } else { - null - } - }.onEach { - bc.send(it) - }.launchIn(scope) - bc.asFlow() - } ?: messageFlow.asContentMessagesFlow().mapNotNull { - if (it.content is T) { - it as ContentMessage - } else { - null - } - } + aggregateFlows( + scope, + messageFlow.asContentMessagesFlow().mapNotNull(filter), + channelPostFlow.asContentMessagesFlow().mapNotNull(filter) + ) + } ?: messageFlow.asContentMessagesFlow().mapNotNull(filter) } fun FlowsUpdatesFilter.animationMessages(