diff --git a/CHANGELOG.md b/CHANGELOG.md index 31aa7c5f8a..670fce14ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,8 @@ * `TelegramBotAPI`: * Currently `UpdateDeserializationStrategy` is publicly available +* `TelegramBotAPI-extensions-api`: + * New extensions `setWebhook` and `includeWebhookInRoute` was added * `TelegramBotAPI-extensions-utils`: * Extension `asTelegramUpdate` was added diff --git a/TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/webhook/SetWebhook.kt b/TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/webhook/SetWebhook.kt index 7d2e45d493..73fb7c418e 100644 --- a/TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/webhook/SetWebhook.kt +++ b/TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/webhook/SetWebhook.kt @@ -5,6 +5,9 @@ import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.FileId import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.MultipartFile import com.github.insanusmokrassar.TelegramBotAPI.requests.webhook.SetWebhook +/** + * Use this method to send information about webhook (like [url] and [certificate]) + */ suspend fun RequestsExecutor.setWebhookInfo( url: String, certificate: FileId, @@ -16,6 +19,9 @@ suspend fun RequestsExecutor.setWebhookInfo( ) ) +/** + * Use this method to send information about webhook (like [url] and [certificate]) + */ suspend fun RequestsExecutor.setWebhookInfo( url: String, certificate: MultipartFile, diff --git a/TelegramBotAPI/src/jvmMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Webhooks.kt b/TelegramBotAPI/src/jvmMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Webhooks.kt index 64aaf9caf8..5ae230d16e 100644 --- a/TelegramBotAPI/src/jvmMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Webhooks.kt +++ b/TelegramBotAPI/src/jvmMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Webhooks.kt @@ -12,16 +12,119 @@ import com.github.insanusmokrassar.TelegramBotAPI.utils.* import io.ktor.application.call import io.ktor.request.receiveText import io.ktor.response.respond -import io.ktor.routing.post -import io.ktor.routing.routing +import io.ktor.routing.* import io.ktor.server.engine.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit /** - * Reverse proxy webhook. + * @param [scope] Will be used for mapping of media groups + * @param [exceptionsHandler] Pass this parameter to set custom exception handler for getting updates + * @param [block] Some receiver block like [com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter] + * + * @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter + * @see UpdatesFilter + * @see UpdatesFilter.asUpdateReceiver + */ +fun Route.includeWebhookInRoute( + scope: CoroutineScope, + exceptionsHandler: (suspend (Exception) -> Unit)? = null, + block: UpdateReceiver +): Job { + val updatesChannel = Channel(Channel.UNLIMITED) + val mediaGroupChannel = Channel>(Channel.UNLIMITED) + val mediaGroupAccumulatedChannel = mediaGroupChannel.accumulateByKey( + 1000L, + scope = scope + ) + post { + handleSafely( + exceptionsHandler ?: {} + ) { + val asJson = nonstrictJsonFormat.parseJson(call.receiveText()) + val update = nonstrictJsonFormat.fromJson( + UpdateDeserializationStrategy, + asJson + ) + updatesChannel.send(update) + } + call.respond("Ok") + } + return scope.launch { + launch { + for (update in updatesChannel) { + when (val data = update.data) { + is MediaGroupMessage -> mediaGroupChannel.send("${data.mediaGroupId}${update::class.simpleName}" to update as BaseMessageUpdate) + else -> block(update) + } + } + } + launch { + for ((_, mediaGroup) in mediaGroupAccumulatedChannel) { + mediaGroup.convertWithMediaGroupUpdates().forEach { + block(it) + } + } + } + } +} + +/** + * Setting up ktor server, set webhook info via [SetWebhook] request. + * + * @param port port which will be listen by bot + * @param listenRoute address to listen by bot + * @param scope Scope which will be used for + * + * @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter + * @see UpdatesFilter + * @see UpdatesFilter.asUpdateReceiver + */ +fun setWebhook( + port: Int, + engineFactory: ApplicationEngineFactory<*, *>, + listenHost: String = "0.0.0.0", + listenRoute: String = "/", + privateKeyConfig: WebhookPrivateKeyConfig? = null, + scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), + exceptionsHandler: (suspend (Exception) -> Unit)? = null, + block: UpdateReceiver +): ApplicationEngine { + lateinit var engine: ApplicationEngine + val env = applicationEngineEnvironment { + + module { + routing { + route(listenRoute) { + includeWebhookInRoute(scope, exceptionsHandler, block) + } + } + } + privateKeyConfig ?.let { + sslConnector( + privateKeyConfig.keyStore, + privateKeyConfig.aliasName, + privateKeyConfig::keyStorePassword, + privateKeyConfig::aliasPassword + ) { + host = listenHost + this.port = port + } + } ?: connector { + host = listenHost + this.port = port + } + + } + engine = embeddedServer(engineFactory, env) + engine.start(false) + + return engine +} + +/** + * Setting up ktor server, set webhook info via [SetWebhook] request. * * @param url URL of webhook WITHOUT including of [port] * @param port port which will be listen by bot @@ -29,6 +132,10 @@ import java.util.concurrent.TimeUnit * @param certificate [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.MultipartFile] or [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.FileId] * which will be used by telegram to send encrypted messages * @param scope Scope which will be used for + * + * @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter + * @see UpdatesFilter + * @see UpdatesFilter.asUpdateReceiver */ suspend fun RequestsExecutor.setWebhook( url: String, @@ -60,81 +167,18 @@ suspend fun RequestsExecutor.setWebhook( allowedUpdates ) ) - val updatesChannel = Channel(Channel.UNLIMITED) - val mediaGroupChannel = Channel>(Channel.UNLIMITED) - val mediaGroupAccumulatedChannel = mediaGroupChannel.accumulateByKey( - 1000L, - scope = scope - ) - val env = applicationEngineEnvironment { - module { - routing { - post(listenRoute) { - handleSafely( - { - exceptionsHandler ?.invoke(it) - } - ) { - val asJson = nonstrictJsonFormat.parseJson(call.receiveText()) - val update = nonstrictJsonFormat.fromJson( - UpdateDeserializationStrategy, - asJson - ) - updatesChannel.send(update) - } - call.respond("Ok") - } - } - } - privateKeyConfig ?.let { - sslConnector( - privateKeyConfig.keyStore, - privateKeyConfig.aliasName, - privateKeyConfig::keyStorePassword, - privateKeyConfig::aliasPassword - ) { - host = listenHost - this.port = port - } - } ?: connector { - host = listenHost - this.port = port - } - } - val engine = embeddedServer(engineFactory, env) - - try { + return try { executeDeferred.await() + val engine = setWebhook(port, engineFactory, listenHost, listenRoute, privateKeyConfig, scope, exceptionsHandler, block) + scope.launch { + engine.environment.parentCoroutineContext[Job] ?.join() + engine.stop(1000, 5000) + } } catch (e: Exception) { - env.stop() throw e } - - return scope.launch { - launch { - for (update in updatesChannel) { - val data = update.data - when (data) { - is MediaGroupMessage -> mediaGroupChannel.send("${data.mediaGroupId}${update::class.simpleName}" to update as BaseMessageUpdate) - else -> block(update) - } - } - } - launch { - for ((_, mediaGroup) in mediaGroupAccumulatedChannel) { - mediaGroup.convertWithMediaGroupUpdates().forEach { - block(it) - } - } - } - engine.start(false) - }.also { - it.invokeOnCompletion { - engine.stop(1000L, 0L, TimeUnit.MILLISECONDS) - } - } } suspend fun RequestsExecutor.setWebhook(