1
0
mirror of https://github.com/InsanusMokrassar/TelegramBotAPI.git synced 2024-11-08 17:33:47 +00:00

migrate to ktor 3.0.0

This commit is contained in:
d1snin 2024-10-27 05:43:27 +03:00
parent dccd15a5e9
commit 3fe26b057a
6 changed files with 82 additions and 64 deletions

View File

@ -8,7 +8,7 @@ javax-activation = "1.1.1"
korlibs = "5.4.0" korlibs = "5.4.0"
uuid = "0.8.4" uuid = "0.8.4"
ktor = "2.3.12" ktor = "3.0.0"
ksp = "2.0.21-1.0.25" ksp = "2.0.21-1.0.25"
kotlin-poet = "1.18.1" kotlin-poet = "1.18.1"

View File

@ -8,9 +8,7 @@ import dev.inmo.tgbotapi.utils.ByteReadChannelAllocator
import dev.inmo.tgbotapi.utils.RiskFeature import dev.inmo.tgbotapi.utils.RiskFeature
import dev.inmo.tgbotapi.utils.TelegramAPIUrlsKeeper import dev.inmo.tgbotapi.utils.TelegramAPIUrlsKeeper
import io.ktor.client.HttpClient import io.ktor.client.HttpClient
import io.ktor.client.call.receive
import io.ktor.client.request.get import io.ktor.client.request.get
import io.ktor.client.statement.HttpStatement
import io.ktor.client.statement.bodyAsChannel import io.ktor.client.statement.bodyAsChannel
import io.ktor.utils.io.* import io.ktor.utils.io.*
import kotlinx.coroutines.* import kotlinx.coroutines.*

View File

@ -6,9 +6,9 @@ import dev.inmo.tgbotapi.requests.DownloadFile
import dev.inmo.tgbotapi.requests.abstracts.Request import dev.inmo.tgbotapi.requests.abstracts.Request
import dev.inmo.tgbotapi.utils.RiskFeature import dev.inmo.tgbotapi.utils.RiskFeature
import dev.inmo.tgbotapi.utils.TelegramAPIUrlsKeeper import dev.inmo.tgbotapi.utils.TelegramAPIUrlsKeeper
import io.ktor.client.HttpClient import io.ktor.client.*
import io.ktor.client.request.get import io.ktor.client.request.*
import io.ktor.client.statement.readBytes import io.ktor.client.statement.*
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
@RiskFeature @RiskFeature
@ -17,13 +17,13 @@ object DownloadFileRequestCallFactory : KtorCallFactory {
client: HttpClient, client: HttpClient,
urlsKeeper: TelegramAPIUrlsKeeper, urlsKeeper: TelegramAPIUrlsKeeper,
request: Request<T>, request: Request<T>,
jsonFormatter: Json jsonFormatter: Json,
): T? = (request as? DownloadFile) ?.let { ): T? = (request as? DownloadFile)?.let {
val fullUrl = urlsKeeper.createFileLinkUrl(it.filePath) val fullUrl = urlsKeeper.createFileLinkUrl(it.filePath)
safely { safely {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
client.get(fullUrl).readBytes() as T // always ByteArray client.get(fullUrl).readRawBytes() as T // always ByteArray
} }
} }
} }

View File

