Skip to content

Commit

Permalink
Object Store fixes and docs
Browse files Browse the repository at this point in the history
* Fixed buffer resize on writes larger than default 64K
* Added flow control handling
* Fixed re-putting a deleted object issue
* Added Object Store documentation
  • Loading branch information
mtmk committed Oct 18, 2023
1 parent 5430c18 commit a328b4a
Show file tree
Hide file tree
Showing 17 changed files with 291 additions and 44 deletions.
2 changes: 2 additions & 0 deletions docs/documentation/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ these docs. You can also create a Pull Request using the Edit on GitHub link on
[JetStream](jetstream/intro.md) is the built-in distributed persistence system built-in to the same NATS server binary.

[Key/Value Store](key-value-store/intro.md) is the built-in distributed persistent associative arrays built on top of JetStream.

[Object Store](object-store/intro.md) is the built-in distributed persistent objects of arbitrary size built on top of JetStream.
103 changes: 103 additions & 0 deletions docs/documentation/object-store/intro.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Object Store

[The Object Store](https://docs.nats.io/nats-concepts/jetstream/obj_store) is very similar to the Key Value Store in that you put and get data using a key.
The difference being that Object store allows for the storage of objects that can be of any size.

Under the covers Object Store is a client side construct that allows you to store and retrieve chunks of data
by a key using JetStream as the stream persistence engine. It's a simple, yet powerful way to store
and retrieve large data like files.

To be able to use Object Store you need to enable JetStream by running the server with `-js` flag e.g. `nats-server -js`.

## Object Store Quick Start

[Download the latest](https://nats.io/download/) `nats-server` for your platform and run it with JetStream enabled:

```shell
$ nats-server -js
```

Install `NATS.Client.ObjectStore` preview from Nuget.

Before we can do anything, we need an Object Store context:

```csharp
await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);
var obj = new NatsObjContext(js);
```

Let's create our store first. In Object Store, a bucket is simply a storage for key/object pairs:

```csharp
var store = await obj.CreateObjectStore("test-bucket");
```

Now that we have a KV bucket in our stream, let's see its status using the [NATS command
line client](https://github.com/nats-io/natscli):

```shell
$ nats object ls
╭──────────────────────────────────────────────────────────────────────╮
│ Object Store Buckets │
├─────────────┬─────────────┬─────────────────────┬──────┬─────────────┤
│ Bucket │ Description │ Created │ Size │ Last Update │
├─────────────┼─────────────┼─────────────────────┼──────┼─────────────┤
│ test-bucket │ │ 2023-10-18 14:10:27 │ 0 B │ never │
╰─────────────┴─────────────┴─────────────────────┴──────┴─────────────╯
```

We can save objects in a bucket by putting them using a key, which is `my/random/data.bin` in our case. We can also retrieve the
saved value by its key:

```csharp
await store.PutAsync("my/random/data.bin", File.OpenRead("data.bin"));

await store.GetAsync("my/random/data.bin", File.OpenWrite("data_copy.bin"));
```

We can also confirm that our value is persisted by using the NATS command line:

```shell
$ nats object info test-bucket my/random/data.bin
Object information for test-bucket > my/random/data.bin

Size: 10 MiB
Modification Time: 18 Oct 23 14:54 +0000
Chunks: 80
Digest: SHA-256 d34334673e4e2b2300c09550faa5e2b6d0f04245a1d0b664454bb922da56
```

## Other Operations

### Get Info

We can get information about a key in a bucket:

```csharp
var metadata = await store.GetInfoAsync("my/random/data.bin");

Console.WriteLine("Metadata:");
Console.WriteLine($" Bucket: {metadata.Bucket}");
Console.WriteLine($" Name: {metadata.Name}");
Console.WriteLine($" Size: {metadata.Size}");
Console.WriteLine($" Time: {metadata.MTime}");
Console.WriteLine($" Chunks: {metadata.Chunks}");

// Outputs:
// Metadata:
// Bucket: test-bucket
// Name: my/random/data.bin
// Size: 10485760
// Time: 18/10/2023 15:13:22 +00:00
// Chunks: 80
```

### Delete

We can delete a key from a bucket:

```csharp
await store.DeleteAsync("my/random/data.bin");
```
3 changes: 3 additions & 0 deletions docs/documentation/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,8 @@
- name: Key/Value Store
href: key-value-store/intro.md

- name: Object Store
href: object-store/intro.md

- name: Updating Documentation
href: update-docs.md
3 changes: 2 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ The NATS.NET V2 client is in preview and not recommended for production use yet.
- [x] Core NATS
- [x] JetStream initial support
- [x] KV initial support
- [ ] Object Store initial support
- [x] Object Store initial support
- [ ] Service API initial support
- [ ] .NET 8.0 support (e.g. Native AOT)
- [ ] Beta phase

Expand Down
1 change: 1 addition & 0 deletions sandbox/Example.ObjectStore/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
data*
59 changes: 33 additions & 26 deletions sandbox/Example.ObjectStore/Program.cs
Original file line number Diff line number Diff line change
@@ -1,41 +1,48 @@
using System.Text;
using System.Security.Cryptography;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.ObjectStore;
using NATS.Client.ObjectStore.Models;

var nats = new NatsConnection();
var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };

var nats = new NatsConnection(opts);
var js = new NatsJSContext(nats);
var ob = new NatsObjContext(js);
var obj = new NatsObjContext(js);

var store = await ob.CreateObjectStore(new NatsObjConfig("o1"));
Log("Create object store...");
var store = await obj.CreateObjectStore("test-bucket");

var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, };
var data = new byte[1024 * 1024 * 10];
Random.Shared.NextBytes(data);

