From 5ae59f22cfbda07c8a22ea665606a05085658021 Mon Sep 17 00:00:00 2001 From: sharkpc138 Date: Thu, 10 Oct 2024 16:32:28 +0900 Subject: [PATCH] add option --- pkg/lobster/distributor/config.go | 31 ++++++++++++++------------ pkg/lobster/distributor/distributor.go | 9 +++++--- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/pkg/lobster/distributor/config.go b/pkg/lobster/distributor/config.go index 526860d..a77e7c2 100644 --- a/pkg/lobster/distributor/config.go +++ b/pkg/lobster/distributor/config.go @@ -22,13 +22,14 @@ import ( ) type config struct { - StdstreamLogRootPath *string - EmptyDirLogRootPath *string - FileInspectInterval *time.Duration - FileInspectMaxStale *time.Duration - TailFileMaxStale *time.Duration - MatchLookbackMin *time.Duration - MetricsInterval *time.Duration + StdstreamLogRootPath *string + EmptyDirLogRootPath *string + FileInspectInterval *time.Duration + FileInspectMaxStale *time.Duration + TailFileMaxStale *time.Duration + MatchLookbackMin *time.Duration + MetricsInterval *time.Duration + ShouldUpdateLogMatcher *bool } func setup() config { @@ -39,14 +40,16 @@ func setup() config { tailFileMaxStale := flag.Duration("distributor.tailFileMaxStale", 5*time.Second, "Decide how old files to look up to tailing") matchLookbackMin := flag.Duration("distributor.matchLookbackMin", 10*time.Second, "Determine how old the logs will be in metrics") metricsInterval := flag.Duration("distributor.metricsInterval", 5*time.Second, "metrics production interval") + shouldUpdateLogMatcher := flag.Bool("distributor.shouldUpdateLogMatcher", false, "When using the log sink function, set it to true for periodic log sink rule update") return config{ - StdstreamLogRootPath: stdstreamLogRootPath, - EmptyDirLogRootPath: emptyDirLogRootPath, - FileInspectInterval: fileInspectInterval, - FileInspectMaxStale: fileInspectMaxStale, - TailFileMaxStale: tailFileMaxStale, - MatchLookbackMin: matchLookbackMin, - MetricsInterval: metricsInterval, + StdstreamLogRootPath: stdstreamLogRootPath, + EmptyDirLogRootPath: emptyDirLogRootPath, + FileInspectInterval: fileInspectInterval, + FileInspectMaxStale: fileInspectMaxStale, + TailFileMaxStale: tailFileMaxStale, + MatchLookbackMin: matchLookbackMin, + MetricsInterval: metricsInterval, + ShouldUpdateLogMatcher: shouldUpdateLogMatcher, } } diff --git a/pkg/lobster/distributor/distributor.go b/pkg/lobster/distributor/distributor.go index b30a7cb..4b8bf5e 100644 --- a/pkg/lobster/distributor/distributor.go +++ b/pkg/lobster/distributor/distributor.go @@ -107,10 +107,13 @@ func (d *Distributor) Run(stopChan chan struct{}) { fileMap := d.extractFileMap(logfiles, *conf.FileInspectMaxStale) tailList := d.extractTailList(fileMap, *conf.TailFileMaxStale) - now := time.Now() - if err := d.matcher.Update(helper.FilterChunksByExistingPods(d.store.GetChunks(), podMap), now.Add(-*conf.FileInspectInterval), now); err != nil { - glog.Error(err) + if *conf.ShouldUpdateLogMatcher { + now := time.Now() + if err := d.matcher.Update(helper.FilterChunksByExistingPods(d.store.GetChunks(), podMap), now.Add(-*conf.FileInspectInterval), now); err != nil { + glog.Error(err) + } } + d.storeFiles(fileMap) d.tailFiles(tailList, stopChan)