From f683c105643f855df6b319968063db84ea55400b Mon Sep 17 00:00:00 2001 From: sharkpc138 Date: Fri, 20 Dec 2024 17:30:07 +0900 Subject: [PATCH 1/3] fix to remove strict timestamp validation --- pkg/lobster/store/reader.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/pkg/lobster/store/reader.go b/pkg/lobster/store/reader.go index 4ae6ebe..120fb1a 100644 --- a/pkg/lobster/store/reader.go +++ b/pkg/lobster/store/reader.go @@ -217,16 +217,12 @@ func readBlocks(chunk model.Chunk, storeRootkDir string, onlySeries bool, start return nil, []model.Bucket{}, errors.New("invalid range") } - prevTs := time.Time{} for _, block := range blocks { if !(block.StartTime().Before(end) && block.EndTime().After(start)) { continue } - skip, err := readBlock(chunk.Source.Type, block, fmt.Sprintf("%s/%s/%s", storeRootkDir, chunk.RelativeBlockDir, block.FileName()), onlySeries, buffer, bucketBuilder, prevTs, start, end, filterers...) - if prevTs.Before(block.EndTime()) { - prevTs = block.EndTime() - } + skip, err := readBlock(chunk.Source.Type, block, fmt.Sprintf("%s/%s/%s", storeRootkDir, chunk.RelativeBlockDir, block.FileName()), onlySeries, buffer, bucketBuilder, start, end, filterers...) 
if skip { continue } @@ -240,8 +236,11 @@ func readBlocks(chunk model.Chunk, storeRootkDir string, onlySeries bool, start return buffer.Bytes(), bucketBuilder.Build(), nil } -func readBlock(sourceType string, block model.ReadableBlock, blockPath string, onlySeries bool, buffer *bytes.Buffer, bucketBuilder *model.BucketBuilder, prevTs, start, end time.Time, filterers ...filter.Filterer) (bool, error) { - var blkReader *blockReader +func readBlock(sourceType string, block model.ReadableBlock, blockPath string, onlySeries bool, buffer *bytes.Buffer, bucketBuilder *model.BucketBuilder, start, end time.Time, filterers ...filter.Filterer) (bool, error) { + var ( + blkReader *blockReader + isStartFound bool + ) f, err := directio.OpenFile(blockPath, os.O_RDONLY, 0) if err != nil { @@ -296,13 +295,10 @@ func readBlock(sourceType string, block model.ReadableBlock, blockPath string, o continue } - if !prevTs.IsZero() && (ts.Before(prevTs) || ts.Equal(prevTs)) { - continue // prevent to read duplicated contents - } - - if ts.Before(start) { + if !isStartFound && ts.Before(start) { continue } + isStartFound = true if ts.After(end) { break From 3b7b91dec948842ec66572daede24721cb2bb43f Mon Sep 17 00:00:00 2001 From: sharkpc138 Date: Fri, 20 Dec 2024 17:37:44 +0900 Subject: [PATCH 2/3] update guide --- docs/design/notes.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/design/notes.md b/docs/design/notes.md index 36bf6cf..8df4d8d 100644 --- a/docs/design/notes.md +++ b/docs/design/notes.md @@ -8,7 +8,8 @@ - There may be a discrepancy between the timestamp in the log and the time of logging, resulting in inconsistencies such as querying logs outside their intended time period - Measures taken - Logs are collected and sorted in chronological order within a specified time period before being stored - Logs are collected and sorted before retrieval + - This operation is performed only within the `store.leakyBucketInterval` window (default 1s) + - Logs with a 
timestamp difference greater than this interval are written directly without reordering - This issue can occur if logs arrive in reverse order during the collection and storage cycle ### Explanation of each file log rotation method From aa865a180608c89de2cc8bdc80f0d5a3d6918679 Mon Sep 17 00:00:00 2001 From: sharkpc138 Date: Mon, 23 Dec 2024 11:32:49 +0900 Subject: [PATCH 3/3] fix test --- pkg/loggen/renamed_file_stub.go | 30 ++++++++++++++++++++---------- pkg/loggen/stub.go | 4 ++-- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/pkg/loggen/renamed_file_stub.go b/pkg/loggen/renamed_file_stub.go index 3a66ac4..43495de 100644 --- a/pkg/loggen/renamed_file_stub.go +++ b/pkg/loggen/renamed_file_stub.go @@ -20,6 +20,7 @@ import ( "fmt" "log" "os" + "syscall" "time" "github.com/naver/lobster/pkg/lobster/query" @@ -40,21 +41,20 @@ func (r RenamedFileStub) GenerateLogs(conf Config, stopChan chan struct{}) { if err != nil { panic(err) } - defer func() { - if cErr := logFile.Close(); err == nil { - err = cErr - } - }() + defer logFile.Close() + + var ( + logger = log.New(logFile, "", 0) + ticker = time.NewTicker(100 * time.Millisecond) + lastInode = getFileInode(*conf.RenamedFileLogPath) + ) - interval := 10 * time.Millisecond - logger := log.New(logFile, "", 0) - ticker := time.NewTicker(interval) go func() { for { select { case <-ticker.C: - stat, err := os.Stat(*conf.RenamedFileLogPath) - if err != nil || time.Since(stat.ModTime()) < interval*2 { + currentInode := getFileInode(*conf.RenamedFileLogPath) + if currentInode != lastInode { newLogFile, err := os.OpenFile(*conf.RenamedFileLogPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600) if err != nil { panic(err) @@ -62,6 +62,7 @@ func (r RenamedFileStub) GenerateLogs(conf Config, stopChan chan struct{}) { logger.SetOutput(newLogFile) logFile.Close() logFile = newLogFile + lastInode = currentInode } case <-stopChan: ticker.Stop() @@ -86,3 +87,12 @@ func (r RenamedFileStub) Query() query.Request { 
FilterIncludeExpr: r.Keyword(), } } + +func getFileInode(filePath string) uint64 { + stat, _ := os.Stat(filePath) + if stat == nil { + return 0 + } + + return stat.Sys().(*syscall.Stat_t).Ino +} diff --git a/pkg/loggen/stub.go b/pkg/loggen/stub.go index 91b4d07..ded0995 100644 --- a/pkg/loggen/stub.go +++ b/pkg/loggen/stub.go @@ -43,7 +43,7 @@ func generateLogs(logger *log.Logger, conf Config, stopChan chan struct{}, logFo str.WriteString(fmt.Sprintf("%d", i)) } - logSequences = append(logSequences, logFormatter(str.String())) + logSequences = append(logSequences, str.String()) } for { @@ -52,7 +52,7 @@ func generateLogs(logger *log.Logger, conf Config, stopChan chan struct{}, logFo index = (index + 1) % 10 for i := 0; i < *conf.LogLines; i++ { - logger.Print(logSequences[index]) + logger.Print(logFormatter(logSequences[index])) } case <-stopChan: