PowLimiter and CommonLimiter rewriting (#210)

This commit is contained in:
InsanusMokrassar 2020-11-17 15:48:58 +06:00
parent f8cccc3e17
commit 0d19952ba7
3 changed files with 58 additions and 81 deletions

View File

@ -7,6 +7,7 @@
* `MicroUtils`: `0.4.0` -> `0.4.1` * `MicroUtils`: `0.4.0` -> `0.4.1`
* `Core`: * `Core`:
* `TelegramAPIUrlsKeeper` will fix ending of host url since this version * `TelegramAPIUrlsKeeper` will fix ending of host url since this version
* New mechanisms in`PowLimiter` and `CommonLimiter` has been added
## 0.30.6 ## 0.30.6

View File

@ -1,67 +1,43 @@
package dev.inmo.tgbotapi.bot.settings.limiters package dev.inmo.tgbotapi.bot.settings.limiters
import com.soywiz.klock.DateTime import com.soywiz.klock.DateTime
import dev.inmo.micro_utils.coroutines.*
import dev.inmo.tgbotapi.types.MilliSeconds
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.sync.Semaphore
import kotlinx.serialization.Serializable
import kotlinx.serialization.Transient
import kotlin.coroutines.Continuation
import kotlin.math.roundToLong
private fun now(): Long = DateTime.nowUnixLong() private fun now(): Long = DateTime.nowUnixLong()
@Serializable
class CommonLimiter( class CommonLimiter(
private val lockCount: Int = 10, private val lockCount: Int = 10,
private val regenTime: Long = 20 * 1000L // 20 seconds for full regen of opportunity to send message private val regenTime: MilliSeconds = 15 * 1000, // 15 seconds for full regen of opportunity to send message
@Transient
private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default)
) : RequestLimiter { ) : RequestLimiter {
private var doLimit: Boolean = false private val quotaSemaphore = Semaphore(lockCount)
private val counterRegeneratorJob = scope.launch {
private val counterChannel = Channel<Unit>(Channel.UNLIMITED) val regenDelay: MilliSeconds = (regenTime.toDouble() / lockCount).roundToLong()
private val scope = CoroutineScope(Dispatchers.Default) while (isActive) {
private val counterJob = scope.launch { delay(regenDelay)
var wasLastSecond = 0 if (quotaSemaphore.availablePermits < lockCount) {
var lastCountTime = now() try {
var limitManagementJob: Job? = null quotaSemaphore.release()
var removeLimitTime: Long = lastCountTime } catch (_: IllegalStateException) {
for (counter in counterChannel) { // Skip IllegalStateException due to the fact that this exception may happens in release method
val now = now()
if (now - lastCountTime > 1000) {
lastCountTime = now
wasLastSecond = 1
} else {
wasLastSecond++
}
if (wasLastSecond >= lockCount) {
removeLimitTime = now + regenTime
if (limitManagementJob == null) {
limitManagementJob = launch {
doLimit = true
var internalNow = now()
while (internalNow < removeLimitTime) {
delay(removeLimitTime - internalNow)
internalNow = now()
}
doLimit = false
}
} }
} }
if (now > removeLimitTime) {
limitManagementJob = null
}
}
}
private val quoterChannel = Channel<Unit>(Channel.CONFLATED)
private val tickerJob = scope.launch {
while (isActive) {
quoterChannel.send(Unit)
delay(1000L)
} }
} }
override suspend fun <T> limit(block: suspend () -> T): T { override suspend fun <T> limit(block: suspend () -> T): T {
counterChannel.send(Unit) quotaSemaphore.acquire()
return if (!doLimit) { return block()
block()
} else {
quoterChannel.receive()
block()
}
} }
} }

View File

@ -1,5 +1,7 @@
package dev.inmo.tgbotapi.bot.settings.limiters package dev.inmo.tgbotapi.bot.settings.limiters
import dev.inmo.micro_utils.coroutines.*
import dev.inmo.tgbotapi.types.MilliSeconds
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
@ -9,62 +11,60 @@ import kotlin.math.pow
private sealed class RequestEvent private sealed class RequestEvent
private class AddRequest( private class AddRequest(
val continuation: Continuation<Long> val continuation: Continuation<MilliSeconds>
) : RequestEvent() ) : RequestEvent()
private object CompleteRequest : RequestEvent() private object CompleteRequest : RequestEvent()
@Serializable @Serializable
data class PowLimiter( data class PowLimiter(
private val minAwaitTime: Long = 0L, private val minAwaitTime: MilliSeconds = 0L,
private val maxAwaitTime: Long = 10000L, private val maxAwaitTime: MilliSeconds = 10000L,
private val powValue: Double = 4.0, private val powValue: Double = 4.0,
private val powK: Double = 0.0016 private val powK: Double = 1.6,
@Transient
private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default)
) : RequestLimiter { ) : RequestLimiter {
@Transient
private val scope = CoroutineScope(Dispatchers.Default)
@Transient
private val eventsChannel = Channel<RequestEvent>(Channel.UNLIMITED)
@Transient @Transient
private val awaitTimeRange = minAwaitTime .. maxAwaitTime private val awaitTimeRange = minAwaitTime .. maxAwaitTime
@Transient
private val eventsChannel = let {
var requestsInWork = 0.0
scope.actor<RequestEvent> {
when (it) {
is AddRequest -> {
val awaitTime = (requestsInWork.pow(powValue) * powK).toLong()
requestsInWork++
init { it.continuation.resume(
scope.launch { when {
var requestsInWork: Double = 0.0 awaitTime in awaitTimeRange -> awaitTime
for (event in eventsChannel) { awaitTime < awaitTimeRange.first -> awaitTimeRange.first
when (event) { else -> awaitTimeRange.last
is AddRequest -> { }
val awaitTime = (((requestsInWork.pow(powValue) * powK) * 1000L).toLong()) )
requestsInWork++
event.continuation.resume(
if (awaitTime in awaitTimeRange) {
awaitTime
} else {
if (awaitTime < minAwaitTime) {
minAwaitTime
} else {
maxAwaitTime
}
}
)
}
is CompleteRequest -> requestsInWork--
} }
is CompleteRequest -> requestsInWork--
} }
} }
} }
override suspend fun <T> limit( private suspend inline fun <T> withDelay(
block: suspend () -> T crossinline block: suspend () -> T
): T { ): T {
val delayMillis = suspendCoroutine<Long> { val delayMillis = suspendCoroutine<Long> {
scope.launch { eventsChannel.send(AddRequest(it)) } scope.launch { eventsChannel.send(AddRequest(it)) }
} }
delay(delayMillis) delay(delayMillis)
return try { return try {
block() safely { block() }
} finally { } finally {
eventsChannel.send(CompleteRequest) eventsChannel.send(CompleteRequest)
} }
} }
override suspend fun <T> limit(
block: suspend () -> T
): T {
return withDelay(block)
}
} }