From ed364676003ea14a3dc21b61ad74452ddafb5da0 Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Mon, 21 Jul 2025 21:53:03 +0600 Subject: [PATCH] add opportunity to pass logger in subscribe async --- CHANGELOG.md | 3 ++ .../coroutines/FlowSubscriptionAsync.kt | 40 +++++++++++++++---- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7152b6da71d..69dd1d211b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## 0.26.1 +* `Coroutines`: + * Add opportunity to pass logger in subscribe async + ## 0.26.0 **WARNING!!! SINCE THIS VERSION IF YOU WANT TO USE SOME OF KSP MODULES, SET `ksp.useKSP2=false` IN YOUR `gradle.properties`** (see [gh issue 2491](https://github.com/google/ksp/issues/2491)) diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscriptionAsync.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscriptionAsync.kt index 3f46576e8dc..b3f1dcf8565 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscriptionAsync.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowSubscriptionAsync.kt @@ -1,5 +1,6 @@ package dev.inmo.micro_utils.coroutines +import dev.inmo.kslog.common.KSLog import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* @@ -8,6 +9,7 @@ import kotlinx.coroutines.sync.withLock private class SubscribeAsyncReceiver( val scope: CoroutineScope, + val logger: KSLog, output: suspend SubscribeAsyncReceiver.(T) -> Unit ) { private val dataChannel: Channel = Channel(Channel.UNLIMITED) @@ -15,7 +17,7 @@ private class SubscribeAsyncReceiver( get() = dataChannel init { - scope.launchLoggingDropExceptions { + scope.launchLoggingDropExceptions(logger = logger) { for (data in dataChannel) { output(data) } @@ -33,13 +35,16 @@ private data class AsyncSubscriptionCommandData( val scope: CoroutineScope, val markerFactory: suspend (T) -> M, val block: suspend (T) -> Unit, + val logger: KSLog, val onEmpty: suspend (M) -> Unit ) : AsyncSubscriptionCommand { override suspend fun invoke(markersMap: MutableMap>) { val marker = markerFactory(data) markersMap.getOrPut(marker) { - SubscribeAsyncReceiver(scope.LinkedSupervisorScope()) { - safelyWithoutExceptions { block(it) } + SubscribeAsyncReceiver(scope.LinkedSupervisorScope(), logger) { + runCatchingLogging(logger = logger) { + block(it) + } if (isEmpty()) { onEmpty(marker) } @@ -63,6 +68,7 @@ private data class AsyncSubscriptionCommandClearReceiver( fun Flow.subscribeAsync( scope: CoroutineScope, markerFactory: suspend (T) -> M, + logger: KSLog = KSLog, block: suspend (T) -> Unit ): Job { val subscope = scope.LinkedSupervisorScope() @@ -71,8 +77,14 @@ fun Flow.subscribeAsync( it.invoke(markersMap) } - val job = subscribeLoggingDropExceptions(subscope) { data -> - val dataCommand = AsyncSubscriptionCommandData(data, subscope, markerFactory, block) { marker -> + val job = subscribeLoggingDropExceptions(subscope, logger = logger) { data -> + val dataCommand = AsyncSubscriptionCommandData( + data = data, + scope = subscope, + markerFactory = markerFactory, + block = block, + logger = logger + ) { marker -> actor.send( AsyncSubscriptionCommandClearReceiver(marker) ) @@ -85,17 +97,20 @@ fun Flow.subscribeAsync( return job } +@Deprecated("Renamed", ReplaceWith("subscribeLoggingDropExceptionsAsync(scope, markerFactory, block = block)", "dev.inmo.micro_utils.coroutines.subscribeLoggingDropExceptionsAsync")) fun Flow.subscribeSafelyAsync( scope: CoroutineScope, markerFactory: suspend (T) -> M, onException: ExceptionHandler = defaultSafelyExceptionHandler, + logger: KSLog = KSLog, block: suspend (T) -> Unit -) = subscribeAsync(scope, markerFactory) { +) = subscribeAsync(scope, markerFactory, logger) { safely(onException) { block(it) } } +@Deprecated("Renamed", ReplaceWith("subscribeLoggingDropExceptionsAsync(scope, markerFactory, block = block)", "dev.inmo.micro_utils.coroutines.subscribeLoggingDropExceptionsAsync")) fun Flow.subscribeSafelyWithoutExceptionsAsync( scope: CoroutineScope, markerFactory: suspend (T) -> M, @@ -107,11 +122,22 @@ fun Flow.subscribeSafelyWithoutExceptionsAsync( } } +fun Flow.subscribeLoggingDropExceptionsAsync( + scope: CoroutineScope, + markerFactory: suspend (T) -> M, + logger: KSLog = KSLog, + block: suspend (T) -> Unit +) = subscribeAsync(scope, markerFactory, logger) { + block(it) +} + +@Deprecated("Renamed", ReplaceWith("subscribeLoggingDropExceptionsAsync(scope, markerFactory, logger, block = block)", "dev.inmo.micro_utils.coroutines.subscribeLoggingDropExceptionsAsync")) fun Flow.subscribeSafelySkippingExceptionsAsync( scope: CoroutineScope, markerFactory: suspend (T) -> M, + logger: KSLog = KSLog, block: suspend (T) -> Unit -) = subscribeAsync(scope, markerFactory) { +) = subscribeAsync(scope, markerFactory, logger) { safelyWithoutExceptions({ /* do nothing */}) { block(it) }