This commit is contained in:
2021-06-28 00:31:46 +06:00
parent 38521558a1
commit 2b7e9534f3
2 changed files with 7 additions and 4 deletions

View File

@@ -38,7 +38,7 @@ private data class AsyncSubscriptionCommandData<T, M>(
override suspend fun invoke(markersMap: MutableMap<M, SubscribeAsyncReceiver<T>>) {
val marker = markerFactory(data)
markersMap.getOrPut(marker) {
SubscribeAsyncReceiver(scope) {
SubscribeAsyncReceiver(scope.LinkedSupervisorScope()) {
safelyWithoutExceptions { block(it) }
if (isEmpty()) {
onEmpty(marker)
@@ -80,7 +80,7 @@ fun <T, M> Flow<T>.subscribeAsync(
actor.send(dataCommand)
}
job.invokeOnCompletion { subscope.cancel() }
job.invokeOnCompletion { if (subscope.isActive) subscope.cancel() }
return job
}