Skip to content

Commit

Permalink
analyze: Add RunLoopWithMultipleIterators
Browse files Browse the repository at this point in the history
  • Loading branch information
taoky committed Sep 30, 2024
1 parent 339ff5c commit bbd6423
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 8 deletions.
21 changes: 13 additions & 8 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/spf13/cobra"
"github.com/taoky/ayano/pkg/analyze"
"github.com/taoky/ayano/pkg/fileiter"
"github.com/taoky/ayano/pkg/systemd"
"github.com/taoky/ayano/pkg/tui"
)
Expand All @@ -31,10 +32,6 @@ func runWithConfig(cmd *cobra.Command, args []string, config analyze.AnalyzerCon
}

filenames := filenamesFromArgs(args)
// Allow multiple files only in analyze mode
if !config.Analyze && len(filenames) != 1 {
return errors.New("only one log file can be specified when following or daemonizing")
}
fmt.Fprintln(cmd.ErrOrStderr(), "Using log files:", filenames)
cmd.SilenceUsage = true

Expand Down Expand Up @@ -67,9 +64,13 @@ func runWithConfig(cmd *cobra.Command, args []string, config analyze.AnalyzerCon
return err
} else {
// Tail mode
iter, err := analyzer.OpenTailIterator(filenames[0])
if err != nil {
return err
var iters []fileiter.Iterator
for _, filename := range filenames {
iter, err := analyzer.OpenTailIterator(filename)
if err != nil {
return err
}
iters = append(iters, iter)
}

if config.Daemon {
Expand All @@ -80,7 +81,11 @@ func runWithConfig(cmd *cobra.Command, args []string, config analyze.AnalyzerCon
go tui.New(analyzer).Run()
}

return analyzer.RunLoop(iter)
if len(iters) == 1 {
return analyzer.RunLoop(iters[0])
} else {
return analyzer.RunLoopWithMultipleIterators(iters)
}
}
}

Expand Down
49 changes: 49 additions & 0 deletions pkg/analyze/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,55 @@ func (a *Analyzer) RunLoop(iter fileiter.Iterator) error {
return nil
}

func (a *Analyzer) RunLoopWithMultipleIterators(iters []fileiter.Iterator) error {
var wg sync.WaitGroup
linesChan := make(chan []byte)

var errorMu sync.Mutex
var collectedErrors []error

for _, iter := range iters {
wg.Add(1)
go func() {
defer wg.Done()
for {
line, err := iter.Next()
if err != nil {
errorMu.Lock()
collectedErrors = append(collectedErrors, err)
errorMu.Unlock()
return
}
if line == nil {
return
}
linesChan <- line
}
}()
}

go func() {
wg.Done()
close(linesChan)
}()

for result := range linesChan {
if err := a.handleLine(result); err != nil {
a.logger.Printf("analyze error: %v", err)
}
}

if len(collectedErrors) > 0 {
combinedError := fmt.Errorf("encountered %d errors", len(collectedErrors))
for _, err := range collectedErrors {
combinedError = fmt.Errorf("%w; %v", combinedError, err)
}
return combinedError
}

return nil
}

func (a *Analyzer) AnalyzeFile(filename string) error {
f, err := OpenFile(filename)
if err != nil {
Expand Down

0 comments on commit bbd6423

Please sign in to comment.