UpdatesPoller, asReference, mediaGroupId for list and separate executes and update polling extensions

This commit is contained in:
InsanusMokrassar 2019-03-06 10:09:45 +08:00
parent 7b78dbdb3e
commit 336e76450f
8 changed files with 256 additions and 214 deletions

View File

@ -17,6 +17,11 @@
* Added `UnauthorizedException`
* `RequestException` now is sealed
* Rename `ReplyMessageNotFound` to `ReplyMessageNotFoundException`
* Added `List<BaseMessageUpdate>#mediaGroupId` extension
* Added utility `T#asReference(): WeakReference(T)` extension
* Added `UpdatesPoller` class which can be instantiated for manage updates polling
* Separated execute extensions (now they are in file `Executes`) and poller creating extensions
* `BaseMessageUpdate#toMediaGroupUpdate()` will also check condition when update-receiver already is `MediaGroupUpdate`
## 0.11.0

View File

@ -4,6 +4,6 @@ import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaG
import com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdate
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate
fun BaseMessageUpdate.toMediaGroupUpdate(): MediaGroupUpdate? = (data as? MediaGroupMessage) ?.let {
fun BaseMessageUpdate.toMediaGroupUpdate(): MediaGroupUpdate? = (this as? MediaGroupUpdate) ?: ((data as? MediaGroupMessage) ?.let {
MediaGroupUpdate(updateId, it)
}
})

View File

@ -1,8 +1,10 @@
package com.github.insanusmokrassar.TelegramBotAPI.utils
import com.github.insanusmokrassar.TelegramBotAPI.types.MediaGroupIdentifier
import com.github.insanusmokrassar.TelegramBotAPI.types.chat.Chat
import com.github.insanusmokrassar.TelegramBotAPI.types.message.ForwardedMessage
import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdate
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate
val List<BaseMessageUpdate>.forwarded: ForwardedMessage?
@ -17,3 +19,6 @@ val List<BaseMessageUpdate>.replyTo: Message?
val List<BaseMessageUpdate>.chat: Chat?
get() = first().data.chat
val List<BaseMessageUpdate>.mediaGroupId: MediaGroupIdentifier?
get() = (first().data as? MediaGroupMessage) ?.mediaGroupId

View File

@ -0,0 +1,5 @@
package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions
import java.lang.ref.WeakReference
fun <T> T.asReference() = WeakReference(this)

View File

@ -0,0 +1,51 @@
package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions
import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor
import com.github.insanusmokrassar.TelegramBotAPI.bot.exceptions.RequestException
import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.Request
import com.github.insanusmokrassar.TelegramBotAPI.types.Response
import kotlinx.coroutines.*
fun <T: Any> RequestsExecutor.executeAsync(
request: Request<T>,
onFail: (suspend (Response<*>) -> Unit)? = null,
scope: CoroutineScope = GlobalScope,
onSuccess: (suspend (T) -> Unit)? = null
): Job {
return scope.launch {
try {
val result = execute(request)
onSuccess ?.invoke(result)
} catch (e: RequestException) {
onFail ?.invoke(e.response)
}
}
}
fun <T: Any> RequestsExecutor.executeAsync(
request: Request<T>,
scope: CoroutineScope = GlobalScope
): Deferred<T> {
return scope.async { execute(request) }
}
suspend fun <T: Any> RequestsExecutor.executeUnsafe(
request: Request<T>,
retries: Int = 0,
retriesDelay: Long = 1000L
): T? {
var leftRetries = retries
while(true) {
try {
return execute(request)
} catch (e: RequestException) {
if (leftRetries > 0) {
leftRetries--
delay(retriesDelay)
} else {
return null
}
}
}
}

View File

