From bfb6e738ee1278f70cc976f3546480d1ec9dcc3b Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Thu, 26 Sep 2024 23:50:57 +0600 Subject: [PATCH] add debouncedBy extension --- CHANGELOG.md | 3 ++ .../micro_utils/coroutines/FlowDebouncedBy.kt | 40 ++++++++++++++++++ .../src/commonTest/kotlin/DebouncedByTests.kt | 42 +++++++++++++++++++ 3 files changed, 85 insertions(+) create mode 100644 coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowDebouncedBy.kt create mode 100644 coroutines/src/commonTest/kotlin/DebouncedByTests.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fce6ed620e..72c30759b00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## 0.22.5 +* `Coroutines`: + * Add extension `Flow.debouncedBy` + ## 0.22.4 * `Versions`: diff --git a/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowDebouncedBy.kt b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowDebouncedBy.kt new file mode 100644 index 00000000000..08e1ffbc97a --- /dev/null +++ b/coroutines/src/commonMain/kotlin/dev/inmo/micro_utils/coroutines/FlowDebouncedBy.kt @@ -0,0 +1,40 @@ +package dev.inmo.micro_utils.coroutines + +import kotlinx.coroutines.Job +import kotlinx.coroutines.async +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlin.jvm.JvmInline +import kotlin.time.Duration + +@JvmInline +private value class DebouncedByData( + val millisToData: Pair +) + +fun Flow.debouncedBy(timeout: (T) -> Long, markerFactory: (T) -> Any?): Flow = channelFlow { + val jobs = mutableMapOf() + val mutex = Mutex() + subscribe(this) { + mutex.withLock { + val marker = markerFactory(it) + lateinit var job: Job + job = async { + delay(timeout(it)) + mutex.withLock { + if (jobs[marker] === job) { + this@channelFlow.send(it) + jobs.remove(marker) + } + } + } + jobs[marker] ?.cancel() + jobs[marker] = job + } + } +} + +fun Flow.debouncedBy(timeout: Long, markerFactory: (T) -> Any?): Flow = debouncedBy({ timeout }, markerFactory) +fun Flow.debouncedBy(timeout: Duration, markerFactory: (T) -> Any?): Flow = debouncedBy({ timeout.inWholeMilliseconds }, markerFactory) diff --git a/coroutines/src/commonTest/kotlin/DebouncedByTests.kt b/coroutines/src/commonTest/kotlin/DebouncedByTests.kt new file mode 100644 index 00000000000..6680374d9c5 --- /dev/null +++ b/coroutines/src/commonTest/kotlin/DebouncedByTests.kt @@ -0,0 +1,42 @@ +import dev.inmo.micro_utils.coroutines.debouncedBy +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class DebouncedByTests { + @Test + fun testThatParallelDebouncingWorksCorrectly() = runTest { + val dataToMarkerFactories = listOf( + 1 to 0, + 2 to 1, + 3 to 2, + 4 to 0, + 5 to 1, + 6 to 2, + 7 to 0, + 8 to 1, + 9 to 2, + ) + + val collected = mutableListOf() + + dataToMarkerFactories.asFlow().debouncedBy(10L) { + it.second + }.collect { + when (it.second) { + 0 -> assertEquals(7, it.first) + 1 -> assertEquals(8, it.first) + 2 -> assertEquals(9, it.first) + else -> error("wtf") + } + collected.add(it.first) + } + + val expectedList = listOf(7, 8, 9) + assertEquals(expectedList, collected) + assertTrue { collected.containsAll(expectedList) } + assertTrue { expectedList.containsAll(collected) } + } +} \ No newline at end of file