fixes and improvements

This commit is contained in:
InsanusMokrassar 2020-03-17 21:24:04 +06:00
parent d61aa8b50e
commit 71c0b688e8
4 changed files with 93 additions and 59 deletions

View File

@ -26,6 +26,8 @@
* `MessageUpdate` now is implementing `BaseSentMessageUpdate` interface * `MessageUpdate` now is implementing `BaseSentMessageUpdate` interface
* `UpdatesPoller` and all its usages, childs and childs usages now are deprecated * `UpdatesPoller` and all its usages, childs and childs usages now are deprecated
* `GetUpdates#timeout` type now is `Seconds` (in fact it is `Int` as previously) * `GetUpdates#timeout` type now is `Seconds` (in fact it is `Int` as previously)
* `KtorRequestsExecutor` now is using a copy of incoming `HttpClient` object and install `HttpTimeout` feature
* `AbstractRequestCallFactory` now setting up a custom delay in case if request is `GetUpdates`
* `TelegramBotAPI-extensions-api`: * `TelegramBotAPI-extensions-api`:
* All functions from `com.github.insanusmokrassar.TelegramBotAPI.utils.extensions.UpdatesPolling` now available * All functions from `com.github.insanusmokrassar.TelegramBotAPI.utils.extensions.UpdatesPolling` now available
in package `com.github.insanusmokrassar.TelegramBotAPI.extensions.api.updates.UpdatesPolling` in package `com.github.insanusmokrassar.TelegramBotAPI.extensions.api.updates.UpdatesPolling`

View File

