1
0
mirror of https://github.com/InsanusMokrassar/TelegramBotAPI.git synced 2024-11-22 08:13:47 +00:00

replace webhooks work into api subproject

This commit is contained in:
InsanusMokrassar 2020-05-13 21:21:07 +06:00
parent e776c5182f
commit 05e8c9c90d
7 changed files with 388 additions and 117 deletions

View File

@ -53,8 +53,10 @@
* `TelegramBotAPI`: * `TelegramBotAPI`:
* Currently `UpdateDeserializationStrategy` is publicly available * Currently `UpdateDeserializationStrategy` is publicly available
* All `setWebhook` extensions was marked as deprecated and replaced into `TelegramBotAPI-extensions-api`
* `TelegramBotAPI-extensions-api`: * `TelegramBotAPI-extensions-api`:
* New extensions `setWebhook` and `includeWebhookInRoute` was added * New extensions `setWebhook` and `includeWebhookInRoute` was added
* New extension `CoroutineScope#updateHandlerWithMediaGroupsAdaptation` was added
* `TelegramBotAPI-extensions-utils`: * `TelegramBotAPI-extensions-utils`:
* Extension `asTelegramUpdate` was added * Extension `asTelegramUpdate` was added

View File

@ -0,0 +1,11 @@
package com.github.insanusmokrassar.TelegramBotAPI.extensions.api
import kotlinx.serialization.json.Json
@Suppress("EXPERIMENTAL_API_USAGE")
internal val nonstrictJsonFormat = Json {
isLenient = true
ignoreUnknownKeys = true
serializeSpecialFloatingPointValues = true
useArrayPolymorphism = true
}

View File

@ -0,0 +1,47 @@
package com.github.insanusmokrassar.TelegramBotAPI.extensions.api.utils
import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.InternalUtils.convertWithMediaGroupUpdates
import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.*
import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.UpdateReceiver
import com.github.insanusmokrassar.TelegramBotAPI.utils.extensions.accumulateByKey
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
/**
* Create [UpdateReceiver] object which will correctly accumulate updates and send into output updates which INCLUDE
* [com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdates.MediaGroupUpdate]s.
*
* @see UpdateReceiver
*/
fun CoroutineScope.updateHandlerWithMediaGroupsAdaptation(
output: UpdateReceiver<Update>
): UpdateReceiver<Update> {
val updatesChannel = Channel<Update>(Channel.UNLIMITED)
val mediaGroupChannel = Channel<Pair<String, BaseMessageUpdate>>(Channel.UNLIMITED)
val mediaGroupAccumulatedChannel = mediaGroupChannel.accumulateByKey(
1000L,
scope = this
)
launch {
launch {
for (update in updatesChannel) {
when (val data = update.data) {
is MediaGroupMessage -> mediaGroupChannel.send("${data.mediaGroupId}${update::class.simpleName}" to update as BaseMessageUpdate)
else -> output(update)
}
}
}
launch {
for ((_, mediaGroup) in mediaGroupAccumulatedChannel) {
mediaGroup.convertWithMediaGroupUpdates().forEach {
output(it)
}
}
}
}
return { updatesChannel.send(it) }
}

View File

