krontab/src/commonMain/kotlin/dev/inmo/krontab/utils/SchedulerFlow.kt

59 lines
1.7 KiB
Kotlin
Raw Normal View History

2020-11-21 08:58:19 +00:00
package dev.inmo.krontab.utils
2020-01-13 04:15:01 +00:00
import com.soywiz.klock.DateTime
2021-04-24 10:58:25 +00:00
import com.soywiz.klock.DateTimeTz
2020-11-21 08:58:19 +00:00
import dev.inmo.krontab.KronScheduler
2021-04-24 10:58:25 +00:00
import dev.inmo.krontab.next
import kotlinx.coroutines.*
2020-01-13 04:15:01 +00:00
import kotlinx.coroutines.flow.*
2021-04-24 10:58:25 +00:00
/**
 * Creates a [Flow] that emits every upcoming trigger time of [this] [KronScheduler] as a [DateTimeTz],
 * i.e. with the time zone offset preserved.
 *
 * On each iteration the scheduler is asked for the next time relative to the current local time:
 * - when the scheduler returns `null`, the flow completes;
 * - when the returned time equals the one handled on the previous iteration, the loop waits 1 millisecond
 *   and asks again, so the same time is not sent twice in a row;
 * - otherwise the flow suspends until that time and then sends it downstream.
 *
 * @see channelFlow
 */
@FlowPreview
fun KronScheduler.asTzFlow(): Flow<DateTimeTz> = channelFlow {
    // Seeded with the current moment, so a result equal to the start instant is treated as already handled
    var lastScheduled = DateTime.nowLocal()
    while (isActive) {
        val upcoming = next(DateTime.nowLocal()) ?: break
        if (upcoming == lastScheduled) {
            delay(1L) // skip 1ms
            continue
        }
        lastScheduled = upcoming
        // The clock is read again here, so the wait is measured from the latest possible moment
        delay((upcoming - DateTime.now().local).millisecondsLong)
        send(upcoming)
    }
}
/**
 * Maps [asTzFlow] to plain [DateTime]s: it works the same way, but every emitted [DateTimeTz] is replaced
 * with its `local` value.
 */
@FlowPreview
fun KronScheduler.asFlow(): Flow<DateTime> = asTzFlow().map { zonedTime -> zonedTime.local }
2020-01-13 04:15:01 +00:00
2021-04-24 10:58:25 +00:00
@Deprecated(
    "It is not recommended to use this class in future. This functionality will be removed soon",
    ReplaceWith("asFlow", "dev.inmo.krontab.utils.asFlow")
)
@FlowPreview
class SchedulerFlow(
    private val scheduler: KronScheduler
) : AbstractFlow<DateTime>() {
    /**
     * Repeatedly asks [scheduler] for the next trigger time relative to the current moment, suspends for the
     * difference between that time and the moment of the request, and then emits the time to [collector].
     * Collection finishes as soon as the scheduler returns `null`.
     *
     * NOTE(review): unlike [asTzFlow], this loop has no guard against the scheduler returning the same time
     * on consecutive iterations - confirm whether that matters for remaining users of this deprecated class.
     */
    @FlowPreview
    override suspend fun collectSafely(collector: FlowCollector<DateTime>) {
        while (true) {
            val requestedAt = DateTime.now()
            val upcoming = scheduler.next(requestedAt) ?: return
            // The wait is measured from the moment the scheduler was queried
            delay((upcoming - requestedAt).millisecondsLong)
            collector.emit(upcoming)
        }
    }
}