diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index d946cc8f4..2a89b9429 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -45,14 +45,16 @@ jobs:
run: dotnet build -c Debug
- name: Test Core
- run: dotnet test -c Debug --no-build tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj
+ run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj
- name: Test JetStream
- # This test is hanging sometimes. Find out where!
run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.JetStream.Tests/NATS.Client.JetStream.Tests.csproj
- name: Test Key/Value Store
- run: dotnet test -c Debug --no-build tests/NATS.Client.KeyValueStore.Tests/NATS.Client.KeyValueStore.Tests.csproj
+ run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.KeyValueStore.Tests/NATS.Client.KeyValueStore.Tests.csproj
+
+ - name: Test Object Store
+ run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.ObjectStore.Tests/NATS.Client.ObjectStore.Tests.csproj
memory_test:
name: memory test
diff --git a/NATS.Client.sln b/NATS.Client.sln
index cf2ced273..930a9f0c9 100644
--- a/NATS.Client.sln
+++ b/NATS.Client.sln
@@ -73,6 +73,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.KeyValueStore.T
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.KeyValueStore.Watcher", "sandbox\Example.KeyValueStore.Watcher\Example.KeyValueStore.Watcher.csproj", "{912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.ObjectStore", "src\NATS.Client.ObjectStore\NATS.Client.ObjectStore.csproj", "{3F8840BA-4F91-4359-AA53-6B26823E7F55}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.ObjectStore.Tests", "tests\NATS.Client.ObjectStore.Tests\NATS.Client.ObjectStore.Tests.csproj", "{BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.ObjectStore", "sandbox\Example.ObjectStore\Example.ObjectStore.csproj", "{51882883-A66E-4F95-A1AB-CFCBF71B4376}"
+EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.TlsFirst", "sandbox\Example.TlsFirst\Example.TlsFirst.csproj", "{88625045-978F-417F-9F51-A4E3A9718945}"
EndProject
Global
@@ -185,6 +191,18 @@ Global
{912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53}.Debug|Any CPU.Build.0 = Debug|Any CPU
{912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53}.Release|Any CPU.ActiveCfg = Release|Any CPU
{912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53}.Release|Any CPU.Build.0 = Release|Any CPU
+ {3F8840BA-4F91-4359-AA53-6B26823E7F55}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {3F8840BA-4F91-4359-AA53-6B26823E7F55}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {3F8840BA-4F91-4359-AA53-6B26823E7F55}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {3F8840BA-4F91-4359-AA53-6B26823E7F55}.Release|Any CPU.Build.0 = Release|Any CPU
+ {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {51882883-A66E-4F95-A1AB-CFCBF71B4376}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {51882883-A66E-4F95-A1AB-CFCBF71B4376}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {51882883-A66E-4F95-A1AB-CFCBF71B4376}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {51882883-A66E-4F95-A1AB-CFCBF71B4376}.Release|Any CPU.Build.0 = Release|Any CPU
{88625045-978F-417F-9F51-A4E3A9718945}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{88625045-978F-417F-9F51-A4E3A9718945}.Debug|Any CPU.Build.0 = Debug|Any CPU
{88625045-978F-417F-9F51-A4E3A9718945}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -221,6 +239,9 @@ Global
{A102AB7B-A90C-4717-B17C-045240838060} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{908F2CED-CAC0-4A4E-AD19-362A413B5DA4} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53} = {95A69671-16CA-4133-981C-CC381B7AAA30}
+ {3F8840BA-4F91-4359-AA53-6B26823E7F55} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
+ {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
+ {51882883-A66E-4F95-A1AB-CFCBF71B4376} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{88625045-978F-417F-9F51-A4E3A9718945} = {95A69671-16CA-4133-981C-CC381B7AAA30}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
diff --git a/sandbox/Example.ObjectStore/Example.ObjectStore.csproj b/sandbox/Example.ObjectStore/Example.ObjectStore.csproj
new file mode 100644
index 000000000..e74912569
--- /dev/null
+++ b/sandbox/Example.ObjectStore/Example.ObjectStore.csproj
@@ -0,0 +1,15 @@
+
+
+
+ Exe
+ net6.0
+ enable
+ enable
+ false
+
+
+
+
+
+
+
diff --git a/sandbox/Example.ObjectStore/Program.cs b/sandbox/Example.ObjectStore/Program.cs
new file mode 100644
index 000000000..ee537d297
--- /dev/null
+++ b/sandbox/Example.ObjectStore/Program.cs
@@ -0,0 +1,41 @@
+using System.Text;
+using NATS.Client.Core;
+using NATS.Client.JetStream;
+using NATS.Client.ObjectStore;
+using NATS.Client.ObjectStore.Models;
+
+var nats = new NatsConnection();
+var js = new NatsJSContext(nats);
+var ob = new NatsObjContext(js);
+
+var store = await ob.CreateObjectStore(new NatsObjConfig("o1"));
+
+var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, };
+
+var stringBuilder = new StringBuilder();
+for (var i = 0; i < 9; i++)
+{
+ stringBuilder.Append($"{i:D2}-4567890");
+}
+
+var buffer90 = stringBuilder.ToString();
+{
+ var buffer = Encoding.ASCII.GetBytes(buffer90);
+ var stream = new MemoryStream(buffer);
+
+ await store.PutAsync(meta, stream);
+
+ var data = await store.GetInfoAsync("k1");
+
+ Console.WriteLine($"DATA: {data}");
+}
+
+{
+ var memoryStream = new MemoryStream();
+ await store.GetAsync("k1", memoryStream);
+ await memoryStream.FlushAsync();
+ var buffer = memoryStream.ToArray();
+ Console.WriteLine($"buffer:{Encoding.ASCII.GetString(buffer)}");
+}
+
+Console.WriteLine("Bye");
diff --git a/src/NATS.Client.Core/INatsSerializer.cs b/src/NATS.Client.Core/INatsSerializer.cs
index 8c6cb819f..7d476d50e 100644
--- a/src/NATS.Client.Core/INatsSerializer.cs
+++ b/src/NATS.Client.Core/INatsSerializer.cs
@@ -18,6 +18,21 @@ public interface ICountableBufferWriter : IBufferWriter
int WrittenCount { get; }
}
+public readonly struct FixedSizeMemoryOwner : IMemoryOwner
+{
+ private readonly IMemoryOwner _owner;
+
+ public FixedSizeMemoryOwner(IMemoryOwner owner, int size)
+ {
+ _owner = owner;
+ Memory = _owner.Memory.Slice(0, size);
+ }
+
+ public Memory Memory { get; }
+
+ public void Dispose() => _owner.Dispose();
+}
+
public static class NatsDefaultSerializer
{
public static readonly INatsSerializer Default = new NatsRawSerializer(NatsJsonSerializer.Default);
@@ -105,9 +120,9 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value)
if (typeof(T) == typeof(IMemoryOwner))
{
- var memoryOwner = MemoryPool.Shared.Rent((int)buffer.Length);
+ var memoryOwner = new FixedSizeMemoryOwner(MemoryPool.Shared.Rent((int)buffer.Length), (int)buffer.Length);
buffer.CopyTo(memoryOwner.Memory.Span);
- return (T)memoryOwner;
+ return (T)(object)memoryOwner;
}
if (Next != null)
diff --git a/src/NATS.Client.Core/NATS.Client.Core.csproj b/src/NATS.Client.Core/NATS.Client.Core.csproj
index 96d9b019d..db709f6e2 100644
--- a/src/NATS.Client.Core/NATS.Client.Core.csproj
+++ b/src/NATS.Client.Core/NATS.Client.Core.csproj
@@ -29,5 +29,7 @@
+
+
diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs
new file mode 100644
index 000000000..b3c736ce2
--- /dev/null
+++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs
@@ -0,0 +1,438 @@
+using System.Buffers;
+using System.Threading.Channels;
+using Microsoft.Extensions.Logging;
+using NATS.Client.Core;
+using NATS.Client.Core.Internal;
+using NATS.Client.JetStream.Models;
+
+namespace NATS.Client.JetStream.Internal;
+
+internal enum NatsJSOrderedPushConsumerCommand
+{
+ Msg,
+ Ready,
+}
+
+internal readonly struct NatsJSOrderedPushConsumerMsg
+{
+ public NatsJSOrderedPushConsumerMsg()
+ {
+ }
+
+ public NatsJSOrderedPushConsumerCommand Command { get; init; } = default;
+
+ public NatsJSMsg Msg { get; init; } = default;
+}
+
+internal record NatsJSOrderedPushConsumerOpts
+{
+ ///
+ /// Default watch options
+ ///
+ public static readonly NatsJSOrderedPushConsumerOpts Default = new();
+
+ ///
+ /// Idle heartbeat interval
+ ///
+ public TimeSpan IdleHeartbeat { get; init; } = TimeSpan.FromSeconds(5);
+
+ public ConsumerConfigurationDeliverPolicy DeliverPolicy { get; init; } = ConsumerConfigurationDeliverPolicy.all;
+
+ public bool HeadersOnly { get; init; } = false;
+}
+
+internal class NatsJSOrderedPushConsumer
+{
+ private readonly ILogger _logger;
+ private readonly bool _debug;
+ private readonly NatsJSContext _context;
+ private readonly string _stream;
+ private readonly string _filter;
+ private readonly NatsJSOrderedPushConsumerOpts _opts;
+ private readonly NatsSubOpts? _subOpts;
+ private readonly CancellationToken _cancellationToken;
+ private readonly NatsConnection _nats;
+ private readonly Channel> _commandChannel;
+ private readonly Channel> _msgChannel;
+ private readonly Channel _consumerCreateChannel;
+ private readonly Timer _timer;
+ private readonly int _hbTimeout;
+ private readonly long _idleHbNanos;
+ private readonly Task _consumerCreateTask;
+ private readonly Task _commandTask;
+ private readonly long _ackWaitNanos;
+
+ private long _sequenceStream;
+ private long _sequenceConsumer;
+ private string _consumer;
+ private volatile NatsJSOrderedPushConsumerSub? _sub;
+ private int _done;
+
+ public NatsJSOrderedPushConsumer(
+ NatsJSContext context,
+ string stream,
+ string filter,
+ NatsJSOrderedPushConsumerOpts opts,
+ NatsSubOpts? subOpts,
+ CancellationToken cancellationToken)
+ {
+ _logger = context.Connection.Opts.LoggerFactory.CreateLogger>();
+ _debug = _logger.IsEnabled(LogLevel.Debug);
+ _context = context;
+ _stream = stream;
+ _filter = filter;
+ _opts = opts;
+ _subOpts = subOpts;
+ _cancellationToken = cancellationToken;
+ _nats = context.Connection;
+ _ackWaitNanos = TimeSpan.FromHours(22).ToNanos();
+ _hbTimeout = (int)(opts.IdleHeartbeat * 2).TotalMilliseconds;
+ _idleHbNanos = opts.IdleHeartbeat.ToNanos();
+ _consumer = NewNuid();
+
+ _nats.ConnectionDisconnected += OnDisconnected;
+
+ _timer = new Timer(
+ static state =>
+ {
+ var self = (NatsJSOrderedPushConsumer)state!;
+ self.CreateSub("idle-heartbeat-timeout");
+ if (self._debug)
+ {
+ self._logger.LogDebug(
+ NatsJSLogEvents.IdleTimeout,
+ "Idle heartbeat timeout after {Timeout}ns",
+ self._idleHbNanos);
+ }
+ },
+ this,
+ Timeout.Infinite,
+ Timeout.Infinite);
+
+ // Channel size 1 is enough because we want backpressure to go all the way to the subscription
+ // so that we get most accurate view of the stream. We can keep them as 1 until we find a case
+ // where it's not enough due to performance for example.
+ _commandChannel = Channel.CreateBounded>(1);
+ _msgChannel = Channel.CreateBounded>(1);
+
+ // A single request to create the consumer is enough because we don't want to create a new consumer
+ // back to back in case the consumer is being recreated due to a timeout and a mismatch in consumer
+ // sequence for example; creating the consumer once would solve both the issues.
+ _consumerCreateChannel = Channel.CreateBounded(new BoundedChannelOptions(1)
+ {
+ AllowSynchronousContinuations = false,
+ FullMode = BoundedChannelFullMode.DropOldest,
+ });
+
+ _consumerCreateTask = Task.Run(ConsumerCreateLoop);
+ _commandTask = Task.Run(CommandLoop);
+ }
+
+ public ChannelReader> Msgs => _msgChannel.Reader;
+
+ public bool IsDone => Volatile.Read(ref _done) > 0;
+
+ private string Consumer
+ {
+ get => Volatile.Read(ref _consumer);
+ set => Volatile.Write(ref _consumer, value);
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ _nats.ConnectionDisconnected -= OnDisconnected;
+
+ _consumerCreateChannel.Writer.TryComplete();
+ _commandChannel.Writer.TryComplete();
+ _msgChannel.Writer.TryComplete();
+
+ await _consumerCreateTask;
+ await _commandTask;
+
+ await _context.DeleteConsumerAsync(_stream, Consumer, _cancellationToken);
+ }
+
+ internal void Init()
+ {
+ Consumer = NewNuid();
+ CreateSub("init");
+ }
+
+ internal void Done()
+ {
+ Interlocked.Increment(ref _done);
+ _msgChannel.Writer.TryComplete();
+ }
+
+ private void OnDisconnected(object? sender, string e) => StopHeartbeatTimer();
+
+ private async Task CommandLoop()
+ {
+ try
+ {
+ while (await _commandChannel.Reader.WaitToReadAsync(_cancellationToken))
+ {
+ while (_commandChannel.Reader.TryRead(out var command))
+ {
+ try
+ {
+ var subCommand = command.Command;
+
+ if (subCommand == NatsJSOrderedPushConsumerCommand.Msg)
+ {
+ ResetHeartbeatTimer();
+ var msg = command.Msg;
+
+ var subSubject = _sub?.Subject;
+
+ if (subSubject == null)
+ continue;
+
+ if (string.Equals(msg.Subject, subSubject))
+ {
+ // Control message: e.g. heartbeat
+ }
+ else
+ {
+ if (msg.Metadata is { } metadata)
+ {
+ if (!metadata.Consumer.Equals(Consumer))
+ {
+ // Ignore messages from other consumers
+ // This might happen if the consumer is recreated
+ // and the old consumer somehow still receives messages
+ continue;
+ }
+
+ var sequence = Interlocked.Increment(ref _sequenceConsumer);
+
+ if (sequence != (long)metadata.Sequence.Consumer)
+ {
+ CreateSub("sequence-mismatch");
+ _logger.LogWarning("Missed messages, recreating consumer");
+ continue;
+ }
+
+ // Increment the sequence before writing to the channel in case the channel is full
+ // and the writer is waiting for the reader to read the message. This way the sequence
+ // will be correctly incremented in case the timeout kicks in and recreates the consumer.
+ Interlocked.Exchange(ref _sequenceStream, (long)metadata.Sequence.Stream);
+
+ if (!IsDone)
+ {
+ try
+ {
+ await _msgChannel.Writer.WriteAsync(msg, _cancellationToken);
+ }
+ catch
+ {
+ if (!IsDone)
+ throw;
+ }
+ }
+ }
+ else
+ {
+ _logger.LogWarning("Protocol error: Message metadata is missing");
+ }
+ }
+ }
+ else if (subCommand == NatsJSOrderedPushConsumerCommand.Ready)
+ {
+ ResetHeartbeatTimer();
+ }
+ else
+ {
+ _logger.LogError("Internal error: unexpected command {Command}", subCommand);
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.LogWarning(e, "Command error");
+ }
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ }
+ catch (Exception e)
+ {
+ _logger.LogError(e, "Unexpected command loop error");
+ }
+ }
+
+ private async Task ConsumerCreateLoop()
+ {
+ try
+ {
+ while (await _consumerCreateChannel.Reader.WaitToReadAsync(_cancellationToken))
+ {
+ while (_consumerCreateChannel.Reader.TryRead(out var origin))
+ {
+ try
+ {
+ await CreatePushConsumer(origin);
+ }
+ catch (Exception e)
+ {
+ _logger.LogWarning(e, "Consumer create error");
+ }
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ }
+ catch (Exception e)
+ {
+ _logger.LogError(e, "Unexpected consumer create loop error");
+ }
+ }
+
+ private async ValueTask CreatePushConsumer(string origin)
+ {
+ if (_debug)
+ {
+ _logger.LogDebug(NatsJSLogEvents.NewConsumer, "Creating new consumer {Consumer} from {Origin}", Consumer, origin);
+ }
+
+ if (_sub != null)
+ {
+ if (_debug)
+ {
+ _logger.LogDebug(NatsJSLogEvents.DeleteOldDeliverySubject, "Deleting old delivery subject {Subject}", _sub.Subject);
+ }
+
+ await _sub.UnsubscribeAsync();
+ await _sub.DisposeAsync();
+ }
+
+ _sub = new NatsJSOrderedPushConsumerSub(_context, _commandChannel, _subOpts, _cancellationToken);
+ await _context.Connection.SubAsync(_sub, _cancellationToken).ConfigureAwait(false);
+
+ if (_debug)
+ {
+ _logger.LogDebug(NatsJSLogEvents.NewDeliverySubject, "New delivery subject {Subject}", _sub.Subject);
+ }
+
+ Interlocked.Exchange(ref _sequenceConsumer, 0);
+
+ var sequence = Volatile.Read(ref _sequenceStream);
+
+ var config = new ConsumerConfiguration
+ {
+ Name = Consumer,
+ DeliverPolicy = ConsumerConfigurationDeliverPolicy.all,
+ AckPolicy = ConsumerConfigurationAckPolicy.none,
+ DeliverSubject = _sub.Subject,
+ FilterSubject = _filter,
+ FlowControl = true,
+ IdleHeartbeat = _idleHbNanos,
+ AckWait = _ackWaitNanos,
+ MaxDeliver = 1,
+ MemStorage = true,
+ NumReplicas = 1,
+ ReplayPolicy = ConsumerConfigurationReplayPolicy.instant,
+ };
+
+ config.DeliverPolicy = _opts.DeliverPolicy;
+ config.HeadersOnly = _opts.HeadersOnly;
+
+ if (sequence > 0)
+ {
+ config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.by_start_sequence;
+ config.OptStartSeq = sequence + 1;
+ }
+
+ await _context.CreateConsumerAsync(
+ new ConsumerCreateRequest { StreamName = _stream, Config = config, },
+ cancellationToken: _cancellationToken);
+
+ if (_debug)
+ {
+ _logger.LogDebug(NatsJSLogEvents.NewConsumerCreated, "Created new consumer {Consumer} from {Origin}", Consumer, origin);
+ }
+ }
+
+ private string NewNuid()
+ {
+ Span buffer = stackalloc char[22];
+ if (NuidWriter.TryWriteNuid(buffer))
+ {
+ return new string(buffer);
+ }
+
+ throw new InvalidOperationException("Internal error: can't generate nuid");
+ }
+
+ private void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, Timeout.Infinite);
+
+ private void StopHeartbeatTimer() => _timer.Change(Timeout.Infinite, Timeout.Infinite);
+
+ private void CreateSub(string origin)
+ {
+ Consumer = NewNuid();
+ _consumerCreateChannel.Writer.TryWrite(origin);
+ }
+}
+
+internal class NatsJSOrderedPushConsumerSub : NatsSubBase
+{
+ private readonly NatsJSContext _context;
+ private readonly CancellationToken _cancellationToken;
+ private readonly NatsConnection _nats;
+ private readonly NatsHeaderParser _headerParser;
+ private readonly INatsSerializer _serializer;
+ private readonly ChannelWriter> _commands;
+
+ public NatsJSOrderedPushConsumerSub(
+ NatsJSContext context,
+ Channel> commandChannel,
+ NatsSubOpts? opts,
+ CancellationToken cancellationToken)
+ : base(
+ connection: context.Connection,
+ manager: context.Connection.SubscriptionManager,
+ subject: context.NewInbox(),
+ queueGroup: default,
+ opts)
+ {
+ _context = context;
+ _cancellationToken = cancellationToken;
+ _serializer = opts?.Serializer ?? context.Connection.Opts.Serializer;
+ _nats = context.Connection;
+ _headerParser = _nats.HeaderParser;
+ _commands = commandChannel.Writer;
+ _nats.ConnectionOpened += OnConnectionOpened;
+ }
+
+ public override async ValueTask ReadyAsync()
+ {
+ await base.ReadyAsync();
+ await _commands.WriteAsync(new NatsJSOrderedPushConsumerMsg { Command = NatsJSOrderedPushConsumerCommand.Ready }, _cancellationToken).ConfigureAwait(false);
+ }
+
+ public override ValueTask DisposeAsync()
+ {
+ _nats.ConnectionOpened -= OnConnectionOpened;
+ return base.DisposeAsync();
+ }
+
+ protected override async ValueTask ReceiveInternalAsync(
+ string subject,
+ string? replyTo,
+ ReadOnlySequence? headersBuffer,
+ ReadOnlySequence payloadBuffer)
+ {
+ var msg = new NatsJSMsg(NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _headerParser, _serializer), _context);
+ await _commands.WriteAsync(new NatsJSOrderedPushConsumerMsg { Command = NatsJSOrderedPushConsumerCommand.Msg, Msg = msg }, _cancellationToken).ConfigureAwait(false);
+ }
+
+ protected override void TryComplete()
+ {
+ }
+
+ private void OnConnectionOpened(object? sender, string e) =>
+ _commands.TryWrite(new NatsJSOrderedPushConsumerMsg { Command = NatsJSOrderedPushConsumerCommand.Ready });
+}
diff --git a/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj b/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj
index 16158b740..62a3ceef3 100644
--- a/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj
+++ b/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj
@@ -16,6 +16,8 @@
+
+
diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs
index 3bd5d8a8a..58a5cbae3 100644
--- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs
+++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs
@@ -55,6 +55,27 @@ public async ValueTask DeleteStreamAsync(
return response.Success;
}
+ ///
+ /// Purges all of the (or filtered) data in a stream, leaving the stream intact.
+ ///
+ /// Stream name to be purged.
+ /// Purge request.
+ /// A used to cancel the API call.
+ /// Purge response
+ /// There was an issue retrieving the response.
+ /// Server responded with an error.
+ public async ValueTask PurgeStreamAsync(
+ string stream,
+ StreamPurgeRequest request,
+ CancellationToken cancellationToken = default)
+ {
+ var response = await JSRequestResponseAsync(
+ subject: $"{Opts.Prefix}.STREAM.PURGE.{stream}",
+ request: request,
+ cancellationToken);
+ return response;
+ }
+
///
/// Get stream information from the server and creates a NATS JetStream stream object .
///
diff --git a/src/NATS.Client.JetStream/NatsJSLogEvents.cs b/src/NATS.Client.JetStream/NatsJSLogEvents.cs
index 055519aa5..af409a800 100644
--- a/src/NATS.Client.JetStream/NatsJSLogEvents.cs
+++ b/src/NATS.Client.JetStream/NatsJSLogEvents.cs
@@ -13,4 +13,8 @@ public static class NatsJSLogEvents
public static readonly EventId PendingCount = new(2007, nameof(PendingCount));
public static readonly EventId MessageProperty = new(2008, nameof(MessageProperty));
public static readonly EventId PullRequest = new(2009, nameof(PullRequest));
+ public static readonly EventId NewConsumer = new(2010, nameof(NewConsumer));
+ public static readonly EventId DeleteOldDeliverySubject = new(2011, nameof(DeleteOldDeliverySubject));
+ public static readonly EventId NewDeliverySubject = new(2012, nameof(NewDeliverySubject));
+ public static readonly EventId NewConsumerCreated = new(2013, nameof(NewConsumerCreated));
}
diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs
index ec72dc0d9..f791a3bb3 100644
--- a/src/NATS.Client.JetStream/NatsJSStream.cs
+++ b/src/NATS.Client.JetStream/NatsJSStream.cs
@@ -38,6 +38,21 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d
return _deleted = await _context.DeleteStreamAsync(_name, cancellationToken);
}
+ ///
+ /// Purges data from this stream, leaving the stream intact.
+ ///
+ /// Purge request.
+ /// A used to cancel the API call.
+ /// The purge response received from the server.
+ /// There is an error retrieving the response or this stream object isn't valid anymore because it was deleted earlier.
+ /// Server responded with an error.
+ /// Purging does not invalidate this stream object; it can still be used afterwards.
+ public async ValueTask PurgeAsync(StreamPurgeRequest request, CancellationToken cancellationToken = default)
+ {
+ ThrowIfDeleted();
+ return await _context.PurgeStreamAsync(_name, request, cancellationToken);
+ }
+
///
/// Update stream properties on the server.
///
diff --git a/src/NATS.Client.ObjectStore/Internal/Encoder.cs b/src/NATS.Client.ObjectStore/Internal/Encoder.cs
new file mode 100644
index 000000000..1ef9b5e41
--- /dev/null
+++ b/src/NATS.Client.ObjectStore/Internal/Encoder.cs
@@ -0,0 +1,219 @@
+using System.Security.Cryptography;
+
+namespace NATS.Client.ObjectStore.Internal;
+
+// Borrowed from https://github.com/AzureAD/azure-activedirectory-identitymodel-extensions-for-dotnet/blob/6.32.3/src/Microsoft.IdentityModel.Tokens/Base64UrlEncoder.cs
+using System;
+using System.Text;
+
+///
+/// Encodes and Decodes strings as Base64Url encoding.
+///
+public static class Base64UrlEncoder
+{
+ private const char Base64PadCharacter = '=';
+ private const char Base64Character62 = '+';
+ private const char Base64Character63 = '/';
+ private const char Base64UrlCharacter62 = '-';
+ private const char Base64UrlCharacter63 = '_';
+
+ ///
+ /// Encoding table
+ ///
+ private static readonly char[] SBase64Table = new[]
+ {
+ 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4',
+ '5', '6', '7', '8', '9', Base64UrlCharacter62, Base64UrlCharacter63,
+ };
+
+ public static string Sha256(ReadOnlySpan value)
+ {
+ Span destination = stackalloc byte[256 / 8];
+ using (var sha256 = SHA256.Create())
+ {
+ sha256.TryComputeHash(value, destination, out _);
+ }
+
+ return Encode(destination);
+ }
+
+ ///
+ /// The following functions perform base64url encoding which differs from regular base64 encoding as follows
+ /// * padding is skipped so the pad character '=' doesn't have to be percent encoded
+ /// * the 62nd and 63rd regular base64 encoding characters ('+' and '/') are replace with ('-' and '_')
+ /// The changes make the encoding alphabet filename and URL safe.
+ ///
+ /// string to encode.
+ /// Base64Url encoding of the UTF8 bytes.
+ public static string Encode(string arg)
+ {
+ _ = arg ?? throw new ArgumentNullException(nameof(arg));
+
+ return Encode(Encoding.UTF8.GetBytes(arg));
+ }
+
+ ///
+ /// Converts a subset of an array of 8-bit unsigned integers to its equivalent string representation which is encoded with base-64-url digits. Parameters specify
+ /// the subset as an offset in the input array, and the number of elements in the array to convert.
+ ///
+ /// An array of 8-bit unsigned integers.
+ /// Remove padding
+ /// The string representation in base 64 url encoding of length elements of inArray, starting at position offset.
+ /// 'inArray' is null.
+ /// offset or length is negative OR offset plus length is greater than the length of inArray.
+ public static string Encode(Span inArray, bool raw = false)
+ {
+ var offset = 0;
+ var length = inArray.Length;
+
+ if (length == 0)
+ return string.Empty;
+
+ var lengthMod3 = length % 3;
+ var limit = length - lengthMod3;
+ var output = new char[(length + 2) / 3 * 4];
+ var table = SBase64Table;
+ int i, j = 0;
+
+ // takes 3 bytes from inArray and insert 4 bytes into output
+ for (i = offset; i < limit; i += 3)
+ {
+ var d0 = inArray[i];
+ var d1 = inArray[i + 1];
+ var d2 = inArray[i + 2];
+
+ output[j + 0] = table[d0 >> 2];
+ output[j + 1] = table[((d0 & 0x03) << 4) | (d1 >> 4)];
+ output[j + 2] = table[((d1 & 0x0f) << 2) | (d2 >> 6)];
+ output[j + 3] = table[d2 & 0x3f];
+ j += 4;
+ }
+
+ // Where we left off before
+ i = limit;
+
+ switch (lengthMod3)
+ {
+ case 2:
+ {
+ var d0 = inArray[i];
+ var d1 = inArray[i + 1];
+
+ output[j + 0] = table[d0 >> 2];
+ output[j + 1] = table[((d0 & 0x03) << 4) | (d1 >> 4)];
+ output[j + 2] = table[(d1 & 0x0f) << 2];
+ j += 3;
+ }
+
+ break;
+
+ case 1:
+ {
+ var d0 = inArray[i];
+
+ output[j + 0] = table[d0 >> 2];
+ output[j + 1] = table[(d0 & 0x03) << 4];
+ j += 2;
+ }
+
+ break;
+
+ // default or case 0: no further operations are needed.
+ }
+
+ if (raw)
+ return new string(output, 0, j);
+
+ for (var k = j; k < output.Length; k++)
+ {
+ output[k] = Base64PadCharacter;
+ }
+
+ return new string(output);
+ }
+
+ ///
+ /// Decodes the string from Base64UrlEncoded to UTF8.
+ ///
+ /// string to decode.
+ /// UTF8 string.
+ public static string Decode(string arg)
+ {
+ return Encoding.UTF8.GetString(DecodeBytes(arg));
+ }
+
+ ///
+ /// Converts the specified string, base-64-url encoded to utf8 bytes.
+ /// base64Url encoded string.
+ /// UTF8 bytes.
+ public static byte[] DecodeBytes(string str)
+ {
+ _ = str ?? throw new ArgumentNullException(nameof(str));
+ return UnsafeDecode(str);
+ }
+
+ private static unsafe byte[] UnsafeDecode(string str)
+ {
+ var mod = str.Length % 4;
+ if (mod == 1)
+ throw new FormatException(nameof(str));
+
+ var needReplace = false;
+ var decodedLength = str.Length + ((4 - mod) % 4);
+
+ for (var i = 0; i < str.Length; i++)
+ {
+ if (str[i] == Base64UrlCharacter62 || str[i] == Base64UrlCharacter63)
+ {
+ needReplace = true;
+ break;
+ }
+ }
+
+ if (needReplace)
+ {
+ string decodedString = new(char.MinValue, decodedLength);
+ fixed (char* dest = decodedString)
+ {
+ var i = 0;
+ for (; i < str.Length; i++)
+ {
+ if (str[i] == Base64UrlCharacter62)
+ dest[i] = Base64Character62;
+ else if (str[i] == Base64UrlCharacter63)
+ dest[i] = Base64Character63;
+ else
+ dest[i] = str[i];
+ }
+
+ for (; i < decodedLength; i++)
+ dest[i] = Base64PadCharacter;
+ }
+
+ return Convert.FromBase64String(decodedString);
+ }
+ else
+ {
+ if (decodedLength == str.Length)
+ {
+ return Convert.FromBase64String(str);
+ }
+ else
+ {
+ string decodedString = new(char.MinValue, decodedLength);
+ fixed (char* src = str)
+ {
+ fixed (char* dest = decodedString)
+ {
+ Buffer.MemoryCopy(src, dest, str.Length * 2, str.Length * 2);
+ dest[str.Length] = Base64PadCharacter;
+ if (str.Length + 2 == decodedLength)
+ dest[str.Length + 1] = Base64PadCharacter;
+ }
+ }
+
+ return Convert.FromBase64String(decodedString);
+ }
+ }
+ }
+}
diff --git a/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs
new file mode 100644
index 000000000..ba5509bef
--- /dev/null
+++ b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs
@@ -0,0 +1,82 @@
+using System.Text.Json.Serialization;
+
+namespace NATS.Client.ObjectStore.Models;
+
+public record ObjectMetadata
+{
+ ///
+ /// Object name
+ ///
+ [JsonPropertyName("name")]
+ public string Name { get; set; } = default!;
+
+ ///
+ /// Bucket name
+ ///
+ [JsonPropertyName("bucket")]
+ public string Bucket { get; set; } = default!;
+
+ ///
+ /// Object NUID
+ ///
+ [JsonPropertyName("nuid")]
+ public string Nuid { get; set; } = default!;
+
+ ///
+ /// Object size in bytes
+ ///
+ [JsonPropertyName("size")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public int Size { get; set; } = default!;
+
+ ///
+ /// Modified timestamp
+ ///
+ [JsonPropertyName("mtime")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public DateTimeOffset MTime { get; set; } = default!;
+
+ ///
+ /// Number of chunks
+ ///
+ [JsonPropertyName("chunks")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public int Chunks { get; set; } = default!;
+
+ ///
+ /// Object digest
+ ///
+ [JsonPropertyName("digest")]
+ public string Digest { get; set; } = default!;
+
+ ///
+ /// Object metadata
+ ///
+ [JsonPropertyName("meta")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public Dictionary Meta { get; set; } = default!;
+
+ ///
+ /// Object deleted
+ ///
+ [JsonPropertyName("deleted")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public bool Deleted { get; set; } = default!;
+
+ ///
+ /// Object options
+ ///
+ [JsonPropertyName("options")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public Options Options { get; set; } = default!;
+}
+
+public record Options
+{
+ ///
+ /// Max chunk size
+ ///
+ [JsonPropertyName("max_chunk_size")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
+ public int MaxChunkSize { get; set; } = default!;
+}
diff --git a/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj b/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj
new file mode 100644
index 000000000..d58a0e017
--- /dev/null
+++ b/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj
@@ -0,0 +1,20 @@
+
+
+
+ net6.0
+ enable
+ enable
+ true
+
+
+ pubsub;messaging
+ JetStream Object Store support for NATS.Client.
+ false
+
+
+
+
+
+
+
+
diff --git a/src/NATS.Client.ObjectStore/NatsObjConfig.cs b/src/NATS.Client.ObjectStore/NatsObjConfig.cs
new file mode 100644
index 000000000..80a1f3029
--- /dev/null
+++ b/src/NATS.Client.ObjectStore/NatsObjConfig.cs
@@ -0,0 +1,47 @@
+namespace NATS.Client.ObjectStore;
+
+///
+/// Object Store storage type
+///
+public enum NatsObjStorageType
+{
+ File = 0,
+ Memory = 1,
+}
+
+///
+/// Object store configuration.
+///
+/// Name of the bucket.
+public record NatsObjConfig(string Bucket)
+{
+ ///
+ /// Bucket description.
+ ///
+ public string? Description { get; init; }
+
+ ///
+ /// Maximum age of the object.
+ ///
+ public TimeSpan? MaxAge { get; init; }
+
+ ///
+ /// How big the store may be, when the combined stream size exceeds this old keys are removed.
+ ///
+ public long? MaxBytes { get; init; }
+
+ ///
+ /// Type of backing storage to use.
+ ///
+ public NatsObjStorageType? Storage { get; init; }
+
+ ///
+ /// How many replicas to keep for each key.
+ ///
+ public int NumberOfReplicas { get; init; } = 1;
+
+ ///
+ /// Additional metadata for the bucket.
+ ///
+ public Dictionary<string, string>? Metadata { get; init; }
+}
diff --git a/src/NATS.Client.ObjectStore/NatsObjContext.cs b/src/NATS.Client.ObjectStore/NatsObjContext.cs
new file mode 100644
index 000000000..ebfb89a1a
--- /dev/null
+++ b/src/NATS.Client.ObjectStore/NatsObjContext.cs
@@ -0,0 +1,92 @@
+using System.Text.RegularExpressions;
+using NATS.Client.JetStream;
+using NATS.Client.JetStream.Internal;
+using NATS.Client.JetStream.Models;
+
+namespace NATS.Client.ObjectStore;
+
+///
+/// Object Store context.
+///
+public class NatsObjContext
+{
+ private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled);
+
+ private readonly NatsJSContext _context;
+
+ ///
+ /// Create a new object store context.
+ ///
+ /// JetStream context.
+ public NatsObjContext(NatsJSContext context) => _context = context;
+
+ ///
+ /// Create a new object store.
+ ///
+ /// Object store configuration.
+ /// A used to cancel the API call.
+ /// Object store object.
+ public async ValueTask<NatsObjStore> CreateObjectStore(NatsObjConfig config, CancellationToken cancellationToken = default)
+ {
+ ValidateBucketName(config.Bucket);
+
+ var storage = config.Storage == NatsObjStorageType.File
+ ? StreamConfigurationStorage.file
+ : StreamConfigurationStorage.memory;
+
+ var streamConfiguration = new StreamConfiguration
+ {
+ Name = $"OBJ_{config.Bucket}",
+ Description = config.Description!,
+ Subjects = new[] { $"$O.{config.Bucket}.C.>", $"$O.{config.Bucket}.M.>" },
+ MaxAge = config.MaxAge?.ToNanos() ?? 0,
+ MaxBytes = config.MaxBytes ?? -1,
+ Storage = storage,
+ NumReplicas = config.NumberOfReplicas,
+ /* TODO: Placement = */
+ Discard = StreamConfigurationDiscard.@new,
+ AllowRollupHdrs = true,
+ AllowDirect = true,
+ Metadata = config.Metadata!,
+ Retention = StreamConfigurationRetention.limits,
+ };
+
+ var stream = await _context.CreateStreamAsync(streamConfiguration, cancellationToken);
+ return new NatsObjStore(config, _context, stream);
+ }
+
+ ///
+ /// Delete an object store.
+ ///
+ /// Name of the bucket.
+ /// A used to cancel the API call.
+ /// Whether delete was successful or not.
+ public ValueTask<bool> DeleteObjectStore(string bucket, CancellationToken cancellationToken)
+ {
+ ValidateBucketName(bucket);
+ return _context.DeleteStreamAsync($"OBJ_{bucket}", cancellationToken);
+ }
+
+ private void ValidateBucketName(string bucket)
+ {
+ if (string.IsNullOrWhiteSpace(bucket))
+ {
+ throw new NatsObjException("Bucket name can't be empty");
+ }
+
+ if (bucket.StartsWith("."))
+ {
+ throw new NatsObjException("Bucket name can't start with a period");
+ }
+
+ if (bucket.EndsWith("."))
+ {
+ throw new NatsObjException("Bucket name can't end with a period");
+ }
+
+ if (!ValidBucketRegex.IsMatch(bucket))
+ {
+ throw new NatsObjException("Bucket name can only contain alphanumeric characters, dashes, and underscores");
+ }
+ }
+}
diff --git a/src/NATS.Client.ObjectStore/NatsObjException.cs b/src/NATS.Client.ObjectStore/NatsObjException.cs
new file mode 100644
index 000000000..6e411f78c
--- /dev/null
+++ b/src/NATS.Client.ObjectStore/NatsObjException.cs
@@ -0,0 +1,28 @@
+using NATS.Client.JetStream;
+
+namespace NATS.Client.ObjectStore;
+
+///
+/// NATS Object Store exception.
+///
+public class NatsObjException : NatsJSException
+{
+ ///
+ /// Create a new NATS Object Store exception.
+ ///
+ /// Exception message.
+ public NatsObjException(string message)
+ : base(message)
+ {
+ }
+
+ ///
+ /// Create a new NATS Object Store exception.
+ ///
+ /// Exception message.
+ /// Inner exception
+ public NatsObjException(string message, Exception exception)
+ : base(message, exception)
+ {
+ }
+}
diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs
new file mode 100644
index 000000000..b703b2508
--- /dev/null
+++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs
@@ -0,0 +1,377 @@
+using System.Buffers;
+using System.Security.Cryptography;
+using System.Text.RegularExpressions;
+using NATS.Client.Core;
+using NATS.Client.Core.Internal;
+using NATS.Client.JetStream;
+using NATS.Client.JetStream.Internal;
+using NATS.Client.JetStream.Models;
+using NATS.Client.ObjectStore.Internal;
+using NATS.Client.ObjectStore.Models;
+
+namespace NATS.Client.ObjectStore;
+
+///
+/// NATS Object Store.
+///
+public class NatsObjStore
+{
+ private const int DefaultChunkSize = 128 * 1024;
+ private const string NatsRollup = "Nats-Rollup";
+ private const string RollupSubject = "sub";
+
+ private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } };
+ private static readonly Regex ValidObjectRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled);
+
+ private readonly string _bucket;
+ private readonly NatsJSContext _context;
+ private readonly NatsJSStream _stream;
+
+ internal NatsObjStore(NatsObjConfig config, NatsJSContext context, NatsJSStream stream)
+ {
+ _bucket = config.Bucket;
+ _context = context;
+ _stream = stream;
+ }
+
+ ///
+ /// Get object by key.
+ ///
+ /// Object key.
+ /// A used to cancel the API call.
+ /// Object value as a byte array.
+ public async ValueTask<byte[]> GetBytesAsync(string key, CancellationToken cancellationToken = default)
+ {
+ var memoryStream = new MemoryStream();
+ await GetAsync(key, memoryStream, cancellationToken: cancellationToken).ConfigureAwait(false);
+ return memoryStream.ToArray();
+ }
+
+ ///
+ /// Get object by key.
+ ///
+ /// Object key.
+ /// Stream to write the object value to.
+ /// true to not close the underlying stream when async method returns; otherwise, false
+ /// A used to cancel the API call.
+ /// Object metadata.
+ /// Metadata didn't match the value retrieved e.g. the SHA digest.
+ public async ValueTask<ObjectMetadata> GetAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default)
+ {
+ ValidateObjectName(key);
+
+ var info = await GetInfoAsync(key, cancellationToken: cancellationToken);
+
+ await using var pushConsumer = new NatsJSOrderedPushConsumer<IMemoryOwner<byte>>(
+ _context,
+ $"OBJ_{_bucket}",
+ GetChunkSubject(info.Nuid),
+ new NatsJSOrderedPushConsumerOpts { DeliverPolicy = ConsumerConfigurationDeliverPolicy.all },
+ new NatsSubOpts(),
+ cancellationToken);
+
+ pushConsumer.Init();
+
+ string digest;
+ var chunks = 0;
+ var size = 0;
+ using (var sha256 = SHA256.Create())
+ {
+ await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write, leaveOpen))
+ {
+ await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken))
+ {
+ // We have to make sure to carry on consuming the channel to avoid any blocking:
+ // e.g. if the channel is full, we would be blocking the reads off the socket (this was intentionally
+ // done ot avoid bloating the memory with a large backlog of messages or dropping messages at this level
+ // and signal the server that we are a slow consumer); then when we make an request-reply API call to
+ // delete the consumer, the socket would be blocked trying to send the response back to us; so we need to
+ // keep consuming the channel to avoid this.
+ if (pushConsumer.IsDone)
+ continue;
+
+ if (msg.Data != null)
+ {
+ using (msg.Data)
+ {
+ chunks++;
+ size += msg.Data.Memory.Length;
+ await hashedStream.WriteAsync(msg.Data.Memory, cancellationToken);
+ }
+ }
+
+ var p = msg.Metadata?.NumPending;
+ if (p is 0)
+ {
+ pushConsumer.Done();
+ }
+ }
+ }
+
+ digest = Base64UrlEncoder.Encode(sha256.Hash);
+ }
+
+ if ($"SHA-256={digest}" != info.Digest)
+ {
+ throw new NatsObjException("SHA-256 digest mismatch");
+ }
+
+ if (chunks != info.Chunks)
+ {
+ throw new NatsObjException("Chunks mismatch");
+ }
+
+ if (size != info.Size)
+ {
+ throw new NatsObjException("Size mismatch");
+ }
+
+ await stream.FlushAsync(cancellationToken);
+
+ return info;
+ }
+
+ ///
+ /// Put an object by key.
+ ///
+ /// Object key.
+ /// Object value as a byte array.
+ /// A used to cancel the API call.
+ /// Object metadata.
+ public ValueTask<ObjectMetadata> PutAsync(string key, byte[] value, CancellationToken cancellationToken = default) =>
+ PutAsync(new ObjectMetadata { Name = key }, new MemoryStream(value), cancellationToken: cancellationToken);
+
+ ///
+ /// Put an object by key.
+ ///
+ /// Object key.
+ /// Stream to read the value from.
+ /// true to not close the underlying stream when async method returns; otherwise, false
+ /// A used to cancel the API call.
+ /// Object metadata.
+ /// There was an error calculating SHA digest.
+ /// Server responded with an error.
+ public ValueTask<ObjectMetadata> PutAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default) =>
+ PutAsync(new ObjectMetadata { Name = key }, stream, leaveOpen, cancellationToken);
+
+ ///
+ /// Put an object by key.
+ ///
+ /// Object metadata.
+ /// Stream to read the value from.
+ /// true to not close the underlying stream when async method returns; otherwise, false
+ /// A used to cancel the API call.
+ /// Object metadata.
+ /// There was an error calculating SHA digest.
+ /// Server responded with an error.
+ public async ValueTask<ObjectMetadata> PutAsync(ObjectMetadata meta, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default)
+ {
+ ValidateObjectName(meta.Name);
+
+ ObjectMetadata? info = null;
+ try
+ {
+ info = await GetInfoAsync(meta.Name, cancellationToken: cancellationToken).ConfigureAwait(false);
+ }
+ catch (NatsJSApiException e)
+ {
+ if (e.Error.Code != 404)
+ throw;
+ }
+
+ var nuid = NewNuid();
+ meta.Bucket = _bucket;
+ meta.Nuid = nuid;
+ meta.MTime = DateTimeOffset.UtcNow;
+
+ if (meta.Options == null!)
+ {
+ meta.Options = new Options { MaxChunkSize = DefaultChunkSize };
+ }
+
+ if (meta.Options.MaxChunkSize == 0)
+ {
+ meta.Options.MaxChunkSize = DefaultChunkSize;
+ }
+
+ var size = 0;
+ var chunks = 0;
+ var chunkSize = meta.Options.MaxChunkSize;
+
+ string digest;
+ using (var sha256 = SHA256.Create())
+ {
+ await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read, leaveOpen))
+ {
+ while (true)
+ {
+ var memoryOwner = new FixedSizeMemoryOwner(MemoryPool<byte>.Shared.Rent(chunkSize), chunkSize);
+
+ var memory = memoryOwner.Memory;
+ var currentChunkSize = 0;
+ var eof = false;
+
+ // Fill a chunk
+ while (true)
+ {
+ var read = await hashedStream.ReadAsync(memory, cancellationToken);
+
+ // End of stream
+ if (read == 0)
+ {
+ eof = true;
+ break;
+ }
+
+ memory = memory.Slice(read);
+ currentChunkSize += read;
+
+ // Chunk filled
+ if (memory.IsEmpty)
+ {
+ break;
+ }
+ }
+
+ if (currentChunkSize > 0)
+ {
+ size += currentChunkSize;
+ chunks++;
+ }
+
+ var buffer = new FixedSizeMemoryOwner(memoryOwner, currentChunkSize);
+
+ // Chunks
+ var ack = await _context.PublishAsync(GetChunkSubject(nuid), buffer, cancellationToken: cancellationToken);
+ ack.EnsureSuccess();
+
+ if (eof)
+ break;
+ }
+ }
+
+ if (sha256.Hash == null)
+ throw new NatsObjException("Can't compute SHA256 hash");
+
+ digest = Base64UrlEncoder.Encode(sha256.Hash);
+ }
+
+ meta.Chunks = chunks;
+ meta.Size = size;
+ meta.Digest = $"SHA-256={digest}";
+
+ // Metadata
+ await PublishMeta(meta, cancellationToken);
+
+ // Delete the old object
+ if (info != null && info.Nuid != nuid)
+ {
+ try
+ {
+ await _context.JSRequestResponseAsync<StreamPurgeRequest, StreamPurgeResponse>(
+ subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{_bucket}",
+ request: new StreamPurgeRequest { Filter = GetChunkSubject(info.Nuid) },
+ cancellationToken);
+ }
+ catch (NatsJSApiException e)
+ {
+ if (e.Error.Code != 404)
+ throw;
+ }
+ }
+
+ return meta;
+ }
+
+ ///
+ /// Get object metadata by key.
+ ///
+ /// Object key.
+ /// Also retrieve deleted objects.
+ /// A used to cancel the API call.
+ /// Object metadata.
+ /// Object was not found.
+ public async ValueTask<ObjectMetadata> GetInfoAsync(string key, bool showDeleted = false, CancellationToken cancellationToken = default)
+ {
+ ValidateObjectName(key);
+
+ var request = new StreamMsgGetRequest { LastBySubj = GetMetaSubject(key) };
+
+ var response = await _stream.GetAsync(request, cancellationToken);
+
+ var data = NatsJsonSerializer.Default.Deserialize<ObjectMetadata>(new ReadOnlySequence<byte>(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsObjException("Can't deserialize object metadata");
+
+ if (!showDeleted && data.Deleted)
+ {
+ throw new NatsObjException("Object not found");
+ }
+
+ return data;
+ }
+
+ ///
+ /// Delete an object by key.
+ ///
+ /// Object key.
+ /// A used to cancel the API call.
+ /// Object metadata was invalid or chunks can't be purged.
+ public async ValueTask DeleteAsync(string key, CancellationToken cancellationToken = default)
+ {
+ ValidateObjectName(key);
+
+ var meta = await GetInfoAsync(key, showDeleted: true, cancellationToken).ConfigureAwait(false);
+
+ if (string.IsNullOrEmpty(meta.Nuid))
+ {
+ throw new NatsObjException("Object-store meta information invalid");
+ }
+
+ meta.Size = 0;
+ meta.Chunks = 0;
+ meta.Digest = string.Empty;
+ meta.Deleted = true;
+ meta.MTime = DateTimeOffset.UtcNow;
+
+ await PublishMeta(meta, cancellationToken);
+
+ var response = await _stream.PurgeAsync(new StreamPurgeRequest { Filter = GetChunkSubject(meta.Nuid) }, cancellationToken);
+ if (!response.Success)
+ {
+ throw new NatsObjException("Can't purge object chunks");
+ }
+ }
+
+ private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken)
+ {
+ var ack = await _context.PublishAsync(GetMetaSubject(meta.Name), meta, headers: NatsRollupHeaders, cancellationToken: cancellationToken);
+ ack.EnsureSuccess();
+ }
+
+ private string GetMetaSubject(string key) => $"$O.{_bucket}.M.{Base64UrlEncoder.Encode(key)}";
+
+ private string GetChunkSubject(string nuid) => $"$O.{_bucket}.C.{nuid}";
+
+ private string NewNuid()
+ {
+ Span<char> buffer = stackalloc char[22];
+ if (NuidWriter.TryWriteNuid(buffer))
+ {
+ return new string(buffer);
+ }
+
+ throw new InvalidOperationException("Internal error: can't generate nuid");
+ }
+
+ private void ValidateObjectName(string name)
+ {
+ if (string.IsNullOrWhiteSpace(name))
+ {
+ throw new NatsObjException("Object name can't be empty");
+ }
+
+ if (!ValidObjectRegex.IsMatch(name))
+ {
+ throw new NatsObjException("Object name can only contain alphanumeric characters, dashes, underscores, forward slash, equals sign, and periods");
+ }
+ }
+}
diff --git a/tests/NATS.Client.ObjectStore.Tests/Base64UrlEncoderTest.cs b/tests/NATS.Client.ObjectStore.Tests/Base64UrlEncoderTest.cs
new file mode 100644
index 000000000..f349e8c7a
--- /dev/null
+++ b/tests/NATS.Client.ObjectStore.Tests/Base64UrlEncoderTest.cs
@@ -0,0 +1,74 @@
+using System.Text;
+using NATS.Client.ObjectStore.Internal;
+
+namespace NATS.Client.ObjectStore.Tests;
+
+public class Base64UrlEncoderTest
+{
+ private readonly ITestOutputHelper _output;
+
+ public Base64UrlEncoderTest(ITestOutputHelper output) => _output = output;
+
+ [Theory]
+ [InlineData("Hello World!")]
+ [InlineData("!~£$%^&*()_+{}:@~<>?")]
+ [InlineData("C")]
+ [InlineData("AB")]
+ [InlineData("ABC")]
+ public void Encoding_test(string input)
+ {
+ var encoded = Base64UrlEncoder.Encode(Encoding.UTF8.GetBytes(input));
+ var expected = Encode(input);
+ Assert.Equal(expected, encoded);
+ _output.WriteLine($">>{encoded}<<");
+ }
+
+ [Theory]
+ [InlineData("SGVsbG8gV29ybGQh")]
+ [InlineData("IX7CoyQlXiYqKClfK3t9OkB-PD4_")]
+ [InlineData("Qw==")]
+ [InlineData("QUI=")]
+ [InlineData("QUJD")]
+ public void Decoding_test(string input)
+ {
+ var decoded = Base64UrlEncoder.Decode(input);
+ var expected = Decode(input);
+ Assert.Equal(expected, decoded);
+ _output.WriteLine($">>{decoded}<<");
+ }
+
+ private string Encode(string input, bool raw = false)
+ {
+ var base64String = Convert.ToBase64String(Encoding.UTF8.GetBytes(input));
+
+ if (raw)
+ {
+ base64String = base64String.TrimEnd('=');
+ }
+
+ return base64String
+ .Replace('+', '-')
+ .Replace('/', '_');
+ }
+
+ private string Decode(string input)
+ {
+ var incoming = input
+ .Replace('_', '/')
+ .Replace('-', '+');
+
+ switch (input.Length % 4)
+ {
+ case 2:
+ incoming += "==";
+ break;
+ case 3:
+ incoming += "=";
+ break;
+ }
+
+ var bytes = Convert.FromBase64String(incoming);
+
+ return Encoding.UTF8.GetString(bytes);
+ }
+}
diff --git a/tests/NATS.Client.ObjectStore.Tests/GlobalUsings.cs b/tests/NATS.Client.ObjectStore.Tests/GlobalUsings.cs
new file mode 100644
index 000000000..c802f4480
--- /dev/null
+++ b/tests/NATS.Client.ObjectStore.Tests/GlobalUsings.cs
@@ -0,0 +1 @@
+global using Xunit;
diff --git a/tests/NATS.Client.ObjectStore.Tests/NATS.Client.ObjectStore.Tests.csproj b/tests/NATS.Client.ObjectStore.Tests/NATS.Client.ObjectStore.Tests.csproj
new file mode 100644
index 000000000..29e28d4af
--- /dev/null
+++ b/tests/NATS.Client.ObjectStore.Tests/NATS.Client.ObjectStore.Tests.csproj
@@ -0,0 +1,37 @@
+
+
+
+ net6.0
+ enable
+ enable
+
+ false
+ true
+
+
+
+
+
+
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+ all
+
+
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+ all
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs
new file mode 100644
index 000000000..08e007b4a
--- /dev/null
+++ b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs
@@ -0,0 +1,189 @@
+using System.Security.Cryptography;
+using System.Text;
+using NATS.Client.Core.Tests;
+using NATS.Client.ObjectStore.Internal;
+using NATS.Client.ObjectStore.Models;
+
+namespace NATS.Client.ObjectStore.Tests;
+
+public class ObjectStoreTest
+{
+ private readonly ITestOutputHelper _output;
+
+ public ObjectStoreTest(ITestOutputHelper output) => _output = output;
+
+ [Fact]
+ public async Task Create_delete_object_store()
+ {
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ var cancellationToken = cts.Token;
+
+ await using var server = NatsServer.StartJS();
+ await using var nats = server.CreateClientConnection();
+ var js = new NatsJSContext(nats);
+ var ob = new NatsObjContext(js);
+
+ await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken);
+
+ await foreach (var stream in js.ListStreamsAsync(cancellationToken: cancellationToken))
+ {
+ Assert.Equal($"OBJ_b1", stream.Info.Config.Name);
+ }
+
+ var deleted = await ob.DeleteObjectStore("b1", cancellationToken);
+ Assert.True(deleted);
+
+ await foreach (var stream in js.ListStreamsAsync(cancellationToken: cancellationToken))
+ {
+ Assert.Fail("Should not have any streams");
+ }
+ }
+
+ [Fact]
+ public async Task Put_chunks()
+ {
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ var cancellationToken = cts.Token;
+
+ await using var server = NatsServer.StartJS();
+ await using var nats = server.CreateClientConnection();
+ var js = new NatsJSContext(nats);
+ var ob = new NatsObjContext(js);
+
+ var store = await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken);
+
+ var stringBuilder = new StringBuilder();
+ for (var i = 0; i < 9; i++)
+ {
+ stringBuilder.Append($"{i:D2}-4567890");
+ }
+
+ var buffer90 = stringBuilder.ToString();
+
+ // square buffer: all chunks are the same size
+ {
+ var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, };
+
+ var buffer = Encoding.ASCII.GetBytes(buffer90);
+ var stream = new MemoryStream(buffer);
+
+ await store.PutAsync(meta, stream, cancellationToken: cancellationToken);
+
+ var data = await store.GetInfoAsync("k1", cancellationToken: cancellationToken);
+
+ var sha = Base64UrlEncoder.Encode(SHA256.HashData(buffer));
+ var size = buffer.Length;
+ var chunks = Math.Ceiling(size / 10.0);
+
+ Assert.Equal($"SHA-256={sha}", data.Digest);
+ Assert.Equal(chunks, data.Chunks);
+ Assert.Equal(size, data.Size);
+ }
+
+ // buffer with smaller last chunk
+ {
+ var meta = new ObjectMetadata { Name = "k2", Options = new Options { MaxChunkSize = 10 }, };
+
+ var buffer = Encoding.ASCII.GetBytes(buffer90 + "09-45");
+ var stream = new MemoryStream(buffer);
+
+ await store.PutAsync(meta, stream, cancellationToken: cancellationToken);
+
+ var data = await store.GetInfoAsync("k2", cancellationToken: cancellationToken);
+
+ var sha = Base64UrlEncoder.Encode(SHA256.HashData(buffer));
+ var size = buffer.Length;
+ var chunks = Math.Ceiling(size / 10.0);
+
+ Assert.Equal($"SHA-256={sha}", data.Digest);
+ Assert.Equal(chunks, data.Chunks);
+ Assert.Equal(size, data.Size);
+ }
+ }
+
+ [Fact]
+ public async Task Get_chunks()
+ {
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ var cancellationToken = cts.Token;
+
+ await using var server = NatsServer.StartJS();
+ await using var nats = server.CreateClientConnection();
+ var js = new NatsJSContext(nats);
+ var ob = new NatsObjContext(js);
+
+ var store = await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken);
+
+ var stringBuilder = new StringBuilder();
+ for (var i = 0; i < 9; i++)
+ {
+ stringBuilder.Append($"{i:D2}-4567890");
+ }
+
+ var buffer90 = stringBuilder.ToString();
+
+ // square buffer: all chunks are the same size
+ {
+ var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, };
+ var buffer = Encoding.ASCII.GetBytes(buffer90);
+ var stream = new MemoryStream(buffer);
+ await store.PutAsync(meta, stream, cancellationToken: cancellationToken);
+ }
+
+ {
+ var memoryStream = new MemoryStream();
+ await store.GetAsync("k1", memoryStream, cancellationToken: cancellationToken);
+ await memoryStream.FlushAsync(cancellationToken);
+ var buffer = memoryStream.ToArray();
+ Assert.Equal(buffer90, Encoding.ASCII.GetString(buffer));
+ }
+
+ // buffer with smaller last chunk
+ {
+ var meta = new ObjectMetadata { Name = "k2", Options = new Options { MaxChunkSize = 10 }, };
+ var buffer = Encoding.ASCII.GetBytes(buffer90 + "09-45");
+ var stream = new MemoryStream(buffer);
+ await store.PutAsync(meta, stream, cancellationToken: cancellationToken);
+ }
+
+ {
+ var memoryStream = new MemoryStream();
+ await store.GetAsync("k2", memoryStream, cancellationToken: cancellationToken);
+ await memoryStream.FlushAsync(cancellationToken);
+ var buffer = memoryStream.ToArray();
+ Assert.Equal(buffer90 + "09-45", Encoding.ASCII.GetString(buffer));
+ }
+ }
+
+ [Fact]
+ public async Task Delete_object()
+ {
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ var cancellationToken = cts.Token;
+
+ await using var server = NatsServer.StartJS();
+ await using var nats = server.CreateClientConnection();
+ var js = new NatsJSContext(nats);
+ var ob = new NatsObjContext(js);
+
+ var store = await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken);
+ await store.PutAsync("k1", new byte[] { 65, 66, 67 }, cancellationToken);
+
+ var info = await store.GetInfoAsync("k1", cancellationToken: cancellationToken);
+ Assert.Equal(3, info.Size);
+
+ var bytes = await store.GetBytesAsync("k1", cancellationToken);
+ Assert.Equal(bytes, new byte[] { 65, 66, 67 });
+
+ await store.DeleteAsync("k1", cancellationToken);
+
+ var exception = await Assert.ThrowsAsync<NatsObjException>(async () => await store.GetInfoAsync("k1", cancellationToken: cancellationToken));
+ Assert.Equal("Object not found", exception.Message);
+
+ var info2 = await store.GetInfoAsync("k1", showDeleted: true, cancellationToken: cancellationToken);
+ Assert.True(info2.Deleted);
+ Assert.Equal(0, info2.Size);
+ Assert.Equal(0, info2.Chunks);
+ Assert.Equal(string.Empty, info2.Digest);
+ }
+}