mirror of
https://github.com/InsanusMokrassar/MicroUtils.git
synced 2024-11-16 13:23:49 +00:00
commit
611f64f2e1
@ -1,5 +1,13 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 0.5.14
|
||||||
|
|
||||||
|
* `Versions`
|
||||||
|
* `Kotlin`: `1.5.10` -> `1.5.20`
|
||||||
|
* `Coroutines`
|
||||||
|
* `subscribeSafelyWithoutExceptions` got new parameter `onException` by analogue with `safelyWithoutExceptions`
|
||||||
|
* New extensions `Flow#subscribeAsync` and subsequent analogs of `subscribe` with opportunity to set up custom marker
|
||||||
|
|
||||||
## 0.5.13
|
## 0.5.13
|
||||||
|
|
||||||
* `Common`:
|
* `Common`:
|
||||||
|
@ -4,6 +4,8 @@ package dev.inmo.micro_utils.coroutines
|
|||||||
|
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.flow.*
|
import kotlinx.coroutines.flow.*
|
||||||
|
import kotlinx.coroutines.sync.Mutex
|
||||||
|
import kotlinx.coroutines.sync.withLock
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shortcut for chain if [Flow.onEach] and [Flow.launchIn]
|
* Shortcut for chain if [Flow.onEach] and [Flow.launchIn]
|
||||||
@ -29,9 +31,10 @@ inline fun <T> Flow<T>.subscribeSafely(
|
|||||||
*/
|
*/
|
||||||
inline fun <T> Flow<T>.subscribeSafelyWithoutExceptions(
|
inline fun <T> Flow<T>.subscribeSafelyWithoutExceptions(
|
||||||
scope: CoroutineScope,
|
scope: CoroutineScope,
|
||||||
|
noinline onException: ExceptionHandler<T?> = defaultSafelyWithoutExceptionHandlerWithNull,
|
||||||
noinline block: suspend (T) -> Unit
|
noinline block: suspend (T) -> Unit
|
||||||
) = subscribe(scope) {
|
) = subscribe(scope) {
|
||||||
safelyWithoutExceptions {
|
safelyWithoutExceptions(onException) {
|
||||||
block(it)
|
block(it)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,118 @@
|
|||||||
|
package dev.inmo.micro_utils.coroutines
|
||||||
|
|
||||||
|
import kotlinx.coroutines.*
|
||||||
|
import kotlinx.coroutines.channels.*
|
||||||
|
import kotlinx.coroutines.flow.*
|
||||||
|
import kotlinx.coroutines.sync.Mutex
|
||||||
|
import kotlinx.coroutines.sync.withLock
|
||||||
|
|
||||||
|
private class SubscribeAsyncReceiver<T>(
|
||||||
|
val scope: CoroutineScope,
|
||||||
|
output: suspend SubscribeAsyncReceiver<T>.(T) -> Unit
|
||||||
|
) {
|
||||||
|
private val dataChannel: Channel<T> = Channel(Channel.UNLIMITED)
|
||||||
|
val channel: SendChannel<T>
|
||||||
|
get() = dataChannel
|
||||||
|
|
||||||
|
init {
|
||||||
|
scope.launchSafelyWithoutExceptions {
|
||||||
|
for (data in dataChannel) {
|
||||||
|
output(data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fun isEmpty(): Boolean = dataChannel.isEmpty
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed interface AsyncSubscriptionCommand<T, M> {
|
||||||
|
suspend operator fun invoke(markersMap: MutableMap<M, SubscribeAsyncReceiver<T>>)
|
||||||
|
}
|
||||||
|
private data class AsyncSubscriptionCommandData<T, M>(
|
||||||
|
val data: T,
|
||||||
|
val scope: CoroutineScope,
|
||||||
|
val markerFactory: suspend (T) -> M,
|
||||||
|
val block: suspend (T) -> Unit,
|
||||||
|
val onEmpty: suspend (M) -> Unit
|
||||||
|
) : AsyncSubscriptionCommand<T, M> {
|
||||||
|
override suspend fun invoke(markersMap: MutableMap<M, SubscribeAsyncReceiver<T>>) {
|
||||||
|
val marker = markerFactory(data)
|
||||||
|
markersMap.getOrPut(marker) {
|
||||||
|
SubscribeAsyncReceiver(scope) {
|
||||||
|
safelyWithoutExceptions { block(it) }
|
||||||
|
if (isEmpty()) {
|
||||||
|
onEmpty(marker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}.channel.send(data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private data class AsyncSubscriptionCommandClearReceiver<T, M>(
|
||||||
|
val marker: M
|
||||||
|
) : AsyncSubscriptionCommand<T, M> {
|
||||||
|
override suspend fun invoke(markersMap: MutableMap<M, SubscribeAsyncReceiver<T>>) {
|
||||||
|
val receiver = markersMap[marker]
|
||||||
|
if (receiver ?.isEmpty() == true) {
|
||||||
|
markersMap.remove(marker)
|
||||||
|
receiver.scope.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fun <T, M> Flow<T>.subscribeAsync(
|
||||||
|
scope: CoroutineScope,
|
||||||
|
markerFactory: suspend (T) -> M,
|
||||||
|
block: suspend (T) -> Unit
|
||||||
|
): Job {
|
||||||
|
val subscope = scope.LinkedSupervisorScope()
|
||||||
|
val markersMap = mutableMapOf<M, SubscribeAsyncReceiver<T>>()
|
||||||
|
val actor = subscope.actor<AsyncSubscriptionCommand<T, M>>(Channel.UNLIMITED) {
|
||||||
|
it.invoke(markersMap)
|
||||||
|
}
|
||||||
|
|
||||||
|
val job = subscribeSafelyWithoutExceptions(subscope) { data ->
|
||||||
|
val dataCommand = AsyncSubscriptionCommandData(data, subscope, markerFactory, block) { marker ->
|
||||||
|
actor.send(
|
||||||
|
AsyncSubscriptionCommandClearReceiver(marker)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
actor.send(dataCommand)
|
||||||
|
}
|
||||||
|
|
||||||
|
job.invokeOnCompletion { subscope.cancel() }
|
||||||
|
|
||||||
|
return job
|
||||||
|
}
|
||||||
|
|
||||||
|
inline fun <T, M> Flow<T>.subscribeSafelyAsync(
|
||||||
|
scope: CoroutineScope,
|
||||||
|
noinline markerFactory: suspend (T) -> M,
|
||||||
|
noinline onException: ExceptionHandler<Unit> = defaultSafelyExceptionHandler,
|
||||||
|
noinline block: suspend (T) -> Unit
|
||||||
|
) = subscribeAsync(scope, markerFactory) {
|
||||||
|
safely(onException) {
|
||||||
|
block(it)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
inline fun <T, M> Flow<T>.subscribeSafelyWithoutExceptionsAsync(
|
||||||
|
scope: CoroutineScope,
|
||||||
|
noinline markerFactory: suspend (T) -> M,
|
||||||
|
noinline onException: ExceptionHandler<T?> = defaultSafelyWithoutExceptionHandlerWithNull,
|
||||||
|
noinline block: suspend (T) -> Unit
|
||||||
|
) = subscribeAsync(scope, markerFactory) {
|
||||||
|
safelyWithoutExceptions(onException) {
|
||||||
|
block(it)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
inline fun <T, M> Flow<T>.subscribeSafelySkippingExceptionsAsync(
|
||||||
|
scope: CoroutineScope,
|
||||||
|
noinline markerFactory: suspend (T) -> M,
|
||||||
|
noinline block: suspend (T) -> Unit
|
||||||
|
) = subscribeAsync(scope, markerFactory) {
|
||||||
|
safelyWithoutExceptions({ /* do nothing */}) {
|
||||||
|
block(it)
|
||||||
|
}
|
||||||
|
}
|
@ -7,7 +7,7 @@ android.useAndroidX=true
|
|||||||
android.enableJetifier=true
|
android.enableJetifier=true
|
||||||
org.gradle.jvmargs=-Xmx2g
|
org.gradle.jvmargs=-Xmx2g
|
||||||
|
|
||||||
kotlin_version=1.5.10
|
kotlin_version=1.5.20
|
||||||
kotlin_coroutines_version=1.5.0
|
kotlin_coroutines_version=1.5.0
|
||||||
kotlin_serialisation_core_version=1.2.1
|
kotlin_serialisation_core_version=1.2.1
|
||||||
kotlin_exposed_version=0.32.1
|
kotlin_exposed_version=0.32.1
|
||||||
@ -45,5 +45,5 @@ dokka_version=1.4.32
|
|||||||
# Project data
|
# Project data
|
||||||
|
|
||||||
group=dev.inmo
|
group=dev.inmo
|
||||||
version=0.5.13
|
version=0.5.14
|
||||||
android_code_version=54
|
android_code_version=55
|
||||||
|
Loading…
Reference in New Issue
Block a user