@ -0,0 +1,245 @@
package com.github.insanusmokrassar.TelegramBotAPI.extensions.api
import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor
import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.InternalUtils.convertWithMediaGroupUpdates
import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.utils.updateHandlerWithMediaGroupsAdaptation
import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.InputFile
import com.github.insanusmokrassar.TelegramBotAPI.requests.webhook.SetWebhook
import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.*
import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.UpdateReceiver
import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.UpdatesFilter
import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.webhook.WebhookPrivateKeyConfig
import com.github.insanusmokrassar.TelegramBotAPI.utils.extensions.accumulateByKey
import com.github.insanusmokrassar.TelegramBotAPI.utils.extensions.executeAsync
import com.github.insanusmokrassar.TelegramBotAPI.utils.handleSafely
import io.ktor.application.call
import io.ktor.request.receiveText
import io.ktor.response.respond
import io.ktor.routing.*
import io.ktor.server.engine.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.Executors
/**
* @param [scope] Will be used for mapping of media groups
* @param [exceptionsHandler] Pass this parameter to set custom exception handler for getting updates
* @param [block] Some receiver block like [com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter]
*
* @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter
* @see UpdatesFilter
* @see UpdatesFilter.asUpdateReceiver
*/
fun Route.includeWebhookInRoute(
scope: CoroutineScope,
exceptionsHandler: (suspend (Exception) -> Unit)? = null,
block: UpdateReceiver<Update>
) {
val transformer = scope.updateHandlerWithMediaGroupsAdaptation(block)
post {
handleSafely(
exceptionsHandler ?: {}
) {
val asJson =
nonstrictJsonFormat.parseJson(call.receiveText())
val update = nonstrictJsonFormat.fromJson(
UpdateDeserializationStrategy,
asJson
)
transformer(update)
}
call.respond("Ok")
}
}
/**
* Setting up ktor server, set webhook info via [SetWebhook] request.
*
* @param port port which will be listen by bot
* @param listenRoute address to listen by bot
* @param scope Scope which will be used for
*
* @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter
* @see UpdatesFilter
* @see UpdatesFilter.asUpdateReceiver
*/
fun setWebhook(
port: Int,
engineFactory: ApplicationEngineFactory<*, *>,
listenHost: String = "0.0.0.0",
listenRoute: String = "/",
privateKeyConfig: WebhookPrivateKeyConfig? = null,
scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()),
exceptionsHandler: (suspend (Exception) -> Unit)? = null,
block: UpdateReceiver<Update>
): ApplicationEngine {
lateinit var engine: ApplicationEngine
val env = applicationEngineEnvironment {
module {
routing {
route(listenRoute) {
includeWebhookInRoute(scope, exceptionsHandler, block)
}
}
}
privateKeyConfig ?.let {
sslConnector(
privateKeyConfig.keyStore,
privateKeyConfig.aliasName,
privateKeyConfig::keyStorePassword,
privateKeyConfig::aliasPassword
) {
host = listenHost
this.port = port
}
} ?: connector {
host = listenHost
this.port = port
}
}
engine = embeddedServer(engineFactory, env)
engine.start(false)
return engine
}
/**
* Setting up ktor server, set webhook info via [SetWebhook] request.
*
* @param url URL of webhook WITHOUT including of [port]
* @param port port which will be listen by bot
* @param listenRoute address to listen by bot
* @param certificate [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.MultipartFile] or [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.FileId]
* which will be used by telegram to send encrypted messages
* @param scope Scope which will be used for
*
* @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter
* @see UpdatesFilter
* @see UpdatesFilter.asUpdateReceiver
*/
suspend fun RequestsExecutor.setWebhook(
url: String,
port: Int,
engineFactory: ApplicationEngineFactory<*, *>,
listenHost: String = "0.0.0.0",
listenRoute: String = "/",
certificate: InputFile? = null,
privateKeyConfig: WebhookPrivateKeyConfig? = null,
scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()),
allowedUpdates: List<String>? = null,
maxAllowedConnections: Int? = null,
exceptionsHandler: (suspend (Exception) -> Unit)? = null,
block: UpdateReceiver<Update>
): Job {
val executeDeferred = certificate ?.let {
executeAsync(
SetWebhook(
url,
certificate,
maxAllowedConnections,
allowedUpdates
)
)
} ?: executeAsync(
SetWebhook(
url,
maxAllowedConnections,
allowedUpdates
)
)
return try {
executeDeferred.await()
val engine = setWebhook(port, engineFactory, listenHost, listenRoute, privateKeyConfig, scope, exceptionsHandler, block)
scope.launch {
engine.environment.parentCoroutineContext[Job] ?.join()
engine.stop(1000, 5000)
}
} catch (e: Exception) {
throw e
}
}
suspend fun RequestsExecutor.setWebhook(
url: String,
port: Int,
engineFactory: ApplicationEngineFactory<*, *>,
listenHost: String = "0.0.0.0",
listenRoute: String = "/",
certificate: InputFile? = null,
privateKeyConfig: WebhookPrivateKeyConfig? = null,
scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()),
allowedUpdates: List<String>? = null,
maxAllowedConnections: Int? = null,
block: UpdateReceiver<Update>
) = setWebhook(
url,
port,
engineFactory,
listenHost,
listenRoute,
certificate,
privateKeyConfig,
scope,
allowedUpdates,
maxAllowedConnections,
null,
block
)
@Suppress("unused")
suspend fun RequestsExecutor.setWebhook(
url: String,
port: Int,
engineFactory: ApplicationEngineFactory<*, *>,
certificate: InputFile? = null,
privateKeyConfig: WebhookPrivateKeyConfig? = null,
scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()),
allowedUpdates: List<String>? = null,
maxAllowedConnections: Int? = null,
block: UpdateReceiver<Update>
) = setWebhook(
url,
port,
engineFactory,
certificate ?.let { "0.0.0.0" } ?: "localhost",
"/",
certificate,
privateKeyConfig,
scope,
allowedUpdates,
maxAllowedConnections,
block
)
@Suppress("unused")
suspend fun RequestsExecutor.setWebhook(
url: String,
port: Int,
filter: UpdatesFilter,
engineFactory: ApplicationEngineFactory<*, *>,
certificate: InputFile? = null,
privateKeyConfig: WebhookPrivateKeyConfig? = null,
scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()),
maxAllowedConnections: Int? = null,
listenHost: String = certificate ?.let { "0.0.0.0" } ?: "localhost",
listenRoute: String = "/"
): Job = setWebhook(
url,
port,
engineFactory,
listenHost,
listenRoute,
certificate,
privateKeyConfig,
scope,
filter.allowedUpdates,
maxAllowedConnections,
filter.asUpdateReceiver
)

