Skip to content

Commit

Permalink
Subscription Handling (#38)
Browse files Browse the repository at this point in the history
* Make Example GdtfWrapper for Tests a Singleton

Now, the example GDTF file only needs to be parsed once and the
resulting GdtfWrapper can be shared between all tets.

* Implement and Test Out Event for AddFixtureTypes

* Make Queue Tests for AddFixtureTypes More Complex

* Refactor Queue Notification for Patch into Generic Function

* Implement Out Events for RemoveFixtureTypes

* Implement Out Events for Fixture Operations in Patch

* RemoveFixture OutEvent when Removing FixtureType

* Make Putting Messages into Queue Blocking

* Scaffolding for OutEventHandler

Logging outgoing events is already working

* First Unit-Tested Implementation of JsonSubscriptionHandler

* Serialize JSON Message Only Once for All Subscribers

* Move Example Fixture & Awaitility Config to Utilities

* Move Integration Test Harness to Utilities

* Move GDTF Integration Tests to Specific Class

* Move Add Fixture Integration Tests to Specific Class

* Move Update Fixture Integration Tests to Specific Class

* Move Remove Fixture Integration Tests to Specific Class

* Clean Up Test Names in Specific Classes

Also clean up imports in whole project

* Delete integration Test Package

by moving specific classes to the specific packages

* Delete Documentation of integration Test Package

* Rename RemoveFixtureTest

* Move JsonSubscriptionHandler to Generic SubscriptionHandler

* Handle Subscribe in IncomingGlowRequestHandler

required some changes for dependency injection

* Implement Sync in SubscriptionHandler

* Rename JsonTopic to GlowTopic

* Integration Test for Subscription

* Test Random Sync Messages for SubscriptionHandler

* New Subscription API

It is now generic over the topic, as it was before
the rework. E.g. "patchSubscribe" -> "subscribe" with
"data": "patch".

I realized while implementing the subscription
mechanism that this would be less work once we add a new
topic.

* Log Unexpected Message Events from Client

* Remove Trivial TODOs

* Unsubscribe Upon WebSocket Close

* Add Some Docstrings

* Test and Implement Sync After Unsubscribe

* Add Docstrings and Refactors for Readability
  • Loading branch information
Firionus authored May 8, 2021
1 parent d635488 commit 38e85d8
Show file tree
Hide file tree
Showing 37 changed files with 1,215 additions and 782 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.cueglow.server
import io.javalin.Javalin
import org.apache.logging.log4j.kotlin.Logging
import org.cueglow.server.gdtf.GdtfHandler
import org.cueglow.server.json.JsonSubscriptionHandler
import org.cueglow.server.rest.handleGdtfUpload
import org.cueglow.server.websocket.GlowWebSocketHandler
import org.eclipse.jetty.server.Server
Expand All @@ -22,15 +23,19 @@ class CueGlowServer(port: Int = 7000) : Logging {
logger.info("Starting CueGlow Server")
}

val state = StateProvider()
val jsonSubscriptionHandler = JsonSubscriptionHandler()

val outEventHandler = OutEventHandler(listOf(jsonSubscriptionHandler))

val state = StateProvider(outEventHandler.queue)

private val gdtfHandler = GdtfHandler(state.patch)

val app: Javalin = Javalin.create { config ->
// add our own WebSocket Handler
config.server {
val server = Server()
server.handler = GlowWebSocketHandler(state)
server.handler = GlowWebSocketHandler(state, jsonSubscriptionHandler)
return@server server
}
config.requestLogger { ctx, executionTimeMs ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.cueglow.server

import org.apache.logging.log4j.kotlin.Logging
import org.cueglow.server.objects.messages.GlowMessage
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue

/**
* Starts a thread that takes GlowMessages from the [queue] of OutEvents and passes them to the registered receivers.
*/
class OutEventHandler(receivers: Iterable<OutEventReceiver>): Logging {
val queue = LinkedBlockingQueue<GlowMessage>()

init {
Executors.newSingleThreadExecutor().submit {
while (true) {
val glowMessage = queue.take()
logger.info("Handling OutEvent: $glowMessage")
receivers.forEach { it.receive(glowMessage) }
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.cueglow.server

import org.cueglow.server.objects.messages.GlowMessage

interface OutEventReceiver {
fun receive(glowMessage: GlowMessage)
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package org.cueglow.server

import org.cueglow.server.json.JsonHandler
import org.cueglow.server.objects.messages.GlowMessage
import org.cueglow.server.patch.Patch
import java.util.concurrent.BlockingQueue

/**
* Provides a collection of state objects
*
* The StateProvider is initialized by the main process and passed to e.g. a [JsonHandler] for mutation.
*/
class StateProvider {
val patch = Patch()
class StateProvider(val outEventQueue: BlockingQueue<GlowMessage>) {
val patch = Patch(outEventQueue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,12 @@ import org.cueglow.server.objects.messages.GlowMessage
* Represents a Client of the Server that can be sent a [GlowMessage] asynchronously (i.e. at any time)
*/
interface AsyncClient {
/**
* Send a [message].
*
* If the message cannot be sent (e.g. because the client is disconnected), nothing should happen.
*/
fun send(message: GlowMessage)

fun send(message: String)
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package org.cueglow.server.json

import com.beust.klaxon.Converter
import com.beust.klaxon.JsonValue
import com.beust.klaxon.Klaxon
import com.beust.klaxon.TypeAdapter
import com.beust.klaxon.*
import com.github.michaelbull.result.*
import org.cueglow.server.objects.ArtNetAddress
import org.cueglow.server.objects.DmxAddress
import org.cueglow.server.objects.messages.GlowEvent
import org.cueglow.server.objects.messages.GlowMessage
import org.cueglow.server.objects.messages.GlowTopic
import java.io.StringReader
import java.util.*
import kotlin.reflect.KClass
Expand All @@ -17,33 +15,32 @@ import kotlin.reflect.KClass
// Serialization and Parsing
//--------------------------

// TODO check if this works with the new GlowMessage design

/** Convert GlowMessage to JSON String by Extension Function */
fun GlowMessage.toJsonString(): String {
return Klaxon()
.fieldConverter(KlaxonArtNetAddressUpdate::class, ArtNetAddressResultConverter)
.fieldConverter(KlaxonDmxAddressUpdate::class, DmxAddressResultConverter)
.converter(KlaxonGlowEventConverter)
.converter(KlaxonGlowTopicConverter)
.converter(UUIDConverter)
.converter(DmxAddressConverter)
.converter(ArtNetAddressConverter)
.toJsonString(this)
}

// TODO check if this works with the new GlowMessage design
/**
* Parse JSON to the internal representation [GlowMessage]
*/
fun GlowMessage.Companion.fromJsonString(input: String): GlowMessage = Klaxon()
.fieldConverter(KlaxonArtNetAddressUpdate::class, ArtNetAddressResultConverter)
.fieldConverter(KlaxonDmxAddressUpdate::class, DmxAddressResultConverter)
.converter(KlaxonGlowEventConverter)
.converter(KlaxonGlowTopicConverter)
.converter(UUIDConverter)
.converter(DmxAddressConverter)
.converter(ArtNetAddressConverter)
.parse<GlowMessage>(StringReader(input))
?: TODO("Error Handling is WIP")
?: throw KlaxonException("Klaxon Parser returned null after parsing '$input'")

//-------------------------------
// Klaxon Adapters and Converters
Expand All @@ -62,6 +59,14 @@ object KlaxonGlowEventConverter: Converter {
override fun fromJson(jv: JsonValue): GlowEvent? = GlowEvent.fromString(jv.inside.toString())
}

object KlaxonGlowTopicConverter: Converter {
override fun canConvert(cls: Class<*>): Boolean = cls == GlowTopic::class.java

override fun toJson(value: Any): String = "\"$value\""

override fun fromJson(jv: JsonValue): GlowTopic? = GlowTopic.fromString(jv.inside.toString())
}

object UUIDConverter: Converter {
override fun canConvert(cls: Class<*>)
= cls == UUID::class.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.cueglow.server.StateProvider
import org.cueglow.server.objects.messages.GlowMessage
import org.cueglow.server.objects.messages.GlowRequest
import org.cueglow.server.objects.messages.IncomingGlowRequestHandler
import org.cueglow.server.objects.messages.SubscriptionHandler

/** Represents a Receiver that takes a message string and can answer asynchronously with the provided GlowClient */
interface StringReceiver {
Expand All @@ -14,7 +15,7 @@ interface StringReceiver {
* A stateful handler, created for each JSON Connection.
* It receives JSON messages, parses them and passes them to the handle implementation from [IncomingGlowRequestHandler].
*/
class JsonHandler(private val client: AsyncClient, state: StateProvider): StringReceiver, IncomingGlowRequestHandler(state) {
class JsonHandler(private val client: AsyncClient, state: StateProvider, subscriptionHandler: SubscriptionHandler): StringReceiver, IncomingGlowRequestHandler(state, subscriptionHandler) {
override fun receive(message: String) {
val glowMessage = GlowMessage.fromJsonString(message)
val request = GlowRequest(glowMessage, client)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.cueglow.server.json

import org.cueglow.server.objects.messages.GlowMessage
import org.cueglow.server.objects.messages.SubscriptionHandler

class JsonSubscriptionHandler: SubscriptionHandler() {
override fun serializeMessage(glowMessage: GlowMessage): String = glowMessage.toJsonString()
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@ package org.cueglow.server.objects.messages

import kotlin.reflect.KClass

// TODO change class associations to GlowMessage classes
/**
* The different events in the [GlowMessage]
*/
enum class GlowEvent(val string: String, val messageClass: KClass<out GlowMessage>?) {
// Generic

ERROR("error", GlowMessage.Error::class),
SYNC("sync", GlowMessage.Sync::class),

// Subscriptions

SUBSCRIBE("subscribe", GlowMessage.Subscribe::class),
UNSUBSCRIBE("unsubscribe", GlowMessage.Unsubscribe::class),

// Patch-specific

PATCH_SUBSCRIBE("patchSubscribe", GlowMessage.PatchSubscribe::class),
PATCH_INITIAL_STATE("patchInitialState", GlowMessage.PatchInitialState::class),
PATCH_UNSUBSCRIBE("patchUnsubscribe", GlowMessage.PatchUnsubscribe::class),

ADD_FIXTURES("addFixtures", GlowMessage.AddFixtures::class),
UPDATE_FIXTURES("updateFixtures", GlowMessage.UpdateFixtures::class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ sealed class GlowMessage constructor(
// Generic

class Error(@Json(index=1) val data: GlowError, messageId: Int? = null): GlowMessage(GlowEvent.ERROR, messageId)
class Sync(@Json(index=1) val data: UUID, messageId: Int? = null): GlowMessage(GlowEvent.SYNC, messageId)

// Patch-specific
// Subscriptions

class Subscribe(@Json(index=1) val data: GlowTopic): GlowMessage(GlowEvent.SUBSCRIBE, null)
class Unsubscribe(@Json(index=1) val data: GlowTopic): GlowMessage(GlowEvent.UNSUBSCRIBE, null)

// Patch Topic

class PatchSubscribe(): GlowMessage(GlowEvent.PATCH_SUBSCRIBE, null)
class PatchInitialState(@Json(index=1) val data: GlowPatch): GlowMessage(GlowEvent.PATCH_INITIAL_STATE, null)
class PatchUnsubscribe(): GlowMessage(GlowEvent.PATCH_UNSUBSCRIBE, null)

class AddFixtures(@Json(index=1) val data: List<PatchFixture>, messageId: Int? = null): GlowMessage(GlowEvent.ADD_FIXTURES, messageId)
class UpdateFixtures(@Json(index=1) val data: List<PatchFixtureUpdate>, messageId: Int? = null): GlowMessage(GlowEvent.UPDATE_FIXTURES, messageId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.cueglow.server.json.AsyncClient
/**
* Wrapper around [GlowMessage] that provides a convenient way to answer.
*/
class GlowRequest(val originalMessage: GlowMessage, private val client: AsyncClient) {
class GlowRequest(val originalMessage: GlowMessage, val client: AsyncClient) {
fun answer(data: GlowMessage) = client.send(data)

fun answer(error: GlowError) = client.send(error.toGlowMessage(originalMessage.messageId))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.cueglow.server.objects.messages

enum class GlowTopic(val string: String) {
PATCH("patch"),;

override fun toString() = string

companion object {
// lookup topic by topic string
private val map = values().associateBy(GlowTopic::string)
fun fromString(string: String) = map[string]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@ import com.github.michaelbull.result.getOrElse
import org.apache.logging.log4j.kotlin.Logging
import org.cueglow.server.StateProvider

abstract class IncomingGlowRequestHandler(private val state: StateProvider): Logging {
abstract class IncomingGlowRequestHandler(private val state: StateProvider, private val subscriptionHandler: SubscriptionHandler): Logging {
fun handle(request: GlowRequest) {
when (request.originalMessage.event) {
// TODO remove events that shouldn't come from outside and handle them with Error in else clause
GlowEvent.PATCH_SUBSCRIBE -> TODO()
GlowEvent.PATCH_INITIAL_STATE -> TODO()
GlowEvent.PATCH_UNSUBSCRIBE -> TODO()
GlowEvent.ERROR -> TODO()
GlowEvent.SUBSCRIBE -> subscriptionHandler.subscribe(request.client, (request.originalMessage as GlowMessage.Subscribe).data, state)
GlowEvent.UNSUBSCRIBE -> subscriptionHandler.unsubscribe(request.client, (request.originalMessage as GlowMessage.Unsubscribe).data)
GlowEvent.ADD_FIXTURES -> handleAddFixtures(request)
GlowEvent.UPDATE_FIXTURES -> handleUpdateFixture(request)
GlowEvent.REMOVE_FIXTURES -> handleRemoveFixtures(request)
GlowEvent.FIXTURE_TYPE_ADDED -> TODO()
GlowEvent.REMOVE_FIXTURE_TYPES -> handleRemoveFixtureTypes(request)
else -> logger.warn("Received a message with event ${request.originalMessage.event} which should not be sent by client. Discarding message. ")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package org.cueglow.server.objects.messages

import org.apache.logging.log4j.kotlin.Logging
import org.cueglow.server.OutEventReceiver
import org.cueglow.server.StateProvider
import org.cueglow.server.json.AsyncClient
import java.util.*

/**
* Handles Subscribe/Unsubscribe Events. Receives OutEvents from the OutEventHandler and sends them to the subscribers.
**/
abstract class SubscriptionHandler: OutEventReceiver, Logging {
private val activeSubscriptions = EnumMap<GlowTopic, MutableSet<AsyncClient>>(GlowTopic::class.java) // TODO synchronize (see JavaDoc for EnumMap)

/** Keeps subscriptions that were sent the initial state but do not get updates yet because older updates
* in the OutEventQueue first need to be handled. Subscriptions move from pending to active once the sync message
* (identified by UUID) is received by the SubscriptionHandler.
**/
private val pendingSubscriptions = mutableMapOf<UUID, Pair<GlowTopic, AsyncClient>>()

init {
// populate subscriptions with empty sets
GlowTopic.values().forEach { activeSubscriptions[it] = mutableSetOf() }
}

abstract fun serializeMessage(glowMessage: GlowMessage): String

/** Receive and handle messages from the OutEventQueue **/
override fun receive(glowMessage: GlowMessage) {
logger.info("Receiving $glowMessage")

when (glowMessage.event) {
GlowEvent.ADD_FIXTURES, GlowEvent.UPDATE_FIXTURES,
GlowEvent.REMOVE_FIXTURES, GlowEvent.ADD_FIXTURE_TYPES,
GlowEvent.REMOVE_FIXTURE_TYPES -> publish(GlowTopic.PATCH, glowMessage)

GlowEvent.SYNC -> activateSubscription((glowMessage as GlowMessage.Sync).data)

else -> return
}
}

private fun publish(topic: GlowTopic, glowMessage: GlowMessage) {
val topicSubscribers = activeSubscriptions[topic]
if (topicSubscribers!!.isNotEmpty()) { // null asserted because all possible keys are initialized in init block
val messageString = serializeMessage(glowMessage)
topicSubscribers.forEach {it.send(messageString)}
}
}

/** Check if [syncUuid] is known. If yes, move subscription from pending to active **/
private fun activateSubscription(syncUuid: UUID) {
val (topic, subscriber) = pendingSubscriptions.remove(syncUuid) ?: return
activeSubscriptions[topic]!!.add(subscriber) // null asserted because all possible keys are initialized in init block
}

fun subscribe(subscriber: AsyncClient, topic: GlowTopic, state: StateProvider) {
// unsubscribe before subscribing
if (internalUnsubscribe(subscriber, topic)) {logger.warn("Client $subscriber subscribed to $topic but was already subscribed. Subscription was reset. ")}
when (topic) {
GlowTopic.PATCH -> {
val syncUuid = UUID.randomUUID()
val syncMessage = GlowMessage.Sync(syncUuid)
pendingSubscriptions[syncUuid] = Pair(GlowTopic.PATCH, subscriber)
// TODO acquire state lock here
val initialPatchState = state.patch.getGlowPatch()
state.outEventQueue.put(syncMessage) // TODO possible deadlock because SubscriptionHandler is locked and cannot work to reduce message count in queue
// no deadlock problem if we don't have the SubscriptionHandler Lock here?
// TODO release state lock here
val initialMessage = GlowMessage.PatchInitialState(initialPatchState)
subscriber.send(initialMessage)
}
}
}

fun unsubscribe(subscriber: AsyncClient, topic: GlowTopic) {
if (!internalUnsubscribe(subscriber, topic)) {logger.warn("Client $subscriber unsubscribed from $topic but was not subscribed")}
}

/** Returns true if the subscriber was successfully unsubscribed and false if the subscriber wasn't subscribed */
private fun internalUnsubscribe(subscriber: AsyncClient, topic: GlowTopic): Boolean {
val numberOfSubscriptionsRemovedFromPending = pendingSubscriptions
.filter {it.value.first == topic && it.value.second == subscriber}
.keys
.map {pendingSubscriptions.remove(it)}
.size
val removedFromActive = activeSubscriptions[topic]!!.remove(subscriber) // null asserted because all possible keys are initialized in init block
return removedFromActive || numberOfSubscriptionsRemovedFromPending > 0
}

fun unsubscribeFromAllTopics(subscriber: AsyncClient) {
pendingSubscriptions.filter {it.value.second == subscriber}.keys.map { pendingSubscriptions.remove(it) }
activeSubscriptions.values.forEach {
it.remove(subscriber)
}
}
}
Loading

0 comments on commit 38e85d8

Please sign in to comment.