add websockets work

This commit is contained in:
2020-08-10 23:38:27 +06:00
parent 9421edcb85
commit 61ccfe7948
33 changed files with 695 additions and 23 deletions
core
api
build.gradle
src
commonMain
kotlin
com
insanusmokrassar
postssystem
ktor
client
src
commonMain
kotlin
common
src
commonMain
kotlin
com
insanusmokrassar
postssystem
core
gradle.properties
ktor
client
build.gradle
src
commonMain
kotlin
com
insanusmokrassar
postssystem
common
build.gradle
src
commonMain
kotlin
server
tests
build.gradle
src
test
kotlin
com
insanusmokrassar
postssystem
settings.gradle
utils

@ -45,6 +45,7 @@ kotlin {
}
api "io.ktor:ktor-client-core:$ktor_version"
api "io.ktor:ktor-client-websockets:$ktor_version"
}
}
commonTest {
@ -58,6 +59,8 @@ kotlin {
implementation kotlin('stdlib-jdk8')
api "io.ktor:ktor-client:$ktor_version"
api "io.ktor:ktor-client-websockets-jvm:$ktor_version"
api "io.ktor:ktor-client-okhttp:$ktor_version"
}
}
jvmTest {
@ -70,6 +73,7 @@ kotlin {
implementation kotlin('stdlib-js')
api "io.ktor:ktor-client-js:$ktor_version"
api "io.ktor:ktor-client-websockets-js:$ktor_version"
}
}
jsTest {

@ -0,0 +1,45 @@
package com.insanusmokrassar.postssystem.ktor.client
import com.insanusmokrassar.postssystem.ktor.*
import com.insanusmokrassar.postssystem.utils.common.safely
import io.ktor.client.HttpClient
import io.ktor.client.features.websocket.ws
import io.ktor.client.request.url
import io.ktor.http.HttpMethod
import io.ktor.http.cio.websocket.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.isActive
inline fun <reified T> createStandardWebsocketFlow(
client: HttpClient,
url: String,
crossinline conversation: suspend (ByteArray) -> T
): Flow<T> {
val correctedUrl = url.asCorrectWebSocketUrl
return channelFlow {
val producerScope = this
safely(
{
producerScope.close()
throw it
}
) {
client.ws(
correctedUrl
) {
while (true) {
when (val received = incoming.receive()) {
is Frame.Binary -> producerScope.send(
conversation(received.readBytes())
)
else -> {
producerScope.close()
return@ws
}
}
}
}
}
}
}

@ -39,6 +39,8 @@ kotlin {
implementation kotlin('stdlib')
api "org.jetbrains.kotlinx:kotlinx-serialization-runtime-common:$kotlin_serialisation_runtime_version"
api "org.jetbrains.kotlinx:kotlinx-serialization-cbor-common:$kotlin_serialisation_runtime_version"
api projectByName("postssystem.utils.common")
}
}
commonTest {

@ -0,0 +1,3 @@
package com.insanusmokrassar.postssystem.ktor
object CorrectCloseException : Exception()

@ -0,0 +1,14 @@
package com.insanusmokrassar.postssystem.ktor
private val schemaRegex = Regex("[^:]*//")
val String.asCorrectWebSocketUrl: String
get() = if (startsWith("ws")) {
this
} else {
if (contains("://")) {
replace(schemaRegex, "ws://")
} else {
"ws://$this"
}
}

@ -0,0 +1,7 @@
package com.insanusmokrassar.postssystem.ktor
const val clientWebsocketHelloMessage = "Start getting of updates"
const val serverWebsocketHelloMessage = "Accepted"
const val serverWebsocketNewMessageMessage = "NewMessage"
const val websocketFinalizationMessage = "Final"

68
ktor/server/build.gradle Normal file

@ -0,0 +1,68 @@
buildscript {
repositories {
mavenLocal()
jcenter()
mavenCentral()
}
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
classpath "org.jetbrains.kotlin:kotlin-serialization:$kotlin_version"
classpath "com.jfrog.bintray.gradle:gradle-bintray-plugin:$gradle_bintray_plugin_version"
}
}
plugins {
id "org.jetbrains.kotlin.multiplatform" version "$kotlin_version"
id "org.jetbrains.kotlin.plugin.serialization" version "$kotlin_version"
}
project.version = "$core_version"
project.group = "com.insanusmokrassar"
apply from: "publish.gradle"
repositories {
mavenLocal()
jcenter()
mavenCentral()
maven { url "https://kotlin.bintray.com/kotlinx" }
}
kotlin {
jvm()
sourceSets {
commonMain {
dependencies {
implementation kotlin('stdlib')
api "org.jetbrains.kotlinx:kotlinx-coroutines-core-common:$kotlin_coroutines_version"
api projectByName("postssystem.ktor.common")
}
}
commonTest {
dependencies {
implementation kotlin('test-common')
implementation kotlin('test-annotations-common')
}
}
jvmMain {
dependencies {
api "org.jetbrains.kotlinx:kotlinx-serialization-runtime:$kotlin_serialisation_runtime_version"
api "org.jetbrains.kotlinx:kotlinx-serialization-properties:$kotlin_serialisation_runtime_version"
api "io.ktor:ktor-server:$ktor_version"
api "io.ktor:ktor-server-host-common:$ktor_version"
api "io.ktor:ktor-server-netty:$ktor_version"
api "io.ktor:ktor-websockets:$ktor_version"
}
}
jvmTest {
dependencies {
implementation kotlin('test-junit')
}
}
}
}