@ -1,212 +0,0 @@
package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions
import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor
import com.github.insanusmokrassar.TelegramBotAPI.bot.exceptions.RequestException
import com.github.insanusmokrassar.TelegramBotAPI.requests.*
import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.Request
import com.github.insanusmokrassar.TelegramBotAPI.types.Response
import com.github.insanusmokrassar.TelegramBotAPI.types.UpdateIdentifier
import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage
import com.github.insanusmokrassar.TelegramBotAPI.types.update.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update
import kotlinx.coroutines.*
typealias UpdateReceiver<T> = suspend (T) -> Unit
fun RequestsExecutor.startGettingOfUpdates(
requestsDelayMillis: Long = 1000,
scope: CoroutineScope = GlobalScope,
allowedUpdates: List<String>? = null,
block: UpdateReceiver<Any>
): Job {
return scope.launch {
var lastHandledUpdate: UpdateIdentifier = 0L
while (isActive) {
delay(requestsDelayMillis)
try {
val updates = execute(
GetUpdates(
lastHandledUpdate + 1,
allowed_updates = allowedUpdates
)
)
val adaptedUpdates = mutableListOf<Any>()
var mediaGroup: MutableList<Update>? = null
fun pushMediaGroup() {
mediaGroup ?.also {
adaptedUpdates.add(it)
mediaGroup = null
}
}
updates.map {
it.asUpdate
}.forEach { update ->
val data = update.data
if (data is MediaGroupMessage) {
mediaGroup ?.let {
val message = it.first().data as MediaGroupMessage
if (message.mediaGroupId == data.mediaGroupId) {
it.add(update)
} else {
null
}
} ?: data.also {
pushMediaGroup()
mediaGroup = mutableListOf()
mediaGroup ?.add(update)
}
} else {
pushMediaGroup()
adaptedUpdates.add(update)
}
}
mediaGroup ?.also {
adaptedUpdates.add(it)
mediaGroup = null
}
for (update in adaptedUpdates) {
try {
block(update)
lastHandledUpdate = when (update) {
is Update -> update.updateId
is List<*> -> (update.last() as? Update) ?.updateId ?: throw IllegalStateException(
"Found non-updates oriented list"
)
else -> throw IllegalStateException(
"Unknown type of data"
)
}
} catch (e: Exception) {
// TODO:: add exception handling
e.printStackTrace()
break
}
}
} catch (e: Exception) {
// TODO:: add exception handling
e.printStackTrace()
}
}
}
}
fun RequestsExecutor.startGettingOfUpdates(
messageCallback: UpdateReceiver<MessageUpdate>? = null,
messageMediaGroupCallback: UpdateReceiver<List<MediaGroupUpdate>>? = null,
editedMessageCallback: UpdateReceiver<EditMessageUpdate>? = null,
editedMessageMediaGroupCallback: UpdateReceiver<List<MediaGroupUpdate>>? = null,
channelPostCallback: UpdateReceiver<ChannelPostUpdate>? = null,
channelPostMediaGroupCallback: UpdateReceiver<List<MediaGroupUpdate>>? = null,
editedChannelPostCallback: UpdateReceiver<EditChannelPostUpdate>? = null,
editedChannelPostMediaGroupCallback: UpdateReceiver<List<MediaGroupUpdate>>? = null,
chosenInlineResultCallback: UpdateReceiver<ChosenInlineResultUpdate>? = null,
inlineQueryCallback: UpdateReceiver<InlineQueryUpdate>? = null,
callbackQueryCallback: UpdateReceiver<CallbackQueryUpdate>? = null,
shippingQueryCallback: UpdateReceiver<ShippingQueryUpdate>? = null,
preCheckoutQueryCallback: UpdateReceiver<PreCheckoutQueryUpdate>? = null,
requestsDelayMillis: Long = 1000,
scope: CoroutineScope = GlobalScope
): Job {
val filter = UpdatesFilter(
messageCallback,
messageMediaGroupCallback,
editedMessageCallback,
editedMessageMediaGroupCallback,
channelPostCallback,
channelPostMediaGroupCallback,
editedChannelPostCallback,
editedChannelPostMediaGroupCallback,
chosenInlineResultCallback,
inlineQueryCallback,
callbackQueryCallback,
shippingQueryCallback,
preCheckoutQueryCallback
)
return startGettingOfUpdates(
requestsDelayMillis,
scope,
filter.allowedUpdates,
filter.asUpdateReceiver
)
}
fun RequestsExecutor.startGettingOfUpdates(
messageCallback: UpdateReceiver<MessageUpdate>? = null,
mediaGroupCallback: UpdateReceiver<List<BaseMessageUpdate>>? = null,
editedMessageCallback: UpdateReceiver<EditMessageUpdate>? = null,
channelPostCallback: UpdateReceiver<ChannelPostUpdate>? = null,
editedChannelPostCallback: UpdateReceiver<EditChannelPostUpdate>? = null,
chosenInlineResultCallback: UpdateReceiver<ChosenInlineResultUpdate>? = null,
inlineQueryCallback: UpdateReceiver<InlineQueryUpdate>? = null,
callbackQueryCallback: UpdateReceiver<CallbackQueryUpdate>? = null,
shippingQueryCallback: UpdateReceiver<ShippingQueryUpdate>? = null,
preCheckoutQueryCallback: UpdateReceiver<PreCheckoutQueryUpdate>? = null,
requestsDelayMillis: Long = 1000,
scope: CoroutineScope = GlobalScope
): Job = startGettingOfUpdates(
messageCallback = messageCallback,
messageMediaGroupCallback = mediaGroupCallback,
editedMessageCallback = editedMessageCallback,
editedMessageMediaGroupCallback = mediaGroupCallback,
channelPostCallback = channelPostCallback,
channelPostMediaGroupCallback = mediaGroupCallback,
editedChannelPostCallback = editedChannelPostCallback,
editedChannelPostMediaGroupCallback = mediaGroupCallback,
chosenInlineResultCallback = chosenInlineResultCallback,
inlineQueryCallback = inlineQueryCallback,
callbackQueryCallback = callbackQueryCallback,
shippingQueryCallback = shippingQueryCallback,
preCheckoutQueryCallback = preCheckoutQueryCallback,
requestsDelayMillis = requestsDelayMillis,
scope = scope
)
fun <T: Any> RequestsExecutor.executeAsync(
request: Request<T>,
onFail: (suspend (Response<*>) -> Unit)? = null,
scope: CoroutineScope = GlobalScope,
onSuccess: (suspend (T) -> Unit)? = null
): Job {
return scope.launch {
try {
val result = execute(request)
onSuccess ?.invoke(result)
} catch (e: RequestException) {
onFail ?.invoke(e.response)
}
}
}
fun <T: Any> RequestsExecutor.executeAsync(
request: Request<T>,
scope: CoroutineScope = GlobalScope
): Deferred<T> {
return scope.async { execute(request) }
}
suspend fun <T: Any> RequestsExecutor.executeUnsafe(
request: Request<T>,
retries: Int = 0,
retriesDelay: Long = 1000L
): T? {
var leftRetries = retries
while(true) {
try {
return execute(request)
} catch (e: RequestException) {
if (leftRetries > 0) {
leftRetries--
delay(retriesDelay)
} else {
return null
}
}
}
}

