Skip to content

Commit

Permalink
Feat(ds-host): refactor appspace status events and add SSE events
Browse files Browse the repository at this point in the history
- AppspaceStatus now evaluates status for all appspaces instead of just "tracked" ones
- AppspaceStatus.Track() renamed to Get()
- Add OwnerID field to AppspaceStatus Event
- AppspaceStatusEvent is now based on generic events
- AppspaceStatusEvents subscribe to all or owner, no longer appspace ID
- Add status to appspace getters for ds-host frontend data
- Add /events/ path to subscribe to Server Sent Events
- Send AppspaceStatusEvents via SSE channel
- adapt ds-host frontend to use SSE data to keep appspaces data up to date.
- Fix typo in AppURLDataEvents
  • Loading branch information
teleclimber committed Sep 13, 2024
1 parent 9585ea7 commit 84b5dc8
Show file tree
Hide file tree
Showing 24 changed files with 296 additions and 562 deletions.
14 changes: 6 additions & 8 deletions cmd/ds-dev/appspacestatusservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (

type AppspaceStatusService struct {
AppspaceStatus interface {
Track(appspaceID domain.AppspaceID) domain.AppspaceStatusEvent
Get(appspaceID domain.AppspaceID) domain.AppspaceStatusEvent
} `checkinject:"required"`
AppspaceStatusEvents interface {
Subscribe(domain.AppspaceID, chan<- domain.AppspaceStatusEvent)
Unsubscribe(domain.AppspaceID, chan<- domain.AppspaceStatusEvent)
Subscribe() <-chan domain.AppspaceStatusEvent
Unsubscribe(<-chan domain.AppspaceStatusEvent)
} `checkinject:"required"`
}

Expand All @@ -23,20 +23,18 @@ func (s *AppspaceStatusService) HandleMessage(m twine.ReceivedMessageI) {
}

func (s *AppspaceStatusService) Start(t *twine.Twine) {
appspaceStatusChan := make(chan domain.AppspaceStatusEvent)
s.AppspaceStatusEvents.Subscribe(appspaceID, appspaceStatusChan)
appspaceStatusChan := s.AppspaceStatusEvents.Subscribe()
go func() {
for statusEvent := range appspaceStatusChan {
go s.sendStatusEvent(t, statusEvent)
}
}()

go s.sendStatusEvent(t, s.AppspaceStatus.Track(appspaceID))
go s.sendStatusEvent(t, s.AppspaceStatus.Get(appspaceID))

t.WaitClose()

s.AppspaceStatusEvents.Unsubscribe(appspaceID, appspaceStatusChan)
close(appspaceStatusChan)
s.AppspaceStatusEvents.Unsubscribe(appspaceStatusChan)
}

const statusEventCmd = 11
Expand Down
58 changes: 35 additions & 23 deletions cmd/ds-host/appspacestatus/appspacestatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ type tempPause struct {
}

type statusData struct {
paused bool // paused status is set in appspace DB
tempPauses []tempPause // pauses for appspace operations like migrations, backups, etc...
dataSchema int // from appspace metadata
appVersionSchema int // from app files
problem bool // Something went wrong, appsapce can't be used
ownerID domain.UserID // ownerID piggybacks on statusDatat to simplify event displatching
paused bool // paused status is set in appspace DB
tempPauses []tempPause // pauses for appspace operations like migrations, backups, etc...
dataSchema int // from appspace metadata
appVersionSchema int // from app files
problem bool // Something went wrong, appsapce can't be used
}

// possible flags to add:
Expand Down Expand Up @@ -95,7 +96,7 @@ type AppspaceStatus struct {
} `checkinject:"optional"`

AppspaceStatusEvents interface {
Send(domain.AppspaceID, domain.AppspaceStatusEvent)
Send(domain.AppspaceStatusEvent)
} `checkinject:"required"`

hostStopMux sync.Mutex
Expand Down Expand Up @@ -146,7 +147,7 @@ func (s *AppspaceStatus) Ready(appspaceID domain.AppspaceID) bool {
return false
}

status := s.getStatus(appspaceID)
status, _ := s.getStatus(appspaceID)
status.lock.Lock()
defer status.lock.Unlock()

Expand All @@ -164,15 +165,18 @@ func (s *AppspaceStatus) Ready(appspaceID domain.AppspaceID) bool {
// That would wait for ready state (until context says no more)

// PauseAppspace sets the pause flag on the status if it is tracked
// Shouldn't this be in reaction to setting pause in model?
func (s *AppspaceStatus) PauseAppspace(appspaceID domain.AppspaceID, pause bool) {
status := s.getTrackedStatus(appspaceID)
if status != nil {
status, found := s.getStatus(appspaceID)
if found {
status.lock.Lock()
defer status.lock.Unlock()
if status.data.paused != pause {
status.data.paused = pause
s.sendChangedEvent(appspaceID, status.data)
}
} else {
s.sendChangedEvent(appspaceID, status.data)
}
}

Expand All @@ -198,14 +202,14 @@ func (s *AppspaceStatus) WaitTempPaused(appspaceID domain.AppspaceID, reason str
// IsTempPaused returns true if a temp pause is in effect
// It does not consider whether the appspace has effectively stopped
func (s *AppspaceStatus) IsTempPaused(appspaceID domain.AppspaceID) bool {
status := s.getStatus(appspaceID)
status, _ := s.getStatus(appspaceID)
status.lock.Lock()
defer status.lock.Unlock()
return len(status.data.tempPauses) != 0
}

func (s *AppspaceStatus) getTempPause(appspaceID domain.AppspaceID, reason string) chan struct{} {
status := s.getStatus(appspaceID)
status, _ := s.getStatus(appspaceID)

status.lock.Lock()
defer status.lock.Unlock()
Expand All @@ -223,7 +227,7 @@ func (s *AppspaceStatus) getTempPause(appspaceID domain.AppspaceID, reason strin
// finishTempPause closes the 0th temp pause and starts the next one
// or sends a status change notification if there are none.
func (s *AppspaceStatus) finishTempPause(appspaceID domain.AppspaceID) {
status := s.getStatus(appspaceID)
status, _ := s.getStatus(appspaceID)

status.lock.Lock()
defer status.lock.Unlock()
Expand All @@ -243,14 +247,14 @@ func (s *AppspaceStatus) finishTempPause(appspaceID domain.AppspaceID) {
// With this, migration would check status and ensure it's "active". Don't migrate an archived or deleted appspace.
// Then we can just use regular pause, with maybe a "waitPaused"

// Track causes appspace id to monitored and future events will be sent
// Get causes appspace id to monitored and future events will be sent
// It returns an event struct that represents the current state
func (s *AppspaceStatus) Track(appspaceID domain.AppspaceID) domain.AppspaceStatusEvent {
stat := s.getStatus(appspaceID)
func (s *AppspaceStatus) Get(appspaceID domain.AppspaceID) domain.AppspaceStatusEvent {
stat, _ := s.getStatus(appspaceID)
return getEvent(appspaceID, stat.data)
}

func (s *AppspaceStatus) getStatus(appspaceID domain.AppspaceID) *status {
func (s *AppspaceStatus) getStatus(appspaceID domain.AppspaceID) (*status, bool) {
s.statusMux.Lock()
defer s.statusMux.Unlock()

Expand All @@ -260,7 +264,7 @@ func (s *AppspaceStatus) getStatus(appspaceID domain.AppspaceID) *status {
data: s.getData(appspaceID)}
s.status[appspaceID] = stat
}
return stat
return stat, ok
}

func (s *AppspaceStatus) getTrackedStatus(appspaceID domain.AppspaceID) *status {
Expand All @@ -284,6 +288,7 @@ func (s *AppspaceStatus) getData(appspaceID domain.AppspaceID) statusData {
data.problem = true
return data
}
data.ownerID = appspace.OwnerID
data.paused = appspace.Paused

// load appVersionSchema. Note that it should not change over time, so no need to subscribe.
Expand Down Expand Up @@ -312,18 +317,24 @@ func (s *AppspaceStatus) getData(appspaceID domain.AppspaceID) statusData {

func (s *AppspaceStatus) handleAppspaceFiles(ch <-chan domain.AppspaceID) {
for appspaceID := range ch {
status := s.getTrackedStatus(appspaceID)
if status != nil {
status, found := s.getStatus(appspaceID)
if found {
s.updateStatus(appspaceID, status)
} else {
s.sendChangedEvent(appspaceID, status.data)
}
}
}

func (s *AppspaceStatus) handleMigrationJobUpdate(ch <-chan domain.MigrationJob) {
for d := range ch {
status := s.getTrackedStatus(d.AppspaceID)
if status != nil && d.Finished.Valid {
s.updateStatus(d.AppspaceID, status) //reload whole status because migration might have changed many things
if d.Finished.Valid {
status, ok := s.getStatus(d.AppspaceID)
if ok {
s.updateStatus(d.AppspaceID, status)
} else {
s.sendChangedEvent(d.AppspaceID, status.data)
}
}
}
}
Expand Down Expand Up @@ -379,7 +390,7 @@ func (s *AppspaceStatus) updateStatus(appspaceID domain.AppspaceID, curStatus *s
}

func (s *AppspaceStatus) sendChangedEvent(appspaceID domain.AppspaceID, status statusData) {
go s.AppspaceStatusEvents.Send(appspaceID, getEvent(appspaceID, status))
go s.AppspaceStatusEvents.Send(getEvent(appspaceID, status))
}

func getEvent(appspaceID domain.AppspaceID, status statusData) domain.AppspaceStatusEvent {
Expand All @@ -388,6 +399,7 @@ func getEvent(appspaceID domain.AppspaceID, status statusData) domain.AppspaceSt
pReason = status.tempPauses[0].reason
}
return domain.AppspaceStatusEvent{
OwnerID: status.ownerID,
AppspaceID: appspaceID,
Paused: status.paused, // maybe add archived, deleted. Or put everything under an "active"
TempPaused: len(status.tempPauses) != 0,
Expand Down
6 changes: 3 additions & 3 deletions cmd/ds-host/appspacestatus/appspacestatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestTempPause(t *testing.T) {
problem: false}

appspaceStatusEvents := testmocks.NewMockAppspaceStatusEvents(mockCtrl)
appspaceStatusEvents.EXPECT().Send(appspaceID, gomock.Any()).AnyTimes()
appspaceStatusEvents.EXPECT().Send(gomock.Any()).AnyTimes()

appspaceRouter := testmocks.NewMockAppspaceRouter(mockCtrl)
appspaceRouter.EXPECT().SubscribeLiveCount(appspaceID, gomock.Any())
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestMultiTempPause(t *testing.T) {
problem: false}

appspaceStatusEvents := testmocks.NewMockAppspaceStatusEvents(mockCtrl)
appspaceStatusEvents.EXPECT().Send(appspaceID, gomock.Any()).Times(2)
appspaceStatusEvents.EXPECT().Send(gomock.Any()).Times(2)

appspaceRouter := testmocks.NewMockAppspaceRouter(mockCtrl)
appspaceRouter.EXPECT().SubscribeLiveCount(appspaceID, gomock.Any()).Times(2)
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestMigrationEvent(t *testing.T) {
appspaceInfoModel.EXPECT().GetSchema(appspaceID).Return(4, nil)

appspaceStatusEvents := testmocks.NewMockAppspaceStatusEvents(mockCtrl)
appspaceStatusEvents.EXPECT().Send(appspaceID, event1)
appspaceStatusEvents.EXPECT().Send(event1)

s := AppspaceStatus{
AppspaceModel: appspaceModel,
Expand Down
1 change: 1 addition & 0 deletions cmd/ds-host/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,7 @@ type AppspacePausedEvent struct {

// AppspaceStatusEvent indicates readiness of appspace and the reason
type AppspaceStatusEvent struct {
OwnerID UserID `json:"owner_id"`
AppspaceID AppspaceID `json:"appspace_id"`
Paused bool `json:"paused"`
TempPaused bool `json:"temp_paused"`
Expand Down
8 changes: 2 additions & 6 deletions cmd/ds-host/ds-host.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ func main() {
Config: *runtimeConfig,
AppspaceUserRoutes: userAppspaceUserRoutes,
AppspaceModel: appspaceModel,
AppspaceStatus: appspaceStatus,
AppspaceExportRoutes: exportAppspaceRoutes,
AppspaceRestoreRoutes: restoreAppspaceRoutes,
DropIDModel: dropIDModel,
Expand Down Expand Up @@ -499,11 +500,6 @@ func main() {
MigrationJobController: migrationJobCtl,
}

appspaceStatusTwine := &twineservices.AppspaceStatusService{
AppspaceModel: appspaceModel,
AppspaceStatus: appspaceStatus,
AppspaceStatusEvents: appspaceStatusEvents,
}
migrationJobTwine := &twineservices.MigrationJobService{
AppspaceModel: appspaceModel,
MigrationJobModel: migrationJobModel,
Expand All @@ -526,7 +522,7 @@ func main() {
DomainRoutes: domainNameRoutes,
DropIDRoutes: dropIDRoutes,
MigrationJobRoutes: migrationJobRoutes,
AppspaceStatusTwine: appspaceStatusTwine,
AppspaceStatusEvents: appspaceStatusEvents,
MigrationJobTwine: migrationJobTwine,
AppGetterTwine: appGetterTwine,
UserModel: userModel,
Expand Down
64 changes: 14 additions & 50 deletions cmd/ds-host/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,81 +93,45 @@ func (e *MigrationJobEvents) removeSubscriber(ch <-chan domain.MigrationJob) {
// have been written to outside of normal appspace use.
// Usually this means they were imported, or a backup restored
type AppspaceFilesEvents struct {
subscribers *eventSubs[domain.AppspaceID]
subscribers eventSubs[domain.AppspaceID]
}

// Send sends an appspace paused or unpaused event
func (e *AppspaceFilesEvents) Send(appspaceID domain.AppspaceID) {
if e.subscribers != nil {
e.subscribers.send(appspaceID)
}
e.subscribers.send(appspaceID)
}

// Subscribe to an event for when an appspace is paused or unpaused
func (e *AppspaceFilesEvents) Subscribe() <-chan domain.AppspaceID {
if e.subscribers == nil {
e.subscribers = &eventSubs[domain.AppspaceID]{} // makeEventSubs[domain.AppspaceID]()
}
return e.subscribers.subscribe()
}

// Unsubscribe to an event for when an appspace is paused or unpaused
func (e *AppspaceFilesEvents) Unsubscribe(ch <-chan domain.AppspaceID) {
if e.subscribers != nil {
e.subscribers.unsubscribe(ch)
}
e.subscribers.unsubscribe(ch)
}

// //////////////////////////////////////
// Appspace Status events
type appspaceStatusSubscriber struct {
appspaceID domain.AppspaceID
ch chan<- domain.AppspaceStatusEvent
}

// AppspaceStatusEvents handles appspace pause and unpause events
type AppspaceStatusEvents struct {
subscribers []appspaceStatusSubscriber
subscribers eventSubs[domain.AppspaceStatusEvent]
ownerSubs eventIDSubs[domain.UserID, domain.AppspaceStatusEvent]
}

// Send sends an appspace status event
func (e *AppspaceStatusEvents) Send(appspaceID domain.AppspaceID, event domain.AppspaceStatusEvent) {
for _, sub := range e.subscribers {
if sub.appspaceID == appspaceID {
sub.ch <- event
}
}
}

// Subscribe to an event to know when the status of an appspace has changed
func (e *AppspaceStatusEvents) Subscribe(appspaceID domain.AppspaceID, ch chan<- domain.AppspaceStatusEvent) {
e.removeSubscriber(appspaceID, ch)
e.subscribers = append(e.subscribers, appspaceStatusSubscriber{appspaceID, ch})
func (e *AppspaceStatusEvents) Subscribe() <-chan domain.AppspaceStatusEvent {
return e.subscribers.subscribe()
}

// Unsubscribe to the event
func (e *AppspaceStatusEvents) Unsubscribe(appspaceID domain.AppspaceID, ch chan<- domain.AppspaceStatusEvent) {
e.removeSubscriber(appspaceID, ch)
func (e *AppspaceStatusEvents) SubscribeOwner(ownerID domain.UserID) <-chan domain.AppspaceStatusEvent {
return e.ownerSubs.subscribe(ownerID)
}

// UnsubscribeChannel removes the channel from all subscriptions
func (e *AppspaceStatusEvents) UnsubscribeChannel(ch chan<- domain.AppspaceStatusEvent) {
for i := len(e.subscribers) - 1; i >= 0; i-- {
if e.subscribers[i].ch == ch {
e.subscribers[i] = e.subscribers[len(e.subscribers)-1]
e.subscribers = e.subscribers[:len(e.subscribers)-1]
}
}
func (e *AppspaceStatusEvents) Unsubscribe(ch <-chan domain.AppspaceStatusEvent) {
e.ownerSubs.unsubscribe(ch)
}

func (e *AppspaceStatusEvents) removeSubscriber(appspaceID domain.AppspaceID, ch chan<- domain.AppspaceStatusEvent) {
// get a feeling you'll need a mutex to cover subscribers?
for i, sub := range e.subscribers {
if sub.appspaceID == appspaceID && sub.ch == ch {
e.subscribers[i] = e.subscribers[len(e.subscribers)-1]
e.subscribers = e.subscribers[:len(e.subscribers)-1]
}
}
func (e *AppspaceStatusEvents) Send(data domain.AppspaceStatusEvent) {
e.ownerSubs.send(data.OwnerID, data)
}

//////////////////////////////////////////
Expand Down Expand Up @@ -218,7 +182,7 @@ type AppUrlDataEvents struct {
}

func (e *AppUrlDataEvents) SubscribeOwner(ownerID domain.UserID) <-chan domain.AppURLData {
return e.appSubs.subscribe(domain.AppID(ownerID))
return e.ownerSubs.subscribe(ownerID)
}

func (e *AppUrlDataEvents) SubscribeApp(appID domain.AppID) <-chan domain.AppURLData {
Expand Down
Loading

0 comments on commit 84b5dc8

Please sign in to comment.