diff --git a/posts/gc/build.gradle b/posts/gc/build.gradle new file mode 100644 index 0000000..9b77c99 --- /dev/null +++ b/posts/gc/build.gradle @@ -0,0 +1,24 @@ +plugins { + id "org.jetbrains.kotlin.multiplatform" + id "org.jetbrains.kotlin.plugin.serialization" +} + +apply from: "$mppProjectWithSerializationPresetPath" + +kotlin { + sourceSets { + commonMain { + dependencies { + api project(":plaguposter.common") + api project(":plaguposter.posts") + api libs.microutils.koin + api libs.krontab + } + } + jvmMain { + dependencies { + api libs.plagubot.plugins.inline.queries + } + } + } +} diff --git a/posts/gc/src/commonMain/kotlin/PackageInfo.kt b/posts/gc/src/commonMain/kotlin/PackageInfo.kt new file mode 100644 index 0000000..bbfff8b --- /dev/null +++ b/posts/gc/src/commonMain/kotlin/PackageInfo.kt @@ -0,0 +1 @@ +package dev.inmo.plaguposter.posts.gc diff --git a/posts/gc/src/jvmMain/kotlin/Plugin.kt b/posts/gc/src/jvmMain/kotlin/Plugin.kt new file mode 100644 index 0000000..889bc7b --- /dev/null +++ b/posts/gc/src/jvmMain/kotlin/Plugin.kt @@ -0,0 +1,168 @@ +package dev.inmo.plaguposter.posts.gc + +import com.benasher44.uuid.uuid4 +import dev.inmo.krontab.KrontabTemplate +import dev.inmo.krontab.toKronScheduler +import dev.inmo.krontab.utils.asFlowWithDelays +import dev.inmo.kslog.common.KSLog +import dev.inmo.kslog.common.i +import dev.inmo.kslog.common.iS +import dev.inmo.micro_utils.coroutines.actor +import dev.inmo.micro_utils.coroutines.subscribeSafelyWithoutExceptions +import dev.inmo.micro_utils.repos.deleteById +import dev.inmo.plagubot.Plugin +import dev.inmo.plaguposter.common.ChatConfig +import dev.inmo.plaguposter.posts.models.NewPost +import dev.inmo.plaguposter.posts.models.PostContentInfo +import dev.inmo.plaguposter.posts.repo.PostsRepo +import dev.inmo.tgbotapi.extensions.api.delete +import dev.inmo.tgbotapi.extensions.api.edit.edit +import dev.inmo.tgbotapi.extensions.api.forwardMessage +import dev.inmo.tgbotapi.extensions.api.send.send +import dev.inmo.tgbotapi.extensions.behaviour_builder.BehaviourContext +import dev.inmo.tgbotapi.extensions.behaviour_builder.expectations.waitMessageDataCallbackQuery +import dev.inmo.tgbotapi.extensions.behaviour_builder.triggers_handling.onCommand +import dev.inmo.tgbotapi.extensions.utils.extensions.sameMessage +import dev.inmo.tgbotapi.extensions.utils.types.buttons.dataButton +import dev.inmo.tgbotapi.extensions.utils.types.buttons.flatInlineKeyboard +import dev.inmo.tgbotapi.types.MilliSeconds +import dev.inmo.tgbotapi.utils.bold +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.* +import org.jetbrains.exposed.sql.Database +import org.koin.core.Koin +import org.koin.core.module.Module + +object Plugin : Plugin { + @Serializable + internal data class Config ( + val krontab: KrontabTemplate? = null, + val throttlingMillis: MilliSeconds = 1000, + val doFullCheck: Boolean = false + ) + override fun Module.setupDI(database: Database, params: JsonObject) { + params["messagesChecker"] ?.let { element -> + single { get().decodeFromJsonElement(Config.serializer(), element) } + } + } + + private val gcLogger = KSLog("GarbageCollector") + private suspend fun BehaviourContext.doRecheck( + throttlingMillis: MilliSeconds, + doFullCheck: Boolean, + postsRepo: PostsRepo, + chatsConfig: ChatConfig + ) { + val posts = postsRepo.getAll() + gcLogger.i { + "Start garbage collecting of posts. Initial posts count: ${posts.size}" + } + posts.forEach { (postId, post) -> + val surelyAbsentMessages = mutableListOf() + for (content in post.content) { + try { + forwardMessage( + toChatId = chatsConfig.cacheChatId, + fromChatId = content.chatId, + messageId = content.messageId + ) + + if (!doFullCheck) { + break + } + } catch (e: Throwable) { + if (e.message ?.contains("message to forward not found") == true) { + surelyAbsentMessages.add(content) + } + } + delay(throttlingMillis) + } + val existsPostMessages = post.content.filter { + it !in surelyAbsentMessages + } + if (existsPostMessages.isNotEmpty() && surelyAbsentMessages.isNotEmpty()) { + runCatching { + postsRepo.update( + postId, + NewPost( + content = existsPostMessages + ) + ) + } + } + + if (existsPostMessages.isNotEmpty()) { + return@forEach + } + + send( + chatsConfig.sourceChatId, + "Can't find any messages for post $postId. So, deleting it" + ) + runCatching { + postsRepo.deleteById(postId) + } + } + gcLogger.iS { + "Complete garbage collecting of posts. Result posts count: ${postsRepo.count()}" + } + } + + override suspend fun BehaviourContext.setupBotPlugin(koin: Koin) { + val postsRepo = koin.get() + val chatsConfig = koin.get() + val config = koin.getOrNull() ?: Config() + + val scope = koin.get() + + val recheckActor = scope.actor(0) { + runCatching { + doRecheck( + config.throttlingMillis, + config.doFullCheck, + postsRepo, + chatsConfig + ) + } + } + + config.krontab ?.toKronScheduler() ?.asFlowWithDelays() ?.subscribeSafelyWithoutExceptions(koin.get()) { + recheckActor.trySend(Unit) + } + + onCommand("force_garbage_collection") { message -> + launch { + val prefix = uuid4().toString() + val yesData = "${prefix}yes" + val noData = "${prefix}no" + edit( + message, + text = "Are you sure want to trigger posts garbage collecting?", + replyMarkup = flatInlineKeyboard { + dataButton("Sure", yesData) + dataButton("No", noData) + } + ) + + val answer = waitMessageDataCallbackQuery().filter { + it.message.sameMessage(message) + }.first() + + if (answer.data == yesData) { + if (recheckActor.trySend(Unit).isSuccess) { + edit(message, "Checking of posts without exists messages triggered") + } else { + edit(message) { + +"Checking of posts without exists messages has been triggered " + bold("earlier") + } + } + } else { + delete(message) + } + } + } + } +} diff --git a/posts/src/jvmMain/kotlin/exposed/ExposedPostsRepo.kt b/posts/src/jvmMain/kotlin/exposed/ExposedPostsRepo.kt index 84fc4a9..e0917d0 100644 --- a/posts/src/jvmMain/kotlin/exposed/ExposedPostsRepo.kt +++ b/posts/src/jvmMain/kotlin/exposed/ExposedPostsRepo.kt @@ -26,6 +26,7 @@ class ExposedPostsRepo( ) { val idColumn = text("id") val createdColumn = double("datetime").default(0.0) + val latestUpdateColumn = double("latest_update").default(0.0) private val contentRepo by lazy { ExposedContentInfoRepo( @@ -81,7 +82,9 @@ class ExposedPostsRepo( return id } - override fun update(id: PostId?, value: NewPost, it: UpdateBuilder) {} + override fun update(id: PostId?, value: NewPost, it: UpdateBuilder) { + it[latestUpdateColumn] = DateTime.now().unixMillis + } private fun updateContent(post: RegisteredPost) { transaction(database) { diff --git a/runner/build.gradle b/runner/build.gradle index 6af9ff2..4aeb4fe 100644 --- a/runner/build.gradle +++ b/runner/build.gradle @@ -22,6 +22,7 @@ dependencies { api project(":plaguposter.ratings.source") api project(":plaguposter.ratings.selector") api project(":plaguposter.ratings.gc") + api project(":plaguposter.posts.gc") api project(":plaguposter.inlines") api libs.psql diff --git a/sample/config.json b/sample/config.json index ccc880e..18f7014 100644 --- a/sample/config.json +++ b/sample/config.json @@ -20,7 +20,8 @@ "dev.inmo.plaguposter.common.CommonPlugin", "dev.inmo.plaguposter.triggers.timer.Plugin", "dev.inmo.plaguposter.triggers.timer.disablers.ratings.Plugin", - "dev.inmo.plaguposter.triggers.timer.disablers.autoposts.Plugin" + "dev.inmo.plaguposter.triggers.timer.disablers.autoposts.Plugin", + "dev.inmo.plaguposter.posts.gc.Plugin" ], "posts": { "chats": { @@ -85,5 +86,10 @@ "skipPostAge": 86400 }, "immediateDrop": -6 + }, + "messagesChecker": { + "krontab": "0 0 0 * *", + "throttlingMillis": 1000, + "doFullCheck": false } } diff --git a/settings.gradle b/settings.gradle index 85c648e..fc09a85 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,6 +4,7 @@ String[] includes = [ ":common", ":posts", ":posts:panel", + ":posts:gc", ":posts_registrar", ":ratings", ":ratings:source",