diff --git a/CHANGELOG.md b/CHANGELOG.md index f7ef3f2af77..16225991008 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 0.10.3 + +* `Coroutines`: + * Fixes in `AccumulatorFlow` + ## 0.10.2 * `Versions`: diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt index eaee88190aa..8485228026c 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt @@ -6,11 +6,12 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.* import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlin.coroutines.cancellation.CancellationException -private sealed interface AccumulatorFlowStep -private data class DataRetrievedAccumulatorFlowStep(val data: Any) : AccumulatorFlowStep -private data class SubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep -private data class UnsubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep +private sealed interface AccumulatorFlowStep +private data class DataRetrievedAccumulatorFlowStep(val data: T) : AccumulatorFlowStep +private data class SubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep +private data class UnsubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep /** * This [Flow] will have behaviour very similar to [SharedFlow], but there are several differences: @@ -26,12 +27,12 @@ class AccumulatorFlow( private val subscope = scope.LinkedSupervisorScope() private val activeData = ArrayDeque() private val dataMutex = Mutex() - private val channelsForBroadcast = mutableListOf>() + private val channelsForBroadcast = mutableListOf>() private val channelsMutex = Mutex() - private val steps = subscope.actor { step -> + private val steps = subscope.actor> { step -> when (step) { is DataRetrievedAccumulatorFlowStep -> { - if (activeData.first() === step.data) { + if (activeData.firstOrNull() === step.data) { dataMutex.withLock { activeData.removeFirst() } @@ -42,7 +43,7 @@ class AccumulatorFlow( dataMutex.withLock { val dataToSend = activeData.toList() safelyWithoutExceptions { - dataToSend.forEach { step.channel.send(it as Any) } + dataToSend.forEach { step.channel.send(it) } } } } @@ -58,24 +59,29 @@ class AccumulatorFlow( channelsMutex.withLock { channelsForBroadcast.forEach { channel -> safelyWithResult { - channel.send(it as Any) + channel.send(it) } } } } override suspend fun collectSafely(collector: FlowCollector) { - val channel = Channel(Channel.UNLIMITED, BufferOverflow.SUSPEND) + val channel = Channel(Channel.UNLIMITED, BufferOverflow.SUSPEND) steps.send(SubscribeAccumulatorFlowStep(channel)) - for (data in channel) { - try { - collector.emit(data as T) - steps.send(DataRetrievedAccumulatorFlowStep(data)) - } finally { - channel.cancel() - steps.send(UnsubscribeAccumulatorFlowStep(channel)) + val result = runCatchingSafely { + for (data in channel) { + val emitResult = runCatchingSafely { + collector.emit(data) + } + if (emitResult.isSuccess || emitResult.exceptionOrNull() is CancellationException) { + steps.send(DataRetrievedAccumulatorFlowStep(data)) + } + emitResult.getOrThrow() } } + channel.cancel() + steps.send(UnsubscribeAccumulatorFlowStep(channel)) + result.getOrThrow() } } diff --git a/gradle.properties b/gradle.properties index ff5a543f45b..1168e45c389 100644 --- a/gradle.properties +++ b/gradle.properties @@ -14,5 +14,5 @@ crypto_js_version=4.1.1 # Project data group=dev.inmo -version=0.10.2 -android_code_version=117 +version=0.10.3 +android_code_version=118