View File

@ -0,0 +1,99 @@
package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions
import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor
import com.github.insanusmokrassar.TelegramBotAPI.requests.GetUpdates
import com.github.insanusmokrassar.TelegramBotAPI.types.UpdateIdentifier
import com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdate
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update
import com.github.insanusmokrassar.TelegramBotAPI.utils.mediaGroupId
import com.github.insanusmokrassar.TelegramBotAPI.utils.toMediaGroupUpdate
import kotlinx.coroutines.*
import java.util.concurrent.Executors
private val updatesPollerRequestExecutorCollectedException = IllegalStateException("RequestsExecutor was collected by GC. Can't continue getting updates by polling")
class UpdatesPoller(
requestsExecutor: RequestsExecutor,
private val requestsDelayMillis: Long = 1000,
private val scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()),
private val allowedUpdates: List<String>? = null,
private val block: UpdateReceiver<Any>
) {
private val executor = requestsExecutor.asReference()
private var lastHandledUpdate: UpdateIdentifier = 0L
private val mediaGroup: MutableList<MediaGroupUpdate> = mutableListOf()
private var pollerJob: Job? = null
private suspend fun sendToBlock(data: Any) {
block(data)
lastHandledUpdate = when (data) {
is Update -> data.updateId
is List<*> -> (data.last() as? Update) ?.updateId ?: throw IllegalStateException(
"Found non-updates oriented list"
)
else -> throw IllegalStateException(
"Unknown type of data"
)
}
}
private suspend fun pushMediaGroupUpdate(mediaGroupUpdate: MediaGroupUpdate? = null) {
val inputMediaGroupId = mediaGroupUpdate ?.data ?.mediaGroupId
if (mediaGroup.isNotEmpty() && inputMediaGroupId ?.equals(mediaGroup.mediaGroupId) != true) {
sendToBlock(listOf(*mediaGroup.toTypedArray()))
mediaGroup.clear()
}
mediaGroupUpdate ?.let {
mediaGroup.add(it)
}
}
private suspend fun getUpdates(): List<Update> {
return executor.get() ?.execute(
GetUpdates(
lastHandledUpdate + 1, // incremented because offset counted from 1 when updates id from 0
allowed_updates = allowedUpdates
)
) ?.map {
it.asUpdate
} ?: throw updatesPollerRequestExecutorCollectedException
}
private suspend fun handleUpdates(updates: List<Update>) {
updates.forEach { update ->
val mediaGroupUpdate = (update as? BaseMessageUpdate) ?.toMediaGroupUpdate()
mediaGroupUpdate ?.let { _ ->
pushMediaGroupUpdate(mediaGroupUpdate)
} ?: let {
pushMediaGroupUpdate()
sendToBlock(update)
}
}
pushMediaGroupUpdate()
}
fun start(): Job {
return pollerJob ?: scope.launch {
while (isActive) {
delay(requestsDelayMillis)
try {
handleUpdates(getUpdates())
} catch (e: Exception) {
if (e == updatesPollerRequestExecutorCollectedException) {
throw IllegalArgumentException(e.message)
}
e.printStackTrace()
}
}
}.also {
pollerJob = it
}
}
suspend fun stop() {
pollerJob ?.cancelAndJoin()
}
}

