diff --git a/CHANGELOG.md b/CHANGELOG.md index 750949dace2..2ea46bb6f16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ ## 0.25.8.1 +* `Coroutines`: + * New function `suspendPoint` to check coroutine cancellation status + * `SpecialMutableStateFlow` renamed to `MutableRedeliverStateFlow` (old name deprecated with `ReplaceWith`) + * `SmartSemaphore`: + * Fix of `waitRelease` call to pass correct number of permits + ## 0.25.8 * `Pagination`: @@ -401,7 +407,7 @@ KTOR DEPENDENCY** * A lot of inline functions became common functions due to inline with only noinline callbacks in arguments leads to low performance * `Coroutines`: - * `SmartMutex`, `SmartSemaphore` and `SmartRWLocker` as their user changed their state flow to `SpecialMutableStateFlow` + * `SmartMutex`, `SmartSemaphore` and `SmartRWLocker` as their user changed their state flow to `MutableRedeliverStateFlow` ## 0.20.49 @@ -610,7 +616,7 @@ low performance ## 0.20.18 * `Coroutines`: - * `SpecialMutableStateFlow` now extends `MutableStateFlow` + * `MutableRedeliverStateFlow` now extends `MutableStateFlow` * `Compose`: * Deprecate `FlowState` due to its complexity in fixes @@ -624,7 +630,7 @@ low performance * `Versions`: * `Exposed`: `0.44.1` -> `0.45.0` * `Coroutines`: - * Add `SpecialMutableStateFlow` + * Add `MutableRedeliverStateFlow` * `Compose`: * Add `FlowState` diff --git a/common/compose/src/jvmTest/kotlin/LoadableComponentTests.kt b/common/compose/src/jvmTest/kotlin/LoadableComponentTests.kt index dc33811f442..132158f77f2 100644 --- a/common/compose/src/jvmTest/kotlin/LoadableComponentTests.kt +++ b/common/compose/src/jvmTest/kotlin/LoadableComponentTests.kt @@ -2,7 +2,7 @@ import androidx.compose.runtime.remember import androidx.compose.ui.test.ExperimentalTestApi import androidx.compose.ui.test.runComposeUiTest import dev.inmo.micro_utils.common.compose.LoadableComponent -import dev.inmo.micro_utils.coroutines.SpecialMutableStateFlow +import dev.inmo.micro_utils.coroutines.MutableRedeliverStateFlow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first @@ -16,8 +16,8 @@ class LoadableComponentTests { @Test @TestOnly fun testSimpleLoad() = runComposeUiTest { - val loadingFlow = SpecialMutableStateFlow(0) - val loadedFlow = SpecialMutableStateFlow(0) + val loadingFlow = MutableRedeliverStateFlow(0) + val loadedFlow = MutableRedeliverStateFlow(0) setContent { LoadableComponent({ loadingFlow.filter { it == 1 }.first() diff --git a/coroutines/compose/src/jsMain/kotlin/dev/inmo/micro_utils/coroutines/compose/StyleSheetsAggregator.kt b/coroutines/compose/src/jsMain/kotlin/dev/inmo/micro_utils/coroutines/compose/StyleSheetsAggregator.kt index 9cc62b81acd..7f1ed7ab8e3 100644 --- a/coroutines/compose/src/jsMain/kotlin/dev/inmo/micro_utils/coroutines/compose/StyleSheetsAggregator.kt +++ b/coroutines/compose/src/jsMain/kotlin/dev/inmo/micro_utils/coroutines/compose/StyleSheetsAggregator.kt @@ -3,7 +3,7 @@ package dev.inmo.micro_utils.coroutines.compose import androidx.compose.runtime.Composable import androidx.compose.runtime.collectAsState import androidx.compose.runtime.remember -import dev.inmo.micro_utils.coroutines.SpecialMutableStateFlow +import dev.inmo.micro_utils.coroutines.MutableRedeliverStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.debounce @@ -16,7 +16,7 @@ import org.jetbrains.compose.web.css.StyleSheet * to add `Style(stylesheet)` on every compose function call */ object StyleSheetsAggregator { - private val _stylesFlow = SpecialMutableStateFlow>(emptySet()) + private val _stylesFlow = MutableRedeliverStateFlow>(emptySet()) val stylesFlow: StateFlow> = _stylesFlow.asStateFlow() @Composable diff --git a/coroutines/compose/src/jvmTest/kotlin/FlowStateTests.kt b/coroutines/compose/src/jvmTest/kotlin/FlowStateTests.kt index 2f4ad81e9f0..741ec3b0057 100644 --- a/coroutines/compose/src/jvmTest/kotlin/FlowStateTests.kt +++ b/coroutines/compose/src/jvmTest/kotlin/FlowStateTests.kt @@ -2,7 +2,7 @@ import androidx.compose.material.Button import androidx.compose.material.Text import androidx.compose.runtime.collectAsState import androidx.compose.ui.test.* -import dev.inmo.micro_utils.coroutines.SpecialMutableStateFlow +import dev.inmo.micro_utils.coroutines.MutableRedeliverStateFlow import org.jetbrains.annotations.TestOnly import kotlin.test.Test @@ -11,7 +11,7 @@ class FlowStateTests { @Test @TestOnly fun simpleTest() = runComposeUiTest { - val flowState = SpecialMutableStateFlow(0) + val flowState = MutableRedeliverStateFlow(0) setContent { Button({ flowState.value++ }) { Text("Click") } Text(flowState.collectAsState().value.toString()) diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartMutex.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartMutex.kt index 5127fc6ecde..5ae306a725d 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartMutex.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartMutex.kt @@ -1,7 +1,6 @@ package dev.inmo.micro_utils.coroutines import kotlinx.coroutines.currentCoroutineContext -import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.first @@ -44,7 +43,7 @@ sealed interface SmartMutex { * @param locked Preset state of [isLocked] and its internal [_lockStateFlow] */ class Mutable(locked: Boolean = false) : SmartMutex { - private val _lockStateFlow = SpecialMutableStateFlow(locked) + private val _lockStateFlow = MutableRedeliverStateFlow(locked) override val lockStateFlow: StateFlow = _lockStateFlow.asStateFlow() private val internalChangesMutex = Mutex() diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartSemaphore.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartSemaphore.kt index cef80cfab16..eb06cafd9c7 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartSemaphore.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SmartSemaphore.kt @@ -1,7 +1,6 @@ package dev.inmo.micro_utils.coroutines import kotlinx.coroutines.currentCoroutineContext -import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.first @@ -47,7 +46,7 @@ sealed interface SmartSemaphore { */ class Mutable(permits: Int, acquiredPermits: Int = 0) : SmartSemaphore { override val maxPermits: Int = permits - private val _freePermitsStateFlow = SpecialMutableStateFlow(permits - acquiredPermits) + private val _freePermitsStateFlow = MutableRedeliverStateFlow(permits - acquiredPermits) override val permitsStateFlow: StateFlow = _freePermitsStateFlow.asStateFlow() private val internalChangesMutex = Mutex(false) @@ -73,7 +72,7 @@ sealed interface SmartSemaphore { acquiredPermits != checkedPermits } if (shouldContinue) { - waitRelease() + waitRelease(checkedPermits - acquiredPermits) } } while (shouldContinue && currentCoroutineContext().isActive) } catch (e: Throwable) { diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SpecialMutableStateFlow.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SpecialMutableStateFlow.kt index 80972346b2d..d78a9be9196 100644 --- a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SpecialMutableStateFlow.kt +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SpecialMutableStateFlow.kt @@ -1,7 +1,5 @@ package dev.inmo.micro_utils.coroutines -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.channels.BufferOverflow @@ -11,13 +9,12 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.internal.SynchronizedObject import kotlinx.coroutines.internal.synchronized -import kotlin.coroutines.CoroutineContext /** * Works like [StateFlow], but guarantee that latest value update will always be delivered to * each active subscriber */ -open class SpecialMutableStateFlow( +open class MutableRedeliverStateFlow( initialValue: T ) : MutableStateFlow, FlowCollector, MutableSharedFlow { @OptIn(InternalCoroutinesApi::class) @@ -68,3 +65,6 @@ open class SpecialMutableStateFlow( override suspend fun collect(collector: FlowCollector) = sharingFlow.collect(collector) } + +@Deprecated("Renamed to MutableRedeliverStateFlow", ReplaceWith("MutableRedeliverStateFlow")) +typealias SpecialMutableStateFlow = MutableRedeliverStateFlow diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SuspendPoint.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SuspendPoint.kt new file mode 100644 index 00000000000..e5161c761b6 --- /dev/null +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/SuspendPoint.kt @@ -0,0 +1,15 @@ +package dev.inmo.micro_utils.coroutines + +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.ensureActive + +/** + * Ensures that the current coroutine context is still active and throws a [kotlinx.coroutines.CancellationException] + * if the coroutine has been canceled. + * + * This function provides a convenient way to check the active status of a coroutine, which is useful + * to identify cancellation points in long-running or suspendable operations. + * + * @throws kotlinx.coroutines.CancellationException if the coroutine context is no longer active. + */ +suspend fun suspendPoint() = currentCoroutineContext().ensureActive() diff --git a/coroutines/src/commonTest/kotlin/SmartKeyRWLockerTests.kt b/coroutines/src/commonTest/kotlin/SmartKeyRWLockerTests.kt index 70038c189b4..a0906812b28 100644 --- a/coroutines/src/commonTest/kotlin/SmartKeyRWLockerTests.kt +++ b/coroutines/src/commonTest/kotlin/SmartKeyRWLockerTests.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.flow.first import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.test.runTest +import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFails @@ -13,184 +14,517 @@ import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds class SmartKeyRWLockerTests { - @Test - fun writeLockKeyFailedOnGlobalWriteLockTest() = runTest { - val locker = SmartKeyRWLocker() - val testKey = "test" - locker.lockWrite() + private lateinit var locker: SmartKeyRWLocker - assertTrue { locker.isWriteLocked() } - - assertFails { - realWithTimeout(1.seconds) { - locker.lockWrite(testKey) - } - } - assertFalse { locker.isWriteLocked(testKey) } - - locker.unlockWrite() - assertFalse { locker.isWriteLocked() } - - realWithTimeout(1.seconds) { - locker.lockWrite(testKey) - } - assertTrue { locker.isWriteLocked(testKey) } - assertTrue { locker.unlockWrite(testKey) } - assertFalse { locker.isWriteLocked(testKey) } + @BeforeTest + fun setup() { + locker = SmartKeyRWLocker() } + + // ==================== Global Read Tests ==================== + @Test - fun writeLockKeyFailedOnGlobalReadLockTest() = runTest { - val locker = SmartKeyRWLocker() - val testKey = "test" + fun testGlobalReadAllowsMultipleConcurrentReads() = runTest { + val results = mutableListOf() + locker.acquireRead() - assertEquals(Int.MAX_VALUE - 1, locker.readSemaphore().freePermits) - - assertFails { - realWithTimeout(1.seconds) { - locker.lockWrite(testKey) + val jobs = List(5) { + launch { + locker.acquireRead() + delay(100.milliseconds) + results.add(true) + locker.releaseRead() } } - assertFalse { locker.isWriteLocked(testKey) } + + jobs.joinAll() + locker.releaseRead() + + assertEquals(5, results.size) + } + + @Test + fun testGlobalReadBlocksGlobalWrite() = runTest { + locker.acquireRead() + + var writeAcquired = false + val writeJob = launch { + locker.lockWrite() + writeAcquired = true + locker.unlockWrite() + } + + delay(200.milliseconds) + assertFalse(writeAcquired, "Write should be blocked by global read") locker.releaseRead() - assertEquals(Int.MAX_VALUE, locker.readSemaphore().freePermits) + writeJob.join() - realWithTimeout(1.seconds) { - locker.lockWrite(testKey) - } - assertTrue { locker.isWriteLocked(testKey) } - assertTrue { locker.unlockWrite(testKey) } - assertFalse { locker.isWriteLocked(testKey) } + assertTrue(writeAcquired, "Write should succeed after read released") } + @Test - fun readLockFailedOnWriteLockKeyTest() = runTest { - val locker = SmartKeyRWLocker() - val testKey = "test" - locker.lockWrite(testKey) + fun testGlobalReadBlocksAllKeyWrites() = runTest { + locker.acquireRead() - assertTrue { locker.isWriteLocked(testKey) } + val writeFlags = mutableMapOf() + val keys = listOf("key1", "key2", "key3") - assertFails { - realWithTimeout(1.seconds) { + val jobs = keys.map { key -> + launch { + locker.lockWrite(key) + writeFlags[key] = true + locker.unlockWrite(key) + } + } + + delay(200.milliseconds) + assertTrue(writeFlags.isEmpty(), "No writes should succeed while global read active") + + locker.releaseRead() + jobs.joinAll() + + assertEquals(keys.size, writeFlags.size, "All writes should succeed after global read released") + } + + // ==================== Global Write Tests ==================== + + @Test + fun testGlobalWriteBlocksAllOperations() = runTest { + locker.lockWrite() + + var globalReadAcquired = false + var keyReadAcquired = false + var keyWriteAcquired = false + + val jobs = listOf( + launch { locker.acquireRead() + globalReadAcquired = true + locker.releaseRead() + }, + launch { + locker.acquireRead("key1") + keyReadAcquired = true + locker.releaseRead("key1") + }, + launch { + locker.lockWrite("key2") + keyWriteAcquired = true + locker.unlockWrite("key2") } - } - assertEquals(locker.readSemaphore().maxPermits - 1, locker.readSemaphore().freePermits) + ) - locker.unlockWrite(testKey) - assertFalse { locker.isWriteLocked(testKey) } + delay(200.milliseconds) + assertFalse(globalReadAcquired, "Global read should be blocked") + assertFalse(keyReadAcquired, "Key read should be blocked") + assertFalse(keyWriteAcquired, "Key write should be blocked") - realWithTimeout(1.seconds) { - locker.acquireRead() - } - assertEquals(locker.readSemaphore().maxPermits - 1, locker.readSemaphore().freePermits) - assertTrue { locker.releaseRead() } - assertEquals(locker.readSemaphore().maxPermits, locker.readSemaphore().freePermits) + locker.unlockWrite() + jobs.joinAll() + + assertTrue(globalReadAcquired) + assertTrue(keyReadAcquired) + assertTrue(keyWriteAcquired) } + @Test - fun writeLockFailedOnWriteLockKeyTest() = runTest { - val locker = SmartKeyRWLocker() - val testKey = "test" - locker.lockWrite(testKey) + fun testGlobalWriteIsExclusive() = runTest { + locker.lockWrite() - assertTrue { locker.isWriteLocked(testKey) } - - assertFails { - realWithTimeout(1.seconds) { - locker.lockWrite() - } - } - assertFalse(locker.isWriteLocked()) - - locker.unlockWrite(testKey) - assertFalse { locker.isWriteLocked(testKey) } - - realWithTimeout(1.seconds) { + var secondWriteAcquired = false + val job = launch { locker.lockWrite() + secondWriteAcquired = true + locker.unlockWrite() } - assertTrue(locker.isWriteLocked()) - assertTrue { locker.unlockWrite() } - assertFalse(locker.isWriteLocked()) + + delay(200.milliseconds) + assertFalse(secondWriteAcquired, "Second global write should be blocked") + + locker.unlockWrite() + job.join() + + assertTrue(secondWriteAcquired) } + + // ==================== Key Read Tests ==================== + @Test - fun readsBlockingGlobalWrite() = runTest { - val locker = SmartKeyRWLocker() + fun testKeyReadAllowsMultipleConcurrentReadsForSameKey() = runTest { + val key = "testKey" + val results = mutableListOf() - val testKeys = (0 until 100).map { "test$it" } + locker.acquireRead(key) - for (i in testKeys.indices) { - val it = testKeys[i] - locker.acquireRead(it) - val previous = testKeys.take(i) - val next = testKeys.drop(i + 1) - - previous.forEach { - assertTrue { locker.readSemaphoreOrNull(it) ?.freePermits == Int.MAX_VALUE - 1 } - } - next.forEach { - assertTrue { locker.readSemaphoreOrNull(it) ?.freePermits == null } + val jobs = List(5) { + launch { + locker.acquireRead(key) + delay(50.milliseconds) + results.add(true) + locker.releaseRead(key) } } - for (i in testKeys.indices) { - val it = testKeys[i] - assertFails { - realWithTimeout(13.milliseconds) { locker.lockWrite() } + jobs.joinAll() + locker.releaseRead(key) + + assertEquals(5, results.size) + } + + @Test + fun testKeyReadAllowsReadsForDifferentKeys() = runTest { + val results = mutableMapOf() + + locker.acquireRead("key1") + + val jobs = listOf("key2", "key3", "key4").map { key -> + launch { + locker.acquireRead(key) + delay(50.milliseconds) + results[key] = true + locker.releaseRead(key) } - val readPermitsBeforeLock = locker.readSemaphore().freePermits - realWithTimeout(1.seconds) { locker.acquireRead() } + } + + jobs.joinAll() + locker.releaseRead("key1") + + assertEquals(3, results.size) + } + + @Test + fun testKeyReadBlocksWriteForSameKey() = runTest { + val key = "testKey" + locker.acquireRead(key) + + var writeAcquired = false + val job = launch { + locker.lockWrite(key) + writeAcquired = true + locker.unlockWrite(key) + } + + delay(200.milliseconds) + assertFalse(writeAcquired, "Write for same key should be blocked") + + locker.releaseRead(key) + job.join() + + assertTrue(writeAcquired) + } + + @Test + fun testKeyReadBlocksGlobalWrite() = runTest { + locker.acquireRead("key1") + + var globalWriteAcquired = false + val job = launch { + locker.lockWrite() + globalWriteAcquired = true + locker.unlockWrite() + } + + delay(200.milliseconds) + assertFalse(globalWriteAcquired, "Global write should be blocked by key read") + + locker.releaseRead("key1") + job.join() + + assertTrue(globalWriteAcquired) + } + + @Test + fun testKeyReadAllowsWriteForDifferentKey() = runTest { + locker.acquireRead("key1") + + var writeAcquired = false + val job = launch { + locker.lockWrite("key2") + writeAcquired = true + locker.unlockWrite("key2") + } + + job.join() + assertTrue(writeAcquired, "Write for different key should succeed") + + locker.releaseRead("key1") + } + + // ==================== Key Write Tests ==================== + + @Test + fun testKeyWriteBlocksReadForSameKey() = runTest { + val key = "testKey" + locker.lockWrite(key) + + var readAcquired = false + val job = launch { + locker.acquireRead(key) + readAcquired = true + locker.releaseRead(key) + } + + delay(200.milliseconds) + assertFalse(readAcquired, "Read for same key should be blocked") + + locker.unlockWrite(key) + job.join() + + assertTrue(readAcquired) + } + + @Test + fun testKeyWriteBlocksGlobalRead() = runTest { + locker.lockWrite("key1") + + var globalReadAcquired = false + val job = launch { + locker.acquireRead() + globalReadAcquired = true locker.releaseRead() - assertEquals(readPermitsBeforeLock, locker.readSemaphore().freePermits) - - locker.releaseRead(it) } - assertTrue { locker.readSemaphore().freePermits == Int.MAX_VALUE } - realWithTimeout(1.seconds) { locker.lockWrite() } - assertFails { - realWithTimeout(13.milliseconds) { locker.acquireRead() } - } - assertTrue { locker.unlockWrite() } - assertTrue { locker.readSemaphore().freePermits == Int.MAX_VALUE } + delay(200.milliseconds) + assertFalse(globalReadAcquired, "Global read should be blocked by key write") + + locker.unlockWrite("key1") + job.join() + + assertTrue(globalReadAcquired) } + @Test - fun writesBlockingGlobalWrite() = runTest { - val locker = SmartKeyRWLocker() + fun testKeyWriteIsExclusiveForSameKey() = runTest { + val key = "testKey" + locker.lockWrite(key) - val testKeys = (0 until 100).map { "test$it" } + var secondWriteAcquired = false + val job = launch { + locker.lockWrite(key) + secondWriteAcquired = true + locker.unlockWrite(key) + } - for (i in testKeys.indices) { - val it = testKeys[i] - locker.lockWrite(it) - val previous = testKeys.take(i) - val next = testKeys.drop(i + 1) + delay(200.milliseconds) + assertFalse(secondWriteAcquired, "Second write for same key should be blocked") - previous.forEach { - assertTrue { locker.writeMutexOrNull(it) ?.isLocked == true } + locker.unlockWrite(key) + job.join() + + assertTrue(secondWriteAcquired) + } + + @Test + fun testKeyWriteAllowsOperationsOnDifferentKeys() = runTest { + locker.lockWrite("key1") + + val results = mutableMapOf() + + val jobs = listOf( + launch { + locker.acquireRead("key2") + results["read-key2"] = true + locker.releaseRead("key2") + }, + launch { + locker.lockWrite("key3") + results["write-key3"] = true + locker.unlockWrite("key3") } - next.forEach { - assertTrue { locker.writeMutexOrNull(it) ?.isLocked != true } + ) + + jobs.joinAll() + assertEquals(2, results.size, "Operations on different keys should succeed") + + locker.unlockWrite("key1") + } + + // ==================== Complex Scenarios ==================== + + @Test + fun testMultipleReadersThenWriter() = runTest { + val key = "testKey" + val readCount = 5 + val readers = mutableListOf() + + repeat(readCount) { + readers.add(launch { + locker.acquireRead(key) + delay(100.milliseconds) + locker.releaseRead(key) + }) + } + + delay(50.milliseconds) // Let readers acquire + + var writerExecuted = false + val writer = launch { + locker.lockWrite(key) + writerExecuted = true + locker.unlockWrite(key) + } + + delay(50.milliseconds) + assertFalse(writerExecuted, "Writer should wait for all readers") + + readers.joinAll() + writer.join() + + assertTrue(writerExecuted, "Writer should execute after all readers done") + } + + @Test + fun testWriterThenMultipleReaders() = runTest { + val key = "testKey" + + locker.lockWrite(key) + + val readerFlags = mutableListOf() + val readers = List(5) { + launch { + locker.acquireRead(key) + readerFlags.add(true) + locker.releaseRead(key) } } - for (i in testKeys.indices) { - val it = testKeys[i] - assertFails { realWithTimeout(13.milliseconds) { locker.lockWrite() } } + delay(200.milliseconds) + assertTrue(readerFlags.isEmpty(), "Readers should be blocked by writer") - val readPermitsBeforeLock = locker.readSemaphore().freePermits - assertFails { realWithTimeout(13.milliseconds) { locker.acquireRead() } } - assertEquals(readPermitsBeforeLock, locker.readSemaphore().freePermits) + locker.unlockWrite(key) + readers.joinAll() - locker.unlockWrite(it) + assertEquals(5, readerFlags.size, "All readers should succeed after writer") + } + + @Test + fun testCascadingLocksWithDifferentKeys() = runTest { + val executed = mutableMapOf() + + launch { + locker.lockWrite("key1") + executed["write-key1-start"] = true + delay(100.milliseconds) + locker.unlockWrite("key1") + executed["write-key1-end"] = true } - assertTrue { locker.readSemaphore().freePermits == Int.MAX_VALUE } - realWithTimeout(1.seconds) { locker.lockWrite() } - assertFails { - realWithTimeout(13.milliseconds) { locker.acquireRead() } + delay(50.milliseconds) + + launch { + locker.acquireRead("key2") + executed["read-key2"] = true + delay(100.milliseconds) + locker.releaseRead("key2") } - assertTrue { locker.unlockWrite() } - assertTrue { locker.readSemaphore().freePermits == Int.MAX_VALUE } + + delay(200.milliseconds) + + assertTrue(executed["write-key1-start"] == true) + assertTrue(executed["read-key2"] == true) + assertTrue(executed["write-key1-end"] == true) + } + + @Test + fun testReleaseWithoutAcquireReturnsFalse() = runTest { + assertFalse(locker.releaseRead(), "Release without acquire should return false") + assertFalse(locker.releaseRead("key1"), "Release without acquire should return false") + } + + @Test + fun testUnlockWithoutLockReturnsFalse() = runTest { + assertFalse(locker.unlockWrite(), "Unlock without lock should return false") + assertFalse(locker.unlockWrite("key1"), "Unlock without lock should return false") + } + + @Test + fun testProperReleaseReturnsTrue() = runTest { + locker.acquireRead() + assertTrue(locker.releaseRead(), "Release after acquire should return true") + + locker.acquireRead("key1") + assertTrue(locker.releaseRead("key1"), "Release after acquire should return true") + } + + @Test + fun testProperUnlockReturnsTrue() = runTest { + locker.lockWrite() + assertTrue(locker.unlockWrite(), "Unlock after lock should return true") + + locker.lockWrite("key1") + assertTrue(locker.unlockWrite("key1"), "Unlock after lock should return true") + } + + // ==================== Stress Tests ==================== + + @Test + fun stressTestWithMixedOperations() = runTest(timeout = 10.seconds) { + val operations = 100 + val keys = listOf("key1", "key2", "key3", "key4", "key5") + val jobs = mutableListOf() + + repeat(operations) { i -> + val key = keys[i % keys.size] + + when (i % 4) { + 0 -> jobs.add(launch { + locker.acquireRead(key) + delay(10.milliseconds) + locker.releaseRead(key) + }) + 1 -> jobs.add(launch { + locker.lockWrite(key) + delay(10.milliseconds) + locker.unlockWrite(key) + }) + 2 -> jobs.add(launch { + locker.acquireRead() + delay(10.milliseconds) + locker.releaseRead() + }) + 3 -> jobs.add(launch { + locker.lockWrite() + delay(10.milliseconds) + locker.unlockWrite() + }) + } + } + + jobs.joinAll() + // If we reach here without deadlock or exceptions, test passes + } + + @Test + fun testFairnessReadersDontStarveWriters() = runTest(timeout = 5.seconds) { + val key = "testKey" + var writerExecuted = false + + // Start continuous readers + val readers = List(10) { + launch { + repeat(5) { + locker.acquireRead(key) + delay(50.milliseconds) + locker.releaseRead(key) + delay(10.milliseconds) + } + } + } + + delay(100.milliseconds) + + // Try to acquire write lock + val writer = launch { + locker.lockWrite(key) + writerExecuted = true + locker.unlockWrite(key) + } + + readers.joinAll() + writer.join() + + assertTrue(writerExecuted, "Writer should eventually execute") } } diff --git a/coroutines/src/commonTest/kotlin/SpecialMutableStateFlowTests.kt b/coroutines/src/commonTest/kotlin/SpecialMutableStateFlowTests.kt index 8e3aa105621..7da49d8b9a6 100644 --- a/coroutines/src/commonTest/kotlin/SpecialMutableStateFlowTests.kt +++ b/coroutines/src/commonTest/kotlin/SpecialMutableStateFlowTests.kt @@ -1,4 +1,4 @@ -import dev.inmo.micro_utils.coroutines.SpecialMutableStateFlow +import dev.inmo.micro_utils.coroutines.MutableRedeliverStateFlow import dev.inmo.micro_utils.coroutines.asDeferred import dev.inmo.micro_utils.coroutines.subscribe import kotlinx.coroutines.Job @@ -8,17 +8,17 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertTrue -class SpecialMutableStateFlowTests { +class MutableRedeliverStateFlowTests { @Test fun simpleTest() = runTest { - val specialMutableStateFlow = SpecialMutableStateFlow(0) + val specialMutableStateFlow = MutableRedeliverStateFlow(0) specialMutableStateFlow.value = 1 specialMutableStateFlow.first { it == 1 } assertEquals(1, specialMutableStateFlow.value) } @Test fun specialTest() = runTest { - val specialMutableStateFlow = SpecialMutableStateFlow(0) + val specialMutableStateFlow = MutableRedeliverStateFlow(0) lateinit var subscriberJob: Job subscriberJob = specialMutableStateFlow.subscribe(this) { when (it) { diff --git a/pagination/compose/src/commonMain/kotlin/InfinityPagedComponent.kt b/pagination/compose/src/commonMain/kotlin/InfinityPagedComponent.kt index 95b17437f3d..7627062471c 100644 --- a/pagination/compose/src/commonMain/kotlin/InfinityPagedComponent.kt +++ b/pagination/compose/src/commonMain/kotlin/InfinityPagedComponent.kt @@ -1,7 +1,7 @@ package dev.inmo.micro_utils.pagination.compose import androidx.compose.runtime.* -import dev.inmo.micro_utils.coroutines.SpecialMutableStateFlow +import dev.inmo.micro_utils.coroutines.MutableRedeliverStateFlow import dev.inmo.micro_utils.coroutines.launchLoggingDropExceptions import dev.inmo.micro_utils.coroutines.runCatchingLogging import dev.inmo.micro_utils.pagination.* @@ -27,8 +27,8 @@ class InfinityPagedComponentContext internal constructor( private val loader: suspend InfinityPagedComponentContext.(Pagination) -> PaginationResult ) { internal val startPage = SimplePagination(page, size) - internal val latestLoadedPage = SpecialMutableStateFlow?>(null) - internal val dataState = SpecialMutableStateFlow?>(null) + internal val latestLoadedPage = MutableRedeliverStateFlow?>(null) + internal val dataState = MutableRedeliverStateFlow?>(null) internal var loadingJob: Job? = null internal val loadingMutex = Mutex() diff --git a/pagination/compose/src/commonMain/kotlin/PagedComponent.kt b/pagination/compose/src/commonMain/kotlin/PagedComponent.kt index 90d93f7906b..fe85d845975 100644 --- a/pagination/compose/src/commonMain/kotlin/PagedComponent.kt +++ b/pagination/compose/src/commonMain/kotlin/PagedComponent.kt @@ -1,7 +1,7 @@ package dev.inmo.micro_utils.pagination.compose import androidx.compose.runtime.* -import dev.inmo.micro_utils.coroutines.SpecialMutableStateFlow +import dev.inmo.micro_utils.coroutines.MutableRedeliverStateFlow import dev.inmo.micro_utils.coroutines.launchLoggingDropExceptions import dev.inmo.micro_utils.pagination.* import kotlinx.coroutines.CoroutineScope @@ -28,8 +28,8 @@ class PagedComponentContext internal constructor( private val loader: suspend PagedComponentContext.(Pagination) -> PaginationResult ) { internal val startPage = SimplePagination(initialPage, size) - internal val latestLoadedPage = SpecialMutableStateFlow?>(null) - internal val dataState = SpecialMutableStateFlow?>(null) + internal val latestLoadedPage = MutableRedeliverStateFlow?>(null) + internal val dataState = MutableRedeliverStateFlow?>(null) internal var loadingJob: Job? = null internal val loadingMutex = Mutex()