From 0d19952ba78bcd9e1893f745809625b8e087a8e7 Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Tue, 17 Nov 2020 15:48:58 +0600 Subject: [PATCH] PowLimiter and CommonLimiter rewriting (#210) --- CHANGELOG.md | 1 + .../bot/settings/limiters/CommonLimiter.kt | 72 +++++++------------ .../bot/settings/limiters/PowLimiter.kt | 66 ++++++++--------- 3 files changed, 58 insertions(+), 81 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b5223e811..f82db78a7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * `MicroUtils`: `0.4.0` -> `0.4.1` * `Core`: * `TelegramAPIUrlsKeeper` will fix ending of host url since this version + * New mechanisms in`PowLimiter` and `CommonLimiter` has been added ## 0.30.6 diff --git a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/CommonLimiter.kt b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/CommonLimiter.kt index 4910440856..d87b007544 100644 --- a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/CommonLimiter.kt +++ b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/CommonLimiter.kt @@ -1,67 +1,43 @@ package dev.inmo.tgbotapi.bot.settings.limiters import com.soywiz.klock.DateTime +import dev.inmo.micro_utils.coroutines.* +import dev.inmo.tgbotapi.types.MilliSeconds import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.sync.Semaphore +import kotlinx.serialization.Serializable +import kotlinx.serialization.Transient +import kotlin.coroutines.Continuation +import kotlin.math.roundToLong private fun now(): Long = DateTime.nowUnixLong() +@Serializable class CommonLimiter( private val lockCount: Int = 10, - private val regenTime: Long = 20 * 1000L // 20 seconds for full regen of opportunity to send message + private val regenTime: MilliSeconds = 15 * 1000, // 15 seconds for full regen of opportunity to send message + @Transient + private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default) ) : RequestLimiter { - private var doLimit: Boolean = false - - private val counterChannel = Channel(Channel.UNLIMITED) - private val scope = CoroutineScope(Dispatchers.Default) - private val counterJob = scope.launch { - var wasLastSecond = 0 - var lastCountTime = now() - var limitManagementJob: Job? = null - var removeLimitTime: Long = lastCountTime - for (counter in counterChannel) { - val now = now() - if (now - lastCountTime > 1000) { - lastCountTime = now - wasLastSecond = 1 - } else { - wasLastSecond++ - } - if (wasLastSecond >= lockCount) { - removeLimitTime = now + regenTime - if (limitManagementJob == null) { - limitManagementJob = launch { - doLimit = true - var internalNow = now() - while (internalNow < removeLimitTime) { - delay(removeLimitTime - internalNow) - internalNow = now() - } - doLimit = false - } + private val quotaSemaphore = Semaphore(lockCount) + private val counterRegeneratorJob = scope.launch { + val regenDelay: MilliSeconds = (regenTime.toDouble() / lockCount).roundToLong() + while (isActive) { + delay(regenDelay) + if (quotaSemaphore.availablePermits < lockCount) { + try { + quotaSemaphore.release() + } catch (_: IllegalStateException) { + // Skip IllegalStateException due to the fact that this exception may happens in release method } } - if (now > removeLimitTime) { - limitManagementJob = null - } - } - } - - private val quoterChannel = Channel(Channel.CONFLATED) - private val tickerJob = scope.launch { - while (isActive) { - quoterChannel.send(Unit) - delay(1000L) } } override suspend fun limit(block: suspend () -> T): T { - counterChannel.send(Unit) - return if (!doLimit) { - block() - } else { - quoterChannel.receive() - block() - } + quotaSemaphore.acquire() + return block() } } diff --git a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/PowLimiter.kt b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/PowLimiter.kt index 910cc03988..2eb0a87fa9 100644 --- a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/PowLimiter.kt +++ b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/PowLimiter.kt @@ -1,5 +1,7 @@ package dev.inmo.tgbotapi.bot.settings.limiters +import dev.inmo.micro_utils.coroutines.* +import dev.inmo.tgbotapi.types.MilliSeconds import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.serialization.Serializable @@ -9,62 +11,60 @@ import kotlin.math.pow private sealed class RequestEvent private class AddRequest( - val continuation: Continuation + val continuation: Continuation ) : RequestEvent() private object CompleteRequest : RequestEvent() @Serializable data class PowLimiter( - private val minAwaitTime: Long = 0L, - private val maxAwaitTime: Long = 10000L, + private val minAwaitTime: MilliSeconds = 0L, + private val maxAwaitTime: MilliSeconds = 10000L, private val powValue: Double = 4.0, - private val powK: Double = 0.0016 + private val powK: Double = 1.6, + @Transient + private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default) ) : RequestLimiter { - @Transient - private val scope = CoroutineScope(Dispatchers.Default) - @Transient - private val eventsChannel = Channel(Channel.UNLIMITED) @Transient private val awaitTimeRange = minAwaitTime .. maxAwaitTime + @Transient + private val eventsChannel = let { + var requestsInWork = 0.0 + scope.actor { + when (it) { + is AddRequest -> { + val awaitTime = (requestsInWork.pow(powValue) * powK).toLong() + requestsInWork++ - init { - scope.launch { - var requestsInWork: Double = 0.0 - for (event in eventsChannel) { - when (event) { - is AddRequest -> { - val awaitTime = (((requestsInWork.pow(powValue) * powK) * 1000L).toLong()) - requestsInWork++ - - event.continuation.resume( - if (awaitTime in awaitTimeRange) { - awaitTime - } else { - if (awaitTime < minAwaitTime) { - minAwaitTime - } else { - maxAwaitTime - } - } - ) - } - is CompleteRequest -> requestsInWork-- + it.continuation.resume( + when { + awaitTime in awaitTimeRange -> awaitTime + awaitTime < awaitTimeRange.first -> awaitTimeRange.first + else -> awaitTimeRange.last + } + ) } + is CompleteRequest -> requestsInWork-- } } } - override suspend fun limit( - block: suspend () -> T + private suspend inline fun withDelay( + crossinline block: suspend () -> T ): T { val delayMillis = suspendCoroutine { scope.launch { eventsChannel.send(AddRequest(it)) } } delay(delayMillis) return try { - block() + safely { block() } } finally { eventsChannel.send(CompleteRequest) } } + + override suspend fun limit( + block: suspend () -> T + ): T { + return withDelay(block) + } }