Skip to content

Commit

Permalink
feat(ds-host): refactor appspace migration events
Browse files Browse the repository at this point in the history
- refactor MigrationJobEvents to implement generic events
- fix missing subscribers in AppspaceStatusEvents
- move MigrationJobTwine to ds-dev
- refactor migration jobs store in frontend-ds-host
  • Loading branch information
teleclimber committed Sep 18, 2024
1 parent 84b5dc8 commit df43dfa
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 230 deletions.
2 changes: 1 addition & 1 deletion cmd/ds-dev/ds-dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func main() {
RouteHitEvents: routeHitEvents,
AppspaceUsersModel: appspaceUserModel}

migrationJobTwine := &twineservices.MigrationJobService{
migrationJobTwine := &MigrationJobService{
AppspaceModel: devAppspaceModel,
MigrationJobModel: devMigrationJobModel,
MigrationJobEvents: migrationJobEvents,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package twineservices
package main

import (
"encoding/json"
Expand All @@ -17,22 +17,22 @@ type MigrationJobService struct {
GetRunning() ([]domain.MigrationJob, error)
} `checkinject:"required"`
MigrationJobEvents interface {
SubscribeAppspace(domain.AppspaceID) <-chan domain.MigrationJob
Subscribe() <-chan domain.MigrationJob
Unsubscribe(<-chan domain.MigrationJob)
} `checkinject:"required"`
}

// Start creates listeners and then shuts everything down when twine exits
func (s *MigrationJobService) Start(authUser domain.UserID, t *twine.Twine) domain.TwineServiceI {
mjs := migrationJobService{
mjs := migrationJobServiceInternal{
MigrationJobService: s,
authUser: authUser,
twine: t,
}
return &mjs
}

type migrationJobService struct {
type migrationJobServiceInternal struct {
*MigrationJobService

authUser domain.UserID
Expand All @@ -43,8 +43,8 @@ const subscribeMigration = 11
const subscribeAppspaceMigration = 12
const unsubscribeMigration = 13

//HandleMessage handles incoming twine message
func (s *migrationJobService) HandleMessage(m twine.ReceivedMessageI) {
// HandleMessage handles incoming twine message
func (s *migrationJobServiceInternal) HandleMessage(m twine.ReceivedMessageI) {
switch m.CommandID() {
case subscribeAppspaceMigration:
go s.handleSubscribeAppspace(m)
Expand All @@ -58,7 +58,7 @@ type IncomingSubscribeAppspace struct {
AppspaceID domain.AppspaceID `json:"appspace_id"`
}

func (s *migrationJobService) handleSubscribeAppspace(m twine.ReceivedMessageI) {
func (s *migrationJobServiceInternal) handleSubscribeAppspace(m twine.ReceivedMessageI) {
var incoming IncomingSubscribeAppspace
err := json.Unmarshal(m.Payload(), &incoming)
if err != nil {
Expand All @@ -77,7 +77,7 @@ func (s *migrationJobService) handleSubscribeAppspace(m twine.ReceivedMessageI)
}

// First subscribe
migrationJobChan := s.MigrationJobEvents.SubscribeAppspace(incoming.AppspaceID)
migrationJobChan := s.MigrationJobEvents.Subscribe()
go func() {
for statusEvent := range migrationJobChan {
go s.sendMigrationJob(m, statusEvent)
Expand Down Expand Up @@ -118,7 +118,7 @@ func (s *migrationJobService) handleSubscribeAppspace(m twine.ReceivedMessageI)
}

// see appspacestatustwine which uses a same pattern wrt Twine
func (s *migrationJobService) sendMigrationJob(m twine.ReceivedMessageI, migrationJob domain.MigrationJob) {
func (s *migrationJobServiceInternal) sendMigrationJob(m twine.ReceivedMessageI, migrationJob domain.MigrationJob) {
bytes, err := json.Marshal(migrationJob)
if err != nil {
s.getLogger("sendMigrationJob json Marshal Error").Error(err)
Expand Down
7 changes: 1 addition & 6 deletions cmd/ds-host/ds-host.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,11 +500,6 @@ func main() {
MigrationJobController: migrationJobCtl,
}

migrationJobTwine := &twineservices.MigrationJobService{
AppspaceModel: appspaceModel,
MigrationJobModel: migrationJobModel,
MigrationJobEvents: migrationJobEvents,
}
appGetterTwine := &twineservices.AppGetterService{
AppGetter: appGetter,
}
Expand All @@ -523,7 +518,7 @@ func main() {
DropIDRoutes: dropIDRoutes,
MigrationJobRoutes: migrationJobRoutes,
AppspaceStatusEvents: appspaceStatusEvents,
MigrationJobTwine: migrationJobTwine,
MigrationJobEvents: migrationJobEvents,
AppGetterTwine: appGetterTwine,
UserModel: userModel,
Views: views}
Expand Down
76 changes: 12 additions & 64 deletions cmd/ds-host/events/events.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package events

import (
"sync"
"time"

"github.com/teleclimber/DropServer/cmd/ds-host/domain"
Expand All @@ -12,79 +11,26 @@ import (

// MigrationJobEvents forwards events related to migration jobs
type MigrationJobEvents struct {
subsMux sync.Mutex
subscribers []chan domain.MigrationJob
appspaceSubscribers map[domain.AppspaceID][]chan domain.MigrationJob
subscribers eventSubs[domain.MigrationJob]
ownerSubs eventIDSubs[domain.UserID, domain.MigrationJob]
}

// Send sends an appspace status event
func (e *MigrationJobEvents) Send(event domain.MigrationJob) {
e.subsMux.Lock()
defer e.subsMux.Unlock()
for _, ch := range e.subscribers {
ch <- event
}
if e.appspaceSubscribers == nil {
return
}
appspaceSubs, ok := e.appspaceSubscribers[event.AppspaceID]
if ok {
for _, ch := range appspaceSubs {
ch <- event
}
}
}

// Subscribe to an event to know when the status of a migration has changed
func (e *MigrationJobEvents) Subscribe() <-chan domain.MigrationJob {
e.subsMux.Lock()
defer e.subsMux.Unlock()
ch := make(chan domain.MigrationJob)
e.subscribers = append(e.subscribers, ch)
return ch
return e.subscribers.subscribe()
}

// SubscribeAppspace to an event to know when the status of a migration for an appspace has changed
func (e *MigrationJobEvents) SubscribeAppspace(appspaceID domain.AppspaceID) <-chan domain.MigrationJob {
e.subsMux.Lock()
defer e.subsMux.Unlock()
ch := make(chan domain.MigrationJob)
if e.appspaceSubscribers == nil {
e.appspaceSubscribers = make(map[domain.AppspaceID][]chan domain.MigrationJob)
}
e.appspaceSubscribers[appspaceID] = append(e.appspaceSubscribers[appspaceID], ch)
return ch
func (e *MigrationJobEvents) SubscribeOwner(ownerID domain.UserID) <-chan domain.MigrationJob {
return e.ownerSubs.subscribe(ownerID)
}

// Unsubscribe to the event
func (e *MigrationJobEvents) Unsubscribe(ch <-chan domain.MigrationJob) {
e.subsMux.Lock()
defer e.subsMux.Unlock()
e.removeSubscriber(ch)
e.subscribers.unsubscribe(ch)
e.ownerSubs.unsubscribe(ch)
}

func (e *MigrationJobEvents) removeSubscriber(ch <-chan domain.MigrationJob) {
for i, c := range e.subscribers {
if c == ch {
e.subscribers[i] = e.subscribers[len(e.subscribers)-1]
e.subscribers = e.subscribers[:len(e.subscribers)-1]
close(c)
return
}
}
if e.appspaceSubscribers == nil {
return
}
for appspaceID, subs := range e.appspaceSubscribers {
for i, c := range subs {
if c == ch {
e.appspaceSubscribers[appspaceID][i] = e.appspaceSubscribers[appspaceID][len(subs)-1]
e.appspaceSubscribers[appspaceID] = e.appspaceSubscribers[appspaceID][:len(subs)-1]
close(c)
return
}
}
}
func (e *MigrationJobEvents) Send(data domain.MigrationJob) {
e.subscribers.send(data)
e.ownerSubs.send(data.OwnerID, data)
}

////// Apppsace Files Event
Expand Down Expand Up @@ -127,10 +73,12 @@ func (e *AppspaceStatusEvents) SubscribeOwner(ownerID domain.UserID) <-chan doma
}

func (e *AppspaceStatusEvents) Unsubscribe(ch <-chan domain.AppspaceStatusEvent) {
e.subscribers.unsubscribe(ch)
e.ownerSubs.unsubscribe(ch)
}

func (e *AppspaceStatusEvents) Send(data domain.AppspaceStatusEvent) {
e.subscribers.send(data)
e.ownerSubs.send(data.OwnerID, data)
}

Expand Down
39 changes: 0 additions & 39 deletions cmd/ds-host/events/events_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion cmd/ds-host/events/genericevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type SubscribeIDs interface {
domain.UserID | domain.AppID | domain.AppspaceID
}
type DataTypes interface {
domain.AppURLData | domain.AppspaceID | domain.AppspaceStatusEvent
domain.AppURLData | domain.AppspaceID | domain.AppspaceStatusEvent | domain.MigrationJob
}

type eventIDSubs[T SubscribeIDs, D DataTypes] struct {
Expand Down
7 changes: 6 additions & 1 deletion cmd/ds-host/twineservices/appspacelogservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,13 @@ func (s *appspaceLogService) handleSubscribeApp(m twine.ReceivedMessageI) {
ls.start()
}

// IncomingSubscribeAppspace is json encoded payload to subscribe to appspace status
type IncomingSubscribeAppspace struct {
AppspaceID domain.AppspaceID `json:"appspace_id"`
}

func (s *appspaceLogService) getMessageAppspace(m twine.ReceivedMessageI) (domain.Appspace, error) {
var incoming IncomingSubscribeAppspace // reused from MigrationService
var incoming IncomingSubscribeAppspace
err := json.Unmarshal(m.Payload(), &incoming)
if err != nil {
m.SendError(err.Error())
Expand Down
20 changes: 12 additions & 8 deletions cmd/ds-host/userroutes/userroutes.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,12 @@ type UserRoutes struct {
SubscribeOwner(domain.UserID) <-chan domain.AppspaceStatusEvent
Unsubscribe(ch <-chan domain.AppspaceStatusEvent)
} `checkinject:"required"`
MigrationJobTwine domain.TwineService2 `checkinject:"required"`
AppGetterTwine domain.TwineService `checkinject:"required"`
UserModel interface {
MigrationJobEvents interface {
SubscribeOwner(domain.UserID) <-chan domain.MigrationJob
Unsubscribe(ch <-chan domain.MigrationJob)
} `checkinject:"required"`
AppGetterTwine domain.TwineService `checkinject:"required"`
UserModel interface {
GetFromID(userID domain.UserID) (domain.User, error)
UpdateEmail(userID domain.UserID, email string) error
UpdatePassword(userID domain.UserID, password string) error
Expand Down Expand Up @@ -337,8 +340,6 @@ func (u *UserRoutes) changeUserPassword(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusOK) // TODO send no content.
}

// const appspaceStatusService = 11
const migrationJobService = 12
const appGetterService = 13

// startTwineService connects a new twine instance to the twine services
Expand All @@ -363,14 +364,11 @@ func (u *UserRoutes) startTwineService(w http.ResponseWriter, r *http.Request) {
return
}

migrationJobTwine := u.MigrationJobTwine.Start(authUserID, t)
go u.AppGetterTwine.Start(authUserID, t)

go func() {
for m := range t.MessageChan {
switch m.ServiceID() {
case migrationJobService:
go migrationJobTwine.HandleMessage(m)
case appGetterService:
go u.AppGetterTwine.HandleMessage(m)
default:
Expand Down Expand Up @@ -398,14 +396,20 @@ func (u *UserRoutes) startSSEEvents(w http.ResponseWriter, r *http.Request) {
asStatCh := u.AppspaceStatusEvents.SubscribeOwner(authUserID)
defer u.AppspaceStatusEvents.Unsubscribe(asStatCh)

migrationJobCh := u.MigrationJobEvents.SubscribeOwner(authUserID)
defer u.MigrationJobEvents.Unsubscribe(migrationJobCh)

rc := http.NewResponseController(w)
for {
select {
case <-clientGone:
return
case stat := <-asStatCh:
u.sendSSEEvent(w, "AppspaceStatus", stat)
case job := <-migrationJobCh:
u.sendSSEEvent(w, "MigrationJob", job)
}

err := rc.Flush()
if err != nil {
u.getLogger("startSSEEvents() rc.Flush()").Error(err)
Expand Down
2 changes: 1 addition & 1 deletion frontend-ds-host/src/stores/appspaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ function appspaceStatusFromRaw(raw:any) :AppspaceStatus {
owner_id: Number(raw.owner_id),
appspace_id: Number(raw.appspace_id),
paused: !!raw.paused,
temp_paused: raw.temp_pasued,
temp_paused: raw.temp_paused,
temp_pause_reason: raw.temp_pause_reason+'',
appspace_schema: Number(raw.appspace_schema),
app_version_schema: Number(raw.app_version_schema),
Expand Down
Loading

0 comments on commit df43dfa

Please sign in to comment.