@ -0,0 +1,53 @@
apply plugin: 'maven-publish'
task javadocsJar(type: Jar) {
classifier = 'javadoc'
}
afterEvaluate {
project.publishing.publications.all {
// rename artifacts
groupId "${project.group}"
if (it.name.contains('kotlinMultiplatform')) {
artifactId = "${project.name}"
} else {
artifactId = "${project.name}-$name"
}
}
}
publishing {
publications.all {
artifact javadocsJar
pom {
description = "Common utils for all exposed modules"
name = "PostsSystem Exposed commons"
url = "https://git.insanusmokrassar.com/PostsSystem/Core/"
scm {
developerConnection = "scm:git:[fetch=]https://git.insanusmokrassar.com/PostsSystem/Core/.git[push=]https://git.insanusmokrassar.com/PostsSystem/Core/.git"
url = "https://git.insanusmokrassar.com/PostsSystem/Core/.git"
}
developers {
developer {
id = "InsanusMokrassar"
name = "Ovsiannikov Aleksei"
email = "ovsyannikov.alexey95@gmail.com"
}
}
licenses {
license {
name = "Apache Software License 2.0"
url = "https://git.insanusmokrassar.com/PostsSystem/Core/src/master/LICENSE"
}
}
}
}
}

@ -0,0 +1,55 @@
apply plugin: 'com.jfrog.bintray'
apply from: "maven.publish.gradle"
bintray {
user = project.hasProperty('BINTRAY_USER') ? project.property('BINTRAY_USER') : System.getenv('BINTRAY_USER')
key = project.hasProperty('BINTRAY_KEY') ? project.property('BINTRAY_KEY') : System.getenv('BINTRAY_KEY')
filesSpec {
from "${buildDir}/publications/"
eachFile {
String directorySubname = it.getFile().parentFile.name
if (it.getName() == "module.json") {
if (directorySubname == "kotlinMultiplatform") {
it.setPath("${project.name}/${project.version}/${project.name}-${project.version}.module")
} else {
it.setPath("${project.name}-${directorySubname}/${project.version}/${project.name}-${directorySubname}-${project.version}.module")
}
} else {
if (directorySubname == "kotlinMultiplatform" && it.getName() == "pom-default.xml") {
it.setPath("${project.name}/${project.version}/${project.name}-${project.version}.pom")
} else {
it.exclude()
}
}
}
into "${project.group}".replace(".", "/")
}
pkg {
repo = "InsanusMokrassar"
name = "${project.name}"
vcsUrl = "https://github.com/PostsSystem/PostsSystemCore"
licenses = ["Apache-2.0"]
version {
name = "${project.version}"
released = new Date()
vcsTag = "${project.version}"
gpg {
sign = true
passphrase = project.hasProperty('signing.gnupg.passphrase') ? project.property('signing.gnupg.passphrase') : System.getenv('signing.gnupg.passphrase')
}
}
}
}
bintrayUpload.doFirst {
publications = publishing.publications.collect {
if (it.name.contains('kotlinMultiplatform')) {
null
} else {
it.name
}
} - null
}
bintrayUpload.dependsOn publishToMavenLocal

@ -0,0 +1 @@
{"bintrayConfig":{"repo":"InsanusMokrassar","packageName":"${project.name}","packageVcs":"https://github.com/PostsSystem/PostsSystemCore"},"licenses":[{"id":"Apache-2.0","title":"Apache Software License 2.0","url":"https://git.insanusmokrassar.com/PostsSystem/Core/src/master/LICENSE"}],"mavenConfig":{"name":"PostsSystem Exposed commons","description":"Common utils for all exposed modules","url":"https://git.insanusmokrassar.com/PostsSystem/Core/","vcsUrl":"https://git.insanusmokrassar.com/PostsSystem/Core/.git","developers":[{"id":"InsanusMokrassar","name":"Ovsiannikov Aleksei","eMail":"ovsyannikov.alexey95@gmail.com"}]},"type":"Multiplatform"}

