Merge pull request #101 from InsanusMokrassar/0.7.1

0.7.1
This commit is contained in:
InsanusMokrassar 2021-10-13 15:11:26 +06:00 committed by GitHub
commit 2e66c6f4e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 409 additions and 72 deletions

View File

@ -1,5 +1,20 @@
# Changelog # Changelog
## 0.7.1
* `Versions`:
* `Klock`: `2.4.3` -> `2.4.5`
* `Exposed`: `0.35.1` -> `0.35.2`
* `Coroutines`:
* `Common`:
* New `Flow` - `AccumulatorFlow`
* `FSM`:
* `Common`:
* `InMemoryStatesManager` has been replaced
* `StatesMachine` became an interface
* New manager `DefaultStatesManager` with `DefaultStatesManagerRepo` for abstraction of manager and storing of
data info
## 0.7.0 ## 0.7.0
**THIS VERSION HAS MIGRATED FROM KOTLINX DATETIME TO KORLIBS KLOCK. CAREFUL** **THIS VERSION HAS MIGRATED FROM KOTLINX DATETIME TO KORLIBS KLOCK. CAREFUL**

View File

@ -0,0 +1,94 @@
package dev.inmo.micro_utils.coroutines
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
private sealed interface AccumulatorFlowStep
private data class DataRetrievedAccumulatorFlowStep(val data: Any) : AccumulatorFlowStep
private data class SubscribeAccumulatorFlowStep(val channel: Channel<Any>) : AccumulatorFlowStep
private data class UnsubscribeAccumulatorFlowStep(val channel: Channel<Any>) : AccumulatorFlowStep
/**
* This [Flow] will have behaviour very similar to [SharedFlow], but there are several differences:
*
* * All unhandled by [FlowCollector] data will not be removed from [AccumulatorFlow] and will be sent to new
* [FlowCollector]s until anybody will handle it
* * Here there are an [activeData] where data [T] will be stored until somebody will handle it
*/
class AccumulatorFlow<T>(
sourceDataFlow: Flow<T>,
scope: CoroutineScope
) : AbstractFlow<T>() {
private val subscope = scope.LinkedSupervisorScope()
private val activeData = ArrayDeque<T>()
private val dataMutex = Mutex()
private val channelsForBroadcast = mutableListOf<Channel<Any>>()
private val channelsMutex = Mutex()
private val steps = subscope.actor<AccumulatorFlowStep> { step ->
when (step) {
is DataRetrievedAccumulatorFlowStep -> {
if (activeData.first() === step.data) {
dataMutex.withLock {
activeData.removeFirst()
}
}
}
is SubscribeAccumulatorFlowStep -> channelsMutex.withLock {
channelsForBroadcast.add(step.channel)
dataMutex.withLock {
val dataToSend = activeData.toList()
safelyWithoutExceptions {
dataToSend.forEach { step.channel.send(it as Any) }
}
}
}
is UnsubscribeAccumulatorFlowStep -> channelsMutex.withLock {
channelsForBroadcast.remove(step.channel)
}
}
}
private val subscriptionJob = sourceDataFlow.subscribeSafelyWithoutExceptions(subscope) {
dataMutex.withLock {
activeData.addLast(it)
}
channelsMutex.withLock {
channelsForBroadcast.forEach { channel ->
safelyWithResult {
channel.send(it as Any)
}
}
}
}
override suspend fun collectSafely(collector: FlowCollector<T>) {
val channel = Channel<Any>(Channel.UNLIMITED, BufferOverflow.SUSPEND)
steps.send(SubscribeAccumulatorFlowStep(channel))
for (data in channel) {
try {
collector.emit(data as T)
steps.send(DataRetrievedAccumulatorFlowStep(data))
} finally {
channel.cancel()
steps.send(UnsubscribeAccumulatorFlowStep(channel))
}
}
}
}
/**
* Creates [AccumulatorFlow] using [this] as base [Flow]
*/
fun <T> Flow<T>.accumulatorFlow(scope: CoroutineScope): Flow<T> {
return AccumulatorFlow(this, scope)
}
/**
* Creates [AccumulatorFlow] using [this] with [receiveAsFlow] to get
*/
fun <T> Channel<T>.accumulatorFlow(scope: CoroutineScope): Flow<T> {
return receiveAsFlow().accumulatorFlow(scope)
}

View File

