This commit is contained in:
2025-02-14 13:16:14 +06:00
parent d8ca29eab1
commit 4bfa4c32aa
15 changed files with 108 additions and 30 deletions

View File

@@ -2,6 +2,7 @@
package dev.inmo.micro_utils.coroutines
import dev.inmo.kslog.common.KSLog
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
@@ -16,6 +17,45 @@ inline fun <T> Flow<T>.subscribe(scope: CoroutineScope, noinline block: suspend
* Use [subscribe], but all [block]s will be called inside of [safely] function.
* Use [onException] to set up your reaction for [Throwable]s
*/
inline fun <T> Flow<T>.subscribeLogging(
scope: CoroutineScope,
noinline errorMessageBuilder: T.(Throwable) -> Any = { "Something web wrong" },
logger: KSLog = KSLog,
noinline block: suspend (T) -> Unit
) = subscribe(scope) {
it.runCatchingLogging(
errorMessageBuilder,
logger
) {
block(it)
}.getOrThrow()
}
/**
* Use [subscribeSafelyWithoutExceptions], but all exceptions will be passed to [defaultSafelyExceptionHandler]
*/
inline fun <T> Flow<T>.subscribeLoggingDropExceptions(
scope: CoroutineScope,
noinline errorMessageBuilder: T.(Throwable) -> Any = { "Something web wrong" },
logger: KSLog = KSLog,
noinline block: suspend (T) -> Unit
) = subscribe(scope) {
it.runCatchingLogging(
errorMessageBuilder,
logger
) {
block(it)
}
}
/**
* Use [subscribe], but all [block]s will be called inside of [safely] function.
* Use [onException] to set up your reaction for [Throwable]s
*/
@Deprecated(
"Will be removed soon due to replacement by subscribeLogging",
ReplaceWith("this.subscribeLogging(scope = scope, block = block)")
)
inline fun <T> Flow<T>.subscribeSafely(
scope: CoroutineScope,
noinline onException: ExceptionHandler<Unit> = defaultSafelyExceptionHandler,
@@ -29,6 +69,10 @@ inline fun <T> Flow<T>.subscribeSafely(
/**
* Use [subscribeSafelyWithoutExceptions], but all exceptions will be passed to [defaultSafelyExceptionHandler]
*/
@Deprecated(
"Will be removed soon due to replacement by subscribeLoggingDropExceptions",
ReplaceWith("this.subscribeLoggingDropExceptions(scope = scope, block = block)")
)
inline fun <T> Flow<T>.subscribeSafelyWithoutExceptions(
scope: CoroutineScope,
noinline onException: ExceptionHandler<T?> = defaultSafelyWithoutExceptionHandlerWithNull,
@@ -42,6 +86,10 @@ inline fun <T> Flow<T>.subscribeSafelyWithoutExceptions(
/**
* Use [subscribeSafelyWithoutExceptions], but all exceptions inside of [safely] will be skipped
*/
@Deprecated(
"Will be removed soon due to replacement by subscribeLoggingDropExceptions",
ReplaceWith("this.subscribeLoggingDropExceptions(scope = scope, block = block)")
)
inline fun <T> Flow<T>.subscribeSafelySkippingExceptions(
scope: CoroutineScope,
noinline block: suspend (T) -> Unit

View File

@@ -15,7 +15,7 @@ private class SubscribeAsyncReceiver<T>(
get() = dataChannel
init {
scope.launchSafelyWithoutExceptions {
scope.launchLoggingDropExceptions {
for (data in dataChannel) {
output(data)
}

View File

@@ -7,49 +7,49 @@ import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
fun CoroutineScope.launchLogging(
errorMessageBuilder: () -> Any = { "Something web wrong" },
errorMessageBuilder: CoroutineScope.(Throwable) -> Any = { "Something web wrong" },
logger: KSLog = KSLog,
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
) = launch(context, start) {
runCatching { block() }.onFailure {
logger.e(it, errorMessageBuilder)
logger.e(it) { errorMessageBuilder(it) }
}.getOrThrow()
}
fun CoroutineScope.launchLoggingDropExceptions(
errorMessageBuilder: () -> Any = { "Something web wrong" },
errorMessageBuilder: CoroutineScope.(Throwable) -> Any = { "Something web wrong" },
logger: KSLog = KSLog,
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
) = launch(context, start) {
runCatching { block() }.onFailure {
logger.e(it, errorMessageBuilder)
logger.e(it) { errorMessageBuilder(it) }
} // just dropping exception
}
fun <T> CoroutineScope.asyncLogging(
errorMessageBuilder: () -> Any = { "Something web wrong" },
errorMessageBuilder: CoroutineScope.(Throwable) -> Any = { "Something web wrong" },
logger: KSLog = KSLog,
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
) = async(context, start) {
runCatching { block() }.onFailure {
logger.e(it, errorMessageBuilder)
logger.e(it) { errorMessageBuilder(it) }
}.getOrThrow()
}
fun <T> CoroutineScope.asyncLoggingDropExceptions(
errorMessageBuilder: () -> Any = { "Something web wrong" },
errorMessageBuilder: CoroutineScope.(Throwable) -> Any = { "Something web wrong" },
logger: KSLog = KSLog,
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
) = async(context, start) {
runCatching { block() }.onFailure {
logger.e(it, errorMessageBuilder)
logger.e(it) { errorMessageBuilder(it) }
}
}

View File

@@ -4,6 +4,10 @@ import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@Deprecated(
"This method will be removed soon. Use launchLogging instead",
ReplaceWith("this.launchLogging(context = context, start = start, block = block)")
)
fun CoroutineScope.launchSafely(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
@@ -15,6 +19,10 @@ fun CoroutineScope.launchSafely(
}
}
@Deprecated(
"This method will be removed soon. Use launchLoggingDropExceptions instead",
ReplaceWith("this.launchLoggingDropExceptions(context = context, start = start, block = block)")
)
fun CoroutineScope.launchSafelyWithoutExceptions(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
@@ -26,6 +34,10 @@ fun CoroutineScope.launchSafelyWithoutExceptions(
}
}
@Deprecated(
"This method will be removed soon. Use asyncLogging instead",
ReplaceWith("this.asyncLogging(context = context, start = start, block = block)")
)
fun <T> CoroutineScope.asyncSafely(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
@@ -37,6 +49,10 @@ fun <T> CoroutineScope.asyncSafely(
}
}
@Deprecated(
"This method will be removed soon. Use asyncLoggingDropExceptions instead",
ReplaceWith("this.asyncLoggingDropExceptions(context = context, start = start, block = block)")
)
fun <T> CoroutineScope.asyncSafelyWithoutExceptions(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,

View File

@@ -0,0 +1,12 @@
package dev.inmo.micro_utils.coroutines
import dev.inmo.kslog.common.KSLog
import dev.inmo.kslog.common.e
inline fun <T, R> R.runCatchingLogging(
noinline errorMessageBuilder: R.(Throwable) -> Any = { "Something web wrong" },
logger: KSLog = KSLog,
block: R.() -> T
) = runCatching(block).onFailure {
logger.e(it) { errorMessageBuilder(it) }
}

View File

@@ -128,8 +128,10 @@ open class DefaultStatesMachine <T: State>(
*/
override fun start(scope: CoroutineScope): Job {
val supervisorScope = scope.LinkedSupervisorScope()
supervisorScope.launchSafelyWithoutExceptions {
(statesManager.getActiveStates().asFlow() + statesManager.onStartChain).subscribeSafelyWithoutExceptions(supervisorScope) {
supervisorScope.launchLoggingDropExceptions {
(statesManager.getActiveStates().asFlow() + statesManager.onStartChain).subscribeSafelyWithoutExceptions(
supervisorScope
) {
supervisorScope.launch { performStateUpdate(Optional.absent(), it, supervisorScope) }
}
statesManager.onChainStateUpdated.subscribeSafelyWithoutExceptions(supervisorScope) {
@@ -140,7 +142,7 @@ open class DefaultStatesMachine <T: State>(
statesJobsMutex.withLock {
val stateInMap = statesJobs.keys.firstOrNull { stateInMap -> stateInMap == removedState }
if (stateInMap === removedState) {
statesJobs[stateInMap] ?.cancel()
statesJobs[stateInMap]?.cancel()
}
}
}

View File

@@ -2,7 +2,7 @@ package dev.inmo.micro_utils.ktor.client
import dev.inmo.micro_utils.common.MPPFile
import dev.inmo.micro_utils.coroutines.LinkedSupervisorJob
import dev.inmo.micro_utils.coroutines.launchSafelyWithoutExceptions
import dev.inmo.micro_utils.coroutines.launchLoggingDropExceptions
import dev.inmo.micro_utils.ktor.common.TemporalFileId
import io.ktor.client.HttpClient
import io.ktor.client.content.*
@@ -27,7 +27,7 @@ suspend fun tempUpload(
val request = XMLHttpRequest()
request.responseType = XMLHttpRequestResponseType.TEXT
request.upload.onprogress = {
subscope.launchSafelyWithoutExceptions { onUpload.onProgress(it.loaded.toLong(), it.total.toLong()) }
subscope.launchLoggingDropExceptions { onUpload.onProgress(it.loaded.toLong(), it.total.toLong()) }
}
request.onload = {
if (request.status == 200.toShort()) {

View File

@@ -2,7 +2,7 @@ package dev.inmo.micro_utils.ktor.client
import dev.inmo.micro_utils.common.MPPFile
import dev.inmo.micro_utils.coroutines.LinkedSupervisorJob
import dev.inmo.micro_utils.coroutines.launchSafelyWithoutExceptions
import dev.inmo.micro_utils.coroutines.launchLoggingDropExceptions
import io.ktor.client.HttpClient
import io.ktor.client.content.*
import io.ktor.http.Headers
@@ -66,7 +66,7 @@ actual suspend fun <T> HttpClient.uniUpload(
}
request.responseType = XMLHttpRequestResponseType.TEXT
request.upload.onprogress = {
subscope.launchSafelyWithoutExceptions { onUpload.onProgress(it.loaded.toLong(), it.total.toLong()) }
subscope.launchLoggingDropExceptions { onUpload.onProgress(it.loaded.toLong(), it.total.toLong()) }
}
request.onload = {
if (request.status == 200.toShort()) {

View File

@@ -3,7 +3,7 @@ package dev.inmo.micro_utils.ktor.server
import com.benasher44.uuid.uuid4
import dev.inmo.micro_utils.common.FileName
import dev.inmo.micro_utils.common.MPPFile
import dev.inmo.micro_utils.coroutines.launchSafelyWithoutExceptions
import dev.inmo.micro_utils.coroutines.launchLoggingDropExceptions
import dev.inmo.micro_utils.ktor.common.DefaultTemporalFilesSubPath
import dev.inmo.micro_utils.ktor.common.TemporalFileId
import dev.inmo.micro_utils.ktor.server.configurators.ApplicationRoutingConfigurator
@@ -44,7 +44,7 @@ class TemporalFilesRoutingConfigurator(
filesMap: MutableMap<TemporalFileId, MPPFile>,
filesMutex: Mutex,
onNewFileFlow: Flow<TemporalFileId>
): Job = scope.launchSafelyWithoutExceptions {
): Job = scope.launchLoggingDropExceptions {
while (currentCoroutineContext().isActive) {
val filesWithCreationInfo = filesMap.mapNotNull { (fileId, file) ->
fileId to ((Files.getAttribute(file.toPath(), "creationTime") as? FileTime) ?.toMillis() ?: return@mapNotNull null)

View File

@@ -2,7 +2,7 @@ package dev.inmo.micro_utils.repos.cache.full
import dev.inmo.micro_utils.common.*
import dev.inmo.micro_utils.coroutines.SmartRWLocker
import dev.inmo.micro_utils.coroutines.launchSafelyWithoutExceptions
import dev.inmo.micro_utils.coroutines.launchLoggingDropExceptions
import dev.inmo.micro_utils.coroutines.withReadAcquire
import dev.inmo.micro_utils.coroutines.withWriteLock
import dev.inmo.micro_utils.pagination.Pagination
@@ -116,7 +116,7 @@ open class FullCRUDCacheRepo<ObjectType, IdType, InputValueType>(
CRUDRepo<ObjectType, IdType, InputValueType> {
init {
if (!skipStartInvalidate) {
scope.launchSafelyWithoutExceptions {
scope.launchLoggingDropExceptions {
if (locker.writeMutex.isLocked) {
initialInvalidate()
} else {

View File

@@ -2,7 +2,7 @@ package dev.inmo.micro_utils.repos.cache.full
import dev.inmo.micro_utils.common.*
import dev.inmo.micro_utils.coroutines.SmartRWLocker
import dev.inmo.micro_utils.coroutines.launchSafelyWithoutExceptions
import dev.inmo.micro_utils.coroutines.launchLoggingDropExceptions
import dev.inmo.micro_utils.coroutines.withReadAcquire
import dev.inmo.micro_utils.coroutines.withWriteLock
import dev.inmo.micro_utils.pagination.Pagination
@@ -141,7 +141,7 @@ open class FullKeyValueCacheRepo<Key,Value>(
) {
init {
if (!skipStartInvalidate) {
scope.launchSafelyWithoutExceptions {
scope.launchLoggingDropExceptions {
if (locker.writeMutex.isLocked) {
initialInvalidate()
} else {

View File

@@ -2,7 +2,7 @@ package dev.inmo.micro_utils.repos.cache.full
import dev.inmo.micro_utils.common.*
import dev.inmo.micro_utils.coroutines.SmartRWLocker
import dev.inmo.micro_utils.coroutines.launchSafelyWithoutExceptions
import dev.inmo.micro_utils.coroutines.launchLoggingDropExceptions
import dev.inmo.micro_utils.coroutines.withReadAcquire
import dev.inmo.micro_utils.coroutines.withWriteLock
import dev.inmo.micro_utils.pagination.*
@@ -210,7 +210,7 @@ open class FullKeyValuesCacheRepo<Key,Value>(
WriteKeyValuesRepo<Key, Value> by parentRepo {
init {
if (!skipStartInvalidate) {
scope.launchSafelyWithoutExceptions {
scope.launchLoggingDropExceptions {
if (locker.writeMutex.isLocked) {
initialInvalidate()
} else {

View File

@@ -2,7 +2,7 @@ package dev.inmo.micro_utils.repos.cache.full.direct
import dev.inmo.micro_utils.common.*
import dev.inmo.micro_utils.coroutines.SmartRWLocker
import dev.inmo.micro_utils.coroutines.launchSafelyWithoutExceptions
import dev.inmo.micro_utils.coroutines.launchLoggingDropExceptions
import dev.inmo.micro_utils.coroutines.withReadAcquire
import dev.inmo.micro_utils.coroutines.withWriteLock
import dev.inmo.micro_utils.pagination.Pagination
@@ -82,7 +82,7 @@ open class DirectFullCRUDCacheRepo<ObjectType, IdType, InputValueType>(
CRUDRepo<ObjectType, IdType, InputValueType> {
init {
if (!skipStartInvalidate) {
scope.launchSafelyWithoutExceptions {
scope.launchLoggingDropExceptions {
if (locker.writeMutex.isLocked) {
initialInvalidate()
} else {

View File

@@ -1,7 +1,7 @@
package dev.inmo.micro_utils.repos.cache.full.direct
import dev.inmo.micro_utils.coroutines.SmartRWLocker
import dev.inmo.micro_utils.coroutines.launchSafelyWithoutExceptions
import dev.inmo.micro_utils.coroutines.launchLoggingDropExceptions
import dev.inmo.micro_utils.coroutines.withReadAcquire
import dev.inmo.micro_utils.coroutines.withWriteLock
import dev.inmo.micro_utils.pagination.Pagination
@@ -117,7 +117,7 @@ open class DirectFullKeyValueCacheRepo<Key, Value>(
DirectFullReadKeyValueCacheRepo<Key, Value>(parentRepo, kvCache, locker) {
init {
if (!skipStartInvalidate) {
scope.launchSafelyWithoutExceptions {
scope.launchLoggingDropExceptions {
if (locker.writeMutex.isLocked) {
initialInvalidate()
} else {

View File

@@ -2,7 +2,7 @@ package dev.inmo.micro_utils.repos.cache.full.direct
import dev.inmo.micro_utils.common.*
import dev.inmo.micro_utils.coroutines.SmartRWLocker
import dev.inmo.micro_utils.coroutines.launchSafelyWithoutExceptions
import dev.inmo.micro_utils.coroutines.launchLoggingDropExceptions
import dev.inmo.micro_utils.coroutines.withReadAcquire
import dev.inmo.micro_utils.coroutines.withWriteLock
import dev.inmo.micro_utils.pagination.*
@@ -145,7 +145,7 @@ open class DirectFullKeyValuesCacheRepo<Key,Value>(
WriteKeyValuesRepo<Key, Value> by parentRepo {
init {
if (!skipStartInvalidate) {
scope.launchSafelyWithoutExceptions {
scope.launchLoggingDropExceptions {
if (locker.writeMutex.isLocked) {
initialInvalidate()
} else {