From bbd6423079b086030880911f0d23011e5e5f07c8 Mon Sep 17 00:00:00 2001 From: taoky Date: Mon, 30 Sep 2024 01:28:18 +0800 Subject: [PATCH] analyze: Add RunLoopWithMultipleIterators --- cmd/run.go | 21 +++++++++++------- pkg/analyze/analyze.go | 49 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 162f8cd..e2171fc 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -11,6 +11,7 @@ import ( "github.com/spf13/cobra" "github.com/taoky/ayano/pkg/analyze" + "github.com/taoky/ayano/pkg/fileiter" "github.com/taoky/ayano/pkg/systemd" "github.com/taoky/ayano/pkg/tui" ) @@ -31,10 +32,6 @@ func runWithConfig(cmd *cobra.Command, args []string, config analyze.AnalyzerCon } filenames := filenamesFromArgs(args) - // Allow multiple files only in analyze mode - if !config.Analyze && len(filenames) != 1 { - return errors.New("only one log file can be specified when following or daemonizing") - } fmt.Fprintln(cmd.ErrOrStderr(), "Using log files:", filenames) cmd.SilenceUsage = true @@ -67,9 +64,13 @@ func runWithConfig(cmd *cobra.Command, args []string, config analyze.AnalyzerCon return err } else { // Tail mode - iter, err := analyzer.OpenTailIterator(filenames[0]) - if err != nil { - return err + var iters []fileiter.Iterator + for _, filename := range filenames { + iter, err := analyzer.OpenTailIterator(filename) + if err != nil { + return err + } + iters = append(iters, iter) } if config.Daemon { @@ -80,7 +81,11 @@ func runWithConfig(cmd *cobra.Command, args []string, config analyze.AnalyzerCon go tui.New(analyzer).Run() } - return analyzer.RunLoop(iter) + if len(iters) == 1 { + return analyzer.RunLoop(iters[0]) + } else { + return analyzer.RunLoopWithMultipleIterators(iters) + } } } diff --git a/pkg/analyze/analyze.go b/pkg/analyze/analyze.go index 94b551f..8afc255 100644 --- a/pkg/analyze/analyze.go +++ b/pkg/analyze/analyze.go @@ -214,6 +214,55 @@ func (a *Analyzer) RunLoop(iter fileiter.Iterator) error { return nil } +func (a *Analyzer) RunLoopWithMultipleIterators(iters []fileiter.Iterator) error { + var wg sync.WaitGroup + linesChan := make(chan []byte) + + var errorMu sync.Mutex + var collectedErrors []error + + for _, iter := range iters { + wg.Add(1) + go func() { + defer wg.Done() + for { + line, err := iter.Next() + if err != nil { + errorMu.Lock() + collectedErrors = append(collectedErrors, err) + errorMu.Unlock() + return + } + if line == nil { + return + } + linesChan <- line + } + }() + } + + go func() { + wg.Done() + close(linesChan) + }() + + for result := range linesChan { + if err := a.handleLine(result); err != nil { + a.logger.Printf("analyze error: %v", err) + } + } + + if len(collectedErrors) > 0 { + combinedError := fmt.Errorf("encountered %d errors", len(collectedErrors)) + for _, err := range collectedErrors { + combinedError = fmt.Errorf("%w; %v", combinedError, err) + } + return combinedError + } + + return nil +} + func (a *Analyzer) AnalyzeFile(filename string) error { f, err := OpenFile(filename) if err != nil {