Skip to content

Commit

Permalink
Add EvictToDisk and refactor IMaintainer.
Browse files Browse the repository at this point in the history
  • Loading branch information
koculu committed Aug 16, 2024
1 parent 9b45449 commit b085602
Show file tree
Hide file tree
Showing 11 changed files with 106 additions and 52 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ Note: For small data you don't need a maintainer.
// 2. Read/Write data
zoneTree.Upsert(39, "Hello ZoneTree!");

// 3. Complete maintainer running tasks.
maintainer.CompleteRunningTasks();
// 3. Wait for background threads.
maintainer.WaitForBackgroundThreads();
```

## How to delete keys?
Expand Down
8 changes: 4 additions & 4 deletions src/Playground/Benchmark/OldTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static void Insert(WriteAheadLogMode mode, int count)
zoneTree.Maintenance.MoveMutableSegmentForward();
zoneTree.Maintenance.StartMergeOperation()?.Join();
}
basicMaintainer.CompleteRunningTasks();
basicMaintainer.WaitForBackgroundThreads();
new StatsCollector().LogWithColor(
"Merged in:",
stopWatch.ElapsedMilliseconds,
Expand Down Expand Up @@ -183,7 +183,7 @@ public static void InsertSingleAndMerge(WriteAheadLogMode mode, int count, int k
zoneTree.Maintenance.MoveMutableSegmentForward();
zoneTree.Maintenance.StartMergeOperation()?.Join();

basicMaintainer.CompleteRunningTasks();
basicMaintainer.WaitForBackgroundThreads();
new StatsCollector().LogWithColor(
"Merged in:",
stopWatch.ElapsedMilliseconds,
Expand Down Expand Up @@ -222,7 +222,7 @@ public static void Iterate(WriteAheadLogMode mode, int count)
"Completed in:",
stopWatch.ElapsedMilliseconds,
ConsoleColor.Green);
basicMaintainer.CompleteRunningTasks();
basicMaintainer.WaitForBackgroundThreads();
}

public static void MultipleIterate(WriteAheadLogMode mode, int count, int iteratorCount)
Expand Down Expand Up @@ -262,7 +262,7 @@ public static void MultipleIterate(WriteAheadLogMode mode, int count, int iterat
"Completed in:",
stopWatch.ElapsedMilliseconds,
ConsoleColor.Green);
basicMaintainer.CompleteRunningTasks();
basicMaintainer.WaitForBackgroundThreads();
}

private static IZoneTree<int, int> OpenOrCreateZoneTree(WriteAheadLogMode mode, string dataPath)
Expand Down
4 changes: 2 additions & 2 deletions src/Playground/Benchmark/StevesChallenge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void Insert(IStatsCollector stats)
zoneTree.Maintenance.MoveMutableSegmentForward();
zoneTree.Maintenance.StartMergeOperation()?.Join();
}
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();

stats.AddStage("Merged In", ConsoleColor.DarkCyan);
}
Expand Down Expand Up @@ -100,7 +100,7 @@ public void Iterate(IStatsCollector stats)
stats.AddStage(
"Iterated in",
ConsoleColor.Green);
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
}

}
4 changes: 2 additions & 2 deletions src/Playground/Benchmark/ZoneTreeTest1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void Insert(IStatsCollector stats)
zoneTree.Maintenance.MoveMutableSegmentForward();
zoneTree.Maintenance.StartMergeOperation()?.Join();
}
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();

stats.AddStage("Merged In", ConsoleColor.DarkCyan);
}
Expand Down Expand Up @@ -82,6 +82,6 @@ public void Iterate(IStatsCollector stats)
stats.AddStage(
"Iterated in",
ConsoleColor.Green);
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
}
}
4 changes: 2 additions & 2 deletions src/Playground/Benchmark/ZoneTreeTest2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void Insert(IStatsCollector stats)
zoneTree.Maintenance.MoveMutableSegmentForward();
zoneTree.Maintenance.StartMergeOperation()?.Join();
}
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();

stats.AddStage("Merged In", ConsoleColor.DarkCyan);
}
Expand Down Expand Up @@ -85,6 +85,6 @@ public void Iterate(IStatsCollector stats)
stats.AddStage(
"Iterated in",
ConsoleColor.Green);
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
}
}
4 changes: 2 additions & 2 deletions src/Playground/Benchmark/ZoneTreeTest3.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void Insert(IStatsCollector stats)
zoneTree.Maintenance.ZoneTree.Maintenance.MoveMutableSegmentForward();
zoneTree.Maintenance.ZoneTree.Maintenance.StartMergeOperation()?.Join();
}
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();

stats.AddStage("Merged In", ConsoleColor.DarkCyan);
}
Expand Down Expand Up @@ -82,6 +82,6 @@ public void Iterate(IStatsCollector stats)
stats.AddStage(
"Iterated in",
ConsoleColor.Green);
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
}
}
6 changes: 3 additions & 3 deletions src/Playground/Test1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void SeveralParallelTransactions()
});
}
Console.WriteLine("Elapsed: " + stopWatch.ElapsedMilliseconds);
basicMaintainer.CompleteRunningTasks();
basicMaintainer.WaitForBackgroundThreads();
zoneTree.Maintenance.SaveMetaData();
}

Expand Down Expand Up @@ -110,7 +110,7 @@ public static void TestReverseIterator(
return true;
});
}
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
}

upload.Stop();
Expand Down Expand Up @@ -247,7 +247,7 @@ public static void TestIteratorBehavior(
});
t1.Wait();
zoneTree1.Maintenance.TryCancelMergeOperation();
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
}


Expand Down
61 changes: 50 additions & 11 deletions src/ZoneTree/Core/ZoneTreeMaintainer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Threading;
using Tenray.ZoneTree.Logger;
using Tenray.ZoneTree.Segments;
using Tenray.ZoneTree.Segments.Disk;
Expand Down Expand Up @@ -36,12 +37,6 @@ public sealed class ZoneTreeMaintainer<TKey, TValue> : IMaintainer, IDisposable
/// </summary>
public IZoneTreeMaintenance<TKey, TValue> Maintenance { get; }

/// <inheritdoc/>
public int MinimumSparseArrayLength { get; set; }

/// <inheritdoc/>
public int SparseArrayStepLength { get; set; } = 1_000;

/// <inheritdoc/>
public int ThresholdForMergeOperationStart { get; set; } = 0;

Check warning on line 41 in src/ZoneTree/Core/ZoneTreeMaintainer.cs

View workflow job for this annotation

GitHub Actions / build

Member 'ThresholdForMergeOperationStart' is explicitly initialized to its default value (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca1805)

Expand Down Expand Up @@ -105,17 +100,27 @@ void AttachEvents()
Maintenance.OnDiskSegmentCreated += OnDiskSegmentCreated;
Maintenance.OnMergeOperationEnded += OnMergeOperationEnded;
Maintenance.OnZoneTreeIsDisposing += OnZoneTreeIsDisposing;
Maintenance.OnBottomSegmentsMergeOperationEnded += OnBottomSegmentsMergeOperationEnded;
}


void OnZoneTreeIsDisposing(IZoneTreeMaintenance<TKey, TValue> zoneTree)
{
Trace("ZoneTree is disposing. ZoneTreeMaintainer disposal started.");
PeriodicTimerCancellationTokenSource.Cancel();
CompleteRunningTasks();
WaitForBackgroundThreads();
Dispose();
Trace("ZoneTreeMaintainer is disposed.");
}

void OnBottomSegmentsMergeOperationEnded(
IZoneTreeMaintenance<TKey, TValue> zoneTree,
MergeResult mergeResult)
{
Trace(mergeResult.ToString());
MergerThreads.Remove(Environment.CurrentManagedThreadId, out _);
}

void OnMergeOperationEnded(
IZoneTreeMaintenance<TKey, TValue> zoneTree,
MergeResult mergeResult)
Expand Down Expand Up @@ -168,7 +173,8 @@ void OnMutableSegmentMovedForward(IZoneTreeMaintenance<TKey, TValue> zoneTree)
StartMerge();
}

void StartMerge()
/// <inheritdoc/>
public void StartMerge()
{
lock (this)
{
Expand All @@ -182,14 +188,39 @@ void StartMerge()
}
}


/// <inheritdoc/>
public void StartBottomSegmentsMerge(
int fromIndex = 0, int toIndex = int.MaxValue)
{
lock (this)
{
var mergerThread = Maintenance
.StartBottomSegmentsMergeOperation(fromIndex, toIndex);
if (mergerThread == null)
return;
MergerThreads.AddOrUpdate(
mergerThread.ManagedThreadId,
mergerThread,
(key, value) => mergerThread);
}
}

/// <inheritdoc/>
public void TryCancelRunningTasks()
public void TryCancelBackgroundThreads()
{
Maintenance.TryCancelMergeOperation();
Maintenance.TryCancelBottomSegmentsMergeOperation();
}

/// <inheritdoc/>
public void CompleteRunningTasks()
public void WaitForBackgroundThreads()
{
WaitForBackgroundThreadsAsync().Wait();
}

/// <inheritdoc/>
public async Task WaitForBackgroundThreadsAsync()
{
while (true)
{
Expand All @@ -203,7 +234,7 @@ public void CompleteRunningTasks()
if (t.ThreadState == ThreadState.Stopped)
MergerThreads.TryRemove(a.Key, out var _);
else
t.Join();
await Task.Run(() => t.Join());
}
Trace("Wait ended");
}
Expand Down Expand Up @@ -245,6 +276,13 @@ void Trace(string msg)
Logger.LogTrace(msg);
}

/// <inheritdoc/>
public void EvictToDisk()
{
Maintenance.MoveMutableSegmentForward();
StartMerge();
}

/// <summary>
/// Disposes this maintainer.
/// </summary>
Expand All @@ -256,5 +294,6 @@ public void Dispose()
Maintenance.OnDiskSegmentCreated -= OnDiskSegmentCreated;
Maintenance.OnMergeOperationEnded -= OnMergeOperationEnded;
Maintenance.OnZoneTreeIsDisposing -= OnZoneTreeIsDisposing;
Maintenance.OnBottomSegmentsMergeOperationEnded -= OnBottomSegmentsMergeOperationEnded;
}
}
57 changes: 36 additions & 21 deletions src/ZoneTree/IMaintainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,11 @@
/// merge operations and memory compaction.
/// </summary>
/// <remarks>
/// You must complete or cancel all pending tasks of this maintainer
/// You must complete or cancel all pending threads of this maintainer
/// before disposing.
/// </remarks>
public interface IMaintainer : IDisposable
{
/// <summary>
/// Minimum sparse array length when a new disk segment is created.
/// Default value is 0.
/// </summary>
int MinimumSparseArrayLength { get; set; }

/// <summary>
/// Configures sparse array step length when the disk segment length is bigger than
/// MinimumSparseArrayLength * SparseArrayStepLength.
/// The default value is 1000.
/// <remarks>The sparse array length reduce binary lookup range on disk segment
/// to reduce IO.
/// </remarks>
/// </summary>
int SparseArrayStepLength { get; set; }

/// <summary>
/// Starts merge operation when records count
/// in read-only segments exceeds this value.
Expand Down Expand Up @@ -61,12 +45,43 @@ public interface IMaintainer : IDisposable
TimeSpan InactiveBlockCacheCleanupInterval { get; set; }

/// <summary>
/// Tries cancel running tasks.
/// Tries cancel background threads.
/// </summary>
void TryCancelBackgroundThreads();

/// <summary>
/// Blocks the calling thread until all background threads have completed their execution.
/// </summary>
void WaitForBackgroundThreads();

/// <summary>
/// Asynchronously waits for all background threads to complete.
/// </summary>
/// <returns>A task that represents the asynchronous wait operation.</returns>
Task WaitForBackgroundThreadsAsync();

/// <summary>
/// Evicts all in-memory data to disk by moving the mutable segment forward and initiating a merge process.
/// </summary>
/// <remarks>
/// This method is responsible for freeing up memory in the LSM tree by moving data from the mutable in-memory segment to disk storage.
/// It first advances the current mutable segment to a new state, ensuring that any data currently in memory is prepared for disk storage.
/// Afterward, it starts the merging process, which combines the in-memory data with existing on-disk data to maintain the integrity
/// and efficiency of the LSM tree structure.
/// </remarks>
void EvictToDisk();

/// <summary>
/// Initiates the merge process in a new thread.
/// </summary>
void TryCancelRunningTasks();
void StartMerge();

/// <summary>
/// Waits until all running tasks are completed.
/// Initiates a merge of selected bottom segments into a single bottom disk segment.
/// </summary>
void CompleteRunningTasks();
/// <param name="fromIndex">The lower bound</param>
/// <param name="toIndex">The upper bound</param>
/// <returns></returns>
void StartBottomSegmentsMerge(
int fromIndex = 0, int toIndex = int.MaxValue);
}
4 changes: 2 additions & 2 deletions src/ZoneTree/docs/ZoneTree/README-NUGET.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ Note: For small data you don't need a maintainer.
// 2. Read/Write data
zoneTree.Upsert(39, "Hello ZoneTree!");

// 3. Complete maintainer running tasks.
maintainer.CompleteRunningTasks();
// 3. Wait for background threads.
maintainer.WaitForBackgroundThreads();
```

## How to delete keys?
Expand Down
2 changes: 1 addition & 1 deletion src/ZoneTree/docs/ZoneTree/guide/quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ for (var i = 0; i < 10_000_000; ++i){
}

// Ensure maintainer merge operations are completed before tree disposal.
maintainer.CompleteRunningTasks();
maintainer.WaitForBackgroundThreads();
```

## How to merge data to the disk segment manually?
Expand Down

0 comments on commit b085602

Please sign in to comment.