Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support out of timestamp order #29

Merged
merged 3 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/design/notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
- There may be a discrepancy between the timestamp in the log and the time of logging, resulting in inconsistencies such as querying logs outside their intended time period
- Measures taken
- Logs are collected and sorted in chronological order within a specified time period before being stored
- Logs are collected and sorted before retrieval
- This operation is performed only within the `store.leakyBucketInterval (default 1s)`
- Logs with a timestamp difference greater than this interval are written directly without reordering
- This issue can occur if logs arrive in reverse order during the collection and storage cycle

### Explanation of each file log rotation method
Expand Down
20 changes: 8 additions & 12 deletions pkg/lobster/store/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,16 +217,12 @@ func readBlocks(chunk model.Chunk, storeRootkDir string, onlySeries bool, start
return nil, []model.Bucket{}, errors.New("invalid range")
}

prevTs := time.Time{}
for _, block := range blocks {
if !(block.StartTime().Before(end) && block.EndTime().After(start)) {
continue
}

skip, err := readBlock(chunk.Source.Type, block, fmt.Sprintf("%s/%s/%s", storeRootkDir, chunk.RelativeBlockDir, block.FileName()), onlySeries, buffer, bucketBuilder, prevTs, start, end, filterers...)
if prevTs.Before(block.EndTime()) {
prevTs = block.EndTime()
}
skip, err := readBlock(chunk.Source.Type, block, fmt.Sprintf("%s/%s/%s", storeRootkDir, chunk.RelativeBlockDir, block.FileName()), onlySeries, buffer, bucketBuilder, start, end, filterers...)
if skip {
continue
}
Expand All @@ -240,8 +236,11 @@ func readBlocks(chunk model.Chunk, storeRootkDir string, onlySeries bool, start
return buffer.Bytes(), bucketBuilder.Build(), nil
}

func readBlock(sourceType string, block model.ReadableBlock, blockPath string, onlySeries bool, buffer *bytes.Buffer, bucketBuilder *model.BucketBuilder, prevTs, start, end time.Time, filterers ...filter.Filterer) (bool, error) {
var blkReader *blockReader
func readBlock(sourceType string, block model.ReadableBlock, blockPath string, onlySeries bool, buffer *bytes.Buffer, bucketBuilder *model.BucketBuilder, start, end time.Time, filterers ...filter.Filterer) (bool, error) {
var (
blkReader *blockReader
isStartFound bool
)

f, err := directio.OpenFile(blockPath, os.O_RDONLY, 0)
if err != nil {
Expand Down Expand Up @@ -296,13 +295,10 @@ func readBlock(sourceType string, block model.ReadableBlock, blockPath string, o
continue
}

if !prevTs.IsZero() && (ts.Before(prevTs) || ts.Equal(prevTs)) {
continue // prevent to read duplicated contents
}

if ts.Before(start) {
if !isStartFound && ts.Before(start) {
continue
}
isStartFound = true

if ts.After(end) {
break
Expand Down
30 changes: 20 additions & 10 deletions pkg/loggen/renamed_file_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"log"
"os"
"syscall"
"time"

"github.com/naver/lobster/pkg/lobster/query"
Expand All @@ -40,28 +41,28 @@ func (r RenamedFileStub) GenerateLogs(conf Config, stopChan chan struct{}) {
if err != nil {
panic(err)
}
defer func() {
if cErr := logFile.Close(); err == nil {
err = cErr
}
}()
defer logFile.Close()

var (
logger = log.New(logFile, "", 0)
ticker = time.NewTicker(100 * time.Millisecond)
lastInode = getFileInode(*conf.RenamedFileLogPath)
)

interval := 10 * time.Millisecond
logger := log.New(logFile, "", 0)
ticker := time.NewTicker(interval)
go func() {
for {
select {
case <-ticker.C:
stat, err := os.Stat(*conf.RenamedFileLogPath)
if err != nil || time.Since(stat.ModTime()) < interval*2 {
currentInode := getFileInode(*conf.RenamedFileLogPath)
if currentInode != lastInode {
newLogFile, err := os.OpenFile(*conf.RenamedFileLogPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
if err != nil {
panic(err)
}
logger.SetOutput(newLogFile)
logFile.Close()
logFile = newLogFile
lastInode = currentInode
}
case <-stopChan:
ticker.Stop()
Expand All @@ -86,3 +87,12 @@ func (r RenamedFileStub) Query() query.Request {
FilterIncludeExpr: r.Keyword(),
}
}

func getFileInode(filePath string) uint64 {
stat, _ := os.Stat(filePath)
if stat == nil {
return 0
}

return stat.Sys().(*syscall.Stat_t).Ino
}
4 changes: 2 additions & 2 deletions pkg/loggen/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func generateLogs(logger *log.Logger, conf Config, stopChan chan struct{}, logFo
str.WriteString(fmt.Sprintf("%d", i))
}

logSequences = append(logSequences, logFormatter(str.String()))
logSequences = append(logSequences, str.String())
}

for {
Expand All @@ -52,7 +52,7 @@ func generateLogs(logger *log.Logger, conf Config, stopChan chan struct{}, logFo
index = (index + 1) % 10

for i := 0; i < *conf.LogLines; i++ {
logger.Print(logSequences[index])
logger.Print(logFormatter(logSequences[index]))
}

case <-stopChan:
Expand Down
Loading