Avoid concurrent wrapper generation
We had logic to debounce regenerating wrappers,
but if there were a large stream of FS events it could
still lead to several instances of the code generation
running concurrently. Fix this by tracking when the
last run completed, and only then re-running the
code generation.
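
For context on the failure mode: github.com/bep/debounce fires each queued callback from a timer goroutine, so when events keep arriving and the callback (here, code generation) outlasts the debounce window, two callbacks can end up running at once. A rough standalone sketch of that race (illustrative only, not code from this commit):

package main

import (
	"fmt"
	"time"

	"github.com/bep/debounce"
)

func main() {
	deb := debounce.New(100 * time.Millisecond)

	slowRegen := func() {
		fmt.Println("regen start")
		time.Sleep(2 * time.Second) // stands in for slow wrapper/code generation
		fmt.Println("regen done")
	}

	deb(slowRegen) // fires ~100ms later, then runs for ~2s

	// A later FS event arrives while the first run is still in progress:
	time.Sleep(500 * time.Millisecond)
	deb(slowRegen) // fires ~100ms later, overlapping the first run

	time.Sleep(3 * time.Second)
}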
eandre committed Dec 11, 2023
1 parent 9d0883b commit 26ccedf
Showing 4 changed files with 67 additions and 21 deletions.
31 changes: 18 additions & 13 deletions cli/daemon/apps/apps.go
@@ -393,19 +393,24 @@ func (i *Instance) beginWatch() error {
     }

     go func() {
-        for range i.watcher.EventsReady {
-            batch := i.watcher.GetEventsBatch()
-            events := batch.Events()
-
-            if i.mgr != nil {
-                i.mgr.onWatchEvent(i, events)
-            }
-
-            i.watchMu.Lock()
-            watchers := i.watchers
-            i.watchMu.Unlock()
-            for _, sub := range watchers {
-                sub.f(i, events)
+        for {
+            select {
+            case <-i.watcher.Done():
+                return
+            case <-i.watcher.EventsReady:
+                batch := i.watcher.GetEventsBatch()
+                events := batch.Events()
+
+                if i.mgr != nil {
+                    i.mgr.onWatchEvent(i, events)
+                }
+
+                i.watchMu.Lock()
+                watchers := i.watchers
+                i.watchMu.Unlock()
+                for _, sub := range watchers {
+                    sub.f(i, events)
+                }
             }
         }
     }()
4 changes: 2 additions & 2 deletions cli/daemon/daemon.go
@@ -52,7 +52,7 @@ type Server struct {
     availableVer atomic.Value // string

     appDebounceMu sync.Mutex
-    appDebouncers map[*apps.Instance]debouncer
+    appDebouncers map[*apps.Instance]*regenerateCodeDebouncer

     daemonpb.UnimplementedDaemonServer
 }
@@ -67,7 +67,7 @@ func New(appsMgr *apps.Manager, mgr *run.Manager, cm *sqldb.ClusterManager, sm *
         ns:      ns,
         streams: make(map[string]*streamLog),

-        appDebouncers: make(map[*apps.Instance]debouncer),
+        appDebouncers: make(map[*apps.Instance]*regenerateCodeDebouncer),
     }

     mgr.AddListener(srv)
47 changes: 43 additions & 4 deletions cli/daemon/watch.go
@@ -7,6 +7,7 @@ import (
     "io/fs"
     "os"
     "path/filepath"
+    "sync"
     "time"

     "github.com/bep/debounce"
@@ -44,12 +45,52 @@ func (s *Server) onWatchEvent(i *apps.Instance, events []watcher.Event) {
     s.appDebounceMu.Lock()
     deb := s.appDebouncers[i]
     if deb == nil {
-        deb = debounce.New(100 * time.Millisecond)
+        deb = &regenerateCodeDebouncer{
+            debounce: debounce.New(100 * time.Millisecond),
+            doRun:    func() { s.regenerateUserCode(context.Background(), i) },
+        }
         s.appDebouncers[i] = deb
     }
     s.appDebounceMu.Unlock()

-    deb(func() { s.regenerateUserCode(context.Background(), i) })
+    deb.ChangeEvent()
 }

+type regenerateCodeDebouncer struct {
+    debounce func(func())
+    mu       sync.Mutex
+    running  bool
+    runAfter bool
+
+    doRun func()
+}
+
+func (g *regenerateCodeDebouncer) ChangeEvent() {
+    g.debounce(func() {
+        g.mu.Lock()
+
+        // If we're already running, mark to run again when complete.
+        if g.running {
+            g.runAfter = true
+            g.mu.Unlock()
+            return
+        }
+
+        // Otherwise, keep re-running for as long as change events come in.
+        g.running = true
+        g.runAfter = true // to start us off, at least once.
+        for g.runAfter {
+            g.runAfter = false // reset for next time
+            g.mu.Unlock()
+            g.doRun() // actually run
+            g.mu.Lock()
+        }
+
+        // If we get here, g.runAfter is false: nobody requested another run, so we can stop.
+        g.running = false
+
+        g.mu.Unlock()
+    })
+}
+
 func (s *Server) regenerateUserCode(ctx context.Context, app *apps.Instance) {
@@ -99,5 +140,3 @@ func (s *Server) updateGitIgnore(i *apps.Instance) error {
     }
     return nil
 }
-
-type debouncer = func(fn func())
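
To illustrate how the new type is meant to be driven (a hypothetical fragment, assuming it sits in package daemon next to the code above with the debounce and time imports in scope; regenerateCodeDebouncer is unexported and in practice only used via onWatchEvent):

// A burst of change events collapses into a single debounced run; events that
// arrive while a run is in flight queue at most one follow-up run, so doRun
// never executes concurrently with itself.
deb := &regenerateCodeDebouncer{
    debounce: debounce.New(100 * time.Millisecond),
    doRun: func() {
        time.Sleep(time.Second) // stand-in for the slow regenerateUserCode call
    },
}

for i := 0; i < 50; i++ {
    deb.ChangeEvent() // cheap; safe to call on every FS event
}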
6 changes: 4 additions & 2 deletions pkg/watcher/watcher.go
@@ -177,8 +177,10 @@ func (w *Watcher) GetEventsBatch() *Events {
 }

 func (w *Watcher) Close() error {
-    w.stop <- struct{}{}
-    close(w.EventsReady)
+    close(w.stop)
     return nil
 }

+func (w *Watcher) Done() <-chan struct{} {
+    return w.stop
+}
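
The loop added in apps.go relies on a basic channel property: a receive from a closed channel never blocks, so once Close closes w.stop, every select on Done() becomes ready and the watch goroutine returns. A minimal standalone sketch of that shutdown pattern (not the watcher code itself):

package main

import "fmt"

func main() {
    stop := make(chan struct{})
    events := make(chan string)
    done := make(chan struct{})

    go func() {
        defer close(done)
        for {
            select {
            case <-stop: // ready as soon as stop is closed
                return
            case ev := <-events:
                fmt.Println("event:", ev)
            }
        }
    }()

    events <- "change"
    close(stop) // wakes the goroutine; it returns on the next select
    <-done
    fmt.Println("watcher goroutine exited")
}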
