Skip to content

Commit

Permalink
Initial implementation of per-key-TTL for KV
Browse files Browse the repository at this point in the history
  • Loading branch information
stebet committed Jan 8, 2025
1 parent af04c71 commit be1a5c1
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 27 deletions.
35 changes: 17 additions & 18 deletions NATS.Net.sln
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NATS.Client.Core.Tests", "t
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MicroBenchmark", "sandbox\MicroBenchmark\MicroBenchmark.csproj", "{A10F0D89-13F3-49B3-ACF7-66E45DC95225}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".", ".", "{899BE3EA-C5CA-4394-9B62-C45CBFF3AF4E}"
ProjectSection(SolutionItems) = preProject
.editorconfig = .editorconfig
Directory.Build.props = Directory.Build.props
README.md = README.md
version.txt = version.txt
.gitattributes = .gitattributes
.gitignore = .gitignore
CODE-OF-CONDUCT.md = CODE-OF-CONDUCT.md
CONTRIBUTING.md = CONTRIBUTING.md
dependencies.md = dependencies.md
global.json = global.json
Icon.png = Icon.png
LICENSE = LICENSE
REPO_RENAME.md = REPO_RENAME.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NATS.Client.Hosting", "src\NATS.Client.Hosting\NATS.Client.Hosting.csproj", "{D3F09B30-1ED5-48C2-81CD-A2AD88E751AC}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MinimumWebApp", "sandbox\MinimumWebApp\MinimumWebApp.csproj", "{44881DEE-8B49-44EA-B0BA-8BDA4F706E1A}"
Expand Down Expand Up @@ -139,6 +122,23 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{
.github\workflows\test.yml = .github\workflows\test.yml
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{1D1B89B7-5963-48AE-B5F8-BB9AA4834CD6}"
ProjectSection(SolutionItems) = preProject
.editorconfig = .editorconfig
.gitattributes = .gitattributes
.gitignore = .gitignore
CODE-OF-CONDUCT.md = CODE-OF-CONDUCT.md
CONTRIBUTING.md = CONTRIBUTING.md
dependencies.md = dependencies.md
Directory.Build.props = Directory.Build.props
global.json = global.json
Icon.png = Icon.png
LICENSE = LICENSE
README.md = README.md
REPO_RENAME.md = REPO_RENAME.md
version.txt = version.txt
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -396,7 +396,6 @@ Global
{227C88B1-0510-4010-B142-C44725578FCD} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{8A676AAA-FEE3-4C18-870A-66E59AD9069F} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{9521D9E0-642A-4C7E-BD10-372DF235CF62} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{B9EF0111-6720-46DF-B11A-8F8C88C3F5C1} = {899BE3EA-C5CA-4394-9B62-C45CBFF3AF4E}
{0B7F1286-4426-45DE-82C2-FE7CF13CA0DA} = {B9EF0111-6720-46DF-B11A-8F8C88C3F5C1}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
Expand Down
7 changes: 7 additions & 0 deletions src/NATS.Client.JetStream/Models/StreamConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ internal StreamConfig()
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
public bool DiscardNewPerSubject { get; set; }

/// <summary>
/// AllowMsgTTL allows header initiated per-message TTLs. If disabled, then the `NATS-TTL` header will be ignored.
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("allow_msg_ttl")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
public bool AllowMsgTTL { get; set; }

/// <summary>
/// Additional metadata for the Stream
/// </summary>
Expand Down
9 changes: 6 additions & 3 deletions src/NATS.Client.KeyValueStore/INatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,37 @@ public interface INatsKVStore
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="value">Value of the entry</param>
/// <param name="ttl">Time to live for the entry (requires the <see cref="NatsKVConfig.AllowMsgTTL"/> to be set to true)</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>Revision number</returns>
ValueTask<ulong> PutAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);
ValueTask<ulong> PutAsync<T>(string key, T value, TimeSpan ttl = default, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Create a new entry in the bucket only if it doesn't exist
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="value">Value of the entry</param>
/// <param name="ttl">Time to live for the entry (requires the <see cref="NatsKVConfig.AllowMsgTTL"/> to be set to true)</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>The revision number of the entry</returns>
ValueTask<ulong> CreateAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);
ValueTask<ulong> CreateAsync<T>(string key, T value, TimeSpan ttl = default, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Update an entry in the bucket only if last update revision matches
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="value">Value of the entry</param>
/// <param name="revision">Last revision number to match</param>
/// <param name="ttl">Time to live for the entry (requires the <see cref="NatsKVConfig.AllowMsgTTL"/> to be set to true)</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>The revision number of the updated entry</returns>
ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);
ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision, TimeSpan ttl = default, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Delete an entry from the bucket
Expand Down
5 changes: 5 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public record NatsKVConfig
/// Sources defines the configuration for sources of a KeyValue store.
/// </summary>
public ICollection<StreamSource>? Sources { get; set; }

/// <summary>
/// If true, the bucket will allow TTL on individual keys.
/// </summary>
public bool AllowMsgTTL { get; set; }
}

/// <summary>
Expand Down
1 change: 1 addition & 0 deletions src/NATS.Client.KeyValueStore/NatsKVContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config)
MirrorDirect = mirrorDirect,
Sources = sources,
Retention = StreamConfigRetention.Limits, // from ADR-8
AllowMsgTTL = config.AllowMsgTTL,
};

