diff --git a/CHANGELOG.md b/CHANGELOG.md index 892f2c75e66..a2ddad728a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## 0.5.14 + +* `Versions` + * `Kotlin`: `1.5.10` -> `1.5.20` +* `Coroutines` + * `subscribeSafelyWithoutExceptions` got new parameter `onException` by analogue with `safelyWithoutExceptions` + * New extensions `Flow#subscribeAsync` and subsequent analogs of `subscribe` with opportunity to set up custom marker + ## 0.5.13 * `Common`: diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscription.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscription.kt index 76546375293..fddcc64ac4d 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscription.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscription.kt @@ -4,6 +4,8 @@ package dev.inmo.micro_utils.coroutines import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.* +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock /** * Shortcut for chain if [Flow.onEach] and [Flow.launchIn] @@ -29,9 +31,10 @@ inline fun Flow.subscribeSafely( */ inline fun Flow.subscribeSafelyWithoutExceptions( scope: CoroutineScope, + noinline onException: ExceptionHandler = defaultSafelyWithoutExceptionHandlerWithNull, noinline block: suspend (T) -> Unit ) = subscribe(scope) { - safelyWithoutExceptions { + safelyWithoutExceptions(onException) { block(it) } } diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscriptionAsync.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscriptionAsync.kt new file mode 100644 index 00000000000..0a3019f6009 --- /dev/null +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscriptionAsync.kt @@ -0,0 +1,118 @@ +package dev.inmo.micro_utils.coroutines + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +private class SubscribeAsyncReceiver( + val scope: CoroutineScope, + output: suspend SubscribeAsyncReceiver.(T) -> Unit +) { + private val dataChannel: Channel = Channel(Channel.UNLIMITED) + val channel: SendChannel + get() = dataChannel + + init { + scope.launchSafelyWithoutExceptions { + for (data in dataChannel) { + output(data) + } + } + } + + fun isEmpty(): Boolean = dataChannel.isEmpty +} + +private sealed interface AsyncSubscriptionCommand { + suspend operator fun invoke(markersMap: MutableMap>) +} +private data class AsyncSubscriptionCommandData( + val data: T, + val scope: CoroutineScope, + val markerFactory: suspend (T) -> M, + val block: suspend (T) -> Unit, + val onEmpty: suspend (M) -> Unit +) : AsyncSubscriptionCommand { + override suspend fun invoke(markersMap: MutableMap>) { + val marker = markerFactory(data) + markersMap.getOrPut(marker) { + SubscribeAsyncReceiver(scope) { + safelyWithoutExceptions { block(it) } + if (isEmpty()) { + onEmpty(marker) + } + } + }.channel.send(data) + } +} + +private data class AsyncSubscriptionCommandClearReceiver( + val marker: M +) : AsyncSubscriptionCommand { + override suspend fun invoke(markersMap: MutableMap>) { + val receiver = markersMap[marker] + if (receiver ?.isEmpty() == true) { + markersMap.remove(marker) + receiver.scope.cancel() + } + } +} + +fun Flow.subscribeAsync( + scope: CoroutineScope, + markerFactory: suspend (T) -> M, + block: suspend (T) -> Unit +): Job { + val subscope = scope.LinkedSupervisorScope() + val markersMap = mutableMapOf>() + val actor = subscope.actor>(Channel.UNLIMITED) { + it.invoke(markersMap) + } + + val job = subscribeSafelyWithoutExceptions(subscope) { data -> + val dataCommand = AsyncSubscriptionCommandData(data, subscope, markerFactory, block) { marker -> + actor.send( + AsyncSubscriptionCommandClearReceiver(marker) + ) + } + actor.send(dataCommand) + } + + job.invokeOnCompletion { subscope.cancel() } + + return job +} + +inline fun Flow.subscribeSafelyAsync( + scope: CoroutineScope, + noinline markerFactory: suspend (T) -> M, + noinline onException: ExceptionHandler = defaultSafelyExceptionHandler, + noinline block: suspend (T) -> Unit +) = subscribeAsync(scope, markerFactory) { + safely(onException) { + block(it) + } +} + +inline fun Flow.subscribeSafelyWithoutExceptionsAsync( + scope: CoroutineScope, + noinline markerFactory: suspend (T) -> M, + noinline onException: ExceptionHandler = defaultSafelyWithoutExceptionHandlerWithNull, + noinline block: suspend (T) -> Unit +) = subscribeAsync(scope, markerFactory) { + safelyWithoutExceptions(onException) { + block(it) + } +} + +inline fun Flow.subscribeSafelySkippingExceptionsAsync( + scope: CoroutineScope, + noinline markerFactory: suspend (T) -> M, + noinline block: suspend (T) -> Unit +) = subscribeAsync(scope, markerFactory) { + safelyWithoutExceptions({ /* do nothing */}) { + block(it) + } +} diff --git a/gradle.properties b/gradle.properties index b853266f659..b9a05f40b22 100644 --- a/gradle.properties +++ b/gradle.properties @@ -7,7 +7,7 @@ android.useAndroidX=true android.enableJetifier=true org.gradle.jvmargs=-Xmx2g -kotlin_version=1.5.10 +kotlin_version=1.5.20 kotlin_coroutines_version=1.5.0 kotlin_serialisation_core_version=1.2.1 kotlin_exposed_version=0.32.1 @@ -45,5 +45,5 @@ dokka_version=1.4.32 # Project data group=dev.inmo -version=0.5.13 -android_code_version=54 +version=0.5.14 +android_code_version=55