package com.github.insanusmokrassar.TelegramBotAPI.extensions.utils

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

/**
 * Analog of the [merge] function for [Flow]s. The difference is that a [BroadcastChannel] is used
 * as the intermediate buffer between the source flows and the resulting flow.
 *
 * Collection of every source flow is launched in [withScope] as soon as this function is called;
 * each collected element is sent into one shared [BroadcastChannel]. The send is wrapped in the
 * project's `safely` helper (declared outside this file; presumably it guards against exceptions
 * thrown while sending - confirm against its definition).
 *
 * This function never closes the channel itself, and the collecting coroutines are only stopped
 * through [withScope].
 *
 * @param withScope scope in which one collecting coroutine per source flow is launched
 * @param flows source flows whose elements are forwarded into the resulting flow
 * @param internalBufferSize capacity passed to the [BroadcastChannel] constructor,
 * [Channel.BUFFERED] by default
 * @return the shared [BroadcastChannel] exposed as a [Flow]
 */
fun <T> aggregateFlows(
    withScope: CoroutineScope,
    vararg flows: Flow<T>,
    internalBufferSize: Int = Channel.BUFFERED
): Flow<T> {
    val bc = BroadcastChannel<T>(internalBufferSize)
    flows.forEach { source ->
        // Named lambda parameters avoid shadowing the implicit `it` across nested lambdas.
        source.onEach { value ->
            safely { bc.send(value) }
        }.launchIn(withScope)
    }
    return bc.asFlow()
}

/**
 * Flattens a [Flow] of [Iterable]s into a [Flow] of their elements.
 *
 * Every collected [Iterable] is iterated in its own order and each of its elements is emitted
 * downstream before the next [Iterable] is processed.
 *
 * @return flow emitting the individual elements of every collected [Iterable]
 */
fun <T> Flow<Iterable<T>>.flatMap(): Flow<T> = flow {
    collect { items ->
        items.forEach { item ->
            emit(item)
        }
    }
}