diff --git a/cueglow-server/build.gradle.kts b/cueglow-server/build.gradle.kts index 82da12a..06b49b5 100644 --- a/cueglow-server/build.gradle.kts +++ b/cueglow-server/build.gradle.kts @@ -34,6 +34,7 @@ dependencies { testImplementation("com.github.kittinunf.fuel:fuel:2.3.1") testImplementation("org.java-websocket:Java-WebSocket:1.5.1") testImplementation("org.awaitility:awaitility-kotlin:4.0.3") + testImplementation("com.google.truth:truth:1.1.2") implementation("com.michael-bull.kotlin-result:kotlin-result:1.1.9") diff --git a/cueglow-server/src/main/kotlin/org/cueglow/server/CueGlowServer.kt b/cueglow-server/src/main/kotlin/org/cueglow/server/CueGlowServer.kt index 1abc135..3abf313 100644 --- a/cueglow-server/src/main/kotlin/org/cueglow/server/CueGlowServer.kt +++ b/cueglow-server/src/main/kotlin/org/cueglow/server/CueGlowServer.kt @@ -10,6 +10,7 @@ import org.eclipse.jetty.server.Server fun main(args: Array) { + // TODO what happens if there is an exception in this main thread? CueGlowServer() } diff --git a/cueglow-server/src/main/kotlin/org/cueglow/server/OutEventHandler.kt b/cueglow-server/src/main/kotlin/org/cueglow/server/OutEventHandler.kt index 59a627b..b11d817 100644 --- a/cueglow-server/src/main/kotlin/org/cueglow/server/OutEventHandler.kt +++ b/cueglow-server/src/main/kotlin/org/cueglow/server/OutEventHandler.kt @@ -14,9 +14,13 @@ class OutEventHandler(receivers: Iterable): Logging { init { Executors.newSingleThreadExecutor().submit { while (true) { - val glowMessage = queue.take() - logger.info("Handling OutEvent: $glowMessage") - receivers.forEach { it.receive(glowMessage) } + try { + val glowMessage = queue.take() + logger.debug("Handling OutEvent: $glowMessage") + receivers.forEach { it.receive(glowMessage) } + } catch (e: Throwable) { + logger.error(e) + } } } } diff --git a/cueglow-server/src/main/kotlin/org/cueglow/server/StateProvider.kt b/cueglow-server/src/main/kotlin/org/cueglow/server/StateProvider.kt index f20d8ed..d2e4c8f 100644 --- a/cueglow-server/src/main/kotlin/org/cueglow/server/StateProvider.kt +++ b/cueglow-server/src/main/kotlin/org/cueglow/server/StateProvider.kt @@ -4,6 +4,7 @@ import org.cueglow.server.json.JsonHandler import org.cueglow.server.objects.messages.GlowMessage import org.cueglow.server.patch.Patch import java.util.concurrent.BlockingQueue +import java.util.concurrent.locks.ReentrantLock /** * Provides a collection of state objects @@ -11,5 +12,6 @@ import java.util.concurrent.BlockingQueue * The StateProvider is initialized by the main process and passed to e.g. a [JsonHandler] for mutation. */ class StateProvider(val outEventQueue: BlockingQueue) { - val patch = Patch(outEventQueue) + val lock = ReentrantLock() + val patch = Patch(outEventQueue, lock) } \ No newline at end of file diff --git a/cueglow-server/src/main/kotlin/org/cueglow/server/objects/ArtNetAddress.kt b/cueglow-server/src/main/kotlin/org/cueglow/server/objects/ArtNetAddress.kt index 269f921..6a9ccaf 100644 --- a/cueglow-server/src/main/kotlin/org/cueglow/server/objects/ArtNetAddress.kt +++ b/cueglow-server/src/main/kotlin/org/cueglow/server/objects/ArtNetAddress.kt @@ -13,6 +13,7 @@ import org.cueglow.server.objects.messages.* * * @property[value] A Short representing the 15-bit Port-Address. */ +// TODO constructor cannot be private due to Klaxon, so do validation in constructor instead of companion object by throwing data class ArtNetAddress constructor(val value: Short) { companion object Factory { diff --git a/cueglow-server/src/main/kotlin/org/cueglow/server/objects/DmxAddress.kt b/cueglow-server/src/main/kotlin/org/cueglow/server/objects/DmxAddress.kt index b151e02..5828633 100644 --- a/cueglow-server/src/main/kotlin/org/cueglow/server/objects/DmxAddress.kt +++ b/cueglow-server/src/main/kotlin/org/cueglow/server/objects/DmxAddress.kt @@ -16,6 +16,7 @@ import org.cueglow.server.objects.messages.InvalidDmxAddress * * @property[value] A Short representing the DMX Address. */ +// TODO constructor can't be private due to Klaxon, so move validation of input from companion object to throwing constructor data class DmxAddress constructor(val value: Short) { companion object Factory { /** diff --git a/cueglow-server/src/main/kotlin/org/cueglow/server/objects/messages/SubscriptionHandler.kt b/cueglow-server/src/main/kotlin/org/cueglow/server/objects/messages/SubscriptionHandler.kt index 441d6f8..9c15b53 100644 --- a/cueglow-server/src/main/kotlin/org/cueglow/server/objects/messages/SubscriptionHandler.kt +++ b/cueglow-server/src/main/kotlin/org/cueglow/server/objects/messages/SubscriptionHandler.kt @@ -5,12 +5,17 @@ import org.cueglow.server.OutEventReceiver import org.cueglow.server.StateProvider import org.cueglow.server.json.AsyncClient import java.util.* +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock /** * Handles Subscribe/Unsubscribe Events. Receives OutEvents from the OutEventHandler and sends them to the subscribers. **/ abstract class SubscriptionHandler: OutEventReceiver, Logging { - private val activeSubscriptions = EnumMap>(GlowTopic::class.java) // TODO synchronize (see JavaDoc for EnumMap) + val lock = ReentrantLock() + + private val activeSubscriptions = EnumMap>(GlowTopic::class.java) /** Keeps subscriptions that were sent the initial state but do not get updates yet because older updates * in the OutEventQueue first need to be handled. Subscriptions move from pending to active once the sync message @@ -27,7 +32,7 @@ abstract class SubscriptionHandler: OutEventReceiver, Logging { /** Receive and handle messages from the OutEventQueue **/ override fun receive(glowMessage: GlowMessage) { - logger.info("Receiving $glowMessage") + logger.debug("Receiving $glowMessage") when (glowMessage.event) { GlowEvent.ADD_FIXTURES, GlowEvent.UPDATE_FIXTURES, @@ -41,17 +46,21 @@ abstract class SubscriptionHandler: OutEventReceiver, Logging { } private fun publish(topic: GlowTopic, glowMessage: GlowMessage) { - val topicSubscribers = activeSubscriptions[topic] - if (topicSubscribers!!.isNotEmpty()) { // null asserted because all possible keys are initialized in init block - val messageString = serializeMessage(glowMessage) - topicSubscribers.forEach {it.send(messageString)} + lock.withLock { + val topicSubscribers = activeSubscriptions[topic] + if (topicSubscribers!!.isNotEmpty()) { // null asserted because all possible keys are initialized in init block + val messageString = serializeMessage(glowMessage) + topicSubscribers.forEach {it.send(messageString)} + } } } /** Check if [syncUuid] is known. If yes, move subscription from pending to active **/ private fun activateSubscription(syncUuid: UUID) { - val (topic, subscriber) = pendingSubscriptions.remove(syncUuid) ?: return - activeSubscriptions[topic]!!.add(subscriber) // null asserted because all possible keys are initialized in init block + lock.withLock { + val (topic, subscriber) = pendingSubscriptions.remove(syncUuid) ?: return + activeSubscriptions[topic]!!.add(subscriber) // null asserted because all possible keys are initialized in init block + } } fun subscribe(subscriber: AsyncClient, topic: GlowTopic, state: StateProvider) { @@ -61,12 +70,15 @@ abstract class SubscriptionHandler: OutEventReceiver, Logging { GlowTopic.PATCH -> { val syncUuid = UUID.randomUUID() val syncMessage = GlowMessage.Sync(syncUuid) - pendingSubscriptions[syncUuid] = Pair(GlowTopic.PATCH, subscriber) - // TODO acquire state lock here - val initialPatchState = state.patch.getGlowPatch() - state.outEventQueue.put(syncMessage) // TODO possible deadlock because SubscriptionHandler is locked and cannot work to reduce message count in queue - // no deadlock problem if we don't have the SubscriptionHandler Lock here? - // TODO release state lock here + val initialPatchState = lock.withLock { + val initialPatchState = state.lock.withLock { + // -> need to test multiple threads subscribing while changes are happening -> all threads should have same state in the end with their respective updates applied + state.outEventQueue.offer(syncMessage, 1, TimeUnit.SECONDS) + state.patch.getGlowPatch() + } + pendingSubscriptions[syncUuid] = Pair(GlowTopic.PATCH, subscriber) + initialPatchState + } val initialMessage = GlowMessage.PatchInitialState(initialPatchState) subscriber.send(initialMessage) } @@ -79,19 +91,23 @@ abstract class SubscriptionHandler: OutEventReceiver, Logging { /** Returns true if the subscriber was successfully unsubscribed and false if the subscriber wasn't subscribed */ private fun internalUnsubscribe(subscriber: AsyncClient, topic: GlowTopic): Boolean { - val numberOfSubscriptionsRemovedFromPending = pendingSubscriptions - .filter {it.value.first == topic && it.value.second == subscriber} - .keys - .map {pendingSubscriptions.remove(it)} - .size - val removedFromActive = activeSubscriptions[topic]!!.remove(subscriber) // null asserted because all possible keys are initialized in init block - return removedFromActive || numberOfSubscriptionsRemovedFromPending > 0 + lock.withLock { + val numberOfSubscriptionsRemovedFromPending = pendingSubscriptions + .filter {it.value.first == topic && it.value.second == subscriber} + .keys + .map {pendingSubscriptions.remove(it)} + .size + val removedFromActive = activeSubscriptions[topic]!!.remove(subscriber) // null asserted because all possible keys are initialized in init block + return removedFromActive || numberOfSubscriptionsRemovedFromPending > 0 + } } fun unsubscribeFromAllTopics(subscriber: AsyncClient) { - pendingSubscriptions.filter {it.value.second == subscriber}.keys.map { pendingSubscriptions.remove(it) } - activeSubscriptions.values.forEach { - it.remove(subscriber) + lock.withLock { + pendingSubscriptions.filter {it.value.second == subscriber}.keys.map { pendingSubscriptions.remove(it) } + activeSubscriptions.values.forEach { + it.remove(subscriber) + } } } } \ No newline at end of file diff --git a/cueglow-server/src/main/kotlin/org/cueglow/server/patch/Patch.kt b/cueglow-server/src/main/kotlin/org/cueglow/server/patch/Patch.kt index 2bcd334..32833ed 100644 --- a/cueglow-server/src/main/kotlin/org/cueglow/server/patch/Patch.kt +++ b/cueglow-server/src/main/kotlin/org/cueglow/server/patch/Patch.kt @@ -2,10 +2,11 @@ package org.cueglow.server.patch import com.github.michaelbull.result.* import org.cueglow.server.gdtf.GdtfWrapper -import org.cueglow.server.objects.ImmutableMap import org.cueglow.server.objects.messages.* import java.util.* import java.util.concurrent.BlockingQueue +import java.util.concurrent.locks.Lock +import kotlin.concurrent.withLock import kotlin.reflect.KClass import kotlin.reflect.full.primaryConstructor @@ -14,55 +15,61 @@ import kotlin.reflect.full.primaryConstructor * * The data is isolated such that it can only be modified by methods that notify the StreamHandler on change. */ -class Patch(private val outEventQueue: BlockingQueue) { +class Patch(private val outEventQueue: BlockingQueue, val lock: Lock) { private val fixtures: HashMap = HashMap() private val fixtureTypes: HashMap = HashMap() - fun getFixtures() = ImmutableMap(this.fixtures) + // TODO maybe move getters from copy-on-read to copy-on-write for possible performance improvement - fun getFixtureTypes() = ImmutableMap(this.fixtureTypes) + /** Returns an immutable copy of the fixtures in the Patch. **/ + fun getFixtures() = lock.withLock{ + fixtures.toMap() + } - /** Returns an immutable copy of the Patch */ - fun getGlowPatch(): GlowPatch = GlowPatch(fixtures.values.toList(), fixtureTypes.values.toList()) + /** Returns an immutable copy of the fixture types in the Patch. **/ + fun getFixtureTypes() = lock.withLock { + fixtureTypes.toMap() + } - /** - * Execute [lambda] for every element in [collection]. If the result of [lambda] is an Error, add it to the - * error list which is returned once all elements are dealt with. - */ - private fun executeWithErrorList(collection: Iterable, lambda: (T) -> Result): Result> { - val errorList: MutableList = mutableListOf() - collection.forEach{ element -> - lambda(element).getError()?.let{errorList.add(it)} - } - if (errorList.isNotEmpty()) {return Err(errorList)} - return Ok(Unit) + /** Returns an immutable copy of the Patch */ + fun getGlowPatch(): GlowPatch = lock.withLock { + GlowPatch(fixtures.values.toList(), fixtureTypes.values.toList()) } /** - * Calls [executeWithErrorList] but also keeps a list of successful operations. When the operations are done, the - * successful operations are wrapped in the specified [messageType] and added to the [outEventQueue]. + * Execute [lambda] for every element in [collection]. If the result of [lambda] is [Ok], wrap it in the specified + * [messageType] and add it to the [outEventQueue]. If the result of [lambda] is an Error, add it to the + * error list which is returned once all elements are dealt with. + * + * This function handles locking for [Patch] modifications. All modifications in [Patch] must only call this + * function and nothing else to ensure thread safety. */ - private fun executeWithErrorListAndSendOutEvent(messageType: KClass, collection: Iterable, lambda: (T) -> Result): Result> { + private fun executeWithErrorListAndSendOutEvent( + messageType: KClass, + collection: Iterable, + lambda: (T) -> Result + ): Result> { val successList = mutableListOf() - val mainResult = executeWithErrorList(collection) { collectionElement -> - lambda(collectionElement).map{ - successList.add(it) - Unit + val errorList= mutableListOf() + lock.withLock { + collection.forEach{ element -> + lambda(element).mapError{errorList.add(it)}.map{successList.add(it)} + } + if (successList.isNotEmpty()) { + val glowMessage = messageType.primaryConstructor?.call(successList, null) ?: throw IllegalArgumentException("messageType does not have a primary constructor") + outEventQueue.put(glowMessage) // TODO possibly blocks rendering/etc. but the changes were already made so the message needs to be put into the queue } } - if (successList.isNotEmpty()) { - val glowMessage = messageType.primaryConstructor?.call(successList, null) ?: throw IllegalArgumentException("messageType does not have a primary constructor") - outEventQueue.put(glowMessage) // TODO possibly blocks rendering/etc. but the changes were already made so the message needs to be put into the queue - } - return mainResult + if (errorList.isNotEmpty()) {return Err(errorList)} + return Ok(Unit) } // ------------------- // Modify Fixture List // ------------------- - fun addFixtures(fixturesToAdd: Iterable): Result> { - return executeWithErrorListAndSendOutEvent(GlowMessage.AddFixtures::class, fixturesToAdd) eachFixture@{ patchFixtureToAdd -> + fun addFixtures(fixturesToAdd: Iterable): Result> = + executeWithErrorListAndSendOutEvent(GlowMessage.AddFixtures::class, fixturesToAdd) eachFixture@{ patchFixtureToAdd -> // validate uuid does not exist yet if (fixtures.contains(patchFixtureToAdd.uuid)) { return@eachFixture Err(FixtureUuidAlreadyExistsError(patchFixtureToAdd.uuid)) @@ -78,10 +85,10 @@ class Patch(private val outEventQueue: BlockingQueue) { fixtures[patchFixtureToAdd.uuid] = patchFixtureToAdd return@eachFixture Ok(patchFixtureToAdd) } - } - fun updateFixtures(fixtureUpdates: Iterable): Result> { - return executeWithErrorListAndSendOutEvent(GlowMessage.UpdateFixtures::class, fixtureUpdates) eachUpdate@{ fixtureUpdate -> + + fun updateFixtures(fixtureUpdates: Iterable): Result> = + executeWithErrorListAndSendOutEvent(GlowMessage.UpdateFixtures::class, fixtureUpdates) eachUpdate@{ fixtureUpdate -> // validate fixture uuid exists already val oldFixture = fixtures[fixtureUpdate.uuid] ?: run { return@eachUpdate Err(UnknownFixtureUuidError(fixtureUpdate.uuid)) @@ -95,21 +102,19 @@ class Patch(private val outEventQueue: BlockingQueue) { fixtures[newFixture.uuid] = newFixture return@eachUpdate Ok(fixtureUpdate) } - } - fun removeFixtures(uuids: Iterable): Result> { - return executeWithErrorListAndSendOutEvent(GlowMessage.RemoveFixtures::class, uuids) eachFixture@{ uuidToRemove -> + fun removeFixtures(uuids: Iterable): Result> = + executeWithErrorListAndSendOutEvent(GlowMessage.RemoveFixtures::class, uuids) eachFixture@{ uuidToRemove -> fixtures.remove(uuidToRemove) ?: return@eachFixture Err(UnknownFixtureUuidError(uuidToRemove)) return@eachFixture Ok(uuidToRemove) } - } // ------------------------ // Modify Fixture Type List // ------------------------ - fun addFixtureTypes(fixtureTypesToAdd: Iterable): Result> { - return executeWithErrorListAndSendOutEvent(GlowMessage.AddFixtureTypes::class, fixtureTypesToAdd) eachFixtureType@{ fixtureTypeToAdd -> + fun addFixtureTypes(fixtureTypesToAdd: Iterable): Result> = + executeWithErrorListAndSendOutEvent(GlowMessage.AddFixtureTypes::class, fixtureTypesToAdd) eachFixtureType@{ fixtureTypeToAdd -> // validate fixture type is not patched already if (fixtureTypes.containsKey(fixtureTypeToAdd.fixtureTypeId)) { return@eachFixtureType Err(FixtureTypeAlreadyExistsError(fixtureTypeToAdd.fixtureTypeId)) @@ -117,15 +122,13 @@ class Patch(private val outEventQueue: BlockingQueue) { fixtureTypes[fixtureTypeToAdd.fixtureTypeId] = fixtureTypeToAdd return@eachFixtureType Ok(fixtureTypeToAdd) } - } - fun removeFixtureTypes(fixtureTypeIdsToRemove: List): Result> { - return executeWithErrorListAndSendOutEvent(GlowMessage.RemoveFixtureTypes::class, fixtureTypeIdsToRemove) eachFixtureType@{ fixtureTypeIdToRemove -> + fun removeFixtureTypes(fixtureTypeIdsToRemove: List): Result> = + executeWithErrorListAndSendOutEvent(GlowMessage.RemoveFixtureTypes::class, fixtureTypeIdsToRemove) eachFixtureType@{ fixtureTypeIdToRemove -> fixtureTypes.remove(fixtureTypeIdToRemove) ?: return@eachFixtureType Err(UnpatchedFixtureTypeIdError(fixtureTypeIdToRemove)) // remove associated fixtures fixtures.filter { it.value.fixtureTypeId == fixtureTypeIdToRemove }.keys.let {this.removeFixtures(it)} return@eachFixtureType Ok(fixtureTypeIdToRemove) } - } } diff --git a/cueglow-server/src/test/kotlin/org/cueglow/server/gdtf/ChannelLayoutTest.kt b/cueglow-server/src/test/kotlin/org/cueglow/server/gdtf/ChannelLayoutTest.kt index 9e91083..674d158 100644 --- a/cueglow-server/src/test/kotlin/org/cueglow/server/gdtf/ChannelLayoutTest.kt +++ b/cueglow-server/src/test/kotlin/org/cueglow/server/gdtf/ChannelLayoutTest.kt @@ -1,12 +1,11 @@ package org.cueglow.server.gdtf -import org.cueglow.server.test_utilities.fixtureTypeFromGdtfResource +import org.cueglow.server.test_utilities.ExampleFixtureType import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test internal class ChannelLayoutTest { - private val exampleFixtureType = - fixtureTypeFromGdtfResource("ChannelLayoutTest/Test@Channel_Layout_Test@v1_first_try.gdtf", this.javaClass) + private val exampleFixtureType = ExampleFixtureType.channelLayoutTestGdtf @Test fun testFindingAbstractGeometries() { diff --git a/cueglow-server/src/test/kotlin/org/cueglow/server/json/ConcurrentJsonSubscriptionHandlerTest.kt b/cueglow-server/src/test/kotlin/org/cueglow/server/json/ConcurrentJsonSubscriptionHandlerTest.kt new file mode 100644 index 0000000..dcc67cb --- /dev/null +++ b/cueglow-server/src/test/kotlin/org/cueglow/server/json/ConcurrentJsonSubscriptionHandlerTest.kt @@ -0,0 +1,176 @@ +package org.cueglow.server.json + +import com.beust.klaxon.JsonArray +import com.beust.klaxon.JsonObject +import com.beust.klaxon.Parser +import com.github.michaelbull.result.unwrap +import org.cueglow.server.StateProvider +import org.cueglow.server.objects.messages.GlowMessage +import org.cueglow.server.objects.messages.GlowPatch +import org.cueglow.server.objects.messages.GlowTopic +import org.cueglow.server.test_utilities.ExampleFixtureType +import org.cueglow.server.test_utilities.concurrentTaskListTest +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import java.util.* +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +class ConcurrentJsonSubscriptionHandlerTest { + private val queue = LinkedBlockingQueue() + private val state = StateProvider(queue) + private val patch = state.patch + + private val client = TestClient() + private val subscriptionHandler = JsonSubscriptionHandler() + + private val testMessage = GlowMessage.RemoveFixtures(listOf()) + private val testMessageString = testMessage.toJsonString() + + private val expectedInitialState = GlowMessage.PatchInitialState(GlowPatch(listOf(), listOf())) + private val expectedInitialStateString = expectedInitialState.toJsonString() + + private fun isClientSubscribed(): Boolean { + subscriptionHandler.receive(testMessage) + val receivedMessage = client.messages.poll() ?: return false + assertEquals(testMessageString, receivedMessage) + return true + } + + @Test + fun subscribeSyncReceiveUnsubscribeOnDifferentThreads() { + val subscribeTask = { barrier: CyclicBarrier -> + barrier.await(1, TimeUnit.SECONDS) + subscriptionHandler.subscribe(client, GlowTopic.PATCH, state) + assertEquals(expectedInitialStateString, client.messages.remove()) + barrier.await(1, TimeUnit.SECONDS) + barrier.await(2, TimeUnit.SECONDS) + assertFalse(isClientSubscribed()) + 1 + } + + val syncTask = { barrier: CyclicBarrier -> + barrier.await(1, TimeUnit.SECONDS) + subscriptionHandler.receive( + queue.poll(1, TimeUnit.SECONDS) ?: throw Error("Timeout while waiting for sync message") + ) + barrier.await(1, TimeUnit.SECONDS) + barrier.await(2, TimeUnit.SECONDS) + 1 + } + + val sendTask = { barrier: CyclicBarrier -> + barrier.await(1, TimeUnit.SECONDS) + barrier.await(1, TimeUnit.SECONDS) + assertTrue(isClientSubscribed()) + subscriptionHandler.unsubscribe(client, GlowTopic.PATCH) + barrier.await(2, TimeUnit.SECONDS) + 1 + } + + val taskList = listOf(subscribeTask, syncTask, sendTask) + + concurrentTaskListTest(1000, taskList, { results -> + assertEquals(listOf(1,1,1), results) + }) + } + + /** Tests SubscriptionHandler Lock in publish, internalUnsubscribe and unsubscribeFromAllTopics. + * Unfortunately the performance impact is relatively large and the test isn't fully reliable. + */ + @Test + fun unsubscribeWhileReceiving() { + val setupTask = { barrier: CyclicBarrier -> + subscriptionHandler.subscribe(client, GlowTopic.PATCH, state) + subscriptionHandler.receive(queue.remove()) + assertEquals(expectedInitialStateString, client.messages.remove()) + assertTrue(isClientSubscribed()) + barrier.await(1, TimeUnit.SECONDS) + 1 + } + + val receiveTask = { barrier: CyclicBarrier -> + barrier.await(1, TimeUnit.SECONDS) + isClientSubscribed() + 1 + } + + val unsubscribeTask1 = { barrier: CyclicBarrier -> + barrier.await(1, TimeUnit.SECONDS) + subscriptionHandler.unsubscribeFromAllTopics(client) + 1 + } + + val unsubscribeTask2 = { barrier: CyclicBarrier -> + barrier.await(1, TimeUnit.SECONDS) + subscriptionHandler.unsubscribe(client, GlowTopic.PATCH) + 1 + } + + val taskList = mutableListOf(setupTask).apply { + repeat(4) {this.add(receiveTask)} + repeat(2) {this.add(unsubscribeTask1)} + repeat(2) {this.add(unsubscribeTask2)} + } + + concurrentTaskListTest(2000, taskList) + } + + /** + * Tests the state lock in SubscriptionHandler + */ + @Test + fun subscribeWhileChangingPatch() { + patch.addFixtureTypes(listOf(ExampleFixtureType.esprite)).unwrap() + val fixtureList = List(4) { + ExampleFixtureType.esprite_fixture.copy(uuid = UUID.randomUUID()) + } + + val changePatchTask = { barrier: CyclicBarrier -> + patch.addFixtures(fixtureList).unwrap() + barrier.await(1, TimeUnit.SECONDS) // start + fixtureList.forEach { + patch.removeFixtures(listOf(it.uuid)).unwrap() + } + assertEquals(0, patch.getFixtures().size) + barrier.await(1, TimeUnit.SECONDS) // main done + // dump message queue + while (queue.isNotEmpty()) { + subscriptionHandler.receive(queue.remove()) + } + barrier.await(1, TimeUnit.SECONDS) // queue dump done + Unit + } + + val subscribeTask = { barrier: CyclicBarrier -> + val threadClient = TestClient() + barrier.await(1, TimeUnit.SECONDS) // start + subscriptionHandler.subscribe(threadClient, GlowTopic.PATCH, state) + barrier.await(1, TimeUnit.SECONDS) // main done + barrier.await(1, TimeUnit.SECONDS) // queue dump done + // parse patch initial state + val initialJsonMessage = Parser.default().parse(StringBuilder(threadClient.messages.remove())) as JsonObject + val fixtureData = (initialJsonMessage["data"] as JsonObject)["fixtures"] as JsonArray<*> + val initialFixtureUuids = fixtureData + .map { (it as JsonObject)["uuid"] as String } + .map(UUID::fromString) + .toSet() + val fixtureUuidsRemovedInMessages = threadClient.messages + .map { GlowMessage.fromJsonString(it) as GlowMessage.RemoveFixtures } + .flatMap { it.data } + .toSet() + try { + assertEquals(initialFixtureUuids, fixtureUuidsRemovedInMessages) + } catch (e: Throwable) { + println("failing: initially got ${initialFixtureUuids.size} and removed ${fixtureUuidsRemovedInMessages.size}") + throw e + } + } + + val taskList = mutableListOf(changePatchTask) + repeat(3) {taskList.add(subscribeTask)} + + concurrentTaskListTest(3, taskList) + } +} \ No newline at end of file diff --git a/cueglow-server/src/test/kotlin/org/cueglow/server/json/JsonSubscriptionHandlerTest.kt b/cueglow-server/src/test/kotlin/org/cueglow/server/json/JsonSubscriptionHandlerTest.kt index b312029..be86862 100644 --- a/cueglow-server/src/test/kotlin/org/cueglow/server/json/JsonSubscriptionHandlerTest.kt +++ b/cueglow-server/src/test/kotlin/org/cueglow/server/json/JsonSubscriptionHandlerTest.kt @@ -5,7 +5,7 @@ import org.cueglow.server.StateProvider import org.cueglow.server.objects.messages.GlowMessage import org.cueglow.server.objects.messages.GlowPatch import org.cueglow.server.objects.messages.GlowTopic -import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import java.util.* import java.util.concurrent.LinkedBlockingQueue @@ -18,49 +18,55 @@ class JsonSubscriptionHandlerTest { private val subscriptionHandler = JsonSubscriptionHandler() private val testMessage = GlowMessage.RemoveFixtures(listOf()) + private val testMessageString = testMessage.toJsonString() private val expectedInitialState = GlowMessage.PatchInitialState(GlowPatch(listOf(), listOf())) + private val expectedInitialStateString = expectedInitialState.toJsonString() + + private fun isClientGettingMessage(): Boolean { + subscriptionHandler.receive(testMessage) + val receivedMessage = client.messages.poll() ?: return false + assertEquals(testMessageString, receivedMessage) + return true + } + + private fun clientReceivedInitialState() { + assertEquals(expectedInitialStateString, client.messages.remove()) + } @Test fun singleSubscriberLifecycle() { - subscriptionHandler.receive(testMessage) // without subscription, the message should not arrive - assertEquals(0, client.messages.size) + assertFalse(isClientGettingMessage()) subscriptionHandler.subscribe(client, GlowTopic.PATCH, state) // client should get initial state assertEquals(1, client.messages.size) - assertEquals(expectedInitialState.toJsonString(), client.messages.remove()) + clientReceivedInitialState() // SubscriptionHandler should have put a sync message into the outEventQueue assertEquals(1, outEventQueue.size) val syncMessage = outEventQueue.remove() as GlowMessage.Sync // updates should not get delivered until sync is delivered to the SubscriptionHandler - subscriptionHandler.receive(testMessage) - assertEquals(0, client.messages.size) + assertFalse(isClientGettingMessage()) // when a foreign sync message arrives, updates are still not delivered subscriptionHandler.receive(GlowMessage.Sync(UUID.randomUUID())) - subscriptionHandler.receive(testMessage) - assertEquals(0, client.messages.size) + assertFalse(isClientGettingMessage()) // now feed the sync message to SubscriptionHandler to "activate" subscription subscriptionHandler.receive(syncMessage) // updates should now get delivered - subscriptionHandler.receive(testMessage) - // now the client should have gotten the message - assertEquals(1, client.messages.size) - assertEquals(testMessage.toJsonString(), client.messages.remove()) + assertTrue(isClientGettingMessage()) // unsubscribe subscriptionHandler.unsubscribe(client, GlowTopic.PATCH) // now the client should not get another message because he unsubscribed - subscriptionHandler.receive(testMessage) - assertEquals(0, client.messages.size) + assertFalse(isClientGettingMessage()) } @Test @@ -68,15 +74,14 @@ class JsonSubscriptionHandlerTest { subscriptionHandler.subscribe(client, GlowTopic.PATCH, state) assertEquals(1, client.messages.size) - assertEquals(expectedInitialState.toJsonString(), client.messages.remove()) + clientReceivedInitialState() subscriptionHandler.receive(outEventQueue.remove()) // function under test - unsubscribe without specifying topic subscriptionHandler.unsubscribeFromAllTopics(client) - subscriptionHandler.receive(testMessage) - assertEquals(0, client.messages.size) + assertFalse(isClientGettingMessage()) } @Test @@ -88,46 +93,41 @@ class JsonSubscriptionHandlerTest { subscriptionHandler.subscribe(client, GlowTopic.PATCH, state) assertEquals(2, client.messages.size) - assertEquals(expectedInitialState.toJsonString(), client.messages.remove()) - assertEquals(expectedInitialState.toJsonString(), client.messages.remove()) + clientReceivedInitialState() + clientReceivedInitialState() // client should not get messages yet - subscriptionHandler.receive(testMessage) - assertEquals(0, client.messages.size) + assertFalse(isClientGettingMessage()) // feed sync message subscriptionHandler.receive(outEventQueue.remove()) // now client should get message - subscriptionHandler.receive(testMessage) - assertEquals(1, client.messages.size) - assertEquals(testMessage.toJsonString(), client.messages.remove()) + assertTrue(isClientGettingMessage()) } @Test fun unsubscribeAllBeforeSync() { subscriptionHandler.subscribe(client, GlowTopic.PATCH, state) - assertEquals(expectedInitialState.toJsonString(), client.messages.remove()) + clientReceivedInitialState() subscriptionHandler.unsubscribeFromAllTopics(client) // sync subscriptionHandler.receive(outEventQueue.remove()) // client should not get updates - subscriptionHandler.receive(testMessage) - assertEquals(0, client.messages.size) + assertFalse(isClientGettingMessage()) } @Test fun unsubscribeBeforeSync() { subscriptionHandler.subscribe(client, GlowTopic.PATCH, state) - assertEquals(expectedInitialState.toJsonString(), client.messages.remove()) + clientReceivedInitialState() subscriptionHandler.unsubscribe(client, GlowTopic.PATCH) // sync subscriptionHandler.receive(outEventQueue.remove()) // client should not get updates - subscriptionHandler.receive(testMessage) - assertEquals(0, client.messages.size) + assertFalse(isClientGettingMessage()) } } @@ -140,7 +140,7 @@ class TestClient: AsyncClient, Logging { } override fun send(message: String) { - logger.info("Client is instructed to send: $message") + logger.debug("Client is instructed to send: $message") messages.add(message) } } \ No newline at end of file diff --git a/cueglow-server/src/test/kotlin/org/cueglow/server/patch/ConcurrentPatchTest.kt b/cueglow-server/src/test/kotlin/org/cueglow/server/patch/ConcurrentPatchTest.kt new file mode 100644 index 0000000..bc18572 --- /dev/null +++ b/cueglow-server/src/test/kotlin/org/cueglow/server/patch/ConcurrentPatchTest.kt @@ -0,0 +1,162 @@ +package org.cueglow.server.patch + +import com.github.michaelbull.result.Ok +import com.google.common.truth.Truth.assertThat +import org.cueglow.server.test_utilities.ExampleFixtureType +import org.cueglow.server.test_utilities.concurrentTaskListTest +import org.junit.jupiter.api.Test +import java.util.* +import java.util.concurrent.* +import java.util.concurrent.locks.ReentrantLock + +class ConcurrentPatchTest { + val patch = Patch(LinkedBlockingQueue(), ReentrantLock()) + + private val exampleFixtureTypeList = listOf( + ExampleFixtureType.esprite, + ExampleFixtureType.channelLayoutTestGdtf + ) + + private val threadCount = 4 + private val iterationCount = 100 + + private fun printPatchStateAfterException() { + println("failing with ${patch.getFixtures().size} fixtures: ${patch.getFixtures()}") + println("failing with ${patch.getFixtureTypes().size} fixture types: ${patch.getFixtureTypes()}") + } + + /** + * wrapper around [concurrentTaskListTest] from utilites to always use the same [iterationCount] and + * [printPatchStateAfterException]. + */ + private fun concurrentTaskListTest(concurrentTaskList: List<(CyclicBarrier) -> T>, afterConcurrentTasks: (List) -> Unit) { + concurrentTaskListTest(iterationCount, concurrentTaskList, afterConcurrentTasks, ::printPatchStateAfterException) + } + + @Test + fun updateAndRemoveFixtures() { + patch.addFixtureTypes(listOf(ExampleFixtureType.esprite)) + + val fixtureList = List(10) { + ExampleFixtureType.esprite_fixture.copy(uuid = UUID.randomUUID()) + } + patch.addFixtures(fixtureList) + + val removeTask = { barrier: CyclicBarrier -> + fixtureList.map { it.uuid }.forEach { + barrier.await(2, TimeUnit.SECONDS) + patch.removeFixtures(listOf(it)) + } + } + + val updateTask = { barrier: CyclicBarrier -> + fixtureList.map { it.uuid }.forEach { + val update = PatchFixtureUpdate(it, fid = ThreadLocalRandom.current().nextInt()) + barrier.await(2, TimeUnit.SECONDS) + patch.updateFixtures(listOf(update)) + } + } + + // one threads removes, others update + val taskList = mutableListOf(removeTask).apply { + repeat(threadCount - 1) { this.add(updateTask) } + } + + concurrentTaskListTest(taskList) { + assertThat(patch.getFixtures()).hasSize(0) + } + } + + private fun singleTaskConcurrentTest(task: (CyclicBarrier) -> T, afterConcurrentTest: (List) -> Unit) { + org.cueglow.server.test_utilities.singleTaskConcurrentTest(threadCount, iterationCount, task, afterConcurrentTest, ::printPatchStateAfterException) + } + + @Test + fun addingAndRemovingFixtureConcurrently() { + patch.addFixtureTypes(listOf(ExampleFixtureType.esprite)) + + singleTaskConcurrentTest( + task = { barrier -> + barrier.await(2, TimeUnit.SECONDS) + val addResult = patch.addFixtures(listOf(ExampleFixtureType.esprite_fixture)) + barrier.await(2, TimeUnit.SECONDS) + val removeResult = patch.removeFixtures(listOf(ExampleFixtureType.esprite_fixture.uuid)) + Pair(addResult, removeResult) + }, + afterConcurrentTest = { results -> + val (addResults, removeResults) = results.unzip() + assertThat(addResults.filterIsInstance>()).hasSize(1) + assertThat(removeResults.filterIsInstance>()).hasSize(1) + assertThat(patch.getFixtures()).hasSize(0) + } + ) + } + + @Test + fun addingAndRemovingFixtureTypesConcurrently() = singleTaskConcurrentTest( + task = { barrier -> + barrier.await(2, TimeUnit.SECONDS) + val addResult = patch.addFixtureTypes(exampleFixtureTypeList) + barrier.await(2, TimeUnit.SECONDS) + val removeResult = patch.removeFixtureTypes(exampleFixtureTypeList.map{it.fixtureTypeId}) + Pair(addResult, removeResult) + }, + afterConcurrentTest = { results -> + val (addResults, removeResults) = results.unzip() + assertThat(addResults.filterIsInstance>()).hasSize(1) + assertThat(removeResults.filterIsInstance>()).hasSize(1) + assertThat(patch.getFixtureTypes()).hasSize(0) + } + ) + + private fun setupExampleFixtures() { + patch.addFixtureTypes(exampleFixtureTypeList) + patch.addFixtures(listOf(ExampleFixtureType.esprite_fixture, ExampleFixtureType.esprite_fixture2, ExampleFixtureType.channelLayoutTestGdtfFixture)) + } + + private fun testGettersConcurrentlyWhileRemoving(getterLambda: (CyclicBarrier) -> Unit) { + val removeTask = { barrier: CyclicBarrier -> + barrier.await(2, TimeUnit.SECONDS) + patch.removeFixtureTypes(exampleFixtureTypeList.map{it.fixtureTypeId}) + Unit + } + + val taskList = mutableListOf(removeTask).apply { + repeat(threadCount - 1) {this.add(getterLambda)} + } + + setupExampleFixtures() + + concurrentTaskListTest(taskList) { + setupExampleFixtures() + } + } + + @Test + fun getFixturesReturnsCoherentStateDuringRemove() { + testGettersConcurrentlyWhileRemoving{ barrier -> + barrier.await(2, TimeUnit.SECONDS) + val returnedFixtures = patch.getFixtures() + assertThat(returnedFixtures.size).isAnyOf(0, 3) + } + } + + @Test + fun getFixtureTypesReturnsCoherentStateDuringRemove() { + testGettersConcurrentlyWhileRemoving{ barrier -> + barrier.await(2, TimeUnit.SECONDS) + val returnedFixtureTypes = patch.getFixtureTypes() + assertThat(returnedFixtureTypes.size).isAnyOf(0, 2) + } + } + + @Test + fun getGlowPatchReturnsCoherentStateDuringRemove() { + testGettersConcurrentlyWhileRemoving{ barrier -> + barrier.await(2, TimeUnit.SECONDS) + val returnedGlowPatch = patch.getGlowPatch() + assertThat(returnedGlowPatch.fixtures.size).isAnyOf(0, 3) + assertThat(returnedGlowPatch.fixtureTypes.size).isAnyOf(0, 2) + } + } +} \ No newline at end of file diff --git a/cueglow-server/src/test/kotlin/org/cueglow/server/patch/PatchSubscriptionTest.kt b/cueglow-server/src/test/kotlin/org/cueglow/server/patch/PatchSubscriptionTest.kt index e83f309..294da1b 100644 --- a/cueglow-server/src/test/kotlin/org/cueglow/server/patch/PatchSubscriptionTest.kt +++ b/cueglow-server/src/test/kotlin/org/cueglow/server/patch/PatchSubscriptionTest.kt @@ -1,12 +1,10 @@ package org.cueglow.server.patch -import com.github.michaelbull.result.Ok import org.cueglow.server.json.toJsonString import org.cueglow.server.objects.messages.GlowMessage import org.cueglow.server.objects.messages.GlowPatch import org.cueglow.server.test_utilities.ClientAndServerTest import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test class PatchSubscriptionTest: ClientAndServerTest() { diff --git a/cueglow-server/src/test/kotlin/org/cueglow/server/patch/PatchTest.kt b/cueglow-server/src/test/kotlin/org/cueglow/server/patch/PatchTest.kt index cd05375..87a61bc 100644 --- a/cueglow-server/src/test/kotlin/org/cueglow/server/patch/PatchTest.kt +++ b/cueglow-server/src/test/kotlin/org/cueglow/server/patch/PatchTest.kt @@ -1,19 +1,22 @@ package org.cueglow.server.patch import com.github.michaelbull.result.Err +import com.github.michaelbull.result.Ok import com.github.michaelbull.result.unwrap import org.cueglow.server.objects.ArtNetAddress import org.cueglow.server.objects.DmxAddress -import org.cueglow.server.objects.messages.GlowMessage import org.cueglow.server.test_utilities.ExampleFixtureType -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows import java.util.* import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.locks.ReentrantLock internal class PatchTest { + private val patch = Patch(LinkedBlockingQueue(), ReentrantLock()) + private val exampleFixtureType = ExampleFixtureType.esprite private val exampleFixture = PatchFixture(UUID.randomUUID(),1, "", exampleFixtureType.fixtureTypeId, @@ -23,8 +26,6 @@ internal class PatchTest { @Test fun patchList() { - // instantiate - val patch = Patch(LinkedBlockingQueue()) assertTrue(patch.getFixtures().isEmpty()) assertTrue(patch.getFixtureTypes().isEmpty()) @@ -79,7 +80,6 @@ internal class PatchTest { @Test fun getGlowPatchIsImmutable() { - val patch = Patch(LinkedBlockingQueue()) patch.addFixtureTypes(listOf(exampleFixtureType)).unwrap() patch.addFixtures(listOf(exampleFixture)).unwrap() val glowPatch = patch.getGlowPatch() @@ -88,4 +88,52 @@ internal class PatchTest { assertEquals(2, patch.getFixtures().size) assertEquals(1, glowPatch.fixtures.size) } + + @Test + fun getFixturesIsImmutable() { + patch.addFixtureTypes(listOf(exampleFixtureType)).unwrap() + patch.addFixtures(listOf(exampleFixture)).unwrap() + // get fixtures + val fixtures = patch.getFixtures() + assertEquals(1, fixtures.size) + // change fixtures + patch.addFixtures(listOf(exampleFixture2)).unwrap() + // getting again should change, the map we got before should not change + assertEquals(2, patch.getFixtures().size) + assertEquals(1, fixtures.size) + // we should not be able to cast the map to a mutable map + assertThrows { (fixtures as MutableMap).remove(exampleFixture.uuid) } + // nothing should have changed from trying to cast + assertEquals(2, patch.getFixtures().size) + assertEquals(1, fixtures.size) + // update a fixture + patch.updateFixtures(listOf(PatchFixtureUpdate( + exampleFixture.uuid, + fid = 42, + name = "newName", + universe = Ok(ArtNetAddress.tryFrom(42).unwrap()), + address = Ok(DmxAddress.tryFrom(42).unwrap())))) + // old map should not change, getting again should change + assertTrue(exampleFixture.isSimilar(fixtures[exampleFixture.uuid]!!)) + assertFalse(exampleFixture.isSimilar(patch.getFixtures()[exampleFixture.uuid]!!)) + } + + @Test + fun getFixtureTypesIsImmutable() { + patch.addFixtureTypes(listOf(exampleFixtureType)).unwrap() + patch.addFixtures(listOf(exampleFixture)).unwrap() + // get fixture types + val fixtureTypes = patch.getFixtureTypes() + assertEquals(1, fixtureTypes.size) + // remove the fixture type + patch.removeFixtureTypes(listOf(exampleFixtureType.fixtureTypeId)) + // getting again should change, the map we got before should not change + assertEquals(0, patch.getFixtureTypes().size) + assertEquals(1, fixtureTypes.size) + // we should not be able to cast the map to a mutable map + assertThrows { (fixtureTypes as MutableMap).remove(exampleFixtureType.fixtureTypeId) } + // nothing should have changed from trying to cast + assertEquals(0, patch.getFixtureTypes().size) + assertEquals(1, fixtureTypes.size) + } } \ No newline at end of file diff --git a/cueglow-server/src/test/kotlin/org/cueglow/server/test_utilities/Concurrent.kt b/cueglow-server/src/test/kotlin/org/cueglow/server/test_utilities/Concurrent.kt new file mode 100644 index 0000000..fb3dc9d --- /dev/null +++ b/cueglow-server/src/test/kotlin/org/cueglow/server/test_utilities/Concurrent.kt @@ -0,0 +1,70 @@ +package org.cueglow.server.test_utilities + +import java.util.concurrent.Callable +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.Executors + +/** + * Takes a list of tasks and executes each on a separate Thread. + * To synchronize, the tasks can take a [CyclicBarrier] that resets after all threads awaited it. + * Then, [afterConcurrentTasks] can be used to assert on the results returned by the tasks. + * If an exception is thrown during execution of the tasks or during [afterConcurrentTasks], debug information + * can be printed in [afterException]. + * All of the above is repeated [repeat] times. + */ +fun concurrentTaskListTest( + repeat: Int, + concurrentTaskList: List<(CyclicBarrier) -> T>, + afterConcurrentTasks: (List) -> Unit = {}, + afterException: () -> Unit = {}, + ) { + val threadCount = concurrentTaskList.size + val barrier = CyclicBarrier(threadCount) + val pool = Executors.newFixedThreadPool(threadCount)!! + + val callableList = concurrentTaskList.map { + Callable { + it(barrier) + } + } + + fun printExceptionInfo(iteration: Int) { + println("failing in iteration $iteration") + afterException() + } + + repeat(repeat) { iteration -> + val futures = pool.invokeAll(callableList) + + val results = try { + futures.map { it.get() } + } catch (error: Throwable) { + printExceptionInfo(iteration) + throw Error("Error during execution of concurrent tasks", error) + } + + try { + afterConcurrentTasks(results) + } catch (error: Throwable) { + printExceptionInfo(iteration) + println("failing list of Results: $results") + throw Error("Error during afterConcurrentTest", error) + } + } +} + +/** + * Wrapper around [concurrentTaskListTest] that runs a single task on a [threadCount] threads. + * + * For details, see [concurrentTaskListTest]. + */ +fun singleTaskConcurrentTest( + threadCount: Int, + repeat: Int, + task: (CyclicBarrier) -> T, + afterConcurrentTasks: (List) -> Unit, + afterException: () -> Unit, +) { + val taskList = List(threadCount) {task} + concurrentTaskListTest(repeat, taskList, afterConcurrentTasks, afterException) +} \ No newline at end of file diff --git a/cueglow-server/src/test/kotlin/org/cueglow/server/test_utilities/ExampleFixtureType.kt b/cueglow-server/src/test/kotlin/org/cueglow/server/test_utilities/ExampleFixtureType.kt index ccc95a3..d2a4811 100644 --- a/cueglow-server/src/test/kotlin/org/cueglow/server/test_utilities/ExampleFixtureType.kt +++ b/cueglow-server/src/test/kotlin/org/cueglow/server/test_utilities/ExampleFixtureType.kt @@ -26,6 +26,25 @@ object ExampleFixtureType { DmxAddress.tryFrom(1).unwrap(), ) + val esprite_fixture2 = esprite_fixture.copy( + uuid = UUID.randomUUID(), + fid = 2, + name = "exampleFixture2", + address = DmxAddress(100) + ) + + val channelLayoutTestGdtf = fixtureTypeFromGdtfResource("ChannelLayoutTest/Test@Channel_Layout_Test@v1_first_try.gdtf", this.javaClass) + + val channelLayoutTestGdtfFixture = PatchFixture( + UUID.fromString("6c0e661f-058e-4331-b673-db836aefc9cb"), + 10, + "channelLayoutTest1", + channelLayoutTestGdtf.fixtureTypeId, + "Mode 1", + ArtNetAddress(2), + DmxAddress(1) + ) + // additional: Global settings for Awaitility init { Awaitility.setDefaultPollInterval(fibonacci()) diff --git a/dev-docs/Concurrency and Locking Strategy.md b/dev-docs/Concurrency and Locking Strategy.md new file mode 100644 index 0000000..5270d66 --- /dev/null +++ b/dev-docs/Concurrency and Locking Strategy.md @@ -0,0 +1,25 @@ +# Concurrency and Locking Strategy + +The used server framework, Javalin, builds upon Jetty where each REST handler +and each WebSocket connection is executed concurrently in a thread pool, so the +structures that the handlers access need to be thread-safe. + +We ensure this by locking on `StateProvider` with `StateProvider.lock` in all +access methods. While holding this lock, the methods notify subscribers by +putting a `GlowMessage` into the queue provided by `OutEventHandler`. These +outgoing events are picked up on another thread and passed to the +`JsonSubscriptionHandler`. This Handler itself is locked during all access +methods, as it can be accessed concurrently by either the +OutEventHandler-Thread or the Jetty Servlets. + +Some concurrent Tests are provided to test proper locking, but it must be noted +that the performance cost and cost of developing these concurrent tests is high +compared to their utility. + +## Locking Order + +To reduce the likelihood of deadlock, if a thread at any time needs to hold more +than lock, it should acquire them in the given order: + +1. JsonSubscriptionHandler.lock +2. StateProvider.lock \ No newline at end of file