add opportunity to pass logger in subscribe async

This commit is contained in:
2025-07-21 21:53:03 +06:00
parent dd0de327fc
commit ed36467600
2 changed files with 36 additions and 7 deletions

View File

@@ -2,6 +2,9 @@
## 0.26.1 ## 0.26.1
* `Coroutines`:
* Add opportunity to pass logger in subscribe async
## 0.26.0 ## 0.26.0
**WARNING!!! SINCE THIS VERSION IF YOU WANT TO USE SOME OF KSP MODULES, SET `ksp.useKSP2=false` IN YOUR `gradle.properties`** (see [gh issue 2491](https://github.com/google/ksp/issues/2491)) **WARNING!!! SINCE THIS VERSION IF YOU WANT TO USE SOME OF KSP MODULES, SET `ksp.useKSP2=false` IN YOUR `gradle.properties`** (see [gh issue 2491](https://github.com/google/ksp/issues/2491))

View File

@@ -1,5 +1,6 @@
package dev.inmo.micro_utils.coroutines package dev.inmo.micro_utils.coroutines
import dev.inmo.kslog.common.KSLog
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.channels.* import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
@@ -8,6 +9,7 @@ import kotlinx.coroutines.sync.withLock
private class SubscribeAsyncReceiver<T>( private class SubscribeAsyncReceiver<T>(
val scope: CoroutineScope, val scope: CoroutineScope,
val logger: KSLog,
output: suspend SubscribeAsyncReceiver<T>.(T) -> Unit output: suspend SubscribeAsyncReceiver<T>.(T) -> Unit
) { ) {
private val dataChannel: Channel<T> = Channel(Channel.UNLIMITED) private val dataChannel: Channel<T> = Channel(Channel.UNLIMITED)
@@ -15,7 +17,7 @@ private class SubscribeAsyncReceiver<T>(
get() = dataChannel get() = dataChannel
init { init {
scope.launchLoggingDropExceptions { scope.launchLoggingDropExceptions(logger = logger) {
for (data in dataChannel) { for (data in dataChannel) {
output(data) output(data)
} }
@@ -33,13 +35,16 @@ private data class AsyncSubscriptionCommandData<T, M>(
val scope: CoroutineScope, val scope: CoroutineScope,
val markerFactory: suspend (T) -> M, val markerFactory: suspend (T) -> M,
val block: suspend (T) -> Unit, val block: suspend (T) -> Unit,
val logger: KSLog,
val onEmpty: suspend (M) -> Unit val onEmpty: suspend (M) -> Unit
) : AsyncSubscriptionCommand<T, M> { ) : AsyncSubscriptionCommand<T, M> {
override suspend fun invoke(markersMap: MutableMap<M, SubscribeAsyncReceiver<T>>) { override suspend fun invoke(markersMap: MutableMap<M, SubscribeAsyncReceiver<T>>) {
val marker = markerFactory(data) val marker = markerFactory(data)
markersMap.getOrPut(marker) { markersMap.getOrPut(marker) {
SubscribeAsyncReceiver(scope.LinkedSupervisorScope()) { SubscribeAsyncReceiver(scope.LinkedSupervisorScope(), logger) {
safelyWithoutExceptions { block(it) } runCatchingLogging(logger = logger) {
block(it)
}
if (isEmpty()) { if (isEmpty()) {
onEmpty(marker) onEmpty(marker)
} }
@@ -63,6 +68,7 @@ private data class AsyncSubscriptionCommandClearReceiver<T, M>(
fun <T, M> Flow<T>.subscribeAsync( fun <T, M> Flow<T>.subscribeAsync(
scope: CoroutineScope, scope: CoroutineScope,
markerFactory: suspend (T) -> M, markerFactory: suspend (T) -> M,
logger: KSLog = KSLog,
block: suspend (T) -> Unit block: suspend (T) -> Unit
): Job { ): Job {
val subscope = scope.LinkedSupervisorScope() val subscope = scope.LinkedSupervisorScope()
@@ -71,8 +77,14 @@ fun <T, M> Flow<T>.subscribeAsync(
it.invoke(markersMap) it.invoke(markersMap)
} }
val job = subscribeLoggingDropExceptions(subscope) { data -> val job = subscribeLoggingDropExceptions(subscope, logger = logger) { data ->
val dataCommand = AsyncSubscriptionCommandData(data, subscope, markerFactory, block) { marker -> val dataCommand = AsyncSubscriptionCommandData(
data = data,
scope = subscope,
markerFactory = markerFactory,
block = block,
logger = logger
) { marker ->
actor.send( actor.send(
AsyncSubscriptionCommandClearReceiver(marker) AsyncSubscriptionCommandClearReceiver(marker)
) )
@@ -85,17 +97,20 @@ fun <T, M> Flow<T>.subscribeAsync(
return job return job
} }
@Deprecated("Renamed", ReplaceWith("subscribeLoggingDropExceptionsAsync(scope, markerFactory, block = block)", "dev.inmo.micro_utils.coroutines.subscribeLoggingDropExceptionsAsync"))
fun <T, M> Flow<T>.subscribeSafelyAsync( fun <T, M> Flow<T>.subscribeSafelyAsync(
scope: CoroutineScope, scope: CoroutineScope,
markerFactory: suspend (T) -> M, markerFactory: suspend (T) -> M,
onException: ExceptionHandler<Unit> = defaultSafelyExceptionHandler, onException: ExceptionHandler<Unit> = defaultSafelyExceptionHandler,
logger: KSLog = KSLog,
block: suspend (T) -> Unit block: suspend (T) -> Unit
) = subscribeAsync(scope, markerFactory) { ) = subscribeAsync(scope, markerFactory, logger) {
safely(onException) { safely(onException) {
block(it) block(it)
} }
} }
@Deprecated("Renamed", ReplaceWith("subscribeLoggingDropExceptionsAsync(scope, markerFactory, block = block)", "dev.inmo.micro_utils.coroutines.subscribeLoggingDropExceptionsAsync"))
fun <T, M> Flow<T>.subscribeSafelyWithoutExceptionsAsync( fun <T, M> Flow<T>.subscribeSafelyWithoutExceptionsAsync(
scope: CoroutineScope, scope: CoroutineScope,
markerFactory: suspend (T) -> M, markerFactory: suspend (T) -> M,
@@ -107,11 +122,22 @@ fun <T, M> Flow<T>.subscribeSafelyWithoutExceptionsAsync(
} }
} }
fun <T, M> Flow<T>.subscribeLoggingDropExceptionsAsync(
scope: CoroutineScope,
markerFactory: suspend (T) -> M,
logger: KSLog = KSLog,
block: suspend (T) -> Unit
) = subscribeAsync(scope, markerFactory, logger) {
block(it)
}
@Deprecated("Renamed", ReplaceWith("subscribeLoggingDropExceptionsAsync(scope, markerFactory, logger, block = block)", "dev.inmo.micro_utils.coroutines.subscribeLoggingDropExceptionsAsync"))
fun <T, M> Flow<T>.subscribeSafelySkippingExceptionsAsync( fun <T, M> Flow<T>.subscribeSafelySkippingExceptionsAsync(
scope: CoroutineScope, scope: CoroutineScope,
markerFactory: suspend (T) -> M, markerFactory: suspend (T) -> M,
logger: KSLog = KSLog,
block: suspend (T) -> Unit block: suspend (T) -> Unit
) = subscribeAsync(scope, markerFactory) { ) = subscribeAsync(scope, markerFactory, logger) {
safelyWithoutExceptions({ /* do nothing */}) { safelyWithoutExceptions({ /* do nothing */}) {
block(it) block(it)
} }