From 542ed8103453b398dda45d8023431492f77f11df Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Wed, 11 May 2022 01:18:48 +0600 Subject: [PATCH 1/2] start 0.10.3 --- CHANGELOG.md | 2 ++ gradle.properties | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f7ef3f2af77..3c2a8dd1970 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ # Changelog +## 0.10.3 + ## 0.10.2 * `Versions`: diff --git a/gradle.properties b/gradle.properties index ff5a543f45b..1168e45c389 100644 --- a/gradle.properties +++ b/gradle.properties @@ -14,5 +14,5 @@ crypto_js_version=4.1.1 # Project data group=dev.inmo -version=0.10.2 -android_code_version=117 +version=0.10.3 +android_code_version=118 From f68270a5b3f0cc6a454e92925c21ecd919851d55 Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Wed, 11 May 2022 02:38:48 +0600 Subject: [PATCH 2/2] fixes in AccumulatorFlow --- CHANGELOG.md | 3 ++ .../micro_utils/coroutines/AccumulatorFlow.kt | 40 +++++++++++-------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c2a8dd1970..16225991008 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## 0.10.3 +* `Coroutines`: + * Fixes in `AccumulatorFlow` + ## 0.10.2 * `Versions`: diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt index eaee88190aa..8485228026c 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt @@ -6,11 +6,12 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.* import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlin.coroutines.cancellation.CancellationException -private sealed interface AccumulatorFlowStep -private data class DataRetrievedAccumulatorFlowStep(val data: Any) : AccumulatorFlowStep -private data class SubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep -private data class UnsubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep +private sealed interface AccumulatorFlowStep +private data class DataRetrievedAccumulatorFlowStep(val data: T) : AccumulatorFlowStep +private data class SubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep +private data class UnsubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep /** * This [Flow] will have behaviour very similar to [SharedFlow], but there are several differences: @@ -26,12 +27,12 @@ class AccumulatorFlow( private val subscope = scope.LinkedSupervisorScope() private val activeData = ArrayDeque() private val dataMutex = Mutex() - private val channelsForBroadcast = mutableListOf>() + private val channelsForBroadcast = mutableListOf>() private val channelsMutex = Mutex() - private val steps = subscope.actor { step -> + private val steps = subscope.actor> { step -> when (step) { is DataRetrievedAccumulatorFlowStep -> { - if (activeData.first() === step.data) { + if (activeData.firstOrNull() === step.data) { dataMutex.withLock { activeData.removeFirst() } @@ -42,7 +43,7 @@ class AccumulatorFlow( dataMutex.withLock { val dataToSend = activeData.toList() safelyWithoutExceptions { - dataToSend.forEach { step.channel.send(it as Any) } + dataToSend.forEach { step.channel.send(it) } } } } @@ -58,24 +59,29 @@ class AccumulatorFlow( channelsMutex.withLock { channelsForBroadcast.forEach { channel -> safelyWithResult { - channel.send(it as Any) + channel.send(it) } } } } override suspend fun collectSafely(collector: FlowCollector) { - val channel = Channel(Channel.UNLIMITED, BufferOverflow.SUSPEND) + val channel = Channel(Channel.UNLIMITED, BufferOverflow.SUSPEND) steps.send(SubscribeAccumulatorFlowStep(channel)) - for (data in channel) { - try { - collector.emit(data as T) - steps.send(DataRetrievedAccumulatorFlowStep(data)) - } finally { - channel.cancel() - steps.send(UnsubscribeAccumulatorFlowStep(channel)) + val result = runCatchingSafely { + for (data in channel) { + val emitResult = runCatchingSafely { + collector.emit(data) + } + if (emitResult.isSuccess || emitResult.exceptionOrNull() is CancellationException) { + steps.send(DataRetrievedAccumulatorFlowStep(data)) + } + emitResult.getOrThrow() } } + channel.cancel() + steps.send(UnsubscribeAccumulatorFlowStep(channel)) + result.getOrThrow() } }