From b085602b69384a4398d134fac953cdd6ff148283 Mon Sep 17 00:00:00 2001 From: Ahmed Yasin Koculu Date: Sat, 17 Aug 2024 01:40:33 +0200 Subject: [PATCH] Add EvictToDisk and refactor IMaintainer. --- README.md | 4 +- src/Playground/Benchmark/OldTests.cs | 8 +-- src/Playground/Benchmark/StevesChallenge.cs | 4 +- src/Playground/Benchmark/ZoneTreeTest1.cs | 4 +- src/Playground/Benchmark/ZoneTreeTest2.cs | 4 +- src/Playground/Benchmark/ZoneTreeTest3.cs | 4 +- src/Playground/Test1.cs | 6 +- src/ZoneTree/Core/ZoneTreeMaintainer.cs | 61 +++++++++++++++---- src/ZoneTree/IMaintainer.cs | 57 ++++++++++------- src/ZoneTree/docs/ZoneTree/README-NUGET.md | 4 +- .../docs/ZoneTree/guide/quick-start.md | 2 +- 11 files changed, 106 insertions(+), 52 deletions(-) diff --git a/README.md b/README.md index b899a11..389faa7 100644 --- a/README.md +++ b/README.md @@ -143,8 +143,8 @@ Note: For small data you don't need a maintainer. // 2. Read/Write data zoneTree.Upsert(39, "Hello ZoneTree!"); - // 3. Complete maintainer running tasks. - maintainer.CompleteRunningTasks(); + // 3. Wait for background threads. + maintainer.WaitForBackgroundThreads(); ``` ## How to delete keys? diff --git a/src/Playground/Benchmark/OldTests.cs b/src/Playground/Benchmark/OldTests.cs index 58ca8e1..4b87a7b 100644 --- a/src/Playground/Benchmark/OldTests.cs +++ b/src/Playground/Benchmark/OldTests.cs @@ -59,7 +59,7 @@ public static void Insert(WriteAheadLogMode mode, int count) zoneTree.Maintenance.MoveMutableSegmentForward(); zoneTree.Maintenance.StartMergeOperation()?.Join(); } - basicMaintainer.CompleteRunningTasks(); + basicMaintainer.WaitForBackgroundThreads(); new StatsCollector().LogWithColor( "Merged in:", stopWatch.ElapsedMilliseconds, @@ -183,7 +183,7 @@ public static void InsertSingleAndMerge(WriteAheadLogMode mode, int count, int k zoneTree.Maintenance.MoveMutableSegmentForward(); zoneTree.Maintenance.StartMergeOperation()?.Join(); - basicMaintainer.CompleteRunningTasks(); + basicMaintainer.WaitForBackgroundThreads(); new StatsCollector().LogWithColor( "Merged in:", stopWatch.ElapsedMilliseconds, @@ -222,7 +222,7 @@ public static void Iterate(WriteAheadLogMode mode, int count) "Completed in:", stopWatch.ElapsedMilliseconds, ConsoleColor.Green); - basicMaintainer.CompleteRunningTasks(); + basicMaintainer.WaitForBackgroundThreads(); } public static void MultipleIterate(WriteAheadLogMode mode, int count, int iteratorCount) @@ -262,7 +262,7 @@ public static void MultipleIterate(WriteAheadLogMode mode, int count, int iterat "Completed in:", stopWatch.ElapsedMilliseconds, ConsoleColor.Green); - basicMaintainer.CompleteRunningTasks(); + basicMaintainer.WaitForBackgroundThreads(); } private static IZoneTree OpenOrCreateZoneTree(WriteAheadLogMode mode, string dataPath) diff --git a/src/Playground/Benchmark/StevesChallenge.cs b/src/Playground/Benchmark/StevesChallenge.cs index fd2b8a8..91501b1 100644 --- a/src/Playground/Benchmark/StevesChallenge.cs +++ b/src/Playground/Benchmark/StevesChallenge.cs @@ -71,7 +71,7 @@ public void Insert(IStatsCollector stats) zoneTree.Maintenance.MoveMutableSegmentForward(); zoneTree.Maintenance.StartMergeOperation()?.Join(); } - maintainer.CompleteRunningTasks(); + maintainer.WaitForBackgroundThreads(); stats.AddStage("Merged In", ConsoleColor.DarkCyan); } @@ -100,7 +100,7 @@ public void Iterate(IStatsCollector stats) stats.AddStage( "Iterated in", ConsoleColor.Green); - maintainer.CompleteRunningTasks(); + maintainer.WaitForBackgroundThreads(); } } diff --git a/src/Playground/Benchmark/ZoneTreeTest1.cs b/src/Playground/Benchmark/ZoneTreeTest1.cs index 9ba3dd9..f3631ee 100644 --- a/src/Playground/Benchmark/ZoneTreeTest1.cs +++ b/src/Playground/Benchmark/ZoneTreeTest1.cs @@ -51,7 +51,7 @@ public void Insert(IStatsCollector stats) zoneTree.Maintenance.MoveMutableSegmentForward(); zoneTree.Maintenance.StartMergeOperation()?.Join(); } - maintainer.CompleteRunningTasks(); + maintainer.WaitForBackgroundThreads(); stats.AddStage("Merged In", ConsoleColor.DarkCyan); } @@ -82,6 +82,6 @@ public void Iterate(IStatsCollector stats) stats.AddStage( "Iterated in", ConsoleColor.Green); - maintainer.CompleteRunningTasks(); + maintainer.WaitForBackgroundThreads(); } } diff --git a/src/Playground/Benchmark/ZoneTreeTest2.cs b/src/Playground/Benchmark/ZoneTreeTest2.cs index 8ef8c6f..dbd298f 100644 --- a/src/Playground/Benchmark/ZoneTreeTest2.cs +++ b/src/Playground/Benchmark/ZoneTreeTest2.cs @@ -54,7 +54,7 @@ public void Insert(IStatsCollector stats) zoneTree.Maintenance.MoveMutableSegmentForward(); zoneTree.Maintenance.StartMergeOperation()?.Join(); } - maintainer.CompleteRunningTasks(); + maintainer.WaitForBackgroundThreads(); stats.AddStage("Merged In", ConsoleColor.DarkCyan); } @@ -85,6 +85,6 @@ public void Iterate(IStatsCollector stats) stats.AddStage( "Iterated in", ConsoleColor.Green); - maintainer.CompleteRunningTasks(); + maintainer.WaitForBackgroundThreads(); } } diff --git a/src/Playground/Benchmark/ZoneTreeTest3.cs b/src/Playground/Benchmark/ZoneTreeTest3.cs index 9536cb0..f88e248 100644 --- a/src/Playground/Benchmark/ZoneTreeTest3.cs +++ b/src/Playground/Benchmark/ZoneTreeTest3.cs @@ -51,7 +51,7 @@ public void Insert(IStatsCollector stats) zoneTree.Maintenance.ZoneTree.Maintenance.MoveMutableSegmentForward(); zoneTree.Maintenance.ZoneTree.Maintenance.StartMergeOperation()?.Join(); } - maintainer.CompleteRunningTasks(); + maintainer.WaitForBackgroundThreads(); stats.AddStage("Merged In", ConsoleColor.DarkCyan); } @@ -82,6 +82,6 @@ public void Iterate(IStatsCollector stats) stats.AddStage( "Iterated in", ConsoleColor.Green); - maintainer.CompleteRunningTasks(); + maintainer.WaitForBackgroundThreads(); } } diff --git a/src/Playground/Test1.cs b/src/Playground/Test1.cs index 00c50ed..7be4e0d 100644 --- a/src/Playground/Test1.cs +++ b/src/Playground/Test1.cs @@ -63,7 +63,7 @@ public static void SeveralParallelTransactions() }); } Console.WriteLine("Elapsed: " + stopWatch.ElapsedMilliseconds); - basicMaintainer.CompleteRunningTasks(); + basicMaintainer.WaitForBackgroundThreads(); zoneTree.Maintenance.SaveMetaData(); } @@ -110,7 +110,7 @@ public static void TestReverseIterator( return true; }); } - maintainer.CompleteRunningTasks(); + maintainer.WaitForBackgroundThreads(); } upload.Stop(); @@ -247,7 +247,7 @@ public static void TestIteratorBehavior( }); t1.Wait(); zoneTree1.Maintenance.TryCancelMergeOperation(); - maintainer.CompleteRunningTasks(); + maintainer.WaitForBackgroundThreads(); } diff --git a/src/ZoneTree/Core/ZoneTreeMaintainer.cs b/src/ZoneTree/Core/ZoneTreeMaintainer.cs index 8369156..4f69f27 100644 --- a/src/ZoneTree/Core/ZoneTreeMaintainer.cs +++ b/src/ZoneTree/Core/ZoneTreeMaintainer.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Threading; using Tenray.ZoneTree.Logger; using Tenray.ZoneTree.Segments; using Tenray.ZoneTree.Segments.Disk; @@ -36,12 +37,6 @@ public sealed class ZoneTreeMaintainer : IMaintainer, IDisposable /// public IZoneTreeMaintenance Maintenance { get; } - /// - public int MinimumSparseArrayLength { get; set; } - - /// - public int SparseArrayStepLength { get; set; } = 1_000; - /// public int ThresholdForMergeOperationStart { get; set; } = 0; @@ -105,17 +100,27 @@ void AttachEvents() Maintenance.OnDiskSegmentCreated += OnDiskSegmentCreated; Maintenance.OnMergeOperationEnded += OnMergeOperationEnded; Maintenance.OnZoneTreeIsDisposing += OnZoneTreeIsDisposing; + Maintenance.OnBottomSegmentsMergeOperationEnded += OnBottomSegmentsMergeOperationEnded; } + void OnZoneTreeIsDisposing(IZoneTreeMaintenance zoneTree) { Trace("ZoneTree is disposing. ZoneTreeMaintainer disposal started."); PeriodicTimerCancellationTokenSource.Cancel(); - CompleteRunningTasks(); + WaitForBackgroundThreads(); Dispose(); Trace("ZoneTreeMaintainer is disposed."); } + void OnBottomSegmentsMergeOperationEnded( + IZoneTreeMaintenance zoneTree, + MergeResult mergeResult) + { + Trace(mergeResult.ToString()); + MergerThreads.Remove(Environment.CurrentManagedThreadId, out _); + } + void OnMergeOperationEnded( IZoneTreeMaintenance zoneTree, MergeResult mergeResult) @@ -168,7 +173,8 @@ void OnMutableSegmentMovedForward(IZoneTreeMaintenance zoneTree) StartMerge(); } - void StartMerge() + /// + public void StartMerge() { lock (this) { @@ -182,14 +188,39 @@ void StartMerge() } } + + /// + public void StartBottomSegmentsMerge( + int fromIndex = 0, int toIndex = int.MaxValue) + { + lock (this) + { + var mergerThread = Maintenance + .StartBottomSegmentsMergeOperation(fromIndex, toIndex); + if (mergerThread == null) + return; + MergerThreads.AddOrUpdate( + mergerThread.ManagedThreadId, + mergerThread, + (key, value) => mergerThread); + } + } + /// - public void TryCancelRunningTasks() + public void TryCancelBackgroundThreads() { Maintenance.TryCancelMergeOperation(); + Maintenance.TryCancelBottomSegmentsMergeOperation(); } /// - public void CompleteRunningTasks() + public void WaitForBackgroundThreads() + { + WaitForBackgroundThreadsAsync().Wait(); + } + + /// + public async Task WaitForBackgroundThreadsAsync() { while (true) { @@ -203,7 +234,7 @@ public void CompleteRunningTasks() if (t.ThreadState == ThreadState.Stopped) MergerThreads.TryRemove(a.Key, out var _); else - t.Join(); + await Task.Run(() => t.Join()); } Trace("Wait ended"); } @@ -245,6 +276,13 @@ void Trace(string msg) Logger.LogTrace(msg); } + /// + public void EvictToDisk() + { + Maintenance.MoveMutableSegmentForward(); + StartMerge(); + } + /// /// Disposes this maintainer. /// @@ -256,5 +294,6 @@ public void Dispose() Maintenance.OnDiskSegmentCreated -= OnDiskSegmentCreated; Maintenance.OnMergeOperationEnded -= OnMergeOperationEnded; Maintenance.OnZoneTreeIsDisposing -= OnZoneTreeIsDisposing; + Maintenance.OnBottomSegmentsMergeOperationEnded -= OnBottomSegmentsMergeOperationEnded; } } \ No newline at end of file diff --git a/src/ZoneTree/IMaintainer.cs b/src/ZoneTree/IMaintainer.cs index 911d721..c7c4ba0 100644 --- a/src/ZoneTree/IMaintainer.cs +++ b/src/ZoneTree/IMaintainer.cs @@ -5,27 +5,11 @@ /// merge operations and memory compaction. /// /// -/// You must complete or cancel all pending tasks of this maintainer +/// You must complete or cancel all pending threads of this maintainer /// before disposing. /// public interface IMaintainer : IDisposable { - /// - /// Minimum sparse array length when a new disk segment is created. - /// Default value is 0. - /// - int MinimumSparseArrayLength { get; set; } - - /// - /// Configures sparse array step length when the disk segment length is bigger than - /// MinimumSparseArrayLength * SparseArrayStepLength. - /// The default value is 1000. - /// The sparse array length reduce binary lookup range on disk segment - /// to reduce IO. - /// - /// - int SparseArrayStepLength { get; set; } - /// /// Starts merge operation when records count /// in read-only segments exceeds this value. @@ -61,12 +45,43 @@ public interface IMaintainer : IDisposable TimeSpan InactiveBlockCacheCleanupInterval { get; set; } /// - /// Tries cancel running tasks. + /// Tries cancel background threads. + /// + void TryCancelBackgroundThreads(); + + /// + /// Blocks the calling thread until all background threads have completed their execution. + /// + void WaitForBackgroundThreads(); + + /// + /// Asynchronously waits for all background threads to complete. + /// + /// A task that represents the asynchronous wait operation. + Task WaitForBackgroundThreadsAsync(); + + /// + /// Evicts all in-memory data to disk by moving the mutable segment forward and initiating a merge process. + /// + /// + /// This method is responsible for freeing up memory in the LSM tree by moving data from the mutable in-memory segment to disk storage. + /// It first advances the current mutable segment to a new state, ensuring that any data currently in memory is prepared for disk storage. + /// Afterward, it starts the merging process, which combines the in-memory data with existing on-disk data to maintain the integrity + /// and efficiency of the LSM tree structure. + /// + void EvictToDisk(); + + /// + /// Initiates the merge process in a new thread. /// - void TryCancelRunningTasks(); + void StartMerge(); /// - /// Waits until all running tasks are completed. + /// Initiates a merge of selected bottom segments into a single bottom disk segment. /// - void CompleteRunningTasks(); + /// The lower bound + /// The upper bound + /// + void StartBottomSegmentsMerge( + int fromIndex = 0, int toIndex = int.MaxValue); } diff --git a/src/ZoneTree/docs/ZoneTree/README-NUGET.md b/src/ZoneTree/docs/ZoneTree/README-NUGET.md index fa12dce..d747e15 100644 --- a/src/ZoneTree/docs/ZoneTree/README-NUGET.md +++ b/src/ZoneTree/docs/ZoneTree/README-NUGET.md @@ -141,8 +141,8 @@ Note: For small data you don't need a maintainer. // 2. Read/Write data zoneTree.Upsert(39, "Hello ZoneTree!"); - // 3. Complete maintainer running tasks. - maintainer.CompleteRunningTasks(); + // 3. Wait for background threads. + maintainer.WaitForBackgroundThreads(); ``` ## How to delete keys? diff --git a/src/ZoneTree/docs/ZoneTree/guide/quick-start.md b/src/ZoneTree/docs/ZoneTree/guide/quick-start.md index a9771c4..a2797ef 100644 --- a/src/ZoneTree/docs/ZoneTree/guide/quick-start.md +++ b/src/ZoneTree/docs/ZoneTree/guide/quick-start.md @@ -61,7 +61,7 @@ for (var i = 0; i < 10_000_000; ++i){ } // Ensure maintainer merge operations are completed before tree disposal. -maintainer.CompleteRunningTasks(); +maintainer.WaitForBackgroundThreads(); ``` ## How to merge data to the disk segment manually?