@ -38,7 +38,7 @@ internal expect inline fun platformClientCopy(client: HttpClient): HttpClient
* @param requestExecutorsCount Amount of [DefaultKtorRequestsExecutor] which will be created and used under the * @param requestExecutorsCount Amount of [DefaultKtorRequestsExecutor] which will be created and used under the
* hood * hood
*/ */
class MultipleClientKtorRequestsExecutor ( class MultipleClientKtorRequestsExecutor(
telegramAPIUrlsKeeper: TelegramAPIUrlsKeeper, telegramAPIUrlsKeeper: TelegramAPIUrlsKeeper,
callsFactories: List<KtorCallFactory>, callsFactories: List<KtorCallFactory>,
excludeDefaultFactories: Boolean, excludeDefaultFactories: Boolean,
@ -47,7 +47,7 @@ class MultipleClientKtorRequestsExecutor (
pipelineStepsHolder: TelegramBotPipelinesHandler, pipelineStepsHolder: TelegramBotPipelinesHandler,
requestExecutorsCount: Int, requestExecutorsCount: Int,
logger: KSLog, logger: KSLog,
clientFactory: () -> HttpClient clientFactory: () -> HttpClient,
) : BaseRequestsExecutor(telegramAPIUrlsKeeper) { ) : BaseRequestsExecutor(telegramAPIUrlsKeeper) {
private val requestExecutors = (0 until requestExecutorsCount).map { private val requestExecutors = (0 until requestExecutorsCount).map {
DefaultKtorRequestsExecutor( DefaultKtorRequestsExecutor(
@ -66,7 +66,7 @@ class MultipleClientKtorRequestsExecutor (
private val clientAllocationMutex = Mutex() private val clientAllocationMutex = Mutex()
private val takerFlow = freeClients.mapNotNull { private val takerFlow = freeClients.mapNotNull {
clientAllocationMutex.withLock { clientAllocationMutex.withLock {
freeClients.value.firstOrNull() ?.also { freeClients.value.firstOrNull()?.also {
freeClients.value -= it freeClients.value -= it
} ?: return@mapNotNull null } ?: return@mapNotNull null
} }
@ -81,7 +81,7 @@ class MultipleClientKtorRequestsExecutor (
jsonFormatter: Json, jsonFormatter: Json,
pipelineStepsHolder: TelegramBotPipelinesHandler, pipelineStepsHolder: TelegramBotPipelinesHandler,
logger: KSLog, logger: KSLog,
diff: Unit diff: Unit,
) : this( ) : this(
telegramAPIUrlsKeeper, telegramAPIUrlsKeeper,
callsFactories, callsFactories,
@ -89,7 +89,7 @@ class MultipleClientKtorRequestsExecutor (
requestsLimiter, requestsLimiter,
jsonFormatter, jsonFormatter,
pipelineStepsHolder, pipelineStepsHolder,
client.engineConfig.threadsCount, requestExecutorsCount = 4, // default threads count; configurable through dispatcher property
logger, logger,
{ platformClientCopy(client) } { platformClientCopy(client) }
) )

View File

@ -1,8 +1,6 @@
package dev.inmo.tgbotapi.utils package dev.inmo.tgbotapi.utils
import io.ktor.util.toByteArray import io.ktor.utils.io.*
import io.ktor.utils.io.ByteReadChannel import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.core.Input
actual suspend fun ByteReadChannel.asInput(): Input = ByteReadPacket(toByteArray()) actual suspend fun ByteReadChannel.asInput(): Input = ByteReadPacket(toByteArray())

View File

@ -1,19 +1,21 @@
package dev.inmo.tgbotapi.extensions.utils.updates.retrieving package dev.inmo.tgbotapi.extensions.utils.updates.retrieving
import dev.inmo.micro_utils.coroutines.* import dev.inmo.micro_utils.coroutines.ExceptionHandler
import dev.inmo.micro_utils.coroutines.runCatchingSafely
import dev.inmo.tgbotapi.bot.RequestsExecutor import dev.inmo.tgbotapi.bot.RequestsExecutor
import dev.inmo.tgbotapi.extensions.utils.nonstrictJsonFormat import dev.inmo.tgbotapi.extensions.utils.nonstrictJsonFormat
import dev.inmo.tgbotapi.extensions.utils.updates.flowsUpdatesFilter import dev.inmo.tgbotapi.extensions.utils.updates.flowsUpdatesFilter
import dev.inmo.tgbotapi.requests.webhook.SetWebhookRequest import dev.inmo.tgbotapi.requests.webhook.SetWebhookRequest
import dev.inmo.tgbotapi.types.update.abstracts.Update import dev.inmo.tgbotapi.types.update.abstracts.Update
import dev.inmo.tgbotapi.types.update.abstracts.UpdateDeserializationStrategy import dev.inmo.tgbotapi.types.update.abstracts.UpdateDeserializationStrategy
import dev.inmo.tgbotapi.updateshandlers.* import dev.inmo.tgbotapi.updateshandlers.FlowsUpdatesFilter
import dev.inmo.tgbotapi.updateshandlers.UpdateReceiver
import dev.inmo.tgbotapi.updateshandlers.UpdatesFilter
import dev.inmo.tgbotapi.updateshandlers.webhook.WebhookPrivateKeyConfig import dev.inmo.tgbotapi.updateshandlers.webhook.WebhookPrivateKeyConfig
import io.ktor.http.HttpStatusCode import io.ktor.http.*
import io.ktor.server.application.call
import io.ktor.server.engine.* import io.ktor.server.engine.*
import io.ktor.server.request.receiveText import io.ktor.server.request.*
import io.ktor.server.response.respond import io.ktor.server.response.*
import io.ktor.server.routing.* import io.ktor.server.routing.*
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.asCoroutineDispatcher
@ -38,7 +40,7 @@ fun Route.includeWebhookHandlingInRoute(
scope: CoroutineScope, scope: CoroutineScope,
exceptionsHandler: ExceptionHandler<Unit>? = null, exceptionsHandler: ExceptionHandler<Unit>? = null,
mediaGroupsDebounceTimeMillis: Long = 1000L, mediaGroupsDebounceTimeMillis: Long = 1000L,
block: UpdateReceiver<Update> block: UpdateReceiver<Update>,
) { ) {
val transformer = scope.updateHandlerWithMediaGroupsAdaptation(block, mediaGroupsDebounceTimeMillis) val transformer = scope.updateHandlerWithMediaGroupsAdaptation(block, mediaGroupsDebounceTimeMillis)
post { post {
@ -55,7 +57,7 @@ fun Route.includeWebhookHandlingInRoute(
call.respond(HttpStatusCode.InternalServerError) call.respond(HttpStatusCode.InternalServerError)
}.getOrThrow() }.getOrThrow()
} catch (e: Throwable) { } catch (e: Throwable) {
exceptionsHandler ?.invoke(e) exceptionsHandler?.invoke(e)
} }
} }
} }
@ -69,7 +71,7 @@ fun Route.includeWebhookHandlingInRouteWithFlows(
scope: CoroutineScope, scope: CoroutineScope,
exceptionsHandler: ExceptionHandler<Unit>? = null, exceptionsHandler: ExceptionHandler<Unit>? = null,
mediaGroupsDebounceTimeMillis: Long = 1000L, mediaGroupsDebounceTimeMillis: Long = 1000L,
block: FlowsUpdatesFilter.() -> Unit block: FlowsUpdatesFilter.() -> Unit,
) = includeWebhookHandlingInRoute( ) = includeWebhookHandlingInRoute(
scope, scope,
exceptionsHandler, exceptionsHandler,
@ -92,29 +94,26 @@ fun Route.includeWebhookHandlingInRouteWithFlows(
* @see UpdatesFilter * @see UpdatesFilter
* @see UpdatesFilter.asUpdateReceiver * @see UpdatesFilter.asUpdateReceiver
*/ */
fun startListenWebhooks( fun <TEngine : ApplicationEngine, TConfiguration : ApplicationEngine.Configuration> startListenWebhooks(
listenPort: Int, listenPort: Int,
engineFactory: ApplicationEngineFactory<*, *>, engineFactory: ApplicationEngineFactory<TEngine, TConfiguration>,
exceptionsHandler: ExceptionHandler<Unit>, exceptionsHandler: ExceptionHandler<Unit>,
listenHost: String = "0.0.0.0", listenHost: String = "0.0.0.0",
listenRoute: String? = null, listenRoute: String? = null,
privateKeyConfig: WebhookPrivateKeyConfig? = null, privateKeyConfig: WebhookPrivateKeyConfig? = null,
scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()),
mediaGroupsDebounceTimeMillis: Long = 1000L, mediaGroupsDebounceTimeMillis: Long = 1000L,
additionalApplicationEngineEnvironmentConfigurator: ApplicationEngineEnvironmentBuilder.() -> Unit = {}, additionalApplicationEnvironmentConfigurator: ApplicationEnvironmentBuilder.() -> Unit = {},
block: UpdateReceiver<Update> additionalEngineConfigurator: TConfiguration.() -> Unit = {},
): ApplicationEngine { block: UpdateReceiver<Update>,
val env = applicationEngineEnvironment { ): EmbeddedServer<TEngine, TConfiguration> =
embeddedServer(
module { factory = engineFactory,
routing { environment = applicationEnvironment {
listenRoute ?.also { additionalApplicationEnvironmentConfigurator()
createRouteFromPath(it).includeWebhookHandlingInRoute(scope, exceptionsHandler, mediaGroupsDebounceTimeMillis, block) },
} ?: includeWebhookHandlingInRoute(scope, exceptionsHandler, mediaGroupsDebounceTimeMillis, block) configure = {
} privateKeyConfig?.let {
}
privateKeyConfig ?.let {
sslConnector( sslConnector(
privateKeyConfig.keyStore, privateKeyConfig.keyStore,
privateKeyConfig.aliasName, privateKeyConfig.aliasName,
@ -129,13 +128,23 @@ fun startListenWebhooks(
port = listenPort port = listenPort
} }
additionalApplicationEngineEnvironmentConfigurator() additionalEngineConfigurator()
},
module = {
routing {
listenRoute?.also {
createRouteFromPath(it).includeWebhookHandlingInRoute(
scope,
exceptionsHandler,
mediaGroupsDebounceTimeMillis,
block
)
} ?: includeWebhookHandlingInRoute(scope, exceptionsHandler, mediaGroupsDebounceTimeMillis, block)
} }
}
return embeddedServer(engineFactory, env).also { ).also {
it.start(false) it.start(false)
} }
}
/** /**
* Setting up ktor server, set webhook info via [SetWebhookRequest] request. * Setting up ktor server, set webhook info via [SetWebhookRequest] request.
@ -152,9 +161,9 @@ fun startListenWebhooks(
* @see UpdatesFilter.asUpdateReceiver * @see UpdatesFilter.asUpdateReceiver
*/ */
@Suppress("unused") @Suppress("unused")
suspend fun RequestsExecutor.setWebhookInfoAndStartListenWebhooks( suspend fun <TEngine : ApplicationEngine, TConfiguration : ApplicationEngine.Configuration> RequestsExecutor.setWebhookInfoAndStartListenWebhooks(
listenPort: Int, listenPort: Int,
engineFactory: ApplicationEngineFactory<*, *>, engineFactory: ApplicationEngineFactory<TEngine, TConfiguration>,
setWebhookRequest: SetWebhookRequest, setWebhookRequest: SetWebhookRequest,
exceptionsHandler: ExceptionHandler<Unit> = {}, exceptionsHandler: ExceptionHandler<Unit> = {},
listenHost: String = "0.0.0.0", listenHost: String = "0.0.0.0",
@ -162,11 +171,24 @@ suspend fun RequestsExecutor.setWebhookInfoAndStartListenWebhooks(
privateKeyConfig: WebhookPrivateKeyConfig? = null, privateKeyConfig: WebhookPrivateKeyConfig? = null,
scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()), scope: CoroutineScope = CoroutineScope(Executors.newFixedThreadPool(4).asCoroutineDispatcher()),
mediaGroupsDebounceTimeMillis: Long = 1000L, mediaGroupsDebounceTimeMillis: Long = 1000L,
additionalApplicationEngineEnvironmentConfigurator: ApplicationEngineEnvironmentBuilder.() -> Unit = {}, additionalApplicationEnvironmentConfigurator: ApplicationEnvironmentBuilder.() -> Unit = {},
block: UpdateReceiver<Update> additionalEngineConfigurator: TConfiguration.() -> Unit = {},
): ApplicationEngine = try { block: UpdateReceiver<Update>,
): EmbeddedServer<TEngine, TConfiguration> = try {
execute(setWebhookRequest) execute(setWebhookRequest)
startListenWebhooks(listenPort, engineFactory, exceptionsHandler, listenHost, listenRoute, privateKeyConfig, scope, mediaGroupsDebounceTimeMillis, additionalApplicationEngineEnvironmentConfigurator, block) startListenWebhooks(
listenPort,
engineFactory,
exceptionsHandler,
listenHost,
listenRoute,
privateKeyConfig,
scope,
mediaGroupsDebounceTimeMillis,
additionalApplicationEnvironmentConfigurator,
additionalEngineConfigurator,
block
)
} catch (e: Exception) { } catch (e: Exception) {
throw e throw e
} }