From e2d1c5d6a1e1de99a02dc1d522bee067c3626c3b Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Wed, 28 Oct 2020 14:17:39 +0600 Subject: [PATCH] corotoutines actualization --- CHANGELOG.md | 2 + .../coroutines/BroadcastStateFlow.kt | 46 ++++++++++++++----- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d9b508e394..c94ef7aaeeb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ * `K/JS` * Add several extensions for `Element` objects to detect that object is on screen viewport * Add several extensions for `Element` objects to detect object visibility +* `Coroutines` + * `BroadcastStateFlow` now use different strategy for getting of state and implements `replayCache` ## 0.2.2 diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/BroadcastStateFlow.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/BroadcastStateFlow.kt index f8d54aab376..48b22b4f431 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/BroadcastStateFlow.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/BroadcastStateFlow.kt @@ -5,29 +5,51 @@ import kotlinx.coroutines.channels.BroadcastChannel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.* +const val defaultBroadcastStateFlowReplayCacheSize = 1 + class BroadcastStateFlow internal constructor( parentFlow: Flow, - private val stateGetter: () -> T + initial: T, + replayCacheSize: Int = defaultBroadcastStateFlowReplayCacheSize, + replayScope: CoroutineScope ) : StateFlow, Flow by parentFlow { + private val deque = ArrayDeque(1).also { + it.add(initial) + } + override val replayCache: List + get() = deque.toList() override val value: T - get() = stateGetter() -} + get() = deque.last() -fun BroadcastChannel.asStateFlow(value: T, scope: CoroutineScope): StateFlow = asFlow().let { - var state: T = value - it.onEach { state = it }.launchIn(scope) - BroadcastStateFlow(it) { - state + init { + if (replayCacheSize < 1) { + error("Replay cache size can't be less than 1, but was $replayCacheSize") + } + parentFlow.onEach { + deque.addLast(it) + if (deque.size > replayCacheSize) { + deque.removeFirst() + } + }.launchIn(replayScope) } } -fun BroadcastChannel.asStateFlow(scope: CoroutineScope): StateFlow = asStateFlow(null, scope) +fun BroadcastChannel.asStateFlow( + value: T, + scope: CoroutineScope, + replayCacheSize: Int = defaultBroadcastStateFlowReplayCacheSize +): StateFlow = BroadcastStateFlow(asFlow(), value, replayCacheSize, scope) -fun broadcastStateFlow(initial: T, scope: CoroutineScope, channelSize: Int = Channel.BUFFERED) = BroadcastChannel( +fun BroadcastChannel.asStateFlow( + scope: CoroutineScope, + replayCacheSize: Int = defaultBroadcastStateFlowReplayCacheSize +): StateFlow = asStateFlow(null, scope, replayCacheSize) + +fun broadcastStateFlow(initial: T, scope: CoroutineScope, channelSize: Int = Channel.BUFFERED, replayCacheSize: Int = defaultBroadcastStateFlowReplayCacheSize) = BroadcastChannel( channelSize ).let { - it to it.asStateFlow(initial, scope) + it to it.asStateFlow(initial, scope, replayCacheSize) } -fun broadcastStateFlow(scope: CoroutineScope, channelSize: Int = Channel.BUFFERED) = broadcastStateFlow(null, scope, channelSize) +fun broadcastStateFlow(scope: CoroutineScope, channelSize: Int = Channel.BUFFERED, replayCacheSize: Int = defaultBroadcastStateFlowReplayCacheSize) = broadcastStateFlow(null, scope, channelSize, replayCacheSize)