1
0
mirror of https://github.com/InsanusMokrassar/TelegramBotAPI.git synced 2024-11-22 16:23:48 +00:00

improvements in ExceptionsOnlyLimiter

This commit is contained in:
InsanusMokrassar 2022-12-26 21:37:48 +06:00
parent 8e64205f53
commit 91307f3ebf
3 changed files with 53 additions and 17 deletions

View File

@ -51,7 +51,7 @@ class KtorRequestsExecutor(
override suspend fun <T : Any> execute(request: Request<T>): T { override suspend fun <T : Any> execute(request: Request<T>): T {
return runCatchingSafely { return runCatchingSafely {
pipelineStepsHolder.onBeforeSearchCallFactory(request, callsFactories) pipelineStepsHolder.onBeforeSearchCallFactory(request, callsFactories)
requestsLimiter.limit { requestsLimiter.limit(request) {
var result: T? = null var result: T? = null
lateinit var factoryHandledRequest: KtorCallFactory lateinit var factoryHandledRequest: KtorCallFactory
for (potentialFactory in callsFactories) { for (potentialFactory in callsFactories) {

View File

@ -2,6 +2,7 @@ package dev.inmo.tgbotapi.bot.settings.limiters
import dev.inmo.micro_utils.coroutines.safely import dev.inmo.micro_utils.coroutines.safely
import dev.inmo.tgbotapi.bot.exceptions.TooMuchRequestsException import dev.inmo.tgbotapi.bot.exceptions.TooMuchRequestsException
import dev.inmo.tgbotapi.requests.abstracts.Request
import dev.inmo.tgbotapi.types.MilliSeconds import dev.inmo.tgbotapi.types.MilliSeconds
import dev.inmo.tgbotapi.types.RetryAfterError import dev.inmo.tgbotapi.types.RetryAfterError
import io.ktor.client.plugins.ClientRequestException import io.ktor.client.plugins.ClientRequestException
@ -9,46 +10,77 @@ import io.ktor.http.HttpStatusCode
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.first
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/** /**
* This limiter will limit requests only after getting a [RetryAfterError] or [ClientRequestException] with * This limiter will limit requests only after getting a [RetryAfterError] or [ClientRequestException] with
* [HttpStatusCode.TooManyRequests] status code. Important thing is that in case if some of block has been blocked, all * [HttpStatusCode.TooManyRequests] status code. When block throws [TooMuchRequestsException] or [RetryAfterError],
* the others will wait until it will be possible to be called * in the limiter will be created special [Mutex] for the key, defined in [requestKeyFactory], and this mutex will be
* locked for some time based on type of error. See [limit] for more info
* *
* @param defaultTooManyRequestsDelay This parameter will be used in case of getting [ClientRequestException] with * @param defaultTooManyRequestsDelay This parameter will be used in case of getting [ClientRequestException] with
* [HttpStatusCode.TooManyRequests] as a parameter for delay like it would be [TooMuchRequestsException]. The reason of * [HttpStatusCode.TooManyRequests] as a parameter for delay like it would be [TooMuchRequestsException]. The reason of
* it is that in [ClientRequestException] there is no information about required delay between requests * it is that in [ClientRequestException] there is no information about required delay between requests
* @param requestKeyFactory This parameter define how to determine request key in limiter
*/ */
class ExceptionsOnlyLimiter( class ExceptionsOnlyLimiter(
private val defaultTooManyRequestsDelay: MilliSeconds = 1000L private val defaultTooManyRequestsDelay: MilliSeconds = 1000L,
private val requestKeyFactory: suspend (Request<*>) -> Any = { it::class }
) : RequestLimiter { ) : RequestLimiter {
private val lockState = MutableStateFlow(false) /**
private suspend fun lock(timeMillis: MilliSeconds) { * Should be used for all [mutexesMap] changes
try { */
private val lockMutex = Mutex()
/**
* Contains [Mutex]es for [Any] keys. If [Mutex] is presented it means that [lock] function has been called and
* that mutex should be locked for some time
*/
private val mutexesMap = mutableMapOf<Any, Mutex>()
private suspend fun lock(
key: Any,
timeMillis: MilliSeconds
) {
val mutex = Mutex()
mutex.withLock {
safely { safely {
lockState.emit(true) lockMutex.withLock {
delay(timeMillis) mutexesMap[key] = mutex
}
delay(timeMillis)
lockMutex.withLock {
mutexesMap.remove(key)
}
} }
} finally {
lockState.emit(false)
} }
} }
override suspend fun <T> limit(block: suspend () -> T): T { /**
* Just call [block]
*/
override suspend fun <T> limit(block: suspend () -> T): T = block()
/**
* Will take a key for [request] using [requestKeyFactory] and try to retrieve [Mutex] by that key. In case if [Mutex]
* presented it will wait while [Mutex] is locked. After that operations completed, method will call
* [limit] with [block] inside of [safely] and in case of exception will call internal [lock] method
*/
override suspend fun <T : Any> limit(request: Request<T>, block: suspend () -> T): T {
val key = requestKeyFactory(request)
while (true) { while (true) {
if (lockState.value) { // do nothing, just wait for unlock in case when mutex is presented in mutexesMap
lockState.first { it == false } lockMutex.withLock { mutexesMap[key] } ?.takeIf { it.isLocked } ?.withLock { }
}
var throwable: Throwable? = null var throwable: Throwable? = null
val result = safely({ val result = safely({
throwable = when (it) { throwable = when (it) {
is TooMuchRequestsException -> { is TooMuchRequestsException -> {
lock(it.retryAfter.leftToRetry) lock(key, it.retryAfter.leftToRetry)
it it
} }
is ClientRequestException -> { is ClientRequestException -> {
if (it.response.status == HttpStatusCode.TooManyRequests) { if (it.response.status == HttpStatusCode.TooManyRequests) {
lock(defaultTooManyRequestsDelay) lock(key, defaultTooManyRequestsDelay)
} else { } else {
throw it throw it
} }
@ -58,7 +90,7 @@ class ExceptionsOnlyLimiter(
} }
null null
}) { }) {
block() limit(block)
} }
if (throwable == null) { if (throwable == null) {
return result!! return result!!

View File

@ -1,8 +1,12 @@
package dev.inmo.tgbotapi.bot.settings.limiters package dev.inmo.tgbotapi.bot.settings.limiters
import dev.inmo.tgbotapi.requests.abstracts.Request
interface RequestLimiter { interface RequestLimiter {
/** /**
* Use limit for working of block (like delay between or after, for example) * Use limit for working of block (like delay between or after, for example)
*/ */
suspend fun <T> limit(block: suspend () -> T): T suspend fun <T> limit(block: suspend () -> T): T
suspend fun <T : Any> limit(request: Request<T>, block: suspend () -> T) = limit(block)
} }