From de7af5f0e71f0529a15ec4075901f3d16790b889 Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Tue, 28 May 2019 19:13:01 +0800 Subject: [PATCH] fixes in long-polling, adding of UpdatesPoller as abstraction and rename of old UpdatesPoller to HtorUpdatesPoller --- CHANGELOG.md | 9 +++ build.gradle | 1 + .../TelegramBotAPI/bot/UpdatesPoller.kt | 8 ++ ...{UpdatesPoller.kt => KtorUpdatesPoller.kt} | 78 +++++++++++++++---- .../utils/extensions/UpdatesPoller.kt | 12 --- .../utils/extensions/UpdatesPolling.kt | 35 +++++---- 6 files changed, 102 insertions(+), 41 deletions(-) create mode 100644 src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/bot/UpdatesPoller.kt rename src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/updateshandlers/{UpdatesPoller.kt => KtorUpdatesPoller.kt} (57%) delete mode 100644 src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPoller.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index 2880836494..e4a93c0037 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ ## 0.15.0 +* Old `UpdatesPoller` removed (was deprecated) +* `UpdatesPoller` renamed to `KtorUpdatesPoller` +* Now `KtorUpdatesPoller` do not use additional delay between requests and await answer from Telegram all timeout time +* Added abstraction `UpdatesPoller` +* Changed signature of the most count of `startGettingOfUpdates`: + * They are not `suspend` for now + * They are return `UpdatesPoller` + * They are using `timeoutMillis` instead of `requestsDelayMillis` + ## 0.14.0 * Now library have no default engine for both webhooks and requests executor. It is required for clients to set diff --git a/build.gradle b/build.gradle index cd0d9f1de1..a09fe01bf9 100644 --- a/build.gradle +++ b/build.gradle @@ -35,6 +35,7 @@ dependencies { implementation "joda-time:joda-time:$joda_time_version" implementation "io.ktor:ktor-client:$ktor_version" + implementation "io.ktor:ktor-client-cio:$ktor_version" implementation "io.ktor:ktor-server:$ktor_version" implementation "io.ktor:ktor-server-host-common:$ktor_version" diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/bot/UpdatesPoller.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/bot/UpdatesPoller.kt new file mode 100644 index 0000000000..c80bfa2610 --- /dev/null +++ b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/bot/UpdatesPoller.kt @@ -0,0 +1,8 @@ +package com.github.insanusmokrassar.TelegramBotAPI.bot + +import kotlinx.coroutines.* +import kotlinx.io.core.Closeable + +interface UpdatesPoller : Closeable { + fun start(scope: CoroutineScope = CoroutineScope(Dispatchers.Default)) +} \ No newline at end of file diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/updateshandlers/UpdatesPoller.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/updateshandlers/KtorUpdatesPoller.kt similarity index 57% rename from src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/updateshandlers/UpdatesPoller.kt rename to src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/updateshandlers/KtorUpdatesPoller.kt index d146895c6c..4babe89cc6 100644 --- a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/updateshandlers/UpdatesPoller.kt +++ b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/updateshandlers/KtorUpdatesPoller.kt @@ -1,8 +1,11 @@ package com.github.insanusmokrassar.TelegramBotAPI.updateshandlers +import com.github.insanusmokrassar.TelegramBotAPI.bot.Ktor.KtorRequestsExecutor import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor +import com.github.insanusmokrassar.TelegramBotAPI.bot.UpdatesPoller import com.github.insanusmokrassar.TelegramBotAPI.requests.GetUpdates import com.github.insanusmokrassar.TelegramBotAPI.requests.webhook.DeleteWebhook +import com.github.insanusmokrassar.TelegramBotAPI.types.ALL_UPDATES_LIST import com.github.insanusmokrassar.TelegramBotAPI.types.UpdateIdentifier import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate @@ -10,23 +13,58 @@ import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update import com.github.insanusmokrassar.TelegramBotAPI.utils.* import com.github.insanusmokrassar.TelegramBotAPI.utils.extensions.UpdateReceiver import com.github.insanusmokrassar.TelegramBotAPI.utils.extensions.executeUnsafe +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO import kotlinx.coroutines.* -import java.util.concurrent.Executors -class UpdatesPoller( +fun KtorUpdatesPoller( + token: String, + timeoutSeconds: Int? = null, + oneTimeUpdatesLimit: Int? = null, + allowedUpdates: List = ALL_UPDATES_LIST, + exceptionsHandler: (Exception) -> Boolean = { true }, + updatesReceiver: UpdateReceiver +): KtorUpdatesPoller { + val executor = KtorRequestsExecutor( + token, + HttpClient( + CIO.create { + timeoutSeconds ?.let { _ -> + val timeout = timeoutSeconds.toLong() * 1000 + endpoint.apply { + keepAliveTime = timeout + connectTimeout = 1000 + } + } + } + ) + ) + + return KtorUpdatesPoller( + executor, + allowedUpdates, + timeoutSeconds, + oneTimeUpdatesLimit, + exceptionsHandler, + updatesReceiver + ) +} + +class KtorUpdatesPoller( private val executor: RequestsExecutor, - private val requestsDelayMillis: Long = 1000, - private val scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), - private val allowedUpdates: List? = null, - private val block: UpdateReceiver -) { + private val allowedUpdates: List = ALL_UPDATES_LIST, + private val timeoutSeconds: Int? = null, + private val oneTimeUpdatesLimit: Int? = null, + private val exceptionsHandler: (Exception) -> Boolean = { it.printStackTrace(); true }, + private val updatesReceiver: UpdateReceiver +) : UpdatesPoller { private var lastHandledUpdate: UpdateIdentifier = 0L private val mediaGroup: MutableList = mutableListOf() private var pollerJob: Job? = null private suspend fun sendToBlock(data: Update) { - block(data) + updatesReceiver(data) lastHandledUpdate = data.updateId } @@ -48,7 +86,9 @@ class UpdatesPoller( return executor.execute( GetUpdates( lastHandledUpdate + 1, // incremented because offset counted from 1 when updates id from 0 - allowed_updates = allowedUpdates + oneTimeUpdatesLimit, + timeoutSeconds, + allowedUpdates ) ).map { it.asUpdate @@ -72,16 +112,21 @@ class UpdatesPoller( pushMediaGroupUpdate() } - suspend fun start(): Job { - executor.executeUnsafe(DeleteWebhook()) - return pollerJob ?: scope.launch { + @Synchronized + override fun start(scope: CoroutineScope) { + pollerJob ?: scope.launch { + executor.executeUnsafe(DeleteWebhook()) while (isActive) { - delay(requestsDelayMillis) try { val updates = getUpdates() handleUpdates(updates) } catch (e: Exception) { - e.printStackTrace() + if (exceptionsHandler(e)) { + continue + } else { + close() + break + } } } }.also { @@ -89,7 +134,8 @@ class UpdatesPoller( } } - suspend fun stop() { - pollerJob ?.cancelAndJoin() + @Synchronized + override fun close() { + pollerJob ?.cancel() } } \ No newline at end of file diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPoller.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPoller.kt deleted file mode 100644 index 1d87bd6c17..0000000000 --- a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPoller.kt +++ /dev/null @@ -1,12 +0,0 @@ -package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions - -import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.UpdatesPoller - -@Deprecated( - "Replaced in separated package", - ReplaceWith( - "UpdatesPoller", - "com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.UpdatesPoller" - ) -) -typealias UpdatesPoller = UpdatesPoller diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPolling.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPolling.kt index 42b87eced1..0b0a77b8d7 100644 --- a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPolling.kt +++ b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPolling.kt @@ -1,26 +1,35 @@ package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor +import com.github.insanusmokrassar.TelegramBotAPI.bot.UpdatesPoller +import com.github.insanusmokrassar.TelegramBotAPI.types.ALL_UPDATES_LIST import com.github.insanusmokrassar.TelegramBotAPI.types.update.* import com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdates.* import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.UpdatesFilter -import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.UpdatesPoller +import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.KtorUpdatesPoller import kotlinx.coroutines.* import java.util.concurrent.Executors typealias UpdateReceiver = suspend (T) -> Unit -suspend fun RequestsExecutor.startGettingOfUpdates( - requestsDelayMillis: Long = 1000, +fun RequestsExecutor.startGettingOfUpdates( + timeoutMillis: Long = 30 * 1000, scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), allowedUpdates: List? = null, block: UpdateReceiver -): Job { - return UpdatesPoller(this, requestsDelayMillis, scope, allowedUpdates, block).start() +): UpdatesPoller { + return KtorUpdatesPoller( + this, + allowedUpdates ?: ALL_UPDATES_LIST, + timeoutMillis.toInt() / 1000, + updatesReceiver = block + ).also { + it.start(scope) + } } -suspend fun RequestsExecutor.startGettingOfUpdates( +fun RequestsExecutor.startGettingOfUpdates( messageCallback: UpdateReceiver? = null, messageMediaGroupCallback: UpdateReceiver? = null, editedMessageCallback: UpdateReceiver? = null, @@ -35,9 +44,9 @@ suspend fun RequestsExecutor.startGettingOfUpdates( shippingQueryCallback: UpdateReceiver? = null, preCheckoutQueryCallback: UpdateReceiver? = null, pollCallback: UpdateReceiver? = null, - requestsDelayMillis: Long = 1000, + timeoutMillis: Long = 30 * 1000, scope: CoroutineScope = GlobalScope -): Job { +): UpdatesPoller { val filter = UpdatesFilter( messageCallback, messageMediaGroupCallback, @@ -55,14 +64,14 @@ suspend fun RequestsExecutor.startGettingOfUpdates( pollCallback ) return startGettingOfUpdates( - requestsDelayMillis, + timeoutMillis, scope, filter.allowedUpdates, filter.asUpdateReceiver ) } -suspend fun RequestsExecutor.startGettingOfUpdates( +fun RequestsExecutor.startGettingOfUpdates( messageCallback: UpdateReceiver? = null, mediaGroupCallback: UpdateReceiver? = null, editedMessageCallback: UpdateReceiver? = null, @@ -74,9 +83,9 @@ suspend fun RequestsExecutor.startGettingOfUpdates( shippingQueryCallback: UpdateReceiver? = null, preCheckoutQueryCallback: UpdateReceiver? = null, pollCallback: UpdateReceiver? = null, - requestsDelayMillis: Long = 1000, + timeoutMillis: Long = 30 * 1000, scope: CoroutineScope = GlobalScope -): Job = startGettingOfUpdates( +): UpdatesPoller = startGettingOfUpdates( messageCallback = messageCallback, messageMediaGroupCallback = mediaGroupCallback, editedMessageCallback = editedMessageCallback, @@ -91,6 +100,6 @@ suspend fun RequestsExecutor.startGettingOfUpdates( shippingQueryCallback = shippingQueryCallback, preCheckoutQueryCallback = preCheckoutQueryCallback, pollCallback = pollCallback, - requestsDelayMillis = requestsDelayMillis, + timeoutMillis = timeoutMillis, scope = scope )