package dev.inmo.tgbotapi.extensions.utils import dev.inmo.micro_utils.coroutines.safely import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.BroadcastChannel import kotlinx.coroutines.flow.* /** * Analog of [merge] function for [Flow]s. The difference is in the usage of [BroadcastChannel] in this case */ fun aggregateFlows( withScope: CoroutineScope, vararg flows: Flow, internalBufferSize: Int = 64 ): Flow { val sharedFlow = MutableSharedFlow(extraBufferCapacity = internalBufferSize) flows.forEach { it.onEach { safely { sharedFlow.emit(it) } }.launchIn(withScope) } return sharedFlow } fun Flow>.flatten(): Flow = flow { collect { it.forEach { emit(it) } } } fun Flow.flatMap(mapper: suspend (T) -> Iterable): Flow = flow { collect { mapper(it).forEach { emit(it) } } }