package dev.inmo.micro_utils.repos.exposed.keyvalue import dev.inmo.micro_utils.repos.KeyValueRepo import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.* import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.statements.* import org.jetbrains.exposed.sql.transactions.transaction abstract class AbstractExposedKeyValueRepo( override val database: Database, tableName: String? = null, flowsExtraBufferCapacity: Int = Int.MAX_VALUE, flowsBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ) : KeyValueRepo, AbstractExposedReadKeyValueRepo( database, tableName ) { protected val _onNewValue = MutableSharedFlow>(extraBufferCapacity = flowsExtraBufferCapacity, onBufferOverflow = flowsBufferOverflow) protected val _onValueRemoved = MutableSharedFlow(extraBufferCapacity = flowsExtraBufferCapacity, onBufferOverflow = flowsBufferOverflow) override val onNewValue: Flow> = _onNewValue.asSharedFlow() override val onValueRemoved: Flow = _onValueRemoved.asSharedFlow() protected abstract fun update(k: Key, v: Value, it: UpdateBuilder) protected abstract fun insertKey(k: Key, v: Value, it: InsertStatement) protected open fun insert(k: Key, v: Value, it: InsertStatement) { insertKey(k, v, it) update(k, v, it as UpdateBuilder) } override suspend fun set(toSet: Map) { transaction(database) { toSet.mapNotNull { (k, v) -> if (update({ selectById(k) }) { update(k, v, it as UpdateBuilder) } > 0) { k to v } else { val inserted = insert { insert(k, v, it) }.getOrNull(keyColumn) != null if (inserted) { k to v } else { null } } } }.forEach { _onNewValue.emit(it) } } override suspend fun unset(toUnset: List) { transaction(database) { toUnset.mapNotNull { item -> if (deleteWhere { selectById(it, item) } > 0) { item } else { null } } }.forEach { _onValueRemoved.emit(it) } } override suspend fun unsetWithValues(toUnset: List) { transaction(database) { toUnset.flatMap { val keys = selectAll().where { selectByValue(it) }.mapNotNull { it.asKey } deleteWhere { selectByIds(it, keys) } keys } }.distinct().forEach { _onValueRemoved.emit(it) } } override suspend fun clear() { transaction(database) { val keys = selectAll().map { it.asKey } deleteAll() keys }.also { it.forEach { _onValueRemoved.emit(it) } } } }