From 663ea22e63963935abe678244f974a2a55cd6165 Mon Sep 17 00:00:00 2001 From: Aman Mangal Date: Thu, 9 Mar 2023 23:35:46 +0530 Subject: [PATCH] setup oracle correctly in stream writer --- levels.go | 4 ++-- stream_writer.go | 5 +++++ stream_writer_test.go | 47 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 2 deletions(-) diff --git a/levels.go b/levels.go index 387ee7e3e..61f560c0a 100644 --- a/levels.go +++ b/levels.go @@ -1574,8 +1574,8 @@ func (s *levelsController) close() error { } // get searches for a given key in all the levels of the LSM tree. It returns -// key version <= the expected version (maxVs). If not found, it returns an empty -// y.ValueStruct. +// key version <= the expected version (version in key). If not found, +// it returns an empty y.ValueStruct. func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int) ( y.ValueStruct, error) { if s.kv.IsClosed() { diff --git a/stream_writer.go b/stream_writer.go index e696a6d3b..9dc9e8fd0 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -282,6 +282,11 @@ func (sw *StreamWriter) Flush() error { if sw.db.orc != nil { sw.db.orc.Stop() } + + if curMax := sw.db.orc.readTs(); curMax >= sw.maxVersion { + sw.maxVersion = curMax + } + sw.db.orc = newOracle(sw.db.opt) sw.db.orc.nextTxnTs = sw.maxVersion sw.db.orc.txnMark.Done(sw.maxVersion) diff --git a/stream_writer_test.go b/stream_writer_test.go index 43abc6ce8..4d18db8b1 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -750,4 +750,51 @@ func TestStreamWriterIncremental(t *testing.T) { require.NoError(t, err) }) }) + + t.Run("multiple incremental with older data first", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + buf := z.NewBuffer(10<<20, "test") + defer func() { require.NoError(t, buf.Release()) }() + KVToBuffer(&pb.KV{ + Key: []byte("a1"), + Value: []byte("val1"), + Version: 11, + }, buf) + sw := db.NewStreamWriter() + require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed") + require.NoError(t, sw.Write(buf), "sw.Write() failed") + require.NoError(t, sw.Flush(), "sw.Flush() failed") + + buf = z.NewBuffer(10<<20, "test") + defer func() { require.NoError(t, buf.Release()) }() + KVToBuffer(&pb.KV{ + Key: []byte("a2"), + Value: []byte("val2"), + Version: 9, + }, buf) + sw = db.NewStreamWriter() + require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed") + require.NoError(t, sw.Write(buf), "sw.Write() failed") + require.NoError(t, sw.Flush(), "sw.Flush() failed") + + // This will move the maxTs to 10 (earlier, without the fix) + require.NoError(t, db.Update(func(txn *Txn) error { + return txn.Set([]byte("a1"), []byte("val3")) + })) + // This will move the maxTs to 11 (earliler, without the fix) + require.NoError(t, db.Update(func(txn *Txn) error { + return txn.Set([]byte("a3"), []byte("val4")) + })) + + // And now, the first write with val1 will resurface (without the fix) + require.NoError(t, db.View(func(txn *Txn) error { + item, err := txn.Get([]byte("a1")) + require.NoError(t, err) + val, err := item.ValueCopy(nil) + require.NoError(t, err) + require.Equal(t, "val3", string(val)) + return nil + })) + }) + }) }