asFlow extension

This commit is contained in:
InsanusMokrassar 2020-01-13 10:15:01 +06:00
parent 5e4065ee53
commit 1d202a7311
7 changed files with 84 additions and 0 deletions

View File

@ -8,3 +8,5 @@
* Klock `1.7.3` -> `1.8.6`
## 0.2.1
* Added support of flows: now any `KronScheduler` can be convert to `Flow<DateTime>` using `asFlow` extension

View File

@ -47,6 +47,7 @@ kotlin {
implementation kotlin('test-common')
implementation kotlin('test-annotations-common')
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines_version"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:$kotlin_coroutines_version"
}
}
jvmMain {

View File

@ -0,0 +1,25 @@
package com.insanusmokrassar.krontab.utils
import com.insanusmokrassar.krontab.KronScheduler
import com.soywiz.klock.DateTime
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
@FlowPreview
fun KronScheduler.asFlow(): Flow<DateTime> = SchedulerFlow(this)
@FlowPreview
class SchedulerFlow(
private val scheduler: KronScheduler
) : AbstractFlow<DateTime>() {
@FlowPreview
override suspend fun collectSafely(collector: FlowCollector<DateTime>) {
while (true) {
val now = DateTime.now()
val nextTime = scheduler.next(now)
val sleepDelay = (nextTime - now).millisecondsLong
delay(sleepDelay)
collector.emit(nextTime)
}
}
}

View File

@ -0,0 +1,8 @@
package com.insanusmokrassar.krontab.utils
import kotlinx.coroutines.CoroutineScope
/**
* Workaround to use suspending functions in unit tests
*/
expect fun runTest(block: suspend (scope : CoroutineScope) -> Unit)

View File

@ -0,0 +1,34 @@
package com.insanusmokrassar.krontab.utils
import com.insanusmokrassar.krontab.builder.buildSchedule
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.takeWhile
import kotlin.test.Test
import kotlin.test.assertEquals
@ExperimentalCoroutinesApi
@FlowPreview
class SchedulerFlowTests {
@Test
fun testThatFlowIsCorrectlyWorkEverySecond() {
val kronScheduler = buildSchedule {
seconds {
0 every 1
}
}
val flow = kronScheduler.asFlow()
runTest {
val mustBeCollected = 10
var collected = 0
flow.takeWhile {
collected < mustBeCollected
}.collect {
collected++
}
assertEquals(mustBeCollected, collected)
}
}
}

View File

@ -0,0 +1,5 @@
package com.insanusmokrassar.krontab.utils
import kotlinx.coroutines.*
actual fun runTest(block: suspend (scope : CoroutineScope) -> Unit): dynamic = GlobalScope.promise { block(this) }

View File

@ -0,0 +1,9 @@
package com.insanusmokrassar.krontab.utils
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking
/**
* Workaround to use suspending functions in unit tests
*/
actual fun runTest(block: suspend (scope: CoroutineScope) -> Unit) = runBlocking(block = block)