diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c20967ee59..25e414f3407 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## 0.20.3 +* `Versions`: + * `Compose`: `1.4.3` -> `1.5.0` + * `Exposed`: `0.42.1` -> `0.43.0` * `Repos`: * `Cache`: * Fixes in locks of caches diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartRWLocker.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartRWLocker.kt index 2ca18d8d4ed..e1a6e7eb694 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartRWLocker.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartRWLocker.kt @@ -23,9 +23,7 @@ class SmartRWLocker(private val readPermits: Int = Int.MAX_VALUE, writeIsLocked: * Do lock in [readSemaphore] inside of [writeMutex] locking */ suspend fun acquireRead() { - _writeMutex.withLock { - _readSemaphore.acquire() - } + _readSemaphore.acquire() } /** @@ -39,15 +37,19 @@ class SmartRWLocker(private val readPermits: Int = Int.MAX_VALUE, writeIsLocked: * Locking [writeMutex] and wait while all [readSemaphore] permits will be freed */ suspend fun lockWrite() { + _readSemaphore.acquire(readPermits) _writeMutex.lock() - readSemaphore.waitRelease(readPermits) } /** * Unlock [writeMutex] */ suspend fun unlockWrite(): Boolean { - return _writeMutex.unlock() + return _writeMutex.unlock().also { + if (it) { + _readSemaphore.release(readPermits) + } + } } } diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartSemaphore.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartSemaphore.kt index fd072f1e415..e52c07a52c6 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartSemaphore.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartSemaphore.kt @@ -42,11 +42,11 @@ sealed interface SmartSemaphore { * Mutable variant of [SmartSemaphore]. With that variant you may [lock] and [unlock]. Besides, you may create * [Immutable] variant of [this] instance with [immutable] factory * - * @param locked Preset state of [freePermits] and its internal [_permitsStateFlow] + * @param locked Preset state of [freePermits] and its internal [_freePermitsStateFlow] */ class Mutable(private val permits: Int, acquiredPermits: Int = 0) : SmartSemaphore { - private val _permitsStateFlow = MutableStateFlow(permits - acquiredPermits) - override val permitsStateFlow: StateFlow = _permitsStateFlow.asStateFlow() + private val _freePermitsStateFlow = MutableStateFlow(permits - acquiredPermits) + override val permitsStateFlow: StateFlow = _freePermitsStateFlow.asStateFlow() private val internalChangesMutex = Mutex(false) @@ -54,19 +54,45 @@ sealed interface SmartSemaphore { private fun checkedPermits(permits: Int) = permits.coerceIn(1 .. this.permits) + /** + * Holds call until this [SmartSemaphore] will be re-locked. That means that current method will + */ + suspend fun acquire(permits: Int = 1) { + var acquiredPermits = 0 + val checkedPermits = checkedPermits(permits) + try { + do { + val shouldContinue = internalChangesMutex.withLock { + val requiredPermits = checkedPermits - acquiredPermits + val acquiring = minOf(freePermits, requiredPermits).takeIf { it > 0 } ?: return@withLock true + acquiredPermits += acquiring + _freePermitsStateFlow.value -= acquiring + + acquiredPermits != checkedPermits + } + if (shouldContinue) { + waitRelease() + } + } while (shouldContinue && currentCoroutineContext().isActive) + } catch (e: Throwable) { + release(acquiredPermits) + throw e + } + } + /** * Holds call until this [SmartSemaphore] will be re-locked. That means that while [freePermits] == true, [holds] will * wait for [freePermits] == false and then try to lock */ - suspend fun acquire(permits: Int = 1) { + suspend fun acquireByOne(permits: Int = 1) { + val checkedPermits = checkedPermits(permits) do { - val checkedPermits = checkedPermits(permits) waitRelease(checkedPermits) val shouldContinue = internalChangesMutex.withLock { - if (_permitsStateFlow.value < checkedPermits) { + if (_freePermitsStateFlow.value < checkedPermits) { true } else { - _permitsStateFlow.value -= checkedPermits + _freePermitsStateFlow.value -= checkedPermits false } } @@ -80,10 +106,10 @@ sealed interface SmartSemaphore { */ suspend fun tryAcquire(permits: Int = 1): Boolean { val checkedPermits = checkedPermits(permits) - return if (_permitsStateFlow.value < checkedPermits) { + return if (_freePermitsStateFlow.value < checkedPermits) { internalChangesMutex.withLock { - if (_permitsStateFlow.value < checkedPermits) { - _permitsStateFlow.value -= checkedPermits + if (_freePermitsStateFlow.value < checkedPermits) { + _freePermitsStateFlow.value -= checkedPermits true } else { false @@ -100,10 +126,10 @@ sealed interface SmartSemaphore { */ suspend fun release(permits: Int = 1): Boolean { val checkedPermits = checkedPermits(permits) - return if (_permitsStateFlow.value < this.permits) { + return if (_freePermitsStateFlow.value < this.permits) { internalChangesMutex.withLock { - if (_permitsStateFlow.value < this.permits) { - _permitsStateFlow.value = minOf(_permitsStateFlow.value + checkedPermits, this.permits) + if (_freePermitsStateFlow.value < this.permits) { + _freePermitsStateFlow.value = minOf(_freePermitsStateFlow.value + checkedPermits, this.permits) true } else { false diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index c036f67e734..31ab0fe2ec5 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -6,8 +6,8 @@ kt-coroutines = "1.7.3" kslog = "1.2.0" -jb-compose = "1.4.3" -jb-exposed = "0.42.1" +jb-compose = "1.5.0" +jb-exposed = "0.43.0" jb-dokka = "1.8.20" korlibs = "4.0.10" diff --git a/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/CRUDCacheRepo.kt b/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/CRUDCacheRepo.kt index c17dd5c70e9..0a58b4ceffa 100644 --- a/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/CRUDCacheRepo.kt +++ b/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/CRUDCacheRepo.kt @@ -61,18 +61,28 @@ open class WriteCRUDCacheRepo( override val deletedObjectsIdsFlow: Flow by parentRepo::deletedObjectsIdsFlow val createdObjectsFlowJob = parentRepo.newObjectsFlow.onEach { - locker.withWriteLock { kvCache.set(idGetter(it), it) } + locker.withWriteLock { + kvCache.set(idGetter(it), it) + } }.launchIn(scope) val updatedObjectsFlowJob = parentRepo.updatedObjectsFlow.onEach { - locker.withWriteLock { kvCache.set(idGetter(it), it) } + locker.withWriteLock { + kvCache.set(idGetter(it), it) + } }.launchIn(scope) val deletedObjectsFlowJob = parentRepo.deletedObjectsIdsFlow.onEach { - locker.withWriteLock { kvCache.unset(it) } + locker.withWriteLock { + kvCache.unset(it) + } }.launchIn(scope) - override suspend fun deleteById(ids: List) = parentRepo.deleteById(ids) + override suspend fun deleteById(ids: List) = parentRepo.deleteById(ids).also { + locker.withWriteLock { + kvCache.unset(ids) + } + } override suspend fun update(values: List>): List { val updated = parentRepo.update(values) diff --git a/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullKeyValueCacheRepo.kt b/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullKeyValueCacheRepo.kt index b8a53ab9457..3eeb3627f70 100644 --- a/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullKeyValueCacheRepo.kt +++ b/repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/full/FullKeyValueCacheRepo.kt @@ -43,7 +43,7 @@ open class FullReadKeyValueCacheRepo( protected open suspend fun actualizeAll() { locker.withWriteLock { kvCache.clear() - kvCache.set(parentRepo.getAll { keys(it) }.toMap()) + kvCache.set(parentRepo.getAll()) } } diff --git a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapCRUDRepo.kt b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapCRUDRepo.kt index c9aa60c63f4..05d019a68e6 100644 --- a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapCRUDRepo.kt +++ b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapCRUDRepo.kt @@ -65,12 +65,12 @@ abstract class WriteMapCRUDRepo( protected val map: MutableMap = mutableMapOf(), protected val locker: SmartRWLocker = SmartRWLocker() ) : WriteCRUDRepo { - protected val _newObjectsFlow: MutableSharedFlow = MutableSharedFlow() - override val newObjectsFlow: Flow = _newObjectsFlow.asSharedFlow() - protected val _updatedObjectsFlow: MutableSharedFlow = MutableSharedFlow() - override val updatedObjectsFlow: Flow = _updatedObjectsFlow.asSharedFlow() - protected val _deletedObjectsIdsFlow: MutableSharedFlow = MutableSharedFlow() - override val deletedObjectsIdsFlow: Flow = _deletedObjectsIdsFlow.asSharedFlow() + protected open val _newObjectsFlow: MutableSharedFlow = MapsReposDefaultMutableSharedFlow() + override val newObjectsFlow: Flow by lazy { _newObjectsFlow.asSharedFlow() } + protected open val _updatedObjectsFlow: MutableSharedFlow = MapsReposDefaultMutableSharedFlow() + override val updatedObjectsFlow: Flow by lazy { _updatedObjectsFlow.asSharedFlow() } + protected open val _deletedObjectsIdsFlow: MutableSharedFlow = MapsReposDefaultMutableSharedFlow() + override val deletedObjectsIdsFlow: Flow by lazy { _deletedObjectsIdsFlow.asSharedFlow() } protected abstract suspend fun updateObject(newValue: InputValueType, id: IdType, old: ObjectType): ObjectType protected abstract suspend fun createObject(newValue: InputValueType): Pair @@ -80,10 +80,10 @@ abstract class WriteMapCRUDRepo( values.map { val (id, newObject) = createObject(it) map[id] = newObject - newObject.also { _ -> - _newObjectsFlow.emit(newObject) - } + newObject } + }.onEach { + _newObjectsFlow.emit(it) } } @@ -93,8 +93,9 @@ abstract class WriteMapCRUDRepo( newValue.also { map[id] = it - _updatedObjectsFlow.emit(it) } + } ?.also { + _updatedObjectsFlow.emit(it) } } @@ -104,10 +105,10 @@ abstract class WriteMapCRUDRepo( override suspend fun deleteById(ids: List) { locker.withWriteLock { - ids.forEach { - map.remove(it) ?.also { _ -> _deletedObjectsIdsFlow.emit(it) } + ids.mapNotNull { + it.takeIf { map.remove(it) != null } } - } + }.onEach { _deletedObjectsIdsFlow.emit(it) } } } diff --git a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValueRepo.kt b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValueRepo.kt index 71ccd98c865..32b6815330f 100644 --- a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValueRepo.kt +++ b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValueRepo.kt @@ -9,6 +9,7 @@ import dev.inmo.micro_utils.pagination.utils.paginate import dev.inmo.micro_utils.pagination.utils.reverse import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.asSharedFlow /** * [Map]-based [ReadKeyValueRepo]. All internal operations will be locked with [locker] (mostly with @@ -88,12 +89,14 @@ class WriteMapKeyValueRepo( private val map: MutableMap, private val locker: SmartRWLocker ) : WriteKeyValueRepo { - private val _onNewValue: MutableSharedFlow> = MutableSharedFlow() - override val onNewValue: Flow> - get() = _onNewValue - private val _onValueRemoved: MutableSharedFlow = MutableSharedFlow() - override val onValueRemoved: Flow - get() = _onValueRemoved + private val _onNewValue: MutableSharedFlow> = MapsReposDefaultMutableSharedFlow() + override val onNewValue: Flow> by lazy { + _onNewValue.asSharedFlow() + } + private val _onValueRemoved: MutableSharedFlow = MapsReposDefaultMutableSharedFlow() + override val onValueRemoved: Flow by lazy { + _onValueRemoved.asSharedFlow() + } constructor(map: MutableMap = mutableMapOf()) : this(map, SmartRWLocker()) override suspend fun set(toSet: Map) { @@ -103,10 +106,10 @@ class WriteMapKeyValueRepo( override suspend fun unset(toUnset: List) { locker.withWriteLock { - toUnset.forEach { k -> - map.remove(k) ?.also { _ -> _onValueRemoved.emit(k) } + toUnset.mapNotNull { k -> + map.remove(k) ?.let { _ -> k } } - } + }.forEach { _onValueRemoved.emit(it) } } override suspend fun unsetWithValues(toUnset: List) { diff --git a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValuesRepo.kt b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValuesRepo.kt index c1c61edcc79..226bb93a36d 100644 --- a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValuesRepo.kt +++ b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapKeyValuesRepo.kt @@ -81,68 +81,94 @@ class MapWriteKeyValuesRepo( private val map: MutableMap> = mutableMapOf(), private val locker: SmartRWLocker = SmartRWLocker() ) : WriteKeyValuesRepo { - private val _onNewValue: MutableSharedFlow> = MutableSharedFlow() - override val onNewValue: Flow> = _onNewValue.asSharedFlow() - private val _onValueRemoved: MutableSharedFlow> = MutableSharedFlow() - override val onValueRemoved: Flow> = _onValueRemoved.asSharedFlow() - private val _onDataCleared: MutableSharedFlow = MutableSharedFlow() - override val onDataCleared: Flow = _onDataCleared.asSharedFlow() + private val _onNewValue: MutableSharedFlow> = MapsReposDefaultMutableSharedFlow() + override val onNewValue: Flow> by lazy { + _onNewValue.asSharedFlow() + } + private val _onValueRemoved: MutableSharedFlow> = MapsReposDefaultMutableSharedFlow() + override val onValueRemoved: Flow> by lazy { + _onValueRemoved.asSharedFlow() + } + private val _onDataCleared: MutableSharedFlow = MapsReposDefaultMutableSharedFlow() + override val onDataCleared: Flow by lazy { + _onDataCleared.asSharedFlow() + } override suspend fun add(toAdd: Map>) { locker.withWriteLock { - toAdd.keys.forEach { k -> - if (map.getOrPut(k) { mutableListOf() }.addAll(toAdd[k] ?: return@forEach)) { - toAdd[k] ?.forEach { v -> - _onNewValue.emit(k to v) - } + toAdd.keys.mapNotNull { k -> + (k to toAdd[k]).takeIf { + map.getOrPut(k) { mutableListOf() }.addAll(toAdd[k] ?: return@mapNotNull null) } } + }.forEach { (k, vs) -> + vs ?.forEach { v -> + _onNewValue.emit(k to v) + } } } override suspend fun remove(toRemove: Map>) { + val removed = mutableListOf>() + val cleared = mutableListOf() locker.withWriteLock { toRemove.keys.forEach { k -> if (map[k]?.removeAll(toRemove[k] ?: return@forEach) == true) { toRemove[k]?.forEach { v -> - _onValueRemoved.emit(k to v) + removed.add(k to v) } } if (map[k]?.isEmpty() == true) { map.remove(k) - _onDataCleared.emit(k) + cleared.add(k) } } } + removed.forEach { + _onValueRemoved.emit(it) + } + cleared.forEach { + _onDataCleared.emit(it) + } } override suspend fun removeWithValue(v: Value) { locker.withWriteLock { - map.forEach { (k, values) -> + map.mapNotNull { (k, values) -> if (values.remove(v)) { - _onValueRemoved.emit(k to v) + k to v + } else { + null } } + }.forEach { + _onValueRemoved.emit(it) } } override suspend fun clear(k: Key) { locker.withWriteLock { - map.remove(k) ?.also { _onDataCleared.emit(k) } - } + map.remove(k) + } ?.also { _onDataCleared.emit(k) } } override suspend fun clearWithValue(v: Value) { locker.withWriteLock { map.filter { (_, values) -> values.contains(v) - }.forEach { - map.remove(it.key)?.onEach { v -> - _onValueRemoved.emit(it.key to v) - }?.also { _ -> - _onDataCleared.emit(it.key) + }.mapNotNull { + if (map.remove(it.key) != null) { + it.toPair() + } else { + null } } + }.forEach { + it.second.onEach { v -> + _onValueRemoved.emit(it.first to v) + }.also { _ -> + _onDataCleared.emit(it.first) + } } } } diff --git a/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapsReposDefaultMutableSharedFlow.kt b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapsReposDefaultMutableSharedFlow.kt new file mode 100644 index 00000000000..4be3f4dfc45 --- /dev/null +++ b/repos/inmemory/src/commonMain/kotlin/dev/inmo/micro_utils/repos/MapsReposDefaultMutableSharedFlow.kt @@ -0,0 +1,7 @@ +package dev.inmo.micro_utils.repos + +import kotlinx.coroutines.flow.MutableSharedFlow + +fun MapsReposDefaultMutableSharedFlow() = MutableSharedFlow( + extraBufferCapacity = Int.MAX_VALUE +) \ No newline at end of file