Merge pull request #152 from InsanusMokrassar/0.10.3

0.10.3
This commit is contained in:
InsanusMokrassar 2022-05-11 02:49:35 +06:00 committed by GitHub
commit 99dfa97958
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 19 deletions

View File

@ -1,5 +1,10 @@
# Changelog # Changelog
## 0.10.3
* `Coroutines`:
* Fixes in `AccumulatorFlow`
## 0.10.2 ## 0.10.2
* `Versions`: * `Versions`:

View File

@ -6,11 +6,12 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlin.coroutines.cancellation.CancellationException
private sealed interface AccumulatorFlowStep private sealed interface AccumulatorFlowStep<T>
private data class DataRetrievedAccumulatorFlowStep(val data: Any) : AccumulatorFlowStep private data class DataRetrievedAccumulatorFlowStep<T>(val data: T) : AccumulatorFlowStep<T>
private data class SubscribeAccumulatorFlowStep(val channel: Channel<Any>) : AccumulatorFlowStep private data class SubscribeAccumulatorFlowStep<T>(val channel: Channel<T>) : AccumulatorFlowStep<T>
private data class UnsubscribeAccumulatorFlowStep(val channel: Channel<Any>) : AccumulatorFlowStep private data class UnsubscribeAccumulatorFlowStep<T>(val channel: Channel<T>) : AccumulatorFlowStep<T>
/** /**
* This [Flow] will have behaviour very similar to [SharedFlow], but there are several differences: * This [Flow] will have behaviour very similar to [SharedFlow], but there are several differences:
@ -26,12 +27,12 @@ class AccumulatorFlow<T>(
private val subscope = scope.LinkedSupervisorScope() private val subscope = scope.LinkedSupervisorScope()
private val activeData = ArrayDeque<T>() private val activeData = ArrayDeque<T>()
private val dataMutex = Mutex() private val dataMutex = Mutex()
private val channelsForBroadcast = mutableListOf<Channel<Any>>() private val channelsForBroadcast = mutableListOf<Channel<T>>()
private val channelsMutex = Mutex() private val channelsMutex = Mutex()
private val steps = subscope.actor<AccumulatorFlowStep> { step -> private val steps = subscope.actor<AccumulatorFlowStep<T>> { step ->
when (step) { when (step) {
is DataRetrievedAccumulatorFlowStep -> { is DataRetrievedAccumulatorFlowStep -> {
if (activeData.first() === step.data) { if (activeData.firstOrNull() === step.data) {
dataMutex.withLock { dataMutex.withLock {
activeData.removeFirst() activeData.removeFirst()
} }
@ -42,7 +43,7 @@ class AccumulatorFlow<T>(
dataMutex.withLock { dataMutex.withLock {
val dataToSend = activeData.toList() val dataToSend = activeData.toList()
safelyWithoutExceptions { safelyWithoutExceptions {
dataToSend.forEach { step.channel.send(it as Any) } dataToSend.forEach { step.channel.send(it) }
} }
} }
} }
@ -58,24 +59,29 @@ class AccumulatorFlow<T>(
channelsMutex.withLock { channelsMutex.withLock {
channelsForBroadcast.forEach { channel -> channelsForBroadcast.forEach { channel ->
safelyWithResult { safelyWithResult {
channel.send(it as Any) channel.send(it)
} }
} }
} }
} }
override suspend fun collectSafely(collector: FlowCollector<T>) { override suspend fun collectSafely(collector: FlowCollector<T>) {
val channel = Channel<Any>(Channel.UNLIMITED, BufferOverflow.SUSPEND) val channel = Channel<T>(Channel.UNLIMITED, BufferOverflow.SUSPEND)
steps.send(SubscribeAccumulatorFlowStep(channel)) steps.send(SubscribeAccumulatorFlowStep(channel))
val result = runCatchingSafely {
for (data in channel) { for (data in channel) {
try { val emitResult = runCatchingSafely {
collector.emit(data as T) collector.emit(data)
}
if (emitResult.isSuccess || emitResult.exceptionOrNull() is CancellationException) {
steps.send(DataRetrievedAccumulatorFlowStep(data)) steps.send(DataRetrievedAccumulatorFlowStep(data))
} finally { }
emitResult.getOrThrow()
}
}
channel.cancel() channel.cancel()
steps.send(UnsubscribeAccumulatorFlowStep(channel)) steps.send(UnsubscribeAccumulatorFlowStep(channel))
} result.getOrThrow()
}
} }
} }

View File

@ -14,5 +14,5 @@ crypto_js_version=4.1.1
# Project data # Project data
group=dev.inmo group=dev.inmo
version=0.10.2 version=0.10.3
android_code_version=117 android_code_version=118