var stringBuilder = new StringBuilder();
for (var i = 0; i < 9; i++)
{
stringBuilder.Append($"{i:D2}-4567890");
}
File.WriteAllBytes("data.bin", data);

var buffer90 = stringBuilder.ToString();
{
var buffer = Encoding.ASCII.GetBytes(buffer90);
var stream = new MemoryStream(buffer);
Log("Put file...");
await store.PutAsync("my/random/data.bin", File.OpenRead("data.bin"));

await store.PutAsync(meta, stream);
Log("Get file...");
await store.GetAsync("my/random/data.bin", File.OpenWrite("data1.bin"));

var data = await store.GetInfoAsync("k1");
var hash = Convert.ToBase64String(SHA256.HashData(File.ReadAllBytes("data.bin")));
var hash1 = Convert.ToBase64String(SHA256.HashData(File.ReadAllBytes("data1.bin")));

Console.WriteLine($"DATA: {data}");
}
Log($"Check SHA-256: {hash == hash1}");

{
var memoryStream = new MemoryStream();
await store.GetAsync("k1", memoryStream);
await memoryStream.FlushAsync();
var buffer = memoryStream.ToArray();
Console.WriteLine($"buffer:{Encoding.ASCII.GetString(buffer)}");
}
var metadata = await store.GetInfoAsync("my/random/data.bin");

Console.WriteLine("Metadata:");
Console.WriteLine($" Bucket: {metadata.Bucket}");
Console.WriteLine($" Name: {metadata.Name}");
Console.WriteLine($" Size: {metadata.Size}");
Console.WriteLine($" Time: {metadata.MTime}");
Console.WriteLine($" Chunks: {metadata.Chunks}");

await store.DeleteAsync("my/random/data.bin");

Console.WriteLine("Bye");

