Merge pull request #19 from InsanusMokrassar/0.5.0

0.5.0
This commit is contained in:
InsanusMokrassar 2023-11-06 21:31:25 +06:00 committed by GitHub
commit 2f0a823f7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 307 additions and 44 deletions

View File

@ -8,10 +8,10 @@ jobs:
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- name: Set up JDK 11 - name: Set up JDK 17
uses: actions/setup-java@v1 uses: actions/setup-java@v1
with: with:
java-version: 11 java-version: 17
- name: Rewrite version - name: Rewrite version
run: | run: |
branch="`echo "${{ github.ref }}" | grep -o "[^/]*$"`" branch="`echo "${{ github.ref }}" | grep -o "[^/]*$"`"

View File

@ -11,6 +11,9 @@ jobs:
steps: steps:
- name: Checkout repository - name: Checkout repository
uses: actions/checkout@v2 uses: actions/checkout@v2
- uses: actions/setup-java@v1
with:
java-version: 17
- name: Rewrite version - name: Rewrite version
run: | run: |
branch="`echo "${{ github.ref }}" | grep -o "[^/]*$"`" branch="`echo "${{ github.ref }}" | grep -o "[^/]*$"`"

View File

@ -1,5 +1,10 @@
# PlaguPoster # PlaguPoster
## 0.5.0
* Dependencies update
* Since this update bots will require **`JDK` 17+**
## 0.3.0 ## 0.3.0
* `Versions`: * `Versions`:

View File

@ -10,4 +10,4 @@ android.enableJetifier=true
# Project data # Project data
group=dev.inmo group=dev.inmo
version=0.4.0 version=0.5.0

View File

@ -1,16 +1,16 @@
[versions] [versions]
kotlin = "1.8.22" kotlin = "1.9.20"
kotlin-serialization = "1.5.1" kotlin-serialization = "1.6.0"
plagubot = "7.2.1" plagubot = "7.3.0"
tgbotapi = "9.2.1" tgbotapi = "9.3.0"
microutils = "0.19.9" microutils = "0.20.12"
kslog = "1.1.2" kslog = "1.2.4"
krontab = "2.1.2" krontab = "2.2.3"
plagubot-plugins = "0.15.1" plagubot-plugins = "0.16.0"
dokka = "1.8.20" dokka = "1.9.10"
psql = "42.6.0" psql = "42.6.0"

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.2-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists

View File

@ -7,7 +7,7 @@ kotlin {
jvm { jvm {
compilations.main { compilations.main {
kotlinOptions { kotlinOptions {
jvmTarget = "1.8" jvmTarget = "17"
} }
} }
} }
@ -34,6 +34,6 @@ kotlin {
} }
java { java {
sourceCompatibility = JavaVersion.VERSION_1_8 sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_17
} }

24
posts/gc/build.gradle Normal file
View File

@ -0,0 +1,24 @@
plugins {
id "org.jetbrains.kotlin.multiplatform"
id "org.jetbrains.kotlin.plugin.serialization"
}
apply from: "$mppProjectWithSerializationPresetPath"
kotlin {
sourceSets {
commonMain {
dependencies {
api project(":plaguposter.common")
api project(":plaguposter.posts")
api libs.microutils.koin
api libs.krontab
}
}
jvmMain {
dependencies {
api libs.plagubot.plugins.inline.queries
}
}
}
}

View File

@ -0,0 +1 @@
package dev.inmo.plaguposter.posts.gc

View File

