From f8b8626859d958f69f9df71915c6c827f9265ecd Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Thu, 15 Sep 2022 01:11:20 +0600 Subject: [PATCH] make actor not async --- .../dev/inmo/micro_utils/coroutines/Actor.kt | 7 ++--- .../inmo/micro_utils/coroutines/ActorAsync.kt | 30 +++++++++++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) create mode 100644 coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/ActorAsync.kt diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/Actor.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/Actor.kt index 01c53023e97..13478c85960 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/Actor.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/Actor.kt @@ -6,22 +6,19 @@ import kotlinx.coroutines.flow.consumeAsFlow fun CoroutineScope.actor( channelCapacity: Int = Channel.UNLIMITED, - markerFactory: suspend (T) -> Any? = { null }, block: suspend (T) -> Unit ): Channel { val channel = Channel(channelCapacity) - channel.consumeAsFlow().subscribeAsync(this, markerFactory, block) + channel.consumeAsFlow().subscribe(this, block) return channel } inline fun CoroutineScope.safeActor( channelCapacity: Int = Channel.UNLIMITED, noinline onException: ExceptionHandler = defaultSafelyExceptionHandler, - noinline markerFactory: suspend (T) -> Any? = { null }, crossinline block: suspend (T) -> Unit ): Channel = actor( - channelCapacity, - markerFactory + channelCapacity ) { safely(onException) { block(it) diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/ActorAsync.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/ActorAsync.kt new file mode 100644 index 00000000000..cb7402db237 --- /dev/null +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/ActorAsync.kt @@ -0,0 +1,30 @@ +package dev.inmo.micro_utils.coroutines + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.consumeAsFlow + +fun CoroutineScope.actorAsync( + channelCapacity: Int = Channel.UNLIMITED, + markerFactory: suspend (T) -> Any? = { null }, + block: suspend (T) -> Unit +): Channel { + val channel = Channel(channelCapacity) + channel.consumeAsFlow().subscribeAsync(this, markerFactory, block) + return channel +} + +inline fun CoroutineScope.safeActorAsync( + channelCapacity: Int = Channel.UNLIMITED, + noinline onException: ExceptionHandler = defaultSafelyExceptionHandler, + noinline markerFactory: suspend (T) -> Any? = { null }, + crossinline block: suspend (T) -> Unit +): Channel = actor( + channelCapacity, + markerFactory +) { + safely(onException) { + block(it) + } +} +