diff --git a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/ktor/KtorRequestsExecutor.kt b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/ktor/KtorRequestsExecutor.kt index c9a3663fe5..b6b3cd79de 100644 --- a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/ktor/KtorRequestsExecutor.kt +++ b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/ktor/KtorRequestsExecutor.kt @@ -51,7 +51,7 @@ class KtorRequestsExecutor( override suspend fun execute(request: Request): T { return runCatchingSafely { pipelineStepsHolder.onBeforeSearchCallFactory(request, callsFactories) - requestsLimiter.limit { + requestsLimiter.limit(request) { var result: T? = null lateinit var factoryHandledRequest: KtorCallFactory for (potentialFactory in callsFactories) { diff --git a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/ExceptionsOnlyLimiter.kt b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/ExceptionsOnlyLimiter.kt index 7d908afbb1..0ad5e2a14b 100644 --- a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/ExceptionsOnlyLimiter.kt +++ b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/ExceptionsOnlyLimiter.kt @@ -2,6 +2,7 @@ package dev.inmo.tgbotapi.bot.settings.limiters import dev.inmo.micro_utils.coroutines.safely import dev.inmo.tgbotapi.bot.exceptions.TooMuchRequestsException +import dev.inmo.tgbotapi.requests.abstracts.Request import dev.inmo.tgbotapi.types.MilliSeconds import dev.inmo.tgbotapi.types.RetryAfterError import io.ktor.client.plugins.ClientRequestException @@ -9,46 +10,77 @@ import io.ktor.http.HttpStatusCode import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.first +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock /** * This limiter will limit requests only after getting a [RetryAfterError] or [ClientRequestException] with - * [HttpStatusCode.TooManyRequests] status code. Important thing is that in case if some of block has been blocked, all - * the others will wait until it will be possible to be called + * [HttpStatusCode.TooManyRequests] status code. When block throws [TooMuchRequestsException] or [RetryAfterError], + * in the limiter will be created special [Mutex] for the key, defined in [requestKeyFactory], and this mutex will be + * locked for some time based on type of error. See [limit] for more info * * @param defaultTooManyRequestsDelay This parameter will be used in case of getting [ClientRequestException] with * [HttpStatusCode.TooManyRequests] as a parameter for delay like it would be [TooMuchRequestsException]. The reason of * it is that in [ClientRequestException] there is no information about required delay between requests + * @param requestKeyFactory This parameter define how to determine request key in limiter */ class ExceptionsOnlyLimiter( - private val defaultTooManyRequestsDelay: MilliSeconds = 1000L + private val defaultTooManyRequestsDelay: MilliSeconds = 1000L, + private val requestKeyFactory: suspend (Request<*>) -> Any = { it::class } ) : RequestLimiter { - private val lockState = MutableStateFlow(false) - private suspend fun lock(timeMillis: MilliSeconds) { - try { + /** + * Should be used for all [mutexesMap] changes + */ + private val lockMutex = Mutex() + + /** + * Contains [Mutex]es for [Any] keys. If [Mutex] is presented it means that [lock] function has been called and + * that mutex should be locked for some time + */ + private val mutexesMap = mutableMapOf() + private suspend fun lock( + key: Any, + timeMillis: MilliSeconds + ) { + val mutex = Mutex() + mutex.withLock { safely { - lockState.emit(true) + lockMutex.withLock { + mutexesMap[key] = mutex + } delay(timeMillis) + lockMutex.withLock { + mutexesMap.remove(key) + } } - } finally { - lockState.emit(false) } } - override suspend fun limit(block: suspend () -> T): T { + /** + * Just call [block] + */ + override suspend fun limit(block: suspend () -> T): T = block() + + /** + * Will take a key for [request] using [requestKeyFactory] and try to retrieve [Mutex] by that key. In case if [Mutex] + * presented it will wait while [Mutex] is locked. After that operations completed, method will call + * [limit] with [block] inside of [safely] and in case of exception will call internal [lock] method + */ + override suspend fun limit(request: Request, block: suspend () -> T): T { + val key = requestKeyFactory(request) while (true) { - if (lockState.value) { - lockState.first { it == false } - } + // do nothing, just wait for unlock in case when mutex is presented in mutexesMap + lockMutex.withLock { mutexesMap[key] } ?.takeIf { it.isLocked } ?.withLock { } var throwable: Throwable? = null val result = safely({ throwable = when (it) { is TooMuchRequestsException -> { - lock(it.retryAfter.leftToRetry) + lock(key, it.retryAfter.leftToRetry) it } is ClientRequestException -> { if (it.response.status == HttpStatusCode.TooManyRequests) { - lock(defaultTooManyRequestsDelay) + lock(key, defaultTooManyRequestsDelay) } else { throw it } @@ -58,7 +90,7 @@ class ExceptionsOnlyLimiter( } null }) { - block() + limit(block) } if (throwable == null) { return result!! diff --git a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/RequestLimiter.kt b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/RequestLimiter.kt index a23ed0b97c..dd08878577 100644 --- a/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/RequestLimiter.kt +++ b/tgbotapi.core/src/commonMain/kotlin/dev/inmo/tgbotapi/bot/settings/limiters/RequestLimiter.kt @@ -1,8 +1,12 @@ package dev.inmo.tgbotapi.bot.settings.limiters +import dev.inmo.tgbotapi.requests.abstracts.Request + interface RequestLimiter { /** * Use limit for working of block (like delay between or after, for example) */ suspend fun limit(block: suspend () -> T): T + + suspend fun limit(request: Request, block: suspend () -> T) = limit(block) }