diff --git a/cmd/ds-dev/ds-dev.go b/cmd/ds-dev/ds-dev.go index e57b9fd1..348f1c58 100644 --- a/cmd/ds-dev/ds-dev.go +++ b/cmd/ds-dev/ds-dev.go @@ -375,7 +375,7 @@ func main() { RouteHitEvents: routeHitEvents, AppspaceUsersModel: appspaceUserModel} - migrationJobTwine := &twineservices.MigrationJobService{ + migrationJobTwine := &MigrationJobService{ AppspaceModel: devAppspaceModel, MigrationJobModel: devMigrationJobModel, MigrationJobEvents: migrationJobEvents, diff --git a/cmd/ds-host/twineservices/migrationjobservice.go b/cmd/ds-dev/migrationjobservice.go similarity index 85% rename from cmd/ds-host/twineservices/migrationjobservice.go rename to cmd/ds-dev/migrationjobservice.go index 2aee1510..59769aaf 100644 --- a/cmd/ds-host/twineservices/migrationjobservice.go +++ b/cmd/ds-dev/migrationjobservice.go @@ -1,4 +1,4 @@ -package twineservices +package main import ( "encoding/json" @@ -17,14 +17,14 @@ type MigrationJobService struct { GetRunning() ([]domain.MigrationJob, error) } `checkinject:"required"` MigrationJobEvents interface { - SubscribeAppspace(domain.AppspaceID) <-chan domain.MigrationJob + Subscribe() <-chan domain.MigrationJob Unsubscribe(<-chan domain.MigrationJob) } `checkinject:"required"` } // Start creates listeners and then shuts everything down when twine exits func (s *MigrationJobService) Start(authUser domain.UserID, t *twine.Twine) domain.TwineServiceI { - mjs := migrationJobService{ + mjs := migrationJobServiceInternal{ MigrationJobService: s, authUser: authUser, twine: t, @@ -32,7 +32,7 @@ func (s *MigrationJobService) Start(authUser domain.UserID, t *twine.Twine) doma return &mjs } -type migrationJobService struct { +type migrationJobServiceInternal struct { *MigrationJobService authUser domain.UserID @@ -43,8 +43,8 @@ const subscribeMigration = 11 const subscribeAppspaceMigration = 12 const unsubscribeMigration = 13 -//HandleMessage handles incoming twine message -func (s *migrationJobService) HandleMessage(m twine.ReceivedMessageI) { +// HandleMessage handles incoming twine message +func (s *migrationJobServiceInternal) HandleMessage(m twine.ReceivedMessageI) { switch m.CommandID() { case subscribeAppspaceMigration: go s.handleSubscribeAppspace(m) @@ -58,7 +58,7 @@ type IncomingSubscribeAppspace struct { AppspaceID domain.AppspaceID `json:"appspace_id"` } -func (s *migrationJobService) handleSubscribeAppspace(m twine.ReceivedMessageI) { +func (s *migrationJobServiceInternal) handleSubscribeAppspace(m twine.ReceivedMessageI) { var incoming IncomingSubscribeAppspace err := json.Unmarshal(m.Payload(), &incoming) if err != nil { @@ -77,7 +77,7 @@ func (s *migrationJobService) handleSubscribeAppspace(m twine.ReceivedMessageI) } // First subscribe - migrationJobChan := s.MigrationJobEvents.SubscribeAppspace(incoming.AppspaceID) + migrationJobChan := s.MigrationJobEvents.Subscribe() go func() { for statusEvent := range migrationJobChan { go s.sendMigrationJob(m, statusEvent) @@ -118,7 +118,7 @@ func (s *migrationJobService) handleSubscribeAppspace(m twine.ReceivedMessageI) } // see appspacestatustwine which uses a same pattern wrt Twine -func (s *migrationJobService) sendMigrationJob(m twine.ReceivedMessageI, migrationJob domain.MigrationJob) { +func (s *migrationJobServiceInternal) sendMigrationJob(m twine.ReceivedMessageI, migrationJob domain.MigrationJob) { bytes, err := json.Marshal(migrationJob) if err != nil { s.getLogger("sendMigrationJob json Marshal Error").Error(err) diff --git a/cmd/ds-host/ds-host.go b/cmd/ds-host/ds-host.go index e70efea6..9b31c079 100644 --- a/cmd/ds-host/ds-host.go +++ b/cmd/ds-host/ds-host.go @@ -500,11 +500,6 @@ func main() { MigrationJobController: migrationJobCtl, } - migrationJobTwine := &twineservices.MigrationJobService{ - AppspaceModel: appspaceModel, - MigrationJobModel: migrationJobModel, - MigrationJobEvents: migrationJobEvents, - } appGetterTwine := &twineservices.AppGetterService{ AppGetter: appGetter, } @@ -523,7 +518,7 @@ func main() { DropIDRoutes: dropIDRoutes, MigrationJobRoutes: migrationJobRoutes, AppspaceStatusEvents: appspaceStatusEvents, - MigrationJobTwine: migrationJobTwine, + MigrationJobEvents: migrationJobEvents, AppGetterTwine: appGetterTwine, UserModel: userModel, Views: views} diff --git a/cmd/ds-host/events/events.go b/cmd/ds-host/events/events.go index c7f25b3c..af8cacd3 100644 --- a/cmd/ds-host/events/events.go +++ b/cmd/ds-host/events/events.go @@ -1,7 +1,6 @@ package events import ( - "sync" "time" "github.com/teleclimber/DropServer/cmd/ds-host/domain" @@ -12,79 +11,26 @@ import ( // MigrationJobEvents forwards events related to migration jobs type MigrationJobEvents struct { - subsMux sync.Mutex - subscribers []chan domain.MigrationJob - appspaceSubscribers map[domain.AppspaceID][]chan domain.MigrationJob + subscribers eventSubs[domain.MigrationJob] + ownerSubs eventIDSubs[domain.UserID, domain.MigrationJob] } -// Send sends an appspace status event -func (e *MigrationJobEvents) Send(event domain.MigrationJob) { - e.subsMux.Lock() - defer e.subsMux.Unlock() - for _, ch := range e.subscribers { - ch <- event - } - if e.appspaceSubscribers == nil { - return - } - appspaceSubs, ok := e.appspaceSubscribers[event.AppspaceID] - if ok { - for _, ch := range appspaceSubs { - ch <- event - } - } -} - -// Subscribe to an event to know when the status of a migration has changed func (e *MigrationJobEvents) Subscribe() <-chan domain.MigrationJob { - e.subsMux.Lock() - defer e.subsMux.Unlock() - ch := make(chan domain.MigrationJob) - e.subscribers = append(e.subscribers, ch) - return ch + return e.subscribers.subscribe() } -// SubscribeAppspace to an event to know when the status of a migration for an appspace has changed -func (e *MigrationJobEvents) SubscribeAppspace(appspaceID domain.AppspaceID) <-chan domain.MigrationJob { - e.subsMux.Lock() - defer e.subsMux.Unlock() - ch := make(chan domain.MigrationJob) - if e.appspaceSubscribers == nil { - e.appspaceSubscribers = make(map[domain.AppspaceID][]chan domain.MigrationJob) - } - e.appspaceSubscribers[appspaceID] = append(e.appspaceSubscribers[appspaceID], ch) - return ch +func (e *MigrationJobEvents) SubscribeOwner(ownerID domain.UserID) <-chan domain.MigrationJob { + return e.ownerSubs.subscribe(ownerID) } -// Unsubscribe to the event func (e *MigrationJobEvents) Unsubscribe(ch <-chan domain.MigrationJob) { - e.subsMux.Lock() - defer e.subsMux.Unlock() - e.removeSubscriber(ch) + e.subscribers.unsubscribe(ch) + e.ownerSubs.unsubscribe(ch) } -func (e *MigrationJobEvents) removeSubscriber(ch <-chan domain.MigrationJob) { - for i, c := range e.subscribers { - if c == ch { - e.subscribers[i] = e.subscribers[len(e.subscribers)-1] - e.subscribers = e.subscribers[:len(e.subscribers)-1] - close(c) - return - } - } - if e.appspaceSubscribers == nil { - return - } - for appspaceID, subs := range e.appspaceSubscribers { - for i, c := range subs { - if c == ch { - e.appspaceSubscribers[appspaceID][i] = e.appspaceSubscribers[appspaceID][len(subs)-1] - e.appspaceSubscribers[appspaceID] = e.appspaceSubscribers[appspaceID][:len(subs)-1] - close(c) - return - } - } - } +func (e *MigrationJobEvents) Send(data domain.MigrationJob) { + e.subscribers.send(data) + e.ownerSubs.send(data.OwnerID, data) } ////// Apppsace Files Event @@ -127,10 +73,12 @@ func (e *AppspaceStatusEvents) SubscribeOwner(ownerID domain.UserID) <-chan doma } func (e *AppspaceStatusEvents) Unsubscribe(ch <-chan domain.AppspaceStatusEvent) { + e.subscribers.unsubscribe(ch) e.ownerSubs.unsubscribe(ch) } func (e *AppspaceStatusEvents) Send(data domain.AppspaceStatusEvent) { + e.subscribers.send(data) e.ownerSubs.send(data.OwnerID, data) } diff --git a/cmd/ds-host/events/events_test.go b/cmd/ds-host/events/events_test.go deleted file mode 100644 index 5aae960f..00000000 --- a/cmd/ds-host/events/events_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package events - -import ( - "sync" - "testing" - - "github.com/teleclimber/DropServer/cmd/ds-host/domain" -) - -func TestMigrationJobAppspace(t *testing.T) { - appspaceID1 := domain.AppspaceID(7) - appspaceID2 := domain.AppspaceID(11) - - e := &MigrationJobEvents{} - - c := e.SubscribeAppspace(appspaceID1) - - var wg sync.WaitGroup - wg.Add(1) - - go func() { - d := <-c - if d.JobID != 77 { - t.Error("got the wrong data") - } - wg.Done() - }() - - e.Send(domain.MigrationJob{AppspaceID: appspaceID2}) - e.Send(domain.MigrationJob{AppspaceID: appspaceID1, JobID: 77}) - - wg.Wait() - - e.Unsubscribe(c) - - if len(e.appspaceSubscribers[appspaceID1]) != 0 { - t.Error("unsubscribe did not work") - } -} diff --git a/cmd/ds-host/events/genericevents.go b/cmd/ds-host/events/genericevents.go index 0e56b01f..92ac7d29 100644 --- a/cmd/ds-host/events/genericevents.go +++ b/cmd/ds-host/events/genericevents.go @@ -10,7 +10,7 @@ type SubscribeIDs interface { domain.UserID | domain.AppID | domain.AppspaceID } type DataTypes interface { - domain.AppURLData | domain.AppspaceID | domain.AppspaceStatusEvent + domain.AppURLData | domain.AppspaceID | domain.AppspaceStatusEvent | domain.MigrationJob } type eventIDSubs[T SubscribeIDs, D DataTypes] struct { diff --git a/cmd/ds-host/twineservices/appspacelogservice.go b/cmd/ds-host/twineservices/appspacelogservice.go index db65e894..f74e359f 100644 --- a/cmd/ds-host/twineservices/appspacelogservice.go +++ b/cmd/ds-host/twineservices/appspacelogservice.go @@ -117,8 +117,13 @@ func (s *appspaceLogService) handleSubscribeApp(m twine.ReceivedMessageI) { ls.start() } +// IncomingSubscribeAppspace is json encoded payload to subscribe to appspace status +type IncomingSubscribeAppspace struct { + AppspaceID domain.AppspaceID `json:"appspace_id"` +} + func (s *appspaceLogService) getMessageAppspace(m twine.ReceivedMessageI) (domain.Appspace, error) { - var incoming IncomingSubscribeAppspace // reused from MigrationService + var incoming IncomingSubscribeAppspace err := json.Unmarshal(m.Payload(), &incoming) if err != nil { m.SendError(err.Error()) diff --git a/cmd/ds-host/userroutes/userroutes.go b/cmd/ds-host/userroutes/userroutes.go index 8f0e8517..d5947cdc 100644 --- a/cmd/ds-host/userroutes/userroutes.go +++ b/cmd/ds-host/userroutes/userroutes.go @@ -65,9 +65,12 @@ type UserRoutes struct { SubscribeOwner(domain.UserID) <-chan domain.AppspaceStatusEvent Unsubscribe(ch <-chan domain.AppspaceStatusEvent) } `checkinject:"required"` - MigrationJobTwine domain.TwineService2 `checkinject:"required"` - AppGetterTwine domain.TwineService `checkinject:"required"` - UserModel interface { + MigrationJobEvents interface { + SubscribeOwner(domain.UserID) <-chan domain.MigrationJob + Unsubscribe(ch <-chan domain.MigrationJob) + } `checkinject:"required"` + AppGetterTwine domain.TwineService `checkinject:"required"` + UserModel interface { GetFromID(userID domain.UserID) (domain.User, error) UpdateEmail(userID domain.UserID, email string) error UpdatePassword(userID domain.UserID, password string) error @@ -337,8 +340,6 @@ func (u *UserRoutes) changeUserPassword(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusOK) // TODO send no content. } -// const appspaceStatusService = 11 -const migrationJobService = 12 const appGetterService = 13 // startTwineService connects a new twine instance to the twine services @@ -363,14 +364,11 @@ func (u *UserRoutes) startTwineService(w http.ResponseWriter, r *http.Request) { return } - migrationJobTwine := u.MigrationJobTwine.Start(authUserID, t) go u.AppGetterTwine.Start(authUserID, t) go func() { for m := range t.MessageChan { switch m.ServiceID() { - case migrationJobService: - go migrationJobTwine.HandleMessage(m) case appGetterService: go u.AppGetterTwine.HandleMessage(m) default: @@ -398,6 +396,9 @@ func (u *UserRoutes) startSSEEvents(w http.ResponseWriter, r *http.Request) { asStatCh := u.AppspaceStatusEvents.SubscribeOwner(authUserID) defer u.AppspaceStatusEvents.Unsubscribe(asStatCh) + migrationJobCh := u.MigrationJobEvents.SubscribeOwner(authUserID) + defer u.MigrationJobEvents.Unsubscribe(migrationJobCh) + rc := http.NewResponseController(w) for { select { @@ -405,7 +406,10 @@ func (u *UserRoutes) startSSEEvents(w http.ResponseWriter, r *http.Request) { return case stat := <-asStatCh: u.sendSSEEvent(w, "AppspaceStatus", stat) + case job := <-migrationJobCh: + u.sendSSEEvent(w, "MigrationJob", job) } + err := rc.Flush() if err != nil { u.getLogger("startSSEEvents() rc.Flush()").Error(err) diff --git a/frontend-ds-host/src/stores/appspaces.ts b/frontend-ds-host/src/stores/appspaces.ts index 28e825c4..39658b99 100644 --- a/frontend-ds-host/src/stores/appspaces.ts +++ b/frontend-ds-host/src/stores/appspaces.ts @@ -18,7 +18,7 @@ function appspaceStatusFromRaw(raw:any) :AppspaceStatus { owner_id: Number(raw.owner_id), appspace_id: Number(raw.appspace_id), paused: !!raw.paused, - temp_paused: raw.temp_pasued, + temp_paused: raw.temp_paused, temp_pause_reason: raw.temp_pause_reason+'', appspace_schema: Number(raw.appspace_schema), app_version_schema: Number(raw.app_version_schema), diff --git a/frontend-ds-host/src/stores/migration_jobs.ts b/frontend-ds-host/src/stores/migration_jobs.ts index 8d0b2860..5722221e 100644 --- a/frontend-ds-host/src/stores/migration_jobs.ts +++ b/frontend-ds-host/src/stores/migration_jobs.ts @@ -1,16 +1,8 @@ import { reactive, ref, shallowRef, ShallowRef, computed } from 'vue'; import { defineStore } from 'pinia'; import { ax } from '../controllers/userapi'; -import twineClient from '../twine-services/twine_client'; -import {SentMessageI} from 'twine-web'; +import { on } from '../sse'; import { LoadState, AppspaceMigrationJob } from './types'; -import { useAppspacesStore } from './appspaces'; - -const remoteService = 12; - -const remoteSubscribe = 11; -const remoteAppspaceSubscribe = 12; -const remoteUnsubscribe = 13; function migrationJobFromRaw(raw :any) :AppspaceMigrationJob { return { @@ -28,9 +20,8 @@ function migrationJobFromRaw(raw :any) :AppspaceMigrationJob { export const useAppspaceMigrationJobsStore = defineStore('appspace-migration-jobs', () => { const load_state :Map = reactive(new Map); - const connections :Map = reactive(new Map); - const migration_jobs : ShallowRef>>>> = shallowRef(new Map()); + const jobs :ShallowRef>> = shallowRef(new Map()); function isLoaded(appspace_id: number) { const l = load_state.get(appspace_id); @@ -39,94 +30,68 @@ export const useAppspaceMigrationJobsStore = defineStore('appspace-migration-job async function loadData(appspace_id: number) { const l = load_state.get(appspace_id); - if( !l ) { // || l === LoadState.NotLoaded ) { + if( !l ) { load_state.set(appspace_id, LoadState.Loading); const resp = await ax.get('/api/migration-job?appspace_id='+appspace_id); if( !Array.isArray(resp.data) ) throw new Error("expected array for migration jobs"); - const jobs :Map> = new Map; resp.data.forEach( (raw:any) => { - const j = migrationJobFromRaw(raw); - jobs.set(j.job_id, shallowRef(j)); + setReplaceJob(migrationJobFromRaw(raw)); }); - migration_jobs.value.set(appspace_id, shallowRef(jobs)); - migration_jobs.value = new Map(migration_jobs.value); load_state.set(appspace_id, LoadState.Loaded); } } - async function reloadData(appspace_id: number) { + function setReplaceJob(job:AppspaceMigrationJob) { + const job_id = job.job_id; + const ex_job = jobs.value.get(job_id); + if( ex_job === undefined ) { + jobs.value.set(job_id, shallowRef(job)); + jobs.value = new Map(jobs.value); + } + else { + ex_job.value = job; + } + return mustGetJob(job_id); + } + async function reloadData(appspace_id: number) { // TODO this should become unnecessary with proper events from the data model const l = load_state.get(appspace_id); if( l === LoadState.Loading ) return; // its' already loading so don't reload - await disconnect(appspace_id); load_state.delete(appspace_id); await loadData(appspace_id); } - function connected(appspace_id:number) :boolean { - return connections.has(appspace_id); - } - async function connect(appspace_id:number) { - if( !isLoaded(appspace_id) )throw new Error("wait until jobs are loaded to connect"); - if( connected(appspace_id) ) return; - - connections.set(appspace_id, undefined); + on('MigrationJob', (raw) => { + setReplaceJob(migrationJobFromRaw(raw)); + }); - const payload = new TextEncoder().encode(JSON.stringify({appspace_id})); - - await twineClient.ready(); - const subMessage = await twineClient.twine.send(remoteService, remoteAppspaceSubscribe, payload); - connections.set(appspace_id, subMessage); - - const jobs = mustGetJobs(appspace_id); - - for await (const m of subMessage.incomingMessages()) { - switch (m.command) { - case 11: - const raw :any = JSON.parse(new TextDecoder('utf-8').decode(m.payload)); - const in_job = migrationJobFromRaw(raw); - const job_id = in_job.job_id; - const ex_job = jobs.value.get(job_id); - if( ex_job === undefined ) { - jobs.value.set(job_id, shallowRef(in_job)); - jobs.value = new Map(jobs.value); - } - else { - ex_job.value = in_job; - } - m.sendOK(); - break; - - default: - m.sendError("What is this command?"); - throw new Error("what is this command? "+m.command); - } - } + function getJob(job_id: number) { + return jobs.value.get(job_id); } - async function disconnect(appspace_id:number) { - const subMessage = connections.get(appspace_id); - if( subMessage === undefined ) return; - connections.set(appspace_id, undefined); - await subMessage.refSendBlock(remoteUnsubscribe, undefined); - connections.delete(appspace_id); + function mustGetJob(job_id:number) { + const j = getJob(job_id); + if( j === undefined ) throw new Error("expected job id to exist"); + return j; } - function getJobs(appspace_id:number) { - if( isLoaded(appspace_id) ) return migration_jobs.value.get(appspace_id); - } - function mustGetJobs(appspace_id:number) { - const jobs = getJobs(appspace_id); - if( jobs === undefined ) throw new Error("expected appspace jobs to exist"); - return jobs; + function getRunningAppspaceJobs(appspace_id: number) { + return computed( () => { + const ongoing = Array.from(jobs.value.values()).filter( j => + j.value.appspace_id === appspace_id + && j.value.started + && !j.value.finished); + ongoing.sort( (a,b) => { + if( !a.value.started && !b.value.started ) return 0; + if( !a.value.started ) return -1; + if( !b.value.started ) return 1; + return a.value.started.getTime() - b.value.started.getTime(); + }); + return ongoing; + }); } async function createMigrationJob(appspace_id:number, to_version:string) :Promise> { - const jobs = mustGetJobs(appspace_id); const resp = await ax.post('/api/migration-job', {appspace_id, to_version}); - const job = shallowRef(migrationJobFromRaw(resp.data)); - jobs.value.set(job.value.job_id, job); - jobs.value = new Map(jobs.value); - - return job; + return setReplaceJob(migrationJobFromRaw(resp.data)); } - return { isLoaded, loadData, reloadData, createMigrationJob, getJobs, mustGetJobs, connect, disconnect }; + return { isLoaded, loadData, reloadData, createMigrationJob, getJob, mustGetJob, getRunningAppspaceJobs }; }); \ No newline at end of file diff --git a/frontend-ds-host/src/views/MigrateAppspace.vue b/frontend-ds-host/src/views/MigrateAppspace.vue index f69cd5c4..27efb1ca 100644 --- a/frontend-ds-host/src/views/MigrateAppspace.vue +++ b/frontend-ds-host/src/views/MigrateAppspace.vue @@ -1,10 +1,10 @@