diff --git a/CHANGELOG.md b/CHANGELOG.md index bbac4a252ea..dd6ca32fe5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## 0.20.3 + +* `Versions`: + * `Compose`: `1.4.3` -> `1.5.0` + * `Exposed`: `0.42.1` -> `0.43.0` + * `Ktor`: `2.3.3` -> `2.3.4` +* `Repos`: + * `Cache`: + * Fixes in locks of caches + ## 0.20.2 * All main repos uses `SmartRWLocker` diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartRWLocker.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartRWLocker.kt index 2ca18d8d4ed..e1a6e7eb694 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartRWLocker.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartRWLocker.kt @@ -23,9 +23,7 @@ class SmartRWLocker(private val readPermits: Int = Int.MAX_VALUE, writeIsLocked: * Do lock in [readSemaphore] inside of [writeMutex] locking */ suspend fun acquireRead() { - _writeMutex.withLock { - _readSemaphore.acquire() - } + _readSemaphore.acquire() } /** @@ -39,15 +37,19 @@ class SmartRWLocker(private val readPermits: Int = Int.MAX_VALUE, writeIsLocked: * Locking [writeMutex] and wait while all [readSemaphore] permits will be freed */ suspend fun lockWrite() { + _readSemaphore.acquire(readPermits) _writeMutex.lock() - readSemaphore.waitRelease(readPermits) } /** * Unlock [writeMutex] */ suspend fun unlockWrite(): Boolean { - return _writeMutex.unlock() + return _writeMutex.unlock().also { + if (it) { + _readSemaphore.release(readPermits) + } + } } } diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartSemaphore.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartSemaphore.kt index fd072f1e415..e52c07a52c6 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartSemaphore.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartSemaphore.kt @@ -42,11 +42,11 @@ sealed interface SmartSemaphore { * Mutable variant of [SmartSemaphore]. With that variant you may [lock] and [unlock]. Besides, you may create * [Immutable] variant of [this] instance with [immutable] factory * - * @param locked Preset state of [freePermits] and its internal [_permitsStateFlow] + * @param locked Preset state of [freePermits] and its internal [_freePermitsStateFlow] */ class Mutable(private val permits: Int, acquiredPermits: Int = 0) : SmartSemaphore { - private val _permitsStateFlow = MutableStateFlow(permits - acquiredPermits) - override val permitsStateFlow: StateFlow = _permitsStateFlow.asStateFlow() + private val _freePermitsStateFlow = MutableStateFlow(permits - acquiredPermits) + override val permitsStateFlow: StateFlow = _freePermitsStateFlow.asStateFlow() private val internalChangesMutex = Mutex(false) @@ -54,19 +54,45 @@ sealed interface SmartSemaphore { private fun checkedPermits(permits: Int) = permits.coerceIn(1 .. this.permits) + /** + * Holds call until this [SmartSemaphore] will be re-locked. That means that current method will + */ + suspend fun acquire(permits: Int = 1) { + var acquiredPermits = 0 + val checkedPermits = checkedPermits(permits) + try { + do { + val shouldContinue = internalChangesMutex.withLock { + val requiredPermits = checkedPermits - acquiredPermits + val acquiring = minOf(freePermits, requiredPermits).takeIf { it > 0 } ?: return@withLock true + acquiredPermits += acquiring + _freePermitsStateFlow.value -= acquiring + + acquiredPermits != checkedPermits + } + if (shouldContinue) { + waitRelease() + } + } while (shouldContinue && currentCoroutineContext().isActive) + } catch (e: Throwable) { + release(acquiredPermits) + throw e + } + } + /** * Holds call until this [SmartSemaphore] will be re-locked. That means that while [freePermits] == true, [holds] will * wait for [freePermits] == false and then try to lock */ - suspend fun acquire(permits: Int = 1) { + suspend fun acquireByOne(permits: Int = 1) { + val checkedPermits = checkedPermits(permits) do { - val checkedPermits = checkedPermits(permits) waitRelease(checkedPermits) val shouldContinue = internalChangesMutex.withLock { - if (_permitsStateFlow.value < checkedPermits) { + if (_freePermitsStateFlow.value < checkedPermits) { true } else { - _permitsStateFlow.value -= checkedPermits + _freePermitsStateFlow.value -= checkedPermits false } } @@ -80,10 +106,10 @@ sealed interface SmartSemaphore { */ suspend fun tryAcquire(permits: Int = 1): Boolean { val checkedPermits = checkedPermits(permits) - return if (_permitsStateFlow.value < checkedPermits) { + return if (_freePermitsStateFlow.value < checkedPermits) { internalChangesMutex.withLock { - if (_permitsStateFlow.value < checkedPermits) { - _permitsStateFlow.value -= checkedPermits + if (_freePermitsStateFlow.value < checkedPermits) { + _freePermitsStateFlow.value -= checkedPermits true } else { false @@ -100,10 +126,10 @@ sealed interface SmartSemaphore { */ suspend fun release(permits: Int = 1): Boolean { val checkedPermits = checkedPermits(permits) - return if (_permitsStateFlow.value < this.permits) { + return if (_freePermitsStateFlow.value < this.permits) { internalChangesMutex.withLock { - if (_permitsStateFlow.value < this.permits) { - _permitsStateFlow.value = minOf(_permitsStateFlow.value + checkedPermits, this.permits) + if (_freePermitsStateFlow.value < this.permits) { + _freePermitsStateFlow.value = minOf(_freePermitsStateFlow.value + checkedPermits, this.permits) true } else { false diff --git a/coroutines/src/commonTest/kotlin/SmartRWLockerTests.kt b/coroutines/src/commonTest/kotlin/SmartRWLockerTests.kt index cef0ff93526..75fd251d17b 100644 --- a/coroutines/src/commonTest/kotlin/SmartRWLockerTests.kt +++ b/coroutines/src/commonTest/kotlin/SmartRWLockerTests.kt @@ -1,10 +1,6 @@ -import dev.inmo.micro_utils.coroutines.SmartRWLocker -import dev.inmo.micro_utils.coroutines.withReadAcquire -import dev.inmo.micro_utils.coroutines.withWriteLock -import kotlinx.coroutines.CoroutineStart -import kotlinx.coroutines.delay -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch +import dev.inmo.micro_utils.coroutines.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.first import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.test.runTest @@ -57,4 +53,99 @@ class SmartRWLockerTests { assertEquals(expected = readAndWriteWorkers, actual = doneWrites) } } + + @Test + fun simpleWithWriteLockTest() { + val locker = SmartRWLocker() + + runTest { + locker.withWriteLock { + assertEquals(0, locker.readSemaphore.freePermits) + assertEquals(true, locker.writeMutex.isLocked) + } + assertEquals(Int.MAX_VALUE, locker.readSemaphore.freePermits) + assertEquals(false, locker.writeMutex.isLocked) + } + } + + @Test + fun failureWithWriteLockTest() { + val locker = SmartRWLocker() + + val exception = IllegalArgumentException() + try { + runTest { + val subscope = kotlinx.coroutines.CoroutineScope(this.coroutineContext) + var happenException: Throwable? = null + try { + locker.withWriteLock { + val checkFunction = fun (): Deferred { + return subscope.async { + assertEquals(0, locker.readSemaphore.freePermits) + assertEquals(true, locker.writeMutex.isLocked) + throw exception + } + } + doInDefault { + assertEquals(0, locker.readSemaphore.freePermits) + assertEquals(true, locker.writeMutex.isLocked) + checkFunction().await() + } + } + } catch (e: Exception) { + happenException = e + } + if (exception != happenException) { + assertEquals(exception, happenException ?.cause) + } + assertEquals(Int.MAX_VALUE, locker.readSemaphore.freePermits) + assertEquals(false, locker.writeMutex.isLocked) + } + } catch (e: Exception) { + assertEquals(exception, e) + } + } + + @Test + fun simpleWithReadAcquireTest() { + val locker = SmartRWLocker() + + runTest { + locker.withReadAcquire { + assertEquals(Int.MAX_VALUE - 1, locker.readSemaphore.freePermits) + assertEquals(false, locker.writeMutex.isLocked) + locker.withReadAcquire { + assertEquals(Int.MAX_VALUE - 2, locker.readSemaphore.freePermits) + assertEquals(false, locker.writeMutex.isLocked) + } + } + assertEquals(Int.MAX_VALUE, locker.readSemaphore.freePermits) + assertEquals(false, locker.writeMutex.isLocked) + } + } + + @Test + fun simple2WithWriteLockTest() { + val locker = SmartRWLocker() + + val unlockDelay = 1000L // 1 sec + var unlocked: Boolean = false + runTest { + launch { + locker.withReadAcquire { + delay(unlockDelay) + } + unlocked = true + } + locker.readSemaphore.permitsStateFlow.first { it == Int.MAX_VALUE - 1 } + assertEquals(false, unlocked) + locker.withWriteLock { + assertEquals(true, unlocked) + assertEquals(0, locker.readSemaphore.freePermits) + assertEquals(true, locker.writeMutex.isLocked) + } + assertEquals(Int.MAX_VALUE, locker.readSemaphore.freePermits) + assertEquals(false, locker.writeMutex.isLocked) + } + } } diff --git a/gradle.properties b/gradle.properties index 24586dd8976..bdee9aa9855 100644 --- a/gradle.properties +++ b/gradle.properties @@ -15,5 +15,5 @@ crypto_js_version=4.1.1 # Project data group=dev.inmo -version=0.20.2 -android_code_version=208 +version=0.20.3 +android_code_version=209 diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index c036f67e734..3fbe99f5781 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -6,14 +6,14 @@ kt-coroutines = "1.7.3" kslog = "1.2.0" -jb-compose = "1.4.3" -jb-exposed = "0.42.1" +jb-compose = "1.5.0" +jb-exposed = "0.43.0" jb-dokka = "1.8.20" korlibs = "4.0.10" uuid = "0.8.0" -ktor = "2.3.3" +ktor = "2.3.4" gh-release = "2.4.1" diff --git a/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/CRUDCacheRepo.kt b/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/CRUDCacheRepo.kt index c17dd5c70e9..0a58b4ceffa 100644 --- a/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/CRUDCacheRepo.kt +++ b/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/CRUDCacheRepo.kt @@ -61,18 +61,28 @@ open class WriteCRUDCacheRepo( override val deletedObjectsIdsFlow: Flow by parentRepo::deletedObjectsIdsFlow val createdObjectsFlowJob = parentRepo.newObjectsFlow.onEach { - locker.withWriteLock { kvCache.set(idGetter(it), it) } + locker.withWriteLock { + kvCache.set(idGetter(it), it) + } }.launchIn(scope) val updatedObjectsFlowJob = parentRepo.updatedObjectsFlow.onEach { - locker.withWriteLock { kvCache.set(idGetter(it), it) } + locker.withWriteLock { + kvCache.set(idGetter(it), it) + } }.launchIn(scope) val deletedObjectsFlowJob = parentRepo.deletedObjectsIdsFlow.onEach { - locker.withWriteLock { kvCache.unset(it) } + locker.withWriteLock { + kvCache.unset(it) + } }.launchIn(scope) - override suspend fun deleteById(ids: List) = parentRepo.deleteById(ids) + override suspend fun deleteById(ids: List) = parentRepo.deleteById(ids).also { + locker.withWriteLock { + kvCache.unset(ids) + } + } override suspend fun update(values: List>): List { val updated = parentRepo.update(values) diff --git a/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullCRUDCacheRepo.kt b/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullCRUDCacheRepo.kt index 22b34ffec54..50d1918d07c 100644 --- a/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullCRUDCacheRepo.kt +++ b/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullCRUDCacheRepo.kt @@ -28,9 +28,18 @@ open class FullReadCRUDCacheRepo( kvCache.action().onPresented { return it } } return parentRepo.actionElse().also { - locker.withWriteLock { kvCache.actualize(it) } + kvCache.actualize(it) } } + protected suspend inline fun doOrTakeAndActualizeWithWriteLock( + action: KeyValueRepo.() -> Optional, + actionElse: ReadCRUDRepo.() -> T, + actualize: KeyValueRepo.(T) -> Unit + ): T = doOrTakeAndActualize( + action = action, + actionElse = actionElse, + actualize = { locker.withWriteLock { actualize(it) } } + ) protected open suspend fun actualizeAll() { locker.withWriteLock { kvCache.actualizeAll(parentRepo) } @@ -54,22 +63,22 @@ open class FullReadCRUDCacheRepo( { if (it != 0L) actualizeAll() } ) - override suspend fun contains(id: IdType): Boolean = doOrTakeAndActualize( + override suspend fun contains(id: IdType): Boolean = doOrTakeAndActualizeWithWriteLock( { contains(id).takeIf { it }.optionalOrAbsentIfNull }, { contains(id) }, - { if (it) parentRepo.getById(id) ?.let { set(id, it) } } + { if (it) parentRepo.getById(id) ?.let { kvCache.set(id, it) } } ) - override suspend fun getAll(): Map = doOrTakeAndActualize( + override suspend fun getAll(): Map = doOrTakeAndActualizeWithWriteLock( { getAll().takeIf { it.isNotEmpty() }.optionalOrAbsentIfNull }, { getAll() }, { kvCache.actualizeAll(clear = true) { it } } ) - override suspend fun getById(id: IdType): ObjectType? = doOrTakeAndActualize( + override suspend fun getById(id: IdType): ObjectType? = doOrTakeAndActualizeWithWriteLock( { get(id) ?.optional ?: Optional.absent() }, { getById(id) }, - { it ?.let { set(idGetter(it), it) } } + { it ?.let { kvCache.set(idGetter(it), it) } } ) override suspend fun invalidate() { diff --git a/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullKeyValueCacheRepo.kt b/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullKeyValueCacheRepo.kt index 898bab97e09..3eeb3627f70 100644 --- a/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullKeyValueCacheRepo.kt +++ b/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullKeyValueCacheRepo.kt @@ -28,20 +28,29 @@ open class FullReadKeyValueCacheRepo( kvCache.action().onPresented { return it } } return parentRepo.actionElse().also { - locker.withWriteLock { kvCache.actualize(it) } + kvCache.actualize(it) } } + protected suspend inline fun doOrTakeAndActualizeWithWriteLock( + action: KeyValueRepo.() -> Optional, + actionElse: ReadKeyValueRepo.() -> T, + actualize: KeyValueRepo.(T) -> Unit + ): T = doOrTakeAndActualize( + action = action, + actionElse = actionElse, + actualize = { locker.withWriteLock { actualize(it) } } + ) protected open suspend fun actualizeAll() { locker.withWriteLock { kvCache.clear() - kvCache.set(parentRepo.getAll { keys(it) }.toMap()) + kvCache.set(parentRepo.getAll()) } } - override suspend fun get(k: Key): Value? = doOrTakeAndActualize( + override suspend fun get(k: Key): Value? = doOrTakeAndActualizeWithWriteLock( { get(k) ?.optional ?: Optional.absent() }, { get(k) }, - { set(k, it ?: return@doOrTakeAndActualize) } + { kvCache.set(k, it ?: return@doOrTakeAndActualizeWithWriteLock) } ) override suspend fun values(pagination: Pagination, reversed: Boolean): PaginationResult = doOrTakeAndActualize( @@ -56,13 +65,13 @@ open class FullReadKeyValueCacheRepo( { if (it != 0L) actualizeAll() } ) - override suspend fun contains(key: Key): Boolean = doOrTakeAndActualize( + override suspend fun contains(key: Key): Boolean = doOrTakeAndActualizeWithWriteLock( { contains(key).takeIf { it }.optionalOrAbsentIfNull }, { contains(key) }, { if (it) parentRepo.get(key) ?.also { kvCache.set(key, it) } } ) - override suspend fun getAll(): Map = doOrTakeAndActualize( + override suspend fun getAll(): Map = doOrTakeAndActualizeWithWriteLock( { getAll().takeIf { it.isNotEmpty() }.optionalOrAbsentIfNull }, { getAll() }, { kvCache.actualizeAll(clear = true) { it } } diff --git a/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullKeyValuesCacheRepo.kt b/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullKeyValuesCacheRepo.kt index 5e1ff38e235..e139256d539 100644 --- a/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullKeyValuesCacheRepo.kt +++ b/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullKeyValuesCacheRepo.kt @@ -27,9 +27,18 @@ open class FullReadKeyValuesCacheRepo( kvCache.action().onPresented { return it } } return parentRepo.actionElse().also { - locker.withWriteLock { kvCache.actualize(it) } + kvCache.actualize(it) } } + protected suspend inline fun doOrTakeAndActualizeWithWriteLock( + action: KeyValueRepo>.() -> Optional, + actionElse: ReadKeyValuesRepo.() -> T, + actualize: KeyValueRepo>.(T) -> Unit + ): T = doOrTakeAndActualize( + action = action, + actionElse = actionElse, + actualize = { locker.withWriteLock { actualize(it) } } + ) protected open suspend fun actualizeKey(k: Key) { locker.withWriteLock { diff --git a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapCRUDRepo.kt b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapCRUDRepo.kt index c9aa60c63f4..05d019a68e6 100644 --- a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapCRUDRepo.kt +++ b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapCRUDRepo.kt @@ -65,12 +65,12 @@ abstract class WriteMapCRUDRepo( protected val map: MutableMap = mutableMapOf(), protected val locker: SmartRWLocker = SmartRWLocker() ) : WriteCRUDRepo { - protected val _newObjectsFlow: MutableSharedFlow = MutableSharedFlow() - override val newObjectsFlow: Flow = _newObjectsFlow.asSharedFlow() - protected val _updatedObjectsFlow: MutableSharedFlow = MutableSharedFlow() - override val updatedObjectsFlow: Flow = _updatedObjectsFlow.asSharedFlow() - protected val _deletedObjectsIdsFlow: MutableSharedFlow = MutableSharedFlow() - override val deletedObjectsIdsFlow: Flow = _deletedObjectsIdsFlow.asSharedFlow() + protected open val _newObjectsFlow: MutableSharedFlow = MapsReposDefaultMutableSharedFlow() + override val newObjectsFlow: Flow by lazy { _newObjectsFlow.asSharedFlow() } + protected open val _updatedObjectsFlow: MutableSharedFlow = MapsReposDefaultMutableSharedFlow() + override val updatedObjectsFlow: Flow by lazy { _updatedObjectsFlow.asSharedFlow() } + protected open val _deletedObjectsIdsFlow: MutableSharedFlow = MapsReposDefaultMutableSharedFlow() + override val deletedObjectsIdsFlow: Flow by lazy { _deletedObjectsIdsFlow.asSharedFlow() } protected abstract suspend fun updateObject(newValue: InputValueType, id: IdType, old: ObjectType): ObjectType protected abstract suspend fun createObject(newValue: InputValueType): Pair @@ -80,10 +80,10 @@ abstract class WriteMapCRUDRepo( values.map { val (id, newObject) = createObject(it) map[id] = newObject - newObject.also { _ -> - _newObjectsFlow.emit(newObject) - } + newObject } + }.onEach { + _newObjectsFlow.emit(it) } } @@ -93,8 +93,9 @@ abstract class WriteMapCRUDRepo( newValue.also { map[id] = it - _updatedObjectsFlow.emit(it) } + } ?.also { + _updatedObjectsFlow.emit(it) } } @@ -104,10 +105,10 @@ abstract class WriteMapCRUDRepo( override suspend fun deleteById(ids: List) { locker.withWriteLock { - ids.forEach { - map.remove(it) ?.also { _ -> _deletedObjectsIdsFlow.emit(it) } + ids.mapNotNull { + it.takeIf { map.remove(it) != null } } - } + }.onEach { _deletedObjectsIdsFlow.emit(it) } } } diff --git a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValueRepo.kt b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValueRepo.kt index 71ccd98c865..32b6815330f 100644 --- a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValueRepo.kt +++ b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValueRepo.kt @@ -9,6 +9,7 @@ import dev.inmo.micro_utils.pagination.utils.paginate import dev.inmo.micro_utils.pagination.utils.reverse import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.asSharedFlow /** * [Map]-based [ReadKeyValueRepo]. All internal operations will be locked with [locker] (mostly with @@ -88,12 +89,14 @@ class WriteMapKeyValueRepo( private val map: MutableMap, private val locker: SmartRWLocker ) : WriteKeyValueRepo { - private val _onNewValue: MutableSharedFlow> = MutableSharedFlow() - override val onNewValue: Flow> - get() = _onNewValue - private val _onValueRemoved: MutableSharedFlow = MutableSharedFlow() - override val onValueRemoved: Flow - get() = _onValueRemoved + private val _onNewValue: MutableSharedFlow> = MapsReposDefaultMutableSharedFlow() + override val onNewValue: Flow> by lazy { + _onNewValue.asSharedFlow() + } + private val _onValueRemoved: MutableSharedFlow = MapsReposDefaultMutableSharedFlow() + override val onValueRemoved: Flow by lazy { + _onValueRemoved.asSharedFlow() + } constructor(map: MutableMap = mutableMapOf()) : this(map, SmartRWLocker()) override suspend fun set(toSet: Map) { @@ -103,10 +106,10 @@ class WriteMapKeyValueRepo( override suspend fun unset(toUnset: List) { locker.withWriteLock { - toUnset.forEach { k -> - map.remove(k) ?.also { _ -> _onValueRemoved.emit(k) } + toUnset.mapNotNull { k -> + map.remove(k) ?.let { _ -> k } } - } + }.forEach { _onValueRemoved.emit(it) } } override suspend fun unsetWithValues(toUnset: List) { diff --git a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValuesRepo.kt b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValuesRepo.kt index c1c61edcc79..226bb93a36d 100644 --- a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValuesRepo.kt +++ b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValuesRepo.kt @@ -81,68 +81,94 @@ class MapWriteKeyValuesRepo( private val map: MutableMap> = mutableMapOf(), private val locker: SmartRWLocker = SmartRWLocker() ) : WriteKeyValuesRepo { - private val _onNewValue: MutableSharedFlow> = MutableSharedFlow() - override val onNewValue: Flow> = _onNewValue.asSharedFlow() - private val _onValueRemoved: MutableSharedFlow> = MutableSharedFlow() - override val onValueRemoved: Flow> = _onValueRemoved.asSharedFlow() - private val _onDataCleared: MutableSharedFlow = MutableSharedFlow() - override val onDataCleared: Flow = _onDataCleared.asSharedFlow() + private val _onNewValue: MutableSharedFlow> = MapsReposDefaultMutableSharedFlow() + override val onNewValue: Flow> by lazy { + _onNewValue.asSharedFlow() + } + private val _onValueRemoved: MutableSharedFlow> = MapsReposDefaultMutableSharedFlow() + override val onValueRemoved: Flow> by lazy { + _onValueRemoved.asSharedFlow() + } + private val _onDataCleared: MutableSharedFlow = MapsReposDefaultMutableSharedFlow() + override val onDataCleared: Flow by lazy { + _onDataCleared.asSharedFlow() + } override suspend fun add(toAdd: Map>) { locker.withWriteLock { - toAdd.keys.forEach { k -> - if (map.getOrPut(k) { mutableListOf() }.addAll(toAdd[k] ?: return@forEach)) { - toAdd[k] ?.forEach { v -> - _onNewValue.emit(k to v) - } + toAdd.keys.mapNotNull { k -> + (k to toAdd[k]).takeIf { + map.getOrPut(k) { mutableListOf() }.addAll(toAdd[k] ?: return@mapNotNull null) } } + }.forEach { (k, vs) -> + vs ?.forEach { v -> + _onNewValue.emit(k to v) + } } } override suspend fun remove(toRemove: Map>) { + val removed = mutableListOf>() + val cleared = mutableListOf() locker.withWriteLock { toRemove.keys.forEach { k -> if (map[k]?.removeAll(toRemove[k] ?: return@forEach) == true) { toRemove[k]?.forEach { v -> - _onValueRemoved.emit(k to v) + removed.add(k to v) } } if (map[k]?.isEmpty() == true) { map.remove(k) - _onDataCleared.emit(k) + cleared.add(k) } } } + removed.forEach { + _onValueRemoved.emit(it) + } + cleared.forEach { + _onDataCleared.emit(it) + } } override suspend fun removeWithValue(v: Value) { locker.withWriteLock { - map.forEach { (k, values) -> + map.mapNotNull { (k, values) -> if (values.remove(v)) { - _onValueRemoved.emit(k to v) + k to v + } else { + null } } + }.forEach { + _onValueRemoved.emit(it) } } override suspend fun clear(k: Key) { locker.withWriteLock { - map.remove(k) ?.also { _onDataCleared.emit(k) } - } + map.remove(k) + } ?.also { _onDataCleared.emit(k) } } override suspend fun clearWithValue(v: Value) { locker.withWriteLock { map.filter { (_, values) -> values.contains(v) - }.forEach { - map.remove(it.key)?.onEach { v -> - _onValueRemoved.emit(it.key to v) - }?.also { _ -> - _onDataCleared.emit(it.key) + }.mapNotNull { + if (map.remove(it.key) != null) { + it.toPair() + } else { + null } } + }.forEach { + it.second.onEach { v -> + _onValueRemoved.emit(it.first to v) + }.also { _ -> + _onDataCleared.emit(it.first) + } } } } diff --git a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapsReposDefaultMutableSharedFlow.kt b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapsReposDefaultMutableSharedFlow.kt new file mode 100644 index 00000000000..4be3f4dfc45 --- /dev/null +++ b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapsReposDefaultMutableSharedFlow.kt @@ -0,0 +1,7 @@ +package dev.inmo.micro_utils.repos + +import kotlinx.coroutines.flow.MutableSharedFlow + +fun MapsReposDefaultMutableSharedFlow() = MutableSharedFlow( + extraBufferCapacity = Int.MAX_VALUE +) \ No newline at end of file