@ -0,0 +1,168 @@
package dev.inmo.plaguposter.posts.gc
import com.benasher44.uuid.uuid4
import dev.inmo.krontab.KrontabTemplate
import dev.inmo.krontab.toKronScheduler
import dev.inmo.krontab.utils.asFlowWithDelays
import dev.inmo.kslog.common.KSLog
import dev.inmo.kslog.common.i
import dev.inmo.kslog.common.iS
import dev.inmo.micro_utils.coroutines.actor
import dev.inmo.micro_utils.coroutines.subscribeSafelyWithoutExceptions
import dev.inmo.micro_utils.repos.deleteById
import dev.inmo.plagubot.Plugin
import dev.inmo.plaguposter.common.ChatConfig
import dev.inmo.plaguposter.posts.models.NewPost
import dev.inmo.plaguposter.posts.models.PostContentInfo
import dev.inmo.plaguposter.posts.repo.PostsRepo
import dev.inmo.tgbotapi.extensions.api.delete
import dev.inmo.tgbotapi.extensions.api.edit.edit
import dev.inmo.tgbotapi.extensions.api.forwardMessage
import dev.inmo.tgbotapi.extensions.api.send.send
import dev.inmo.tgbotapi.extensions.behaviour_builder.BehaviourContext
import dev.inmo.tgbotapi.extensions.behaviour_builder.expectations.waitMessageDataCallbackQuery
import dev.inmo.tgbotapi.extensions.behaviour_builder.triggers_handling.onCommand
import dev.inmo.tgbotapi.extensions.utils.extensions.sameMessage
import dev.inmo.tgbotapi.extensions.utils.types.buttons.dataButton
import dev.inmo.tgbotapi.extensions.utils.types.buttons.flatInlineKeyboard
import dev.inmo.tgbotapi.types.MilliSeconds
import dev.inmo.tgbotapi.utils.bold
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.*
import org.jetbrains.exposed.sql.Database
import org.koin.core.Koin
import org.koin.core.module.Module
object Plugin : Plugin {
@Serializable
internal data class Config (
val krontab: KrontabTemplate? = null,
val throttlingMillis: MilliSeconds = 1000,
val doFullCheck: Boolean = false
)
override fun Module.setupDI(database: Database, params: JsonObject) {
params["messagesChecker"] ?.let { element ->
single { get<Json>().decodeFromJsonElement(Config.serializer(), element) }
}
}
private val gcLogger = KSLog("GarbageCollector")
private suspend fun BehaviourContext.doRecheck(
throttlingMillis: MilliSeconds,
doFullCheck: Boolean,
postsRepo: PostsRepo,
chatsConfig: ChatConfig
) {
val posts = postsRepo.getAll()
gcLogger.i {
"Start garbage collecting of posts. Initial posts count: ${posts.size}"
}
posts.forEach { (postId, post) ->
val surelyAbsentMessages = mutableListOf<PostContentInfo>()
for (content in post.content) {
try {
forwardMessage(
toChatId = chatsConfig.cacheChatId,
fromChatId = content.chatId,
messageId = content.messageId
)
if (!doFullCheck) {
break
}
} catch (e: Throwable) {
if (e.message ?.contains("message to forward not found") == true) {
surelyAbsentMessages.add(content)
}
}
delay(throttlingMillis)
}
val existsPostMessages = post.content.filter {
it !in surelyAbsentMessages
}
if (existsPostMessages.isNotEmpty() && surelyAbsentMessages.isNotEmpty()) {
runCatching {
postsRepo.update(
postId,
NewPost(
content = existsPostMessages
)
)
}
}
if (existsPostMessages.isNotEmpty()) {
return@forEach
}
send(
chatsConfig.sourceChatId,
"Can't find any messages for post $postId. So, deleting it"
)
runCatching {
postsRepo.deleteById(postId)
}
}
gcLogger.iS {
"Complete garbage collecting of posts. Result posts count: ${postsRepo.count()}"
}
}
override suspend fun BehaviourContext.setupBotPlugin(koin: Koin) {
val postsRepo = koin.get<PostsRepo>()
val chatsConfig = koin.get<ChatConfig>()
val config = koin.getOrNull<Config>() ?: Config()
val scope = koin.get<CoroutineScope>()
val recheckActor = scope.actor<Unit>(0) {
runCatching {
doRecheck(
config.throttlingMillis,
config.doFullCheck,
postsRepo,
chatsConfig
)
}
}
config.krontab ?.toKronScheduler() ?.asFlowWithDelays() ?.subscribeSafelyWithoutExceptions(koin.get()) {
recheckActor.trySend(Unit)
}
onCommand("force_garbage_collection") { message ->
launch {
val prefix = uuid4().toString()
val yesData = "${prefix}yes"
val noData = "${prefix}no"
edit(
message,
text = "Are you sure want to trigger posts garbage collecting?",
replyMarkup = flatInlineKeyboard {
dataButton("Sure", yesData)
dataButton("No", noData)
}
)
val answer = waitMessageDataCallbackQuery().filter {
it.message.sameMessage(message)
}.first()
if (answer.data == yesData) {
if (recheckActor.trySend(Unit).isSuccess) {
edit(message, "Checking of posts without exists messages triggered")
} else {
edit(message) {
+"Checking of posts without exists messages has been triggered " + bold("earlier")
}
}
} else {
delete(message)
}
}
}
}
}

View File

