add managers

This commit is contained in:
InsanusMokrassar 2019-10-12 12:30:02 +06:00
parent f0f911e17b
commit bd12aa5610
7 changed files with 229 additions and 68 deletions

View File

@ -12,6 +12,12 @@
* Now `SauceNaoAPI` working with synchronous queue * Now `SauceNaoAPI` working with synchronous queue
* `SauceNaoAPI` now will wait for some time when one of limits will be achieved * `SauceNaoAPI` now will wait for some time when one of limits will be achieved
### 0.4.1 Managers experiments
* Add `TimeManager` - it will manage work with requests times
* Add `RequestQuotaMagager` - it will manage quota for requests and call suspend
if they will be over
## 0.3.0 ## 0.3.0
* Now `results` field of `SauceNaoAnswer` is optional and is empty list by default * Now `results` field of `SauceNaoAnswer` is optional and is empty list by default

View File

@ -1,4 +1,4 @@
project.version = "0.4.0" project.version = "0.4.1"
project.group = "com.github.insanusmokrassar" project.group = "com.github.insanusmokrassar"
buildscript { buildscript {
@ -15,7 +15,6 @@ buildscript {
} }
} }
apply plugin: 'java-library'
apply plugin: 'kotlin' apply plugin: 'kotlin'
apply plugin: 'kotlinx-serialization' apply plugin: 'kotlinx-serialization'

View File

@ -2,15 +2,13 @@ package com.github.insanusmokrassar.SauceNaoAPI
import kotlinx.coroutines.* import kotlinx.coroutines.*
fun main(vararg args: String) { suspend fun main(vararg args: String) {
val (key, requestUrl) = args val (key, requestUrl) = args
runBlocking {
val api = SauceNaoAPI(key, scope = GlobalScope) val api = SauceNaoAPI(key, scope = GlobalScope)
api.use { api.use {
println( println(
it.request(requestUrl) it.request(requestUrl)
) )
} }
}
} }

View File

