diff --git a/docs/documentation/intro.md b/docs/documentation/intro.md index 2a49ff5d9..f8ff7b6c1 100644 --- a/docs/documentation/intro.md +++ b/docs/documentation/intro.md @@ -15,3 +15,5 @@ these docs. You can also create a Pull Request using the Edit on GitHub link on [JetStream](jetstream/intro.md) is the built-in distributed persistence system built-in to the same NATS server binary. [Key/Value Store](key-value-store/intro.md) is the built-in distributed persistent associative arrays built on top of JetStream. + +[Object Store](object-store/intro.md) is the built-in distributed persistent objects of arbitrary size built on top of JetStream. diff --git a/docs/documentation/object-store/intro.md b/docs/documentation/object-store/intro.md new file mode 100644 index 000000000..ec32ee011 --- /dev/null +++ b/docs/documentation/object-store/intro.md @@ -0,0 +1,103 @@ +# Object Store + +[The Object Store](https://docs.nats.io/nats-concepts/jetstream/obj_store) is very similar to the Key Value Store in that you put and get data using a key. +The difference being that Object store allows for the storage of objects that can be of any size. + +Under the covers Object Store is a client side construct that allows you to store and retrieve chunks of data +by a key using JetStream as the stream persistence engine. It's a simple, yet powerful way to store +and retrieve large data like files. + +To be able to use Object Store you need to enable JetStream by running the server with `-js` flag e.g. `nats-server -js`. + +## Object Store Quick Start + +[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it with JetStream enabled: + +```shell +$ nats-server -js +``` + +Install `NATS.Client.ObjectStore` preview from Nuget. + +Before we can do anything, we need an Object Store context: + +```csharp +await using var nats = new NatsConnection(); +var js = new NatsJSContext(nats); +var obj = new NatsObjContext(js); +``` + +Let's create our store first. In Object Store, a bucket is simply a storage for key/object pairs: + +```csharp +var store = await obj.CreateObjectStore("test-bucket"); +``` + +Now that we have a KV bucket in our stream, let's see its status using the [NATS command +line client](https://github.com/nats-io/natscli): + +```shell +$ nats object ls +╭──────────────────────────────────────────────────────────────────────╮ +│ Object Store Buckets │ +├─────────────┬─────────────┬─────────────────────┬──────┬─────────────┤ +│ Bucket │ Description │ Created │ Size │ Last Update │ +├─────────────┼─────────────┼─────────────────────┼──────┼─────────────┤ +│ test-bucket │ │ 2023-10-18 14:10:27 │ 0 B │ never │ +╰─────────────┴─────────────┴─────────────────────┴──────┴─────────────╯ +``` + +We can save objects in a bucket by putting them using a key, which is `my/random/data.bin` in our case. We can also retrieve the +saved value by its key: + +```csharp +await store.PutAsync("my/random/data.bin", File.OpenRead("data.bin")); + +await store.GetAsync("my/random/data.bin", File.OpenWrite("data_copy.bin")); +``` + +We can also confirm that our value is persisted by using the NATS command line: + +```shell +$ nats object info test-bucket my/random/data.bin +Object information for test-bucket > my/random/data.bin + + Size: 10 MiB + Modification Time: 18 Oct 23 14:54 +0000 + Chunks: 80 + Digest: SHA-256 d34334673e4e2b2300c09550faa5e2b6d0f04245a1d0b664454bb922da56 +``` + +## Other Operations + +### Get Info + +We can get information about a key in a bucket: + +```csharp +var metadata = await store.GetInfoAsync("my/random/data.bin"); + +Console.WriteLine("Metadata:"); +Console.WriteLine($" Bucket: {metadata.Bucket}"); +Console.WriteLine($" Name: {metadata.Name}"); +Console.WriteLine($" Size: {metadata.Size}"); +Console.WriteLine($" Time: {metadata.MTime}"); +Console.WriteLine($" Chunks: {metadata.Chunks}"); + +// Outputs: +// Metadata: +// Bucket: test-bucket +// Name: my/random/data.bin +// Size: 10485760 +// Time: 18/10/2023 15:13:22 +00:00 +// Chunks: 80 + +``` + +### Delete + +We can delete a key from a bucket: + +```csharp +await store.DeleteAsync("my/random/data.bin"); +``` diff --git a/docs/documentation/toc.yml b/docs/documentation/toc.yml index 8f166681b..51d58d936 100644 --- a/docs/documentation/toc.yml +++ b/docs/documentation/toc.yml @@ -24,5 +24,8 @@ - name: Key/Value Store href: key-value-store/intro.md +- name: Object Store + href: object-store/intro.md + - name: Updating Documentation href: update-docs.md diff --git a/docs/index.md b/docs/index.md index 5ce93393a..577b2a0ab 100644 --- a/docs/index.md +++ b/docs/index.md @@ -11,7 +11,8 @@ The NATS.NET V2 client is in preview and not recommended for production use yet. - [x] Core NATS - [x] JetStream initial support - [x] KV initial support -- [ ] Object Store initial support +- [x] Object Store initial support +- [ ] Service API initial support - [ ] .NET 8.0 support (e.g. Native AOT) - [ ] Beta phase diff --git a/sandbox/Example.ObjectStore/.gitignore b/sandbox/Example.ObjectStore/.gitignore new file mode 100644 index 000000000..3f1bc6e12 --- /dev/null +++ b/sandbox/Example.ObjectStore/.gitignore @@ -0,0 +1 @@ +data* diff --git a/sandbox/Example.ObjectStore/Program.cs b/sandbox/Example.ObjectStore/Program.cs index ee537d297..465f9e54f 100644 --- a/sandbox/Example.ObjectStore/Program.cs +++ b/sandbox/Example.ObjectStore/Program.cs @@ -1,41 +1,48 @@ -using System.Text; +using System.Security.Cryptography; +using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.JetStream; using NATS.Client.ObjectStore; -using NATS.Client.ObjectStore.Models; -var nats = new NatsConnection(); +var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) }; + +var nats = new NatsConnection(opts); var js = new NatsJSContext(nats); -var ob = new NatsObjContext(js); +var obj = new NatsObjContext(js); -var store = await ob.CreateObjectStore(new NatsObjConfig("o1")); +Log("Create object store..."); +var store = await obj.CreateObjectStore("test-bucket"); -var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, }; +var data = new byte[1024 * 1024 * 10]; +Random.Shared.NextBytes(data); -var stringBuilder = new StringBuilder(); -for (var i = 0; i < 9; i++) -{ - stringBuilder.Append($"{i:D2}-4567890"); -} +File.WriteAllBytes("data.bin", data); -var buffer90 = stringBuilder.ToString(); -{ - var buffer = Encoding.ASCII.GetBytes(buffer90); - var stream = new MemoryStream(buffer); +Log("Put file..."); +await store.PutAsync("my/random/data.bin", File.OpenRead("data.bin")); - await store.PutAsync(meta, stream); +Log("Get file..."); +await store.GetAsync("my/random/data.bin", File.OpenWrite("data1.bin")); - var data = await store.GetInfoAsync("k1"); +var hash = Convert.ToBase64String(SHA256.HashData(File.ReadAllBytes("data.bin"))); +var hash1 = Convert.ToBase64String(SHA256.HashData(File.ReadAllBytes("data1.bin"))); - Console.WriteLine($"DATA: {data}"); -} +Log($"Check SHA-256: {hash == hash1}"); -{ - var memoryStream = new MemoryStream(); - await store.GetAsync("k1", memoryStream); - await memoryStream.FlushAsync(); - var buffer = memoryStream.ToArray(); - Console.WriteLine($"buffer:{Encoding.ASCII.GetString(buffer)}"); -} +var metadata = await store.GetInfoAsync("my/random/data.bin"); + +Console.WriteLine("Metadata:"); +Console.WriteLine($" Bucket: {metadata.Bucket}"); +Console.WriteLine($" Name: {metadata.Name}"); +Console.WriteLine($" Size: {metadata.Size}"); +Console.WriteLine($" Time: {metadata.MTime}"); +Console.WriteLine($" Chunks: {metadata.Chunks}"); + +await store.DeleteAsync("my/random/data.bin"); Console.WriteLine("Bye"); + +void Log(string message) +{ + Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} {message}"); +} diff --git a/src/NATS.Client.Core/INatsSerializer.cs b/src/NATS.Client.Core/INatsSerializer.cs index 7d476d50e..0a82e66fe 100644 --- a/src/NATS.Client.Core/INatsSerializer.cs +++ b/src/NATS.Client.Core/INatsSerializer.cs @@ -85,8 +85,14 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) { using (memoryOwner) { - bufferWriter.Write(memoryOwner.Memory.Span); - return memoryOwner.Memory.Length; + var length = memoryOwner.Memory.Length; + + var buffer = bufferWriter.GetMemory(length); + memoryOwner.Memory.CopyTo(buffer); + + bufferWriter.Advance(length); + + return length; } } diff --git a/src/NATS.Client.Core/Internal/FixedArrayBufferWriter.cs b/src/NATS.Client.Core/Internal/FixedArrayBufferWriter.cs index bb74aa9eb..adc8138ac 100644 --- a/src/NATS.Client.Core/Internal/FixedArrayBufferWriter.cs +++ b/src/NATS.Client.Core/Internal/FixedArrayBufferWriter.cs @@ -52,7 +52,7 @@ public Memory GetMemory(int sizeHint = 0) { if (_buffer.Length - _written < sizeHint) { - Resize(sizeHint); + Resize(sizeHint + _written); } return _buffer.AsMemory(_written); @@ -63,7 +63,7 @@ public Span GetSpan(int sizeHint = 0) { if (_buffer.Length - _written < sizeHint) { - Resize(sizeHint); + Resize(sizeHint + _written); } return _buffer.AsSpan(_written); diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index 14bd1c78a..f9a7099d0 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -67,6 +67,20 @@ internal static NatsMsg Build( return new NatsMsg(subject, replyTo, (int)size, headers, data, connection); } + /// + /// Reply with an empty message. + /// + /// Optional message headers. + /// Optional reply-to subject. + /// A for publishing options. + /// A used to cancel the command. + /// A that represents the asynchronous send operation. + public ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + { + CheckReplyPreconditions(); + return Connection.PublishAsync(ReplyTo!, default, headers, replyTo, opts, cancellationToken); + } + /// /// Reply to this message. /// diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs index b3c736ce2..35ebb36ca 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs @@ -190,7 +190,19 @@ private async Task CommandLoop() if (string.Equals(msg.Subject, subSubject)) { - // Control message: e.g. heartbeat + // Control messages: e.g. heartbeat + if (msg.Headers is { } headers) + { + if (headers.TryGetValue("Nats-Consumer-Stalled", out var flowControlReplyTo)) + { + await _nats.PublishAsync(flowControlReplyTo, cancellationToken: _cancellationToken); + } + + if (headers is { Code: 100, MessageText: "FlowControl Request" }) + { + await msg.ReplyAsync(cancellationToken: _cancellationToken); + } + } } else { diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index ee657083e..29c190b87 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -57,6 +57,17 @@ internal NatsJSMsg(NatsMsg msg, NatsJSContext context) /// public NatsJSMsgMetadata? Metadata => _replyToDateTimeAndSeq.Value; + /// + /// Reply with an empty message. + /// + /// Optional message headers. + /// Optional reply-to subject. + /// A for publishing options. + /// A used to cancel the command. + /// A that represents the asynchronous send operation. + public ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) => + _msg.ReplyAsync(headers, replyTo, opts, cancellationToken); + /// /// Acknowledges the message was completely handled. /// diff --git a/src/NATS.Client.ObjectStore/NatsObjContext.cs b/src/NATS.Client.ObjectStore/NatsObjContext.cs index ebfb89a1a..a2e46a80a 100644 --- a/src/NATS.Client.ObjectStore/NatsObjContext.cs +++ b/src/NATS.Client.ObjectStore/NatsObjContext.cs @@ -20,6 +20,15 @@ public class NatsObjContext /// JetStream context. public NatsObjContext(NatsJSContext context) => _context = context; + /// + /// Create a new object store. + /// + /// Bucket name. + /// A used to cancel the API call. + /// Object store object. + public ValueTask CreateObjectStore(string bucket, CancellationToken cancellationToken = default) => + CreateObjectStore(new NatsObjConfig(bucket), cancellationToken); + /// /// Create a new object store. /// diff --git a/src/NATS.Client.ObjectStore/NatsObjException.cs b/src/NATS.Client.ObjectStore/NatsObjException.cs index 6e411f78c..1a77deb87 100644 --- a/src/NATS.Client.ObjectStore/NatsObjException.cs +++ b/src/NATS.Client.ObjectStore/NatsObjException.cs @@ -26,3 +26,18 @@ public NatsObjException(string message, Exception exception) { } } + +/// +/// NATS Object Store object not found exception. +/// +public class NatsObjNotFoundException : NatsObjException +{ + /// + /// Create a new NATS Object Store object not found exception. + /// + /// Exception message. + public NatsObjNotFoundException(string message) + : base(message) + { + } +} diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index b703b2508..c59b3276e 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -126,8 +126,6 @@ public async ValueTask GetAsync(string key, Stream stream, bool throw new NatsObjException("Size mismatch"); } - await stream.FlushAsync(cancellationToken); - return info; } @@ -173,10 +171,8 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre { info = await GetInfoAsync(meta.Name, cancellationToken: cancellationToken).ConfigureAwait(false); } - catch (NatsJSApiException e) + catch (NatsObjNotFoundException) { - if (e.Error.Code != 404) - throw; } var nuid = NewNuid(); @@ -296,17 +292,28 @@ public async ValueTask GetInfoAsync(string key, bool showDeleted ValidateObjectName(key); var request = new StreamMsgGetRequest { LastBySubj = GetMetaSubject(key) }; + try + { + var response = await _stream.GetAsync(request, cancellationToken); - var response = await _stream.GetAsync(request, cancellationToken); + var data = NatsJsonSerializer.Default.Deserialize(new ReadOnlySequence(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsObjException("Can't deserialize object metadata"); - var data = NatsJsonSerializer.Default.Deserialize(new ReadOnlySequence(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsObjException("Can't deserialize object metadata"); + if (!showDeleted && data.Deleted) + { + throw new NatsObjNotFoundException($"Object not found: {key}"); + } - if (!showDeleted && data.Deleted) - { - throw new NatsObjException("Object not found"); + return data; } + catch (NatsJSApiException e) + { + if (e.Error.Code == 404) + { + throw new NatsObjNotFoundException($"Object not found: {key}"); + } - return data; + throw; + } } /// diff --git a/tests/NATS.Client.Core.Tests/FixedArrayBufferWriterTest.cs b/tests/NATS.Client.Core.Tests/FixedArrayBufferWriterTest.cs index 27f6969f3..20f96e1fc 100644 --- a/tests/NATS.Client.Core.Tests/FixedArrayBufferWriterTest.cs +++ b/tests/NATS.Client.Core.Tests/FixedArrayBufferWriterTest.cs @@ -37,4 +37,30 @@ public void Ensure() newSpan.Length.Should().Be((ushort.MaxValue * 2) - 20000); } + + [Theory] + [InlineData(129, 0, "double capacity")] + [InlineData(257, 0, "adjust capacity to size")] + [InlineData(129, 1, "double capacity when already advanced")] + [InlineData(257, 1, "adjust capacity to size when already advanced")] + public void Resize(int size, int advance, string reason) + { + // GetSpan() + { + var writer = new FixedArrayBufferWriter(128); + if (advance > 0) + writer.Advance(advance); + var span = writer.GetSpan(size); + span.Length.Should().BeGreaterOrEqualTo(size, reason); + } + + // GetMemory() + { + var writer = new FixedArrayBufferWriter(128); + if (advance > 0) + writer.Advance(advance); + var memory = writer.GetMemory(size); + memory.Length.Should().BeGreaterOrEqualTo(size, reason); + } + } } diff --git a/tests/NATS.Client.Core.Tests/TlsFirstTest.cs b/tests/NATS.Client.Core.Tests/TlsFirstTest.cs index 35f48921b..bbc624950 100644 --- a/tests/NATS.Client.Core.Tests/TlsFirstTest.cs +++ b/tests/NATS.Client.Core.Tests/TlsFirstTest.cs @@ -62,7 +62,6 @@ public async Task Implicit_TLS_fails_when_disabled() Assert.Matches(@"can not start to connect nats server", exception.Message); _output.WriteLine($"Implicit TLS connection rejected"); - } // Normal TLS connection should work diff --git a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs index 08e007b4a..09e31b311 100644 --- a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs @@ -186,4 +186,35 @@ public async Task Delete_object() Assert.Equal(0, info2.Chunks); Assert.Equal(string.Empty, info2.Digest); } + + [Fact] + public async Task Put_and_get_large_file() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + var obj = new NatsObjContext(js); + + var store = await obj.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken); + + var data = new byte[1024 * 1024 * 10]; + Random.Shared.NextBytes(data); + + const string filename = $"_tmp_test_file_{nameof(Put_and_get_large_file)}.bin"; + var filename1 = $"{filename}.1"; + + await File.WriteAllBytesAsync(filename, data, cancellationToken); + + await store.PutAsync("my/random/data.bin", File.OpenRead(filename), cancellationToken: cancellationToken); + + await store.GetAsync("my/random/data.bin", File.OpenWrite(filename1), cancellationToken: cancellationToken); + + var hash = Convert.ToBase64String(SHA256.HashData(await File.ReadAllBytesAsync(filename, cancellationToken))); + var hash1 = Convert.ToBase64String(SHA256.HashData(await File.ReadAllBytesAsync(filename1, cancellationToken))); + + Assert.Equal(hash, hash1); + } }