@ -4,14 +4,11 @@ import com.benasher44.uuid.uuid4
import dev.inmo.micro_utils.coroutines.runCatchingSafely import dev.inmo.micro_utils.coroutines.runCatchingSafely
import dev.inmo.micro_utils.coroutines.subscribeSafelyWithoutExceptions import dev.inmo.micro_utils.coroutines.subscribeSafelyWithoutExceptions
import dev.inmo.micro_utils.koin.getAllDistinct import dev.inmo.micro_utils.koin.getAllDistinct
import dev.inmo.micro_utils.repos.*
import dev.inmo.micro_utils.repos.cache.cache.FullKVCache import dev.inmo.micro_utils.repos.cache.cache.FullKVCache
import dev.inmo.micro_utils.repos.cache.cached import dev.inmo.micro_utils.repos.cache.cached
import dev.inmo.micro_utils.repos.cache.full.cached import dev.inmo.micro_utils.repos.cache.full.cached
import dev.inmo.micro_utils.repos.deleteById import dev.inmo.micro_utils.repos.cache.full.fullyCached
import dev.inmo.micro_utils.repos.id
import dev.inmo.micro_utils.repos.set
import dev.inmo.micro_utils.repos.unset
import dev.inmo.micro_utils.repos.value
import dev.inmo.plagubot.Plugin import dev.inmo.plagubot.Plugin
import dev.inmo.plaguposter.common.ChatConfig import dev.inmo.plaguposter.common.ChatConfig
import dev.inmo.plaguposter.common.UnsuccessfulSymbol import dev.inmo.plaguposter.common.UnsuccessfulSymbol
@ -99,7 +96,7 @@ object Plugin : Plugin {
val api = koin.get<PanelButtonsAPI>() val api = koin.get<PanelButtonsAPI>()
val basePostsMessages = PostsMessages(koin.get(), koin.get()) val basePostsMessages = PostsMessages(koin.get(), koin.get())
val postsMessages = if (koin.useCache) { val postsMessages = if (koin.useCache) {
basePostsMessages.cached(FullKVCache(), koin.get()) basePostsMessages.fullyCached(MapKeyValueRepo(), koin.get())
} else { } else {
basePostsMessages basePostsMessages
} }

View File

@ -11,7 +11,6 @@ import dev.inmo.tgbotapi.extensions.api.send.copyMessage
import dev.inmo.tgbotapi.extensions.api.send.send import dev.inmo.tgbotapi.extensions.api.send.send
import dev.inmo.tgbotapi.extensions.utils.* import dev.inmo.tgbotapi.extensions.utils.*
import dev.inmo.tgbotapi.types.* import dev.inmo.tgbotapi.types.*
import dev.inmo.tgbotapi.types.message.content.MediaGroupContent
import dev.inmo.tgbotapi.types.message.content.MediaGroupPartContent import dev.inmo.tgbotapi.types.message.content.MediaGroupPartContent
class PostPublisher( class PostPublisher(
@ -21,10 +20,10 @@ class PostPublisher(
private val targetChatIds: List<IdChatIdentifier>, private val targetChatIds: List<IdChatIdentifier>,
private val deleteAfterPosting: Boolean = true private val deleteAfterPosting: Boolean = true
) { ) {
suspend fun publish(postId: PostId) { suspend fun publish(postId: PostId): Boolean {
val messagesInfo = postsRepo.getById(postId) ?: let { val messagesInfo = postsRepo.getById(postId) ?: let {
logger.w { "Unable to get post with id $postId for publishing" } logger.w { "Unable to get post with id $postId for publishing" }
return return false
} }
val sortedMessagesContents = messagesInfo.content.groupBy { it.group }.flatMap { (group, list) -> val sortedMessagesContents = messagesInfo.content.groupBy { it.group }.flatMap { (group, list) ->
if (group == null) { if (group == null) {
@ -35,6 +34,7 @@ class PostPublisher(
listOf(list.first().order to list) listOf(list.first().order to list)
} }
}.sortedBy { it.first } }.sortedBy { it.first }
var haveSentMessages = false
sortedMessagesContents.forEach { (_, contents) -> sortedMessagesContents.forEach { (_, contents) ->
contents.singleOrNull() ?.also { contents.singleOrNull() ?.also {
@ -44,13 +44,16 @@ class PostPublisher(
}.onFailure { _ -> }.onFailure { _ ->
runCatching { runCatching {
bot.forwardMessage( bot.forwardMessage(
it.chatId, fromChatId = it.chatId,
targetChatId, toChatId = cachingChatId,
it.messageId messageId = it.messageId
) )
}.onSuccess { }.onSuccess {
bot.copyMessage(targetChatId, it) bot.copyMessage(targetChatId, it)
haveSentMessages = true
} }
}.onSuccess {
haveSentMessages = true
} }
} }
return@forEach return@forEach
@ -61,12 +64,14 @@ class PostPublisher(
forwardedMessage.withContentOrNull<MediaGroupPartContent>() ?: null.also { _ -> forwardedMessage.withContentOrNull<MediaGroupPartContent>() ?: null.also { _ ->
targetChatIds.forEach { targetChatId -> targetChatIds.forEach { targetChatId ->
bot.copyMessage(targetChatId, forwardedMessage) bot.copyMessage(targetChatId, forwardedMessage)
haveSentMessages = true
} }
} }
} }
resultContents.singleOrNull() ?.also { resultContents.singleOrNull() ?.also {
targetChatIds.forEach { targetChatId -> targetChatIds.forEach { targetChatId ->
bot.copyMessage(targetChatId, it) bot.copyMessage(targetChatId, it)
haveSentMessages = true
} }
return@forEach return@forEach
} ?: resultContents.chunked(mediaCountInMediaGroup.last).forEach { } ?: resultContents.chunked(mediaCountInMediaGroup.last).forEach {
@ -75,6 +80,7 @@ class PostPublisher(
targetChatId, targetChatId,
it.map { it.content.toMediaGroupMemberTelegramMedia() } it.map { it.content.toMediaGroupMemberTelegramMedia() }
) )
haveSentMessages = true
} }
} }
} }
@ -83,5 +89,6 @@ class PostPublisher(
postsRepo.deleteById(postId) postsRepo.deleteById(postId)
} }
return haveSentMessages
} }
} }

