add main webhooks functionality for using with reverse proxy

This commit is contained in:
InsanusMokrassar 2019-02-26 16:01:26 +08:00
parent ff2b56b3a9
commit 1520271777
6 changed files with 359 additions and 55 deletions

View File

@ -1,11 +1,14 @@
# TelegramBotAPI changelog
## 0.12.0
## 0.12.0 Webhooks
* Added `DataRequest` interface which replace `Data` interface
* `MultipartRequestImpl` now use `DataRequest`
* All requests which implements `Data` now implement `DataRequest`
* Added class `SetWebhook` and its factory
* Added class `UpdatesFilter` which can help to filter updates by categories
* Added function `accumulateByKey` which work as debounce for keys and send list of received values
* Added webhooks functions and workaround for `Reverse Proxy` mode
## 0.11.0

View File

@ -34,9 +34,13 @@ dependencies {
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlin_coroutines_version"
implementation "org.jetbrains.kotlinx:kotlinx-serialization-runtime:$kotlin_serialisation_runtime_version"
implementation "joda-time:joda-time:$joda_time_version"
implementation "io.ktor:ktor-client-core:$ktor_version"
implementation "io.ktor:ktor-client-okhttp:$ktor_version"
implementation "io.ktor:ktor-server-core:$ktor_version"
implementation "io.ktor:ktor-server-netty:$ktor_version"
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
}

View File

@ -0,0 +1,101 @@
package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
private sealed class DebounceAction<T> {
abstract val value: T
}
private data class AddValue<T>(override val value: T) : DebounceAction<T>()
private data class RemoveJob<T>(override val value: T, val job: Job) : DebounceAction<T>()
fun <T> ReceiveChannel<T>.debounceByValue(
delayMillis: Long,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
resultBroadcastChannelCapacity: Int = 32
): ReceiveChannel<T> {
val outChannel = Channel<T>(resultBroadcastChannelCapacity)
val values = HashMap<T, Job>()
val channel = Channel<DebounceAction<T>>(Channel.UNLIMITED)
scope.launch {
for (action in channel) {
when (action) {
is AddValue -> {
val msg = action.value
values[msg] ?.cancel()
lateinit var job: Job
job = launch {
delay(delayMillis)
outChannel.send(msg)
channel.send(RemoveJob(msg, job))
}
values[msg] = job
}
is RemoveJob -> if (values[action.value] == action.job) {
values.remove(action.value)
}
}
}
}
scope.launch {
for (msg in this@debounceByValue) {
channel.send(AddValue(msg))
}
}
return outChannel
}
typealias AccumulatedValues<K, V> = Pair<K, List<V>>
fun <K, V> ReceiveChannel<Pair<K, V>>.accumulateByKey(
delayMillis: Long,
scope: CoroutineScope = CoroutineScope(Dispatchers.Default),
resultBroadcastChannelCapacity: Int = 32
): ReceiveChannel<AccumulatedValues<K, V>> {
val outChannel = Channel<AccumulatedValues<K, V>>(resultBroadcastChannelCapacity)
val values = HashMap<K, MutableList<V>>()
val jobs = HashMap<K, Job>()
val channel = Channel<DebounceAction<Pair<K, V>>>(Channel.UNLIMITED)
scope.launch {
for (action in channel) {
val (key, value) = action.value
when (action) {
is AddValue -> {
jobs[key] ?.cancel()
(values[key] ?: mutableListOf<V>().also { values[key] = it }).add(value)
lateinit var job: Job
job = launch {
delay(delayMillis)
values[key] ?.let {
outChannel.send(key to it)
channel.send(RemoveJob(key to value, job))
}
}
jobs[key] = job
}
is RemoveJob -> if (values[key] == action.job) {
values.remove(key)
jobs.remove(key)
}
}
}
}
scope.launch {
for (msg in this@accumulateByKey) {
channel.send(AddValue(msg))
}
}
return outChannel
}

View File

