core/features/content/server/src/jvmMain/kotlin/dev/inmo/postssystem/features/content/server/ServerContentStorageAggregator.kt

140 lines
4.5 KiB
Kotlin

package dev.inmo.postssystem.features.content.server
import dev.inmo.micro_utils.pagination.*
import dev.inmo.micro_utils.repos.UpdatedValuePair
import dev.inmo.postssystem.features.content.common.*
import dev.inmo.postssystem.features.content.server.storage.ServerContentStorage
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
/**
 * [ServerContentStorage] that delegates every operation to a list of [ServerContentStorageWrapper]s.
 *
 * Write operations ([create] and both [update] overloads) are routed using `mayHandle` of the wrappers.
 * Lookup operations ([contains], [getById], [getContentPreview]) query all wrappers concurrently inside
 * [scope] and use the first positive answer.
 *
 * @param storages Wrappers to delegate to. Their order matters: it decides which wrapper receives content
 * in [create] / bulk [update] and the order in which [getByPagination] walks the wrappers
 * @param scope Scope in which the concurrent per-storage requests are launched
 */
class ServerContentStorageAggregator(
    private val storages: List<ServerContentStorageWrapper<*>>,
    private val scope: CoroutineScope
) : ServerContentStorage<Content> {
    /** Not implemented: reading this property throws [NotImplementedError]. */
    override val deletedObjectsIdsFlow: Flow<ContentId>
        get() = TODO("Not yet implemented")

    /** Not implemented: reading this property throws [NotImplementedError]. */
    override val newObjectsFlow: Flow<RegisteredContent>
        get() = TODO("Not yet implemented")

    /** Not implemented: reading this property throws [NotImplementedError]. */
    override val updatedObjectsFlow: Flow<RegisteredContent>
        get() = TODO("Not yet implemented")

    /**
     * Groups [values] by the first storage whose `mayHandle` accepts them and creates each group in its storage.
     *
     * Content that no storage may handle is silently skipped.
     *
     * @return Results of all storages, concatenated in the order the storages were first matched
     */
    override suspend fun create(values: List<Content>): List<RegisteredContent> {
        return values.groupBy { content ->
            storages.firstOrNull { storage -> storage.mayHandle(content) }
        }.flatMap { (storage, contents) ->
            // A null key collects the content nobody can handle; it contributes nothing to the result
            storage?.create(contents) ?: emptyList()
        }
    }

    /**
     * Asks every storage to delete [ids], running the requests concurrently in [scope], and suspends until
     * all of the launched jobs are finished.
     */
    override suspend fun deleteById(ids: List<ContentId>) {
        storages.map { storage ->
            scope.launch { storage.deleteById(ids) }
        }.joinAll()
    }

    /**
     * Tries to update [id] with [value] in the storages which may handle [value], one by one in the order of
     * the storages list.
     *
     * @return Result of the first storage that returned a non-null value, or `null` when none did
     */
    override suspend fun update(id: ContentId, value: Content): RegisteredContent? {
        return storages.filter { storage ->
            storage.mayHandle(value)
        }.firstNotNullOfOrNull { storage ->
            storage.update(id, value)
        }
    }

    /**
     * Groups [values] by the first storage whose `mayHandle` accepts their content and performs the bulk
     * update of each group in its storage.
     *
     * Pairs whose content no storage may handle are silently skipped.
     *
     * @return Results of all storages, concatenated in the order the storages were first matched
     */
    override suspend fun update(values: List<UpdatedValuePair<ContentId, Content>>): List<RegisteredContent> {
        return values.groupBy { (_, content) ->
            storages.firstOrNull { storage -> storage.mayHandle(content) }
        }.flatMap { (storage, pairs) ->
            // A null key collects the pairs nobody can handle; it contributes nothing to the result
            storage?.update(pairs) ?: emptyList()
        }
    }

    /**
     * @return `true` when at least one storage reports that it contains [id], `false` otherwise
     */
    override suspend fun contains(id: ContentId): Boolean {
        return firstNotNullFromStorages<Boolean> { storage ->
            // Only a positive answer counts as a result; `false` is mapped to "nothing found here"
            storage.contains(id).takeIf { found -> found }
        } ?: false
    }

    /**
     * @return Sum of `count()` of all storages; the counts are requested concurrently in [scope]
     */
    override suspend fun count(): Long {
        return storages.map { storage ->
            scope.async { storage.count() }
        }.awaitAll().sum()
    }

    /**
     * @return Content registered under [id] as returned by the first storage that answered with a non-null
     * value, or `null` when no storage has it
     */
    override suspend fun getById(id: ContentId): RegisteredContent? {
        return firstNotNullFromStorages<RegisteredContent> { storage ->
            storage.getById(id)
        }
    }

    /**
     * Walks the storages in order, requesting results from each until at least `pagination.size` items are
     * collected or the storages are exhausted.
     *
     * NOTE(review): the index window given to each storage is `collected + pagination.firstIndex` to
     * `pagination.lastIndex`, i.e. it is expressed in the indexes of the aggregated result and is not shifted
     * by the sizes of the storages already visited. Confirm that this matches what the wrappers expect.
     *
     * @return Collected items wrapped into a [PaginationResult] built from [pagination] and the total [count]
     */
    override suspend fun getByPagination(
        pagination: Pagination
    ): PaginationResult<RegisteredContent> {
        val collected = mutableListOf<RegisteredContent>()
        for (storage in storages) {
            val storagePagination = PaginationByIndexes(
                collected.size + pagination.firstIndex,
                pagination.lastIndex
            )
            collected.addAll(storage.getByPagination(storagePagination).results)
            if (collected.size >= pagination.size) {
                break
            }
        }
        return collected.createPaginationResult(pagination, count())
    }

    /**
     * @return Preview of the content registered under [id] as returned by the first storage that answered
     * with a non-null value, or `null` when no storage has it
     */
    override suspend fun getContentPreview(id: ContentId): RegisteredContent? {
        return firstNotNullFromStorages<RegisteredContent> { storage ->
            storage.getContentPreview(id)
        }
    }

    /**
     * Runs [request] against every storage concurrently in [scope] and returns the first non-null answer.
     *
     * As soon as one request produces a value, the jobs of the other requests are cancelled. The function
     * suspends until every launched job has finished (normally or by cancellation).
     *
     * @return The first non-null value produced by [request], or `null` when every request returned `null`
     */
    private suspend fun <T : Any> firstNotNullFromStorages(
        request: suspend (ServerContentStorageWrapper<*>) -> T?
    ): T? {
        val firstResult = CompletableDeferred<T>()
        storages.map { storage ->
            scope.launch {
                request(storage)?.let { firstResult.complete(it) }
            }.also { job ->
                // Once a result is known there is no reason to keep the remaining requests running
                firstResult.invokeOnCompletion { job.cancel() }
            }
        }.joinAll()
        return if (firstResult.isCompleted) firstResult.getCompleted() else null
    }
}