Skip to content

Commit

Permalink
Merge pull request #21 from naver/fix-write-buffer
Browse files Browse the repository at this point in the history
Fix timestamps of write buffer
  • Loading branch information
sharkpc138 authored Dec 12, 2024
2 parents b7e14b3 + 979805d commit 1bf0068
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 22 deletions.
11 changes: 8 additions & 3 deletions pkg/lobster/store/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,17 @@ func (w *writeBuffer) write(ts time.Time, input string) {

w.lines = w.lines + 1
w.fileOffset = w.fileOffset + int64(len(input))

w.start = w.histories[0].ts
w.end = w.histories[len(w.histories)-1].ts
}

func (w writeBuffer) inspect(ts time.Time) (int, int, bool) {
minTs := ts.Add(-maxAge)
historyIdx := len(w.histories) - 1
dataIdx := len(w.data)
var (
minTs = ts.Add(-maxAge)
historyIdx = len(w.histories) - 1
dataIdx = len(w.data)
)

if len(w.histories) == 0 || w.histories[len(w.histories)-1].ts.Before(ts) {
return 0, 0, false
Expand Down
21 changes: 2 additions & 19 deletions pkg/lobster/store/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@ func writeFiledLogs(chunk *model.Chunk, files []model.LogFile, blockDirPath stri
func writeBlocks(chunk *model.Chunk, file model.LogFile, buf *writeBuffer, blockDirPath string, maxBlockSize int64, logHandler LogHandler) ([]*model.Block, error) {
var (
readLine string
prevTs time.Time
blocks = []*model.Block{}
)

blocks := []*model.Block{}

f, err := os.Open(file.Path)
if err != nil {
return blocks, err
Expand All @@ -79,18 +77,11 @@ func writeBlocks(chunk *model.Chunk, file model.LogFile, buf *writeBuffer, block
ts, err := logline.ParseTimestamp(readLine)
if err != nil {
glog.V(3).Info("failed to parse timestamp for %s: %s", file.Path, readLine)
if file.Source.Type == model.LogTypeStdStream || prevTs.IsZero() {
if file.Source.Type == model.LogTypeStdStream || buf.start.IsZero() {
continue
}

ts = prevTs
readLine = logline.MakeUnreliableTimestamp(ts, readLine)
} else {
prevTs = ts
}

if buf.start.IsZero() {
buf.start = ts
}

buf.write(ts, readLine)
Expand All @@ -101,7 +92,6 @@ func writeBlocks(chunk *model.Chunk, file model.LogFile, buf *writeBuffer, block
continue
}

buf.end = ts
block, err := writeBlock(blockDirPath, buf, file.Number)
if err != nil {
return blocks, err
Expand All @@ -110,14 +100,12 @@ func writeBlocks(chunk *model.Chunk, file model.LogFile, buf *writeBuffer, block
blocks = append(blocks, block)
}
buf.reset()
prevTs = time.Time{}
}

if buf.size() == 0 {
return blocks, nil
}

buf.end = prevTs
block, err := writeBlock(blockDirPath, buf, file.Number)
if err != nil {
return blocks, nil
Expand Down Expand Up @@ -192,11 +180,6 @@ func writeTailedLogs(chunk *model.Chunk, blockDirPath, tempBlockFilePath string,
buf.lastOffset = line.Offset
msg := line.Line + "\n"

if buf.start.IsZero() {
buf.start = line.Timestamp
}
buf.end = line.Timestamp

buf.write(line.Timestamp, msg)

go logHandler(chunk, msg, line.Timestamp)
Expand Down
29 changes: 29 additions & 0 deletions pkg/lobster/store/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,32 @@ func checkLogOrderInBuffer(buffer *writeBuffer, t *testing.T) {
prevTs = ts
}
}

func TestWriteBufferInsertOutOfOrderFrontBufferStatus(t *testing.T) {
testCount := 10
expectedLogMsg := "out-of-order start test"
buffer := emptyWriteBuffer()
outOfOrderTsFront := time.Now().Add(-time.Minute)
outOfOrderTsRear := time.Time{}

for i := 0; i < testCount; i++ {
ts := time.Now()
outOfOrderTsRear = ts
buffer.write(ts, fmt.Sprintf("%s test\n", ts.Format(time.RFC3339Nano)))
}

checkLogOrderInBuffer(buffer, t)

buffer.write(outOfOrderTsFront, fmt.Sprintf("%s %s\n", outOfOrderTsFront.Format(time.RFC3339Nano), expectedLogMsg))
checkLogOrderInBuffer(buffer, t)

t.Logf("buffer start: %s", buffer.start.Format(time.RFC3339Nano))
t.Logf("buffer end: %s", buffer.end.Format(time.RFC3339Nano))

if !buffer.start.Equal(outOfOrderTsFront) {
t.FailNow()
}
if !buffer.end.Equal(outOfOrderTsRear) {
t.FailNow()
}
}

0 comments on commit 1bf0068

Please sign in to comment.