Skip to content

Commit

Permalink
Thread Safety (#42)
Browse files Browse the repository at this point in the history
* Test and Implement Immutable Patch Getters

We may want to move from copy-on-read to copy-on-write
for performance reasons in the future.

* Concurrently Test Patch Getters (Failing)

Code may not be super readable yet, this is the first
time I've written such concurrent code. May need some
refactoring down the line to improve clarity.

* Locking in Patch Methods

Patch now passes the previously failing concurrency tests after
introducing locking on all methods.
The Lock is provided by the StateProvider.

* Concurrent Tests for AddFixtures & AddFixtureTypes (failing)

* Add Some Concurrent Tests (failing)

4 out of 6 are quite janky.

* Make Concurrent Patch Tests Reliable (failing)

Works pretty reliably on my machine.

* Make Tests Pass By Enabling Lock Again

* Refactor and Clean Concurrent Patch Tests

Includes new Dependency: Google Truth for better assertions.
I liked the API and error messages.

* Refactor Concurrent Test for Reusability

* Make SubscriptionHandler Thread Safe

Only Partial Test Coverage of Thread Safety

* Add Concurrent Test for publish Lock

and for lock in unsubscribeFromAll

* Extend Concurrent Test to unsubscribe

* Test State Lock in SubscriptionHandler

* Clean Up State Lock Test

* Add Documentation

* Add Dev-Doc about Concurrency

* Readability Refactors in JsonSubscriptionHandlerTest

* Add Log-Retry Error Handling to OutEventHandler
  • Loading branch information
Firionus authored May 8, 2021
1 parent 38e85d8 commit 40c7329
Show file tree
Hide file tree
Showing 17 changed files with 640 additions and 114 deletions.
1 change: 1 addition & 0 deletions cueglow-server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
testImplementation("com.github.kittinunf.fuel:fuel:2.3.1")
testImplementation("org.java-websocket:Java-WebSocket:1.5.1")
testImplementation("org.awaitility:awaitility-kotlin:4.0.3")
testImplementation("com.google.truth:truth:1.1.2")

implementation("com.michael-bull.kotlin-result:kotlin-result:1.1.9")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.eclipse.jetty.server.Server


fun main(args: Array<String>) {
// TODO what happens if there is an exception in this main thread?
CueGlowServer()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@ class OutEventHandler(receivers: Iterable<OutEventReceiver>): Logging {
init {
Executors.newSingleThreadExecutor().submit {
while (true) {
val glowMessage = queue.take()
logger.info("Handling OutEvent: $glowMessage")
receivers.forEach { it.receive(glowMessage) }
try {
val glowMessage = queue.take()
logger.debug("Handling OutEvent: $glowMessage")
receivers.forEach { it.receive(glowMessage) }
} catch (e: Throwable) {
logger.error(e)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import org.cueglow.server.json.JsonHandler
import org.cueglow.server.objects.messages.GlowMessage
import org.cueglow.server.patch.Patch
import java.util.concurrent.BlockingQueue
import java.util.concurrent.locks.ReentrantLock

/**
* Provides a collection of state objects
*
* The StateProvider is initialized by the main process and passed to e.g. a [JsonHandler] for mutation.
*/
class StateProvider(val outEventQueue: BlockingQueue<GlowMessage>) {
val patch = Patch(outEventQueue)
val lock = ReentrantLock()
val patch = Patch(outEventQueue, lock)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.cueglow.server.objects.messages.*
*
* @property[value] A Short representing the 15-bit Port-Address.
*/
// TODO constructor cannot be private due to Klaxon, so do validation in constructor instead of companion object by throwing
data class ArtNetAddress constructor(val value: Short) {

companion object Factory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.cueglow.server.objects.messages.InvalidDmxAddress
*
* @property[value] A Short representing the DMX Address.
*/
// TODO constructor can't be private due to Klaxon, so move validation of input from companion object to throwing constructor
data class DmxAddress constructor(val value: Short) {
companion object Factory {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ import org.cueglow.server.OutEventReceiver
import org.cueglow.server.StateProvider
import org.cueglow.server.json.AsyncClient
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

/**
* Handles Subscribe/Unsubscribe Events. Receives OutEvents from the OutEventHandler and sends them to the subscribers.
**/
abstract class SubscriptionHandler: OutEventReceiver, Logging {
private val activeSubscriptions = EnumMap<GlowTopic, MutableSet<AsyncClient>>(GlowTopic::class.java) // TODO synchronize (see JavaDoc for EnumMap)
val lock = ReentrantLock()

private val activeSubscriptions = EnumMap<GlowTopic, MutableSet<AsyncClient>>(GlowTopic::class.java)

/** Keeps subscriptions that were sent the initial state but do not get updates yet because older updates
* in the OutEventQueue first need to be handled. Subscriptions move from pending to active once the sync message
Expand All @@ -27,7 +32,7 @@ abstract class SubscriptionHandler: OutEventReceiver, Logging {

/** Receive and handle messages from the OutEventQueue **/
override fun receive(glowMessage: GlowMessage) {
logger.info("Receiving $glowMessage")
logger.debug("Receiving $glowMessage")

when (glowMessage.event) {
GlowEvent.ADD_FIXTURES, GlowEvent.UPDATE_FIXTURES,
Expand All @@ -41,17 +46,21 @@ abstract class SubscriptionHandler: OutEventReceiver, Logging {
}

private fun publish(topic: GlowTopic, glowMessage: GlowMessage) {
val topicSubscribers = activeSubscriptions[topic]
if (topicSubscribers!!.isNotEmpty()) { // null asserted because all possible keys are initialized in init block
val messageString = serializeMessage(glowMessage)
topicSubscribers.forEach {it.send(messageString)}
lock.withLock {
val topicSubscribers = activeSubscriptions[topic]
if (topicSubscribers!!.isNotEmpty()) { // null asserted because all possible keys are initialized in init block
val messageString = serializeMessage(glowMessage)
topicSubscribers.forEach {it.send(messageString)}
}
}
}

/** Check if [syncUuid] is known. If yes, move subscription from pending to active **/
private fun activateSubscription(syncUuid: UUID) {
val (topic, subscriber) = pendingSubscriptions.remove(syncUuid) ?: return
activeSubscriptions[topic]!!.add(subscriber) // null asserted because all possible keys are initialized in init block
lock.withLock {
val (topic, subscriber) = pendingSubscriptions.remove(syncUuid) ?: return
activeSubscriptions[topic]!!.add(subscriber) // null asserted because all possible keys are initialized in init block
}
}

fun subscribe(subscriber: AsyncClient, topic: GlowTopic, state: StateProvider) {
Expand All @@ -61,12 +70,15 @@ abstract class SubscriptionHandler: OutEventReceiver, Logging {
GlowTopic.PATCH -> {
val syncUuid = UUID.randomUUID()
val syncMessage = GlowMessage.Sync(syncUuid)
pendingSubscriptions[syncUuid] = Pair(GlowTopic.PATCH, subscriber)
// TODO acquire state lock here
val initialPatchState = state.patch.getGlowPatch()
state.outEventQueue.put(syncMessage) // TODO possible deadlock because SubscriptionHandler is locked and cannot work to reduce message count in queue
// no deadlock problem if we don't have the SubscriptionHandler Lock here?
// TODO release state lock here
val initialPatchState = lock.withLock {
val initialPatchState = state.lock.withLock {
// -> need to test multiple threads subscribing while changes are happening -> all threads should have same state in the end with their respective updates applied
state.outEventQueue.offer(syncMessage, 1, TimeUnit.SECONDS)
state.patch.getGlowPatch()
}
pendingSubscriptions[syncUuid] = Pair(GlowTopic.PATCH, subscriber)
initialPatchState
}
val initialMessage = GlowMessage.PatchInitialState(initialPatchState)
subscriber.send(initialMessage)
}
Expand All @@ -79,19 +91,23 @@ abstract class SubscriptionHandler: OutEventReceiver, Logging {

/** Returns true if the subscriber was successfully unsubscribed and false if the subscriber wasn't subscribed */
private fun internalUnsubscribe(subscriber: AsyncClient, topic: GlowTopic): Boolean {
val numberOfSubscriptionsRemovedFromPending = pendingSubscriptions
.filter {it.value.first == topic && it.value.second == subscriber}
.keys
.map {pendingSubscriptions.remove(it)}
.size
val removedFromActive = activeSubscriptions[topic]!!.remove(subscriber) // null asserted because all possible keys are initialized in init block
return removedFromActive || numberOfSubscriptionsRemovedFromPending > 0
lock.withLock {
val numberOfSubscriptionsRemovedFromPending = pendingSubscriptions
.filter {it.value.first == topic && it.value.second == subscriber}
.keys
.map {pendingSubscriptions.remove(it)}
.size
val removedFromActive = activeSubscriptions[topic]!!.remove(subscriber) // null asserted because all possible keys are initialized in init block
return removedFromActive || numberOfSubscriptionsRemovedFromPending > 0
}
}

fun unsubscribeFromAllTopics(subscriber: AsyncClient) {
pendingSubscriptions.filter {it.value.second == subscriber}.keys.map { pendingSubscriptions.remove(it) }
activeSubscriptions.values.forEach {
it.remove(subscriber)
lock.withLock {
pendingSubscriptions.filter {it.value.second == subscriber}.keys.map { pendingSubscriptions.remove(it) }
activeSubscriptions.values.forEach {
it.remove(subscriber)
}
}
}
}
91 changes: 47 additions & 44 deletions cueglow-server/src/main/kotlin/org/cueglow/server/patch/Patch.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package org.cueglow.server.patch

import com.github.michaelbull.result.*
import org.cueglow.server.gdtf.GdtfWrapper
import org.cueglow.server.objects.ImmutableMap
import org.cueglow.server.objects.messages.*
import java.util.*
import java.util.concurrent.BlockingQueue
import java.util.concurrent.locks.Lock
import kotlin.concurrent.withLock
import kotlin.reflect.KClass
import kotlin.reflect.full.primaryConstructor

Expand All @@ -14,55 +15,61 @@ import kotlin.reflect.full.primaryConstructor
*
* The data is isolated such that it can only be modified by methods that notify the StreamHandler on change.
*/
class Patch(private val outEventQueue: BlockingQueue<GlowMessage>) {
class Patch(private val outEventQueue: BlockingQueue<GlowMessage>, val lock: Lock) {
private val fixtures: HashMap<UUID, PatchFixture> = HashMap()
private val fixtureTypes: HashMap<UUID, GdtfWrapper> = HashMap()

fun getFixtures() = ImmutableMap(this.fixtures)
// TODO maybe move getters from copy-on-read to copy-on-write for possible performance improvement

fun getFixtureTypes() = ImmutableMap(this.fixtureTypes)
/** Returns an immutable copy of the fixtures in the Patch. **/
fun getFixtures() = lock.withLock{
fixtures.toMap()
}

/** Returns an immutable copy of the Patch */
fun getGlowPatch(): GlowPatch = GlowPatch(fixtures.values.toList(), fixtureTypes.values.toList())
/** Returns an immutable copy of the fixture types in the Patch. **/
fun getFixtureTypes() = lock.withLock {
fixtureTypes.toMap()
}

/**
* Execute [lambda] for every element in [collection]. If the result of [lambda] is an Error, add it to the
* error list which is returned once all elements are dealt with.
*/
private fun <T, E>executeWithErrorList(collection: Iterable<T>, lambda: (T) -> Result<Unit, E>): Result<Unit, List<E>> {
val errorList: MutableList<E> = mutableListOf()
collection.forEach{ element ->
lambda(element).getError()?.let{errorList.add(it)}
}
if (errorList.isNotEmpty()) {return Err(errorList)}
return Ok(Unit)
/** Returns an immutable copy of the Patch */
fun getGlowPatch(): GlowPatch = lock.withLock {
GlowPatch(fixtures.values.toList(), fixtureTypes.values.toList())
}

/**
* Calls [executeWithErrorList] but also keeps a list of successful operations. When the operations are done, the
* successful operations are wrapped in the specified [messageType] and added to the [outEventQueue].
* Execute [lambda] for every element in [collection]. If the result of [lambda] is [Ok], wrap it in the specified
* [messageType] and add it to the [outEventQueue]. If the result of [lambda] is an Error, add it to the
* error list which is returned once all elements are dealt with.
*
* This function handles locking for [Patch] modifications. All modifications in [Patch] must only call this
* function and nothing else to ensure thread safety.
*/
private fun <T,R,E>executeWithErrorListAndSendOutEvent(messageType: KClass<out GlowMessage>, collection: Iterable<T>, lambda: (T) -> Result<R, E>): Result<Unit, List<E>> {
private fun <T,R,E>executeWithErrorListAndSendOutEvent(
messageType: KClass<out GlowMessage>,
collection: Iterable<T>,
lambda: (T) -> Result<R, E>
): Result<Unit, List<E>> {
val successList = mutableListOf<R>()
val mainResult = executeWithErrorList(collection) { collectionElement ->
lambda(collectionElement).map{
successList.add(it)
Unit
val errorList= mutableListOf<E>()
lock.withLock {
collection.forEach{ element ->
lambda(element).mapError{errorList.add(it)}.map{successList.add(it)}
}
if (successList.isNotEmpty()) {
val glowMessage = messageType.primaryConstructor?.call(successList, null) ?: throw IllegalArgumentException("messageType does not have a primary constructor")
outEventQueue.put(glowMessage) // TODO possibly blocks rendering/etc. but the changes were already made so the message needs to be put into the queue
}
}
if (successList.isNotEmpty()) {
val glowMessage = messageType.primaryConstructor?.call(successList, null) ?: throw IllegalArgumentException("messageType does not have a primary constructor")
outEventQueue.put(glowMessage) // TODO possibly blocks rendering/etc. but the changes were already made so the message needs to be put into the queue
}
return mainResult
if (errorList.isNotEmpty()) {return Err(errorList)}
return Ok(Unit)
}

// -------------------
// Modify Fixture List
// -------------------

fun addFixtures(fixturesToAdd: Iterable<PatchFixture>): Result<Unit, List<GlowError>> {
return executeWithErrorListAndSendOutEvent(GlowMessage.AddFixtures::class, fixturesToAdd) eachFixture@{ patchFixtureToAdd ->
fun addFixtures(fixturesToAdd: Iterable<PatchFixture>): Result<Unit, List<GlowError>> =
executeWithErrorListAndSendOutEvent(GlowMessage.AddFixtures::class, fixturesToAdd) eachFixture@{ patchFixtureToAdd ->
// validate uuid does not exist yet
if (fixtures.contains(patchFixtureToAdd.uuid)) {
return@eachFixture Err(FixtureUuidAlreadyExistsError(patchFixtureToAdd.uuid))
Expand All @@ -78,10 +85,10 @@ class Patch(private val outEventQueue: BlockingQueue<GlowMessage>) {
fixtures[patchFixtureToAdd.uuid] = patchFixtureToAdd
return@eachFixture Ok(patchFixtureToAdd)
}
}

fun updateFixtures(fixtureUpdates: Iterable<PatchFixtureUpdate>): Result<Unit, List<GlowError>> {
return executeWithErrorListAndSendOutEvent(GlowMessage.UpdateFixtures::class, fixtureUpdates) eachUpdate@{ fixtureUpdate ->

fun updateFixtures(fixtureUpdates: Iterable<PatchFixtureUpdate>): Result<Unit, List<GlowError>> =
executeWithErrorListAndSendOutEvent(GlowMessage.UpdateFixtures::class, fixtureUpdates) eachUpdate@{ fixtureUpdate ->
// validate fixture uuid exists already
val oldFixture = fixtures[fixtureUpdate.uuid] ?: run {
return@eachUpdate Err(UnknownFixtureUuidError(fixtureUpdate.uuid))
Expand All @@ -95,37 +102,33 @@ class Patch(private val outEventQueue: BlockingQueue<GlowMessage>) {
fixtures[newFixture.uuid] = newFixture
return@eachUpdate Ok(fixtureUpdate)
}
}

fun removeFixtures(uuids: Iterable<UUID>): Result<Unit, List<UnknownFixtureUuidError>> {
return executeWithErrorListAndSendOutEvent(GlowMessage.RemoveFixtures::class, uuids) eachFixture@{ uuidToRemove ->
fun removeFixtures(uuids: Iterable<UUID>): Result<Unit, List<UnknownFixtureUuidError>> =
executeWithErrorListAndSendOutEvent(GlowMessage.RemoveFixtures::class, uuids) eachFixture@{ uuidToRemove ->
fixtures.remove(uuidToRemove) ?: return@eachFixture Err(UnknownFixtureUuidError(uuidToRemove))
return@eachFixture Ok(uuidToRemove)
}
}

// ------------------------
// Modify Fixture Type List
// ------------------------

fun addFixtureTypes(fixtureTypesToAdd: Iterable<GdtfWrapper>): Result<Unit, List<FixtureTypeAlreadyExistsError>> {
return executeWithErrorListAndSendOutEvent(GlowMessage.AddFixtureTypes::class, fixtureTypesToAdd) eachFixtureType@{ fixtureTypeToAdd ->
fun addFixtureTypes(fixtureTypesToAdd: Iterable<GdtfWrapper>): Result<Unit, List<FixtureTypeAlreadyExistsError>> =
executeWithErrorListAndSendOutEvent(GlowMessage.AddFixtureTypes::class, fixtureTypesToAdd) eachFixtureType@{ fixtureTypeToAdd ->
// validate fixture type is not patched already
if (fixtureTypes.containsKey(fixtureTypeToAdd.fixtureTypeId)) {
return@eachFixtureType Err(FixtureTypeAlreadyExistsError(fixtureTypeToAdd.fixtureTypeId))
}
fixtureTypes[fixtureTypeToAdd.fixtureTypeId] = fixtureTypeToAdd
return@eachFixtureType Ok(fixtureTypeToAdd)
}
}

fun removeFixtureTypes(fixtureTypeIdsToRemove: List<UUID>): Result<Unit, List<UnpatchedFixtureTypeIdError>> {
return executeWithErrorListAndSendOutEvent(GlowMessage.RemoveFixtureTypes::class, fixtureTypeIdsToRemove) eachFixtureType@{ fixtureTypeIdToRemove ->
fun removeFixtureTypes(fixtureTypeIdsToRemove: List<UUID>): Result<Unit, List<UnpatchedFixtureTypeIdError>> =
executeWithErrorListAndSendOutEvent(GlowMessage.RemoveFixtureTypes::class, fixtureTypeIdsToRemove) eachFixtureType@{ fixtureTypeIdToRemove ->
fixtureTypes.remove(fixtureTypeIdToRemove) ?:
return@eachFixtureType Err(UnpatchedFixtureTypeIdError(fixtureTypeIdToRemove))
// remove associated fixtures
fixtures.filter { it.value.fixtureTypeId == fixtureTypeIdToRemove }.keys.let {this.removeFixtures(it)}
return@eachFixtureType Ok(fixtureTypeIdToRemove)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package org.cueglow.server.gdtf

import org.cueglow.server.test_utilities.fixtureTypeFromGdtfResource
import org.cueglow.server.test_utilities.ExampleFixtureType
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

internal class ChannelLayoutTest {
private val exampleFixtureType =
fixtureTypeFromGdtfResource("ChannelLayoutTest/Test@Channel_Layout_Test@v1_first_try.gdtf", this.javaClass)
private val exampleFixtureType = ExampleFixtureType.channelLayoutTestGdtf

@Test
fun testFindingAbstractGeometries() {
Expand Down
Loading

0 comments on commit 40c7329

Please sign in to comment.