Skip to content

Commit

Permalink
feat(ds-host): refactor app getter events
Browse files Browse the repository at this point in the history
- remove appgetterservice from ds-host
- add app getter events using generic events
- remove event system from AppGetter and use new events instead
- remove /twine/ route from ds-host
- move appspace log twine service to ds-dev (unused in ds-host)
- send events to ds-host frontend via SSE
- remove twine from frontend-ds-host
- add app-getter store to ds-host frontend for managing and updating app getter events
- refactor NewApp.. vue compoenents to work with new app getter store
- refactor ds-dev app watcher and app packager to use new app getter events
  • Loading branch information
teleclimber committed Sep 20, 2024
1 parent df43dfa commit 1a5eb34
Show file tree
Hide file tree
Showing 24 changed files with 267 additions and 567 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package twineservices
package main

import (
"encoding/json"
Expand Down Expand Up @@ -29,14 +29,14 @@ type AppspaceLogService struct {

// Start creates listeners and then shuts everything down when twine exits
func (s *AppspaceLogService) Start(authUser domain.UserID, t *twine.Twine) domain.TwineServiceI {
asl := &appspaceLogService{
asl := &appspaceLogServiceInternal{
AppspaceLogService: s,
twine: t,
authUser: authUser}
return asl
}

type appspaceLogService struct {
type appspaceLogServiceInternal struct {
*AppspaceLogService

twine *twine.Twine
Expand All @@ -56,7 +56,7 @@ const entrysubCmd = 13
const unsubscribeLogCmd = 13

// HandleMessage handles incoming twine message
func (s *appspaceLogService) HandleMessage(m twine.ReceivedMessageI) {
func (s *appspaceLogServiceInternal) HandleMessage(m twine.ReceivedMessageI) {
switch m.CommandID() {
case subscribeAppspaceLogCmd:
s.handleSubscribeAppspace(m)
Expand Down Expand Up @@ -87,7 +87,7 @@ func (s *appspaceLogService) HandleMessage(m twine.ReceivedMessageI) {
// [reply under this message to keep separate from initial chunk]
// - 13> unsubscribe

func (s *appspaceLogService) handleSubscribeAppspace(m twine.ReceivedMessageI) {
func (s *appspaceLogServiceInternal) handleSubscribeAppspace(m twine.ReceivedMessageI) {
appspace, err := s.getMessageAppspace(m)
if err != nil {
return
Expand All @@ -102,7 +102,7 @@ func (s *appspaceLogService) handleSubscribeAppspace(m twine.ReceivedMessageI) {
ls.start()
}

func (s *appspaceLogService) handleSubscribeApp(m twine.ReceivedMessageI) {
func (s *appspaceLogServiceInternal) handleSubscribeApp(m twine.ReceivedMessageI) {
appVersion, err := s.getMessageApp(m)
if err != nil {
return
Expand All @@ -117,12 +117,7 @@ func (s *appspaceLogService) handleSubscribeApp(m twine.ReceivedMessageI) {
ls.start()
}

// IncomingSubscribeAppspace is json encoded payload to subscribe to appspace status
type IncomingSubscribeAppspace struct {
AppspaceID domain.AppspaceID `json:"appspace_id"`
}

func (s *appspaceLogService) getMessageAppspace(m twine.ReceivedMessageI) (domain.Appspace, error) {
func (s *appspaceLogServiceInternal) getMessageAppspace(m twine.ReceivedMessageI) (domain.Appspace, error) {
var incoming IncomingSubscribeAppspace
err := json.Unmarshal(m.Payload(), &incoming)
if err != nil {
Expand All @@ -147,7 +142,7 @@ type IncomingSubscribeApp struct {
Version domain.Version `json:"version"`
}

func (s *appspaceLogService) getMessageApp(m twine.ReceivedMessageI) (domain.AppVersion, error) {
func (s *appspaceLogServiceInternal) getMessageApp(m twine.ReceivedMessageI) (domain.AppVersion, error) {
var incoming IncomingSubscribeApp
err := json.Unmarshal(m.Payload(), &incoming)
if err != nil {
Expand Down
19 changes: 11 additions & 8 deletions cmd/ds-dev/appwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ var ignorePaths = []string{
type DevAppWatcher struct {
AppGetter interface {
Reprocess(userID domain.UserID, appID domain.AppID, locationKey string) (domain.AppGetKey, error)
SubscribeKey(key domain.AppGetKey) (domain.AppGetEvent, <-chan domain.AppGetEvent)
GetResults(key domain.AppGetKey) (domain.AppGetMeta, bool)
DeleteKeyData(key domain.AppGetKey)
} `checkinject:"required"`
Expand All @@ -31,6 +30,10 @@ type DevAppWatcher struct {
DevAppProcessEvents interface {
Send(AppProcessEvent)
} `checkinject:"required"`
AppGetterEvents interface {
SubscribeOwner(domain.UserID) <-chan domain.AppGetEvent
Unsubscribe(<-chan domain.AppGetEvent)
} `checkinject:"required"`
AppVersionEvents interface {
Send(string)
} `checkinject:"required"`
Expand Down Expand Up @@ -81,17 +84,17 @@ func (w *DevAppWatcher) reprocessAppFiles() { // Maybe export this so it can be
panic(err)
}

lastEvent, appGetCh := w.AppGetter.SubscribeKey(appGetKey)
if lastEvent.Done || appGetCh == nil {
w.reloadMetadata(appGetKey)
return
}
appGetCh := w.AppGetterEvents.SubscribeOwner(ownerID)
defer w.AppGetterEvents.Unsubscribe(appGetCh)

// subscribe and wait
reloading := false
for e := range appGetCh {
if e.Key != appGetKey {
continue
}
if e.Done {
// if processing is done, get results to get the errors.
// if processing is done, unsubscribe to stop loop, then get results to get the errors.
w.AppGetterEvents.Unsubscribe(appGetCh)
results, ok := w.AppGetter.GetResults(appGetKey)
if ok {
w.DevAppProcessEvents.Send(AppProcessEvent{
Expand Down
11 changes: 7 additions & 4 deletions cmd/ds-dev/ds-dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/teleclimber/DropServer/cmd/ds-host/models/appfilesmodel"
"github.com/teleclimber/DropServer/cmd/ds-host/sandboxproxy"
"github.com/teleclimber/DropServer/cmd/ds-host/sandboxservices"
"github.com/teleclimber/DropServer/cmd/ds-host/twineservices"
"github.com/teleclimber/DropServer/denosandboxcode"
"github.com/teleclimber/DropServer/internal/checkinject"
"github.com/teleclimber/DropServer/internal/embedutils"
Expand Down Expand Up @@ -136,6 +135,7 @@ func main() {
appspaceFilesEvents := &events.AppspaceFilesEvents{}
migrationJobEvents := &events.MigrationJobEvents{}
appspaceStatusEvents := &events.AppspaceStatusEvents{}
appGetterEvents := &events.AppGetterEvents{}
routeHitEvents := &events.AppspaceRouteHitEvents{}

appLocation2Path := &AppLocation2Path{
Expand Down Expand Up @@ -180,6 +180,7 @@ func main() {
AppModel: devAppModel,
AppRoutes: AppRoutes,
AppLogger: appLogger,
AppGetterEvents: appGetterEvents,
}
appGetter.Init()

Expand All @@ -193,6 +194,7 @@ func main() {
DevAppModel: devAppModel,
DevAppspaceModel: devAppspaceModel,
DevAppProcessEvents: appProcessingEvents,
AppGetterEvents: appGetterEvents,
AppVersionEvents: appVersionEvents,
}
if appOriginType == Directory {
Expand All @@ -214,8 +216,9 @@ func main() {

if *createPackageFlag != "" {
packager := &AppPackager{
AppGetter: appGetter,
AppFilesModel: appFilesModel}
AppGetter: appGetter,
AppFilesModel: appFilesModel,
AppGetterEvents: appGetterEvents}
packager.PackageApp(appOrigin, *createPackageFlag, *packageNameFlag)
os.Exit(0)
}
Expand Down Expand Up @@ -380,7 +383,7 @@ func main() {
MigrationJobModel: devMigrationJobModel,
MigrationJobEvents: migrationJobEvents,
}
appspaceLogTwine := &twineservices.AppspaceLogService{
appspaceLogTwine := &AppspaceLogService{
AppspaceModel: devAppspaceModel,
AppModel: devSingleAppModel,
AppspaceLogger: appspaceLogger,
Expand Down
15 changes: 10 additions & 5 deletions cmd/ds-dev/packageapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import (
type AppPackager struct {
AppGetter interface {
Reprocess(userID domain.UserID, appID domain.AppID, locationKey string) (domain.AppGetKey, error)
SubscribeKey(key domain.AppGetKey) (domain.AppGetEvent, <-chan domain.AppGetEvent)
GetResults(key domain.AppGetKey) (domain.AppGetMeta, bool)
DeleteKeyData(key domain.AppGetKey)
}
AppFilesModel interface {
ReadManifest(string) (domain.AppVersionManifest, error)
}
AppGetterEvents interface {
SubscribeOwner(domain.UserID) <-chan domain.AppGetEvent
Unsubscribe(<-chan domain.AppGetEvent)
} `checkinject:"required"`
}

func (p *AppPackager) PackageApp(appDir, outDir string, appName string) {
Expand Down Expand Up @@ -126,14 +129,15 @@ func (p *AppPackager) loadAppData() domain.AppGetMeta {
panic(err)
}

lastEvent, appGetCh := p.AppGetter.SubscribeKey(appGetKey)
if lastEvent.Done || appGetCh == nil {
return p.getResults(appGetKey)
}
appGetCh := p.AppGetterEvents.SubscribeOwner(ownerID)
defer p.AppGetterEvents.Unsubscribe(appGetCh)

rChan := make(chan domain.AppGetMeta, 1)
done := false
for e := range appGetCh {
if e.Key != appGetKey {
continue
}
if e.Done {
if !done {
fmt.Println("Done processing app")
Expand All @@ -143,6 +147,7 @@ func (p *AppPackager) loadAppData() domain.AppGetMeta {
}()
}
done = true
p.AppGetterEvents.Unsubscribe(appGetCh) // unsubscribe to stop for loop
} else {
fmt.Println(e.Step)
}
Expand Down
3 changes: 0 additions & 3 deletions cmd/ds-dev/packageapp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,6 @@ func makeTestDir() (string, error) {
if err = os.MkdirAll(filepath.Dir(f2Path), 0777); err != nil {
return "", err
}
if err != nil {
return "", err
}
if err = os.WriteFile(f2Path, []byte("abc"), 0666); err != nil {
return "", err
}
Expand Down
58 changes: 8 additions & 50 deletions cmd/ds-host/appops/appgetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,16 @@ type AppGetter struct {
AppRoutes interface {
ValidateRoutes(routes []domain.AppRoute) error
} `checkinject:"required"`
AppGetterEvents interface {
Send(domain.AppGetEvent)
} `checkinject:"required"`

keysMux sync.Mutex
keys map[domain.AppGetKey]appGetData
results map[domain.AppGetKey]domain.AppGetMeta

eventsMux sync.Mutex
lastEvent map[domain.AppGetKey]domain.AppGetEvent // stash the last event so we can
subscribers []subscriber
eventsMux sync.Mutex
lastEvent map[domain.AppGetKey]domain.AppGetEvent // stash the last event so we can
}

// Init creates the map [and starts the timers]
Expand Down Expand Up @@ -978,8 +980,6 @@ func (g *AppGetter) DeleteKeyData(key domain.AppGetKey) {

// Send one last event in case there are any subscribers
g.sendEvent(appGetData, domain.AppGetEvent{Key: key, Done: true, Step: "Deleting processing data"})
// unsubscribe the channels listenning for updates to the key
g.unsubscribeKey(appGetData.key)
// delete the last_event
g.eventsMux.Lock()
delete(g.lastEvent, key)
Expand Down Expand Up @@ -1136,18 +1136,6 @@ func (g *AppGetter) GetLastEvent(key domain.AppGetKey) (domain.AppGetEvent, bool
return e, ok
}

// SubscribeKey returns the last event and a channel if the process is ongoing
func (g *AppGetter) SubscribeKey(key domain.AppGetKey) (domain.AppGetEvent, <-chan domain.AppGetEvent) {
g.eventsMux.Lock()
defer g.eventsMux.Unlock()
lastEvent, ok := g.lastEvent[key]
if !ok || lastEvent.Done {
return lastEvent, nil
}
ch := make(chan domain.AppGetEvent)
g.subscribers = append(g.subscribers, subscriber{hasKey: true, key: key, ch: ch})
return lastEvent, ch
}
func (g *AppGetter) sendEvent(getData appGetData, ev domain.AppGetEvent) {
ev.Key = getData.key
g.eventsMux.Lock()
Expand All @@ -1157,41 +1145,11 @@ func (g *AppGetter) sendEvent(getData appGetData, ev domain.AppGetEvent) {
// But what happens if we do send conflicing signals?
// Maybe just log it? We can investigate further if we see this in logs.

ev.OwnerID = getData.userID

g.lastEvent[ev.Key] = ev

for _, s := range g.subscribers {
if s.hasKey && ev.Key == s.key {
s.ch <- ev
}
// else if hasUserID; else if hasAppID ..
}
}
func (g *AppGetter) unsubscribeKey(key domain.AppGetKey) { // TODO please at least test this.
g.eventsMux.Lock()
defer g.eventsMux.Unlock()
k := 0
for _, s := range g.subscribers {
if s.hasKey && s.key == key {
close(s.ch)
} else {
g.subscribers[k] = s
k++
}
}
g.subscribers = g.subscribers[:k]
}
func (g *AppGetter) Unsubscribe(ch <-chan domain.AppGetEvent) {
g.eventsMux.Lock()
defer g.eventsMux.Unlock()
for i, s := range g.subscribers {
if s.ch == ch {
g.subscribers[i] = g.subscribers[len(g.subscribers)-1]
g.subscribers = g.subscribers[:len(g.subscribers)-1]
close(s.ch)
return
}
}
g.getLogger("Unsubscribe").Log("Failed to find subscriber channel.")
g.AppGetterEvents.Send(ev)
}

func (g *AppGetter) getLogger(note string) *record.DsLogger {
Expand Down
3 changes: 2 additions & 1 deletion cmd/ds-host/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,8 @@ type AppGetMeta struct {

// AppGetEvent contains updates to an app getter process
type AppGetEvent struct {
Key AppGetKey `json:"key"`
OwnerID UserID `json:"owner_id"`
Key AppGetKey `json:"key"`
// Done means the entire process is finished, nothing more is going to happen.
Done bool `json:"done"`
// Input is non-empty string when user input is needed (like "commit", or "see warnings then continue")
Expand Down
9 changes: 3 additions & 6 deletions cmd/ds-host/ds-host.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/teleclimber/DropServer/cmd/ds-host/sandboxproxy"
"github.com/teleclimber/DropServer/cmd/ds-host/sandboxservices"
"github.com/teleclimber/DropServer/cmd/ds-host/server"
"github.com/teleclimber/DropServer/cmd/ds-host/twineservices"
"github.com/teleclimber/DropServer/cmd/ds-host/userroutes"
"github.com/teleclimber/DropServer/cmd/ds-host/views"
"github.com/teleclimber/DropServer/internal/checkinject"
Expand Down Expand Up @@ -140,6 +139,7 @@ func main() {
appspaceFilesEvents := &events.AppspaceFilesEvents{}
appspaceStatusEvents := &events.AppspaceStatusEvents{}
migrationJobEvents := &events.MigrationJobEvents{}
appGetterEvents := &events.AppGetterEvents{}
appUrlDataEvents := &events.AppUrlDataEvents{}

// models
Expand Down Expand Up @@ -325,6 +325,7 @@ func main() {
RemoteAppGetter: remoteAppGetter,
SandboxManager: sandboxManager,
AppRoutes: AppRoutes,
AppGetterEvents: appGetterEvents,
}
appGetter.Init()

Expand Down Expand Up @@ -500,10 +501,6 @@ func main() {
MigrationJobController: migrationJobCtl,
}

appGetterTwine := &twineservices.AppGetterService{
AppGetter: appGetter,
}

userRoutes := &userroutes.UserRoutes{
Config: runtimeConfig,
Authenticator: authenticator,
Expand All @@ -519,7 +516,7 @@ func main() {
MigrationJobRoutes: migrationJobRoutes,
AppspaceStatusEvents: appspaceStatusEvents,
MigrationJobEvents: migrationJobEvents,
AppGetterTwine: appGetterTwine,
AppGetterEvents: appGetterEvents,
UserModel: userModel,
Views: views}
userRoutes.Init()
Expand Down
17 changes: 17 additions & 0 deletions cmd/ds-host/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,23 @@ func (e *AppspaceRouteHitEvents) removeSubscriber(ch chan<- *domain.AppspaceRout
}
}

// AppGetterEvents
type AppGetterEvents struct {
ownerSubs eventIDSubs[domain.UserID, domain.AppGetEvent]
}

func (e *AppGetterEvents) SubscribeOwner(ownerID domain.UserID) <-chan domain.AppGetEvent {
return e.ownerSubs.subscribe(ownerID)
}

func (e *AppGetterEvents) Unsubscribe(ch <-chan domain.AppGetEvent) {
e.ownerSubs.unsubscribe(ch)
}

func (e *AppGetterEvents) Send(data domain.AppGetEvent) {
e.ownerSubs.send(data.OwnerID, data)
}

// AppUrlDataEvents sends AppURLData
type AppUrlDataEvents struct {
ownerSubs eventIDSubs[domain.UserID, domain.AppURLData]
Expand Down
Loading

0 comments on commit 1a5eb34

Please sign in to comment.