From 20797d204ad3937a5fbf02d4d2ee4de780083770 Mon Sep 17 00:00:00 2001 From: Caleb Lloyd <2414837+caleblloyd@users.noreply.github.com> Date: Fri, 20 Oct 2023 01:00:45 -0400 Subject: [PATCH] add NatsMemoryOwner (#162) Signed-off-by: Caleb Lloyd --- src/NATS.Client.Core/INatsSerializer.cs | 30 +- src/NATS.Client.Core/NatsMemoryOwner.cs | 376 ++++++++++++++++++ src/NATS.Client.ObjectStore/NatsObjStore.cs | 4 +- .../NatsKVWatcherTest.cs | 58 +-- tests/NATS.Client.Perf/Program.cs | 2 +- 5 files changed, 405 insertions(+), 65 deletions(-) create mode 100644 src/NATS.Client.Core/NatsMemoryOwner.cs diff --git a/src/NATS.Client.Core/INatsSerializer.cs b/src/NATS.Client.Core/INatsSerializer.cs index 7d476d50e..6f37c4b77 100644 --- a/src/NATS.Client.Core/INatsSerializer.cs +++ b/src/NATS.Client.Core/INatsSerializer.cs @@ -18,21 +18,6 @@ public interface ICountableBufferWriter : IBufferWriter int WrittenCount { get; } } -public readonly struct FixedSizeMemoryOwner : IMemoryOwner -{ - private readonly IMemoryOwner _owner; - - public FixedSizeMemoryOwner(IMemoryOwner owner, int size) - { - _owner = owner; - Memory = _owner.Memory.Slice(0, size); - } - - public Memory Memory { get; } - - public void Dispose() => _owner.Dispose(); -} - public static class NatsDefaultSerializer { public static readonly INatsSerializer Default = new NatsRawSerializer(NatsJsonSerializer.Default); @@ -118,9 +103,9 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) return (T)(object)new ReadOnlySequence(buffer.ToArray()); } - if (typeof(T) == typeof(IMemoryOwner)) + if (typeof(T) == typeof(IMemoryOwner) || typeof(T) == typeof(NatsMemoryOwner)) { - var memoryOwner = new FixedSizeMemoryOwner(MemoryPool.Shared.Rent((int)buffer.Length), (int)buffer.Length); + var memoryOwner = NatsMemoryOwner.Allocate((int)buffer.Length); buffer.CopyTo(memoryOwner.Memory.Span); return (T)(object)memoryOwner; } @@ -134,11 +119,7 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) public sealed class NatsJsonSerializer : INatsSerializer { - private static readonly JsonWriterOptions JsonWriterOpts = new JsonWriterOptions - { - Indented = false, - SkipValidation = true, - }; + private static readonly JsonWriterOptions JsonWriterOpts = new JsonWriterOptions { Indented = false, SkipValidation = true, }; [ThreadStatic] private static Utf8JsonWriter? _jsonWriter; @@ -148,10 +129,7 @@ public sealed class NatsJsonSerializer : INatsSerializer public NatsJsonSerializer(JsonSerializerOptions opts) => _opts = opts; public static NatsJsonSerializer Default { get; } = - new(new JsonSerializerOptions - { - DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, - }); + new(new JsonSerializerOptions { DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, }); public INatsSerializer? Next => default; diff --git a/src/NATS.Client.Core/NatsMemoryOwner.cs b/src/NATS.Client.Core/NatsMemoryOwner.cs new file mode 100644 index 000000000..625155afc --- /dev/null +++ b/src/NATS.Client.Core/NatsMemoryOwner.cs @@ -0,0 +1,376 @@ +// adapted from https://github.com/CommunityToolkit/dotnet/blob/v8.2.1/src/CommunityToolkit.HighPerformance/Buffers/MemoryOwner%7BT%7D.cs +// changed from class to struct for non-nullable deserialization + +namespace NATS.Client.Core; + +using System; +using System.Buffers; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +/// +/// An that indicates a mode to use when allocating buffers. +/// +public enum NatsMemoryOwnerAllocationMode +{ + /// + /// The default allocation mode for pooled memory (rented buffers are not cleared). + /// + Default, + + /// + /// Clear pooled buffers when renting them. + /// + Clear, +} + +/// +/// An implementation with an embedded length and a fast accessor. +/// +/// The type of items to store in the current instance. +public struct NatsMemoryOwner : IMemoryOwner +{ + /// + /// The starting offset within . + /// + private readonly int _start; + +#pragma warning disable IDE0032 + /// + /// The usable length within (starting from ). + /// + private readonly int _length; +#pragma warning restore IDE0032 + + /// + /// The instance used to rent . + /// + private readonly ArrayPool _pool; + + /// + /// The underlying array. + /// + private T[]? _array; + + /// + /// Initializes a new instance of the class with the specified parameters. + /// + /// The length of the new memory buffer to use. + /// The instance to use. + /// Indicates the allocation mode to use for the new buffer to rent. + private NatsMemoryOwner(int length, ArrayPool pool, NatsMemoryOwnerAllocationMode mode) + { + _start = 0; + this._length = length; + this._pool = pool; + _array = pool.Rent(length); + + if (mode == NatsMemoryOwnerAllocationMode.Clear) + { + _array.AsSpan(0, length).Clear(); + } + } + + /// + /// Initializes a new instance of the class with the specified parameters. + /// + /// The starting offset within . + /// The length of the array to use. + /// The instance currently in use. + /// The input array to use. + private NatsMemoryOwner(int start, int length, ArrayPool pool, T[] array) + { + this._start = start; + this._length = length; + this._pool = pool; + this._array = array; + } + + /// + /// Gets an empty instance. + /// + public static NatsMemoryOwner Empty + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => new(0, ArrayPool.Shared, NatsMemoryOwnerAllocationMode.Default); + } + + /// + /// Gets the number of items in the current instance + /// + public int Length + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => _length; + } + + /// + /// Creates a new instance with the specified parameters. + /// + /// The length of the new memory buffer to use. + /// A instance of the requested length. + /// Thrown when is not valid. + /// This method is just a proxy for the constructor, for clarity. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static NatsMemoryOwner Allocate(int size) => new(size, ArrayPool.Shared, NatsMemoryOwnerAllocationMode.Default); + + /// + /// Creates a new instance with the specified parameters. + /// + /// The length of the new memory buffer to use. + /// The instance currently in use. + /// A instance of the requested length. + /// Thrown when is not valid. + /// This method is just a proxy for the constructor, for clarity. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static NatsMemoryOwner Allocate(int size, ArrayPool pool) => new(size, pool, NatsMemoryOwnerAllocationMode.Default); + + /// + /// Creates a new instance with the specified parameters. + /// + /// The length of the new memory buffer to use. + /// Indicates the allocation mode to use for the new buffer to rent. + /// A instance of the requested length. + /// Thrown when is not valid. + /// This method is just a proxy for the constructor, for clarity. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static NatsMemoryOwner Allocate(int size, NatsMemoryOwnerAllocationMode mode) => new(size, ArrayPool.Shared, mode); + + /// + /// Creates a new instance with the specified parameters. + /// + /// The length of the new memory buffer to use. + /// The instance currently in use. + /// Indicates the allocation mode to use for the new buffer to rent. + /// A instance of the requested length. + /// Thrown when is not valid. + /// This method is just a proxy for the constructor, for clarity. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static NatsMemoryOwner Allocate(int size, ArrayPool pool, NatsMemoryOwnerAllocationMode mode) => new(size, pool, mode); + + /// + public Memory Memory + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get + { + var array = this._array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return new(array!, _start, _length); + } + } + + /// + /// Gets a wrapping the memory belonging to the current instance. + /// + public Span Span + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get + { + var array = this._array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + ref var r0 = ref array!.DangerousGetReferenceAt(_start); + + // On .NET 6+ runtimes, we can manually create a span from the starting reference to + // skip the argument validations, which include an explicit null check, covariance check + // for the array and the actual validation for the starting offset and target length. We + // only do this on .NET 6+ as we can leverage the runtime-specific array layout to get + // a fast access to the initial element, which makes this trick worth it. Otherwise, on + // runtimes where we would need to at least access a static field to retrieve the base + // byte offset within an SZ array object, we can get better performance by just using the + // default Span constructor and paying the cost of the extra conditional branches, + // especially if T is a value type, in which case the covariance check is JIT removed. + return MemoryMarshal.CreateSpan(ref r0, _length); + } + } + + /// + /// Returns a reference to the first element within the current instance, with no bounds check. + /// + /// A reference to the first element within the current instance. + /// Thrown when the buffer in use has already been disposed. + /// + /// This method does not perform bounds checks on the underlying buffer, but does check whether + /// the buffer itself has been disposed or not. This check should not be removed, and it's also + /// the reason why the method to get a reference at a specified offset is not present. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public ref T DangerousGetReference() + { + var array = this._array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return ref array!.DangerousGetReferenceAt(_start); + } + + /// + /// Gets an instance wrapping the underlying array in use. + /// + /// An instance wrapping the underlying array in use. + /// Thrown when the buffer in use has already been disposed. + /// + /// This method is meant to be used when working with APIs that only accept an array as input, and should be used with caution. + /// In particular, the returned array is rented from an array pool, and it is responsibility of the caller to ensure that it's + /// not used after the current instance is disposed. Doing so is considered undefined behavior, + /// as the same array might be in use within another instance. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public ArraySegment DangerousGetArray() + { + var array = this._array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return new(array!, _start, _length); + } + + /// + /// Slices the buffer currently in use and returns a new instance. + /// + /// The starting offset within the current buffer. + /// The length of the buffer to use. + /// A new instance using the target range of items. + /// Thrown when the buffer in use has already been disposed. + /// Thrown when or are not valid. + /// + /// Using this method will dispose the current instance, and should only be used when an oversized + /// buffer is rented and then adjusted in size, to avoid having to rent a new buffer of the new + /// size and copy the previous items into the new one, or needing an additional variable/field + /// to manually handle to track the used range within a given instance. + /// + public NatsMemoryOwner Slice(int start, int length) + { + var array = this._array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + this._array = null; + + if ((uint)start > this._length) + { + ThrowInvalidOffsetException(); + } + + if ((uint)length > (this._length - start)) + { + ThrowInvalidLengthException(); + } + + // We're transferring the ownership of the underlying array, so the current + // instance no longer needs to be disposed. Because of this, we can manually + // suppress the finalizer to reduce the overhead on the garbage collector. + GC.SuppressFinalize(this); + + return new(start, length, _pool, array!); + } + + /// + public void Dispose() + { + var array = this._array; + + if (array is null) + { + return; + } + + this._array = null; + + _pool.Return(array); + } + + /// + public override string ToString() + { + // Normally we would throw if the array has been disposed, + // but in this case we'll just return the non formatted + // representation as a fallback, since the ToString method + // is generally expected not to throw exceptions. + if (typeof(T) == typeof(char) && + _array is char[] chars) + { + return new(chars, _start, _length); + } + + // Same representation used in Span + return $"CommunityToolkit.HighPerformance.Buffers.MemoryOwner<{typeof(T)}>[{_length}]"; + } + + /// + /// Throws an when is . + /// + private static void ThrowObjectDisposedException() + { + throw new ObjectDisposedException(nameof(NatsMemoryOwner), "The current buffer has already been disposed"); + } + + /// + /// Throws an when the is invalid. + /// + private static void ThrowInvalidOffsetException() + { + throw new ArgumentOutOfRangeException(nameof(_start), "The input start parameter was not valid"); + } + + /// + /// Throws an when the is invalid. + /// + private static void ThrowInvalidLengthException() + { + throw new ArgumentOutOfRangeException(nameof(_length), "The input length parameter was not valid"); + } +} + +internal static class NatsMemoryOwnerArrayExtensions +{ + /// + /// Returns a reference to the first element within a given array, with no bounds checks. + /// + /// The type of elements in the input array instance. + /// The input array instance. + /// A reference to the first element within , or the location it would have used, if is empty. + /// This method doesn't do any bounds checks, therefore it is responsibility of the caller to perform checks in case the returned value is dereferenced. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static ref T DangerousGetReference(this T[] array) + { + return ref MemoryMarshal.GetArrayDataReference(array); + } + + /// + /// Returns a reference to an element at a specified index within a given array, with no bounds checks. + /// + /// The type of elements in the input array instance. + /// The input array instance. + /// The index of the element to retrieve within . + /// A reference to the element within at the index specified by . + /// This method doesn't do any bounds checks, therefore it is responsibility of the caller to ensure the parameter is valid. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static ref T DangerousGetReferenceAt(this T[] array, int i) + { + ref var r0 = ref MemoryMarshal.GetArrayDataReference(array); + ref var ri = ref Unsafe.Add(ref r0, (nint)(uint)i); + + return ref ri; + } +} diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index b703b2508..d347a73b0 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -205,7 +205,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre { while (true) { - var memoryOwner = new FixedSizeMemoryOwner(MemoryPool.Shared.Rent(chunkSize), chunkSize); + var memoryOwner = NatsMemoryOwner.Allocate(chunkSize); var memory = memoryOwner.Memory; var currentChunkSize = 0; @@ -239,7 +239,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre chunks++; } - var buffer = new FixedSizeMemoryOwner(memoryOwner, currentChunkSize); + var buffer = memoryOwner.Slice(0, currentChunkSize); // Chunks var ack = await _context.PublishAsync(GetChunkSubject(nuid), buffer, cancellationToken: cancellationToken); diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs index 52fda59a5..ecc6603c7 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs @@ -31,7 +31,7 @@ public async Task Watcher_reconnect_with_history() var js2 = new NatsJSContext(nats2); var kv2 = new NatsKVContext(js2); var store2 = await kv2.CreateStoreAsync(config, cancellationToken: cancellationToken); - var watcher = await store2.WatchAsync>("k1.*", cancellationToken: cancellationToken); + var watcher = await store2.WatchAsync>("k1.*", cancellationToken: cancellationToken); await store1.PutAsync("k1.p1", 1, cancellationToken); await store1.PutAsync("k1.p1", 2, cancellationToken); @@ -41,25 +41,18 @@ public async Task Watcher_reconnect_with_history() await foreach (var entry in watcher.Entries.ReadAllAsync(cancellationToken)) { - if (entry.Value is { } memoryOwner) + using (entry.Value) { - using (memoryOwner) + if (Utf8Parser.TryParse(entry.Value.Memory.Span, out int value, out _)) { - if (Utf8Parser.TryParse(memoryOwner.Memory.Span, out int value, out _)) - { - Assert.Equal(++count, value); - if (value == 3) - break; - } - else - { - Assert.Fail("Not a number (1)"); - } + Assert.Equal(++count, value); + if (value == 3) + break; + } + else + { + Assert.Fail("Not a number (1)"); } - } - else - { - throw new Exception("Null value (1)"); } } @@ -163,7 +156,7 @@ public async Task Watcher_timeout_reconnect() var js2 = new NatsJSContext(nats2); var kv2 = new NatsKVContext(js2); var store2 = await kv2.CreateStoreAsync(bucket, cancellationToken: cancellationToken); - var watcher = await store2.WatchAsync>("k1.*", cancellationToken: cancellationToken); + var watcher = await store2.WatchAsync>("k1.*", cancellationToken: cancellationToken); // Swallow heartbeats proxy.ServerInterceptors.Add(m => m?.Contains("Idle Heartbeat") ?? false ? null : m); @@ -177,29 +170,22 @@ public async Task Watcher_timeout_reconnect() await store1.PutAsync("k1.p1", 2, cancellationToken); await store1.PutAsync("k1.p1", 3, cancellationToken); - var consumer1 = ((NatsKVWatcher>)watcher).Consumer; + var consumer1 = ((NatsKVWatcher>)watcher).Consumer; await foreach (var entry in watcher.Entries.ReadAllAsync(cancellationToken)) { - if (entry.Value is { } memoryOwner) + using (entry.Value) { - using (memoryOwner) + if (Utf8Parser.TryParse(entry.Value.Memory.Span, out int value, out _)) { - if (Utf8Parser.TryParse(memoryOwner.Memory.Span, out int value, out _)) - { - Assert.Equal(++count, value); - if (value == 3) - break; - } - else - { - Assert.Fail("Not a number (1)"); - } + Assert.Equal(++count, value); + if (value == 3) + break; + } + else + { + Assert.Fail("Not a number (1)"); } - } - else - { - throw new Exception("Null value (1)"); } } @@ -223,7 +209,7 @@ public async Task Watcher_timeout_reconnect() await Retry.Until( reason: "consumer changed", - condition: () => consumer1 != ((NatsKVWatcher>)watcher).Consumer, + condition: () => consumer1 != ((NatsKVWatcher>)watcher).Consumer, retryDelay: TimeSpan.FromSeconds(1), timeout: timeout); diff --git a/tests/NATS.Client.Perf/Program.cs b/tests/NATS.Client.Perf/Program.cs index 02417b99e..dd3ea909f 100644 --- a/tests/NATS.Client.Perf/Program.cs +++ b/tests/NATS.Client.Perf/Program.cs @@ -29,7 +29,7 @@ await nats1.PingAsync(); await nats2.PingAsync(); -await using var sub = await nats1.SubscribeAsync>(t.Subject); +await using var sub = await nats1.SubscribeAsync>(t.Subject); var stopwatch = Stopwatch.StartNew();