@ -0,0 +1,6 @@
package dev.inmo.micro_utils.fsm.common
import dev.inmo.micro_utils.fsm.common.managers.InMemoryStatesManager
@Deprecated("Replaced", ReplaceWith("InMemoryStatesManager", "dev.inmo.micro_utils.fsm.common.managers.InMemoryStatesManager"))
typealias InMemoryStatesManager = InMemoryStatesManager

View File

@ -2,13 +2,25 @@ package dev.inmo.micro_utils.fsm.common
import kotlin.reflect.KClass import kotlin.reflect.KClass
/**
* Default realization of [StatesHandler]. It will incapsulate checking of [State] type in [checkHandleable] and class
* casting in [handleState]
*/
class StateHandlerHolder<I : State>( class StateHandlerHolder<I : State>(
private val inputKlass: KClass<I>, private val inputKlass: KClass<I>,
private val strict: Boolean = false, private val strict: Boolean = false,
private val delegateTo: StatesHandler<I> private val delegateTo: StatesHandler<I>
) : StatesHandler<State> { ) : StatesHandler<State> {
/**
* Checks that [state] can be handled by [delegateTo]. Under the hood it will check exact equality of [state]
* [KClass] and use [KClass.isInstance] of [inputKlass] if [strict] == false
*/
fun checkHandleable(state: State) = state::class == inputKlass || (!strict && inputKlass.isInstance(state)) fun checkHandleable(state: State) = state::class == inputKlass || (!strict && inputKlass.isInstance(state))
/**
* Calls [delegateTo] method [StatesHandler.handleState] with [state] casted to [I]. Use [checkHandleable]
* to be sure that this [StateHandlerHolder] will be able to handle [state]
*/
override suspend fun StatesMachine.handleState(state: State): State? { override suspend fun StatesMachine.handleState(state: State): State? {
return delegateTo.run { handleState(state as I) } return delegateTo.run { handleState(state as I) }
} }

View File

@ -1,5 +1,12 @@
package dev.inmo.micro_utils.fsm.common package dev.inmo.micro_utils.fsm.common
/**
* Default realization of states handler
*/
fun interface StatesHandler<I : State> { fun interface StatesHandler<I : State> {
/**
* Main handling of [state]. In case when this [state] leads to another [State] and [handleState] returns not null
* [State] it is assumed that chain is not completed.
*/
suspend fun StatesMachine.handleState(state: I): State? suspend fun StatesMachine.handleState(state: I): State?
} }

View File

@ -13,13 +13,53 @@ private suspend fun <I : State> StatesMachine.launchStateHandling(
} }
} }
class StatesMachine ( /**
* Default [StatesMachine] may [startChain] and use inside logic for handling [State]s. By default you may use
* [DefaultStatesMachine] or build it with [dev.inmo.micro_utils.fsm.common.dsl.buildFSM]. Implementers MUST NOT start
* handling until [start] method will be called
*/
interface StatesMachine : StatesHandler<State> {
/**
* Starts handling of [State]s
*/
fun start(scope: CoroutineScope): Job
/**
* Start chain of [State]s witn [state]
*/
suspend fun startChain(state: State)
companion object {
/**
* Creates [DefaultStatesMachine]
*/
operator fun invoke(
statesManager: StatesManager,
handlers: List<StateHandlerHolder<*>>
) = DefaultStatesMachine(statesManager, handlers)
}
}
/**
* Default realization of [StatesMachine]. It uses [statesManager] for incapsulation of [State]s storing and contexts
* resolving, and uses [launchStateHandling] for [State] handling
*/
class DefaultStatesMachine (
private val statesManager: StatesManager, private val statesManager: StatesManager,
private val handlers: List<StateHandlerHolder<*>> private val handlers: List<StateHandlerHolder<*>>
) : StatesHandler<State> { ) : StatesMachine {
/**
* Will call [launchStateHandling] for state handling
*/
override suspend fun StatesMachine.handleState(state: State): State? = launchStateHandling(state, handlers) override suspend fun StatesMachine.handleState(state: State): State? = launchStateHandling(state, handlers)
fun start(scope: CoroutineScope): Job = scope.launchSafelyWithoutExceptions { /**
* Launch handling of states. On [statesManager] [StatesManager.onStartChain],
* [statesManager] [StatesManager.onChainStateUpdated] will be called lambda with performing of state. If
* [launchStateHandling] will returns some [State] then [statesManager] [StatesManager.update] will be used, otherwise
* [StatesManager.endChain].
*/
override fun start(scope: CoroutineScope): Job = scope.launchSafelyWithoutExceptions {
val statePerformer: suspend (State) -> Unit = { state: State -> val statePerformer: suspend (State) -> Unit = { state: State ->
val newState = launchStateHandling(state, handlers) val newState = launchStateHandling(state, handlers)
if (newState != null) { if (newState != null) {
@ -40,7 +80,10 @@ class StatesMachine (
} }
} }
suspend fun startChain(state: State) { /**
* Just calls [StatesManager.startChain] of [statesManager]
*/
override suspend fun startChain(state: State) {
statesManager.startChain(state) statesManager.startChain(state)
} }
} }

View File

@ -1,8 +1,6 @@
package dev.inmo.micro_utils.fsm.common package dev.inmo.micro_utils.fsm.common
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
interface StatesManager { interface StatesManager {
val onChainStateUpdated: Flow<Pair<State, State>> val onChainStateUpdated: Flow<Pair<State, State>>
@ -30,63 +28,3 @@ interface StatesManager {
suspend fun getActiveStates(): List<State> suspend fun getActiveStates(): List<State>
} }
/**
* @param onContextsConflictResolver Receive old [State], new one and the state currently placed on new [State.context]
* key. In case when this callback will returns true, the state placed on [State.context] of new will be replaced by
* new state by using [endChain] with that state
*/
class InMemoryStatesManager(
private val onContextsConflictResolver: suspend (old: State, new: State, currentNew: State) -> Boolean = { _, _, _ -> true }
) : StatesManager {
private val _onChainStateUpdated = MutableSharedFlow<Pair<State, State>>(0)
override val onChainStateUpdated: Flow<Pair<State, State>> = _onChainStateUpdated.asSharedFlow()
private val _onStartChain = MutableSharedFlow<State>(0)
override val onStartChain: Flow<State> = _onStartChain.asSharedFlow()
private val _onEndChain = MutableSharedFlow<State>(0)
override val onEndChain: Flow<State> = _onEndChain.asSharedFlow()
private val contextsToStates = mutableMapOf<Any, State>()
private val mapMutex = Mutex()
override suspend fun update(old: State, new: State) = mapMutex.withLock {
when {
contextsToStates[old.context] != old -> return@withLock
old.context == new.context || !contextsToStates.containsKey(new.context) -> {
contextsToStates[old.context] = new
_onChainStateUpdated.emit(old to new)
}
else -> {
val stateOnNewOneContext = contextsToStates.getValue(new.context)
if (onContextsConflictResolver(old, new, stateOnNewOneContext)) {
endChainWithoutLock(stateOnNewOneContext)
contextsToStates.remove(old.context)
contextsToStates[new.context] = new
_onChainStateUpdated.emit(old to new)
}
}
}
}
override suspend fun startChain(state: State) = mapMutex.withLock {
if (!contextsToStates.containsKey(state.context)) {
contextsToStates[state.context] = state
_onStartChain.emit(state)
}
}
private suspend fun endChainWithoutLock(state: State) {
if (contextsToStates[state.context] == state) {
contextsToStates.remove(state.context)
_onEndChain.emit(state)
}
}
override suspend fun endChain(state: State) {
mapMutex.withLock {
endChainWithoutLock(state)
}
}
override suspend fun getActiveStates(): List<State> = contextsToStates.values.toList()
}

View File

@ -1,10 +1,12 @@
package dev.inmo.micro_utils.fsm.common.dsl package dev.inmo.micro_utils.fsm.common.dsl
import dev.inmo.micro_utils.fsm.common.* import dev.inmo.micro_utils.fsm.common.*
import dev.inmo.micro_utils.fsm.common.managers.*
import dev.inmo.micro_utils.fsm.common.managers.InMemoryStatesManager
import kotlin.reflect.KClass import kotlin.reflect.KClass
class FSMBuilder( class FSMBuilder(
var statesManager: StatesManager = InMemoryStatesManager() var statesManager: StatesManager = DefaultStatesManager(InMemoryDefaultStatesManagerRepo())
) { ) {
private var states = mutableListOf<StateHandlerHolder<*>>() private var states = mutableListOf<StateHandlerHolder<*>>()

View File

@ -0,0 +1,101 @@
package dev.inmo.micro_utils.fsm.common.managers
import dev.inmo.micro_utils.fsm.common.State
import dev.inmo.micro_utils.fsm.common.StatesManager
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/**
* Implement this repo if you want to use some custom repo for [DefaultStatesManager]
*/
interface DefaultStatesManagerRepo {
/**
* Must save [state] as current state of chain with [State.context] of [state]
*/
suspend fun set(state: State)
/**
* Remove exactly [state]. In case if internally [State.context] is busy with different [State], that [State] should
* NOT be removed
*/
suspend fun removeState(state: State)
/**
* @return Current list of available and saved states
*/
suspend fun getStates(): List<State>
/**
* @return Current state by [context]
*/
suspend fun getContextState(context: Any): State?
/**
* @return Current state by [context]
*/
suspend fun contains(context: Any): Boolean = getContextState(context) != null
}
/**
* @param repo This repo will be used as repository for storing states. All operations with this repo will happen BEFORE
* any event will be sent to [onChainStateUpdated], [onStartChain] or [onEndChain]. By default will be used
* [InMemoryDefaultStatesManagerRepo] or you may create custom [DefaultStatesManagerRepo] and pass as [repo] parameter
* @param onContextsConflictResolver Receive old [State], new one and the state currently placed on new [State.context]
* key. In case when this callback will returns true, the state placed on [State.context] of new will be replaced by
* new state by using [endChain] with that state
*/
class DefaultStatesManager(
private val repo: DefaultStatesManagerRepo = InMemoryDefaultStatesManagerRepo(),
private val onContextsConflictResolver: suspend (old: State, new: State, currentNew: State) -> Boolean = { _, _, _ -> true }
) : StatesManager {
private val _onChainStateUpdated = MutableSharedFlow<Pair<State, State>>(0)
override val onChainStateUpdated: Flow<Pair<State, State>> = _onChainStateUpdated.asSharedFlow()
private val _onStartChain = MutableSharedFlow<State>(0)
override val onStartChain: Flow<State> = _onStartChain.asSharedFlow()
private val _onEndChain = MutableSharedFlow<State>(0)
override val onEndChain: Flow<State> = _onEndChain.asSharedFlow()
private val mapMutex = Mutex()
override suspend fun update(old: State, new: State) = mapMutex.withLock {
val stateByOldContext: State? = repo.getContextState(old.context)
when {
stateByOldContext != old -> return@withLock
stateByOldContext == null || old.context == new.context -> {
repo.set(new)
_onChainStateUpdated.emit(old to new)
}
else -> {
val stateOnNewOneContext = repo.getContextState(new.context)
if (stateOnNewOneContext == null || onContextsConflictResolver(old, new, stateOnNewOneContext)) {
stateOnNewOneContext ?.let { endChainWithoutLock(it) }
repo.removeState(old)
repo.set(new)
_onChainStateUpdated.emit(old to new)
}
}
}
}
override suspend fun startChain(state: State) = mapMutex.withLock {
if (!repo.contains(state.context)) {
repo.set(state)
_onStartChain.emit(state)
}
}
private suspend fun endChainWithoutLock(state: State) {
if (repo.getContextState(state.context) == state) {
repo.removeState(state)
_onEndChain.emit(state)
}
}
override suspend fun endChain(state: State) {
mapMutex.withLock {
endChainWithoutLock(state)
}
}
override suspend fun getActiveStates(): List<State> = repo.getStates()
}

View File

@ -0,0 +1,25 @@
package dev.inmo.micro_utils.fsm.common.managers
import dev.inmo.micro_utils.fsm.common.State
/**
* Simple [DefaultStatesManagerRepo] for [DefaultStatesManager] which will store data in [map] and use primitive
* functionality
*/
class InMemoryDefaultStatesManagerRepo(
private val map: MutableMap<Any, State> = mutableMapOf()
) : DefaultStatesManagerRepo {
override suspend fun set(state: State) {
map[state.context] = state
}
override suspend fun removeState(state: State) {
map.remove(state.context)
}
override suspend fun getStates(): List<State> = map.values.toList()
override suspend fun getContextState(context: Any): State? = map[context]
override suspend fun contains(context: Any): Boolean = map.contains(context)
}

View File

@ -0,0 +1,68 @@
package dev.inmo.micro_utils.fsm.common.managers
import dev.inmo.micro_utils.fsm.common.State
import dev.inmo.micro_utils.fsm.common.StatesManager
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/**
* @param onContextsConflictResolver Receive old [State], new one and the state currently placed on new [State.context]
* key. In case when this callback will returns true, the state placed on [State.context] of new will be replaced by
* new state by using [endChain] with that state
*/
class InMemoryStatesManager(
private val onContextsConflictResolver: suspend (old: State, new: State, currentNew: State) -> Boolean = { _, _, _ -> true }
) : StatesManager {
private val _onChainStateUpdated = MutableSharedFlow<Pair<State, State>>(0)
override val onChainStateUpdated: Flow<Pair<State, State>> = _onChainStateUpdated.asSharedFlow()
private val _onStartChain = MutableSharedFlow<State>(0)
override val onStartChain: Flow<State> = _onStartChain.asSharedFlow()
private val _onEndChain = MutableSharedFlow<State>(0)
override val onEndChain: Flow<State> = _onEndChain.asSharedFlow()
private val contextsToStates = mutableMapOf<Any, State>()
private val mapMutex = Mutex()
override suspend fun update(old: State, new: State) = mapMutex.withLock {
when {
contextsToStates[old.context] != old -> return@withLock
old.context == new.context || !contextsToStates.containsKey(new.context) -> {
contextsToStates[old.context] = new
_onChainStateUpdated.emit(old to new)
}
else -> {
val stateOnNewOneContext = contextsToStates.getValue(new.context)
if (onContextsConflictResolver(old, new, stateOnNewOneContext)) {
endChainWithoutLock(stateOnNewOneContext)
contextsToStates.remove(old.context)
contextsToStates[new.context] = new
_onChainStateUpdated.emit(old to new)
}
}
}
}
override suspend fun startChain(state: State) = mapMutex.withLock {
if (!contextsToStates.containsKey(state.context)) {
contextsToStates[state.context] = state
_onStartChain.emit(state)
}
}
private suspend fun endChainWithoutLock(state: State) {
if (contextsToStates[state.context] == state) {
contextsToStates.remove(state.context)
_onEndChain.emit(state)
}
}
override suspend fun endChain(state: State) {
mapMutex.withLock {
endChainWithoutLock(state)
}
}
override suspend fun getActiveStates(): List<State> = contextsToStates.values.toList()
}

View File

@ -0,0 +1,25 @@
package dev.inmo.micro_utils.fsm.repos.common
import dev.inmo.micro_utils.fsm.common.State
import dev.inmo.micro_utils.fsm.common.managers.DefaultStatesManagerRepo
import dev.inmo.micro_utils.repos.*
import dev.inmo.micro_utils.repos.pagination.getAll
class KeyValueBasedDefaultStatesManagerRepo(
private val keyValueRepo: KeyValueRepo<Any, State>
) : DefaultStatesManagerRepo {
override suspend fun set(state: State) {
keyValueRepo.set(state.context, state)
}
override suspend fun removeState(state: State) {
if (keyValueRepo.get(state.context) == state) {
keyValueRepo.unset(state.context)
}
}
override suspend fun getStates(): List<State> = keyValueRepo.getAll { keys(it) }.map { it.second }
override suspend fun getContextState(context: Any): State? = keyValueRepo.get(context)
override suspend fun contains(context: Any): Boolean = keyValueRepo.contains(context)
}

View File

@ -9,6 +9,7 @@ import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
@Deprecated("Replace with DefaultStatesManager and KeyValueBasedDefaultStatesManagerRepo")
class KeyValueBasedStatesManager( class KeyValueBasedStatesManager(
private val keyValueRepo: KeyValueRepo<Any, State>, private val keyValueRepo: KeyValueRepo<Any, State>,
private val onContextsConflictResolver: suspend (old: State, new: State, currentNew: State) -> Boolean = { _, _, _ -> true } private val onContextsConflictResolver: suspend (old: State, new: State, currentNew: State) -> Boolean = { _, _, _ -> true }

View File

@ -10,11 +10,11 @@ org.gradle.jvmargs=-Xmx2g
kotlin_version=1.5.31 kotlin_version=1.5.31
kotlin_coroutines_version=1.5.2 kotlin_coroutines_version=1.5.2
kotlin_serialisation_core_version=1.3.0 kotlin_serialisation_core_version=1.3.0
kotlin_exposed_version=0.35.1 kotlin_exposed_version=0.35.2
ktor_version=1.6.4 ktor_version=1.6.4
klockVersion=2.4.3 klockVersion=2.4.5
github_release_plugin_version=2.2.12 github_release_plugin_version=2.2.12
@ -40,10 +40,10 @@ crypto_js_version=4.1.1
# Dokka # Dokka
dokka_version=1.5.30 dokka_version=1.5.31
# Project data # Project data
group=dev.inmo group=dev.inmo
version=0.7.0 version=0.7.1
android_code_version=74 android_code_version=75