From 4082f65afa0ae8d5bc6dc3dcc9305aefa4bff202 Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Wed, 13 Oct 2021 13:26:39 +0600 Subject: [PATCH] AccumulatorFlow --- CHANGELOG.md | 5 +- .../micro_utils/coroutines/AccumulatorFlow.kt | 94 +++++++++++++++++++ 2 files changed, 98 insertions(+), 1 deletion(-) create mode 100644 coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index 58b1ffb5b8f..35fa6ec5b96 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,11 @@ ## 0.7.1 +* `Coroutines`: + * `Common`: + * New `Flow` - `AccumulatorFlow` * `FSM`: - * `Common` + * `Common`: * New manager `DefaultStatesManager` with `DefaultStatesManagerRepo` for abstraction of manager and storing of data info diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt new file mode 100644 index 00000000000..eaee88190aa --- /dev/null +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/AccumulatorFlow.kt @@ -0,0 +1,94 @@ +package dev.inmo.micro_utils.coroutines + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +private sealed interface AccumulatorFlowStep +private data class DataRetrievedAccumulatorFlowStep(val data: Any) : AccumulatorFlowStep +private data class SubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep +private data class UnsubscribeAccumulatorFlowStep(val channel: Channel) : AccumulatorFlowStep + +/** + * This [Flow] will have behaviour very similar to [SharedFlow], but there are several differences: + * + * * All unhandled by [FlowCollector] data will not be removed from [AccumulatorFlow] and will be sent to new + * [FlowCollector]s until anybody will handle it + * * Here there are an [activeData] where data [T] will be stored until somebody will handle it + */ +class AccumulatorFlow( + sourceDataFlow: Flow, + scope: CoroutineScope +) : AbstractFlow() { + private val subscope = scope.LinkedSupervisorScope() + private val activeData = ArrayDeque() + private val dataMutex = Mutex() + private val channelsForBroadcast = mutableListOf>() + private val channelsMutex = Mutex() + private val steps = subscope.actor { step -> + when (step) { + is DataRetrievedAccumulatorFlowStep -> { + if (activeData.first() === step.data) { + dataMutex.withLock { + activeData.removeFirst() + } + } + } + is SubscribeAccumulatorFlowStep -> channelsMutex.withLock { + channelsForBroadcast.add(step.channel) + dataMutex.withLock { + val dataToSend = activeData.toList() + safelyWithoutExceptions { + dataToSend.forEach { step.channel.send(it as Any) } + } + } + } + is UnsubscribeAccumulatorFlowStep -> channelsMutex.withLock { + channelsForBroadcast.remove(step.channel) + } + } + } + private val subscriptionJob = sourceDataFlow.subscribeSafelyWithoutExceptions(subscope) { + dataMutex.withLock { + activeData.addLast(it) + } + channelsMutex.withLock { + channelsForBroadcast.forEach { channel -> + safelyWithResult { + channel.send(it as Any) + } + } + } + } + + override suspend fun collectSafely(collector: FlowCollector) { + val channel = Channel(Channel.UNLIMITED, BufferOverflow.SUSPEND) + steps.send(SubscribeAccumulatorFlowStep(channel)) + for (data in channel) { + try { + collector.emit(data as T) + steps.send(DataRetrievedAccumulatorFlowStep(data)) + } finally { + channel.cancel() + steps.send(UnsubscribeAccumulatorFlowStep(channel)) + } + } + } +} + +/** + * Creates [AccumulatorFlow] using [this] as base [Flow] + */ +fun Flow.accumulatorFlow(scope: CoroutineScope): Flow { + return AccumulatorFlow(this, scope) +} + +/** + * Creates [AccumulatorFlow] using [this] with [receiveAsFlow] to get + */ +fun Channel.accumulatorFlow(scope: CoroutineScope): Flow { + return receiveAsFlow().accumulatorFlow(scope) +}