From 98974181ee29fe4cac4c209b7989eaef39eb2544 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 12 Oct 2023 17:53:08 +0100 Subject: [PATCH 1/9] Initial Object Store implementation --- NATS.Client.sln | 14 ++ NATS.Client.sln.DotSettings | 1 + src/NATS.Client.Core/INatsSerializer.cs | 19 +- src/NATS.Client.Core/NATS.Client.Core.csproj | 2 + .../NATS.Client.JetStream.csproj | 2 + .../Internal/Encoder.cs | 219 ++++++++++++++++++ .../Internal/NatsOBSub.cs | 85 +++++++ .../Models/ObjectMetadata.cs | 75 ++++++ .../NATS.Client.ObjectStore.csproj | 20 ++ src/NATS.Client.ObjectStore/NatsOBConfig.cs | 25 ++ src/NATS.Client.ObjectStore/NatsOBContext.cs | 70 ++++++ .../NatsOBException.cs | 16 ++ src/NATS.Client.ObjectStore/NatsOBStore.cs | 205 ++++++++++++++++ .../Base64UrlEncoderTest.cs | 69 ++++++ .../GlobalUsings.cs | 1 + .../NATS.Client.ObjectStore.Tests.csproj | 37 +++ .../ObjectStoreTest.cs | 73 ++++++ 17 files changed, 931 insertions(+), 2 deletions(-) create mode 100644 src/NATS.Client.ObjectStore/Internal/Encoder.cs create mode 100644 src/NATS.Client.ObjectStore/Internal/NatsOBSub.cs create mode 100644 src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs create mode 100644 src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj create mode 100644 src/NATS.Client.ObjectStore/NatsOBConfig.cs create mode 100644 src/NATS.Client.ObjectStore/NatsOBContext.cs create mode 100644 src/NATS.Client.ObjectStore/NatsOBException.cs create mode 100644 src/NATS.Client.ObjectStore/NatsOBStore.cs create mode 100644 tests/NATS.Client.ObjectStore.Tests/Base64UrlEncoderTest.cs create mode 100644 tests/NATS.Client.ObjectStore.Tests/GlobalUsings.cs create mode 100644 tests/NATS.Client.ObjectStore.Tests/NATS.Client.ObjectStore.Tests.csproj create mode 100644 tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs diff --git a/NATS.Client.sln b/NATS.Client.sln index 2f0aff886..53ac9d290 100644 --- a/NATS.Client.sln +++ b/NATS.Client.sln @@ -73,6 +73,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.KeyValueStore.T EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.KeyValueStore.Watcher", "sandbox\Example.KeyValueStore.Watcher\Example.KeyValueStore.Watcher.csproj", "{912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.ObjectStore", "src\NATS.Client.ObjectStore\NATS.Client.ObjectStore.csproj", "{3F8840BA-4F91-4359-AA53-6B26823E7F55}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.ObjectStore.Tests", "tests\NATS.Client.ObjectStore.Tests\NATS.Client.ObjectStore.Tests.csproj", "{BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -183,6 +187,14 @@ Global {912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53}.Debug|Any CPU.Build.0 = Debug|Any CPU {912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53}.Release|Any CPU.ActiveCfg = Release|Any CPU {912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53}.Release|Any CPU.Build.0 = Release|Any CPU + {3F8840BA-4F91-4359-AA53-6B26823E7F55}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3F8840BA-4F91-4359-AA53-6B26823E7F55}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3F8840BA-4F91-4359-AA53-6B26823E7F55}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3F8840BA-4F91-4359-AA53-6B26823E7F55}.Release|Any CPU.Build.0 = Release|Any CPU + {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -215,6 +227,8 @@ Global {A102AB7B-A90C-4717-B17C-045240838060} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} {908F2CED-CAC0-4A4E-AD19-362A413B5DA4} = {C526E8AB-739A-48D7-8FC4-048978C9B650} {912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53} = {95A69671-16CA-4133-981C-CC381B7AAA30} + {3F8840BA-4F91-4359-AA53-6B26823E7F55} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} + {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E} = {C526E8AB-739A-48D7-8FC4-048978C9B650} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA} diff --git a/NATS.Client.sln.DotSettings b/NATS.Client.sln.DotSettings index e0bb2621b..5f92e7257 100644 --- a/NATS.Client.sln.DotSettings +++ b/NATS.Client.sln.DotSettings @@ -5,6 +5,7 @@ JS KV LF + OB True True True diff --git a/src/NATS.Client.Core/INatsSerializer.cs b/src/NATS.Client.Core/INatsSerializer.cs index 8c6cb819f..7d476d50e 100644 --- a/src/NATS.Client.Core/INatsSerializer.cs +++ b/src/NATS.Client.Core/INatsSerializer.cs @@ -18,6 +18,21 @@ public interface ICountableBufferWriter : IBufferWriter int WrittenCount { get; } } +public readonly struct FixedSizeMemoryOwner : IMemoryOwner +{ + private readonly IMemoryOwner _owner; + + public FixedSizeMemoryOwner(IMemoryOwner owner, int size) + { + _owner = owner; + Memory = _owner.Memory.Slice(0, size); + } + + public Memory Memory { get; } + + public void Dispose() => _owner.Dispose(); +} + public static class NatsDefaultSerializer { public static readonly INatsSerializer Default = new NatsRawSerializer(NatsJsonSerializer.Default); @@ -105,9 +120,9 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) if (typeof(T) == typeof(IMemoryOwner)) { - var memoryOwner = MemoryPool.Shared.Rent((int)buffer.Length); + var memoryOwner = new FixedSizeMemoryOwner(MemoryPool.Shared.Rent((int)buffer.Length), (int)buffer.Length); buffer.CopyTo(memoryOwner.Memory.Span); - return (T)memoryOwner; + return (T)(object)memoryOwner; } if (Next != null) diff --git a/src/NATS.Client.Core/NATS.Client.Core.csproj b/src/NATS.Client.Core/NATS.Client.Core.csproj index 96d9b019d..db709f6e2 100644 --- a/src/NATS.Client.Core/NATS.Client.Core.csproj +++ b/src/NATS.Client.Core/NATS.Client.Core.csproj @@ -29,5 +29,7 @@ + + diff --git a/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj b/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj index 16158b740..62a3ceef3 100644 --- a/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj +++ b/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj @@ -16,6 +16,8 @@ + + diff --git a/src/NATS.Client.ObjectStore/Internal/Encoder.cs b/src/NATS.Client.ObjectStore/Internal/Encoder.cs new file mode 100644 index 000000000..6325aa141 --- /dev/null +++ b/src/NATS.Client.ObjectStore/Internal/Encoder.cs @@ -0,0 +1,219 @@ +using System.Security.Cryptography; + +namespace NATS.Client.ObjectStore.Internal; + +// Borrowed from https://github.com/AzureAD/azure-activedirectory-identitymodel-extensions-for-dotnet/blob/6.32.3/src/Microsoft.IdentityModel.Tokens/Base64UrlEncoder.cs +using System; +using System.Text; + +/// +/// Encodes and Decodes strings as Base64Url encoding. +/// +public static class Base64UrlEncoder +{ + private const char Base64PadCharacter = '='; + private const char Base64Character62 = '+'; + private const char Base64Character63 = '/'; + private const char Base64UrlCharacter62 = '-'; + private const char Base64UrlCharacter63 = '_'; + + /// + /// Encoding table + /// + private static readonly char[] SBase64Table = new[] + { + 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', + '5', '6', '7', '8', '9', Base64UrlCharacter62, Base64UrlCharacter63, + }; + + public static string Sha256(ReadOnlySpan value) + { + Span destination = stackalloc byte[256 / 8]; + using (var sha256 = SHA256.Create()) + { + sha256.TryComputeHash(value, destination, out _); + } + + return Encode(destination); + } + + /// + /// The following functions perform base64url encoding which differs from regular base64 encoding as follows + /// * padding is skipped so the pad character '=' doesn't have to be percent encoded + /// * the 62nd and 63rd regular base64 encoding characters ('+' and '/') are replace with ('-' and '_') + /// The changes make the encoding alphabet file and URL safe. + /// + /// string to encode. + /// Base64Url encoding of the UTF8 bytes. + public static string Encode(string arg) + { + _ = arg ?? throw new ArgumentNullException(nameof(arg)); + + return Encode(Encoding.UTF8.GetBytes(arg)); + } + + /// + /// Converts a subset of an array of 8-bit unsigned integers to its equivalent string representation which is encoded with base-64-url digits. Parameters specify + /// the subset as an offset in the input array, and the number of elements in the array to convert. + /// + /// An array of 8-bit unsigned integers. + /// Remove padding + /// The string representation in base 64 url encoding of length elements of inArray, starting at position offset. + /// 'inArray' is null. + /// offset or length is negative OR offset plus length is greater than the length of inArray. + public static string Encode(Span inArray, bool raw = false) + { + var offset = 0; + var length = inArray.Length; + + if (length == 0) + return string.Empty; + + var lengthMod3 = length % 3; + var limit = length - lengthMod3; + var output = new char[(length + 2) / 3 * 4]; + var table = SBase64Table; + int i, j = 0; + + // takes 3 bytes from inArray and insert 4 bytes into output + for (i = offset; i < limit; i += 3) + { + var d0 = inArray[i]; + var d1 = inArray[i + 1]; + var d2 = inArray[i + 2]; + + output[j + 0] = table[d0 >> 2]; + output[j + 1] = table[((d0 & 0x03) << 4) | (d1 >> 4)]; + output[j + 2] = table[((d1 & 0x0f) << 2) | (d2 >> 6)]; + output[j + 3] = table[d2 & 0x3f]; + j += 4; + } + + // Where we left off before + i = limit; + + switch (lengthMod3) + { + case 2: + { + var d0 = inArray[i]; + var d1 = inArray[i + 1]; + + output[j + 0] = table[d0 >> 2]; + output[j + 1] = table[((d0 & 0x03) << 4) | (d1 >> 4)]; + output[j + 2] = table[(d1 & 0x0f) << 2]; + j += 3; + } + + break; + + case 1: + { + var d0 = inArray[i]; + + output[j + 0] = table[d0 >> 2]; + output[j + 1] = table[(d0 & 0x03) << 4]; + j += 2; + } + + break; + + // default or case 0: no further operations are needed. + } + + if (raw) + return new string(output, 0, j); + + for (var k = j; k < output.Length; k++) + { + output[k] = Base64PadCharacter; + } + + return new string(output); + } + + /// + /// Decodes the string from Base64UrlEncoded to UTF8. + /// + /// string to decode. + /// UTF8 string. + public static string Decode(string arg) + { + return Encoding.UTF8.GetString(DecodeBytes(arg)); + } + + /// + /// Converts the specified string, base-64-url encoded to utf8 bytes. + /// base64Url encoded string. + /// UTF8 bytes. + public static byte[] DecodeBytes(string str) + { + _ = str ?? throw new ArgumentNullException(nameof(str)); + return UnsafeDecode(str); + } + + private static unsafe byte[] UnsafeDecode(string str) + { + var mod = str.Length % 4; + if (mod == 1) + throw new FormatException(nameof(str)); + + var needReplace = false; + var decodedLength = str.Length + ((4 - mod) % 4); + + for (var i = 0; i < str.Length; i++) + { + if (str[i] == Base64UrlCharacter62 || str[i] == Base64UrlCharacter63) + { + needReplace = true; + break; + } + } + + if (needReplace) + { + string decodedString = new(char.MinValue, decodedLength); + fixed (char* dest = decodedString) + { + var i = 0; + for (; i < str.Length; i++) + { + if (str[i] == Base64UrlCharacter62) + dest[i] = Base64Character62; + else if (str[i] == Base64UrlCharacter63) + dest[i] = Base64Character63; + else + dest[i] = str[i]; + } + + for (; i < decodedLength; i++) + dest[i] = Base64PadCharacter; + } + + return Convert.FromBase64String(decodedString); + } + else + { + if (decodedLength == str.Length) + { + return Convert.FromBase64String(str); + } + else + { + string decodedString = new(char.MinValue, decodedLength); + fixed (char* src = str) + { + fixed (char* dest = decodedString) + { + Buffer.MemoryCopy(src, dest, str.Length * 2, str.Length * 2); + dest[str.Length] = Base64PadCharacter; + if (str.Length + 2 == decodedLength) + dest[str.Length + 1] = Base64PadCharacter; + } + } + + return Convert.FromBase64String(decodedString); + } + } + } +} diff --git a/src/NATS.Client.ObjectStore/Internal/NatsOBSub.cs b/src/NATS.Client.ObjectStore/Internal/NatsOBSub.cs new file mode 100644 index 000000000..a9357afff --- /dev/null +++ b/src/NATS.Client.ObjectStore/Internal/NatsOBSub.cs @@ -0,0 +1,85 @@ +using System.Buffers; +using System.Threading.Channels; +using NATS.Client.Core; +using NATS.Client.JetStream; + +namespace NATS.Client.ObjectStore.Internal; + +internal enum NatsOBSubCommand +{ + Msg, + Ready, +} + +internal readonly struct NatsOBSubMsg +{ + public NatsOBSubMsg() + { + } + + public NatsOBSubCommand Command { get; init; } = default; + + public NatsJSMsg Msg { get; init; } = default; +} + +internal class NatsOBSub : NatsSubBase +{ + private readonly NatsJSContext _context; + private readonly CancellationToken _cancellationToken; + private readonly NatsConnection _nats; + private readonly NatsHeaderParser _headerParser; + private readonly INatsSerializer _serializer; + private readonly ChannelWriter> _commands; + + public NatsOBSub( + NatsJSContext context, + Channel> commandChannel, + NatsSubOpts? opts, + CancellationToken cancellationToken) + : base( + connection: context.Connection, + manager: context.Connection.SubscriptionManager, + subject: context.NewInbox(), + queueGroup: default, + opts) + { + _context = context; + _cancellationToken = cancellationToken; + _serializer = opts?.Serializer ?? context.Connection.Opts.Serializer; + _nats = context.Connection; + _headerParser = _nats.HeaderParser; + _commands = commandChannel.Writer; + _nats.ConnectionOpened += OnConnectionOpened; + } + + public override async ValueTask ReadyAsync() + { + await base.ReadyAsync(); + await _commands.WriteAsync(new NatsOBSubMsg { Command = NatsOBSubCommand.Ready }, _cancellationToken).ConfigureAwait(false); + } + + public override ValueTask DisposeAsync() + { + _nats.ConnectionOpened -= OnConnectionOpened; + return base.DisposeAsync(); + } + + protected override async ValueTask ReceiveInternalAsync( + string subject, + string? replyTo, + ReadOnlySequence? headersBuffer, + ReadOnlySequence payloadBuffer) + { + var msg = new NatsJSMsg(NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _headerParser, _serializer), _context); + await _commands.WriteAsync(new NatsOBSubMsg { Command = NatsOBSubCommand.Msg, Msg = msg }, _cancellationToken).ConfigureAwait(false); + } + + protected override void TryComplete() + { + } + + private void OnConnectionOpened(object? sender, string e) + { + _commands.TryWrite(new NatsOBSubMsg { Command = NatsOBSubCommand.Ready }); + } +} diff --git a/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs new file mode 100644 index 000000000..5d8ebb761 --- /dev/null +++ b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs @@ -0,0 +1,75 @@ +using System.Text.Json.Serialization; + +namespace NATS.Client.ObjectStore.Models; + +public record ObjectMetadata +{ + /// + /// Object name + /// + [JsonPropertyName("name")] + public string Name { get; set; } = default!; + + /// + /// Bucket name + /// + [JsonPropertyName("bucket")] + public string Bucket { get; set; } = default!; + + /// + /// Object NUID + /// + [JsonPropertyName("nuid")] + public string Nuid { get; set; } = default!; + + /// + /// Max chunk size + /// + [JsonPropertyName("size")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int Size { get; set; } = default!; + + /// + /// Modified timestamp + /// + [JsonPropertyName("mtime")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public DateTimeOffset MTime { get; set; } = default!; + + /// + /// Number of chunks + /// + [JsonPropertyName("chunks")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int Chunks { get; set; } = default!; + + /// + /// Object digest + /// + [JsonPropertyName("digest")] + public string Digest { get; set; } = default!; + + /// + /// Object metadata + /// + [JsonPropertyName("meta")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public Dictionary Meta { get; set; } = default!; + + /// + /// Object options + /// + [JsonPropertyName("options")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public Options Options { get; set; } = default!; +} + +public record Options +{ + /// + /// Max chunk size + /// + [JsonPropertyName("max_chunk_size")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public int MaxChunkSize { get; set; } = default!; +} diff --git a/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj b/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj new file mode 100644 index 000000000..d58a0e017 --- /dev/null +++ b/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj @@ -0,0 +1,20 @@ + + + + net6.0 + enable + enable + true + + + pubsub;messaging + JetStream Object Store support for NATS.Client. + false + + + + + + + + diff --git a/src/NATS.Client.ObjectStore/NatsOBConfig.cs b/src/NATS.Client.ObjectStore/NatsOBConfig.cs new file mode 100644 index 000000000..afbffb64e --- /dev/null +++ b/src/NATS.Client.ObjectStore/NatsOBConfig.cs @@ -0,0 +1,25 @@ +namespace NATS.Client.ObjectStore; + +/// +/// Object Store storage type +/// +public enum NatsOBStorageType +{ + File = 0, + Memory = 1, +} + +public record NatsOBConfig(string Bucket) +{ + public string? Description { get; init; } + + public TimeSpan? MaxAge { get; init; } + + public long? MaxBytes { get; init; } + + public NatsOBStorageType? Storage { get; init; } + + public int NumberOfReplicas { get; init; } = 1; + + public Dictionary? Metadata { get; init; } +} diff --git a/src/NATS.Client.ObjectStore/NatsOBContext.cs b/src/NATS.Client.ObjectStore/NatsOBContext.cs new file mode 100644 index 000000000..f5f602005 --- /dev/null +++ b/src/NATS.Client.ObjectStore/NatsOBContext.cs @@ -0,0 +1,70 @@ +using System.Text.RegularExpressions; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Internal; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.ObjectStore; + +public class NatsOBContext +{ + private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled); + + private readonly NatsJSContext _context; + + public NatsOBContext(NatsJSContext context) + { + _context = context; + } + + public async ValueTask CreateObjectStore(NatsOBConfig config, CancellationToken cancellationToken = default) + { + ValidateBucketName(config.Bucket); + + var storage = config.Storage == NatsOBStorageType.File + ? StreamConfigurationStorage.file + : StreamConfigurationStorage.memory; + + var streamConfiguration = new StreamConfiguration + { + Name = $"OBJ_{config.Bucket}", + Description = config.Description!, + Subjects = new[] { $"$O.{config.Bucket}.C.>", $"$O.{config.Bucket}.M.>" }, + MaxAge = config.MaxAge?.ToNanos() ?? 0, + MaxBytes = config.MaxBytes ?? -1, + Storage = storage, + NumReplicas = config.NumberOfReplicas, + /* TODO: Placement = */ + Discard = StreamConfigurationDiscard.@new, + AllowRollupHdrs = true, + AllowDirect = true, + Metadata = config.Metadata!, + Retention = StreamConfigurationRetention.limits, + }; + + var stream = await _context.CreateStreamAsync(streamConfiguration, cancellationToken); + return new NatsOBStore(config, _context, stream); + } + + private void ValidateBucketName(string bucket) + { + if (string.IsNullOrWhiteSpace(bucket)) + { + throw new NatsOBException("Bucket name can't be empty"); + } + + if (bucket.StartsWith(".")) + { + throw new NatsOBException("Bucket name can't start with a period"); + } + + if (bucket.EndsWith(".")) + { + throw new NatsOBException("Bucket name can't end with a period"); + } + + if (!ValidBucketRegex.IsMatch(bucket)) + { + throw new NatsOBException("Bucket name can only contain alphanumeric characters, dashes, and underscores"); + } + } +} diff --git a/src/NATS.Client.ObjectStore/NatsOBException.cs b/src/NATS.Client.ObjectStore/NatsOBException.cs new file mode 100644 index 000000000..43f16dc5d --- /dev/null +++ b/src/NATS.Client.ObjectStore/NatsOBException.cs @@ -0,0 +1,16 @@ +using NATS.Client.JetStream; + +namespace NATS.Client.ObjectStore; + +public class NatsOBException : NatsJSException +{ + public NatsOBException(string message) + : base(message) + { + } + + public NatsOBException(string message, Exception exception) + : base(message, exception) + { + } +} diff --git a/src/NATS.Client.ObjectStore/NatsOBStore.cs b/src/NATS.Client.ObjectStore/NatsOBStore.cs new file mode 100644 index 000000000..b60cfb572 --- /dev/null +++ b/src/NATS.Client.ObjectStore/NatsOBStore.cs @@ -0,0 +1,205 @@ +using System.Buffers; +using System.Drawing; +using System.Security.Cryptography; +using System.Text.RegularExpressions; +using NATS.Client.Core; +using NATS.Client.Core.Internal; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Internal; +using NATS.Client.JetStream.Models; +using NATS.Client.ObjectStore.Internal; +using NATS.Client.ObjectStore.Models; + +namespace NATS.Client.ObjectStore; + +public class NatsOBStore +{ + private const int DefaultChunkSize = 128 * 1024; + private const string NatsRollup = "Nats-Rollup"; + private const string RollupSubject = "sub"; + + private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } }; + private static readonly Regex ValidObjectRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled); + + private readonly string _bucket; + private readonly NatsOBConfig _config; + private readonly NatsJSContext _context; + private readonly NatsJSStream _stream; + private readonly NatsConnection _nats; + + internal NatsOBStore(NatsOBConfig config, NatsJSContext context, NatsJSStream stream) + { + _bucket = config.Bucket; + _config = config; + _context = context; + _nats = context.Connection; + _stream = stream; + } + + // public async ValueTask GetAsync(string name, Stream stream, CancellationToken cancellationToken = default) + // { + // ValidateObjectName(name); + // var info = await GetInfoAsync(name, cancellationToken); + // + // } + + public async ValueTask PutAsync(ObjectMetadata meta, Stream stream, CancellationToken cancellationToken = default) + { + ValidateObjectName(meta.Name); + + ObjectMetadata? info = null; + try + { + info = await GetInfoAsync(meta.Name, cancellationToken); + } + catch (NatsJSApiException e) + { + if (e.Error.Code != 404) + throw; + } + + var nuid = NewNuid(); + var encodedName = Base64UrlEncoder.Encode(meta.Name); + + meta.Bucket = _bucket; + meta.Nuid = nuid; + meta.MTime = DateTimeOffset.UtcNow; + + if (meta.Options == null!) + { + meta.Options = new Options { MaxChunkSize = DefaultChunkSize }; + } + + if (meta.Options.MaxChunkSize == 0) + { + meta.Options.MaxChunkSize = DefaultChunkSize; + } + + var size = 0; + var chunks = 0; + var chunkSize = meta.Options.MaxChunkSize; + + string digest; + using (var sha256 = SHA256.Create()) + { + await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read)) + { + while (true) + { + var memoryOwner = MemoryPool.Shared.Rent(chunkSize); + + var memory = memoryOwner.Memory; + var currentChunkSize = 0; + var eof = false; + + // Fill a chunk + while (true) + { + var read = await hashedStream.ReadAsync(memory, cancellationToken); + + // End of stream + if (read == 0) + { + eof = true; + break; + } + + memory = memory.Slice(read); + currentChunkSize += read; + + // Chunk filled + if (memory.IsEmpty) + { + break; + } + } + + if (currentChunkSize > 0) + { + size += currentChunkSize; + chunks++; + } + + var buffer = new FixedSizeMemoryOwner(memoryOwner, currentChunkSize); + + // Chunks + var ack1 = await _context.PublishAsync($"$O.{_bucket}.C.{nuid}", buffer, cancellationToken: cancellationToken); + ack1.EnsureSuccess(); + + if (eof) + break; + } + } + + if (sha256.Hash == null) + throw new NatsOBException("Can't compute SHA256 hash"); + + digest = Base64UrlEncoder.Encode(sha256.Hash); + } + + meta.Chunks = chunks; + meta.Size = size; + meta.Digest = $"SHA-256={digest}"; + + // Metadata + var ack2 = await _context.PublishAsync($"$O.{_bucket}.M.{encodedName}", meta, headers: NatsRollupHeaders, cancellationToken: cancellationToken); + ack2.EnsureSuccess(); + + // Delete the old object + if (info != null && info.Nuid != nuid) + { + try + { + await _context.JSRequestResponseAsync( + subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{_bucket}", + request: new StreamPurgeRequest { Filter = $"$O.{_bucket}.C.{info.Nuid}" }, + cancellationToken); + } + catch (NatsJSApiException e) + { + if (e.Error.Code != 404) + throw; + } + } + + return meta; + } + + public async ValueTask GetInfoAsync(string key, CancellationToken cancellationToken = default) + { + var request = new StreamMsgGetRequest { LastBySubj = $"$O.{_bucket}.M.{Base64UrlEncoder.Encode(key)}", }; + + var response = await _stream.GetAsync(request, cancellationToken); + + var data = NatsJsonSerializer.Default.Deserialize(new ReadOnlySequence(Convert.FromBase64String(response.Message.Data))); + + if (data == null) + throw new NatsOBException("Can't deserialize object metadata"); + + return data; + } + + private string NewNuid() + { + Span buffer = stackalloc char[22]; + if (NuidWriter.TryWriteNuid(buffer)) + { + return new string(buffer); + } + + throw new InvalidOperationException("Internal error: can't generate nuid"); + } + + private void ValidateObjectName(string name) + { + if (string.IsNullOrWhiteSpace(name)) + { + throw new NatsOBException("Object name can't be empty"); + } + + if (!ValidObjectRegex.IsMatch(name)) + { + throw new NatsOBException("Object name can only contain alphanumeric characters, dashes, underscores, forward slash, equals sign, and periods"); + } + } +} diff --git a/tests/NATS.Client.ObjectStore.Tests/Base64UrlEncoderTest.cs b/tests/NATS.Client.ObjectStore.Tests/Base64UrlEncoderTest.cs new file mode 100644 index 000000000..1c94bc298 --- /dev/null +++ b/tests/NATS.Client.ObjectStore.Tests/Base64UrlEncoderTest.cs @@ -0,0 +1,69 @@ +using System.Text; +using NATS.Client.ObjectStore.Internal; + +namespace NATS.Client.ObjectStore.Tests; + +public class Base64UrlEncoderTest +{ + private readonly ITestOutputHelper _output; + + public Base64UrlEncoderTest(ITestOutputHelper output) => _output = output; + + [Theory] + [InlineData("Hello World!")] + [InlineData("!~£$%^&*()_+{}:@~<>?")] + [InlineData("C")] + [InlineData("AB")] + [InlineData("ABC")] + public void Encoding_test(string input) + { + var encoded = Base64UrlEncoder.Encode(Encoding.UTF8.GetBytes(input)); + var expected = Encode(input); + Assert.Equal(expected, encoded); + _output.WriteLine($">>{encoded}<<"); + } + + [Theory] + [InlineData("SGVsbG8gV29ybGQh")] + [InlineData("IX7CoyQlXiYqKClfK3t9OkB-PD4_")] + [InlineData("Qw==")] + [InlineData("QUI=")] + [InlineData("QUJD")] + public void Decoding_test(string input) + { + var decoded = Base64UrlEncoder.Decode(input); + var expected = Decode(input); + Assert.Equal(expected, decoded); + _output.WriteLine($">>{decoded}<<"); + } + + private string Encode(string input, bool raw = false) + { + var base64String = Convert.ToBase64String(Encoding.UTF8.GetBytes(input)); + + if (raw) + { + base64String = base64String.TrimEnd('='); + } + + return base64String + .Replace('+', '-') + .Replace('/', '_'); + } + + private string Decode(string input) + { + var incoming = input + .Replace('_', '/') + .Replace('-', '+'); + + switch (input.Length % 4) { + case 2: incoming += "=="; break; + case 3: incoming += "="; break; + } + + var bytes = Convert.FromBase64String(incoming); + + return Encoding.UTF8.GetString(bytes); + } +} diff --git a/tests/NATS.Client.ObjectStore.Tests/GlobalUsings.cs b/tests/NATS.Client.ObjectStore.Tests/GlobalUsings.cs new file mode 100644 index 000000000..c802f4480 --- /dev/null +++ b/tests/NATS.Client.ObjectStore.Tests/GlobalUsings.cs @@ -0,0 +1 @@ +global using Xunit; diff --git a/tests/NATS.Client.ObjectStore.Tests/NATS.Client.ObjectStore.Tests.csproj b/tests/NATS.Client.ObjectStore.Tests/NATS.Client.ObjectStore.Tests.csproj new file mode 100644 index 000000000..29e28d4af --- /dev/null +++ b/tests/NATS.Client.ObjectStore.Tests/NATS.Client.ObjectStore.Tests.csproj @@ -0,0 +1,37 @@ + + + + net6.0 + enable + enable + + false + true + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + + + + + + + + diff --git a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs new file mode 100644 index 000000000..c805cac56 --- /dev/null +++ b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs @@ -0,0 +1,73 @@ +using System.Buffers; +using System.Security.Cryptography; +using System.Text; +using NATS.Client.Core.Tests; +using NATS.Client.ObjectStore.Internal; +using NATS.Client.ObjectStore.Models; + +namespace NATS.Client.ObjectStore.Tests; + +public class ObjectStoreTest +{ + private readonly ITestOutputHelper _output; + + public ObjectStoreTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Create_store() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10000)); + var cancellationToken = cts.Token; + + // await using var server = NatsServer.StartJS(); + // await using var nats = server.CreateClientConnection(); + var nats = new NatsConnection(); + + var js = new NatsJSContext(nats); + var ob = new NatsOBContext(js); + + var store = await ob.CreateObjectStore(new NatsOBConfig("b1"), cancellationToken); + + var stringBuilder = new StringBuilder(); + for (var i = 0; i < 9; i++) + { + stringBuilder.Append($"{i:D2}-4567890"); + } + + var buffer90 = stringBuilder.ToString(); + + // square buffer: all chunks are the same size + { + var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, }; + + var buffer = Encoding.ASCII.GetBytes(buffer90); + var stream = new MemoryStream(buffer); + + await store.PutAsync(meta, stream, cancellationToken); + + var data = await store.GetInfoAsync("k1", cancellationToken); + + _output.WriteLine($"MSG.DATA:{data}"); + _output.WriteLine($"CHUNKS={Math.Ceiling(buffer.Length / 10.0)}"); + _output.WriteLine($"SIZE={buffer.Length}"); + _output.WriteLine($"sha:{Base64UrlEncoder.Encode(SHA256.HashData(buffer))}"); + } + + // buffer with smaller last chunk + { + var meta = new ObjectMetadata { Name = "k2", Options = new Options { MaxChunkSize = 10 }, }; + + var buffer = Encoding.ASCII.GetBytes(buffer90 + "09-45"); + var stream = new MemoryStream(buffer); + + await store.PutAsync(meta, stream, cancellationToken); + + var data = await store.GetInfoAsync("k2", cancellationToken); + + _output.WriteLine($"MSG.DATA:{data}"); + _output.WriteLine($"CHUNKS={Math.Ceiling(buffer.Length / 10.0)}"); + _output.WriteLine($"SIZE={buffer.Length}"); + _output.WriteLine($"SHA-256={Base64UrlEncoder.Encode(SHA256.HashData(buffer))}"); + } + } +} From d6b89ca1c65ee84abb9e0aba5576cbea7063e78f Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 12 Oct 2023 18:02:32 +0100 Subject: [PATCH 2/9] Run Object store tests in GH CI --- .github/workflows/test.yml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d946cc8f4..2a89b9429 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -45,14 +45,16 @@ jobs: run: dotnet build -c Debug - name: Test Core - run: dotnet test -c Debug --no-build tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj + run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj - name: Test JetStream - # This test is hanging sometimes. Find out where! run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.JetStream.Tests/NATS.Client.JetStream.Tests.csproj - name: Test Key/Value Store - run: dotnet test -c Debug --no-build tests/NATS.Client.KeyValueStore.Tests/NATS.Client.KeyValueStore.Tests.csproj + run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.KeyValueStore.Tests/NATS.Client.KeyValueStore.Tests.csproj + + - name: Test Object Store + run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.ObjectStore.Tests/NATS.Client.ObjectStore.Tests.csproj memory_test: name: memory test From 63b7dd2387171fac42174c4f87e4c027c5ba25b7 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Fri, 13 Oct 2023 19:54:05 +0100 Subject: [PATCH 3/9] Pull consumer abstraction --- NATS.Client.sln | 7 + .../Example.ObjectStore.csproj | 15 + sandbox/Example.ObjectStore/Program.cs | 41 ++ .../Internal/NatsJSOrderedPushConsumer.cs | 438 ++++++++++++++++++ src/NATS.Client.JetStream/NatsJSLogEvents.cs | 4 + .../Internal/Encoder.cs | 36 +- .../Internal/NatsOBSub.cs | 11 +- .../Models/ObjectMetadata.cs | 2 +- src/NATS.Client.ObjectStore/NatsOBConfig.cs | 2 +- src/NATS.Client.ObjectStore/NatsOBContext.cs | 2 +- .../NatsOBException.cs | 2 +- src/NATS.Client.ObjectStore/NatsOBStore.cs | 56 ++- .../Base64UrlEncoderTest.cs | 13 +- .../ObjectStoreTest.cs | 74 ++- 14 files changed, 659 insertions(+), 44 deletions(-) create mode 100644 sandbox/Example.ObjectStore/Example.ObjectStore.csproj create mode 100644 sandbox/Example.ObjectStore/Program.cs create mode 100644 src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs diff --git a/NATS.Client.sln b/NATS.Client.sln index 53ac9d290..d70c7e9d9 100644 --- a/NATS.Client.sln +++ b/NATS.Client.sln @@ -77,6 +77,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.ObjectStore", " EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.ObjectStore.Tests", "tests\NATS.Client.ObjectStore.Tests\NATS.Client.ObjectStore.Tests.csproj", "{BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.ObjectStore", "sandbox\Example.ObjectStore\Example.ObjectStore.csproj", "{51882883-A66E-4F95-A1AB-CFCBF71B4376}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -195,6 +197,10 @@ Global {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}.Debug|Any CPU.Build.0 = Debug|Any CPU {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}.Release|Any CPU.ActiveCfg = Release|Any CPU {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E}.Release|Any CPU.Build.0 = Release|Any CPU + {51882883-A66E-4F95-A1AB-CFCBF71B4376}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {51882883-A66E-4F95-A1AB-CFCBF71B4376}.Debug|Any CPU.Build.0 = Debug|Any CPU + {51882883-A66E-4F95-A1AB-CFCBF71B4376}.Release|Any CPU.ActiveCfg = Release|Any CPU + {51882883-A66E-4F95-A1AB-CFCBF71B4376}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -229,6 +235,7 @@ Global {912A4F2F-1BD1-4AE2-BAB8-5A49C221DB53} = {95A69671-16CA-4133-981C-CC381B7AAA30} {3F8840BA-4F91-4359-AA53-6B26823E7F55} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E} = {C526E8AB-739A-48D7-8FC4-048978C9B650} + {51882883-A66E-4F95-A1AB-CFCBF71B4376} = {95A69671-16CA-4133-981C-CC381B7AAA30} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA} diff --git a/sandbox/Example.ObjectStore/Example.ObjectStore.csproj b/sandbox/Example.ObjectStore/Example.ObjectStore.csproj new file mode 100644 index 000000000..e74912569 --- /dev/null +++ b/sandbox/Example.ObjectStore/Example.ObjectStore.csproj @@ -0,0 +1,15 @@ + + + + Exe + net6.0 + enable + enable + false + + + + + + + diff --git a/sandbox/Example.ObjectStore/Program.cs b/sandbox/Example.ObjectStore/Program.cs new file mode 100644 index 000000000..af378ed90 --- /dev/null +++ b/sandbox/Example.ObjectStore/Program.cs @@ -0,0 +1,41 @@ +using System.Text; +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.ObjectStore; +using NATS.Client.ObjectStore.Models; + +var nats = new NatsConnection(); +var js = new NatsJSContext(nats); +var ob = new NatsOBContext(js); + +var store = await ob.CreateObjectStore(new NatsOBConfig("o1")); + +var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, }; + +var stringBuilder = new StringBuilder(); +for (var i = 0; i < 9; i++) +{ + stringBuilder.Append($"{i:D2}-4567890"); +} + +var buffer90 = stringBuilder.ToString(); +{ + var buffer = Encoding.ASCII.GetBytes(buffer90); + var stream = new MemoryStream(buffer); + + await store.PutAsync(meta, stream); + + var data = await store.GetInfoAsync("k1"); + + Console.WriteLine($"DATA: {data}"); +} + +{ + var memoryStream = new MemoryStream(); + await store.GetAsync("k1", memoryStream); + await memoryStream.FlushAsync(); + var buffer = memoryStream.ToArray(); + Console.WriteLine($"buffer:{Encoding.ASCII.GetString(buffer)}"); +} + +Console.WriteLine("Bye"); diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs new file mode 100644 index 000000000..46aef3961 --- /dev/null +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs @@ -0,0 +1,438 @@ +using System.Buffers; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using NATS.Client.Core.Internal; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream.Internal; + +internal enum NatsJSOrderedPushConsumerCommand +{ + Msg, + Ready, +} + +internal readonly struct NatsJSOrderedPushConsumerMsg +{ + public NatsJSOrderedPushConsumerMsg() + { + } + + public NatsJSOrderedPushConsumerCommand Command { get; init; } = default; + + public NatsJSMsg Msg { get; init; } = default; +} + +internal record NatsJSOrderedPushConsumerOpts +{ + /// + /// Default watch options + /// + public static readonly NatsJSOrderedPushConsumerOpts Default = new(); + + /// + /// Idle heartbeat interval + /// + public TimeSpan IdleHeartbeat { get; init; } = TimeSpan.FromSeconds(5); + + public ConsumerConfigurationDeliverPolicy DeliverPolicy { get; init; } = ConsumerConfigurationDeliverPolicy.all; + + public bool HeadersOnly { get; init; } = false; +} + +internal class NatsJSOrderedPushConsumer +{ + private readonly ILogger _logger; + private readonly bool _debug; + private readonly NatsJSContext _context; + private readonly string _stream; + private readonly string _filter; + private readonly NatsJSOrderedPushConsumerOpts _opts; + private readonly NatsSubOpts? _subOpts; + private readonly CancellationToken _cancellationToken; + private readonly NatsConnection _nats; + private readonly Channel> _commandChannel; + private readonly Channel> _msgChannel; + private readonly Channel _consumerCreateChannel; + private readonly Timer _timer; + private readonly int _hbTimeout; + private readonly long _idleHbNanos; + private readonly Task _consumerCreateTask; + private readonly Task _commandTask; + private readonly long _ackWaitNanos; + + private long _sequenceStream; + private long _sequenceConsumer; + private string _consumer; + private volatile NatsJSOrderedPushConsumerSub? _sub; + private int _done; + + public NatsJSOrderedPushConsumer( + NatsJSContext context, + string stream, + string filter, + NatsJSOrderedPushConsumerOpts opts, + NatsSubOpts? subOpts, + CancellationToken cancellationToken) + { + _logger = context.Connection.Opts.LoggerFactory.CreateLogger>(); + _debug = _logger.IsEnabled(LogLevel.Debug); + _context = context; + _stream = stream; + _filter = filter; + _opts = opts; + _subOpts = subOpts; + _cancellationToken = cancellationToken; + _nats = context.Connection; + _ackWaitNanos = TimeSpan.FromHours(22).ToNanos(); + _hbTimeout = (int)(opts.IdleHeartbeat * 2).TotalMilliseconds; + _idleHbNanos = opts.IdleHeartbeat.ToNanos(); + _consumer = NewNuid(); + + _nats.ConnectionDisconnected += OnDisconnected; + + _timer = new Timer( + static state => + { + var self = (NatsJSOrderedPushConsumer)state!; + self.CreateSub("idle-heartbeat-timeout"); + if (self._debug) + { + self._logger.LogDebug( + NatsJSLogEvents.IdleTimeout, + "Idle heartbeat timeout after {Timeout}ns", + self._idleHbNanos); + } + }, + this, + Timeout.Infinite, + Timeout.Infinite); + + // Channel size 1 is enough because we want backpressure to go all the way to the subscription + // so that we get most accurate view of the stream. We can keep them as 1 until we find a case + // where it's not enough due to performance for example. + _commandChannel = Channel.CreateBounded>(1); + _msgChannel = Channel.CreateBounded>(1); + + // A single request to create the consumer is enough because we don't want to create a new consumer + // back to back in case the consumer is being recreated due to a timeout and a mismatch in consumer + // sequence for example; creating the consumer once would solve both the issues. + _consumerCreateChannel = Channel.CreateBounded(new BoundedChannelOptions(1) + { + AllowSynchronousContinuations = false, + FullMode = BoundedChannelFullMode.DropOldest, + }); + + _consumerCreateTask = Task.Run(ConsumerCreateLoop); + _commandTask = Task.Run(CommandLoop); + } + + public ChannelReader> Msgs => _msgChannel.Reader; + + public bool IsDone => Volatile.Read(ref _done) > 0; + + private string Consumer + { + get => Volatile.Read(ref _consumer); + set => Volatile.Write(ref _consumer, value); + } + + public async ValueTask DisposeAsync() + { + _nats.ConnectionDisconnected -= OnDisconnected; + + _consumerCreateChannel.Writer.TryComplete(); + _commandChannel.Writer.TryComplete(); + _msgChannel.Writer.TryComplete(); + + await _consumerCreateTask; + await _commandTask; + + await _context.DeleteConsumerAsync(_stream, Consumer, _cancellationToken); + } + + internal void Init() + { + Consumer = NewNuid(); + CreateSub("init"); + } + + internal void Done() + { + Interlocked.Increment(ref _done); + _msgChannel.Writer.TryComplete(); + } + + private void OnDisconnected(object? sender, string e) => StopHeartbeatTimer(); + + private async Task CommandLoop() + { + try + { + while (await _commandChannel.Reader.WaitToReadAsync(_cancellationToken)) + { + while (_commandChannel.Reader.TryRead(out var command)) + { + try + { + var subCommand = command.Command; + + if (subCommand == NatsJSOrderedPushConsumerCommand.Msg) + { + ResetHeartbeatTimer(); + var msg = command.Msg; + + var subSubject = _sub?.Subject; + + if (subSubject == null) + continue; + + if (string.Equals(msg.Subject, subSubject)) + { + // Control message: e.g. heartbeat + } + else + { + if (msg.Metadata is { } metadata) + { + if (!metadata.Consumer.Equals(Consumer)) + { + // Ignore messages from other consumers + // This might happen if the consumer is recreated + // and the old consumer somehow still receives messages + continue; + } + + var sequence = Interlocked.Increment(ref _sequenceConsumer); + + if (sequence != (long)metadata.Sequence.Consumer) + { + CreateSub("sequence-mismatch"); + _logger.LogWarning("Missed messages, recreating consumer"); + continue; + } + + // Increment the sequence before writing to the channel in case the channel is full + // and the writer is waiting for the reader to read the message. This way the sequence + // will be correctly incremented in case the timeout kicks in and recreated the consumer. + Interlocked.Exchange(ref _sequenceStream, (long)metadata.Sequence.Stream); + + if (!IsDone) + { + try + { + await _msgChannel.Writer.WriteAsync(msg, _cancellationToken); + } + catch + { + if (!IsDone) + throw; + } + } + } + else + { + _logger.LogWarning("Protocol error: Message metadata is missing"); + } + } + } + else if (subCommand == NatsJSOrderedPushConsumerCommand.Ready) + { + ResetHeartbeatTimer(); + } + else + { + _logger.LogError("Internal error: unexpected command {Command}", subCommand); + } + } + catch (Exception e) + { + _logger.LogWarning(e, "Command error"); + } + } + } + } + catch (OperationCanceledException) + { + } + catch (Exception e) + { + _logger.LogError(e, "Unexpected command loop error"); + } + } + + private async Task ConsumerCreateLoop() + { + try + { + while (await _consumerCreateChannel.Reader.WaitToReadAsync(_cancellationToken)) + { + while (_consumerCreateChannel.Reader.TryRead(out var origin)) + { + try + { + await CreatePushConsumer(origin); + } + catch (Exception e) + { + _logger.LogWarning(e, "Consumer create error"); + } + } + } + } + catch (OperationCanceledException) + { + } + catch (Exception e) + { + _logger.LogError(e, "Unexpected consumer create loop error"); + } + } + + private async ValueTask CreatePushConsumer(string origin) + { + if (_debug) + { + _logger.LogDebug(NatsJSLogEvents.NewConsumer, "Creating new consumer {Consumer} from {Origin}", Consumer, origin); + } + + if (_sub != null) + { + if (_debug) + { + _logger.LogDebug(NatsJSLogEvents.DeleteOldDeliverySubject, "Deleting old delivery subject {Subject}", _sub.Subject); + } + + await _sub.UnsubscribeAsync(); + await _sub.DisposeAsync(); + } + + _sub = new NatsJSOrderedPushConsumerSub(_context, _commandChannel, _subOpts, _cancellationToken); + await _context.Connection.SubAsync(_sub, _cancellationToken).ConfigureAwait(false); + + if (_debug) + { + _logger.LogDebug(NatsJSLogEvents.NewDeliverySubject, "New delivery subject {Subject}", _sub.Subject); + } + + Interlocked.Exchange(ref _sequenceConsumer, 0); + + var sequence = Volatile.Read(ref _sequenceStream); + + var config = new ConsumerConfiguration + { + Name = Consumer, + DeliverPolicy = ConsumerConfigurationDeliverPolicy.all, + AckPolicy = ConsumerConfigurationAckPolicy.none, + DeliverSubject = _sub.Subject, + FilterSubject = _filter, + FlowControl = true, + IdleHeartbeat = _idleHbNanos, + AckWait = _ackWaitNanos, + MaxDeliver = 1, + MemStorage = true, + NumReplicas = 1, + ReplayPolicy = ConsumerConfigurationReplayPolicy.instant, + }; + + config.DeliverPolicy = _opts.DeliverPolicy; + config.HeadersOnly = _opts.HeadersOnly; + + if (sequence > 0) + { + config.DeliverPolicy = ConsumerConfigurationDeliverPolicy.by_start_sequence; + config.OptStartSeq = sequence + 1; + } + + await _context.CreateConsumerAsync( + new ConsumerCreateRequest { StreamName = _stream, Config = config, }, + cancellationToken: _cancellationToken); + + if (_debug) + { + _logger.LogDebug(NatsJSLogEvents.NewConsumerCreated, "Created new consumer {Consumer} from {Origin}", Consumer, origin); + } + } + + private string NewNuid() + { + Span buffer = stackalloc char[22]; + if (NuidWriter.TryWriteNuid(buffer)) + { + return new string(buffer); + } + + throw new InvalidOperationException("Internal error: can't generate nuid"); + } + + private void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, Timeout.Infinite); + + private void StopHeartbeatTimer() => _timer.Change(Timeout.Infinite, Timeout.Infinite); + + private void CreateSub(string origin) + { + Consumer = NewNuid(); + _consumerCreateChannel.Writer.TryWrite(origin); + } +} + +internal class NatsJSOrderedPushConsumerSub : NatsSubBase +{ + private readonly NatsJSContext _context; + private readonly CancellationToken _cancellationToken; + private readonly NatsConnection _nats; + private readonly NatsHeaderParser _headerParser; + private readonly INatsSerializer _serializer; + private readonly ChannelWriter> _commands; + + public NatsJSOrderedPushConsumerSub( + NatsJSContext context, + Channel> commandChannel, + NatsSubOpts? opts, + CancellationToken cancellationToken) + : base( + connection: context.Connection, + manager: context.Connection.SubscriptionManager, + subject: context.NewInbox(), + queueGroup: default, + opts) + { + _context = context; + _cancellationToken = cancellationToken; + _serializer = opts?.Serializer ?? context.Connection.Opts.Serializer; + _nats = context.Connection; + _headerParser = _nats.HeaderParser; + _commands = commandChannel.Writer; + _nats.ConnectionOpened += OnConnectionOpened; + } + + public override async ValueTask ReadyAsync() + { + await base.ReadyAsync(); + await _commands.WriteAsync(new NatsJSOrderedPushConsumerMsg { Command = NatsJSOrderedPushConsumerCommand.Ready }, _cancellationToken).ConfigureAwait(false); + } + + public override ValueTask DisposeAsync() + { + _nats.ConnectionOpened -= OnConnectionOpened; + return base.DisposeAsync(); + } + + protected override async ValueTask ReceiveInternalAsync( + string subject, + string? replyTo, + ReadOnlySequence? headersBuffer, + ReadOnlySequence payloadBuffer) + { + var msg = new NatsJSMsg(NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _headerParser, _serializer), _context); + await _commands.WriteAsync(new NatsJSOrderedPushConsumerMsg { Command = NatsJSOrderedPushConsumerCommand.Msg, Msg = msg }, _cancellationToken).ConfigureAwait(false); + } + + protected override void TryComplete() + { + } + + private void OnConnectionOpened(object? sender, string e) => + _commands.TryWrite(new NatsJSOrderedPushConsumerMsg { Command = NatsJSOrderedPushConsumerCommand.Ready }); +} diff --git a/src/NATS.Client.JetStream/NatsJSLogEvents.cs b/src/NATS.Client.JetStream/NatsJSLogEvents.cs index 055519aa5..af409a800 100644 --- a/src/NATS.Client.JetStream/NatsJSLogEvents.cs +++ b/src/NATS.Client.JetStream/NatsJSLogEvents.cs @@ -13,4 +13,8 @@ public static class NatsJSLogEvents public static readonly EventId PendingCount = new(2007, nameof(PendingCount)); public static readonly EventId MessageProperty = new(2008, nameof(MessageProperty)); public static readonly EventId PullRequest = new(2009, nameof(PullRequest)); + public static readonly EventId NewConsumer = new(2010, nameof(NewConsumer)); + public static readonly EventId DeleteOldDeliverySubject = new(2011, nameof(DeleteOldDeliverySubject)); + public static readonly EventId NewDeliverySubject = new(2012, nameof(NewDeliverySubject)); + public static readonly EventId NewConsumerCreated = new(2013, nameof(NewConsumerCreated)); } diff --git a/src/NATS.Client.ObjectStore/Internal/Encoder.cs b/src/NATS.Client.ObjectStore/Internal/Encoder.cs index 6325aa141..1ef9b5e41 100644 --- a/src/NATS.Client.ObjectStore/Internal/Encoder.cs +++ b/src/NATS.Client.ObjectStore/Internal/Encoder.cs @@ -1,4 +1,4 @@ -using System.Security.Cryptography; +using System.Security.Cryptography; namespace NATS.Client.ObjectStore.Internal; @@ -95,30 +95,30 @@ public static string Encode(Span inArray, bool raw = false) switch (lengthMod3) { case 2: - { - var d0 = inArray[i]; - var d1 = inArray[i + 1]; + { + var d0 = inArray[i]; + var d1 = inArray[i + 1]; - output[j + 0] = table[d0 >> 2]; - output[j + 1] = table[((d0 & 0x03) << 4) | (d1 >> 4)]; - output[j + 2] = table[(d1 & 0x0f) << 2]; - j += 3; - } + output[j + 0] = table[d0 >> 2]; + output[j + 1] = table[((d0 & 0x03) << 4) | (d1 >> 4)]; + output[j + 2] = table[(d1 & 0x0f) << 2]; + j += 3; + } - break; + break; case 1: - { - var d0 = inArray[i]; + { + var d0 = inArray[i]; - output[j + 0] = table[d0 >> 2]; - output[j + 1] = table[(d0 & 0x03) << 4]; - j += 2; - } + output[j + 0] = table[d0 >> 2]; + output[j + 1] = table[(d0 & 0x03) << 4]; + j += 2; + } - break; + break; - // default or case 0: no further operations are needed. + // default or case 0: no further operations are needed. } if (raw) diff --git a/src/NATS.Client.ObjectStore/Internal/NatsOBSub.cs b/src/NATS.Client.ObjectStore/Internal/NatsOBSub.cs index a9357afff..352047e64 100644 --- a/src/NATS.Client.ObjectStore/Internal/NatsOBSub.cs +++ b/src/NATS.Client.ObjectStore/Internal/NatsOBSub.cs @@ -31,7 +31,10 @@ internal class NatsOBSub : NatsSubBase private readonly INatsSerializer _serializer; private readonly ChannelWriter> _commands; + private int _done; + public NatsOBSub( + string subject, NatsJSContext context, Channel> commandChannel, NatsSubOpts? opts, @@ -39,7 +42,7 @@ public NatsOBSub( : base( connection: context.Connection, manager: context.Connection.SubscriptionManager, - subject: context.NewInbox(), + subject: subject, queueGroup: default, opts) { @@ -52,6 +55,8 @@ public NatsOBSub( _nats.ConnectionOpened += OnConnectionOpened; } + public void Done() => Interlocked.Increment(ref _done); + public override async ValueTask ReadyAsync() { await base.ReadyAsync(); @@ -70,7 +75,11 @@ protected override async ValueTask ReceiveInternalAsync( ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) { + if (Volatile.Read(ref _done) > 0) + return; + var msg = new NatsJSMsg(NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _headerParser, _serializer), _context); + await _commands.WriteAsync(new NatsOBSubMsg { Command = NatsOBSubCommand.Msg, Msg = msg }, _cancellationToken).ConfigureAwait(false); } diff --git a/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs index 5d8ebb761..573b5188b 100644 --- a/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs +++ b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs @@ -1,4 +1,4 @@ -using System.Text.Json.Serialization; +using System.Text.Json.Serialization; namespace NATS.Client.ObjectStore.Models; diff --git a/src/NATS.Client.ObjectStore/NatsOBConfig.cs b/src/NATS.Client.ObjectStore/NatsOBConfig.cs index afbffb64e..2b7d207ae 100644 --- a/src/NATS.Client.ObjectStore/NatsOBConfig.cs +++ b/src/NATS.Client.ObjectStore/NatsOBConfig.cs @@ -1,4 +1,4 @@ -namespace NATS.Client.ObjectStore; +namespace NATS.Client.ObjectStore; /// /// Object Store storage type diff --git a/src/NATS.Client.ObjectStore/NatsOBContext.cs b/src/NATS.Client.ObjectStore/NatsOBContext.cs index f5f602005..9a990e4d9 100644 --- a/src/NATS.Client.ObjectStore/NatsOBContext.cs +++ b/src/NATS.Client.ObjectStore/NatsOBContext.cs @@ -1,4 +1,4 @@ -using System.Text.RegularExpressions; +using System.Text.RegularExpressions; using NATS.Client.JetStream; using NATS.Client.JetStream.Internal; using NATS.Client.JetStream.Models; diff --git a/src/NATS.Client.ObjectStore/NatsOBException.cs b/src/NATS.Client.ObjectStore/NatsOBException.cs index 43f16dc5d..bf3d96624 100644 --- a/src/NATS.Client.ObjectStore/NatsOBException.cs +++ b/src/NATS.Client.ObjectStore/NatsOBException.cs @@ -1,4 +1,4 @@ -using NATS.Client.JetStream; +using NATS.Client.JetStream; namespace NATS.Client.ObjectStore; diff --git a/src/NATS.Client.ObjectStore/NatsOBStore.cs b/src/NATS.Client.ObjectStore/NatsOBStore.cs index b60cfb572..59fda5ea7 100644 --- a/src/NATS.Client.ObjectStore/NatsOBStore.cs +++ b/src/NATS.Client.ObjectStore/NatsOBStore.cs @@ -1,5 +1,4 @@ -using System.Buffers; -using System.Drawing; +using System.Buffers; using System.Security.Cryptography; using System.Text.RegularExpressions; using NATS.Client.Core; @@ -36,12 +35,46 @@ internal NatsOBStore(NatsOBConfig config, NatsJSContext context, NatsJSStream st _stream = stream; } - // public async ValueTask GetAsync(string name, Stream stream, CancellationToken cancellationToken = default) - // { - // ValidateObjectName(name); - // var info = await GetInfoAsync(name, cancellationToken); - // - // } + public async ValueTask GetAsync(string name, Stream stream, CancellationToken cancellationToken = default) + { + ValidateObjectName(name); + + var info = await GetInfoAsync(name, cancellationToken); + + await using var pushConsumer = new NatsJSOrderedPushConsumer>( + _context, + $"OBJ_{_bucket}", + $"$O.{_bucket}.C.{info.Nuid}", + new NatsJSOrderedPushConsumerOpts { DeliverPolicy = ConsumerConfigurationDeliverPolicy.all }, + new NatsSubOpts(), + cancellationToken); + + pushConsumer.Init(); + + await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken)) + { + if (pushConsumer.IsDone) + continue; + + if (msg.Data != null) + { + using (msg.Data) + { + await stream.WriteAsync(msg.Data.Memory, cancellationToken); + } + } + + var p = msg.Metadata?.NumPending; + if (p is 0) + { + pushConsumer.Done(); + } + } + + await stream.FlushAsync(cancellationToken); + + return info; + } public async ValueTask PutAsync(ObjectMetadata meta, Stream stream, CancellationToken cancellationToken = default) { @@ -86,7 +119,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre { while (true) { - var memoryOwner = MemoryPool.Shared.Rent(chunkSize); + var memoryOwner = new FixedSizeMemoryOwner(MemoryPool.Shared.Rent(chunkSize), chunkSize); var memory = memoryOwner.Memory; var currentChunkSize = 0; @@ -171,10 +204,7 @@ public async ValueTask GetInfoAsync(string key, CancellationToke var response = await _stream.GetAsync(request, cancellationToken); - var data = NatsJsonSerializer.Default.Deserialize(new ReadOnlySequence(Convert.FromBase64String(response.Message.Data))); - - if (data == null) - throw new NatsOBException("Can't deserialize object metadata"); + var data = NatsJsonSerializer.Default.Deserialize(new ReadOnlySequence(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsOBException("Can't deserialize object metadata"); return data; } diff --git a/tests/NATS.Client.ObjectStore.Tests/Base64UrlEncoderTest.cs b/tests/NATS.Client.ObjectStore.Tests/Base64UrlEncoderTest.cs index 1c94bc298..f349e8c7a 100644 --- a/tests/NATS.Client.ObjectStore.Tests/Base64UrlEncoderTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/Base64UrlEncoderTest.cs @@ -1,4 +1,4 @@ -using System.Text; +using System.Text; using NATS.Client.ObjectStore.Internal; namespace NATS.Client.ObjectStore.Tests; @@ -57,9 +57,14 @@ private string Decode(string input) .Replace('_', '/') .Replace('-', '+'); - switch (input.Length % 4) { - case 2: incoming += "=="; break; - case 3: incoming += "="; break; + switch (input.Length % 4) + { + case 2: + incoming += "=="; + break; + case 3: + incoming += "="; + break; } var bytes = Convert.FromBase64String(incoming); diff --git a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs index c805cac56..9cf9b9589 100644 --- a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs @@ -16,13 +16,13 @@ public class ObjectStoreTest [Fact] public async Task Create_store() { - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10000)); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; - // await using var server = NatsServer.StartJS(); - // await using var nats = server.CreateClientConnection(); - var nats = new NatsConnection(); + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + // var nats = new NatsConnection(); var js = new NatsJSContext(nats); var ob = new NatsOBContext(js); @@ -70,4 +70,70 @@ public async Task Create_store() _output.WriteLine($"SHA-256={Base64UrlEncoder.Encode(SHA256.HashData(buffer))}"); } } + + [Fact] + public async Task Create_store2() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + + // var nats = new NatsConnection(); + var js = new NatsJSContext(nats); + var ob = new NatsOBContext(js); + + var store = await ob.CreateObjectStore(new NatsOBConfig("b1"), cancellationToken); + + var stringBuilder = new StringBuilder(); + for (var i = 0; i < 9; i++) + { + stringBuilder.Append($"{i:D2}-4567890"); + } + + var buffer90 = stringBuilder.ToString(); + + // square buffer: all chunks are the same size + { + var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, }; + + var buffer = Encoding.ASCII.GetBytes(buffer90); + var stream = new MemoryStream(buffer); + + await store.PutAsync(meta, stream, cancellationToken); + + var data = await store.GetInfoAsync("k1", cancellationToken); + + _output.WriteLine($"MSG.DATA:{data}"); + _output.WriteLine($"CHUNKS={Math.Ceiling(buffer.Length / 10.0)}"); + _output.WriteLine($"SIZE={buffer.Length}"); + _output.WriteLine($"sha:{Base64UrlEncoder.Encode(SHA256.HashData(buffer))}"); + } + + { + var memoryStream = new MemoryStream(); + await store.GetAsync("k1", memoryStream, cancellationToken); + await memoryStream.FlushAsync(cancellationToken); + var buffer = memoryStream.ToArray(); + _output.WriteLine($"buffer:{Encoding.ASCII.GetString(buffer)}"); + } + + // buffer with smaller last chunk + // { + // var meta = new ObjectMetadata { Name = "k2", Options = new Options { MaxChunkSize = 10 }, }; + // + // var buffer = Encoding.ASCII.GetBytes(buffer90 + "09-45"); + // var stream = new MemoryStream(buffer); + // + // await store.PutAsync(meta, stream, cancellationToken); + // + // var data = await store.GetInfoAsync("k2", cancellationToken); + // + // _output.WriteLine($"MSG.DATA:{data}"); + // _output.WriteLine($"CHUNKS={Math.Ceiling(buffer.Length / 10.0)}"); + // _output.WriteLine($"SIZE={buffer.Length}"); + // _output.WriteLine($"SHA-256={Base64UrlEncoder.Encode(SHA256.HashData(buffer))}"); + // } + } } From ead74ed6f13d0c03137f53edacc6e7901d9ead86 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 16 Oct 2023 11:04:31 +0100 Subject: [PATCH 4/9] Push consumer channel explanation --- src/NATS.Client.ObjectStore/NatsOBStore.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/NATS.Client.ObjectStore/NatsOBStore.cs b/src/NATS.Client.ObjectStore/NatsOBStore.cs index 59fda5ea7..b379e3928 100644 --- a/src/NATS.Client.ObjectStore/NatsOBStore.cs +++ b/src/NATS.Client.ObjectStore/NatsOBStore.cs @@ -53,6 +53,12 @@ public async ValueTask GetAsync(string name, Stream stream, Canc await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken)) { + // We have to make sure to carry on consuming the channel to avoid any blocking: + // e.g. if the channel is full, we would be blocking the reads off the socket (this was intentionally + // done ot avoid bloating the memory with a large backlog of messages or dropping messages at this level + // and signal the server that we are a slow consumer); then when we make an request-reply API call to + // delete the consumer, the socket would be blocked trying to send the response back to us; so we need to + // keep consuming the channel to avoid this. if (pushConsumer.IsDone) continue; From 79df9a8690caabd01fc1851cbf393221aa5ba8de Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 16 Oct 2023 14:40:04 +0100 Subject: [PATCH 5/9] Object get and delete --- .../NatsJSContext.Streams.cs | 21 ++ src/NATS.Client.JetStream/NatsJSStream.cs | 15 ++ .../Internal/NatsOBSub.cs | 94 -------- .../Models/ObjectMetadata.cs | 7 + src/NATS.Client.ObjectStore/NatsOBConfig.cs | 22 ++ src/NATS.Client.ObjectStore/NatsOBContext.cs | 30 ++- .../NatsOBException.cs | 12 + src/NATS.Client.ObjectStore/NatsOBStore.cs | 205 +++++++++++++++--- .../ObjectStoreTest.cs | 133 ++++++++---- 9 files changed, 364 insertions(+), 175 deletions(-) delete mode 100644 src/NATS.Client.ObjectStore/Internal/NatsOBSub.cs diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs index 3bd5d8a8a..58a5cbae3 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -55,6 +55,27 @@ public async ValueTask DeleteStreamAsync( return response.Success; } + /// + /// Purges all of the (or filtered) data in a stream, leaves the stream. + /// + /// Stream name to be purged. + /// Purge request. + /// A used to cancel the API call. + /// Purge response + /// There was an issue retrieving the response. + /// Server responded with an error. + public async ValueTask PurgeStreamAsync( + string stream, + StreamPurgeRequest request, + CancellationToken cancellationToken = default) + { + var response = await JSRequestResponseAsync( + subject: $"{Opts.Prefix}.STREAM.PURGE.{stream}", + request: request, + cancellationToken); + return response; + } + /// /// Get stream information from the server and creates a NATS JetStream stream object . /// diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs index ec72dc0d9..f791a3bb3 100644 --- a/src/NATS.Client.JetStream/NatsJSStream.cs +++ b/src/NATS.Client.JetStream/NatsJSStream.cs @@ -38,6 +38,21 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d return _deleted = await _context.DeleteStreamAsync(_name, cancellationToken); } + /// + /// Purge data from this stream. Leaves the stream. + /// + /// Purge request. + /// A used to cancel the API call. + /// Whether delete was successful or not. + /// There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. + /// Server responded with an error. + /// After deletion this object can't be used anymore. + public async ValueTask PurgeAsync(StreamPurgeRequest request, CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + return await _context.PurgeStreamAsync(_name, request, cancellationToken); + } + /// /// Update stream properties on the server. /// diff --git a/src/NATS.Client.ObjectStore/Internal/NatsOBSub.cs b/src/NATS.Client.ObjectStore/Internal/NatsOBSub.cs deleted file mode 100644 index 352047e64..000000000 --- a/src/NATS.Client.ObjectStore/Internal/NatsOBSub.cs +++ /dev/null @@ -1,94 +0,0 @@ -using System.Buffers; -using System.Threading.Channels; -using NATS.Client.Core; -using NATS.Client.JetStream; - -namespace NATS.Client.ObjectStore.Internal; - -internal enum NatsOBSubCommand -{ - Msg, - Ready, -} - -internal readonly struct NatsOBSubMsg -{ - public NatsOBSubMsg() - { - } - - public NatsOBSubCommand Command { get; init; } = default; - - public NatsJSMsg Msg { get; init; } = default; -} - -internal class NatsOBSub : NatsSubBase -{ - private readonly NatsJSContext _context; - private readonly CancellationToken _cancellationToken; - private readonly NatsConnection _nats; - private readonly NatsHeaderParser _headerParser; - private readonly INatsSerializer _serializer; - private readonly ChannelWriter> _commands; - - private int _done; - - public NatsOBSub( - string subject, - NatsJSContext context, - Channel> commandChannel, - NatsSubOpts? opts, - CancellationToken cancellationToken) - : base( - connection: context.Connection, - manager: context.Connection.SubscriptionManager, - subject: subject, - queueGroup: default, - opts) - { - _context = context; - _cancellationToken = cancellationToken; - _serializer = opts?.Serializer ?? context.Connection.Opts.Serializer; - _nats = context.Connection; - _headerParser = _nats.HeaderParser; - _commands = commandChannel.Writer; - _nats.ConnectionOpened += OnConnectionOpened; - } - - public void Done() => Interlocked.Increment(ref _done); - - public override async ValueTask ReadyAsync() - { - await base.ReadyAsync(); - await _commands.WriteAsync(new NatsOBSubMsg { Command = NatsOBSubCommand.Ready }, _cancellationToken).ConfigureAwait(false); - } - - public override ValueTask DisposeAsync() - { - _nats.ConnectionOpened -= OnConnectionOpened; - return base.DisposeAsync(); - } - - protected override async ValueTask ReceiveInternalAsync( - string subject, - string? replyTo, - ReadOnlySequence? headersBuffer, - ReadOnlySequence payloadBuffer) - { - if (Volatile.Read(ref _done) > 0) - return; - - var msg = new NatsJSMsg(NatsMsg.Build(subject, replyTo, headersBuffer, payloadBuffer, _nats, _headerParser, _serializer), _context); - - await _commands.WriteAsync(new NatsOBSubMsg { Command = NatsOBSubCommand.Msg, Msg = msg }, _cancellationToken).ConfigureAwait(false); - } - - protected override void TryComplete() - { - } - - private void OnConnectionOpened(object? sender, string e) - { - _commands.TryWrite(new NatsOBSubMsg { Command = NatsOBSubCommand.Ready }); - } -} diff --git a/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs index 573b5188b..ba5509bef 100644 --- a/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs +++ b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs @@ -56,6 +56,13 @@ public record ObjectMetadata [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public Dictionary Meta { get; set; } = default!; + /// + /// Object deleted + /// + [JsonPropertyName("deleted")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public bool Deleted { get; set; } = default!; + /// /// Object options /// diff --git a/src/NATS.Client.ObjectStore/NatsOBConfig.cs b/src/NATS.Client.ObjectStore/NatsOBConfig.cs index 2b7d207ae..4af4c425e 100644 --- a/src/NATS.Client.ObjectStore/NatsOBConfig.cs +++ b/src/NATS.Client.ObjectStore/NatsOBConfig.cs @@ -9,17 +9,39 @@ public enum NatsOBStorageType Memory = 1, } +/// +/// Object store configuration. +/// +/// Name of the bucket. public record NatsOBConfig(string Bucket) { + /// + /// Bucket description. + /// public string? Description { get; init; } + /// + /// Maximum age of the object. + /// public TimeSpan? MaxAge { get; init; } + /// + /// How big the store may be, when the combined stream size exceeds this old keys are removed. + /// public long? MaxBytes { get; init; } + /// + /// Type of backing storage to use. + /// public NatsOBStorageType? Storage { get; init; } + /// + /// How many replicas to keep for each key. + /// public int NumberOfReplicas { get; init; } = 1; + /// + /// Additional metadata for the bucket. + /// public Dictionary? Metadata { get; init; } } diff --git a/src/NATS.Client.ObjectStore/NatsOBContext.cs b/src/NATS.Client.ObjectStore/NatsOBContext.cs index 9a990e4d9..f3a6d9d00 100644 --- a/src/NATS.Client.ObjectStore/NatsOBContext.cs +++ b/src/NATS.Client.ObjectStore/NatsOBContext.cs @@ -5,17 +5,27 @@ namespace NATS.Client.ObjectStore; +/// +/// Object Store context. +/// public class NatsOBContext { private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled); private readonly NatsJSContext _context; - public NatsOBContext(NatsJSContext context) - { - _context = context; - } + /// + /// Create a new object store context. + /// + /// JetStream context. + public NatsOBContext(NatsJSContext context) => _context = context; + /// + /// Create a new object store. + /// + /// Object store configuration. + /// A used to cancel the API call. + /// Object store object. public async ValueTask CreateObjectStore(NatsOBConfig config, CancellationToken cancellationToken = default) { ValidateBucketName(config.Bucket); @@ -45,6 +55,18 @@ public async ValueTask CreateObjectStore(NatsOBConfig config, Cance return new NatsOBStore(config, _context, stream); } + /// + /// Delete an object store. + /// + /// Name of the bucket. + /// A used to cancel the API call. + /// Whether delete was successful or not. + public ValueTask DeleteObjectStore(string bucket, CancellationToken cancellationToken) + { + ValidateBucketName(bucket); + return _context.DeleteStreamAsync($"OBJ_{bucket}", cancellationToken); + } + private void ValidateBucketName(string bucket) { if (string.IsNullOrWhiteSpace(bucket)) diff --git a/src/NATS.Client.ObjectStore/NatsOBException.cs b/src/NATS.Client.ObjectStore/NatsOBException.cs index bf3d96624..f54f43268 100644 --- a/src/NATS.Client.ObjectStore/NatsOBException.cs +++ b/src/NATS.Client.ObjectStore/NatsOBException.cs @@ -2,13 +2,25 @@ namespace NATS.Client.ObjectStore; +/// +/// NATS Object Store exception. +/// public class NatsOBException : NatsJSException { + /// + /// Create a new NATS Object Store exception. + /// + /// Exception message. public NatsOBException(string message) : base(message) { } + /// + /// Create a new NATS Object Store exception. + /// + /// Exception message. + /// Inner exception public NatsOBException(string message, Exception exception) : base(message, exception) { diff --git a/src/NATS.Client.ObjectStore/NatsOBStore.cs b/src/NATS.Client.ObjectStore/NatsOBStore.cs index b379e3928..4347cd281 100644 --- a/src/NATS.Client.ObjectStore/NatsOBStore.cs +++ b/src/NATS.Client.ObjectStore/NatsOBStore.cs @@ -11,6 +11,9 @@ namespace NATS.Client.ObjectStore; +/// +/// NATS Object Store. +/// public class NatsOBStore { private const int DefaultChunkSize = 128 * 1024; @@ -21,60 +24,105 @@ public class NatsOBStore private static readonly Regex ValidObjectRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled); private readonly string _bucket; - private readonly NatsOBConfig _config; private readonly NatsJSContext _context; private readonly NatsJSStream _stream; - private readonly NatsConnection _nats; internal NatsOBStore(NatsOBConfig config, NatsJSContext context, NatsJSStream stream) { _bucket = config.Bucket; - _config = config; _context = context; - _nats = context.Connection; _stream = stream; } - public async ValueTask GetAsync(string name, Stream stream, CancellationToken cancellationToken = default) + /// + /// Get object by key. + /// + /// Object key. + /// A used to cancel the API call. + /// Object value as a byte array. + public async ValueTask GetBytesAsync(string key, CancellationToken cancellationToken = default) { - ValidateObjectName(name); + var memoryStream = new MemoryStream(); + await GetAsync(key, memoryStream, cancellationToken).ConfigureAwait(false); + return memoryStream.ToArray(); + } + + /// + /// Get object by key. + /// + /// Object key. + /// Stream to write the object value to. + /// A used to cancel the API call. + /// Object metadata. + /// Metadata didn't match the value retrieved e.g. the SHA digest. + public async ValueTask GetAsync(string key, Stream stream, CancellationToken cancellationToken = default) + { + ValidateObjectName(key); - var info = await GetInfoAsync(name, cancellationToken); + var info = await GetInfoAsync(key, cancellationToken: cancellationToken); await using var pushConsumer = new NatsJSOrderedPushConsumer>( _context, $"OBJ_{_bucket}", - $"$O.{_bucket}.C.{info.Nuid}", + GetChunkSubject(info.Nuid), new NatsJSOrderedPushConsumerOpts { DeliverPolicy = ConsumerConfigurationDeliverPolicy.all }, new NatsSubOpts(), cancellationToken); pushConsumer.Init(); - await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken)) + string digest; + var chunks = 0; + var size = 0; + using (var sha256 = SHA256.Create()) { - // We have to make sure to carry on consuming the channel to avoid any blocking: - // e.g. if the channel is full, we would be blocking the reads off the socket (this was intentionally - // done ot avoid bloating the memory with a large backlog of messages or dropping messages at this level - // and signal the server that we are a slow consumer); then when we make an request-reply API call to - // delete the consumer, the socket would be blocked trying to send the response back to us; so we need to - // keep consuming the channel to avoid this. - if (pushConsumer.IsDone) - continue; - - if (msg.Data != null) + await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write)) { - using (msg.Data) + await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken)) { - await stream.WriteAsync(msg.Data.Memory, cancellationToken); + // We have to make sure to carry on consuming the channel to avoid any blocking: + // e.g. if the channel is full, we would be blocking the reads off the socket (this was intentionally + // done ot avoid bloating the memory with a large backlog of messages or dropping messages at this level + // and signal the server that we are a slow consumer); then when we make an request-reply API call to + // delete the consumer, the socket would be blocked trying to send the response back to us; so we need to + // keep consuming the channel to avoid this. + if (pushConsumer.IsDone) + continue; + + if (msg.Data != null) + { + using (msg.Data) + { + chunks++; + size += msg.Data.Memory.Length; + await hashedStream.WriteAsync(msg.Data.Memory, cancellationToken); + } + } + + var p = msg.Metadata?.NumPending; + if (p is 0) + { + pushConsumer.Done(); + } } } - var p = msg.Metadata?.NumPending; - if (p is 0) - { - pushConsumer.Done(); - } + digest = Base64UrlEncoder.Encode(sha256.Hash); + } + + if ($"SHA-256={digest}" != info.Digest) + { + throw new NatsOBException("SHA-256 digest mismatch"); + } + + if (chunks != info.Chunks) + { + throw new NatsOBException("Chunks mismatch"); + } + + if (size != info.Size) + { + throw new NatsOBException("Size mismatch"); } await stream.FlushAsync(cancellationToken); @@ -82,6 +130,37 @@ public async ValueTask GetAsync(string name, Stream stream, Canc return info; } + /// + /// Put an object by key. + /// + /// Object key. + /// Object value as a byte array. + /// A used to cancel the API call. + /// Object metadata. + public ValueTask PutAsync(string key, byte[] value, CancellationToken cancellationToken = default) => + PutAsync(new ObjectMetadata { Name = key }, new MemoryStream(value), cancellationToken); + + /// + /// Put an object by key. + /// + /// Object key. + /// Stream to read the value from. + /// A used to cancel the API call. + /// Object metadata. + /// There was an error calculating SHA digest. + /// Server responded with an error. + public ValueTask PutAsync(string key, Stream stream, CancellationToken cancellationToken = default) => + PutAsync(new ObjectMetadata { Name = key }, stream, cancellationToken); + + /// + /// Put an object by key. + /// + /// Object metadata. + /// Stream to read the value from. + /// A used to cancel the API call. + /// Object metadata. + /// There was an error calculating SHA digest. + /// Server responded with an error. public async ValueTask PutAsync(ObjectMetadata meta, Stream stream, CancellationToken cancellationToken = default) { ValidateObjectName(meta.Name); @@ -89,7 +168,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre ObjectMetadata? info = null; try { - info = await GetInfoAsync(meta.Name, cancellationToken); + info = await GetInfoAsync(meta.Name, cancellationToken: cancellationToken).ConfigureAwait(false); } catch (NatsJSApiException e) { @@ -98,8 +177,6 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre } var nuid = NewNuid(); - var encodedName = Base64UrlEncoder.Encode(meta.Name); - meta.Bucket = _bucket; meta.Nuid = nuid; meta.MTime = DateTimeOffset.UtcNow; @@ -162,8 +239,8 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre var buffer = new FixedSizeMemoryOwner(memoryOwner, currentChunkSize); // Chunks - var ack1 = await _context.PublishAsync($"$O.{_bucket}.C.{nuid}", buffer, cancellationToken: cancellationToken); - ack1.EnsureSuccess(); + var ack = await _context.PublishAsync(GetChunkSubject(nuid), buffer, cancellationToken: cancellationToken); + ack.EnsureSuccess(); if (eof) break; @@ -181,8 +258,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre meta.Digest = $"SHA-256={digest}"; // Metadata - var ack2 = await _context.PublishAsync($"$O.{_bucket}.M.{encodedName}", meta, headers: NatsRollupHeaders, cancellationToken: cancellationToken); - ack2.EnsureSuccess(); + await PublishMeta(meta, cancellationToken); // Delete the old object if (info != null && info.Nuid != nuid) @@ -191,7 +267,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre { await _context.JSRequestResponseAsync( subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{_bucket}", - request: new StreamPurgeRequest { Filter = $"$O.{_bucket}.C.{info.Nuid}" }, + request: new StreamPurgeRequest { Filter = GetChunkSubject(info.Nuid) }, cancellationToken); } catch (NatsJSApiException e) @@ -204,17 +280,74 @@ await _context.JSRequestResponseAsync( return meta; } - public async ValueTask GetInfoAsync(string key, CancellationToken cancellationToken = default) + /// + /// Get object metadata by key. + /// + /// Object key. + /// Also retrieve deleted objects. + /// A used to cancel the API call. + /// Object metadata. + /// Object was not found. + public async ValueTask GetInfoAsync(string key, bool showDeleted = false, CancellationToken cancellationToken = default) { - var request = new StreamMsgGetRequest { LastBySubj = $"$O.{_bucket}.M.{Base64UrlEncoder.Encode(key)}", }; + ValidateObjectName(key); + + var request = new StreamMsgGetRequest { LastBySubj = GetMetaSubject(key) }; var response = await _stream.GetAsync(request, cancellationToken); var data = NatsJsonSerializer.Default.Deserialize(new ReadOnlySequence(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsOBException("Can't deserialize object metadata"); + if (!showDeleted && data.Deleted) + { + throw new NatsOBException("Object not found"); + } + return data; } + /// + /// Delete an object by key. + /// + /// Object key. + /// A used to cancel the API call. + /// Object metadata was invalid or chunks can't be purged. + public async ValueTask DeleteAsync(string key, CancellationToken cancellationToken = default) + { + ValidateObjectName(key); + + var meta = await GetInfoAsync(key, showDeleted: true, cancellationToken).ConfigureAwait(false); + + if (string.IsNullOrEmpty(meta.Nuid)) + { + throw new NatsOBException("Object-store meta information invalid"); + } + + meta.Size = 0; + meta.Chunks = 0; + meta.Digest = string.Empty; + meta.Deleted = true; + meta.MTime = DateTimeOffset.UtcNow; + + await PublishMeta(meta, cancellationToken); + + var response = await _stream.PurgeAsync(new StreamPurgeRequest { Filter = GetChunkSubject(meta.Nuid) }, cancellationToken); + if (!response.Success) + { + throw new NatsOBException("Can't purge object chunks"); + } + } + + private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken) + { + var ack = await _context.PublishAsync(GetMetaSubject(meta.Name), meta, headers: NatsRollupHeaders, cancellationToken: cancellationToken); + ack.EnsureSuccess(); + } + + private string GetMetaSubject(string key) => $"$O.{_bucket}.M.{Base64UrlEncoder.Encode(key)}"; + + private string GetChunkSubject(string nuid) => $"$O.{_bucket}.C.{nuid}"; + private string NewNuid() { Span buffer = stackalloc char[22]; diff --git a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs index 9cf9b9589..86467dfe2 100644 --- a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs @@ -1,4 +1,3 @@ -using System.Buffers; using System.Security.Cryptography; using System.Text; using NATS.Client.Core.Tests; @@ -13,16 +12,42 @@ public class ObjectStoreTest public ObjectStoreTest(ITestOutputHelper output) => _output = output; + [Fact] - public async Task Create_store() + public async Task Create_delete_object_store() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = NatsServer.StartJS(); await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + var ob = new NatsOBContext(js); + + await ob.CreateObjectStore(new NatsOBConfig("b1"), cancellationToken); + + await foreach (var stream in js.ListStreamsAsync(cancellationToken: cancellationToken)) + { + Assert.Equal($"OBJ_b1", stream.Info.Config.Name); + } + + var deleted = await ob.DeleteObjectStore("b1", cancellationToken); + Assert.True(deleted); + + await foreach (var stream in js.ListStreamsAsync(cancellationToken: cancellationToken)) + { + Assert.Fail("Should not have any streams"); + } + } + + [Fact] + public async Task Put_chunks() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; - // var nats = new NatsConnection(); + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); var js = new NatsJSContext(nats); var ob = new NatsOBContext(js); @@ -45,12 +70,15 @@ public async Task Create_store() await store.PutAsync(meta, stream, cancellationToken); - var data = await store.GetInfoAsync("k1", cancellationToken); + var data = await store.GetInfoAsync("k1", cancellationToken: cancellationToken); + + var sha = Base64UrlEncoder.Encode(SHA256.HashData(buffer)); + var size = buffer.Length; + var chunks = Math.Ceiling(size / 10.0); - _output.WriteLine($"MSG.DATA:{data}"); - _output.WriteLine($"CHUNKS={Math.Ceiling(buffer.Length / 10.0)}"); - _output.WriteLine($"SIZE={buffer.Length}"); - _output.WriteLine($"sha:{Base64UrlEncoder.Encode(SHA256.HashData(buffer))}"); + Assert.Equal($"SHA-256={sha}", data.Digest); + Assert.Equal(chunks, data.Chunks); + Assert.Equal(size, data.Size); } // buffer with smaller last chunk @@ -62,25 +90,26 @@ public async Task Create_store() await store.PutAsync(meta, stream, cancellationToken); - var data = await store.GetInfoAsync("k2", cancellationToken); + var data = await store.GetInfoAsync("k2", cancellationToken: cancellationToken); + + var sha = Base64UrlEncoder.Encode(SHA256.HashData(buffer)); + var size = buffer.Length; + var chunks = Math.Ceiling(size / 10.0); - _output.WriteLine($"MSG.DATA:{data}"); - _output.WriteLine($"CHUNKS={Math.Ceiling(buffer.Length / 10.0)}"); - _output.WriteLine($"SIZE={buffer.Length}"); - _output.WriteLine($"SHA-256={Base64UrlEncoder.Encode(SHA256.HashData(buffer))}"); + Assert.Equal($"SHA-256={sha}", data.Digest); + Assert.Equal(chunks, data.Chunks); + Assert.Equal(size, data.Size); } } [Fact] - public async Task Create_store2() + public async Task Get_chunks() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var cancellationToken = cts.Token; await using var server = NatsServer.StartJS(); await using var nats = server.CreateClientConnection(); - - // var nats = new NatsConnection(); var js = new NatsJSContext(nats); var ob = new NatsOBContext(js); @@ -97,18 +126,9 @@ public async Task Create_store2() // square buffer: all chunks are the same size { var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, }; - var buffer = Encoding.ASCII.GetBytes(buffer90); var stream = new MemoryStream(buffer); - await store.PutAsync(meta, stream, cancellationToken); - - var data = await store.GetInfoAsync("k1", cancellationToken); - - _output.WriteLine($"MSG.DATA:{data}"); - _output.WriteLine($"CHUNKS={Math.Ceiling(buffer.Length / 10.0)}"); - _output.WriteLine($"SIZE={buffer.Length}"); - _output.WriteLine($"sha:{Base64UrlEncoder.Encode(SHA256.HashData(buffer))}"); } { @@ -116,24 +136,55 @@ public async Task Create_store2() await store.GetAsync("k1", memoryStream, cancellationToken); await memoryStream.FlushAsync(cancellationToken); var buffer = memoryStream.ToArray(); - _output.WriteLine($"buffer:{Encoding.ASCII.GetString(buffer)}"); + Assert.Equal(buffer90, Encoding.ASCII.GetString(buffer)); } // buffer with smaller last chunk - // { - // var meta = new ObjectMetadata { Name = "k2", Options = new Options { MaxChunkSize = 10 }, }; - // - // var buffer = Encoding.ASCII.GetBytes(buffer90 + "09-45"); - // var stream = new MemoryStream(buffer); - // - // await store.PutAsync(meta, stream, cancellationToken); - // - // var data = await store.GetInfoAsync("k2", cancellationToken); - // - // _output.WriteLine($"MSG.DATA:{data}"); - // _output.WriteLine($"CHUNKS={Math.Ceiling(buffer.Length / 10.0)}"); - // _output.WriteLine($"SIZE={buffer.Length}"); - // _output.WriteLine($"SHA-256={Base64UrlEncoder.Encode(SHA256.HashData(buffer))}"); - // } + { + var meta = new ObjectMetadata { Name = "k2", Options = new Options { MaxChunkSize = 10 }, }; + var buffer = Encoding.ASCII.GetBytes(buffer90 + "09-45"); + var stream = new MemoryStream(buffer); + await store.PutAsync(meta, stream, cancellationToken); + } + + { + var memoryStream = new MemoryStream(); + await store.GetAsync("k2", memoryStream, cancellationToken); + await memoryStream.FlushAsync(cancellationToken); + var buffer = memoryStream.ToArray(); + Assert.Equal(buffer90 + "09-45", Encoding.ASCII.GetString(buffer)); + } + } + + [Fact] + public async Task Delete_object() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + var ob = new NatsOBContext(js); + + var store = await ob.CreateObjectStore(new NatsOBConfig("b1"), cancellationToken); + await store.PutAsync("k1", new byte[] { 65, 66, 67 }, cancellationToken); + + var info = await store.GetInfoAsync("k1", cancellationToken: cancellationToken); + Assert.Equal(3, info.Size); + + var bytes = await store.GetBytesAsync("k1", cancellationToken); + Assert.Equal(bytes, new byte[] { 65, 66, 67 }); + + await store.DeleteAsync("k1", cancellationToken); + + var exception = await Assert.ThrowsAsync(async () => await store.GetInfoAsync("k1", cancellationToken: cancellationToken)); + Assert.Equal("Object not found", exception.Message); + + var info2 = await store.GetInfoAsync("k1", showDeleted: true, cancellationToken: cancellationToken); + Assert.True(info2.Deleted); + Assert.Equal(0, info2.Size); + Assert.Equal(0, info2.Chunks); + Assert.Equal(string.Empty, info2.Digest); } } From 2ed63d9c8212a9f9dce9090bb387cd01fe2133b6 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 17 Oct 2023 16:24:13 +0100 Subject: [PATCH 6/9] Stream leave open option --- .../Internal/NatsJSOrderedPushConsumer.cs | 2 +- src/NATS.Client.ObjectStore/NatsOBStore.cs | 16 +++++++++------- .../ObjectStoreTest.cs | 13 ++++++------- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs index 46aef3961..b3c736ce2 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs @@ -1,4 +1,4 @@ -using System.Buffers; +using System.Buffers; using System.Threading.Channels; using Microsoft.Extensions.Logging; using NATS.Client.Core; diff --git a/src/NATS.Client.ObjectStore/NatsOBStore.cs b/src/NATS.Client.ObjectStore/NatsOBStore.cs index 4347cd281..47fc75d65 100644 --- a/src/NATS.Client.ObjectStore/NatsOBStore.cs +++ b/src/NATS.Client.ObjectStore/NatsOBStore.cs @@ -43,7 +43,7 @@ internal NatsOBStore(NatsOBConfig config, NatsJSContext context, NatsJSStream st public async ValueTask GetBytesAsync(string key, CancellationToken cancellationToken = default) { var memoryStream = new MemoryStream(); - await GetAsync(key, memoryStream, cancellationToken).ConfigureAwait(false); + await GetAsync(key, memoryStream, cancellationToken: cancellationToken).ConfigureAwait(false); return memoryStream.ToArray(); } @@ -52,10 +52,11 @@ public async ValueTask GetBytesAsync(string key, CancellationToken cance /// /// Object key. /// Stream to write the object value to. + /// true to not close the underlying stream when async method returns; otherwise, false /// A used to cancel the API call. /// Object metadata. /// Metadata didn't match the value retrieved e.g. the SHA digest. - public async ValueTask GetAsync(string key, Stream stream, CancellationToken cancellationToken = default) + public async ValueTask GetAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default) { ValidateObjectName(key); @@ -76,7 +77,7 @@ public async ValueTask GetAsync(string key, Stream stream, Cance var size = 0; using (var sha256 = SHA256.Create()) { - await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write)) + await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write, leaveOpen)) { await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken)) { @@ -138,7 +139,7 @@ public async ValueTask GetAsync(string key, Stream stream, Cance /// A used to cancel the API call. /// Object metadata. public ValueTask PutAsync(string key, byte[] value, CancellationToken cancellationToken = default) => - PutAsync(new ObjectMetadata { Name = key }, new MemoryStream(value), cancellationToken); + PutAsync(new ObjectMetadata { Name = key }, new MemoryStream(value), cancellationToken: cancellationToken); /// /// Put an object by key. @@ -150,18 +151,19 @@ public ValueTask PutAsync(string key, byte[] value, Cancellation /// There was an error calculating SHA digest. /// Server responded with an error. public ValueTask PutAsync(string key, Stream stream, CancellationToken cancellationToken = default) => - PutAsync(new ObjectMetadata { Name = key }, stream, cancellationToken); + PutAsync(new ObjectMetadata { Name = key }, stream, cancellationToken: cancellationToken); /// /// Put an object by key. /// /// Object metadata. /// Stream to read the value from. + /// true to not close the underlying stream when async method returns; otherwise, false /// A used to cancel the API call. /// Object metadata. /// There was an error calculating SHA digest. /// Server responded with an error. - public async ValueTask PutAsync(ObjectMetadata meta, Stream stream, CancellationToken cancellationToken = default) + public async ValueTask PutAsync(ObjectMetadata meta, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default) { ValidateObjectName(meta.Name); @@ -198,7 +200,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre string digest; using (var sha256 = SHA256.Create()) { - await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read)) + await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read, leaveOpen)) { while (true) { diff --git a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs index 86467dfe2..49e6cb8c9 100644 --- a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs @@ -12,7 +12,6 @@ public class ObjectStoreTest public ObjectStoreTest(ITestOutputHelper output) => _output = output; - [Fact] public async Task Create_delete_object_store() { @@ -68,7 +67,7 @@ public async Task Put_chunks() var buffer = Encoding.ASCII.GetBytes(buffer90); var stream = new MemoryStream(buffer); - await store.PutAsync(meta, stream, cancellationToken); + await store.PutAsync(meta, stream, cancellationToken: cancellationToken); var data = await store.GetInfoAsync("k1", cancellationToken: cancellationToken); @@ -88,7 +87,7 @@ public async Task Put_chunks() var buffer = Encoding.ASCII.GetBytes(buffer90 + "09-45"); var stream = new MemoryStream(buffer); - await store.PutAsync(meta, stream, cancellationToken); + await store.PutAsync(meta, stream, cancellationToken: cancellationToken); var data = await store.GetInfoAsync("k2", cancellationToken: cancellationToken); @@ -128,12 +127,12 @@ public async Task Get_chunks() var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, }; var buffer = Encoding.ASCII.GetBytes(buffer90); var stream = new MemoryStream(buffer); - await store.PutAsync(meta, stream, cancellationToken); + await store.PutAsync(meta, stream, cancellationToken: cancellationToken); } { var memoryStream = new MemoryStream(); - await store.GetAsync("k1", memoryStream, cancellationToken); + await store.GetAsync("k1", memoryStream, cancellationToken: cancellationToken); await memoryStream.FlushAsync(cancellationToken); var buffer = memoryStream.ToArray(); Assert.Equal(buffer90, Encoding.ASCII.GetString(buffer)); @@ -144,12 +143,12 @@ public async Task Get_chunks() var meta = new ObjectMetadata { Name = "k2", Options = new Options { MaxChunkSize = 10 }, }; var buffer = Encoding.ASCII.GetBytes(buffer90 + "09-45"); var stream = new MemoryStream(buffer); - await store.PutAsync(meta, stream, cancellationToken); + await store.PutAsync(meta, stream, cancellationToken: cancellationToken); } { var memoryStream = new MemoryStream(); - await store.GetAsync("k2", memoryStream, cancellationToken); + await store.GetAsync("k2", memoryStream, cancellationToken: cancellationToken); await memoryStream.FlushAsync(cancellationToken); var buffer = memoryStream.ToArray(); Assert.Equal(buffer90 + "09-45", Encoding.ASCII.GetString(buffer)); From 318ba5072d08e5b43ff80086601d07e61bf581cf Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 17 Oct 2023 16:28:41 +0100 Subject: [PATCH 7/9] Stream leave open option Missed one! --- src/NATS.Client.ObjectStore/NatsOBStore.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/NATS.Client.ObjectStore/NatsOBStore.cs b/src/NATS.Client.ObjectStore/NatsOBStore.cs index 47fc75d65..6c76ff4b1 100644 --- a/src/NATS.Client.ObjectStore/NatsOBStore.cs +++ b/src/NATS.Client.ObjectStore/NatsOBStore.cs @@ -146,12 +146,13 @@ public ValueTask PutAsync(string key, byte[] value, Cancellation /// /// Object key. /// Stream to read the value from. + /// true to not close the underlying stream when async method returns; otherwise, false /// A used to cancel the API call. /// Object metadata. /// There was an error calculating SHA digest. /// Server responded with an error. - public ValueTask PutAsync(string key, Stream stream, CancellationToken cancellationToken = default) => - PutAsync(new ObjectMetadata { Name = key }, stream, cancellationToken: cancellationToken); + public ValueTask PutAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default) => + PutAsync(new ObjectMetadata { Name = key }, stream, leaveOpen, cancellationToken); /// /// Put an object by key. From 9001738d207ca388d1ebf7921f586b0de2ed0a11 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 17 Oct 2023 16:55:00 +0100 Subject: [PATCH 8/9] Rename OB to Obj --- sandbox/Example.ObjectStore/Program.cs | 4 +-- .../{NatsOBConfig.cs => NatsObjConfig.cs} | 6 ++-- .../{NatsOBContext.cs => NatsObjContext.cs} | 18 +++++----- ...NatsOBException.cs => NatsObjException.cs} | 6 ++-- .../{NatsOBStore.cs => NatsObjStore.cs} | 34 +++++++++---------- .../ObjectStoreTest.cs | 18 +++++----- 6 files changed, 43 insertions(+), 43 deletions(-) rename src/NATS.Client.ObjectStore/{NatsOBConfig.cs => NatsObjConfig.cs} (88%) rename src/NATS.Client.ObjectStore/{NatsOBContext.cs => NatsObjContext.cs} (78%) rename src/NATS.Client.ObjectStore/{NatsOBException.cs => NatsObjException.cs} (78%) rename src/NATS.Client.ObjectStore/{NatsOBStore.cs => NatsObjStore.cs} (90%) diff --git a/sandbox/Example.ObjectStore/Program.cs b/sandbox/Example.ObjectStore/Program.cs index af378ed90..ee537d297 100644 --- a/sandbox/Example.ObjectStore/Program.cs +++ b/sandbox/Example.ObjectStore/Program.cs @@ -6,9 +6,9 @@ var nats = new NatsConnection(); var js = new NatsJSContext(nats); -var ob = new NatsOBContext(js); +var ob = new NatsObjContext(js); -var store = await ob.CreateObjectStore(new NatsOBConfig("o1")); +var store = await ob.CreateObjectStore(new NatsObjConfig("o1")); var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, }; diff --git a/src/NATS.Client.ObjectStore/NatsOBConfig.cs b/src/NATS.Client.ObjectStore/NatsObjConfig.cs similarity index 88% rename from src/NATS.Client.ObjectStore/NatsOBConfig.cs rename to src/NATS.Client.ObjectStore/NatsObjConfig.cs index 4af4c425e..80a1f3029 100644 --- a/src/NATS.Client.ObjectStore/NatsOBConfig.cs +++ b/src/NATS.Client.ObjectStore/NatsObjConfig.cs @@ -3,7 +3,7 @@ namespace NATS.Client.ObjectStore; /// /// Object Store storage type /// -public enum NatsOBStorageType +public enum NatsObjStorageType { File = 0, Memory = 1, @@ -13,7 +13,7 @@ public enum NatsOBStorageType /// Object store configuration. /// /// Name of the bucket. -public record NatsOBConfig(string Bucket) +public record NatsObjConfig(string Bucket) { /// /// Bucket description. @@ -33,7 +33,7 @@ public record NatsOBConfig(string Bucket) /// /// Type of backing storage to use. /// - public NatsOBStorageType? Storage { get; init; } + public NatsObjStorageType? Storage { get; init; } /// /// How many replicas to keep for each key. diff --git a/src/NATS.Client.ObjectStore/NatsOBContext.cs b/src/NATS.Client.ObjectStore/NatsObjContext.cs similarity index 78% rename from src/NATS.Client.ObjectStore/NatsOBContext.cs rename to src/NATS.Client.ObjectStore/NatsObjContext.cs index f3a6d9d00..ebfb89a1a 100644 --- a/src/NATS.Client.ObjectStore/NatsOBContext.cs +++ b/src/NATS.Client.ObjectStore/NatsObjContext.cs @@ -8,7 +8,7 @@ namespace NATS.Client.ObjectStore; /// /// Object Store context. /// -public class NatsOBContext +public class NatsObjContext { private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled); @@ -18,7 +18,7 @@ public class NatsOBContext /// Create a new object store context. /// /// JetStream context. - public NatsOBContext(NatsJSContext context) => _context = context; + public NatsObjContext(NatsJSContext context) => _context = context; /// /// Create a new object store. @@ -26,11 +26,11 @@ public class NatsOBContext /// Object store configuration. /// A used to cancel the API call. /// Object store object. - public async ValueTask CreateObjectStore(NatsOBConfig config, CancellationToken cancellationToken = default) + public async ValueTask CreateObjectStore(NatsObjConfig config, CancellationToken cancellationToken = default) { ValidateBucketName(config.Bucket); - var storage = config.Storage == NatsOBStorageType.File + var storage = config.Storage == NatsObjStorageType.File ? StreamConfigurationStorage.file : StreamConfigurationStorage.memory; @@ -52,7 +52,7 @@ public async ValueTask CreateObjectStore(NatsOBConfig config, Cance }; var stream = await _context.CreateStreamAsync(streamConfiguration, cancellationToken); - return new NatsOBStore(config, _context, stream); + return new NatsObjStore(config, _context, stream); } /// @@ -71,22 +71,22 @@ private void ValidateBucketName(string bucket) { if (string.IsNullOrWhiteSpace(bucket)) { - throw new NatsOBException("Bucket name can't be empty"); + throw new NatsObjException("Bucket name can't be empty"); } if (bucket.StartsWith(".")) { - throw new NatsOBException("Bucket name can't start with a period"); + throw new NatsObjException("Bucket name can't start with a period"); } if (bucket.EndsWith(".")) { - throw new NatsOBException("Bucket name can't end with a period"); + throw new NatsObjException("Bucket name can't end with a period"); } if (!ValidBucketRegex.IsMatch(bucket)) { - throw new NatsOBException("Bucket name can only contain alphanumeric characters, dashes, and underscores"); + throw new NatsObjException("Bucket name can only contain alphanumeric characters, dashes, and underscores"); } } } diff --git a/src/NATS.Client.ObjectStore/NatsOBException.cs b/src/NATS.Client.ObjectStore/NatsObjException.cs similarity index 78% rename from src/NATS.Client.ObjectStore/NatsOBException.cs rename to src/NATS.Client.ObjectStore/NatsObjException.cs index f54f43268..6e411f78c 100644 --- a/src/NATS.Client.ObjectStore/NatsOBException.cs +++ b/src/NATS.Client.ObjectStore/NatsObjException.cs @@ -5,13 +5,13 @@ namespace NATS.Client.ObjectStore; /// /// NATS Object Store exception. /// -public class NatsOBException : NatsJSException +public class NatsObjException : NatsJSException { /// /// Create a new NATS Object Store exception. /// /// Exception message. - public NatsOBException(string message) + public NatsObjException(string message) : base(message) { } @@ -21,7 +21,7 @@ public NatsOBException(string message) /// /// Exception message. /// Inner exception - public NatsOBException(string message, Exception exception) + public NatsObjException(string message, Exception exception) : base(message, exception) { } diff --git a/src/NATS.Client.ObjectStore/NatsOBStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs similarity index 90% rename from src/NATS.Client.ObjectStore/NatsOBStore.cs rename to src/NATS.Client.ObjectStore/NatsObjStore.cs index 6c76ff4b1..b703b2508 100644 --- a/src/NATS.Client.ObjectStore/NatsOBStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -14,7 +14,7 @@ namespace NATS.Client.ObjectStore; /// /// NATS Object Store. /// -public class NatsOBStore +public class NatsObjStore { private const int DefaultChunkSize = 128 * 1024; private const string NatsRollup = "Nats-Rollup"; @@ -27,7 +27,7 @@ public class NatsOBStore private readonly NatsJSContext _context; private readonly NatsJSStream _stream; - internal NatsOBStore(NatsOBConfig config, NatsJSContext context, NatsJSStream stream) + internal NatsObjStore(NatsObjConfig config, NatsJSContext context, NatsJSStream stream) { _bucket = config.Bucket; _context = context; @@ -55,7 +55,7 @@ public async ValueTask GetBytesAsync(string key, CancellationToken cance /// true to not close the underlying stream when async method returns; otherwise, false /// A used to cancel the API call. /// Object metadata. - /// Metadata didn't match the value retrieved e.g. the SHA digest. + /// Metadata didn't match the value retrieved e.g. the SHA digest. public async ValueTask GetAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default) { ValidateObjectName(key); @@ -113,17 +113,17 @@ public async ValueTask GetAsync(string key, Stream stream, bool if ($"SHA-256={digest}" != info.Digest) { - throw new NatsOBException("SHA-256 digest mismatch"); + throw new NatsObjException("SHA-256 digest mismatch"); } if (chunks != info.Chunks) { - throw new NatsOBException("Chunks mismatch"); + throw new NatsObjException("Chunks mismatch"); } if (size != info.Size) { - throw new NatsOBException("Size mismatch"); + throw new NatsObjException("Size mismatch"); } await stream.FlushAsync(cancellationToken); @@ -149,7 +149,7 @@ public ValueTask PutAsync(string key, byte[] value, Cancellation /// true to not close the underlying stream when async method returns; otherwise, false /// A used to cancel the API call. /// Object metadata. - /// There was an error calculating SHA digest. + /// There was an error calculating SHA digest. /// Server responded with an error. public ValueTask PutAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default) => PutAsync(new ObjectMetadata { Name = key }, stream, leaveOpen, cancellationToken); @@ -162,7 +162,7 @@ public ValueTask PutAsync(string key, Stream stream, bool leaveO /// true to not close the underlying stream when async method returns; otherwise, false /// A used to cancel the API call. /// Object metadata. - /// There was an error calculating SHA digest. + /// There was an error calculating SHA digest. /// Server responded with an error. public async ValueTask PutAsync(ObjectMetadata meta, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default) { @@ -251,7 +251,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre } if (sha256.Hash == null) - throw new NatsOBException("Can't compute SHA256 hash"); + throw new NatsObjException("Can't compute SHA256 hash"); digest = Base64UrlEncoder.Encode(sha256.Hash); } @@ -290,7 +290,7 @@ await _context.JSRequestResponseAsync( /// Also retrieve deleted objects. /// A used to cancel the API call. /// Object metadata. - /// Object was not found. + /// Object was not found. public async ValueTask GetInfoAsync(string key, bool showDeleted = false, CancellationToken cancellationToken = default) { ValidateObjectName(key); @@ -299,11 +299,11 @@ public async ValueTask GetInfoAsync(string key, bool showDeleted var response = await _stream.GetAsync(request, cancellationToken); - var data = NatsJsonSerializer.Default.Deserialize(new ReadOnlySequence(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsOBException("Can't deserialize object metadata"); + var data = NatsJsonSerializer.Default.Deserialize(new ReadOnlySequence(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsObjException("Can't deserialize object metadata"); if (!showDeleted && data.Deleted) { - throw new NatsOBException("Object not found"); + throw new NatsObjException("Object not found"); } return data; @@ -314,7 +314,7 @@ public async ValueTask GetInfoAsync(string key, bool showDeleted /// /// Object key. /// A used to cancel the API call. - /// Object metadata was invalid or chunks can't be purged. + /// Object metadata was invalid or chunks can't be purged. public async ValueTask DeleteAsync(string key, CancellationToken cancellationToken = default) { ValidateObjectName(key); @@ -323,7 +323,7 @@ public async ValueTask DeleteAsync(string key, CancellationToken cancellationTok if (string.IsNullOrEmpty(meta.Nuid)) { - throw new NatsOBException("Object-store meta information invalid"); + throw new NatsObjException("Object-store meta information invalid"); } meta.Size = 0; @@ -337,7 +337,7 @@ public async ValueTask DeleteAsync(string key, CancellationToken cancellationTok var response = await _stream.PurgeAsync(new StreamPurgeRequest { Filter = GetChunkSubject(meta.Nuid) }, cancellationToken); if (!response.Success) { - throw new NatsOBException("Can't purge object chunks"); + throw new NatsObjException("Can't purge object chunks"); } } @@ -366,12 +366,12 @@ private void ValidateObjectName(string name) { if (string.IsNullOrWhiteSpace(name)) { - throw new NatsOBException("Object name can't be empty"); + throw new NatsObjException("Object name can't be empty"); } if (!ValidObjectRegex.IsMatch(name)) { - throw new NatsOBException("Object name can only contain alphanumeric characters, dashes, underscores, forward slash, equals sign, and periods"); + throw new NatsObjException("Object name can only contain alphanumeric characters, dashes, underscores, forward slash, equals sign, and periods"); } } } diff --git a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs index 49e6cb8c9..08e007b4a 100644 --- a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs @@ -21,9 +21,9 @@ public async Task Create_delete_object_store() await using var server = NatsServer.StartJS(); await using var nats = server.CreateClientConnection(); var js = new NatsJSContext(nats); - var ob = new NatsOBContext(js); + var ob = new NatsObjContext(js); - await ob.CreateObjectStore(new NatsOBConfig("b1"), cancellationToken); + await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken); await foreach (var stream in js.ListStreamsAsync(cancellationToken: cancellationToken)) { @@ -48,9 +48,9 @@ public async Task Put_chunks() await using var server = NatsServer.StartJS(); await using var nats = server.CreateClientConnection(); var js = new NatsJSContext(nats); - var ob = new NatsOBContext(js); + var ob = new NatsObjContext(js); - var store = await ob.CreateObjectStore(new NatsOBConfig("b1"), cancellationToken); + var store = await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken); var stringBuilder = new StringBuilder(); for (var i = 0; i < 9; i++) @@ -110,9 +110,9 @@ public async Task Get_chunks() await using var server = NatsServer.StartJS(); await using var nats = server.CreateClientConnection(); var js = new NatsJSContext(nats); - var ob = new NatsOBContext(js); + var ob = new NatsObjContext(js); - var store = await ob.CreateObjectStore(new NatsOBConfig("b1"), cancellationToken); + var store = await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken); var stringBuilder = new StringBuilder(); for (var i = 0; i < 9; i++) @@ -164,9 +164,9 @@ public async Task Delete_object() await using var server = NatsServer.StartJS(); await using var nats = server.CreateClientConnection(); var js = new NatsJSContext(nats); - var ob = new NatsOBContext(js); + var ob = new NatsObjContext(js); - var store = await ob.CreateObjectStore(new NatsOBConfig("b1"), cancellationToken); + var store = await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken); await store.PutAsync("k1", new byte[] { 65, 66, 67 }, cancellationToken); var info = await store.GetInfoAsync("k1", cancellationToken: cancellationToken); @@ -177,7 +177,7 @@ public async Task Delete_object() await store.DeleteAsync("k1", cancellationToken); - var exception = await Assert.ThrowsAsync(async () => await store.GetInfoAsync("k1", cancellationToken: cancellationToken)); + var exception = await Assert.ThrowsAsync(async () => await store.GetInfoAsync("k1", cancellationToken: cancellationToken)); Assert.Equal("Object not found", exception.Message); var info2 = await store.GetInfoAsync("k1", showDeleted: true, cancellationToken: cancellationToken); From 2393fb3fd5d86c1c41127688e231a08e2413fe9b Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Tue, 17 Oct 2023 18:20:07 +0100 Subject: [PATCH 9/9] Update NATS.Client.sln.DotSettings Co-authored-by: Caleb Lloyd <2414837+caleblloyd@users.noreply.github.com> --- NATS.Client.sln.DotSettings | 1 - 1 file changed, 1 deletion(-) diff --git a/NATS.Client.sln.DotSettings b/NATS.Client.sln.DotSettings index 5f92e7257..e0bb2621b 100644 --- a/NATS.Client.sln.DotSettings +++ b/NATS.Client.sln.DotSettings @@ -5,7 +5,6 @@ JS KV LF - OB True True True