View File

@ -3,6 +3,13 @@ package com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdate
import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage
import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.* import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.*
/**
* By default there is no instances of objects which could be deserialized from raw updates. If you want to get objects
* with this type, you should use something like [com.github.insanusmokrassar.TelegramBotAPI.extensions.api.SetWebhookKt.includeWebhookInRoute]
*
* @see com.github.insanusmokrassar.TelegramBotAPI.extensions.api.SetWebhookKt.includeWebhookInRoute
* @see com.github.insanusmokrassar.TelegramBotAPI.extensions.api.updates.UpdatesPollingKt.startGettingOfUpdates
*/
interface MediaGroupUpdate : Update interface MediaGroupUpdate : Update
interface SentMediaGroupUpdate: MediaGroupUpdate { interface SentMediaGroupUpdate: MediaGroupUpdate {

View File

@ -17,114 +17,10 @@ import io.ktor.server.engine.*
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
/** /**
* @param [scope] Will be used for mapping of media groups * Reverse proxy webhook.
* @param [exceptionsHandler] Pass this parameter to set custom exception handler for getting updates
* @param [block] Some receiver block like [com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter]
*
* @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter
* @see UpdatesFilter
* @see UpdatesFilter.asUpdateReceiver
*/
fun Route.includeWebhookInRoute(
scope: CoroutineScope,
exceptionsHandler: (suspend (Exception) -> Unit)? = null,
block: UpdateReceiver<Update>
): Job {
val updatesChannel = Channel<Update>(Channel.UNLIMITED)
val mediaGroupChannel = Channel<Pair<String, BaseMessageUpdate>>(Channel.UNLIMITED)
val mediaGroupAccumulatedChannel = mediaGroupChannel.accumulateByKey(
1000L,
scope = scope
)
post {
handleSafely(
exceptionsHandler ?: {}
) {
val asJson = nonstrictJsonFormat.parseJson(call.receiveText())
val update = nonstrictJsonFormat.fromJson(
UpdateDeserializationStrategy,
asJson
)
updatesChannel.send(update)
}
call.respond("Ok")
}
return scope.launch {
launch {
for (update in updatesChannel) {
when (val data = update.data) {
is MediaGroupMessage -> mediaGroupChannel.send("${data.mediaGroupId}${update::class.simpleName}" to update as BaseMessageUpdate)
else -> block(update)
}
}
}
launch {
for ((_, mediaGroup) in mediaGroupAccumulatedChannel) {
mediaGroup.convertWithMediaGroupUpdates().forEach {
block(it)
}
}
}
}
}
/**
* Setting up ktor server, set webhook info via [SetWebhook] request.
*
* @param port port which will be listen by bot
* @param listenRoute address to listen by bot
* @param scope Scope which will be used for
*
* @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter
* @see UpdatesFilter
* @see UpdatesFilter.asUpdateReceiver
*/
fun setWebhook(
port: Int,
engineFactory: ApplicationEngineFactory<*, *>,
listenHost: String = "0.0.0.0",
listenRoute: String = "/",
privateKeyConfig: WebhookPrivateKeyConfig? = null,
scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()),
exceptionsHandler: (suspend (Exception) -> Unit)? = null,
block: UpdateReceiver<Update>
): ApplicationEngine {
lateinit var engine: ApplicationEngine
val env = applicationEngineEnvironment {
module {
routing {
route(listenRoute) {
includeWebhookInRoute(scope, exceptionsHandler, block)
}
}
}
privateKeyConfig ?.let {
sslConnector(
privateKeyConfig.keyStore,
privateKeyConfig.aliasName,
privateKeyConfig::keyStorePassword,
privateKeyConfig::aliasPassword
) {
host = listenHost
this.port = port
}
} ?: connector {
host = listenHost
this.port = port
}
}
engine = embeddedServer(engineFactory, env)
engine.start(false)
return engine
}
/**
* Setting up ktor server, set webhook info via [SetWebhook] request.
* *
* @param url URL of webhook WITHOUT including of [port] * @param url URL of webhook WITHOUT including of [port]
* @param port port which will be listen by bot * @param port port which will be listen by bot
@ -132,11 +28,8 @@ fun setWebhook(
* @param certificate [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.MultipartFile] or [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.FileId] * @param certificate [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.MultipartFile] or [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.FileId]
* which will be used by telegram to send encrypted messages * which will be used by telegram to send encrypted messages
* @param scope Scope which will be used for * @param scope Scope which will be used for
*
* @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter
* @see UpdatesFilter
* @see UpdatesFilter.asUpdateReceiver
*/ */
@Deprecated("Replaced into project TelegramBotAPI-extensions-api")
suspend fun RequestsExecutor.setWebhook( suspend fun RequestsExecutor.setWebhook(
url: String, url: String,
port: Int, port: Int,
@ -167,20 +60,84 @@ suspend fun RequestsExecutor.setWebhook(
allowedUpdates allowedUpdates
) )
) )
val updatesChannel = Channel<Update>(Channel.UNLIMITED)
val mediaGroupChannel = Channel<Pair<String, BaseMessageUpdate>>(Channel.UNLIMITED)
val mediaGroupAccumulatedChannel = mediaGroupChannel.accumulateByKey(
1000L,
scope = scope
)
val env = applicationEngineEnvironment {
module {
return try { routing {
executeDeferred.await() post(listenRoute) {
val engine = setWebhook(port, engineFactory, listenHost, listenRoute, privateKeyConfig, scope, exceptionsHandler, block) handleSafely(
scope.launch { {
engine.environment.parentCoroutineContext[Job] ?.join() exceptionsHandler ?.invoke(it)
engine.stop(1000, 5000) }
) {
val asJson = nonstrictJsonFormat.parseJson(call.receiveText())
val update = nonstrictJsonFormat.fromJson(
UpdateDeserializationStrategy,
asJson
)
updatesChannel.send(update)
}
call.respond("Ok")
}
}
} }
privateKeyConfig ?.let {
sslConnector(
privateKeyConfig.keyStore,
privateKeyConfig.aliasName,
privateKeyConfig::keyStorePassword,
privateKeyConfig::aliasPassword
) {
host = listenHost
this.port = port
}
} ?: connector {
host = listenHost
this.port = port
}
}
val engine = embeddedServer(engineFactory, env)
try {
executeDeferred.await()
} catch (e: Exception) { } catch (e: Exception) {
env.stop()
throw e throw e
} }
return scope.launch {
launch {
for (update in updatesChannel) {
val data = update.data
when (data) {
is MediaGroupMessage -> mediaGroupChannel.send("${data.mediaGroupId}${update::class.simpleName}" to update as BaseMessageUpdate)
else -> block(update)
}
}
}
launch {
for ((_, mediaGroup) in mediaGroupAccumulatedChannel) {
mediaGroup.convertWithMediaGroupUpdates().forEach {
block(it)
}
}
}
engine.start(false)
}.also {
it.invokeOnCompletion {
engine.stop(1000L, 0L, TimeUnit.MILLISECONDS)
}
}
} }
@Deprecated("Replaced into project TelegramBotAPI-extensions-api")
suspend fun RequestsExecutor.setWebhook( suspend fun RequestsExecutor.setWebhook(
url: String, url: String,
port: Int, port: Int,
@ -208,6 +165,7 @@ suspend fun RequestsExecutor.setWebhook(
block block
) )
@Deprecated("Replaced into project TelegramBotAPI-extensions-api")
suspend fun RequestsExecutor.setWebhook( suspend fun RequestsExecutor.setWebhook(
url: String, url: String,
port: Int, port: Int,
@ -232,6 +190,7 @@ suspend fun RequestsExecutor.setWebhook(
block block
) )
@Deprecated("Replaced into project TelegramBotAPI-extensions-api")
suspend fun RequestsExecutor.setWebhook( suspend fun RequestsExecutor.setWebhook(
url: String, url: String,
port: Int, port: Int,