return streamConfig;
Expand Down
27 changes: 21 additions & 6 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class NatsKVStore : INatsKVStore
private const string NatsSubject = "Nats-Subject";
private const string NatsSequence = "Nats-Sequence";
private const string NatsTimeStamp = "Nats-Time-Stamp";
private const string NatsTtl = "Nats-TTL";
private static readonly Regex ValidKeyRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled);
private static readonly NatsKVException MissingSequenceHeaderException = new("Missing sequence header");
private static readonly NatsKVException MissingTimestampHeaderException = new("Missing timestamp header");
Expand Down Expand Up @@ -70,23 +71,33 @@ internal NatsKVStore(string bucket, INatsJSContext context, INatsJSStream stream
public string Bucket { get; }

/// <inheritdoc />
public async ValueTask<ulong> PutAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
public async ValueTask<ulong> PutAsync<T>(string key, T value, TimeSpan ttl = default, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);
var ack = await JetStreamContext.PublishAsync($"$KV.{Bucket}.{key}", value, serializer: serializer, cancellationToken: cancellationToken);

NatsHeaders? headers = default;
if (ttl != default)
{
headers = new NatsHeaders
{
{ NatsTtl, ttl == TimeSpan.MaxValue ? "never" : ttl.TotalSeconds.ToString("N0") },
};
}

var ack = await JetStreamContext.PublishAsync($"$KV.{Bucket}.{key}", value, serializer: serializer, headers: headers, cancellationToken: cancellationToken);
ack.EnsureSuccess();
return ack.Seq;
}

/// <inheritdoc />
public async ValueTask<ulong> CreateAsync<T>(string key, T value, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
public async ValueTask<ulong> CreateAsync<T>(string key, T value, TimeSpan ttl = default, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);

// First try to create a new entry
try
{
return await UpdateAsync(key, value, revision: 0, serializer, cancellationToken);
return await UpdateAsync(key, value, revision: 0, ttl, serializer, cancellationToken);
}
catch (NatsKVWrongLastRevisionException)
{
Expand All @@ -99,17 +110,21 @@ public async ValueTask<ulong> CreateAsync<T>(string key, T value, INatsSerialize
}
catch (NatsKVKeyDeletedException e)
{
return await UpdateAsync(key, value, e.Revision, serializer, cancellationToken);
return await UpdateAsync(key, value, e.Revision, ttl, serializer, cancellationToken);
}

throw new NatsKVCreateException();
}

/// <inheritdoc />
public async ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
public async ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision, TimeSpan ttl = default, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);
var headers = new NatsHeaders { { NatsExpectedLastSubjectSequence, revision.ToString() } };
if (ttl != default)
{
headers.Add(NatsTtl, ttl == TimeSpan.MaxValue ? "never" : ttl.TotalSeconds.ToString("N0"));
}

try
{
Expand Down
67 changes: 67 additions & 0 deletions tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,73 @@ await Assert.ThrowsAsync<NatsKVCreateException>(async () =>
}
}

[Fact]
public async Task TestMessageTTL()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();

var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);

var store = await kv.CreateStoreAsync(new NatsKVConfig("kv1") { AllowMsgTTL = true }, cancellationToken: cancellationToken);

for (var i = 0; i < 10; i++)
{
await store.PutAsync($"k{i}", $"v{i}", TimeSpan.FromSeconds(1), cancellationToken: cancellationToken);
}

var state = await store.GetStatusAsync();
Assert.Equal(10, state.Info.State.Messages);
Assert.Equal(1ul, state.Info.State.FirstSeq);
Assert.Equal(10ul, state.Info.State.LastSeq);

// Sleep for two seconds, now all the messages should be gone
await Task.Delay(2000);
state = await store.GetStatusAsync();
Assert.Equal(0, state.Info.State.Messages);
Assert.Equal(11ul, state.Info.State.FirstSeq);
Assert.Equal(10ul, state.Info.State.LastSeq);
}

[Fact]
public async Task TestMessageNeverExpire()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();

var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);

var store = await kv.CreateStoreAsync(new NatsKVConfig("kv1") { AllowMsgTTL = true, MaxAge = TimeSpan.FromSeconds(1) }, cancellationToken: cancellationToken);

// The first message we publish is set to "never expire", therefore it won't age out with the MaxAge policy.
await store.PutAsync($"k0", $"v0", TimeSpan.MaxValue, cancellationToken: cancellationToken);

for (var i = 1; i < 11; i++)
{
await store.PutAsync($"k{i}", $"v{i}", TimeSpan.FromSeconds(1), cancellationToken: cancellationToken);
}

var state = await store.GetStatusAsync();
Assert.Equal(11, state.Info.State.Messages);
Assert.Equal(1ul, state.Info.State.FirstSeq);
Assert.Equal(11ul, state.Info.State.LastSeq);

// Sleep for two seconds, only the first message should be there
await Task.Delay(2000);
state = await store.GetStatusAsync();
Assert.Equal(1, state.Info.State.Messages);
Assert.Equal(1ul, state.Info.State.FirstSeq);
Assert.Equal(11ul, state.Info.State.LastSeq);
}

[Fact]
public async Task History()
{
Expand Down

0 comments on commit be1a5c1

Please sign in to comment.