@ -4,6 +4,8 @@ import com.github.insanusmokrassar.SauceNaoAPI.additional.LONG_TIME_RECALCULATIN
import com.github.insanusmokrassar.SauceNaoAPI.additional.SHORT_TIME_RECALCULATING_MILLIS import com.github.insanusmokrassar.SauceNaoAPI.additional.SHORT_TIME_RECALCULATING_MILLIS
import com.github.insanusmokrassar.SauceNaoAPI.exceptions.sauceNaoAPIException import com.github.insanusmokrassar.SauceNaoAPI.exceptions.sauceNaoAPIException
import com.github.insanusmokrassar.SauceNaoAPI.models.SauceNaoAnswer import com.github.insanusmokrassar.SauceNaoAPI.models.SauceNaoAnswer
import com.github.insanusmokrassar.SauceNaoAPI.utils.*
import com.github.insanusmokrassar.SauceNaoAPI.utils.calculateSleepTime
import io.ktor.client.HttpClient import io.ktor.client.HttpClient
import io.ktor.client.engine.okhttp.OkHttp import io.ktor.client.engine.okhttp.OkHttp
import io.ktor.client.features.ClientRequestException import io.ktor.client.features.ClientRequestException
@ -38,31 +40,23 @@ data class SauceNaoAPI(
private val logger = Logger.getLogger("SauceNaoAPI") private val logger = Logger.getLogger("SauceNaoAPI")
private val requestsChannel = Channel<Pair<Continuation<SauceNaoAnswer>, HttpRequestBuilder>>(Channel.UNLIMITED) private val requestsChannel = Channel<Pair<Continuation<SauceNaoAnswer>, HttpRequestBuilder>>(Channel.UNLIMITED)
private val requestsSendTimes = mutableListOf<DateTime>() private val timeManager = TimeManager(scope)
private val quotaManager = RequestQuotaManager(scope)
init { private val requestsJob = scope.launch {
scope.launch {
for ((callback, requestBuilder) in requestsChannel) { for ((callback, requestBuilder) in requestsChannel) {
try { try {
quotaManager.getQuota()
val answer = makeRequest(requestBuilder) val answer = makeRequest(requestBuilder)
callback.resumeWith(Result.success(answer)) callback.resumeWith(Result.success(answer))
val sleepUntil = if (answer.header.longRemaining < 1) { quotaManager.updateQuota(answer.header, timeManager)
getMostOldestInLongPeriod() ?.plusMillis(LONG_TIME_RECALCULATING_MILLIS)
} else {
if (answer.header.shortRemaining < 1) {
getMostOldestInShortPeriod() ?.plusMillis(SHORT_TIME_RECALCULATING_MILLIS)
} else {
null
}
}
sleepUntil ?.also { _ ->
logger.warning("LONG LIMIT REACHED, SLEEP UNTIL $sleepUntil")
delay(sleepUntil.millis - DateTime.now().millis)
}
} catch (e: Exception) { } catch (e: Exception) {
try {
callback.resumeWith(Result.failure(e)) callback.resumeWith(Result.failure(e))
} catch (e: IllegalStateException) { // may happen when already resumed and api was closed
// do nothing
} }
} }
} }
@ -121,7 +115,7 @@ data class SauceNaoAPI(
val call = client.execute(builder) val call = client.execute(builder)
val answerText = call.response.readText() val answerText = call.response.readText()
logger.info(answerText) logger.info(answerText)
addRequestTimesAndClear() timeManager.addTimeAndClear()
Json.nonstrict.parse( Json.nonstrict.parse(
SauceNaoAnswer.serializer(), SauceNaoAnswer.serializer(),
answerText answerText
@ -131,40 +125,6 @@ data class SauceNaoAPI(
} }
} }
private fun addRequestTimesAndClear() {
val newDateTime = DateTime.now()
clearRequestTimes(newDateTime)
requestsSendTimes.add(newDateTime)
}
private fun clearRequestTimes(relatedTo: DateTime = DateTime.now()) {
val limitValue = relatedTo.minusMillis(LONG_TIME_RECALCULATING_MILLIS)
requestsSendTimes.removeAll {
it < limitValue
}
}
private fun getMostOldestInLongPeriod(): DateTime? {
clearRequestTimes()
return requestsSendTimes.min()
}
private fun getMostOldestInShortPeriod(): DateTime? {
val now = DateTime.now()
val limitTime = now.minusMillis(SHORT_TIME_RECALCULATING_MILLIS)
clearRequestTimes(now)
return requestsSendTimes.asSequence().filter {
limitTime < it
}.min()
}
private suspend fun makeRequest( private suspend fun makeRequest(
url: String, url: String,
db: Int? = null, db: Int? = null,
@ -193,6 +153,8 @@ data class SauceNaoAPI(
override fun close() { override fun close() {
requestsChannel.close() requestsChannel.close()
client.close() client.close()
requestsSendTimes.clear() requestsJob.cancel()
timeManager.close()
quotaManager.close()
} }
} }

View File

@ -0,0 +1,74 @@
package com.github.insanusmokrassar.SauceNaoAPI.utils
import com.github.insanusmokrassar.SauceNaoAPI.additional.LONG_TIME_RECALCULATING_MILLIS
import com.github.insanusmokrassar.SauceNaoAPI.additional.SHORT_TIME_RECALCULATING_MILLIS
import com.github.insanusmokrassar.SauceNaoAPI.models.Header
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.io.core.Closeable
import org.joda.time.DateTime
import kotlin.coroutines.suspendCoroutine
import kotlin.math.min
class RequestQuotaManager (
private val scope: CoroutineScope
) : Closeable {
private var longQuota = 1
private var shortQuota = 1
private var longMaxQuota = 1
private var shortMaxQuota = 1
private val quotaActions = Channel<suspend () -> Unit>(Channel.UNLIMITED)
private val quotaJob = scope.launch {
for (callback in quotaActions) {
callback()
}
}
suspend fun updateQuota(header: Header, timeManager: TimeManager) {
quotaActions.send(
suspend {
longMaxQuota = header.longLimit
shortMaxQuota = header.shortLimit
longQuota = min(header.longLimit, header.longRemaining)
shortQuota = min(header.shortLimit, header.shortRemaining)
when {
shortQuota < 1 -> timeManager.getMostOldestInShortPeriod() ?.millis ?.plus(SHORT_TIME_RECALCULATING_MILLIS) ?: let {
shortQuota = 1
null
}
longQuota < 1 -> timeManager.getMostOldestInLongPeriod() ?.millis ?.plus(LONG_TIME_RECALCULATING_MILLIS) ?: let {
longQuota = 1
null
}
else -> null
} ?.let {
delay(it - DateTime.now().millis)
}
Unit
}
)
}
suspend fun getQuota() {
return suspendCoroutine {
lateinit var callback: suspend () -> Unit
callback = suspend {
if (longQuota > 0 && shortQuota > 0) {
it.resumeWith(Result.success(Unit))
} else {
quotaActions.send(callback)
}
}
quotaActions.offer(callback)
}
}
override fun close() {
quotaJob.cancel()
quotaActions.close()
}
}

View File

@ -0,0 +1,18 @@
package com.github.insanusmokrassar.SauceNaoAPI.utils
import com.github.insanusmokrassar.SauceNaoAPI.additional.LONG_TIME_RECALCULATING_MILLIS
import com.github.insanusmokrassar.SauceNaoAPI.additional.SHORT_TIME_RECALCULATING_MILLIS
import com.github.insanusmokrassar.SauceNaoAPI.models.Header
import org.joda.time.DateTime
internal suspend fun calculateSleepTime(
header: Header,
mostOldestInShortPeriodGetter: suspend () -> DateTime?,
mostOldestInLongPeriodGetter: suspend () -> DateTime?
): DateTime? {
return when {
header.longRemaining < 1 -> mostOldestInLongPeriodGetter() ?.plusMillis(LONG_TIME_RECALCULATING_MILLIS)
header.shortRemaining < 1 -> mostOldestInShortPeriodGetter() ?.plusMillis(SHORT_TIME_RECALCULATING_MILLIS)
else -> null
}
}

View File

@ -0,0 +1,104 @@
package com.github.insanusmokrassar.SauceNaoAPI.utils
import com.github.insanusmokrassar.SauceNaoAPI.additional.LONG_TIME_RECALCULATING_MILLIS
import com.github.insanusmokrassar.SauceNaoAPI.additional.SHORT_TIME_RECALCULATING_MILLIS
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.io.core.Closeable
import org.joda.time.DateTime
import kotlin.coroutines.Continuation
import kotlin.coroutines.suspendCoroutine
private fun MutableList<DateTime>.clearTooOldTimes(relatedTo: DateTime = DateTime.now()) {
val limitValue = relatedTo.minusMillis(LONG_TIME_RECALCULATING_MILLIS)
removeAll {
it < limitValue
}
}
private interface TimeManagerAction {
suspend fun makeChangeWith(times: MutableList<DateTime>)
suspend operator fun invoke(times: MutableList<DateTime>) = makeChangeWith(times)
}
private data class TimeManagerClean(private val relatedTo: DateTime = DateTime.now()) : TimeManagerAction {
override suspend fun makeChangeWith(times: MutableList<DateTime>) {
times.clearTooOldTimes(relatedTo)
}
}
private data class TimeManagerTimeAdder(
private val time: DateTime = DateTime.now()
) : TimeManagerAction {
override suspend fun makeChangeWith(times: MutableList<DateTime>) {
times.add(time)
times.clearTooOldTimes()
}
}
private data class TimeManagerMostOldestInLongGetter(
private val continuation: Continuation<DateTime?>
) : TimeManagerAction {
override suspend fun makeChangeWith(times: MutableList<DateTime>) {
times.clearTooOldTimes()
continuation.resumeWith(Result.success(times.min()))
}
}
private data class TimeManagerMostOldestInShortGetter(
private val continuation: Continuation<DateTime?>
) : TimeManagerAction {
override suspend fun makeChangeWith(times: MutableList<DateTime>) {
times.clearTooOldTimes()
val now = DateTime.now()
val limitTime = now.minusMillis(SHORT_TIME_RECALCULATING_MILLIS)
continuation.resumeWith(
Result.success(
times.asSequence().filter {
limitTime < it
}.min()
)
)
}
}
class TimeManager(
scope: CoroutineScope
) : Closeable {
private val actionsChannel = Channel<TimeManagerAction>(Channel.UNLIMITED)
private val timeUpdateJob = scope.launch {
val times = mutableListOf<DateTime>()
for (action in actionsChannel) {
action(times)
}
}
suspend fun addTimeAndClear() {
actionsChannel.send(TimeManagerTimeAdder())
}
suspend fun getMostOldestInLongPeriod(): DateTime? {
return suspendCoroutine {
actionsChannel.offer(
TimeManagerMostOldestInLongGetter(it)
)
}
}
suspend fun getMostOldestInShortPeriod(): DateTime? {
return suspendCoroutine {
actionsChannel.offer(TimeManagerMostOldestInShortGetter(it))
}
}
override fun close() {
actionsChannel.close()
timeUpdateJob.cancel()
}
}