From 6d782f28c327f93234ecba8e4f6d0670dd228a43 Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Tue, 27 Oct 2020 15:11:57 +0600 Subject: [PATCH] updates after coroutines version change --- CHANGELOG.md | 3 +++ .../inmo/tgbotapi/updateshandlers/FlowsUpdatesFilter.kt | 7 +++---- .../dev/inmo/tgbotapi/utils/extensions/ReceiveChannel.kt | 1 + .../dev/inmo/tgbotapi/extensions/utils/FlowsAggregation.kt | 3 ++- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dae596bb51..0c3b61ffa9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ * `Common`: * Version updates: * `Coroutines`: `1.3.9` -> `1.4.0` + * Internal broadcast channels were replaced with `SharedFlow` +* `TelegramBotAPI-extensions-utils`: + * Extension `ReceiveChannel#debounceByValue` has been deprecated ## 0.29.1 diff --git a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/updateshandlers/FlowsUpdatesFilter.kt b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/updateshandlers/FlowsUpdatesFilter.kt index 616fa4bfce..1e8adcf5f0 100644 --- a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/updateshandlers/FlowsUpdatesFilter.kt +++ b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/updateshandlers/FlowsUpdatesFilter.kt @@ -5,21 +5,20 @@ import dev.inmo.tgbotapi.types.update.* import dev.inmo.tgbotapi.types.update.MediaGroupUpdates.* import dev.inmo.tgbotapi.types.update.abstracts.UnknownUpdate import dev.inmo.tgbotapi.types.update.abstracts.Update -import kotlinx.coroutines.channels.BroadcastChannel import kotlinx.coroutines.flow.* @Suppress("EXPERIMENTAL_API_USAGE", "unused") class FlowsUpdatesFilter( broadcastChannelsSize: Int = 100 ): UpdatesFilter { - private val updatesReceivingChannel = BroadcastChannel(broadcastChannelsSize) + private val updatesSharedFlow = MutableSharedFlow(extraBufferCapacity = broadcastChannelsSize) @Suppress("MemberVisibilityCanBePrivate") - val allUpdatesFlow: Flow = updatesReceivingChannel.asFlow() + val allUpdatesFlow: Flow = updatesSharedFlow.asSharedFlow() override val allowedUpdates: List get() = ALL_UPDATES_LIST override val asUpdateReceiver: UpdateReceiver = { - updatesReceivingChannel.send(it) + updatesSharedFlow.emit(it) } val messageFlow: Flow = allUpdatesFlow.filterIsInstance() diff --git a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/utils/extensions/ReceiveChannel.kt b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/utils/extensions/ReceiveChannel.kt index fe239ed191..4094709bce 100644 --- a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/utils/extensions/ReceiveChannel.kt +++ b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/utils/extensions/ReceiveChannel.kt @@ -11,6 +11,7 @@ private sealed class DebounceAction { private data class AddValue(override val value: T) : DebounceAction() private data class RemoveJob(override val value: T, val job: Job) : DebounceAction() +@Deprecated("Unused and will be removed in next major release") fun ReceiveChannel.debounceByValue( delayMillis: Long, scope: CoroutineScope = CoroutineScope(Dispatchers.Default), diff --git a/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/FlowsAggregation.kt b/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/FlowsAggregation.kt index 3eb1b9f88b..49e717f89e 100644 --- a/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/FlowsAggregation.kt +++ b/tgbotapi.extensions.utils/src/commonMain/kotlin/dev/inmo/tgbotapi/extensions/utils/FlowsAggregation.kt @@ -13,10 +13,11 @@ fun aggregateFlows( vararg flows: Flow, internalBufferSize: Int = Channel.BUFFERED ): Flow { + val sharedFlow = MutableSharedFlow(extraBufferCapacity = internalBufferSize) val bc = BroadcastChannel(internalBufferSize) flows.forEach { it.onEach { - safely { bc.send(it) } + safely { sharedFlow.emit(it) } }.launchIn(withScope) } return bc.asFlow()