From fb9e4d57fbcc7c8f48f083e62c1474eee1563d76 Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Fri, 25 Jun 2021 17:19:28 +0600 Subject: [PATCH 1/3] start 0.5.14 --- CHANGELOG.md | 2 ++ gradle.properties | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 892f2c75e66..b6a42657592 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ # Changelog +## 0.5.14 + ## 0.5.13 * `Common`: diff --git a/gradle.properties b/gradle.properties index b853266f659..214e662497a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -45,5 +45,5 @@ dokka_version=1.4.32 # Project data group=dev.inmo -version=0.5.13 -android_code_version=54 +version=0.5.14 +android_code_version=55 From 59fc90e55629fe1b18592b7bf755ad36017ae017 Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Sat, 26 Jun 2021 00:46:51 +0600 Subject: [PATCH 2/3] add subscribeAsync --- CHANGELOG.md | 4 + .../coroutines/FlowSubscription.kt | 5 +- .../coroutines/FlowSubscriptionAsync.kt | 118 ++++++++++++++++++ 3 files changed, 126 insertions(+), 1 deletion(-) create mode 100644 coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscriptionAsync.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index b6a42657592..34887ee1761 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## 0.5.14 +* `Coroutines` + * `subscribeSafelyWithoutExceptions` got new parameter `onException` by analogue with `safelyWithoutExceptions` + * New extensions `Flow#subscribeAsync` and subsequent analogs of `subscribe` with opportunity to set up custom marker + ## 0.5.13 * `Common`: diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscription.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscription.kt index 76546375293..fddcc64ac4d 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscription.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscription.kt @@ -4,6 +4,8 @@ package dev.inmo.micro_utils.coroutines import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.* +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock /** * Shortcut for chain if [Flow.onEach] and [Flow.launchIn] @@ -29,9 +31,10 @@ inline fun Flow.subscribeSafely( */ inline fun Flow.subscribeSafelyWithoutExceptions( scope: CoroutineScope, + noinline onException: ExceptionHandler = defaultSafelyWithoutExceptionHandlerWithNull, noinline block: suspend (T) -> Unit ) = subscribe(scope) { - safelyWithoutExceptions { + safelyWithoutExceptions(onException) { block(it) } } diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscriptionAsync.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscriptionAsync.kt new file mode 100644 index 00000000000..0a3019f6009 --- /dev/null +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscriptionAsync.kt @@ -0,0 +1,118 @@ +package dev.inmo.micro_utils.coroutines + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +private class SubscribeAsyncReceiver( + val scope: CoroutineScope, + output: suspend SubscribeAsyncReceiver.(T) -> Unit +) { + private val dataChannel: Channel = Channel(Channel.UNLIMITED) + val channel: SendChannel + get() = dataChannel + + init { + scope.launchSafelyWithoutExceptions { + for (data in dataChannel) { + output(data) + } + } + } + + fun isEmpty(): Boolean = dataChannel.isEmpty +} + +private sealed interface AsyncSubscriptionCommand { + suspend operator fun invoke(markersMap: MutableMap>) +} +private data class AsyncSubscriptionCommandData( + val data: T, + val scope: CoroutineScope, + val markerFactory: suspend (T) -> M, + val block: suspend (T) -> Unit, + val onEmpty: suspend (M) -> Unit +) : AsyncSubscriptionCommand { + override suspend fun invoke(markersMap: MutableMap>) { + val marker = markerFactory(data) + markersMap.getOrPut(marker) { + SubscribeAsyncReceiver(scope) { + safelyWithoutExceptions { block(it) } + if (isEmpty()) { + onEmpty(marker) + } + } + }.channel.send(data) + } +} + +private data class AsyncSubscriptionCommandClearReceiver( + val marker: M +) : AsyncSubscriptionCommand { + override suspend fun invoke(markersMap: MutableMap>) { + val receiver = markersMap[marker] + if (receiver ?.isEmpty() == true) { + markersMap.remove(marker) + receiver.scope.cancel() + } + } +} + +fun Flow.subscribeAsync( + scope: CoroutineScope, + markerFactory: suspend (T) -> M, + block: suspend (T) -> Unit +): Job { + val subscope = scope.LinkedSupervisorScope() + val markersMap = mutableMapOf>() + val actor = subscope.actor>(Channel.UNLIMITED) { + it.invoke(markersMap) + } + + val job = subscribeSafelyWithoutExceptions(subscope) { data -> + val dataCommand = AsyncSubscriptionCommandData(data, subscope, markerFactory, block) { marker -> + actor.send( + AsyncSubscriptionCommandClearReceiver(marker) + ) + } + actor.send(dataCommand) + } + + job.invokeOnCompletion { subscope.cancel() } + + return job +} + +inline fun Flow.subscribeSafelyAsync( + scope: CoroutineScope, + noinline markerFactory: suspend (T) -> M, + noinline onException: ExceptionHandler = defaultSafelyExceptionHandler, + noinline block: suspend (T) -> Unit +) = subscribeAsync(scope, markerFactory) { + safely(onException) { + block(it) + } +} + +inline fun Flow.subscribeSafelyWithoutExceptionsAsync( + scope: CoroutineScope, + noinline markerFactory: suspend (T) -> M, + noinline onException: ExceptionHandler = defaultSafelyWithoutExceptionHandlerWithNull, + noinline block: suspend (T) -> Unit +) = subscribeAsync(scope, markerFactory) { + safelyWithoutExceptions(onException) { + block(it) + } +} + +inline fun Flow.subscribeSafelySkippingExceptionsAsync( + scope: CoroutineScope, + noinline markerFactory: suspend (T) -> M, + noinline block: suspend (T) -> Unit +) = subscribeAsync(scope, markerFactory) { + safelyWithoutExceptions({ /* do nothing */}) { + block(it) + } +} From f118ebce6e9ec2392450a51fb7353236afc473d3 Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Sat, 26 Jun 2021 00:50:09 +0600 Subject: [PATCH 3/3] update kotlin --- CHANGELOG.md | 2 ++ gradle.properties | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 34887ee1761..a2ddad728a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## 0.5.14 +* `Versions` + * `Kotlin`: `1.5.10` -> `1.5.20` * `Coroutines` * `subscribeSafelyWithoutExceptions` got new parameter `onException` by analogue with `safelyWithoutExceptions` * New extensions `Flow#subscribeAsync` and subsequent analogs of `subscribe` with opportunity to set up custom marker diff --git a/gradle.properties b/gradle.properties index 214e662497a..b9a05f40b22 100644 --- a/gradle.properties +++ b/gradle.properties @@ -7,7 +7,7 @@ android.useAndroidX=true android.enableJetifier=true org.gradle.jvmargs=-Xmx2g -kotlin_version=1.5.10 +kotlin_version=1.5.20 kotlin_coroutines_version=1.5.0 kotlin_serialisation_core_version=1.2.1 kotlin_exposed_version=0.32.1