mirror of
https://github.com/InsanusMokrassar/MicroUtils.git
synced 2025-09-04 23:59:29 +00:00
repos update
This commit is contained in:
@@ -2,10 +2,8 @@ package dev.inmo.micro_utils.repos.exposed.keyvalue
|
||||
|
||||
import dev.inmo.micro_utils.repos.StandardKeyValueRepo
|
||||
import dev.inmo.micro_utils.repos.exposed.ColumnAllocator
|
||||
import kotlinx.coroutines.channels.BroadcastChannel
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.*
|
||||
import org.jetbrains.exposed.sql.*
|
||||
import org.jetbrains.exposed.sql.transactions.transaction
|
||||
|
||||
@@ -20,11 +18,11 @@ open class ExposedKeyValueRepo<Key, Value>(
|
||||
valueColumnAllocator,
|
||||
tableName
|
||||
) {
|
||||
private val onNewValueChannel = BroadcastChannel<Pair<Key, Value>>(Channel.BUFFERED)
|
||||
private val onValueRemovedChannel = BroadcastChannel<Key>(Channel.BUFFERED)
|
||||
private val _onNewValue = MutableSharedFlow<Pair<Key, Value>>(Channel.BUFFERED)
|
||||
private val _onValueRemoved = MutableSharedFlow<Key>(Channel.BUFFERED)
|
||||
|
||||
override val onNewValue: Flow<Pair<Key, Value>> = onNewValueChannel.asFlow()
|
||||
override val onValueRemoved: Flow<Key> = onValueRemovedChannel.asFlow()
|
||||
override val onNewValue: Flow<Pair<Key, Value>> = _onNewValue.asSharedFlow()
|
||||
override val onValueRemoved: Flow<Key> = _onValueRemoved.asSharedFlow()
|
||||
|
||||
override suspend fun set(k: Key, v: Value) {
|
||||
transaction(database) {
|
||||
@@ -39,14 +37,50 @@ open class ExposedKeyValueRepo<Key, Value>(
|
||||
}
|
||||
}
|
||||
}
|
||||
onNewValueChannel.send(k to v)
|
||||
_onNewValue.emit(k to v)
|
||||
}
|
||||
|
||||
override suspend fun set(toSet: Map<Key, Value>) {
|
||||
transaction(database) {
|
||||
toSet.mapNotNull { (k, v) ->
|
||||
if (update({ keyColumn.eq(k) }) { it[valueColumn] = v } > 0) {
|
||||
k to v
|
||||
} else {
|
||||
val inserted = insert {
|
||||
it[keyColumn] = k
|
||||
it[valueColumn] = v
|
||||
}.getOrNull(keyColumn) != null
|
||||
if (inserted) {
|
||||
k to v
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
}.forEach {
|
||||
_onNewValue.emit(it)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun unset(k: Key) {
|
||||
transaction(database) {
|
||||
deleteWhere { keyColumn.eq(k) }
|
||||
}
|
||||
onValueRemovedChannel.send(k)
|
||||
_onValueRemoved.emit(k)
|
||||
}
|
||||
|
||||
override suspend fun unset(toUnset: List<Key>) {
|
||||
transaction(database) {
|
||||
toUnset.mapNotNull {
|
||||
if (deleteWhere { keyColumn.eq(it) } > 0) {
|
||||
it
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
}.forEach {
|
||||
_onValueRemoved.emit(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -3,7 +3,7 @@ package dev.inmo.micro_utils.repos.exposed.onetomany
|
||||
import dev.inmo.micro_utils.coroutines.BroadcastFlow
|
||||
import dev.inmo.micro_utils.repos.OneToManyKeyValueRepo
|
||||
import dev.inmo.micro_utils.repos.exposed.ColumnAllocator
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.*
|
||||
import org.jetbrains.exposed.sql.*
|
||||
import org.jetbrains.exposed.sql.transactions.transaction
|
||||
|
||||
@@ -18,13 +18,13 @@ open class ExposedOneToManyKeyValueRepo<Key, Value>(
|
||||
valueColumnAllocator,
|
||||
tableName
|
||||
) {
|
||||
protected val _onNewValue: BroadcastFlow<Pair<Key, Value>> = BroadcastFlow()
|
||||
protected val _onNewValue: MutableSharedFlow<Pair<Key, Value>> = MutableSharedFlow()
|
||||
override val onNewValue: Flow<Pair<Key, Value>>
|
||||
get() = _onNewValue
|
||||
protected val _onValueRemoved: BroadcastFlow<Pair<Key, Value>> = BroadcastFlow()
|
||||
protected val _onValueRemoved: MutableSharedFlow<Pair<Key, Value>> = MutableSharedFlow()
|
||||
override val onValueRemoved: Flow<Pair<Key, Value>>
|
||||
get() = _onValueRemoved
|
||||
protected val _onDataCleared: BroadcastFlow<Key> = BroadcastFlow()
|
||||
protected val _onDataCleared: MutableSharedFlow<Key> = MutableSharedFlow()
|
||||
override val onDataCleared: Flow<Key>
|
||||
get() = _onDataCleared
|
||||
|
||||
@@ -34,19 +34,42 @@ open class ExposedOneToManyKeyValueRepo<Key, Value>(
|
||||
it[keyColumn] = k
|
||||
it[valueColumn] = v
|
||||
}
|
||||
}.also { _onNewValue.send(k to v) }
|
||||
}.also { _onNewValue.emit(k to v) }
|
||||
}
|
||||
|
||||
override suspend fun remove(k: Key, v: Value) {
|
||||
override suspend fun add(toAdd: Map<Key, List<Value>>) {
|
||||
transaction(database) {
|
||||
deleteWhere { keyColumn.eq(k).and(valueColumn.eq(v)) }
|
||||
}.also { _onValueRemoved.send(k to v) }
|
||||
toAdd.keys.flatMap { k ->
|
||||
toAdd[k] ?.mapNotNull { v ->
|
||||
insertIgnore {
|
||||
it[keyColumn] = k
|
||||
it[valueColumn] = v
|
||||
}.getOrNull(keyColumn) ?.let { k to v }
|
||||
} ?: emptyList()
|
||||
}
|
||||
}.forEach { _onNewValue.emit(it) }
|
||||
}
|
||||
|
||||
override suspend fun remove(toRemove: Map<Key, List<Value>>) {
|
||||
transaction(database) {
|
||||
toRemove.keys.flatMap { k ->
|
||||
toRemove[k] ?.mapNotNull { v ->
|
||||
if (deleteIgnoreWhere { keyColumn.eq(k).and(valueColumn.eq(v)) } > 0 ) {
|
||||
k to v
|
||||
} else {
|
||||
null
|
||||
}
|
||||
} ?: emptyList()
|
||||
}
|
||||
}.forEach {
|
||||
_onValueRemoved.emit(it)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun clear(k: Key) {
|
||||
transaction(database) {
|
||||
deleteWhere { keyColumn.eq(k) }
|
||||
}.also { _onDataCleared.send(k) }
|
||||
}.also { _onDataCleared.emit(k) }
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user