@ -114,63 +114,27 @@ fun RequestsExecutor.startGettingOfUpdates(
requestsDelayMillis: Long = 1000,
scope: CoroutineScope = GlobalScope
): Job {
val filter = UpdatesFilter(
messageCallback,
messageMediaGroupCallback,
editedMessageCallback,
editedMessageMediaGroupCallback,
channelPostCallback,
channelPostMediaGroupCallback,
editedChannelPostCallback,
editedChannelPostMediaGroupCallback,
chosenInlineResultCallback,
inlineQueryCallback,
callbackQueryCallback,
shippingQueryCallback,
preCheckoutQueryCallback
)
return startGettingOfUpdates(
requestsDelayMillis,
scope,
listOfNotNull(
(messageCallback ?: messageMediaGroupCallback) ?.let { UPDATE_MESSAGE },
(editedMessageCallback ?: editedMessageMediaGroupCallback) ?.let { UPDATE_EDITED_MESSAGE },
(channelPostCallback ?: channelPostMediaGroupCallback) ?.let { UPDATE_CHANNEL_POST },
(editedChannelPostCallback ?: editedChannelPostMediaGroupCallback) ?.let { UPDATE_EDITED_CHANNEL_POST },
chosenInlineResultCallback ?.let { UPDATE_CHOSEN_INLINE_RESULT },
inlineQueryCallback ?.let { UPDATE_INLINE_QUERY },
callbackQueryCallback ?.let { UPDATE_CALLBACK_QUERY },
shippingQueryCallback ?.let { UPDATE_SHIPPING_QUERY },
preCheckoutQueryCallback ?.let { UPDATE_PRE_CHECKOUT_QUERY }
)
) { update ->
when (update) {
is MessageUpdate -> messageCallback ?.invoke(update)
is List<*> -> when (update.firstOrNull()) {
is MessageUpdate -> update.mapNotNull { it as? MessageUpdate }.let { mappedList ->
messageMediaGroupCallback ?.also { receiver ->
receiver(mappedList)
} ?: messageCallback ?.also { receiver ->
mappedList.forEach { receiver(it) }
}
}
is EditMessageUpdate -> update.mapNotNull { it as? EditMessageUpdate }.let { mappedList ->
editedMessageMediaGroupCallback ?.also { receiver ->
receiver(mappedList)
} ?: editedMessageCallback ?.also { receiver ->
mappedList.forEach { receiver(it) }
}
}
is ChannelPostUpdate -> update.mapNotNull { it as? ChannelPostUpdate }.let { mappedList ->
channelPostMediaGroupCallback ?.also { receiver ->
receiver(mappedList)
} ?: channelPostCallback ?.also { receiver ->
mappedList.forEach { receiver(it) }
}
}
is EditChannelPostUpdate -> update.mapNotNull { it as? EditChannelPostUpdate }.let { mappedList ->
editedChannelPostMediaGroupCallback ?.also { receiver ->
receiver(mappedList)
} ?: editedChannelPostCallback ?.also { receiver ->
mappedList.forEach { receiver(it) }
}
}
}
is EditMessageUpdate -> editedMessageCallback ?.invoke(update)
is ChannelPostUpdate -> channelPostCallback ?.invoke(update)
is EditChannelPostUpdate -> editedChannelPostCallback ?.invoke(update)
is ChosenInlineResultUpdate -> chosenInlineResultCallback ?.invoke(update)
is InlineQueryUpdate -> inlineQueryCallback ?.invoke(update)
is CallbackQueryUpdate -> callbackQueryCallback ?.invoke(update)
is ShippingQueryUpdate -> shippingQueryCallback ?.invoke(update)
is PreCheckoutQueryUpdate -> preCheckoutQueryCallback ?.invoke(update)
}
}
filter.allowedUpdates,
filter.asUpdateReceiver
)
}
fun RequestsExecutor.startGettingOfUpdates(

View File

@ -0,0 +1,105 @@
package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions
import com.github.insanusmokrassar.TelegramBotAPI.requests.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.BaseMessageUpdate
data class UpdatesFilter(
private val messageCallback: UpdateReceiver<MessageUpdate>? = null,
private val messageMediaGroupCallback: UpdateReceiver<List<MessageUpdate>>? = null,
private val editedMessageCallback: UpdateReceiver<EditMessageUpdate>? = null,
private val editedMessageMediaGroupCallback: UpdateReceiver<List<EditMessageUpdate>>? = null,
private val channelPostCallback: UpdateReceiver<ChannelPostUpdate>? = null,
private val channelPostMediaGroupCallback: UpdateReceiver<List<ChannelPostUpdate>>? = null,
private val editedChannelPostCallback: UpdateReceiver<EditChannelPostUpdate>? = null,
private val editedChannelPostMediaGroupCallback: UpdateReceiver<List<EditChannelPostUpdate>>? = null,
private val chosenInlineResultCallback: UpdateReceiver<ChosenInlineResultUpdate>? = null,
private val inlineQueryCallback: UpdateReceiver<InlineQueryUpdate>? = null,
private val callbackQueryCallback: UpdateReceiver<CallbackQueryUpdate>? = null,
private val shippingQueryCallback: UpdateReceiver<ShippingQueryUpdate>? = null,
private val preCheckoutQueryCallback: UpdateReceiver<PreCheckoutQueryUpdate>? = null
) {
val asUpdateReceiver: UpdateReceiver<Any> = this::invoke
val allowedUpdates = listOfNotNull(
(messageCallback ?: messageMediaGroupCallback) ?.let { UPDATE_MESSAGE },
(editedMessageCallback ?: editedMessageMediaGroupCallback) ?.let { UPDATE_EDITED_MESSAGE },
(channelPostCallback ?: channelPostMediaGroupCallback) ?.let { UPDATE_CHANNEL_POST },
(editedChannelPostCallback ?: editedChannelPostMediaGroupCallback) ?.let { UPDATE_EDITED_CHANNEL_POST },
chosenInlineResultCallback ?.let { UPDATE_CHOSEN_INLINE_RESULT },
inlineQueryCallback ?.let { UPDATE_INLINE_QUERY },
callbackQueryCallback ?.let { UPDATE_CALLBACK_QUERY },
shippingQueryCallback ?.let { UPDATE_SHIPPING_QUERY },
preCheckoutQueryCallback ?.let { UPDATE_PRE_CHECKOUT_QUERY }
)
suspend fun invoke(update: Any) {
when (update) {
is MessageUpdate -> messageCallback ?.invoke(update)
is List<*> -> when (update.firstOrNull()) {
is MessageUpdate -> update.mapNotNull { it as? MessageUpdate }.let { mappedList ->
messageMediaGroupCallback ?.also { receiver ->
receiver(mappedList)
} ?: messageCallback ?.also { receiver ->
mappedList.forEach { receiver(it) }
}
}
is EditMessageUpdate -> update.mapNotNull { it as? EditMessageUpdate }.let { mappedList ->
editedMessageMediaGroupCallback ?.also { receiver ->
receiver(mappedList)
} ?: editedMessageCallback ?.also { receiver ->
mappedList.forEach { receiver(it) }
}
}
is ChannelPostUpdate -> update.mapNotNull { it as? ChannelPostUpdate }.let { mappedList ->
channelPostMediaGroupCallback ?.also { receiver ->
receiver(mappedList)
} ?: channelPostCallback ?.also { receiver ->
mappedList.forEach { receiver(it) }
}
}
is EditChannelPostUpdate -> update.mapNotNull { it as? EditChannelPostUpdate }.let { mappedList ->
editedChannelPostMediaGroupCallback ?.also { receiver ->
receiver(mappedList)
} ?: editedChannelPostCallback ?.also { receiver ->
mappedList.forEach { receiver(it) }
}
}
}
is EditMessageUpdate -> editedMessageCallback ?.invoke(update)
is ChannelPostUpdate -> channelPostCallback ?.invoke(update)
is EditChannelPostUpdate -> editedChannelPostCallback ?.invoke(update)
is ChosenInlineResultUpdate -> chosenInlineResultCallback ?.invoke(update)
is InlineQueryUpdate -> inlineQueryCallback ?.invoke(update)
is CallbackQueryUpdate -> callbackQueryCallback ?.invoke(update)
is ShippingQueryUpdate -> shippingQueryCallback ?.invoke(update)
is PreCheckoutQueryUpdate -> preCheckoutQueryCallback ?.invoke(update)
}
}
}
fun createSimpleUpdateFilter(
messageCallback: UpdateReceiver<MessageUpdate>? = null,
mediaGroupCallback: UpdateReceiver<List<BaseMessageUpdate>>? = null,
editedMessageCallback: UpdateReceiver<EditMessageUpdate>? = null,
channelPostCallback: UpdateReceiver<ChannelPostUpdate>? = null,
editedChannelPostCallback: UpdateReceiver<EditChannelPostUpdate>? = null,
chosenInlineResultCallback: UpdateReceiver<ChosenInlineResultUpdate>? = null,
inlineQueryCallback: UpdateReceiver<InlineQueryUpdate>? = null,
callbackQueryCallback: UpdateReceiver<CallbackQueryUpdate>? = null,
shippingQueryCallback: UpdateReceiver<ShippingQueryUpdate>? = null,
preCheckoutQueryCallback: UpdateReceiver<PreCheckoutQueryUpdate>? = null
): UpdatesFilter = UpdatesFilter(
messageCallback = messageCallback,
messageMediaGroupCallback = mediaGroupCallback,
editedMessageCallback = editedMessageCallback,
editedMessageMediaGroupCallback = mediaGroupCallback,
channelPostCallback = channelPostCallback,
channelPostMediaGroupCallback = mediaGroupCallback,
editedChannelPostCallback = editedChannelPostCallback,
editedChannelPostMediaGroupCallback = mediaGroupCallback,
chosenInlineResultCallback = chosenInlineResultCallback,
inlineQueryCallback = inlineQueryCallback,
callbackQueryCallback = callbackQueryCallback,
shippingQueryCallback = shippingQueryCallback,
preCheckoutQueryCallback = preCheckoutQueryCallback
)

View File

@ -0,0 +1,127 @@
package com.github.insanusmokrassar.TelegramBotAPI.utils.extensions
import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor
import com.github.insanusmokrassar.TelegramBotAPI.requests.*
import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.InputFile
import com.github.insanusmokrassar.TelegramBotAPI.types.MediaGroupIdentifier
import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage
import com.github.insanusmokrassar.TelegramBotAPI.types.update.*
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.Update
import io.ktor.application.Application
import io.ktor.application.call
import io.ktor.request.receiveText
import io.ktor.response.respond
import io.ktor.routing.*
import io.ktor.server.engine.*
import io.ktor.server.netty.Netty
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.serialization.json.Json
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
/**
* Reverse proxy webhook.
*
* @param url URL of webhook WITHOUT including of [port]
* @param port port which will be listen by bot
* @param certificate [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.MultipartFile] or [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.FileId]
* which will be used by telegram to send encrypted messages
* @param scope Scope which will be used for
*/
suspend fun RequestsExecutor.setWebhook(
url: String,
port: Int,
certificate: InputFile,
scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()),
allowedUpdates: List<String>? = null,
maxAllowedConnections: Int? = null,
engineFactory: ApplicationEngineFactory<*, *> = Netty,
block: UpdateReceiver<Any>
): Job {
val executeDeferred = executeAsync(
SetWebhook(
url,
certificate,
maxAllowedConnections,
allowedUpdates
)
)
val updatesChannel = Channel<Update>(Channel.UNLIMITED)
val mediaGroupChannel = Channel<Pair<MediaGroupIdentifier, Update>>(Channel.UNLIMITED)
val mediaGroupAccumulatedChannel = mediaGroupChannel.accumulateByKey(
1000L,
scope = scope
)
val env = applicationEngineEnvironment {
module {
fun Application.main() {
routing {
post("/") {
val deserialized = call.receiveText()
val update = Json.nonstrict.parse(
RawUpdate.serializer(),
deserialized
)
updatesChannel.send(update.asUpdate)
call.respond("Ok")
}
}
}
main()
}
connector {
host = "0.0.0.0"
this.port = port
}
}
val engine = embeddedServer(engineFactory, env)
try {
executeDeferred.await()
} catch (e: Exception) {
env.stop()
throw e
}
return scope.launch {
launch {
for (update in updatesChannel) {
val data = update.data
when (data) {
is MediaGroupMessage -> mediaGroupChannel.send(data.mediaGroupId to update)
else -> block(update)
}
}
}
launch {
for (mediaGroupUpdate in mediaGroupAccumulatedChannel) {
block(mediaGroupUpdate.second)
}
}
engine.start(false)
}.also {
it.invokeOnCompletion {
engine.stop(1000L, 0L, TimeUnit.MILLISECONDS)
}
}
}
suspend fun RequestsExecutor.setWebhook(
url: String,
port: Int,
certificate: InputFile,
scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()),
maxAllowedConnections: Int? = null,
engineFactory: ApplicationEngineFactory<*, *> = Netty,
filter: UpdatesFilter
): Job = setWebhook(
url,
port,
certificate,
scope,
filter.allowedUpdates,
maxAllowedConnections,
engineFactory,
filter.asUpdateReceiver
)