Skip to content

Commit

Permalink
analyze: Add RunLoopWithMultipleIterators
Browse files Browse the repository at this point in the history
  • Loading branch information
taoky committed Sep 30, 2024
1 parent 2a428d5 commit 087ca0c
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 11 deletions.
26 changes: 15 additions & 11 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/spf13/cobra"
"github.com/taoky/ayano/pkg/analyze"
"github.com/taoky/ayano/pkg/fileiter"
"github.com/taoky/ayano/pkg/systemd"
"github.com/taoky/ayano/pkg/tui"
)
Expand All @@ -31,10 +32,6 @@ func runWithConfig(cmd *cobra.Command, args []string, config analyze.AnalyzerCon
}

filenames := filenamesFromArgs(args)
// Allow multiple files only in analyze mode
if !config.Analyze && len(filenames) != 1 {
return errors.New("only one log file can be specified when following or daemonizing")
}
fmt.Fprintln(cmd.ErrOrStderr(), "Using log files:", filenames)
cmd.SilenceUsage = true

Expand Down Expand Up @@ -67,9 +64,13 @@ func runWithConfig(cmd *cobra.Command, args []string, config analyze.AnalyzerCon
return err
} else {
// Tail mode
iter, err := analyzer.OpenTailIterator(filenames[0])
if err != nil {
return err
var iters []fileiter.Iterator
for _, filename := range filenames {
iter, err := analyzer.OpenTailIterator(filename)
if err != nil {
return err
}
iters = append(iters, iter)
}

if config.Daemon {
Expand All @@ -80,15 +81,18 @@ func runWithConfig(cmd *cobra.Command, args []string, config analyze.AnalyzerCon
go tui.New(analyzer).Run()
}

return analyzer.RunLoop(iter)
if len(iters) == 1 {
return analyzer.RunLoop(iters[0])
} else {
return analyzer.RunLoopWithMultipleIterators(iters)
}
}
}

func runCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "run [filename]",
Short: "Run and follow the log file",
Args: cobra.MaximumNArgs(1),
Use: "run [filename...]",
Short: "Run and follow the log file(s)",
}
config := analyze.DefaultConfig()
config.InstallFlags(cmd.Flags())
Expand Down
46 changes: 46 additions & 0 deletions pkg/analyze/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package analyze

import (
"bytes"
"errors"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -214,6 +215,51 @@ func (a *Analyzer) RunLoop(iter fileiter.Iterator) error {
return nil
}

func (a *Analyzer) RunLoopWithMultipleIterators(iters []fileiter.Iterator) error {
var wg sync.WaitGroup
linesChan := make(chan []byte, 2*len(iters))

var errorMu sync.Mutex
var collectedErrors []error

for _, iter := range iters {
wg.Add(1)
go func() {
defer wg.Done()
for {
line, err := iter.Next()
if err != nil {
errorMu.Lock()
collectedErrors = append(collectedErrors, err)
errorMu.Unlock()
return
}
if line == nil {
return
}
linesChan <- line
}
}()
}

go func() {
wg.Wait()
close(linesChan)
}()

for result := range linesChan {
if err := a.handleLine(result); err != nil {
a.logger.Printf("analyze error: %v", err)
}
}

if len(collectedErrors) > 0 {
return errors.Join(collectedErrors...)
}

return nil
}

func (a *Analyzer) AnalyzeFile(filename string) error {
f, err := OpenFile(filename)
if err != nil {
Expand Down

0 comments on commit 087ca0c

Please sign in to comment.