make actor not async

This commit is contained in:
InsanusMokrassar 2022-09-15 01:11:20 +06:00
parent 3870db1c88
commit f8b8626859
2 changed files with 32 additions and 5 deletions

View File

@ -6,22 +6,19 @@ import kotlinx.coroutines.flow.consumeAsFlow
fun <T> CoroutineScope.actor( fun <T> CoroutineScope.actor(
channelCapacity: Int = Channel.UNLIMITED, channelCapacity: Int = Channel.UNLIMITED,
markerFactory: suspend (T) -> Any? = { null },
block: suspend (T) -> Unit block: suspend (T) -> Unit
): Channel<T> { ): Channel<T> {
val channel = Channel<T>(channelCapacity) val channel = Channel<T>(channelCapacity)
channel.consumeAsFlow().subscribeAsync(this, markerFactory, block) channel.consumeAsFlow().subscribe(this, block)
return channel return channel
} }
inline fun <T> CoroutineScope.safeActor( inline fun <T> CoroutineScope.safeActor(
channelCapacity: Int = Channel.UNLIMITED, channelCapacity: Int = Channel.UNLIMITED,
noinline onException: ExceptionHandler<Unit> = defaultSafelyExceptionHandler, noinline onException: ExceptionHandler<Unit> = defaultSafelyExceptionHandler,
noinline markerFactory: suspend (T) -> Any? = { null },
crossinline block: suspend (T) -> Unit crossinline block: suspend (T) -> Unit
): Channel<T> = actor( ): Channel<T> = actor(
channelCapacity, channelCapacity
markerFactory
) { ) {
safely(onException) { safely(onException) {
block(it) block(it)

View File

@ -0,0 +1,30 @@
package dev.inmo.micro_utils.coroutines
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
fun <T> CoroutineScope.actorAsync(
channelCapacity: Int = Channel.UNLIMITED,
markerFactory: suspend (T) -> Any? = { null },
block: suspend (T) -> Unit
): Channel<T> {
val channel = Channel<T>(channelCapacity)
channel.consumeAsFlow().subscribeAsync(this, markerFactory, block)
return channel
}
inline fun <T> CoroutineScope.safeActorAsync(
channelCapacity: Int = Channel.UNLIMITED,
noinline onException: ExceptionHandler<Unit> = defaultSafelyExceptionHandler,
noinline markerFactory: suspend (T) -> Any? = { null },
crossinline block: suspend (T) -> Unit
): Channel<T> = actor(
channelCapacity,
markerFactory
) {
safely(onException) {
block(it)
}
}