@ -1,6 +1,7 @@
package com.github.insanusmokrassar.TelegramBotAPI.extensions.api.updates package com.github.insanusmokrassar.TelegramBotAPI.extensions.api.updates
import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor
import com.github.insanusmokrassar.TelegramBotAPI.bot.exceptions.RequestException
import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.InternalUtils.convertWithMediaGroupUpdates import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.InternalUtils.convertWithMediaGroupUpdates
import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.InternalUtils.lastUpdateIdentifier import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.InternalUtils.lastUpdateIdentifier
import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.getUpdates import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.getUpdates
@ -10,10 +11,11 @@ import com.github.insanusmokrassar.TelegramBotAPI.types.update.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdates.* import com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdates.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update
import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.* import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.*
import io.ktor.client.features.HttpRequestTimeoutException
import kotlinx.coroutines.* import kotlinx.coroutines.*
fun RequestsExecutor.startGettingOfUpdates( fun RequestsExecutor.startGettingOfUpdates(
timeoutMillis: Seconds = 30, timeoutSeconds: Seconds = 30,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default), scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
allowedUpdates: List<String>? = null, allowedUpdates: List<String>? = null,
updatesReceiver: UpdateReceiver<Update> updatesReceiver: UpdateReceiver<Update>
@ -21,30 +23,37 @@ fun RequestsExecutor.startGettingOfUpdates(
var lastUpdateIdentifier: UpdateIdentifier? = null var lastUpdateIdentifier: UpdateIdentifier? = null
while (isActive) { while (isActive) {
supervisorScope { try {
val updates = getUpdates(
offset = lastUpdateIdentifier,
timeout = timeoutMillis,
allowed_updates = allowedUpdates
).convertWithMediaGroupUpdates()
supervisorScope { supervisorScope {
for (update in updates) { val updates = getUpdates(
updatesReceiver(update) offset = lastUpdateIdentifier?.plus(1),
timeout = timeoutSeconds,
allowed_updates = allowedUpdates
).convertWithMediaGroupUpdates()
lastUpdateIdentifier = update.lastUpdateIdentifier() supervisorScope {
for (update in updates) {
updatesReceiver(update)
lastUpdateIdentifier = update.lastUpdateIdentifier()
}
} }
} }
} catch (e: HttpRequestTimeoutException) {
e // it is ok due to mechanism of long polling
} catch (e: RequestException) {
e // it is not ok, but in most cases it will mean that there is some limit for requests count
delay(1000L)
} }
} }
} }
fun RequestsExecutor.startGettingOfUpdates( fun RequestsExecutor.startGettingOfUpdates(
updatesFilter: UpdatesFilter, updatesFilter: UpdatesFilter,
timeoutMillis: Seconds = 30, timeoutSeconds: Seconds = 30,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default) scope: CoroutineScope = CoroutineScope(Dispatchers.Default)
): Job = startGettingOfUpdates( ): Job = startGettingOfUpdates(
timeoutMillis, timeoutSeconds,
scope, scope,
updatesFilter.allowedUpdates, updatesFilter.allowedUpdates,
updatesFilter.asUpdateReceiver updatesFilter.asUpdateReceiver
@ -66,7 +75,7 @@ fun RequestsExecutor.startGettingOfUpdates(
preCheckoutQueryCallback: UpdateReceiver<PreCheckoutQueryUpdate>? = null, preCheckoutQueryCallback: UpdateReceiver<PreCheckoutQueryUpdate>? = null,
pollCallback: UpdateReceiver<PollUpdate>? = null, pollCallback: UpdateReceiver<PollUpdate>? = null,
pollAnswerCallback: UpdateReceiver<PollAnswerUpdate>? = null, pollAnswerCallback: UpdateReceiver<PollAnswerUpdate>? = null,
timeoutMillis: Seconds = 30, timeoutSeconds: Seconds = 30,
scope: CoroutineScope = GlobalScope scope: CoroutineScope = GlobalScope
): Job { ): Job {
return startGettingOfUpdates( return startGettingOfUpdates(
@ -87,7 +96,7 @@ fun RequestsExecutor.startGettingOfUpdates(
pollCallback, pollCallback,
pollAnswerCallback pollAnswerCallback
), ),
timeoutMillis, timeoutSeconds,
scope scope
) )
} }
@ -105,7 +114,7 @@ fun RequestsExecutor.startGettingOfUpdates(
preCheckoutQueryCallback: UpdateReceiver<PreCheckoutQueryUpdate>? = null, preCheckoutQueryCallback: UpdateReceiver<PreCheckoutQueryUpdate>? = null,
pollCallback: UpdateReceiver<PollUpdate>? = null, pollCallback: UpdateReceiver<PollUpdate>? = null,
pollAnswerCallback: UpdateReceiver<PollAnswerUpdate>? = null, pollAnswerCallback: UpdateReceiver<PollAnswerUpdate>? = null,
timeoutMillis: Seconds = 30, timeoutSeconds: Seconds = 30,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default) scope: CoroutineScope = CoroutineScope(Dispatchers.Default)
): Job = startGettingOfUpdates( ): Job = startGettingOfUpdates(
messageCallback = messageCallback, messageCallback = messageCallback,
@ -123,6 +132,6 @@ fun RequestsExecutor.startGettingOfUpdates(
preCheckoutQueryCallback = preCheckoutQueryCallback, preCheckoutQueryCallback = preCheckoutQueryCallback,
pollCallback = pollCallback, pollCallback = pollCallback,
pollAnswerCallback = pollAnswerCallback, pollAnswerCallback = pollAnswerCallback,
timeoutMillis = timeoutMillis, timeoutSeconds = timeoutSeconds,
scope = scope scope = scope
) )

View File

