diff --git a/CHANGELOG.md b/CHANGELOG.md index 4eed8f3..8dc3acf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,3 +6,7 @@ * Updates in libraries: * Coroutines `1.3.2` -> `1.3.3` * Klock `1.7.3` -> `1.8.6` + +## 0.2.1 + +* Added support of flows: now any `KronScheduler` can be convert to `Flow` using `asFlow` extension diff --git a/README.md b/README.md index ec1b0ba..1852a64 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ runtime of applications. | [ How to use: Including in project ](#including-in-project) | | [ How to use: Config from string ](#config-from-string) | | [ How to use: Config via builder (DSL preview) ](#config-via-builder) | +| [ How to use: KronScheduler as a Flow ](#KronScheduler-as-a-Flow) | ## How to use @@ -124,3 +125,28 @@ kronScheduler.doInfinity { ``` All of these examples will do the same things: print `Called` message every five seconds. + +### KronScheduler as a Flow + +Any `KronScheduler`can e converted to a `Flow = SchedulerFlow(this) + +@FlowPreview +class SchedulerFlow( + private val scheduler: KronScheduler +) : AbstractFlow() { + @FlowPreview + override suspend fun collectSafely(collector: FlowCollector) { + while (true) { + val now = DateTime.now() + val nextTime = scheduler.next(now) + val sleepDelay = (nextTime - now).millisecondsLong + delay(sleepDelay) + collector.emit(nextTime) + } + } +} \ No newline at end of file diff --git a/src/commonTest/kotlin/com/insanusmokrassar/krontab/utils/FailJob.kt b/src/commonTest/kotlin/com/insanusmokrassar/krontab/utils/FailJob.kt new file mode 100644 index 0000000..43530fe --- /dev/null +++ b/src/commonTest/kotlin/com/insanusmokrassar/krontab/utils/FailJob.kt @@ -0,0 +1,8 @@ +package com.insanusmokrassar.krontab.utils + +import kotlinx.coroutines.* + +fun CoroutineScope.createFailJob(forTimeMillis: Long) = launch { + delay(forTimeMillis) + throw IllegalStateException("This job must be completed at before this exception happen") +} diff --git a/src/commonTest/kotlin/com/insanusmokrassar/krontab/utils/RunTest.kt b/src/commonTest/kotlin/com/insanusmokrassar/krontab/utils/RunTest.kt new file mode 100644 index 0000000..129a4b5 --- /dev/null +++ b/src/commonTest/kotlin/com/insanusmokrassar/krontab/utils/RunTest.kt @@ -0,0 +1,8 @@ +package com.insanusmokrassar.krontab.utils + +import kotlinx.coroutines.CoroutineScope + +/** + * Workaround to use suspending functions in unit tests + */ +expect fun runTest(block: suspend (scope : CoroutineScope) -> Unit) diff --git a/src/commonTest/kotlin/com/insanusmokrassar/krontab/utils/SchedulerFlow.kt b/src/commonTest/kotlin/com/insanusmokrassar/krontab/utils/SchedulerFlow.kt new file mode 100644 index 0000000..2bb913f --- /dev/null +++ b/src/commonTest/kotlin/com/insanusmokrassar/krontab/utils/SchedulerFlow.kt @@ -0,0 +1,69 @@ +package com.insanusmokrassar.krontab.utils + +import com.insanusmokrassar.krontab.builder.buildSchedule +import com.soywiz.klock.DateTime +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.takeWhile +import kotlin.test.Test +import kotlin.test.assertEquals + +@ExperimentalCoroutinesApi +@FlowPreview +class SchedulerFlowTests { + @Test + fun testThatFlowIsCorrectlyWorkEverySecond() { + val kronScheduler = buildSchedule { + seconds { + 0 every 1 + } + } + + val flow = kronScheduler.asFlow() + + runTest { + val mustBeCollected = 10 + var collected = 0 + flow.takeWhile { + collected < mustBeCollected + }.collect { + collected++ + } + assertEquals(mustBeCollected, collected) + } + } + + @Test + fun testThatFlowIsCorrectlyWorkEverySecondWithMuchOfEmitters() { + val kronScheduler = buildSchedule { + seconds { + 0 every 1 + } + } + + val flow = kronScheduler.asFlow() + + runTest { + val testsCount = 10 + val failJob = it.createFailJob((testsCount * 2) * 1000L) + val mustBeCollected = 10 + val answers = (0 until testsCount).map { _ -> + it.async { + var collected = 0 + flow.takeWhile { + collected < mustBeCollected + }.collect { + collected++ + } + collected + } + }.awaitAll() + + failJob.cancel() + + answers.forEach { + assertEquals(mustBeCollected, it) + } + } + } +} diff --git a/src/jsTest/kotlin/com/insanusmokrassar/krontab/utils/RunTest.kt b/src/jsTest/kotlin/com/insanusmokrassar/krontab/utils/RunTest.kt new file mode 100644 index 0000000..5d87632 --- /dev/null +++ b/src/jsTest/kotlin/com/insanusmokrassar/krontab/utils/RunTest.kt @@ -0,0 +1,5 @@ +package com.insanusmokrassar.krontab.utils + +import kotlinx.coroutines.* + +actual fun runTest(block: suspend (scope : CoroutineScope) -> Unit): dynamic = GlobalScope.promise { block(this) } diff --git a/src/jvmTest/kotlin/com/insanusmokrassar/krontab/utils/RunTest.kt b/src/jvmTest/kotlin/com/insanusmokrassar/krontab/utils/RunTest.kt new file mode 100644 index 0000000..a20cefc --- /dev/null +++ b/src/jvmTest/kotlin/com/insanusmokrassar/krontab/utils/RunTest.kt @@ -0,0 +1,9 @@ +package com.insanusmokrassar.krontab.utils + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.runBlocking + +/** + * Workaround to use suspending functions in unit tests + */ +actual fun runTest(block: suspend (scope: CoroutineScope) -> Unit) = runBlocking(block = block)