mirror of
https://github.com/InsanusMokrassar/TelegramBotAPI.git
synced 2024-11-22 16:23:48 +00:00
suspend inline function handleSafely was added
This commit is contained in:
parent
7668c48081
commit
8c76283db5
@ -50,6 +50,9 @@
|
|||||||
case will be created `UnknownUpdateType`
|
case will be created `UnknownUpdateType`
|
||||||
* `UnknownUpdateType$rawJson` value now is included (`JsonElement`)
|
* `UnknownUpdateType$rawJson` value now is included (`JsonElement`)
|
||||||
* **EXPERIMENTALLY** `BaseEditMessageUpdate#data` now is `CommonMessage<*>`
|
* **EXPERIMENTALLY** `BaseEditMessageUpdate#data` now is `CommonMessage<*>`
|
||||||
|
* Suspend inline function `handleSafely` was added
|
||||||
|
* `KtorRequestsExecutor` now use `handleSafely` instead of `try` with `supervisorScope`
|
||||||
|
* `UpdatesPolling` now use `handleSafely` instead of `try` with `supervisorScope`
|
||||||
|
|
||||||
### 0.26.2
|
### 0.26.2
|
||||||
|
|
||||||
|
@ -11,6 +11,7 @@ import com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdates
|
|||||||
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update
|
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update
|
||||||
import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.*
|
import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.*
|
||||||
import com.github.insanusmokrassar.TelegramBotAPI.utils.PreviewFeature
|
import com.github.insanusmokrassar.TelegramBotAPI.utils.PreviewFeature
|
||||||
|
import com.github.insanusmokrassar.TelegramBotAPI.utils.handleSafely
|
||||||
import io.ktor.client.features.HttpRequestTimeoutException
|
import io.ktor.client.features.HttpRequestTimeoutException
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
|
|
||||||
@ -24,42 +25,44 @@ fun RequestsExecutor.startGettingOfUpdates(
|
|||||||
var lastUpdateIdentifier: UpdateIdentifier? = null
|
var lastUpdateIdentifier: UpdateIdentifier? = null
|
||||||
|
|
||||||
while (isActive) {
|
while (isActive) {
|
||||||
try {
|
handleSafely(
|
||||||
supervisorScope {
|
{ e ->
|
||||||
val updates = getUpdates(
|
when (e) {
|
||||||
offset = lastUpdateIdentifier?.plus(1),
|
is HttpRequestTimeoutException -> exceptionsHandler ?.invoke(e)
|
||||||
timeout = timeoutSeconds,
|
is RequestException -> {
|
||||||
allowed_updates = allowedUpdates
|
exceptionsHandler ?.invoke(e)
|
||||||
).let { originalUpdates ->
|
delay(1000L)
|
||||||
val converted = originalUpdates.convertWithMediaGroupUpdates()
|
|
||||||
/**
|
|
||||||
* Dirty hack for cases when the media group was retrieved not fully:
|
|
||||||
*
|
|
||||||
* We are throw out the last media group and will reretrieve it again in the next get updates
|
|
||||||
* and it will guarantee that it is full
|
|
||||||
*/
|
|
||||||
if (originalUpdates.size == getUpdatesLimit.last && converted.last() is SentMediaGroupUpdate) {
|
|
||||||
converted - converted.last()
|
|
||||||
} else {
|
|
||||||
converted
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
supervisorScope {
|
|
||||||
for (update in updates) {
|
|
||||||
updatesReceiver(update)
|
|
||||||
|
|
||||||
lastUpdateIdentifier = update.lastUpdateIdentifier()
|
|
||||||
}
|
}
|
||||||
|
else -> exceptionsHandler ?.invoke(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
) {
|
||||||
|
val updates = getUpdates(
|
||||||
|
offset = lastUpdateIdentifier?.plus(1),
|
||||||
|
timeout = timeoutSeconds,
|
||||||
|
allowed_updates = allowedUpdates
|
||||||
|
).let { originalUpdates ->
|
||||||
|
val converted = originalUpdates.convertWithMediaGroupUpdates()
|
||||||
|
/**
|
||||||
|
* Dirty hack for cases when the media group was retrieved not fully:
|
||||||
|
*
|
||||||
|
* We are throw out the last media group and will reretrieve it again in the next get updates
|
||||||
|
* and it will guarantee that it is full
|
||||||
|
*/
|
||||||
|
if (originalUpdates.size == getUpdatesLimit.last && converted.last() is SentMediaGroupUpdate) {
|
||||||
|
converted - converted.last()
|
||||||
|
} else {
|
||||||
|
converted
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
handleSafely {
|
||||||
|
for (update in updates) {
|
||||||
|
updatesReceiver(update)
|
||||||
|
|
||||||
|
lastUpdateIdentifier = update.lastUpdateIdentifier()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (e: HttpRequestTimeoutException) {
|
|
||||||
exceptionsHandler ?.invoke(e) // it is ok due to mechanism of long polling
|
|
||||||
} catch (e: RequestException) {
|
|
||||||
exceptionsHandler ?.invoke(e) // it is not ok, but in most cases it will mean that there is some limit for requests count
|
|
||||||
delay(1000L)
|
|
||||||
} catch (e: Exception) {
|
|
||||||
exceptionsHandler ?.invoke(e)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,7 @@ import com.github.insanusmokrassar.TelegramBotAPI.bot.settings.limiters.RequestL
|
|||||||
import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.Request
|
import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.Request
|
||||||
import com.github.insanusmokrassar.TelegramBotAPI.types.Response
|
import com.github.insanusmokrassar.TelegramBotAPI.types.Response
|
||||||
import com.github.insanusmokrassar.TelegramBotAPI.types.RetryAfterError
|
import com.github.insanusmokrassar.TelegramBotAPI.types.RetryAfterError
|
||||||
import com.github.insanusmokrassar.TelegramBotAPI.utils.TelegramAPIUrlsKeeper
|
import com.github.insanusmokrassar.TelegramBotAPI.utils.*
|
||||||
import com.github.insanusmokrassar.TelegramBotAPI.utils.nonstrictJsonFormat
|
import com.github.insanusmokrassar.TelegramBotAPI.utils.nonstrictJsonFormat
|
||||||
import io.ktor.client.HttpClient
|
import io.ktor.client.HttpClient
|
||||||
import io.ktor.client.call.receive
|
import io.ktor.client.call.receive
|
||||||
@ -17,7 +17,6 @@ import io.ktor.client.features.*
|
|||||||
import io.ktor.client.statement.HttpStatement
|
import io.ktor.client.statement.HttpStatement
|
||||||
import io.ktor.client.statement.readText
|
import io.ktor.client.statement.readText
|
||||||
import kotlinx.coroutines.delay
|
import kotlinx.coroutines.delay
|
||||||
import kotlinx.coroutines.supervisorScope
|
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
|
|
||||||
class KtorRequestsExecutor(
|
class KtorRequestsExecutor(
|
||||||
@ -43,54 +42,56 @@ class KtorRequestsExecutor(
|
|||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun <T : Any> execute(request: Request<T>): T {
|
override suspend fun <T : Any> execute(request: Request<T>): T {
|
||||||
return try {
|
return handleSafely(
|
||||||
supervisorScope {
|
{ e ->
|
||||||
requestsLimiter.limit {
|
throw if (e is ClientRequestException) {
|
||||||
var statement: HttpStatement? = null
|
val content = e.response.readText()
|
||||||
for (factory in callsFactories) {
|
|
||||||
statement = factory.prepareCall(
|
|
||||||
client,
|
|
||||||
telegramAPIUrlsKeeper.commonAPIUrl,
|
|
||||||
request
|
|
||||||
)
|
|
||||||
if (statement != null) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
val response = statement?.execute() ?: throw IllegalArgumentException("Can't execute request: $request")
|
|
||||||
val content = response.receive<String>()
|
|
||||||
val responseObject = jsonFormatter.parse(Response.serializer(), content)
|
val responseObject = jsonFormatter.parse(Response.serializer(), content)
|
||||||
|
newRequestException(
|
||||||
(responseObject.result?.let {
|
responseObject,
|
||||||
jsonFormatter.fromJson(request.resultDeserializer, it)
|
content,
|
||||||
} ?: responseObject.parameters?.let {
|
"Can't get result object from $content"
|
||||||
val error = it.error
|
)
|
||||||
if (error is RetryAfterError) {
|
} else {
|
||||||
delay(error.leftToRetry)
|
e
|
||||||
execute(request)
|
|
||||||
} else {
|
|
||||||
null
|
|
||||||
}
|
|
||||||
} ?: response.let {
|
|
||||||
throw newRequestException(
|
|
||||||
responseObject,
|
|
||||||
content,
|
|
||||||
"Can't get result object from $content"
|
|
||||||
)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (e: ClientRequestException) {
|
) {
|
||||||
val content = e.response.readText()
|
requestsLimiter.limit {
|
||||||
val responseObject = jsonFormatter.parse(Response.serializer(), content)
|
var statement: HttpStatement? = null
|
||||||
throw newRequestException(
|
for (factory in callsFactories) {
|
||||||
responseObject,
|
statement = factory.prepareCall(
|
||||||
content,
|
client,
|
||||||
"Can't get result object from $content"
|
telegramAPIUrlsKeeper.commonAPIUrl,
|
||||||
)
|
request
|
||||||
} catch (e: Exception) {
|
)
|
||||||
throw e
|
if (statement != null) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val response = statement?.execute() ?: throw IllegalArgumentException("Can't execute request: $request")
|
||||||
|
val content = response.receive<String>()
|
||||||
|
val responseObject = jsonFormatter.parse(Response.serializer(), content)
|
||||||
|
|
||||||
|
(responseObject.result?.let {
|
||||||
|
jsonFormatter.fromJson(request.resultDeserializer, it)
|
||||||
|
} ?: responseObject.parameters?.let {
|
||||||
|
val error = it.error
|
||||||
|
if (error is RetryAfterError) {
|
||||||
|
delay(error.leftToRetry)
|
||||||
|
execute(request)
|
||||||
|
} else {
|
||||||
|
null
|
||||||
|
}
|
||||||
|
} ?: response.let {
|
||||||
|
throw newRequestException(
|
||||||
|
responseObject,
|
||||||
|
content,
|
||||||
|
"Can't get result object from $content"
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,21 @@
|
|||||||
|
package com.github.insanusmokrassar.TelegramBotAPI.utils
|
||||||
|
|
||||||
|
import kotlinx.coroutines.CoroutineScope
|
||||||
|
import kotlinx.coroutines.supervisorScope
|
||||||
|
|
||||||
|
/**
|
||||||
|
* It will run [block] inside of [supervisorScope] to avoid problems with catching of exceptions
|
||||||
|
*
|
||||||
|
* @param [onException] Will be called when happen exception inside of [block]. By default will throw exception - this
|
||||||
|
* exception will be available for catching
|
||||||
|
*/
|
||||||
|
suspend inline fun <T> handleSafely(
|
||||||
|
noinline onException: suspend (Exception) -> T = { throw it },
|
||||||
|
noinline block: suspend CoroutineScope.() -> T
|
||||||
|
): T {
|
||||||
|
return try {
|
||||||
|
supervisorScope(block)
|
||||||
|
} catch (e: Exception) {
|
||||||
|
onException(e)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user