@ -13,14 +13,16 @@ import com.github.insanusmokrassar.TelegramBotAPI.utils.TelegramAPIUrlsKeeper
import io.ktor.client.HttpClient import io.ktor.client.HttpClient
import io.ktor.client.call.receive import io.ktor.client.call.receive
import io.ktor.client.features.ClientRequestException import io.ktor.client.features.ClientRequestException
import io.ktor.client.features.HttpTimeout
import io.ktor.client.statement.HttpStatement import io.ktor.client.statement.HttpStatement
import io.ktor.client.statement.readText import io.ktor.client.statement.readText
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.supervisorScope
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
class KtorRequestsExecutor( class KtorRequestsExecutor(
telegramAPIUrlsKeeper: TelegramAPIUrlsKeeper, telegramAPIUrlsKeeper: TelegramAPIUrlsKeeper,
private val client: HttpClient = HttpClient(), client: HttpClient = HttpClient(),
callsFactories: List<KtorCallFactory> = emptyList(), callsFactories: List<KtorCallFactory> = emptyList(),
excludeDefaultFactories: Boolean = false, excludeDefaultFactories: Boolean = false,
private val requestsLimiter: RequestLimiter = EmptyLimiter, private val requestsLimiter: RequestLimiter = EmptyLimiter,
@ -34,50 +36,59 @@ class KtorRequestsExecutor(
} }
} }
private val client = client.config {
install(HttpTimeout)
}
override suspend fun <T : Any> execute(request: Request<T>): T { override suspend fun <T : Any> execute(request: Request<T>): T {
return requestsLimiter.limit { return try {
var statement: HttpStatement? = null supervisorScope {
for (factory in callsFactories) { requestsLimiter.limit {
statement = factory.prepareCall( var statement: HttpStatement? = null
client, for (factory in callsFactories) {
telegramAPIUrlsKeeper.commonAPIUrl, statement = factory.prepareCall(
request client,
) telegramAPIUrlsKeeper.commonAPIUrl,
if (statement != null) { request
break )
if (statement != null) {
break
}
}
val response = statement?.execute() ?: throw IllegalArgumentException("Can't execute request: $request")
val content = response.receive<String>()
val responseObject = jsonFormatter.parse(Response.serializer(), content)
(responseObject.result?.let {
jsonFormatter.fromJson(request.resultDeserializer, it)
} ?: responseObject.parameters?.let {
val error = it.error
if (error is RetryAfterError) {
delay(error.leftToRetry)
execute(request)
} else {
null
}
} ?: response.let {
throw newRequestException(
responseObject,
content,
"Can't get result object from $content"
)
})
} }
} }
try { } catch (e: ClientRequestException) {
val response = statement ?.execute() ?: throw IllegalArgumentException("Can't execute request: $request") val content = e.response.readText()
val content = response.receive<String>() val responseObject = jsonFormatter.parse(Response.serializer(), content)
val responseObject = jsonFormatter.parse(Response.serializer(), content) throw newRequestException(
responseObject,
(responseObject.result?.let { content,
jsonFormatter.fromJson(request.resultDeserializer, it) "Can't get result object from $content"
} ?: responseObject.parameters?.let { )
val error = it.error } catch (e: Exception) {
if (error is RetryAfterError) { throw e
delay(error.leftToRetry)
execute(request)
} else {
null
}
} ?: response.let {
throw newRequestException(
responseObject,
content,
"Can't get result object from $content"
)
})
} catch (e: ClientRequestException) {
val content = e.response.readText()
val responseObject = jsonFormatter.parse(Response.serializer(), content)
throw newRequestException(
responseObject,
content,
"Can't get result object from $content"
)
}
} }
} }

View File

@ -1,8 +1,10 @@
package com.github.insanusmokrassar.TelegramBotAPI.bot.Ktor.base package com.github.insanusmokrassar.TelegramBotAPI.bot.Ktor.base
import com.github.insanusmokrassar.TelegramBotAPI.bot.Ktor.KtorCallFactory import com.github.insanusmokrassar.TelegramBotAPI.bot.Ktor.KtorCallFactory
import com.github.insanusmokrassar.TelegramBotAPI.requests.GetUpdates
import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.Request import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.Request
import io.ktor.client.HttpClient import io.ktor.client.HttpClient
import io.ktor.client.features.timeout
import io.ktor.client.request.* import io.ktor.client.request.*
import io.ktor.client.statement.HttpStatement import io.ktor.client.statement.HttpStatement
import io.ktor.http.ContentType import io.ktor.http.ContentType
@ -28,6 +30,16 @@ abstract class AbstractRequestCallFactory : KtorCallFactory {
method = HttpMethod.Post method = HttpMethod.Post
accept(ContentType.Application.Json) accept(ContentType.Application.Json)
if (request is GetUpdates) {
request.timeout ?.times(1000L) ?.let { customTimeoutMillis ->
if (customTimeoutMillis > 0) {
timeout {
requestTimeoutMillis = customTimeoutMillis
}
}
}
}
body = preparedBody body = preparedBody
}, },
client client