diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c2a8dd1970..16225991008 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## 0.10.3 +* `Coroutines`: + * Fixes in `AccumulatorFlow` + ## 0.10.2 * `Versions`: diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt index eaee88190aa..8485228026c 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt @@ -6,11 +6,12 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.* import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlin.coroutines.cancellation.CancellationException -private sealed interface AccumulatorFlowStep -private data class DataRetrievedAccumulatorFlowStep(val data: Any) : AccumulatorFlowStep -private data class SubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep -private data class UnsubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep +private sealed interface AccumulatorFlowStep +private data class DataRetrievedAccumulatorFlowStep(val data: T) : AccumulatorFlowStep +private data class SubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep +private data class UnsubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep /** * This [Flow] will have behaviour very similar to [SharedFlow], but there are several differences: @@ -26,12 +27,12 @@ class AccumulatorFlow( private val subscope = scope.LinkedSupervisorScope() private val activeData = ArrayDeque() private val dataMutex = Mutex() - private val channelsForBroadcast = mutableListOf>() + private val channelsForBroadcast = mutableListOf>() private val channelsMutex = Mutex() - private val steps = subscope.actor { step -> + private val steps = subscope.actor> { step -> when (step) { is DataRetrievedAccumulatorFlowStep -> { - if (activeData.first() === step.data) { + if (activeData.firstOrNull() === step.data) { dataMutex.withLock { activeData.removeFirst() } @@ -42,7 +43,7 @@ class AccumulatorFlow( dataMutex.withLock { val dataToSend = activeData.toList() safelyWithoutExceptions { - dataToSend.forEach { step.channel.send(it as Any) } + dataToSend.forEach { step.channel.send(it) } } } } @@ -58,24 +59,29 @@ class AccumulatorFlow( channelsMutex.withLock { channelsForBroadcast.forEach { channel -> safelyWithResult { - channel.send(it as Any) + channel.send(it) } } } } override suspend fun collectSafely(collector: FlowCollector) { - val channel = Channel(Channel.UNLIMITED, BufferOverflow.SUSPEND) + val channel = Channel(Channel.UNLIMITED, BufferOverflow.SUSPEND) steps.send(SubscribeAccumulatorFlowStep(channel)) - for (data in channel) { - try { - collector.emit(data as T) - steps.send(DataRetrievedAccumulatorFlowStep(data)) - } finally { - channel.cancel() - steps.send(UnsubscribeAccumulatorFlowStep(channel)) + val result = runCatchingSafely { + for (data in channel) { + val emitResult = runCatchingSafely { + collector.emit(data) + } + if (emitResult.isSuccess || emitResult.exceptionOrNull() is CancellationException) { + steps.send(DataRetrievedAccumulatorFlowStep(data)) + } + emitResult.getOrThrow() } } + channel.cancel() + steps.send(UnsubscribeAccumulatorFlowStep(channel)) + result.getOrThrow() } }