@ -0,0 +1,32 @@
package com.insanusmokrassar.postssystem.ktor.server
import com.insanusmokrassar.postssystem.ktor.*
import com.insanusmokrassar.postssystem.utils.common.safely
import io.ktor.http.cio.websocket.*
import io.ktor.routing.Route
import io.ktor.routing.route
import io.ktor.websocket.webSocket
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
private suspend fun DefaultWebSocketSession.checkReceivedAndCloseIfExists() {
if (incoming.poll() != null) {
close()
throw CorrectCloseException
}
}
fun <T> Route.includeWebsocketHandling(
suburl: String,
flow: Flow<T>,
converter: (T) -> ByteArray
) {
webSocket(suburl) {
safely {
flow.collect {
checkReceivedAndCloseIfExists()
send(converter(it))
}
}
}
}

@ -0,0 +1,28 @@
package com.insanusmokrassar.postssystem.ktor.server
import io.ktor.application.Application
import io.ktor.server.engine.*
import io.ktor.server.netty.Netty
import kotlin.random.Random
fun <TEngine : ApplicationEngine, TConfiguration : ApplicationEngine.Configuration> createKtorServer(
engine: ApplicationEngineFactory<TEngine, TConfiguration>,
host: String = "localhost",
port: Int = Random.nextInt(1024, 65535),
block: Application.() -> Unit
): TEngine {
val env = applicationEngineEnvironment {
module(block)
connector {
this@connector.host = host
this@connector.port = port
}
}
return embeddedServer(engine, env)
}
fun createKtorServer(
host: String = "localhost",
port: Int = Random.nextInt(1024, 65535),
block: Application.() -> Unit
) = createKtorServer(Netty, host, port, block)

39
ktor/tests/build.gradle Normal file

@ -0,0 +1,39 @@
buildscript {
repositories {
mavenLocal()
jcenter()
mavenCentral()
}
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
classpath "org.jetbrains.kotlin:kotlin-serialization:$kotlin_version"
}
}
plugins {
id "org.jetbrains.kotlin.plugin.serialization" version "$kotlin_version"
}
project.version = "$core_version"
project.group = "com.insanusmokrassar"
apply plugin: "java-library"
apply plugin: "kotlin"
repositories {
mavenLocal()
jcenter()
mavenCentral()
maven { url "https://kotlin.bintray.com/kotlinx" }
}
dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
api projectByName("postssystem.ktor.client")
api projectByName("postssystem.ktor.server")
testImplementation "org.jetbrains.kotlin:kotlin-test"
testImplementation "org.jetbrains.kotlin:kotlin-test-junit"
}

@ -0,0 +1,6 @@
package com.insanusmokrassar.postssystem.ktor.tests
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
val testsScope = CoroutineScope(Dispatchers.Default)

@ -0,0 +1,72 @@
package com.insanusmokrassar.postssystem.ktor.tests
import com.insanusmokrassar.postssystem.ktor.client.createStandardWebsocketFlow
import com.insanusmokrassar.postssystem.ktor.server.createKtorServer
import com.insanusmokrassar.postssystem.ktor.server.includeWebsocketHandling
import com.insanusmokrassar.postssystem.ktor.standardKtorSerializer
import io.ktor.application.install
import io.ktor.application.log
import io.ktor.client.HttpClient
import io.ktor.http.cio.websocket.pingPeriod
import io.ktor.http.cio.websocket.timeout
import io.ktor.routing.route
import io.ktor.routing.routing
import io.ktor.websocket.WebSockets
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.serialization.builtins.serializer
import org.junit.Test
import java.time.Duration
import kotlin.test.assertEquals
import kotlin.test.assertNull
class WebsocketsTest {
@Test
fun testCommonWebsocketFunctionality() {
val port = 60000
val suburl = "test"
val dataFlowChannel = Channel<Int>(Channel.UNLIMITED)
val dataFlow = dataFlowChannel.consumeAsFlow()
val serverUrl = "127.0.0.1:$port"
val server = createKtorServer(host = "127.0.0.1", port = port) {
install(WebSockets)
routing {
includeWebsocketHandling(suburl, dataFlow) {
standardKtorSerializer.dump(Int.serializer(), it)
}
}
}.also {
it.start(false)
}
runBlocking {
delay(100L)
}
val client = HttpClient {
install(io.ktor.client.features.websocket.WebSockets)
}
val incomingWebsocketFlow = createStandardWebsocketFlow(
client,
"$serverUrl/$suburl"
) {
standardKtorSerializer.load(Int.serializer(), it)
}
var currentlyCheckingData: Int? = null
incomingWebsocketFlow.onEach {
assertEquals(currentlyCheckingData, it)
currentlyCheckingData = null
}.launchIn(CoroutineScope(testsScope.coroutineContext + SupervisorJob()))
runBlocking {
(0 until 100).asFlow().collect {
currentlyCheckingData = it
dataFlowChannel.send(it)
while (currentlyCheckingData != null) {
delay(10L)
}
assertNull(currentlyCheckingData)
}
}
server.stop(1000L, 1000L)
}
}