View File

@ -0,0 +1,89 @@
package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions
import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor
import com.github.insanusmokrassar.TelegramBotAPI.types.update.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate
import kotlinx.coroutines.*
import java.util.concurrent.Executors
typealias UpdateReceiver<T> = suspend (T) -> Unit
fun RequestsExecutor.startGettingOfUpdates(
requestsDelayMillis: Long = 1000,
scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()),
allowedUpdates: List<String>? = null,
block: UpdateReceiver<Any>
): Job {
return UpdatesPoller(this, requestsDelayMillis, scope, allowedUpdates, block).start()
}
fun RequestsExecutor.startGettingOfUpdates(
messageCallback: UpdateReceiver<MessageUpdate>? = null,
messageMediaGroupCallback: UpdateReceiver<List<MediaGroupUpdate>>? = null,
editedMessageCallback: UpdateReceiver<EditMessageUpdate>? = null,
editedMessageMediaGroupCallback: UpdateReceiver<List<MediaGroupUpdate>>? = null,
channelPostCallback: UpdateReceiver<ChannelPostUpdate>? = null,
channelPostMediaGroupCallback: UpdateReceiver<List<MediaGroupUpdate>>? = null,
editedChannelPostCallback: UpdateReceiver<EditChannelPostUpdate>? = null,
editedChannelPostMediaGroupCallback: UpdateReceiver<List<MediaGroupUpdate>>? = null,
chosenInlineResultCallback: UpdateReceiver<ChosenInlineResultUpdate>? = null,
inlineQueryCallback: UpdateReceiver<InlineQueryUpdate>? = null,
callbackQueryCallback: UpdateReceiver<CallbackQueryUpdate>? = null,
shippingQueryCallback: UpdateReceiver<ShippingQueryUpdate>? = null,
preCheckoutQueryCallback: UpdateReceiver<PreCheckoutQueryUpdate>? = null,
requestsDelayMillis: Long = 1000,
scope: CoroutineScope = GlobalScope
): Job {
val filter = UpdatesFilter(
messageCallback,
messageMediaGroupCallback,
editedMessageCallback,
editedMessageMediaGroupCallback,
channelPostCallback,
channelPostMediaGroupCallback,
editedChannelPostCallback,
editedChannelPostMediaGroupCallback,
chosenInlineResultCallback,
inlineQueryCallback,
callbackQueryCallback,
shippingQueryCallback,
preCheckoutQueryCallback
)
return startGettingOfUpdates(
requestsDelayMillis,
scope,
filter.allowedUpdates,
filter.asUpdateReceiver
)
}
fun RequestsExecutor.startGettingOfUpdates(
messageCallback: UpdateReceiver<MessageUpdate>? = null,
mediaGroupCallback: UpdateReceiver<List<BaseMessageUpdate>>? = null,
editedMessageCallback: UpdateReceiver<EditMessageUpdate>? = null,
channelPostCallback: UpdateReceiver<ChannelPostUpdate>? = null,
editedChannelPostCallback: UpdateReceiver<EditChannelPostUpdate>? = null,
chosenInlineResultCallback: UpdateReceiver<ChosenInlineResultUpdate>? = null,
inlineQueryCallback: UpdateReceiver<InlineQueryUpdate>? = null,
callbackQueryCallback: UpdateReceiver<CallbackQueryUpdate>? = null,
shippingQueryCallback: UpdateReceiver<ShippingQueryUpdate>? = null,
preCheckoutQueryCallback: UpdateReceiver<PreCheckoutQueryUpdate>? = null,
requestsDelayMillis: Long = 1000,
scope: CoroutineScope = GlobalScope
): Job = startGettingOfUpdates(
messageCallback = messageCallback,
messageMediaGroupCallback = mediaGroupCallback,
editedMessageCallback = editedMessageCallback,
editedMessageMediaGroupCallback = mediaGroupCallback,
channelPostCallback = channelPostCallback,
channelPostMediaGroupCallback = mediaGroupCallback,
editedChannelPostCallback = editedChannelPostCallback,
editedChannelPostMediaGroupCallback = mediaGroupCallback,
chosenInlineResultCallback = chosenInlineResultCallback,
inlineQueryCallback = inlineQueryCallback,
callbackQueryCallback = callbackQueryCallback,
shippingQueryCallback = shippingQueryCallback,
preCheckoutQueryCallback = preCheckoutQueryCallback,
requestsDelayMillis = requestsDelayMillis,
scope = scope
)