void Log(string message)
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} {message}");
}
10 changes: 8 additions & 2 deletions src/NATS.Client.Core/INatsSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,14 @@ public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
{
using (memoryOwner)
{
bufferWriter.Write(memoryOwner.Memory.Span);
return memoryOwner.Memory.Length;
var length = memoryOwner.Memory.Length;

var buffer = bufferWriter.GetMemory(length);
memoryOwner.Memory.CopyTo(buffer);

bufferWriter.Advance(length);

return length;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.Core/Internal/FixedArrayBufferWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Memory<byte> GetMemory(int sizeHint = 0)
{
if (_buffer.Length - _written < sizeHint)
{
Resize(sizeHint);
Resize(sizeHint + _written);
}

return _buffer.AsMemory(_written);
Expand All @@ -63,7 +63,7 @@ public Span<byte> GetSpan(int sizeHint = 0)
{
if (_buffer.Length - _written < sizeHint)
{
Resize(sizeHint);
Resize(sizeHint + _written);
}

return _buffer.AsSpan(_written);
Expand Down
14 changes: 14 additions & 0 deletions src/NATS.Client.Core/NatsMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@ internal static NatsMsg<T> Build(
return new NatsMsg<T>(subject, replyTo, (int)size, headers, data, connection);
}

/// <summary>
/// Reply with an empty message.
/// </summary>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
public ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
{
CheckReplyPreconditions();
return Connection.PublishAsync<object?>(ReplyTo!, default, headers, replyTo, opts, cancellationToken);
}

/// <summary>
/// Reply to this message.
/// </summary>
Expand Down
14 changes: 13 additions & 1 deletion src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,19 @@ private async Task CommandLoop()

if (string.Equals(msg.Subject, subSubject))
{
// Control message: e.g. heartbeat
// Control messages: e.g. heartbeat
if (msg.Headers is { } headers)
{
if (headers.TryGetValue("Nats-Consumer-Stalled", out var flowControlReplyTo))
{
await _nats.PublishAsync(flowControlReplyTo, cancellationToken: _cancellationToken);
}

if (headers is { Code: 100, MessageText: "FlowControl Request" })
{
await msg.ReplyAsync(cancellationToken: _cancellationToken);
}
}
}
else
{
Expand Down
11 changes: 11 additions & 0 deletions src/NATS.Client.JetStream/NatsJSMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ internal NatsJSMsg(NatsMsg<T> msg, NatsJSContext context)
/// </summary>
public NatsJSMsgMetadata? Metadata => _replyToDateTimeAndSeq.Value;

/// <summary>
/// Reply with an empty message.
/// </summary>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
public ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) =>
_msg.ReplyAsync(headers, replyTo, opts, cancellationToken);

/// <summary>
/// Acknowledges the message was completely handled.
/// </summary>
Expand Down
9 changes: 9 additions & 0 deletions src/NATS.Client.ObjectStore/NatsObjContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ public class NatsObjContext
/// <param name="context">JetStream context.</param>
public NatsObjContext(NatsJSContext context) => _context = context;

/// <summary>
/// Create a new object store.
/// </summary>
/// <param name="bucket">Bucket name.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object store object.</returns>
public ValueTask<NatsObjStore> CreateObjectStore(string bucket, CancellationToken cancellationToken = default) =>
CreateObjectStore(new NatsObjConfig(bucket), cancellationToken);

/// <summary>
/// Create a new object store.
/// </summary>
Expand Down
15 changes: 15 additions & 0 deletions src/NATS.Client.ObjectStore/NatsObjException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,18 @@ public NatsObjException(string message, Exception exception)
{
}
}

/// <summary>
/// NATS Object Store object not found exception.
/// </summary>
public class NatsObjNotFoundException : NatsObjException
{
/// <summary>
/// Create a new NATS Object Store object not found exception.
/// </summary>
/// <param name="message">Exception message.</param>
public NatsObjNotFoundException(string message)
: base(message)
{
}
}
29 changes: 18 additions & 11 deletions src/NATS.Client.ObjectStore/NatsObjStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ public async ValueTask<ObjectMetadata> GetAsync(string key, Stream stream, bool
throw new NatsObjException("Size mismatch");
}

await stream.FlushAsync(cancellationToken);

return info;
}

Expand Down Expand Up @@ -173,10 +171,8 @@ public async ValueTask<ObjectMetadata> PutAsync(ObjectMetadata meta, Stream stre
{
info = await GetInfoAsync(meta.Name, cancellationToken: cancellationToken).ConfigureAwait(false);
}
catch (NatsJSApiException e)
catch (NatsObjNotFoundException)
{
if (e.Error.Code != 404)
throw;
}

var nuid = NewNuid();
Expand Down Expand Up @@ -296,17 +292,28 @@ public async ValueTask<ObjectMetadata> GetInfoAsync(string key, bool showDeleted
ValidateObjectName(key);

var request = new StreamMsgGetRequest { LastBySubj = GetMetaSubject(key) };
try
{
var response = await _stream.GetAsync(request, cancellationToken);

var response = await _stream.GetAsync(request, cancellationToken);
var data = NatsJsonSerializer.Default.Deserialize<ObjectMetadata>(new ReadOnlySequence<byte>(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsObjException("Can't deserialize object metadata");

var data = NatsJsonSerializer.Default.Deserialize<ObjectMetadata>(new ReadOnlySequence<byte>(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsObjException("Can't deserialize object metadata");
if (!showDeleted && data.Deleted)
{
throw new NatsObjNotFoundException($"Object not found: {key}");
}

if (!showDeleted && data.Deleted)
{
throw new NatsObjException("Object not found");
return data;
}
catch (NatsJSApiException e)
{
if (e.Error.Code == 404)
{
throw new NatsObjNotFoundException($"Object not found: {key}");
}

return data;
throw;
}
}

/// <summary>
Expand Down
Loading

0 comments on commit a328b4a

Please sign in to comment.