diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ad725bd37..e027e10444 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,11 @@ * Added `UnauthorizedException` * `RequestException` now is sealed * Rename `ReplyMessageNotFound` to `ReplyMessageNotFoundException` +* Added `List#mediaGroupId` extension +* Added utility `T#asReference(): WeakReference(T)` extension +* Added `UpdatesPoller` class which can be instantiated for manage updates polling +* Separated execute extensions (now they are in file `Executes`) and poller creating extensions +* `BaseMessageUpdate#toMediaGroupUpdate()` will also check condition when update-receiver already is `MediaGroupUpdate` ## 0.11.0 diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/BaseMessageUpdateToMediaGroupUpdate.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/BaseMessageUpdateToMediaGroupUpdate.kt index 1e9767e9f6..595e9cc7ad 100644 --- a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/BaseMessageUpdateToMediaGroupUpdate.kt +++ b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/BaseMessageUpdateToMediaGroupUpdate.kt @@ -4,6 +4,6 @@ import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaG import com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdate import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate -fun BaseMessageUpdate.toMediaGroupUpdate(): MediaGroupUpdate? = (data as? MediaGroupMessage) ?.let { +fun BaseMessageUpdate.toMediaGroupUpdate(): MediaGroupUpdate? = (this as? MediaGroupUpdate) ?: ((data as? MediaGroupMessage) ?.let { MediaGroupUpdate(updateId, it) -} +}) diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/MediaGroupList.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/MediaGroupList.kt index 08866b8335..e43da3f2ef 100644 --- a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/MediaGroupList.kt +++ b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/MediaGroupList.kt @@ -1,8 +1,10 @@ package com.github.insanusmokrassar.TelegramBotAPI.utils +import com.github.insanusmokrassar.TelegramBotAPI.types.MediaGroupIdentifier import com.github.insanusmokrassar.TelegramBotAPI.types.chat.Chat import com.github.insanusmokrassar.TelegramBotAPI.types.message.ForwardedMessage import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.* +import com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdate import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate val List.forwarded: ForwardedMessage? @@ -17,3 +19,6 @@ val List.replyTo: Message? val List.chat: Chat? get() = first().data.chat + +val List.mediaGroupId: MediaGroupIdentifier? + get() = (first().data as? MediaGroupMessage) ?.mediaGroupId diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/AsReference.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/AsReference.kt new file mode 100644 index 0000000000..b8cb0b5eae --- /dev/null +++ b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/AsReference.kt @@ -0,0 +1,5 @@ +package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions + +import java.lang.ref.WeakReference + +fun T.asReference() = WeakReference(this) \ No newline at end of file diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Executes.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Executes.kt new file mode 100644 index 0000000000..789ca33c49 --- /dev/null +++ b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Executes.kt @@ -0,0 +1,51 @@ +package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions + +import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor +import com.github.insanusmokrassar.TelegramBotAPI.bot.exceptions.RequestException +import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.Request +import com.github.insanusmokrassar.TelegramBotAPI.types.Response +import kotlinx.coroutines.* + + +fun RequestsExecutor.executeAsync( + request: Request, + onFail: (suspend (Response<*>) -> Unit)? = null, + scope: CoroutineScope = GlobalScope, + onSuccess: (suspend (T) -> Unit)? = null +): Job { + return scope.launch { + try { + val result = execute(request) + onSuccess ?.invoke(result) + } catch (e: RequestException) { + onFail ?.invoke(e.response) + } + } +} + +fun RequestsExecutor.executeAsync( + request: Request, + scope: CoroutineScope = GlobalScope +): Deferred { + return scope.async { execute(request) } +} + +suspend fun RequestsExecutor.executeUnsafe( + request: Request, + retries: Int = 0, + retriesDelay: Long = 1000L +): T? { + var leftRetries = retries + while(true) { + try { + return execute(request) + } catch (e: RequestException) { + if (leftRetries > 0) { + leftRetries-- + delay(retriesDelay) + } else { + return null + } + } + } +} diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/RequestsExecutor.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/RequestsExecutor.kt deleted file mode 100644 index 1204e2f2b2..0000000000 --- a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/RequestsExecutor.kt +++ /dev/null @@ -1,212 +0,0 @@ -package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions - -import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor -import com.github.insanusmokrassar.TelegramBotAPI.bot.exceptions.RequestException -import com.github.insanusmokrassar.TelegramBotAPI.requests.* -import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.Request -import com.github.insanusmokrassar.TelegramBotAPI.types.Response -import com.github.insanusmokrassar.TelegramBotAPI.types.UpdateIdentifier -import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage -import com.github.insanusmokrassar.TelegramBotAPI.types.update.* -import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate -import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update -import kotlinx.coroutines.* - -typealias UpdateReceiver = suspend (T) -> Unit - -fun RequestsExecutor.startGettingOfUpdates( - requestsDelayMillis: Long = 1000, - scope: CoroutineScope = GlobalScope, - allowedUpdates: List? = null, - block: UpdateReceiver -): Job { - return scope.launch { - var lastHandledUpdate: UpdateIdentifier = 0L - while (isActive) { - delay(requestsDelayMillis) - try { - val updates = execute( - GetUpdates( - lastHandledUpdate + 1, - allowed_updates = allowedUpdates - ) - ) - - val adaptedUpdates = mutableListOf() - var mediaGroup: MutableList? = null - - fun pushMediaGroup() { - mediaGroup ?.also { - adaptedUpdates.add(it) - mediaGroup = null - } - } - - updates.map { - it.asUpdate - }.forEach { update -> - val data = update.data - if (data is MediaGroupMessage) { - mediaGroup ?.let { - val message = it.first().data as MediaGroupMessage - if (message.mediaGroupId == data.mediaGroupId) { - it.add(update) - } else { - null - } - } ?: data.also { - pushMediaGroup() - mediaGroup = mutableListOf() - mediaGroup ?.add(update) - } - } else { - pushMediaGroup() - adaptedUpdates.add(update) - } - } - - mediaGroup ?.also { - adaptedUpdates.add(it) - mediaGroup = null - } - - for (update in adaptedUpdates) { - - try { - block(update) - lastHandledUpdate = when (update) { - is Update -> update.updateId - is List<*> -> (update.last() as? Update) ?.updateId ?: throw IllegalStateException( - "Found non-updates oriented list" - ) - else -> throw IllegalStateException( - "Unknown type of data" - ) - } - } catch (e: Exception) { - // TODO:: add exception handling - e.printStackTrace() - break - } - } - } catch (e: Exception) { - // TODO:: add exception handling - e.printStackTrace() - } - } - } -} - -fun RequestsExecutor.startGettingOfUpdates( - messageCallback: UpdateReceiver? = null, - messageMediaGroupCallback: UpdateReceiver>? = null, - editedMessageCallback: UpdateReceiver? = null, - editedMessageMediaGroupCallback: UpdateReceiver>? = null, - channelPostCallback: UpdateReceiver? = null, - channelPostMediaGroupCallback: UpdateReceiver>? = null, - editedChannelPostCallback: UpdateReceiver? = null, - editedChannelPostMediaGroupCallback: UpdateReceiver>? = null, - chosenInlineResultCallback: UpdateReceiver? = null, - inlineQueryCallback: UpdateReceiver? = null, - callbackQueryCallback: UpdateReceiver? = null, - shippingQueryCallback: UpdateReceiver? = null, - preCheckoutQueryCallback: UpdateReceiver? = null, - requestsDelayMillis: Long = 1000, - scope: CoroutineScope = GlobalScope -): Job { - val filter = UpdatesFilter( - messageCallback, - messageMediaGroupCallback, - editedMessageCallback, - editedMessageMediaGroupCallback, - channelPostCallback, - channelPostMediaGroupCallback, - editedChannelPostCallback, - editedChannelPostMediaGroupCallback, - chosenInlineResultCallback, - inlineQueryCallback, - callbackQueryCallback, - shippingQueryCallback, - preCheckoutQueryCallback - ) - return startGettingOfUpdates( - requestsDelayMillis, - scope, - filter.allowedUpdates, - filter.asUpdateReceiver - ) -} - -fun RequestsExecutor.startGettingOfUpdates( - messageCallback: UpdateReceiver? = null, - mediaGroupCallback: UpdateReceiver>? = null, - editedMessageCallback: UpdateReceiver? = null, - channelPostCallback: UpdateReceiver? = null, - editedChannelPostCallback: UpdateReceiver? = null, - chosenInlineResultCallback: UpdateReceiver? = null, - inlineQueryCallback: UpdateReceiver? = null, - callbackQueryCallback: UpdateReceiver? = null, - shippingQueryCallback: UpdateReceiver? = null, - preCheckoutQueryCallback: UpdateReceiver? = null, - requestsDelayMillis: Long = 1000, - scope: CoroutineScope = GlobalScope -): Job = startGettingOfUpdates( - messageCallback = messageCallback, - messageMediaGroupCallback = mediaGroupCallback, - editedMessageCallback = editedMessageCallback, - editedMessageMediaGroupCallback = mediaGroupCallback, - channelPostCallback = channelPostCallback, - channelPostMediaGroupCallback = mediaGroupCallback, - editedChannelPostCallback = editedChannelPostCallback, - editedChannelPostMediaGroupCallback = mediaGroupCallback, - chosenInlineResultCallback = chosenInlineResultCallback, - inlineQueryCallback = inlineQueryCallback, - callbackQueryCallback = callbackQueryCallback, - shippingQueryCallback = shippingQueryCallback, - preCheckoutQueryCallback = preCheckoutQueryCallback, - requestsDelayMillis = requestsDelayMillis, - scope = scope -) - -fun RequestsExecutor.executeAsync( - request: Request, - onFail: (suspend (Response<*>) -> Unit)? = null, - scope: CoroutineScope = GlobalScope, - onSuccess: (suspend (T) -> Unit)? = null -): Job { - return scope.launch { - try { - val result = execute(request) - onSuccess ?.invoke(result) - } catch (e: RequestException) { - onFail ?.invoke(e.response) - } - } -} - -fun RequestsExecutor.executeAsync( - request: Request, - scope: CoroutineScope = GlobalScope -): Deferred { - return scope.async { execute(request) } -} - -suspend fun RequestsExecutor.executeUnsafe( - request: Request, - retries: Int = 0, - retriesDelay: Long = 1000L -): T? { - var leftRetries = retries - while(true) { - try { - return execute(request) - } catch (e: RequestException) { - if (leftRetries > 0) { - leftRetries-- - delay(retriesDelay) - } else { - return null - } - } - } -} diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPoller.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPoller.kt new file mode 100644 index 0000000000..7d19100ddc --- /dev/null +++ b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPoller.kt @@ -0,0 +1,99 @@ +package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions + +import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor +import com.github.insanusmokrassar.TelegramBotAPI.requests.GetUpdates +import com.github.insanusmokrassar.TelegramBotAPI.types.UpdateIdentifier +import com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdate +import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate +import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update +import com.github.insanusmokrassar.TelegramBotAPI.utils.mediaGroupId +import com.github.insanusmokrassar.TelegramBotAPI.utils.toMediaGroupUpdate +import kotlinx.coroutines.* +import java.util.concurrent.Executors + +private val updatesPollerRequestExecutorCollectedException = IllegalStateException("RequestsExecutor was collected by GC. Can't continue getting updates by polling") + +class UpdatesPoller( + requestsExecutor: RequestsExecutor, + private val requestsDelayMillis: Long = 1000, + private val scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), + private val allowedUpdates: List? = null, + private val block: UpdateReceiver +) { + private val executor = requestsExecutor.asReference() + private var lastHandledUpdate: UpdateIdentifier = 0L + private val mediaGroup: MutableList = mutableListOf() + + private var pollerJob: Job? = null + + private suspend fun sendToBlock(data: Any) { + block(data) + lastHandledUpdate = when (data) { + is Update -> data.updateId + is List<*> -> (data.last() as? Update) ?.updateId ?: throw IllegalStateException( + "Found non-updates oriented list" + ) + else -> throw IllegalStateException( + "Unknown type of data" + ) + } + } + + private suspend fun pushMediaGroupUpdate(mediaGroupUpdate: MediaGroupUpdate? = null) { + val inputMediaGroupId = mediaGroupUpdate ?.data ?.mediaGroupId + if (mediaGroup.isNotEmpty() && inputMediaGroupId ?.equals(mediaGroup.mediaGroupId) != true) { + sendToBlock(listOf(*mediaGroup.toTypedArray())) + mediaGroup.clear() + } + mediaGroupUpdate ?.let { + mediaGroup.add(it) + } + } + + private suspend fun getUpdates(): List { + return executor.get() ?.execute( + GetUpdates( + lastHandledUpdate + 1, // incremented because offset counted from 1 when updates id from 0 + allowed_updates = allowedUpdates + ) + ) ?.map { + it.asUpdate + } ?: throw updatesPollerRequestExecutorCollectedException + } + + private suspend fun handleUpdates(updates: List) { + updates.forEach { update -> + val mediaGroupUpdate = (update as? BaseMessageUpdate) ?.toMediaGroupUpdate() + mediaGroupUpdate ?.let { _ -> + pushMediaGroupUpdate(mediaGroupUpdate) + } ?: let { + pushMediaGroupUpdate() + sendToBlock(update) + } + } + + pushMediaGroupUpdate() + } + + fun start(): Job { + return pollerJob ?: scope.launch { + while (isActive) { + delay(requestsDelayMillis) + try { + handleUpdates(getUpdates()) + } catch (e: Exception) { + if (e == updatesPollerRequestExecutorCollectedException) { + throw IllegalArgumentException(e.message) + } + e.printStackTrace() + } + } + }.also { + pollerJob = it + } + } + + suspend fun stop() { + pollerJob ?.cancelAndJoin() + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPolling.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPolling.kt new file mode 100644 index 0000000000..f4f6480035 --- /dev/null +++ b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesPolling.kt @@ -0,0 +1,89 @@ +package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions + +import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor +import com.github.insanusmokrassar.TelegramBotAPI.types.update.* +import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate +import kotlinx.coroutines.* +import java.util.concurrent.Executors + +typealias UpdateReceiver = suspend (T) -> Unit + +fun RequestsExecutor.startGettingOfUpdates( + requestsDelayMillis: Long = 1000, + scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), + allowedUpdates: List? = null, + block: UpdateReceiver +): Job { + return UpdatesPoller(this, requestsDelayMillis, scope, allowedUpdates, block).start() +} + +fun RequestsExecutor.startGettingOfUpdates( + messageCallback: UpdateReceiver? = null, + messageMediaGroupCallback: UpdateReceiver>? = null, + editedMessageCallback: UpdateReceiver? = null, + editedMessageMediaGroupCallback: UpdateReceiver>? = null, + channelPostCallback: UpdateReceiver? = null, + channelPostMediaGroupCallback: UpdateReceiver>? = null, + editedChannelPostCallback: UpdateReceiver? = null, + editedChannelPostMediaGroupCallback: UpdateReceiver>? = null, + chosenInlineResultCallback: UpdateReceiver? = null, + inlineQueryCallback: UpdateReceiver? = null, + callbackQueryCallback: UpdateReceiver? = null, + shippingQueryCallback: UpdateReceiver? = null, + preCheckoutQueryCallback: UpdateReceiver? = null, + requestsDelayMillis: Long = 1000, + scope: CoroutineScope = GlobalScope +): Job { + val filter = UpdatesFilter( + messageCallback, + messageMediaGroupCallback, + editedMessageCallback, + editedMessageMediaGroupCallback, + channelPostCallback, + channelPostMediaGroupCallback, + editedChannelPostCallback, + editedChannelPostMediaGroupCallback, + chosenInlineResultCallback, + inlineQueryCallback, + callbackQueryCallback, + shippingQueryCallback, + preCheckoutQueryCallback + ) + return startGettingOfUpdates( + requestsDelayMillis, + scope, + filter.allowedUpdates, + filter.asUpdateReceiver + ) +} + +fun RequestsExecutor.startGettingOfUpdates( + messageCallback: UpdateReceiver? = null, + mediaGroupCallback: UpdateReceiver>? = null, + editedMessageCallback: UpdateReceiver? = null, + channelPostCallback: UpdateReceiver? = null, + editedChannelPostCallback: UpdateReceiver? = null, + chosenInlineResultCallback: UpdateReceiver? = null, + inlineQueryCallback: UpdateReceiver? = null, + callbackQueryCallback: UpdateReceiver? = null, + shippingQueryCallback: UpdateReceiver? = null, + preCheckoutQueryCallback: UpdateReceiver? = null, + requestsDelayMillis: Long = 1000, + scope: CoroutineScope = GlobalScope +): Job = startGettingOfUpdates( + messageCallback = messageCallback, + messageMediaGroupCallback = mediaGroupCallback, + editedMessageCallback = editedMessageCallback, + editedMessageMediaGroupCallback = mediaGroupCallback, + channelPostCallback = channelPostCallback, + channelPostMediaGroupCallback = mediaGroupCallback, + editedChannelPostCallback = editedChannelPostCallback, + editedChannelPostMediaGroupCallback = mediaGroupCallback, + chosenInlineResultCallback = chosenInlineResultCallback, + inlineQueryCallback = inlineQueryCallback, + callbackQueryCallback = callbackQueryCallback, + shippingQueryCallback = shippingQueryCallback, + preCheckoutQueryCallback = preCheckoutQueryCallback, + requestsDelayMillis = requestsDelayMillis, + scope = scope +)