View File

@ -1,10 +1,13 @@
package dev.inmo.plaguposter.posts.cached package dev.inmo.plaguposter.posts.cached
import dev.inmo.micro_utils.coroutines.SmartRWLocker
import korlibs.time.DateTime import korlibs.time.DateTime
import dev.inmo.micro_utils.pagination.FirstPagePagination import dev.inmo.micro_utils.pagination.FirstPagePagination
import dev.inmo.micro_utils.pagination.firstPageWithOneElementPagination import dev.inmo.micro_utils.pagination.firstPageWithOneElementPagination
import dev.inmo.micro_utils.pagination.utils.doForAllWithNextPaging import dev.inmo.micro_utils.pagination.utils.doForAllWithNextPaging
import dev.inmo.micro_utils.repos.CRUDRepo import dev.inmo.micro_utils.repos.CRUDRepo
import dev.inmo.micro_utils.repos.KeyValueRepo
import dev.inmo.micro_utils.repos.MapKeyValueRepo
import dev.inmo.micro_utils.repos.cache.cache.FullKVCache import dev.inmo.micro_utils.repos.cache.cache.FullKVCache
import dev.inmo.micro_utils.repos.cache.full.FullCRUDCacheRepo import dev.inmo.micro_utils.repos.cache.full.FullCRUDCacheRepo
import dev.inmo.plaguposter.posts.models.NewPost import dev.inmo.plaguposter.posts.models.NewPost
@ -20,12 +23,13 @@ import kotlinx.coroutines.flow.Flow
class CachedPostsRepo( class CachedPostsRepo(
private val parentRepo: PostsRepo, private val parentRepo: PostsRepo,
private val scope: CoroutineScope, private val scope: CoroutineScope,
private val kvCache: FullKVCache<PostId, RegisteredPost> = FullKVCache() private val kvCache: KeyValueRepo<PostId, RegisteredPost> = MapKeyValueRepo()
) : PostsRepo, CRUDRepo<RegisteredPost, PostId, NewPost> by FullCRUDCacheRepo( ) : PostsRepo, CRUDRepo<RegisteredPost, PostId, NewPost> by FullCRUDCacheRepo(
parentRepo, parentRepo,
kvCache, kvCache,
scope, scope,
skipStartInvalidate = false, skipStartInvalidate = false,
locker = SmartRWLocker(),
{ it.id } { it.id }
) { ) {
override val removedPostsFlow: Flow<RegisteredPost> by parentRepo::removedPostsFlow override val removedPostsFlow: Flow<RegisteredPost> by parentRepo::removedPostsFlow

View File

@ -26,6 +26,7 @@ class ExposedPostsRepo(
) { ) {
val idColumn = text("id") val idColumn = text("id")
val createdColumn = double("datetime").default(0.0) val createdColumn = double("datetime").default(0.0)
val latestUpdateColumn = double("latest_update").default(0.0)
private val contentRepo by lazy { private val contentRepo by lazy {
ExposedContentInfoRepo( ExposedContentInfoRepo(
@ -81,7 +82,9 @@ class ExposedPostsRepo(
return id return id
} }
override fun update(id: PostId?, value: NewPost, it: UpdateBuilder<Int>) {} override fun update(id: PostId?, value: NewPost, it: UpdateBuilder<Int>) {
it[latestUpdateColumn] = DateTime.now().unixMillis
}
private fun updateContent(post: RegisteredPost) { private fun updateContent(post: RegisteredPost) {
transaction(database) { transaction(database) {

View File

@ -1,7 +1,7 @@
apply plugin: 'maven-publish' apply plugin: 'maven-publish'
task javadocsJar(type: Jar) { task javadocsJar(type: Jar) {
classifier = 'javadoc' archiveClassifier = 'javadoc'
} }
publishing { publishing {
@ -79,4 +79,27 @@ if (project.hasProperty("signing.gnupg.keyName")) {
dependsOn(it) dependsOn(it)
} }
} }
// Workaround to make android sign operations depend on signing tasks
project.getTasks().withType(AbstractPublishToMaven.class).configureEach {
def signingTasks = project.getTasks().withType(Sign.class)
mustRunAfter(signingTasks)
}
// Workaround to make test tasks use sign
project.getTasks().withType(Sign.class).configureEach { signTask ->
def withoutSign = (signTask.name.startsWith("sign") ? signTask.name.minus("sign") : signTask.name)
def pubName = withoutSign.endsWith("Publication") ? withoutSign.substring(0, withoutSign.length() - "Publication".length()) : withoutSign
// These tasks only exist for native targets, hence findByName() to avoid trying to find them for other targets
// Task ':linkDebugTest<platform>' uses this output of task ':sign<platform>Publication' without declaring an explicit or implicit dependency
def debugTestTask = tasks.findByName("linkDebugTest$pubName")
if (debugTestTask != null) {
signTask.mustRunAfter(debugTestTask)
}
// Task ':compileTestKotlin<platform>' uses this output of task ':sign<platform>Publication' without declaring an explicit or implicit dependency
def testTask = tasks.findByName("compileTestKotlin$pubName")
if (testTask != null) {
signTask.mustRunAfter(testTask)
}
}
} }

View File

@ -5,4 +5,9 @@ import dev.inmo.plaguposter.posts.models.PostId
interface Selector { interface Selector {
suspend fun take(n: Int = 1, now: DateTime = DateTime.now(), exclude: List<PostId> = emptyList()): List<PostId> suspend fun take(n: Int = 1, now: DateTime = DateTime.now(), exclude: List<PostId> = emptyList()): List<PostId>
suspend fun takeOneOrNull(now: DateTime = DateTime.now(), exclude: List<PostId> = emptyList()): PostId? = take(
n = 1,
now = now,
exclude = exclude
).firstOrNull()
} }

View File

@ -22,6 +22,7 @@ dependencies {
api project(":plaguposter.ratings.source") api project(":plaguposter.ratings.source")
api project(":plaguposter.ratings.selector") api project(":plaguposter.ratings.selector")
api project(":plaguposter.ratings.gc") api project(":plaguposter.ratings.gc")
api project(":plaguposter.posts.gc")
api project(":plaguposter.inlines") api project(":plaguposter.inlines")
api libs.psql api libs.psql
@ -32,6 +33,6 @@ application {
} }
java { java {
sourceCompatibility = JavaVersion.VERSION_1_8 sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_17
} }

View File

@ -20,7 +20,8 @@
"dev.inmo.plaguposter.common.CommonPlugin", "dev.inmo.plaguposter.common.CommonPlugin",
"dev.inmo.plaguposter.triggers.timer.Plugin", "dev.inmo.plaguposter.triggers.timer.Plugin",
"dev.inmo.plaguposter.triggers.timer.disablers.ratings.Plugin", "dev.inmo.plaguposter.triggers.timer.disablers.ratings.Plugin",
"dev.inmo.plaguposter.triggers.timer.disablers.autoposts.Plugin" "dev.inmo.plaguposter.triggers.timer.disablers.autoposts.Plugin",
"dev.inmo.plaguposter.posts.gc.Plugin"
], ],
"posts": { "posts": {
"chats": { "chats": {
@ -65,7 +66,9 @@
] ]
}, },
"timer_trigger": { "timer_trigger": {
"krontab": "0 30 2/4 * *" "krontab": "0 30 2/4 * *",
"retryOnPostFailureTimes": 0,
"_note": "retryOnPostFailureTimes will retry to publish one or several posts if posting has been failed"
}, },
"panel": { "panel": {
"textPrefix": "Post management:", "textPrefix": "Post management:",
@ -83,5 +86,10 @@
"skipPostAge": 86400 "skipPostAge": 86400
}, },
"immediateDrop": -6 "immediateDrop": -6
},
"messagesChecker": {
"krontab": "0 0 0 * *",
"throttlingMillis": 1000,
"doFullCheck": false
} }
} }

View File

@ -4,6 +4,7 @@ String[] includes = [
":common", ":common",
":posts", ":posts",
":posts:panel", ":posts:panel",
":posts:gc",
":posts_registrar", ":posts_registrar",
":ratings", ":ratings",
":ratings:source", ":ratings:source",
@ -29,5 +30,3 @@ includes.each { originalName ->
project.name = projectName project.name = projectName
project.projectDir = new File(projectDirectory) project.projectDir = new File(projectDirectory)
} }
enableFeaturePreview("VERSION_CATALOGS")

View File

@ -77,7 +77,7 @@ object Plugin : Plugin {
return@onCommand return@onCommand
} }
} ?: selector ?.take(1) ?.firstOrNull() } ?: selector ?.takeOneOrNull()
if (postId == null) { if (postId == null) {
reply( reply(
it, it,

View File

@ -51,7 +51,8 @@ object Plugin : Plugin {
internal data class Config( internal data class Config(
@SerialName("krontab") @SerialName("krontab")
val krontabTemplate: KrontabTemplate, val krontabTemplate: KrontabTemplate,
val dateTimeFormat: String = "HH:mm:ss, dd.MM.yyyy" val dateTimeFormat: String = "HH:mm:ss, dd.MM.yyyy",
val retryOnPostFailureTimes: Int = 0
) { ) {
@Transient @Transient
val krontab by lazy { val krontab by lazy {
@ -88,13 +89,27 @@ object Plugin : Plugin {
} }
val krontab = koin.get<Config>().krontab val krontab = koin.get<Config>().krontab
val retryOnPostFailureTimes = koin.get<Config>().retryOnPostFailureTimes
val dateTimeFormat = koin.get<Config>().format val dateTimeFormat = koin.get<Config>().format
krontab.asFlowWithDelays().subscribeSafelyWithoutExceptions(this) { dateTime -> krontab.asFlowWithDelays().subscribeSafelyWithoutExceptions(this) { dateTime ->
selector.take(now = dateTime).forEach { postId -> var leftRetries = retryOnPostFailureTimes
if (filters.all { it.check(postId, dateTime) }) { do {
publisher.publish(postId) val success = runCatching {
selector.takeOneOrNull(now = dateTime) ?.let { postId ->
if (filters.all { it.check(postId, dateTime) }) {
publisher.publish(postId)
} else {
false
}
} ?: false
}.getOrElse {
false
} }
} if (success) {
break;
}
leftRetries--;
} while (leftRetries > 0)
} }
suspend fun buildPage(pagination: Pagination = FirstPagePagination(size = pageCallbackDataQuerySize)): InlineKeyboardMarkup { suspend fun buildPage(pagination: Pagination = FirstPagePagination(size = pageCallbackDataQuerySize)): InlineKeyboardMarkup {
@ -113,7 +128,7 @@ object Plugin : Plugin {
val selected = mutableListOf<PostId>() val selected = mutableListOf<PostId>()
krontab.asFlowWithoutDelays().take(pagination.lastIndexExclusive).collectIndexed { i, dateTime -> krontab.asFlowWithoutDelays().take(pagination.lastIndexExclusive).collectIndexed { i, dateTime ->
val postId = selector.take(now = dateTime, exclude = selected).firstOrNull() ?.also { postId -> val postId = selector.takeOneOrNull(now = dateTime, exclude = selected) ?.also { postId ->
if (filters.all { it.check(postId, dateTime) }) { if (filters.all { it.check(postId, dateTime) }) {
selected.add(postId) selected.add(postId)
} else { } else {