From 9d2b50e55da2ebadb89cf605dcf45eb7622b2c38 Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Wed, 29 Nov 2023 17:13:21 +0600 Subject: [PATCH] SpecialStateFlow --- CHANGELOG.md | 3 + .../coroutines/SpecialStateFlow.kt | 74 +++++++++++++++++++ 2 files changed, 77 insertions(+) create mode 100644 coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SpecialStateFlow.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index 32468cd784c..f61f0144b66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## 0.20.16 +* `Coroutines`: + * Add `SpecialStateFlow` + ## 0.20.15 * `Versions`: diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SpecialStateFlow.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SpecialStateFlow.kt new file mode 100644 index 00000000000..762e85b63cf --- /dev/null +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SpecialStateFlow.kt @@ -0,0 +1,74 @@ +package dev.inmo.micro_utils.coroutines + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.internal.synchronized + +interface SpecialMutableStateFlow : MutableStateFlow { + class Default( + initialValue: T, + internalScope: CoroutineScope = CoroutineScope(Dispatchers.Default) + ) : SpecialMutableStateFlow { + private val internalSharedFlow: MutableSharedFlow = MutableSharedFlow( + replay = 0, + extraBufferCapacity = 1, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) + private val publicSharedFlow: MutableSharedFlow = MutableSharedFlow( + replay = 0, + extraBufferCapacity = 1, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) + + private var _value: T = initialValue + override var value: T + get() { + return _value + } + set(value) { + tryEmit(value) + } + private val job = internalSharedFlow.subscribe(internalScope) { + if (_value != it) { + _value = it + publicSharedFlow.emit(it) + } + } + + override val replayCache: List + get() = publicSharedFlow.replayCache + override val subscriptionCount: StateFlow + get() = publicSharedFlow.subscriptionCount + + init { + value = initialValue + } + + override fun compareAndSet(expect: T, update: T): Boolean { + return if (value == expect) { + tryEmit(update) + } else { + false + } + } + + @ExperimentalCoroutinesApi + override fun resetReplayCache() = publicSharedFlow.resetReplayCache() + + override fun tryEmit(value: T): Boolean { + return internalSharedFlow.tryEmit(value) + } + + override suspend fun emit(value: T) { + internalSharedFlow.emit(value) + } + + override suspend fun collect(collector: FlowCollector) = publicSharedFlow.collect(collector) + } +}