diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/Actor.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/Actor.kt index 01c53023e97..13478c85960 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/Actor.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/Actor.kt @@ -6,22 +6,19 @@ import kotlinx.coroutines.flow.consumeAsFlow fun CoroutineScope.actor( channelCapacity: Int = Channel.UNLIMITED, - markerFactory: suspend (T) -> Any? = { null }, block: suspend (T) -> Unit ): Channel { val channel = Channel(channelCapacity) - channel.consumeAsFlow().subscribeAsync(this, markerFactory, block) + channel.consumeAsFlow().subscribe(this, block) return channel } inline fun CoroutineScope.safeActor( channelCapacity: Int = Channel.UNLIMITED, noinline onException: ExceptionHandler = defaultSafelyExceptionHandler, - noinline markerFactory: suspend (T) -> Any? = { null }, crossinline block: suspend (T) -> Unit ): Channel = actor( - channelCapacity, - markerFactory + channelCapacity ) { safely(onException) { block(it) diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/ActorAsync.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/ActorAsync.kt new file mode 100644 index 00000000000..9964a561998 --- /dev/null +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/ActorAsync.kt @@ -0,0 +1,30 @@ +package dev.inmo.micro_utils.coroutines + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.consumeAsFlow + +fun CoroutineScope.actorAsync( + channelCapacity: Int = Channel.UNLIMITED, + markerFactory: suspend (T) -> Any? = { null }, + block: suspend (T) -> Unit +): Channel { + val channel = Channel(channelCapacity) + channel.consumeAsFlow().subscribeAsync(this, markerFactory, block) + return channel +} + +inline fun CoroutineScope.safeActorAsync( + channelCapacity: Int = Channel.UNLIMITED, + noinline onException: ExceptionHandler = defaultSafelyExceptionHandler, + noinline markerFactory: suspend (T) -> Any? = { null }, + crossinline block: suspend (T) -> Unit +): Channel = actorAsync( + channelCapacity, + markerFactory +) { + safely(onException) { + block(it) + } +} +