mirror of
https://github.com/InsanusMokrassar/MicroUtils.git
synced 2024-11-17 13:53:49 +00:00
add all autorecaches repos
This commit is contained in:
parent
46178e723b
commit
e6d9c8250f
19
repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/fallback/ActionWrapper.kt
vendored
Normal file
19
repos/cache/src/commonMain/kotlin/dev/inmo/micro_utils/repos/cache/fallback/ActionWrapper.kt
vendored
Normal file
@ -0,0 +1,19 @@
|
||||
package dev.inmo.micro_utils.repos.cache.fallback
|
||||
|
||||
import dev.inmo.micro_utils.coroutines.runCatchingSafely
|
||||
import kotlinx.coroutines.withTimeout
|
||||
|
||||
interface ActionWrapper {
|
||||
suspend fun <T> wrap(block: suspend () -> T): Result<T>
|
||||
|
||||
class Timeouted(private val timeoutMillis: Long) : ActionWrapper {
|
||||
override suspend fun <T> wrap(block: suspend () -> T): Result<T> = runCatchingSafely {
|
||||
withTimeout(timeoutMillis) {
|
||||
block()
|
||||
}
|
||||
}
|
||||
}
|
||||
object Direct : ActionWrapper {
|
||||
override suspend fun <T> wrap(block: suspend () -> T): Result<T> = runCatchingSafely { block() }
|
||||
}
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
package dev.inmo.micro_utils.repos.cache.fallback.crud
|
||||
|
||||
import dev.inmo.micro_utils.repos.CRUDRepo
|
||||
import dev.inmo.micro_utils.repos.WriteCRUDRepo
|
||||
import dev.inmo.micro_utils.repos.cache.cache.FullKVCache
|
||||
import dev.inmo.micro_utils.repos.cache.fallback.ActionWrapper
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
|
||||
open class AutoRecacheCRUDRepo<RegisteredObject, Id, InputObject>(
|
||||
originalRepo: CRUDRepo<RegisteredObject, Id, InputObject>,
|
||||
scope: CoroutineScope,
|
||||
kvCache: FullKVCache<Id, RegisteredObject> = FullKVCache(),
|
||||
recacheDelay: Long = 60.seconds.inWholeMilliseconds,
|
||||
actionWrapper: ActionWrapper = ActionWrapper.Direct,
|
||||
idGetter: (RegisteredObject) -> Id
|
||||
) : AutoRecacheReadCRUDRepo<RegisteredObject, Id>(
|
||||
originalRepo,
|
||||
scope,
|
||||
kvCache,
|
||||
recacheDelay,
|
||||
actionWrapper,
|
||||
idGetter
|
||||
),
|
||||
WriteCRUDRepo<RegisteredObject, Id, InputObject> by AutoRecacheWriteCRUDRepo(originalRepo, scope, kvCache, idGetter),
|
||||
CRUDRepo<RegisteredObject, Id, InputObject> {
|
||||
|
||||
constructor(
|
||||
originalRepo: CRUDRepo<RegisteredObject, Id, InputObject>,
|
||||
scope: CoroutineScope,
|
||||
originalCallTimeoutMillis: Long,
|
||||
kvCache: FullKVCache<Id, RegisteredObject> = FullKVCache(),
|
||||
recacheDelay: Long = 60.seconds.inWholeMilliseconds,
|
||||
idGetter: (RegisteredObject) -> Id
|
||||
) : this(originalRepo, scope, kvCache, recacheDelay, ActionWrapper.Timeouted(originalCallTimeoutMillis), idGetter)
|
||||
}
|
@ -0,0 +1,77 @@
|
||||
package dev.inmo.micro_utils.repos.cache.fallback.crud
|
||||
|
||||
import dev.inmo.micro_utils.coroutines.runCatchingSafely
|
||||
import dev.inmo.micro_utils.pagination.Pagination
|
||||
import dev.inmo.micro_utils.pagination.PaginationResult
|
||||
import dev.inmo.micro_utils.repos.ReadCRUDRepo
|
||||
import dev.inmo.micro_utils.repos.cache.cache.FullKVCache
|
||||
import dev.inmo.micro_utils.repos.cache.fallback.ActionWrapper
|
||||
import dev.inmo.micro_utils.repos.cache.util.actualizeAll
|
||||
import dev.inmo.micro_utils.repos.set
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
|
||||
open class AutoRecacheReadCRUDRepo<RegisteredObject, Id>(
|
||||
protected val originalRepo: ReadCRUDRepo<RegisteredObject, Id>,
|
||||
protected val scope: CoroutineScope,
|
||||
protected val kvCache: FullKVCache<Id, RegisteredObject> = FullKVCache(),
|
||||
protected val recacheDelay: Long = 60.seconds.inWholeMilliseconds,
|
||||
protected val actionWrapper: ActionWrapper = ActionWrapper.Direct,
|
||||
protected val idGetter: (RegisteredObject) -> Id
|
||||
) : ReadCRUDRepo<RegisteredObject, Id> {
|
||||
val autoUpdateJob = scope.launch {
|
||||
while (isActive) {
|
||||
runCatchingSafely {
|
||||
kvCache.actualizeAll(originalRepo)
|
||||
}
|
||||
|
||||
delay(recacheDelay)
|
||||
}
|
||||
}
|
||||
|
||||
constructor(
|
||||
originalRepo: ReadCRUDRepo<RegisteredObject, Id>,
|
||||
scope: CoroutineScope,
|
||||
originalCallTimeoutMillis: Long,
|
||||
kvCache: FullKVCache<Id, RegisteredObject> = FullKVCache(),
|
||||
recacheDelay: Long = 60.seconds.inWholeMilliseconds,
|
||||
idGetter: (RegisteredObject) -> Id
|
||||
) : this(originalRepo, scope, kvCache, recacheDelay, ActionWrapper.Timeouted(originalCallTimeoutMillis), idGetter)
|
||||
|
||||
override suspend fun contains(id: Id): Boolean = actionWrapper.wrap {
|
||||
originalRepo.contains(id)
|
||||
}.getOrElse {
|
||||
kvCache.contains(id)
|
||||
}
|
||||
|
||||
override suspend fun count(): Long = actionWrapper.wrap {
|
||||
originalRepo.count()
|
||||
}.getOrElse {
|
||||
kvCache.count()
|
||||
}
|
||||
|
||||
override suspend fun getByPagination(
|
||||
pagination: Pagination
|
||||
): PaginationResult<RegisteredObject> = actionWrapper.wrap {
|
||||
originalRepo.getByPagination(pagination)
|
||||
}.getOrNull() ?.also {
|
||||
it.results.forEach {
|
||||
kvCache.set(idGetter(it), it)
|
||||
}
|
||||
} ?: kvCache.values(pagination)
|
||||
|
||||
override suspend fun getIdsByPagination(
|
||||
pagination: Pagination
|
||||
): PaginationResult<Id> = actionWrapper.wrap {
|
||||
originalRepo.getIdsByPagination(pagination)
|
||||
}.getOrElse { kvCache.keys(pagination) }
|
||||
|
||||
override suspend fun getById(id: Id): RegisteredObject? = actionWrapper.wrap {
|
||||
originalRepo.getById(id)
|
||||
}.getOrNull() ?.also {
|
||||
kvCache.set(idGetter(it), it)
|
||||
} ?: kvCache.get(id)
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
package dev.inmo.micro_utils.repos.cache.fallback.crud
|
||||
|
||||
import dev.inmo.micro_utils.coroutines.plus
|
||||
import dev.inmo.micro_utils.coroutines.subscribeSafelyWithoutExceptions
|
||||
import dev.inmo.micro_utils.repos.UpdatedValuePair
|
||||
import dev.inmo.micro_utils.repos.WriteCRUDRepo
|
||||
import dev.inmo.micro_utils.repos.cache.cache.FullKVCache
|
||||
import dev.inmo.micro_utils.repos.set
|
||||
import dev.inmo.micro_utils.repos.unset
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.merge
|
||||
|
||||
open class AutoRecacheWriteCRUDRepo<RegisteredObject, Id, InputObject>(
|
||||
protected val originalRepo: WriteCRUDRepo<RegisteredObject, Id, InputObject>,
|
||||
protected val scope: CoroutineScope,
|
||||
protected val kvCache: FullKVCache<Id, RegisteredObject> = FullKVCache(),
|
||||
protected val idGetter: (RegisteredObject) -> Id
|
||||
) : WriteCRUDRepo<RegisteredObject, Id, InputObject> {
|
||||
override val deletedObjectsIdsFlow: Flow<Id>
|
||||
get() = (originalRepo.deletedObjectsIdsFlow + kvCache.onValueRemoved).distinctUntilChanged()
|
||||
override val newObjectsFlow: Flow<RegisteredObject>
|
||||
get() = (originalRepo.newObjectsFlow + kvCache.onNewValue.map { it.second }).distinctUntilChanged()
|
||||
override val updatedObjectsFlow: Flow<RegisteredObject>
|
||||
get() = originalRepo.updatedObjectsFlow
|
||||
|
||||
private val onRemovingUpdatesListeningJob = originalRepo.deletedObjectsIdsFlow.subscribeSafelyWithoutExceptions(scope) {
|
||||
kvCache.unset(it)
|
||||
}
|
||||
|
||||
private val onNewAndUpdatedObjectsListeningJob = merge(
|
||||
originalRepo.newObjectsFlow,
|
||||
originalRepo.updatedObjectsFlow,
|
||||
).subscribeSafelyWithoutExceptions(scope) {
|
||||
kvCache.set(idGetter(it), it)
|
||||
}
|
||||
|
||||
override suspend fun update(
|
||||
values: List<UpdatedValuePair<Id, InputObject>>
|
||||
): List<RegisteredObject> = originalRepo.update(values).onEach {
|
||||
kvCache.set(idGetter(it), it)
|
||||
}
|
||||
|
||||
override suspend fun update(
|
||||
id: Id,
|
||||
value: InputObject
|
||||
): RegisteredObject? = originalRepo.update(id, value) ?.also {
|
||||
kvCache.set(idGetter(it), it)
|
||||
}
|
||||
|
||||
override suspend fun deleteById(ids: List<Id>) = originalRepo.deleteById(ids).also {
|
||||
kvCache.unset(ids)
|
||||
}
|
||||
|
||||
override suspend fun create(values: List<InputObject>): List<RegisteredObject> = originalRepo.create(values).onEach {
|
||||
kvCache.set(idGetter(it), it)
|
||||
}
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
package dev.inmo.micro_utils.repos.cache.fallback.keyvalue
|
||||
|
||||
import dev.inmo.micro_utils.repos.KeyValueRepo
|
||||
import dev.inmo.micro_utils.repos.WriteKeyValueRepo
|
||||
import dev.inmo.micro_utils.repos.cache.cache.FullKVCache
|
||||
import dev.inmo.micro_utils.repos.cache.fallback.ActionWrapper
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
|
||||
open class AutoRecacheKeyValueRepo<Id, RegisteredObject>(
|
||||
override val originalRepo: KeyValueRepo<Id, RegisteredObject>,
|
||||
scope: CoroutineScope,
|
||||
kvCache: FullKVCache<Id, RegisteredObject> = FullKVCache(),
|
||||
recacheDelay: Long = 60.seconds.inWholeMilliseconds,
|
||||
actionWrapper: ActionWrapper = ActionWrapper.Direct,
|
||||
idGetter: (RegisteredObject) -> Id
|
||||
) : AutoRecacheReadKeyValueRepo<Id, RegisteredObject> (
|
||||
originalRepo,
|
||||
scope,
|
||||
kvCache,
|
||||
recacheDelay,
|
||||
actionWrapper,
|
||||
idGetter
|
||||
),
|
||||
WriteKeyValueRepo<Id, RegisteredObject> by AutoRecacheWriteKeyValueRepo(originalRepo, scope, kvCache),
|
||||
KeyValueRepo<Id, RegisteredObject> {
|
||||
|
||||
constructor(
|
||||
originalRepo: KeyValueRepo<Id, RegisteredObject>,
|
||||
scope: CoroutineScope,
|
||||
originalCallTimeoutMillis: Long,
|
||||
kvCache: FullKVCache<Id, RegisteredObject> = FullKVCache(),
|
||||
recacheDelay: Long = 60.seconds.inWholeMilliseconds,
|
||||
idGetter: (RegisteredObject) -> Id
|
||||
) : this(originalRepo, scope, kvCache, recacheDelay, ActionWrapper.Timeouted(originalCallTimeoutMillis), idGetter)
|
||||
|
||||
override suspend fun unsetWithValues(toUnset: List<RegisteredObject>) = originalRepo.unsetWithValues(
|
||||
toUnset
|
||||
).also {
|
||||
kvCache.unsetWithValues(toUnset)
|
||||
}
|
||||
}
|
@ -0,0 +1,87 @@
|
||||
package dev.inmo.micro_utils.repos.cache.fallback.keyvalue
|
||||
|
||||
import dev.inmo.micro_utils.coroutines.runCatchingSafely
|
||||
import dev.inmo.micro_utils.pagination.Pagination
|
||||
import dev.inmo.micro_utils.pagination.PaginationResult
|
||||
import dev.inmo.micro_utils.repos.ReadKeyValueRepo
|
||||
import dev.inmo.micro_utils.repos.cache.cache.FullKVCache
|
||||
import dev.inmo.micro_utils.repos.cache.fallback.ActionWrapper
|
||||
import dev.inmo.micro_utils.repos.cache.util.actualizeAll
|
||||
import dev.inmo.micro_utils.repos.set
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
|
||||
open class AutoRecacheReadKeyValueRepo<Id, RegisteredObject>(
|
||||
protected open val originalRepo: ReadKeyValueRepo<Id, RegisteredObject>,
|
||||
protected val scope: CoroutineScope,
|
||||
protected val kvCache: FullKVCache<Id, RegisteredObject> = FullKVCache(),
|
||||
protected val recacheDelay: Long = 60.seconds.inWholeMilliseconds,
|
||||
protected val actionWrapper: ActionWrapper = ActionWrapper.Direct,
|
||||
protected val idGetter: (RegisteredObject) -> Id
|
||||
) : ReadKeyValueRepo<Id, RegisteredObject> {
|
||||
val autoUpdateJob = scope.launch {
|
||||
while (isActive) {
|
||||
runCatchingSafely {
|
||||
kvCache.actualizeAll(originalRepo)
|
||||
}
|
||||
|
||||
delay(recacheDelay)
|
||||
}
|
||||
}
|
||||
|
||||
constructor(
|
||||
originalRepo: ReadKeyValueRepo<Id, RegisteredObject>,
|
||||
scope: CoroutineScope,
|
||||
originalCallTimeoutMillis: Long,
|
||||
kvCache: FullKVCache<Id, RegisteredObject> = FullKVCache(),
|
||||
recacheDelay: Long = 60.seconds.inWholeMilliseconds,
|
||||
idGetter: (RegisteredObject) -> Id
|
||||
) : this(originalRepo, scope, kvCache, recacheDelay, ActionWrapper.Timeouted(originalCallTimeoutMillis), idGetter)
|
||||
|
||||
override suspend fun contains(key: Id): Boolean = actionWrapper.wrap {
|
||||
originalRepo.contains(key)
|
||||
}.getOrElse {
|
||||
kvCache.contains(key)
|
||||
}
|
||||
|
||||
override suspend fun count(): Long = actionWrapper.wrap {
|
||||
originalRepo.count()
|
||||
}.getOrElse {
|
||||
kvCache.count()
|
||||
}
|
||||
|
||||
override suspend fun get(k: Id): RegisteredObject? = actionWrapper.wrap {
|
||||
originalRepo.get(k)
|
||||
}.getOrNull() ?.also {
|
||||
kvCache.set(idGetter(it), it)
|
||||
} ?: kvCache.get(k)
|
||||
|
||||
override suspend fun values(
|
||||
pagination: Pagination,
|
||||
reversed: Boolean
|
||||
): PaginationResult<RegisteredObject> = actionWrapper.wrap {
|
||||
originalRepo.values(pagination, reversed)
|
||||
}.getOrNull() ?.also {
|
||||
it.results.forEach {
|
||||
kvCache.set(idGetter(it), it)
|
||||
}
|
||||
} ?: kvCache.values(pagination, reversed)
|
||||
|
||||
override suspend fun keys(
|
||||
pagination: Pagination,
|
||||
reversed: Boolean
|
||||
): PaginationResult<Id> = actionWrapper.wrap {
|
||||
originalRepo.keys(pagination, reversed)
|
||||
}.getOrElse { kvCache.keys(pagination, reversed) }
|
||||
|
||||
override suspend fun keys(
|
||||
v: RegisteredObject,
|
||||
pagination: Pagination,
|
||||
reversed: Boolean
|
||||
): PaginationResult<Id> = actionWrapper.wrap {
|
||||
originalRepo.keys(v, pagination, reversed)
|
||||
}.getOrElse { kvCache.keys(v, pagination, reversed) }
|
||||
}
|
@ -0,0 +1,49 @@
|
||||
package dev.inmo.micro_utils.repos.cache.fallback.keyvalue
|
||||
|
||||
import dev.inmo.micro_utils.coroutines.plus
|
||||
import dev.inmo.micro_utils.coroutines.subscribeSafelyWithoutExceptions
|
||||
import dev.inmo.micro_utils.repos.WriteKeyValueRepo
|
||||
import dev.inmo.micro_utils.repos.cache.cache.FullKVCache
|
||||
import dev.inmo.micro_utils.repos.set
|
||||
import dev.inmo.micro_utils.repos.unset
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
|
||||
open class AutoRecacheWriteKeyValueRepo<Id, RegisteredObject>(
|
||||
protected val originalRepo: WriteKeyValueRepo<Id, RegisteredObject>,
|
||||
protected val scope: CoroutineScope,
|
||||
protected val kvCache: FullKVCache<Id, RegisteredObject> = FullKVCache()
|
||||
) : WriteKeyValueRepo<Id, RegisteredObject> {
|
||||
override val onValueRemoved: Flow<Id>
|
||||
get() = (originalRepo.onValueRemoved + kvCache.onValueRemoved).distinctUntilChanged()
|
||||
|
||||
override val onNewValue: Flow<Pair<Id, RegisteredObject>>
|
||||
get() = (originalRepo.onNewValue + kvCache.onNewValue).distinctUntilChanged()
|
||||
|
||||
private val onRemovingUpdatesListeningJob = originalRepo.onValueRemoved.subscribeSafelyWithoutExceptions(scope) {
|
||||
kvCache.unset(it)
|
||||
}
|
||||
|
||||
private val onNewAndUpdatedObjectsListeningJob = originalRepo.onNewValue.subscribeSafelyWithoutExceptions(scope) {
|
||||
kvCache.set(it.first, it.second)
|
||||
}
|
||||
|
||||
override suspend fun unsetWithValues(toUnset: List<RegisteredObject>) = originalRepo.unsetWithValues(
|
||||
toUnset
|
||||
).also {
|
||||
kvCache.unsetWithValues(toUnset)
|
||||
}
|
||||
|
||||
override suspend fun unset(toUnset: List<Id>) = originalRepo.unset(
|
||||
toUnset
|
||||
).also {
|
||||
kvCache.unset(toUnset)
|
||||
}
|
||||
|
||||
override suspend fun set(toSet: Map<Id, RegisteredObject>) = originalRepo.set(
|
||||
toSet
|
||||
).also {
|
||||
kvCache.set(toSet)
|
||||
}
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
package dev.inmo.micro_utils.repos.cache.fallback.keyvalues
|
||||
|
||||
import dev.inmo.micro_utils.repos.KeyValuesRepo
|
||||
import dev.inmo.micro_utils.repos.WriteKeyValuesRepo
|
||||
import dev.inmo.micro_utils.repos.cache.cache.FullKVCache
|
||||
import dev.inmo.micro_utils.repos.cache.fallback.ActionWrapper
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
|
||||
open class AutoRecacheKeyValuesRepo<Id, RegisteredObject>(
|
||||
override val originalRepo: KeyValuesRepo<Id, RegisteredObject>,
|
||||
scope: CoroutineScope,
|
||||
kvCache: FullKVCache<Id, List<RegisteredObject>> = FullKVCache(),
|
||||
recacheDelay: Long = 60.seconds.inWholeMilliseconds,
|
||||
actionWrapper: ActionWrapper = ActionWrapper.Direct,
|
||||
idGetter: (RegisteredObject) -> Id
|
||||
) : AutoRecacheReadKeyValuesRepo<Id, RegisteredObject> (
|
||||
originalRepo,
|
||||
scope,
|
||||
kvCache,
|
||||
recacheDelay,
|
||||
actionWrapper,
|
||||
idGetter
|
||||
),
|
||||
WriteKeyValuesRepo<Id, RegisteredObject> by AutoRecacheWriteKeyValuesRepo(originalRepo, scope, kvCache),
|
||||
KeyValuesRepo<Id, RegisteredObject> {
|
||||
|
||||
constructor(
|
||||
originalRepo: KeyValuesRepo<Id, RegisteredObject>,
|
||||
scope: CoroutineScope,
|
||||
originalCallTimeoutMillis: Long,
|
||||
kvCache: FullKVCache<Id, List<RegisteredObject>> = FullKVCache(),
|
||||
recacheDelay: Long = 60.seconds.inWholeMilliseconds,
|
||||
idGetter: (RegisteredObject) -> Id
|
||||
) : this(originalRepo, scope, kvCache, recacheDelay, ActionWrapper.Timeouted(originalCallTimeoutMillis), idGetter)
|
||||
|
||||
override suspend fun clearWithValue(v: RegisteredObject) {
|
||||
super.clearWithValue(v)
|
||||
}
|
||||
}
|
@ -0,0 +1,138 @@
|
||||
package dev.inmo.micro_utils.repos.cache.fallback.keyvalues
|
||||
|
||||
import dev.inmo.micro_utils.coroutines.runCatchingSafely
|
||||
import dev.inmo.micro_utils.pagination.Pagination
|
||||
import dev.inmo.micro_utils.pagination.PaginationResult
|
||||
import dev.inmo.micro_utils.pagination.changeResultsUnchecked
|
||||
import dev.inmo.micro_utils.pagination.createPaginationResult
|
||||
import dev.inmo.micro_utils.pagination.emptyPaginationResult
|
||||
import dev.inmo.micro_utils.pagination.firstIndex
|
||||
import dev.inmo.micro_utils.pagination.utils.doForAllWithNextPaging
|
||||
import dev.inmo.micro_utils.pagination.utils.optionallyReverse
|
||||
import dev.inmo.micro_utils.pagination.utils.paginate
|
||||
import dev.inmo.micro_utils.repos.ReadKeyValuesRepo
|
||||
import dev.inmo.micro_utils.repos.cache.cache.FullKVCache
|
||||
import dev.inmo.micro_utils.repos.cache.fallback.ActionWrapper
|
||||
import dev.inmo.micro_utils.repos.cache.util.actualizeAll
|
||||
import dev.inmo.micro_utils.repos.set
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
|
||||
open class AutoRecacheReadKeyValuesRepo<Id, RegisteredObject>(
|
||||
protected open val originalRepo: ReadKeyValuesRepo<Id, RegisteredObject>,
|
||||
protected val scope: CoroutineScope,
|
||||
protected val kvCache: FullKVCache<Id, List<RegisteredObject>> = FullKVCache(),
|
||||
protected val recacheDelay: Long = 60.seconds.inWholeMilliseconds,
|
||||
protected val actionWrapper: ActionWrapper = ActionWrapper.Direct,
|
||||
protected val idGetter: (RegisteredObject) -> Id
|
||||
) : ReadKeyValuesRepo<Id, RegisteredObject> {
|
||||
val autoUpdateJob = scope.launch {
|
||||
while (isActive) {
|
||||
runCatchingSafely {
|
||||
kvCache.actualizeAll(originalRepo)
|
||||
}
|
||||
|
||||
delay(recacheDelay)
|
||||
}
|
||||
}
|
||||
|
||||
constructor(
|
||||
originalRepo: ReadKeyValuesRepo<Id, RegisteredObject>,
|
||||
scope: CoroutineScope,
|
||||
originalCallTimeoutMillis: Long,
|
||||
kvCache: FullKVCache<Id, List<RegisteredObject>> = FullKVCache(),
|
||||
recacheDelay: Long = 60.seconds.inWholeMilliseconds,
|
||||
idGetter: (RegisteredObject) -> Id
|
||||
) : this(originalRepo, scope, kvCache, recacheDelay, ActionWrapper.Timeouted(originalCallTimeoutMillis), idGetter)
|
||||
|
||||
override suspend fun contains(k: Id): Boolean = actionWrapper.wrap {
|
||||
originalRepo.contains(k)
|
||||
}.getOrElse {
|
||||
kvCache.contains(k)
|
||||
}
|
||||
|
||||
override suspend fun count(): Long = actionWrapper.wrap {
|
||||
originalRepo.count()
|
||||
}.getOrElse {
|
||||
kvCache.count()
|
||||
}
|
||||
|
||||
override suspend fun keys(
|
||||
v: RegisteredObject,
|
||||
pagination: Pagination,
|
||||
reversed: Boolean
|
||||
): PaginationResult<Id> = actionWrapper.wrap {
|
||||
originalRepo.keys(v, pagination, reversed)
|
||||
}.getOrElse {
|
||||
val results = mutableListOf<Id>()
|
||||
|
||||
val toSkip = pagination.firstIndex
|
||||
var count = 0
|
||||
|
||||
doForAllWithNextPaging {
|
||||
kvCache.keys(pagination, reversed).also {
|
||||
it.results.forEach {
|
||||
if (kvCache.get(it) ?.contains(v) == true) {
|
||||
count++
|
||||
if (count < toSkip || results.size >= pagination.size) {
|
||||
return@forEach
|
||||
} else {
|
||||
results.add(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return@getOrElse results.createPaginationResult(
|
||||
pagination,
|
||||
count.toLong()
|
||||
)
|
||||
}
|
||||
|
||||
override suspend fun keys(
|
||||
pagination: Pagination,
|
||||
reversed: Boolean
|
||||
): PaginationResult<Id> = actionWrapper.wrap {
|
||||
originalRepo.keys(pagination, reversed)
|
||||
}.getOrElse { kvCache.keys(pagination, reversed) }
|
||||
|
||||
override suspend fun get(
|
||||
k: Id,
|
||||
pagination: Pagination,
|
||||
reversed: Boolean
|
||||
): PaginationResult<RegisteredObject> = actionWrapper.wrap {
|
||||
originalRepo.get(k, pagination, reversed)
|
||||
}.getOrNull() ?.also {
|
||||
it.results.forEach {
|
||||
kvCache.set(k, ((kvCache.get(k) ?: return@also) + it).distinct())
|
||||
}
|
||||
} ?: kvCache.get(k) ?.run {
|
||||
paginate(pagination.optionallyReverse(size, reversed)).let {
|
||||
if (reversed) {
|
||||
it.changeResultsUnchecked(
|
||||
it.results.reversed()
|
||||
)
|
||||
} else {
|
||||
it
|
||||
}
|
||||
}
|
||||
} ?: emptyPaginationResult()
|
||||
|
||||
override suspend fun count(k: Id): Long = actionWrapper.wrap {
|
||||
originalRepo.count(k)
|
||||
}.getOrElse {
|
||||
kvCache.get(k) ?.size ?.toLong() ?: 0L
|
||||
}
|
||||
|
||||
override suspend fun contains(k: Id, v: RegisteredObject): Boolean {
|
||||
return (actionWrapper.wrap {
|
||||
originalRepo.contains(k, v)
|
||||
}.getOrNull() ?.also {
|
||||
kvCache.set(k, ((kvCache.get(k) ?: return@also) + v).distinct())
|
||||
}) ?: (kvCache.get(k) ?.contains(v) == true)
|
||||
}
|
||||
}
|
@ -0,0 +1,81 @@
|
||||
package dev.inmo.micro_utils.repos.cache.fallback.keyvalues
|
||||
|
||||
import dev.inmo.micro_utils.coroutines.plus
|
||||
import dev.inmo.micro_utils.coroutines.subscribeSafelyWithoutExceptions
|
||||
import dev.inmo.micro_utils.pagination.FirstPagePagination
|
||||
import dev.inmo.micro_utils.pagination.utils.doForAllWithNextPaging
|
||||
import dev.inmo.micro_utils.repos.WriteKeyValuesRepo
|
||||
import dev.inmo.micro_utils.repos.cache.cache.FullKVCache
|
||||
import dev.inmo.micro_utils.repos.set
|
||||
import dev.inmo.micro_utils.repos.unset
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
|
||||
open class AutoRecacheWriteKeyValuesRepo<Id, RegisteredObject>(
|
||||
protected val originalRepo: WriteKeyValuesRepo<Id, RegisteredObject>,
|
||||
protected val scope: CoroutineScope,
|
||||
protected val kvCache: FullKVCache<Id, List<RegisteredObject>> = FullKVCache()
|
||||
) : WriteKeyValuesRepo<Id, RegisteredObject> {
|
||||
override val onValueRemoved: Flow<Pair<Id, RegisteredObject>>
|
||||
get() = originalRepo.onValueRemoved
|
||||
|
||||
override val onNewValue: Flow<Pair<Id, RegisteredObject>>
|
||||
get() = originalRepo.onNewValue
|
||||
override val onDataCleared: Flow<Id>
|
||||
get() = (originalRepo.onDataCleared + kvCache.onValueRemoved).distinctUntilChanged()
|
||||
|
||||
private val onDataClearedListeningJob = originalRepo.onDataCleared.subscribeSafelyWithoutExceptions(scope) {
|
||||
kvCache.unset(it)
|
||||
}
|
||||
|
||||
private val onRemovingUpdatesListeningJob = originalRepo.onValueRemoved.subscribeSafelyWithoutExceptions(scope) {
|
||||
kvCache.set(
|
||||
it.first,
|
||||
(kvCache.get(
|
||||
it.first
|
||||
) ?: return@subscribeSafelyWithoutExceptions) - it.second
|
||||
)
|
||||
}
|
||||
|
||||
private val onNewAndUpdatedObjectsListeningJob = originalRepo.onNewValue.subscribeSafelyWithoutExceptions(scope) {
|
||||
kvCache.set(
|
||||
it.first,
|
||||
(kvCache.get(
|
||||
it.first
|
||||
) ?: return@subscribeSafelyWithoutExceptions) + it.second
|
||||
)
|
||||
}
|
||||
|
||||
override suspend fun clearWithValue(v: RegisteredObject) {
|
||||
originalRepo.clearWithValue(v)
|
||||
doForAllWithNextPaging(FirstPagePagination(kvCache.count().takeIf { it < Int.MAX_VALUE } ?.toInt() ?: Int.MAX_VALUE)) {
|
||||
kvCache.keys(it).also {
|
||||
it.results.forEach { id ->
|
||||
kvCache.get(id) ?.takeIf { it.contains(v) } ?.let {
|
||||
kvCache.unset(id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun clear(k: Id) {
|
||||
originalRepo.clear(k)
|
||||
kvCache.unset(k)
|
||||
}
|
||||
|
||||
override suspend fun remove(toRemove: Map<Id, List<RegisteredObject>>) {
|
||||
originalRepo.remove(toRemove)
|
||||
toRemove.forEach { (k, v) ->
|
||||
kvCache.set(k, (kvCache.get(k) ?: return@forEach) - v)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun add(toAdd: Map<Id, List<RegisteredObject>>) {
|
||||
originalRepo.add(toAdd)
|
||||
toAdd.forEach { (k, v) ->
|
||||
kvCache.set(k, (kvCache.get(k) ?: return@forEach) + v)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user