From 1520271777f17dcb07844d41882b9c1823317c7f Mon Sep 17 00:00:00 2001 From: InsanusMokrassar Date: Tue, 26 Feb 2019 16:01:26 +0800 Subject: [PATCH] add main webhooks functionality for using with reverse proxy --- CHANGELOG.md | 5 +- build.gradle | 4 + .../utils/extensions/ReceiveChannel.kt | 101 ++++++++++++++ .../utils/extensions/RequestsExecutor.kt | 72 +++------- .../utils/extensions/UpdatesFilter.kt | 105 +++++++++++++++ .../utils/extensions/Webhooks.kt | 127 ++++++++++++++++++ 6 files changed, 359 insertions(+), 55 deletions(-) create mode 100644 src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/ReceiveChannel.kt create mode 100644 src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesFilter.kt create mode 100644 src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Webhooks.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index 2623171d98..cdd5e88750 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,14 @@ # TelegramBotAPI changelog -## 0.12.0 +## 0.12.0 Webhooks * Added `DataRequest` interface which replace `Data` interface * `MultipartRequestImpl` now use `DataRequest` * All requests which implements `Data` now implement `DataRequest` * Added class `SetWebhook` and its factory +* Added class `UpdatesFilter` which can help to filter updates by categories +* Added function `accumulateByKey` which work as debounce for keys and send list of received values +* Added webhooks functions and workaround for `Reverse Proxy` mode ## 0.11.0 diff --git a/build.gradle b/build.gradle index 1937b4dfc1..0a44792233 100644 --- a/build.gradle +++ b/build.gradle @@ -34,9 +34,13 @@ dependencies { implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines_version" implementation "org.jetbrains.kotlinx:kotlinx-serialization-runtime:$kotlin_serialisation_runtime_version" implementation "joda-time:joda-time:$joda_time_version" + implementation "io.ktor:ktor-client-core:$ktor_version" implementation "io.ktor:ktor-client-okhttp:$ktor_version" + implementation "io.ktor:ktor-server-core:$ktor_version" + implementation "io.ktor:ktor-server-netty:$ktor_version" + // Use JUnit test framework testImplementation 'junit:junit:4.12' } diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/ReceiveChannel.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/ReceiveChannel.kt new file mode 100644 index 0000000000..0cbc0421a9 --- /dev/null +++ b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/ReceiveChannel.kt @@ -0,0 +1,101 @@ +package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel + +private sealed class DebounceAction { + abstract val value: T +} + +private data class AddValue(override val value: T) : DebounceAction() +private data class RemoveJob(override val value: T, val job: Job) : DebounceAction() + +fun ReceiveChannel.debounceByValue( + delayMillis: Long, + scope: CoroutineScope = CoroutineScope(Dispatchers.Default), + resultBroadcastChannelCapacity: Int = 32 +): ReceiveChannel { + val outChannel = Channel(resultBroadcastChannelCapacity) + val values = HashMap() + + val channel = Channel>(Channel.UNLIMITED) + scope.launch { + for (action in channel) { + when (action) { + is AddValue -> { + val msg = action.value + values[msg] ?.cancel() + lateinit var job: Job + job = launch { + delay(delayMillis) + + outChannel.send(msg) + channel.send(RemoveJob(msg, job)) + } + values[msg] = job + } + is RemoveJob -> if (values[action.value] == action.job) { + values.remove(action.value) + } + } + + } + } + + scope.launch { + for (msg in this@debounceByValue) { + channel.send(AddValue(msg)) + } + } + + return outChannel +} + +typealias AccumulatedValues = Pair> + +fun ReceiveChannel>.accumulateByKey( + delayMillis: Long, + scope: CoroutineScope = CoroutineScope(Dispatchers.Default), + resultBroadcastChannelCapacity: Int = 32 +): ReceiveChannel> { + val outChannel = Channel>(resultBroadcastChannelCapacity) + val values = HashMap>() + val jobs = HashMap() + + val channel = Channel>>(Channel.UNLIMITED) + scope.launch { + for (action in channel) { + val (key, value) = action.value + when (action) { + is AddValue -> { + jobs[key] ?.cancel() + (values[key] ?: mutableListOf().also { values[key] = it }).add(value) + lateinit var job: Job + job = launch { + delay(delayMillis) + + values[key] ?.let { + outChannel.send(key to it) + channel.send(RemoveJob(key to value, job)) + } + } + jobs[key] = job + } + is RemoveJob -> if (values[key] == action.job) { + values.remove(key) + jobs.remove(key) + } + } + + } + } + + scope.launch { + for (msg in this@accumulateByKey) { + channel.send(AddValue(msg)) + } + } + + return outChannel +} diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/RequestsExecutor.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/RequestsExecutor.kt index d44cdb5871..8b1b61026e 100644 --- a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/RequestsExecutor.kt +++ b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/RequestsExecutor.kt @@ -114,63 +114,27 @@ fun RequestsExecutor.startGettingOfUpdates( requestsDelayMillis: Long = 1000, scope: CoroutineScope = GlobalScope ): Job { + val filter = UpdatesFilter( + messageCallback, + messageMediaGroupCallback, + editedMessageCallback, + editedMessageMediaGroupCallback, + channelPostCallback, + channelPostMediaGroupCallback, + editedChannelPostCallback, + editedChannelPostMediaGroupCallback, + chosenInlineResultCallback, + inlineQueryCallback, + callbackQueryCallback, + shippingQueryCallback, + preCheckoutQueryCallback + ) return startGettingOfUpdates( requestsDelayMillis, scope, - listOfNotNull( - (messageCallback ?: messageMediaGroupCallback) ?.let { UPDATE_MESSAGE }, - (editedMessageCallback ?: editedMessageMediaGroupCallback) ?.let { UPDATE_EDITED_MESSAGE }, - (channelPostCallback ?: channelPostMediaGroupCallback) ?.let { UPDATE_CHANNEL_POST }, - (editedChannelPostCallback ?: editedChannelPostMediaGroupCallback) ?.let { UPDATE_EDITED_CHANNEL_POST }, - chosenInlineResultCallback ?.let { UPDATE_CHOSEN_INLINE_RESULT }, - inlineQueryCallback ?.let { UPDATE_INLINE_QUERY }, - callbackQueryCallback ?.let { UPDATE_CALLBACK_QUERY }, - shippingQueryCallback ?.let { UPDATE_SHIPPING_QUERY }, - preCheckoutQueryCallback ?.let { UPDATE_PRE_CHECKOUT_QUERY } - ) - ) { update -> - when (update) { - is MessageUpdate -> messageCallback ?.invoke(update) - is List<*> -> when (update.firstOrNull()) { - is MessageUpdate -> update.mapNotNull { it as? MessageUpdate }.let { mappedList -> - messageMediaGroupCallback ?.also { receiver -> - receiver(mappedList) - } ?: messageCallback ?.also { receiver -> - mappedList.forEach { receiver(it) } - } - } - is EditMessageUpdate -> update.mapNotNull { it as? EditMessageUpdate }.let { mappedList -> - editedMessageMediaGroupCallback ?.also { receiver -> - receiver(mappedList) - } ?: editedMessageCallback ?.also { receiver -> - mappedList.forEach { receiver(it) } - } - } - is ChannelPostUpdate -> update.mapNotNull { it as? ChannelPostUpdate }.let { mappedList -> - channelPostMediaGroupCallback ?.also { receiver -> - receiver(mappedList) - } ?: channelPostCallback ?.also { receiver -> - mappedList.forEach { receiver(it) } - } - } - is EditChannelPostUpdate -> update.mapNotNull { it as? EditChannelPostUpdate }.let { mappedList -> - editedChannelPostMediaGroupCallback ?.also { receiver -> - receiver(mappedList) - } ?: editedChannelPostCallback ?.also { receiver -> - mappedList.forEach { receiver(it) } - } - } - } - is EditMessageUpdate -> editedMessageCallback ?.invoke(update) - is ChannelPostUpdate -> channelPostCallback ?.invoke(update) - is EditChannelPostUpdate -> editedChannelPostCallback ?.invoke(update) - is ChosenInlineResultUpdate -> chosenInlineResultCallback ?.invoke(update) - is InlineQueryUpdate -> inlineQueryCallback ?.invoke(update) - is CallbackQueryUpdate -> callbackQueryCallback ?.invoke(update) - is ShippingQueryUpdate -> shippingQueryCallback ?.invoke(update) - is PreCheckoutQueryUpdate -> preCheckoutQueryCallback ?.invoke(update) - } - } + filter.allowedUpdates, + filter.asUpdateReceiver + ) } fun RequestsExecutor.startGettingOfUpdates( diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesFilter.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesFilter.kt new file mode 100644 index 0000000000..7c7ee726c2 --- /dev/null +++ b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/UpdatesFilter.kt @@ -0,0 +1,105 @@ +package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions + +import com.github.insanusmokrassar.TelegramBotAPI.requests.* +import com.github.insanusmokrassar.TelegramBotAPI.types.update.* +import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate + +data class UpdatesFilter( + private val messageCallback: UpdateReceiver? = null, + private val messageMediaGroupCallback: UpdateReceiver>? = null, + private val editedMessageCallback: UpdateReceiver? = null, + private val editedMessageMediaGroupCallback: UpdateReceiver>? = null, + private val channelPostCallback: UpdateReceiver? = null, + private val channelPostMediaGroupCallback: UpdateReceiver>? = null, + private val editedChannelPostCallback: UpdateReceiver? = null, + private val editedChannelPostMediaGroupCallback: UpdateReceiver>? = null, + private val chosenInlineResultCallback: UpdateReceiver? = null, + private val inlineQueryCallback: UpdateReceiver? = null, + private val callbackQueryCallback: UpdateReceiver? = null, + private val shippingQueryCallback: UpdateReceiver? = null, + private val preCheckoutQueryCallback: UpdateReceiver? = null +) { + val asUpdateReceiver: UpdateReceiver = this::invoke + val allowedUpdates = listOfNotNull( + (messageCallback ?: messageMediaGroupCallback) ?.let { UPDATE_MESSAGE }, + (editedMessageCallback ?: editedMessageMediaGroupCallback) ?.let { UPDATE_EDITED_MESSAGE }, + (channelPostCallback ?: channelPostMediaGroupCallback) ?.let { UPDATE_CHANNEL_POST }, + (editedChannelPostCallback ?: editedChannelPostMediaGroupCallback) ?.let { UPDATE_EDITED_CHANNEL_POST }, + chosenInlineResultCallback ?.let { UPDATE_CHOSEN_INLINE_RESULT }, + inlineQueryCallback ?.let { UPDATE_INLINE_QUERY }, + callbackQueryCallback ?.let { UPDATE_CALLBACK_QUERY }, + shippingQueryCallback ?.let { UPDATE_SHIPPING_QUERY }, + preCheckoutQueryCallback ?.let { UPDATE_PRE_CHECKOUT_QUERY } + ) + + suspend fun invoke(update: Any) { + when (update) { + is MessageUpdate -> messageCallback ?.invoke(update) + is List<*> -> when (update.firstOrNull()) { + is MessageUpdate -> update.mapNotNull { it as? MessageUpdate }.let { mappedList -> + messageMediaGroupCallback ?.also { receiver -> + receiver(mappedList) + } ?: messageCallback ?.also { receiver -> + mappedList.forEach { receiver(it) } + } + } + is EditMessageUpdate -> update.mapNotNull { it as? EditMessageUpdate }.let { mappedList -> + editedMessageMediaGroupCallback ?.also { receiver -> + receiver(mappedList) + } ?: editedMessageCallback ?.also { receiver -> + mappedList.forEach { receiver(it) } + } + } + is ChannelPostUpdate -> update.mapNotNull { it as? ChannelPostUpdate }.let { mappedList -> + channelPostMediaGroupCallback ?.also { receiver -> + receiver(mappedList) + } ?: channelPostCallback ?.also { receiver -> + mappedList.forEach { receiver(it) } + } + } + is EditChannelPostUpdate -> update.mapNotNull { it as? EditChannelPostUpdate }.let { mappedList -> + editedChannelPostMediaGroupCallback ?.also { receiver -> + receiver(mappedList) + } ?: editedChannelPostCallback ?.also { receiver -> + mappedList.forEach { receiver(it) } + } + } + } + is EditMessageUpdate -> editedMessageCallback ?.invoke(update) + is ChannelPostUpdate -> channelPostCallback ?.invoke(update) + is EditChannelPostUpdate -> editedChannelPostCallback ?.invoke(update) + is ChosenInlineResultUpdate -> chosenInlineResultCallback ?.invoke(update) + is InlineQueryUpdate -> inlineQueryCallback ?.invoke(update) + is CallbackQueryUpdate -> callbackQueryCallback ?.invoke(update) + is ShippingQueryUpdate -> shippingQueryCallback ?.invoke(update) + is PreCheckoutQueryUpdate -> preCheckoutQueryCallback ?.invoke(update) + } + } +} + +fun createSimpleUpdateFilter( + messageCallback: UpdateReceiver? = null, + mediaGroupCallback: UpdateReceiver>? = null, + editedMessageCallback: UpdateReceiver? = null, + channelPostCallback: UpdateReceiver? = null, + editedChannelPostCallback: UpdateReceiver? = null, + chosenInlineResultCallback: UpdateReceiver? = null, + inlineQueryCallback: UpdateReceiver? = null, + callbackQueryCallback: UpdateReceiver? = null, + shippingQueryCallback: UpdateReceiver? = null, + preCheckoutQueryCallback: UpdateReceiver? = null +): UpdatesFilter = UpdatesFilter( + messageCallback = messageCallback, + messageMediaGroupCallback = mediaGroupCallback, + editedMessageCallback = editedMessageCallback, + editedMessageMediaGroupCallback = mediaGroupCallback, + channelPostCallback = channelPostCallback, + channelPostMediaGroupCallback = mediaGroupCallback, + editedChannelPostCallback = editedChannelPostCallback, + editedChannelPostMediaGroupCallback = mediaGroupCallback, + chosenInlineResultCallback = chosenInlineResultCallback, + inlineQueryCallback = inlineQueryCallback, + callbackQueryCallback = callbackQueryCallback, + shippingQueryCallback = shippingQueryCallback, + preCheckoutQueryCallback = preCheckoutQueryCallback +) diff --git a/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Webhooks.kt b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Webhooks.kt new file mode 100644 index 0000000000..53778666ca --- /dev/null +++ b/src/main/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Webhooks.kt @@ -0,0 +1,127 @@ +package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions + +import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor +import com.github.insanusmokrassar.TelegramBotAPI.requests.* +import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.InputFile +import com.github.insanusmokrassar.TelegramBotAPI.types.MediaGroupIdentifier +import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage +import com.github.insanusmokrassar.TelegramBotAPI.types.update.* +import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update +import io.ktor.application.Application +import io.ktor.application.call +import io.ktor.request.receiveText +import io.ktor.response.respond +import io.ktor.routing.* +import io.ktor.server.engine.* +import io.ktor.server.netty.Netty +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.serialization.json.Json +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + +/** + * Reverse proxy webhook. + * + * @param url URL of webhook WITHOUT including of [port] + * @param port port which will be listen by bot + * @param certificate [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.MultipartFile] or [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.FileId] + * which will be used by telegram to send encrypted messages + * @param scope Scope which will be used for + */ +suspend fun RequestsExecutor.setWebhook( + url: String, + port: Int, + certificate: InputFile, + scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), + allowedUpdates: List? = null, + maxAllowedConnections: Int? = null, + engineFactory: ApplicationEngineFactory<*, *> = Netty, + block: UpdateReceiver +): Job { + val executeDeferred = executeAsync( + SetWebhook( + url, + certificate, + maxAllowedConnections, + allowedUpdates + ) + ) + val updatesChannel = Channel(Channel.UNLIMITED) + val mediaGroupChannel = Channel>(Channel.UNLIMITED) + val mediaGroupAccumulatedChannel = mediaGroupChannel.accumulateByKey( + 1000L, + scope = scope + ) + val env = applicationEngineEnvironment { + module { + fun Application.main() { + routing { + post("/") { + val deserialized = call.receiveText() + val update = Json.nonstrict.parse( + RawUpdate.serializer(), + deserialized + ) + updatesChannel.send(update.asUpdate) + call.respond("Ok") + } + } + } + main() + } + connector { + host = "0.0.0.0" + this.port = port + } + } + val engine = embeddedServer(engineFactory, env) + + try { + executeDeferred.await() + } catch (e: Exception) { + env.stop() + throw e + } + + return scope.launch { + launch { + for (update in updatesChannel) { + val data = update.data + when (data) { + is MediaGroupMessage -> mediaGroupChannel.send(data.mediaGroupId to update) + else -> block(update) + } + } + } + launch { + for (mediaGroupUpdate in mediaGroupAccumulatedChannel) { + block(mediaGroupUpdate.second) + } + } + engine.start(false) + }.also { + it.invokeOnCompletion { + engine.stop(1000L, 0L, TimeUnit.MILLISECONDS) + } + } +} + +suspend fun RequestsExecutor.setWebhook( + url: String, + port: Int, + certificate: InputFile, + scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), + maxAllowedConnections: Int? = null, + engineFactory: ApplicationEngineFactory<*, *> = Netty, + filter: UpdatesFilter +): Job = setWebhook( + url, + port, + certificate, + scope, + filter.allowedUpdates, + maxAllowedConnections, + engineFactory, + filter.asUpdateReceiver +)