corotoutines actualization

This commit is contained in:
InsanusMokrassar 2020-10-28 14:17:39 +06:00
parent 4019ad7d31
commit e2d1c5d6a1
2 changed files with 36 additions and 12 deletions

View File

@ -9,6 +9,8 @@
* `K/JS`
* Add several extensions for `Element` objects to detect that object is on screen viewport
* Add several extensions for `Element` objects to detect object visibility
* `Coroutines`
* `BroadcastStateFlow` now use different strategy for getting of state and implements `replayCache`
## 0.2.2

View File

@ -5,29 +5,51 @@ import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
const val defaultBroadcastStateFlowReplayCacheSize = 1
class BroadcastStateFlow<T> internal constructor(
parentFlow: Flow<T>,
private val stateGetter: () -> T
initial: T,
replayCacheSize: Int = defaultBroadcastStateFlowReplayCacheSize,
replayScope: CoroutineScope
) : StateFlow<T>, Flow<T> by parentFlow {
private val deque = ArrayDeque<T>(1).also {
it.add(initial)
}
override val replayCache: List<T>
get() = deque.toList()
override val value: T
get() = stateGetter()
}
get() = deque.last()
fun <T> BroadcastChannel<T>.asStateFlow(value: T, scope: CoroutineScope): StateFlow<T> = asFlow().let {
var state: T = value
it.onEach { state = it }.launchIn(scope)
BroadcastStateFlow(it) {
state
init {
if (replayCacheSize < 1) {
error("Replay cache size can't be less than 1, but was $replayCacheSize")
}
parentFlow.onEach {
deque.addLast(it)
if (deque.size > replayCacheSize) {
deque.removeFirst()
}
}.launchIn(replayScope)
}
}
fun <T> BroadcastChannel<T?>.asStateFlow(scope: CoroutineScope): StateFlow<T?> = asStateFlow(null, scope)
fun <T> BroadcastChannel<T>.asStateFlow(
value: T,
scope: CoroutineScope,
replayCacheSize: Int = defaultBroadcastStateFlowReplayCacheSize
): StateFlow<T> = BroadcastStateFlow(asFlow(), value, replayCacheSize, scope)
fun <T> broadcastStateFlow(initial: T, scope: CoroutineScope, channelSize: Int = Channel.BUFFERED) = BroadcastChannel<T>(
fun <T> BroadcastChannel<T?>.asStateFlow(
scope: CoroutineScope,
replayCacheSize: Int = defaultBroadcastStateFlowReplayCacheSize
): StateFlow<T?> = asStateFlow(null, scope, replayCacheSize)
fun <T> broadcastStateFlow(initial: T, scope: CoroutineScope, channelSize: Int = Channel.BUFFERED, replayCacheSize: Int = defaultBroadcastStateFlowReplayCacheSize) = BroadcastChannel<T>(
channelSize
).let {
it to it.asStateFlow(initial, scope)
it to it.asStateFlow(initial, scope, replayCacheSize)
}
fun <T> broadcastStateFlow(scope: CoroutineScope, channelSize: Int = Channel.BUFFERED) = broadcastStateFlow<T?>(null, scope, channelSize)
fun <T> broadcastStateFlow(scope: CoroutineScope, channelSize: Int = Channel.BUFFERED, replayCacheSize: Int = defaultBroadcastStateFlowReplayCacheSize) = broadcastStateFlow<T?>(null, scope, channelSize, replayCacheSize)