diff --git a/CHANGELOG.md b/CHANGELOG.md index 670fce14ce..a7b616d0e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,8 +53,10 @@ * `TelegramBotAPI`: * Currently `UpdateDeserializationStrategy` is publicly available + * All `setWebhook` extensions was marked as deprecated and replaced into `TelegramBotAPI-extensions-api` * `TelegramBotAPI-extensions-api`: * New extensions `setWebhook` and `includeWebhookInRoute` was added + * New extension `CoroutineScope#updateHandlerWithMediaGroupsAdaptation` was added * `TelegramBotAPI-extensions-utils`: * Extension `asTelegramUpdate` was added diff --git a/TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/JsonUtils.kt b/TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/JsonUtils.kt new file mode 100644 index 0000000000..a88439d920 --- /dev/null +++ b/TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/JsonUtils.kt @@ -0,0 +1,11 @@ +package com.github.insanusmokrassar.TelegramBotAPI.extensions.api + +import kotlinx.serialization.json.Json + +@Suppress("EXPERIMENTAL_API_USAGE") +internal val nonstrictJsonFormat = Json { + isLenient = true + ignoreUnknownKeys = true + serializeSpecialFloatingPointValues = true + useArrayPolymorphism = true +} diff --git a/TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/utils/UpdatesHandling.kt b/TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/utils/UpdatesHandling.kt new file mode 100644 index 0000000000..37b3bf1ec0 --- /dev/null +++ b/TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/utils/UpdatesHandling.kt @@ -0,0 +1,47 @@ +package com.github.insanusmokrassar.TelegramBotAPI.extensions.api.utils + +import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.InternalUtils.convertWithMediaGroupUpdates +import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage +import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.* +import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.UpdateReceiver +import com.github.insanusmokrassar.TelegramBotAPI.utils.extensions.accumulateByKey +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch + +/** + * Create [UpdateReceiver] object which will correctly accumulate updates and send into output updates which INCLUDE + * [com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdates.MediaGroupUpdate]s. + * + * @see UpdateReceiver + */ +fun CoroutineScope.updateHandlerWithMediaGroupsAdaptation( + output: UpdateReceiver +): UpdateReceiver { + val updatesChannel = Channel(Channel.UNLIMITED) + val mediaGroupChannel = Channel>(Channel.UNLIMITED) + val mediaGroupAccumulatedChannel = mediaGroupChannel.accumulateByKey( + 1000L, + scope = this + ) + + launch { + launch { + for (update in updatesChannel) { + when (val data = update.data) { + is MediaGroupMessage -> mediaGroupChannel.send("${data.mediaGroupId}${update::class.simpleName}" to update as BaseMessageUpdate) + else -> output(update) + } + } + } + launch { + for ((_, mediaGroup) in mediaGroupAccumulatedChannel) { + mediaGroup.convertWithMediaGroupUpdates().forEach { + output(it) + } + } + } + } + + return { updatesChannel.send(it) } +} diff --git a/TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/webhook/SetWebhook.kt b/TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/webhook/SetWebhookInfo.kt similarity index 100% rename from TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/webhook/SetWebhook.kt rename to TelegramBotAPI-extensions-api/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/webhook/SetWebhookInfo.kt diff --git a/TelegramBotAPI-extensions-api/src/jvmMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/SetWebhook.kt b/TelegramBotAPI-extensions-api/src/jvmMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/SetWebhook.kt new file mode 100644 index 0000000000..62ee809846 --- /dev/null +++ b/TelegramBotAPI-extensions-api/src/jvmMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/extensions/api/SetWebhook.kt @@ -0,0 +1,245 @@ +package com.github.insanusmokrassar.TelegramBotAPI.extensions.api + +import com.github.insanusmokrassar.TelegramBotAPI.bot.RequestsExecutor +import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.InternalUtils.convertWithMediaGroupUpdates +import com.github.insanusmokrassar.TelegramBotAPI.extensions.api.utils.updateHandlerWithMediaGroupsAdaptation +import com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.InputFile +import com.github.insanusmokrassar.TelegramBotAPI.requests.webhook.SetWebhook +import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage +import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.* +import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.UpdateReceiver +import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.UpdatesFilter +import com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.webhook.WebhookPrivateKeyConfig +import com.github.insanusmokrassar.TelegramBotAPI.utils.extensions.accumulateByKey +import com.github.insanusmokrassar.TelegramBotAPI.utils.extensions.executeAsync +import com.github.insanusmokrassar.TelegramBotAPI.utils.handleSafely +import io.ktor.application.call +import io.ktor.request.receiveText +import io.ktor.response.respond +import io.ktor.routing.* +import io.ktor.server.engine.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import java.util.concurrent.Executors + +/** + * @param [scope] Will be used for mapping of media groups + * @param [exceptionsHandler] Pass this parameter to set custom exception handler for getting updates + * @param [block] Some receiver block like [com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter] + * + * @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter + * @see UpdatesFilter + * @see UpdatesFilter.asUpdateReceiver + */ +fun Route.includeWebhookInRoute( + scope: CoroutineScope, + exceptionsHandler: (suspend (Exception) -> Unit)? = null, + block: UpdateReceiver +) { + val transformer = scope.updateHandlerWithMediaGroupsAdaptation(block) + post { + handleSafely( + exceptionsHandler ?: {} + ) { + val asJson = + nonstrictJsonFormat.parseJson(call.receiveText()) + val update = nonstrictJsonFormat.fromJson( + UpdateDeserializationStrategy, + asJson + ) + transformer(update) + } + call.respond("Ok") + } +} + +/** + * Setting up ktor server, set webhook info via [SetWebhook] request. + * + * @param port port which will be listen by bot + * @param listenRoute address to listen by bot + * @param scope Scope which will be used for + * + * @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter + * @see UpdatesFilter + * @see UpdatesFilter.asUpdateReceiver + */ +fun setWebhook( + port: Int, + engineFactory: ApplicationEngineFactory<*, *>, + listenHost: String = "0.0.0.0", + listenRoute: String = "/", + privateKeyConfig: WebhookPrivateKeyConfig? = null, + scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), + exceptionsHandler: (suspend (Exception) -> Unit)? = null, + block: UpdateReceiver +): ApplicationEngine { + lateinit var engine: ApplicationEngine + val env = applicationEngineEnvironment { + + module { + routing { + route(listenRoute) { + includeWebhookInRoute(scope, exceptionsHandler, block) + } + } + } + privateKeyConfig ?.let { + sslConnector( + privateKeyConfig.keyStore, + privateKeyConfig.aliasName, + privateKeyConfig::keyStorePassword, + privateKeyConfig::aliasPassword + ) { + host = listenHost + this.port = port + } + } ?: connector { + host = listenHost + this.port = port + } + + } + engine = embeddedServer(engineFactory, env) + engine.start(false) + + return engine +} + +/** + * Setting up ktor server, set webhook info via [SetWebhook] request. + * + * @param url URL of webhook WITHOUT including of [port] + * @param port port which will be listen by bot + * @param listenRoute address to listen by bot + * @param certificate [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.MultipartFile] or [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.FileId] + * which will be used by telegram to send encrypted messages + * @param scope Scope which will be used for + * + * @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter + * @see UpdatesFilter + * @see UpdatesFilter.asUpdateReceiver + */ +suspend fun RequestsExecutor.setWebhook( + url: String, + port: Int, + engineFactory: ApplicationEngineFactory<*, *>, + listenHost: String = "0.0.0.0", + listenRoute: String = "/", + certificate: InputFile? = null, + privateKeyConfig: WebhookPrivateKeyConfig? = null, + scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), + allowedUpdates: List? = null, + maxAllowedConnections: Int? = null, + exceptionsHandler: (suspend (Exception) -> Unit)? = null, + block: UpdateReceiver +): Job { + val executeDeferred = certificate ?.let { + executeAsync( + SetWebhook( + url, + certificate, + maxAllowedConnections, + allowedUpdates + ) + ) + } ?: executeAsync( + SetWebhook( + url, + maxAllowedConnections, + allowedUpdates + ) + ) + + + return try { + executeDeferred.await() + val engine = setWebhook(port, engineFactory, listenHost, listenRoute, privateKeyConfig, scope, exceptionsHandler, block) + scope.launch { + engine.environment.parentCoroutineContext[Job] ?.join() + engine.stop(1000, 5000) + } + } catch (e: Exception) { + throw e + } +} + +suspend fun RequestsExecutor.setWebhook( + url: String, + port: Int, + engineFactory: ApplicationEngineFactory<*, *>, + listenHost: String = "0.0.0.0", + listenRoute: String = "/", + certificate: InputFile? = null, + privateKeyConfig: WebhookPrivateKeyConfig? = null, + scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), + allowedUpdates: List? = null, + maxAllowedConnections: Int? = null, + block: UpdateReceiver +) = setWebhook( + url, + port, + engineFactory, + listenHost, + listenRoute, + certificate, + privateKeyConfig, + scope, + allowedUpdates, + maxAllowedConnections, + null, + block +) + +@Suppress("unused") +suspend fun RequestsExecutor.setWebhook( + url: String, + port: Int, + engineFactory: ApplicationEngineFactory<*, *>, + certificate: InputFile? = null, + privateKeyConfig: WebhookPrivateKeyConfig? = null, + scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), + allowedUpdates: List? = null, + maxAllowedConnections: Int? = null, + block: UpdateReceiver +) = setWebhook( + url, + port, + engineFactory, + certificate ?.let { "0.0.0.0" } ?: "localhost", + "/", + certificate, + privateKeyConfig, + scope, + allowedUpdates, + maxAllowedConnections, + block +) + +@Suppress("unused") +suspend fun RequestsExecutor.setWebhook( + url: String, + port: Int, + filter: UpdatesFilter, + engineFactory: ApplicationEngineFactory<*, *>, + certificate: InputFile? = null, + privateKeyConfig: WebhookPrivateKeyConfig? = null, + scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), + maxAllowedConnections: Int? = null, + listenHost: String = certificate ?.let { "0.0.0.0" } ?: "localhost", + listenRoute: String = "/" +): Job = setWebhook( + url, + port, + engineFactory, + listenHost, + listenRoute, + certificate, + privateKeyConfig, + scope, + filter.allowedUpdates, + maxAllowedConnections, + filter.asUpdateReceiver +) + + diff --git a/TelegramBotAPI/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/types/update/MediaGroupUpdates/MediaGroupUpdate.kt b/TelegramBotAPI/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/types/update/MediaGroupUpdates/MediaGroupUpdate.kt index 5724d2d9b6..6a9936cfcf 100644 --- a/TelegramBotAPI/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/types/update/MediaGroupUpdates/MediaGroupUpdate.kt +++ b/TelegramBotAPI/src/commonMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/types/update/MediaGroupUpdates/MediaGroupUpdate.kt @@ -3,6 +3,13 @@ package com.github.insanusmokrassar.TelegramBotAPI.types.update.MediaGroupUpdate import com.github.insanusmokrassar.TelegramBotAPI.types.message.abstracts.MediaGroupMessage import com.github.insanusmokrassar.TelegramBotAPI.types.update.abstracts.* +/** + * By default there is no instances of objects which could be deserialized from raw updates. If you want to get objects + * with this type, you should use something like [com.github.insanusmokrassar.TelegramBotAPI.extensions.api.SetWebhookKt.includeWebhookInRoute] + * + * @see com.github.insanusmokrassar.TelegramBotAPI.extensions.api.SetWebhookKt.includeWebhookInRoute + * @see com.github.insanusmokrassar.TelegramBotAPI.extensions.api.updates.UpdatesPollingKt.startGettingOfUpdates + */ interface MediaGroupUpdate : Update interface SentMediaGroupUpdate: MediaGroupUpdate { diff --git a/TelegramBotAPI/src/jvmMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Webhooks.kt b/TelegramBotAPI/src/jvmMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Webhooks.kt index 5ae230d16e..7afc0f7e69 100644 --- a/TelegramBotAPI/src/jvmMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Webhooks.kt +++ b/TelegramBotAPI/src/jvmMain/kotlin/com/github/insanusmokrassar/TelegramBotAPI/utils/extensions/Webhooks.kt @@ -17,114 +17,10 @@ import io.ktor.server.engine.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit /** - * @param [scope] Will be used for mapping of media groups - * @param [exceptionsHandler] Pass this parameter to set custom exception handler for getting updates - * @param [block] Some receiver block like [com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter] - * - * @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter - * @see UpdatesFilter - * @see UpdatesFilter.asUpdateReceiver - */ -fun Route.includeWebhookInRoute( - scope: CoroutineScope, - exceptionsHandler: (suspend (Exception) -> Unit)? = null, - block: UpdateReceiver -): Job { - val updatesChannel = Channel(Channel.UNLIMITED) - val mediaGroupChannel = Channel>(Channel.UNLIMITED) - val mediaGroupAccumulatedChannel = mediaGroupChannel.accumulateByKey( - 1000L, - scope = scope - ) - post { - handleSafely( - exceptionsHandler ?: {} - ) { - val asJson = nonstrictJsonFormat.parseJson(call.receiveText()) - val update = nonstrictJsonFormat.fromJson( - UpdateDeserializationStrategy, - asJson - ) - updatesChannel.send(update) - } - call.respond("Ok") - } - return scope.launch { - launch { - for (update in updatesChannel) { - when (val data = update.data) { - is MediaGroupMessage -> mediaGroupChannel.send("${data.mediaGroupId}${update::class.simpleName}" to update as BaseMessageUpdate) - else -> block(update) - } - } - } - launch { - for ((_, mediaGroup) in mediaGroupAccumulatedChannel) { - mediaGroup.convertWithMediaGroupUpdates().forEach { - block(it) - } - } - } - } -} - -/** - * Setting up ktor server, set webhook info via [SetWebhook] request. - * - * @param port port which will be listen by bot - * @param listenRoute address to listen by bot - * @param scope Scope which will be used for - * - * @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter - * @see UpdatesFilter - * @see UpdatesFilter.asUpdateReceiver - */ -fun setWebhook( - port: Int, - engineFactory: ApplicationEngineFactory<*, *>, - listenHost: String = "0.0.0.0", - listenRoute: String = "/", - privateKeyConfig: WebhookPrivateKeyConfig? = null, - scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), - exceptionsHandler: (suspend (Exception) -> Unit)? = null, - block: UpdateReceiver -): ApplicationEngine { - lateinit var engine: ApplicationEngine - val env = applicationEngineEnvironment { - - module { - routing { - route(listenRoute) { - includeWebhookInRoute(scope, exceptionsHandler, block) - } - } - } - privateKeyConfig ?.let { - sslConnector( - privateKeyConfig.keyStore, - privateKeyConfig.aliasName, - privateKeyConfig::keyStorePassword, - privateKeyConfig::aliasPassword - ) { - host = listenHost - this.port = port - } - } ?: connector { - host = listenHost - this.port = port - } - - } - engine = embeddedServer(engineFactory, env) - engine.start(false) - - return engine -} - -/** - * Setting up ktor server, set webhook info via [SetWebhook] request. + * Reverse proxy webhook. * * @param url URL of webhook WITHOUT including of [port] * @param port port which will be listen by bot @@ -132,11 +28,8 @@ fun setWebhook( * @param certificate [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.MultipartFile] or [com.github.insanusmokrassar.TelegramBotAPI.requests.abstracts.FileId] * which will be used by telegram to send encrypted messages * @param scope Scope which will be used for - * - * @see com.github.insanusmokrassar.TelegramBotAPI.updateshandlers.FlowsUpdatesFilter - * @see UpdatesFilter - * @see UpdatesFilter.asUpdateReceiver */ +@Deprecated("Replaced into project TelegramBotAPI-extensions-api") suspend fun RequestsExecutor.setWebhook( url: String, port: Int, @@ -167,20 +60,84 @@ suspend fun RequestsExecutor.setWebhook( allowedUpdates ) ) + val updatesChannel = Channel(Channel.UNLIMITED) + val mediaGroupChannel = Channel>(Channel.UNLIMITED) + val mediaGroupAccumulatedChannel = mediaGroupChannel.accumulateByKey( + 1000L, + scope = scope + ) + val env = applicationEngineEnvironment { - - return try { - executeDeferred.await() - val engine = setWebhook(port, engineFactory, listenHost, listenRoute, privateKeyConfig, scope, exceptionsHandler, block) - scope.launch { - engine.environment.parentCoroutineContext[Job] ?.join() - engine.stop(1000, 5000) + module { + routing { + post(listenRoute) { + handleSafely( + { + exceptionsHandler ?.invoke(it) + } + ) { + val asJson = nonstrictJsonFormat.parseJson(call.receiveText()) + val update = nonstrictJsonFormat.fromJson( + UpdateDeserializationStrategy, + asJson + ) + updatesChannel.send(update) + } + call.respond("Ok") + } + } } + privateKeyConfig ?.let { + sslConnector( + privateKeyConfig.keyStore, + privateKeyConfig.aliasName, + privateKeyConfig::keyStorePassword, + privateKeyConfig::aliasPassword + ) { + host = listenHost + this.port = port + } + } ?: connector { + host = listenHost + this.port = port + } + + } + val engine = embeddedServer(engineFactory, env) + + try { + executeDeferred.await() } catch (e: Exception) { + env.stop() throw e } + + return scope.launch { + launch { + for (update in updatesChannel) { + val data = update.data + when (data) { + is MediaGroupMessage -> mediaGroupChannel.send("${data.mediaGroupId}${update::class.simpleName}" to update as BaseMessageUpdate) + else -> block(update) + } + } + } + launch { + for ((_, mediaGroup) in mediaGroupAccumulatedChannel) { + mediaGroup.convertWithMediaGroupUpdates().forEach { + block(it) + } + } + } + engine.start(false) + }.also { + it.invokeOnCompletion { + engine.stop(1000L, 0L, TimeUnit.MILLISECONDS) + } + } } +@Deprecated("Replaced into project TelegramBotAPI-extensions-api") suspend fun RequestsExecutor.setWebhook( url: String, port: Int, @@ -208,6 +165,7 @@ suspend fun RequestsExecutor.setWebhook( block ) +@Deprecated("Replaced into project TelegramBotAPI-extensions-api") suspend fun RequestsExecutor.setWebhook( url: String, port: Int, @@ -232,6 +190,7 @@ suspend fun RequestsExecutor.setWebhook( block ) +@Deprecated("Replaced into project TelegramBotAPI-extensions-api") suspend fun RequestsExecutor.setWebhook( url: String, port: Int,