mirror of
https://github.com/InsanusMokrassar/krontab.git
synced 2025-12-07 05:25:50 +00:00
start 0.10.0 + rework of flows
This commit is contained in:
@@ -2,35 +2,96 @@ package dev.inmo.krontab.utils
|
||||
|
||||
import com.soywiz.klock.DateTime
|
||||
import com.soywiz.klock.DateTimeTz
|
||||
import com.soywiz.klock.milliseconds
|
||||
import dev.inmo.krontab.*
|
||||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.currentCoroutineContext
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.channelFlow
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.isActive
|
||||
|
||||
/**
|
||||
* This [Flow] will trigger emitting each near time which will be returned from [this] [KronScheduler] with attention to
|
||||
* time zones
|
||||
* **This flow is [cold](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/)**
|
||||
*
|
||||
* @see channelFlow
|
||||
* @see KronSchedulerTz.doInfinityTz
|
||||
* Will emit all the [KronScheduler.next] as soon as possible. In case [KronScheduler.next] return null, flow will
|
||||
* be completed
|
||||
*
|
||||
* @param since Will be used as the first parameter for [KronScheduler.next] fun
|
||||
*/
|
||||
@FlowPreview
|
||||
fun KronScheduler.asTzFlow(): Flow<DateTimeTz> = channelFlow {
|
||||
doInfinityTz {
|
||||
send(it)
|
||||
fun KronScheduler.asTzFlowWithoutDelays(since: DateTimeTz = DateTime.nowLocal()): Flow<DateTimeTz> = flow {
|
||||
var previous = since
|
||||
while (currentCoroutineContext().isActive) {
|
||||
val next = next(previous) ?: break
|
||||
emit(next)
|
||||
previous = next + 1.milliseconds
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is a map for [asTzFlow] and will works the same but return flow with [DateTime]s
|
||||
* **This flow is [cold](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/)**
|
||||
*
|
||||
* @see channelFlow
|
||||
* @see KronScheduler.doInfinity
|
||||
* This [Flow] will use [asTzFlowWithoutDelays], but stop on each time until this time will happen
|
||||
*/
|
||||
@FlowPreview
|
||||
fun KronScheduler.asFlow(): Flow<DateTime> = channelFlow {
|
||||
doInfinity {
|
||||
send(it)
|
||||
fun KronScheduler.asTzFlowWithDelays(): Flow<DateTimeTz> = asTzFlowWithoutDelays().onEach { futureHappenTime ->
|
||||
val now = DateTime.nowLocal()
|
||||
|
||||
delay((futureHappenTime - now).millisecondsLong)
|
||||
}
|
||||
|
||||
/**
|
||||
* **This flow is [cold](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/)**
|
||||
*
|
||||
* This [Flow] will use [asTzFlowWithoutDelays], but stop on each time until this time will happen
|
||||
*/
|
||||
@Deprecated(
|
||||
"Behaviour will be changed. In some of near versions this flow will not delay executions",
|
||||
ReplaceWith("this.asTzFlowWithDelays()", "dev.inmo.krontab.utils.asTzFlowWithDelays")
|
||||
)
|
||||
@FlowPreview
|
||||
fun KronScheduler.asTzFlow(): Flow<DateTimeTz> = asTzFlowWithDelays()
|
||||
|
||||
/**
|
||||
* **This flow is [cold](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/)**
|
||||
*
|
||||
* Will emit all the [KronScheduler.next] as soon as possible. In case [KronScheduler.next] return null, flow will
|
||||
* be completed
|
||||
*
|
||||
* @param since Will be used as the first parameter for [KronScheduler.next] fun
|
||||
*/
|
||||
@FlowPreview
|
||||
fun KronScheduler.asFlowWithoutDelays(since: DateTime = DateTime.now()): Flow<DateTime> = flow {
|
||||
var previous = since
|
||||
while (currentCoroutineContext().isActive) {
|
||||
val next = next(previous) ?: break
|
||||
emit(next)
|
||||
previous = next + 1.milliseconds
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* **This flow is [cold](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/)**
|
||||
*
|
||||
* This [Flow] will use [asFlowWithoutDelays], but stop on each time until this time will happen
|
||||
*/
|
||||
@FlowPreview
|
||||
fun KronScheduler.asFlowWithDelays(): Flow<DateTime> = asFlowWithoutDelays().onEach { futureHappenTime ->
|
||||
val now = DateTime.now()
|
||||
|
||||
delay((futureHappenTime - now).millisecondsLong)
|
||||
}
|
||||
|
||||
/**
|
||||
* **This flow is [cold](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/)**
|
||||
*
|
||||
* This [Flow] will use [asFlowWithDelays], but stop on each time until this time will happen
|
||||
*/
|
||||
@Deprecated(
|
||||
"Behaviour will be changed. In some of near versions this flow will not delay executions",
|
||||
ReplaceWith("this.asFlowWithDelays()", "dev.inmo.krontab.utils.asFlowWithDelays")
|
||||
)
|
||||
@FlowPreview
|
||||
fun KronScheduler.asFlow(): Flow<DateTime> = asFlowWithDelays()
|
||||
|
||||
Reference in New Issue
Block a user