replace long polling and webhook tools

This commit is contained in:
InsanusMokrassar 2020-05-13 23:22:35 +06:00
parent 05e8c9c90d
commit 420b846466
7 changed files with 354 additions and 16 deletions

View File

@ -53,12 +53,15 @@
* `TelegramBotAPI`:
* Currently `UpdateDeserializationStrategy` is publicly available
* All `setWebhook` extensions was marked as deprecated and replaced into `TelegramBotAPI-extensions-api`
* All `setWebhook` extensions was marked as deprecated and replaced into `TelegramBotAPI-extensions-utils`
* `TelegramBotAPI-extensions-api`:
* New extensions `setWebhook` and `includeWebhookInRoute` was added
* New extension `CoroutineScope#updateHandlerWithMediaGroupsAdaptation` was added
* Long Polling extensions now are deprecated in this project. It was replaced into `TelegramBotAPI-extensions-utils`
* `TelegramBotAPI-extensions-utils`:
* Extension `asTelegramUpdate` was added
* Long Polling extensions were added
* Updates utils were added
* New extensions `setWebhook` and `includeWebhookInRoute` was added
* New extension `CoroutineScope#updateHandlerWithMediaGroupsAdaptation` was added
### 0.27.2

View File

@ -14,6 +14,7 @@ import com.github.insanusmokrassar.TelegramBotAPI.utils.PreviewFeature
import com.github.insanusmokrassar.TelegramBotAPI.utils.handleSafely
import kotlinx.coroutines.*
@Deprecated("Replaced and renamed into TelegramBotAPI-extensions-utils")
fun RequestsExecutor.startGettingOfUpdates(
timeoutSeconds: Seconds = 30,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
@ -71,6 +72,7 @@ fun RequestsExecutor.startGettingOfUpdates(
@FlowPreview
@PreviewFeature
@Suppress("unused")
@Deprecated("Replaced and renamed into TelegramBotAPI-extensions-utils")
fun RequestsExecutor.startGettingFlowsUpdates(
timeoutSeconds: Seconds = 30,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
@ -82,6 +84,7 @@ fun RequestsExecutor.startGettingFlowsUpdates(
startGettingOfUpdates(timeoutSeconds, scope, exceptionsHandler, allowedUpdates, asUpdateReceiver)
}
@Deprecated("Replaced and renamed into TelegramBotAPI-extensions-utils")
fun RequestsExecutor.startGettingOfUpdates(
updatesFilter: UpdatesFilter,
timeoutSeconds: Seconds = 30,
@ -95,6 +98,7 @@ fun RequestsExecutor.startGettingOfUpdates(
updatesFilter.asUpdateReceiver
)
@Deprecated("Replaced and renamed into TelegramBotAPI-extensions-utils")
fun RequestsExecutor.startGettingOfUpdates(
messageCallback: UpdateReceiver<MessageUpdate>? = null,
messageMediaGroupCallback: UpdateReceiver<MessageMediaGroupUpdate>? = null,
@ -140,6 +144,7 @@ fun RequestsExecutor.startGettingOfUpdates(
}
@Suppress("unused")
@Deprecated("Replaced and renamed into TelegramBotAPI-extensions-utils")
fun RequestsExecutor.startGettingOfUpdates(
messageCallback: UpdateReceiver<MessageUpdate>? = null,
mediaGroupCallback: UpdateReceiver<MediaGroupUpdate>? = null,

View File

@ -0,0 +1,93 @@
package com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.updates
import com.github.insanusmokrassar.TelegramBotAPI.types.MediaGroupIdentifier
import com.github.insanusmokrassar.TelegramBotAPI.types.UpdateIdentifier
import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage
import com.github.insanusmokrassar.TelegramBotAPI.types.update.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdates.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.*
/**
* @return If [this] is [SentMediaGroupUpdate] - [Update.updateId] of [last] element, or its own [Update.updateId]
*/
fun Update.lastUpdateIdentifier(): UpdateIdentifier {
return if (this is SentMediaGroupUpdate) {
origins.last().updateId
} else {
updateId
}
}
/**
* @return The biggest [UpdateIdentifier] OR null
*
* @see [Update.lastUpdateIdentifier]
*/
fun List<Update>.lastUpdateIdentifier(): UpdateIdentifier? {
return maxBy { it.updateId } ?.lastUpdateIdentifier()
}
/**
* Will convert incoming list of updates to list with [MediaGroupUpdate]s
*/
fun List<Update>.convertWithMediaGroupUpdates(): List<Update> {
val resultUpdates = mutableListOf<Update>()
val mediaGroups = mutableMapOf<MediaGroupIdentifier, MutableList<BaseSentMessageUpdate>>()
for (update in this) {
val data = (update.data as? MediaGroupMessage)
if (data == null) {
resultUpdates.add(update)
continue
}
when (update) {
is BaseEditMessageUpdate -> resultUpdates.add(
update.toEditMediaGroupUpdate()
)
is BaseSentMessageUpdate -> {
mediaGroups.getOrPut(data.mediaGroupId) {
mutableListOf()
}.add(update)
}
else -> resultUpdates.add(update)
}
}
mediaGroups.values.map {
it.toSentMediaGroupUpdate() ?.let { mediaGroupUpdate ->
resultUpdates.add(mediaGroupUpdate)
}
}
resultUpdates.sortBy { it.updateId }
return resultUpdates
}
/**
* @receiver List of [BaseSentMessageUpdate] where [BaseSentMessageUpdate.data] is [MediaGroupMessage] and all messages
* have the same [MediaGroupMessage.mediaGroupId]
* @return [MessageMediaGroupUpdate] in case if [first] object of [this] is [MessageUpdate]. When [first] object is
* [ChannelPostUpdate] instance - will return [ChannelPostMediaGroupUpdate]. Otherwise will be returned null
*/
fun List<BaseSentMessageUpdate>.toSentMediaGroupUpdate(): SentMediaGroupUpdate? = (this as? SentMediaGroupUpdate) ?: let {
if (isEmpty()) {
return@let null
}
val resultList = sortedBy { it.updateId }
when (first()) {
is MessageUpdate -> MessageMediaGroupUpdate(resultList)
is ChannelPostUpdate -> ChannelPostMediaGroupUpdate(resultList)
else -> null
}
}
/**
* @return [EditMessageMediaGroupUpdate] in case if [this] is [EditMessageUpdate]. When [this] object is
* [EditChannelPostUpdate] instance - will return [EditChannelPostMediaGroupUpdate]
*
* @throws IllegalStateException
*/
fun BaseEditMessageUpdate.toEditMediaGroupUpdate(): EditMediaGroupUpdate = (this as? EditMediaGroupUpdate) ?: let {
when (this) {
is EditMessageUpdate -> EditMessageMediaGroupUpdate(this)
is EditChannelPostUpdate -> EditChannelPostMediaGroupUpdate(this)
else -> error("Unsupported type of ${BaseEditMessageUpdate::class.simpleName}")
}
}

View File

@ -0,0 +1,181 @@
package com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.updates.retrieving
import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor
import com.github.insanusmokrassar.TelegramBotAPI.bot.exceptions.RequestException
import com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.updates.convertWithMediaGroupUpdates
import com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.updates.lastUpdateIdentifier
import com.github.insanusmokrassar.TelegramBotAPI.requests.GetUpdates
import com.github.insanusmokrassar.TelegramBotAPI.types.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdates.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update
import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.*
import com.github.insanusmokrassar.TelegramBotAPI.utils.PreviewFeature
import com.github.insanusmokrassar.TelegramBotAPI.utils.handleSafely
import kotlinx.coroutines.*
fun RequestsExecutor.startGettingOfUpdatesByLongPolling(
timeoutSeconds: Seconds = 30,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
exceptionsHandler: (suspend (Exception) -> Unit)? = null,
allowedUpdates: List<String>? = null,
updatesReceiver: UpdateReceiver<Update>
): Job = scope.launch {
var lastUpdateIdentifier: UpdateIdentifier? = null
while (isActive) {
handleSafely(
{ e ->
exceptionsHandler ?.invoke(e)
if (e is RequestException) {
delay(1000L)
}
}
) {
val updates = execute(
GetUpdates(
offset = lastUpdateIdentifier?.plus(1),
timeout = timeoutSeconds,
allowed_updates = allowedUpdates
)
).let { originalUpdates ->
val converted = originalUpdates.convertWithMediaGroupUpdates()
/**
* Dirty hack for cases when the media group was retrieved not fully:
*
* We are throw out the last media group and will reretrieve it again in the next get updates
* and it will guarantee that it is full
*/
if (originalUpdates.size == getUpdatesLimit.last && converted.last() is SentMediaGroupUpdate) {
converted - converted.last()
} else {
converted
}
}
handleSafely {
for (update in updates) {
updatesReceiver(update)
lastUpdateIdentifier = update.lastUpdateIdentifier()
}
}
}
}
}
/**
* This method will create a new one [FlowsUpdatesFilter]. This method could be unsafe due to the fact that it will start
* getting updates IMMEDIATELY. That means that your bot will be able to skip some of them until you will call
* [kotlinx.coroutines.flow.Flow.collect] on one of [FlowsUpdatesFilter] flows. To avoid it, you can pass
* [flowUpdatesPreset] lambda - it will be called BEFORE starting updates getting
*/
@FlowPreview
@PreviewFeature
@Suppress("unused")
fun RequestsExecutor.startGettingFlowsUpdatesByLongPolling(
timeoutSeconds: Seconds = 30,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
exceptionsHandler: (suspend (Exception) -> Unit)? = null,
flowsUpdatesFilterUpdatesKeeperCount: Int = 64,
flowUpdatesPreset: FlowsUpdatesFilter.() -> Unit = {}
): FlowsUpdatesFilter = FlowsUpdatesFilter(flowsUpdatesFilterUpdatesKeeperCount).apply {
flowUpdatesPreset()
startGettingOfUpdatesByLongPolling(timeoutSeconds, scope, exceptionsHandler, allowedUpdates, asUpdateReceiver)
}
fun RequestsExecutor.startGettingOfUpdatesByLongPolling(
updatesFilter: UpdatesFilter,
timeoutSeconds: Seconds = 30,
exceptionsHandler: (suspend (Exception) -> Unit)? = null,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default)
): Job = startGettingOfUpdatesByLongPolling(
timeoutSeconds,
scope,
exceptionsHandler,
updatesFilter.allowedUpdates,
updatesFilter.asUpdateReceiver
)
fun RequestsExecutor.startGettingOfUpdatesByLongPolling(
messageCallback: UpdateReceiver<MessageUpdate>? = null,
messageMediaGroupCallback: UpdateReceiver<MessageMediaGroupUpdate>? = null,
editedMessageCallback: UpdateReceiver<EditMessageUpdate>? = null,
editedMessageMediaGroupCallback: UpdateReceiver<EditMessageMediaGroupUpdate>? = null,
channelPostCallback: UpdateReceiver<ChannelPostUpdate>? = null,
channelPostMediaGroupCallback: UpdateReceiver<ChannelPostMediaGroupUpdate>? = null,
editedChannelPostCallback: UpdateReceiver<EditChannelPostUpdate>? = null,
editedChannelPostMediaGroupCallback: UpdateReceiver<EditChannelPostMediaGroupUpdate>? = null,
chosenInlineResultCallback: UpdateReceiver<ChosenInlineResultUpdate>? = null,
inlineQueryCallback: UpdateReceiver<InlineQueryUpdate>? = null,
callbackQueryCallback: UpdateReceiver<CallbackQueryUpdate>? = null,
shippingQueryCallback: UpdateReceiver<ShippingQueryUpdate>? = null,
preCheckoutQueryCallback: UpdateReceiver<PreCheckoutQueryUpdate>? = null,
pollCallback: UpdateReceiver<PollUpdate>? = null,
pollAnswerCallback: UpdateReceiver<PollAnswerUpdate>? = null,
timeoutSeconds: Seconds = 30,
exceptionsHandler: (suspend (Exception) -> Unit)? = null,
scope: CoroutineScope = GlobalScope
): Job {
return startGettingOfUpdatesByLongPolling(
SimpleUpdatesFilter(
messageCallback,
messageMediaGroupCallback,
editedMessageCallback,
editedMessageMediaGroupCallback,
channelPostCallback,
channelPostMediaGroupCallback,
editedChannelPostCallback,
editedChannelPostMediaGroupCallback,
chosenInlineResultCallback,
inlineQueryCallback,
callbackQueryCallback,
shippingQueryCallback,
preCheckoutQueryCallback,
pollCallback,
pollAnswerCallback
),
timeoutSeconds,
exceptionsHandler,
scope
)
}
@Suppress("unused")
fun RequestsExecutor.startGettingOfUpdatesByLongPolling(
messageCallback: UpdateReceiver<MessageUpdate>? = null,
mediaGroupCallback: UpdateReceiver<MediaGroupUpdate>? = null,
editedMessageCallback: UpdateReceiver<EditMessageUpdate>? = null,
channelPostCallback: UpdateReceiver<ChannelPostUpdate>? = null,
editedChannelPostCallback: UpdateReceiver<EditChannelPostUpdate>? = null,
chosenInlineResultCallback: UpdateReceiver<ChosenInlineResultUpdate>? = null,
inlineQueryCallback: UpdateReceiver<InlineQueryUpdate>? = null,
callbackQueryCallback: UpdateReceiver<CallbackQueryUpdate>? = null,
shippingQueryCallback: UpdateReceiver<ShippingQueryUpdate>? = null,
preCheckoutQueryCallback: UpdateReceiver<PreCheckoutQueryUpdate>? = null,
pollCallback: UpdateReceiver<PollUpdate>? = null,
pollAnswerCallback: UpdateReceiver<PollAnswerUpdate>? = null,
timeoutSeconds: Seconds = 30,
exceptionsHandler: (suspend (Exception) -> Unit)? = null,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default)
): Job = startGettingOfUpdatesByLongPolling(
messageCallback = messageCallback,
messageMediaGroupCallback = mediaGroupCallback,
editedMessageCallback = editedMessageCallback,
editedMessageMediaGroupCallback = mediaGroupCallback,
channelPostCallback = channelPostCallback,
channelPostMediaGroupCallback = mediaGroupCallback,
editedChannelPostCallback = editedChannelPostCallback,
editedChannelPostMediaGroupCallback = mediaGroupCallback,
chosenInlineResultCallback = chosenInlineResultCallback,
inlineQueryCallback = inlineQueryCallback,
callbackQueryCallback = callbackQueryCallback,
shippingQueryCallback = shippingQueryCallback,
preCheckoutQueryCallback = preCheckoutQueryCallback,
pollCallback = pollCallback,
pollAnswerCallback = pollAnswerCallback,
timeoutSeconds = timeoutSeconds,
exceptionsHandler = exceptionsHandler,
scope = scope
)

View File

@ -0,0 +1,60 @@
package com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.updates.retrieving
import com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.updates.convertWithMediaGroupUpdates
import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update
import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.UpdateReceiver
import com.github.insanusmokrassar.TelegramBotAPI.utils.extensions.accumulateByKey
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
/**
* Create [UpdateReceiver] object which will correctly accumulate updates and send into output updates which INCLUDE
* [com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdates.MediaGroupUpdate]s.
*
* @see UpdateReceiver
*/
fun CoroutineScope.updateHandlerWithMediaGroupsAdaptation(
output: UpdateReceiver<Update>,
debounceTimeMillis: Long = 1000L
): UpdateReceiver<Update> {
val updatesChannel = Channel<Update>(Channel.UNLIMITED)
val mediaGroupChannel = Channel<Pair<String, BaseMessageUpdate>>(Channel.UNLIMITED)
val mediaGroupAccumulatedChannel = mediaGroupChannel.accumulateByKey(
debounceTimeMillis,
scope = this
)
launch {
launch {
for (update in updatesChannel) {
when (val data = update.data) {
is MediaGroupMessage -> mediaGroupChannel.send("${data.mediaGroupId}${update::class.simpleName}" to update as BaseMessageUpdate)
else -> output(update)
}
}
}
launch {
for ((_, mediaGroup) in mediaGroupAccumulatedChannel) {
mediaGroup.convertWithMediaGroupUpdates().forEach {
output(it)
}
}
}
}
return { updatesChannel.send(it) }
}
/**
* Create [UpdateReceiver] object which will correctly accumulate updates and send into output updates which INCLUDE
* [com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdates.MediaGroupUpdate]s.
*
* @see UpdateReceiver
*/
fun CoroutineScope.updateHandlerWithMediaGroupsAdaptation(
output: UpdateReceiver<Update>
) = updateHandlerWithMediaGroupsAdaptation(output, 1000L)

View File

@ -1,16 +1,14 @@
package com.github.insanusmokrassar.TelegramBotAPI.extensions.api
package com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.updates.retrieving
import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor
import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.InternalUtils.convertWithMediaGroupUpdates
import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.utils.updateHandlerWithMediaGroupsAdaptation
import com.github.insanusmokrassar.TelegramBotAPI.extensions.utils.nonstrictJsonFormat
import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.InputFile
import com.github.insanusmokrassar.TelegramBotAPI.requests.webhook.SetWebhook
import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.UpdateDeserializationStrategy
import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.UpdateReceiver
import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.UpdatesFilter
import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.webhook.WebhookPrivateKeyConfig
import com.github.insanusmokrassar.TelegramBotAPI.utils.extensions.accumulateByKey
import com.github.insanusmokrassar.TelegramBotAPI.utils.extensions.executeAsync
import com.github.insanusmokrassar.TelegramBotAPI.utils.handleSafely
import io.ktor.application.call
@ -19,9 +17,9 @@ import io.ktor.response.respond
import io.ktor.routing.*
import io.ktor.server.engine.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.Executors
/**
* @param [scope] Will be used for mapping of media groups
* @param [exceptionsHandler] Pass this parameter to set custom exception handler for getting updates
@ -241,5 +239,3 @@ suspend fun RequestsExecutor.setWebhook(
maxAllowedConnections,
filter.asUpdateReceiver
)

View File

@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit
* which will be used by telegram to send encrypted messages
* @param scope Scope which will be used for
*/
@Deprecated("Replaced into project TelegramBotAPI-extensions-api")
@Deprecated("Replaced into project TelegramBotAPI-extensions-utils")
suspend fun RequestsExecutor.setWebhook(
url: String,
port: Int,
@ -137,7 +137,7 @@ suspend fun RequestsExecutor.setWebhook(
}
}
@Deprecated("Replaced into project TelegramBotAPI-extensions-api")
@Deprecated("Replaced into project TelegramBotAPI-extensions-utils")
suspend fun RequestsExecutor.setWebhook(
url: String,
port: Int,
@ -165,7 +165,7 @@ suspend fun RequestsExecutor.setWebhook(
block
)
@Deprecated("Replaced into project TelegramBotAPI-extensions-api")
@Deprecated("Replaced into project TelegramBotAPI-extensions-utils")
suspend fun RequestsExecutor.setWebhook(
url: String,
port: Int,
@ -190,7 +190,7 @@ suspend fun RequestsExecutor.setWebhook(
block
)
@Deprecated("Replaced into project TelegramBotAPI-extensions-api")
@Deprecated("Replaced into project TelegramBotAPI-extensions-utils")
suspend fun RequestsExecutor.setWebhook(
url: String,
port: Int,