diff --git a/.github/workflows/perf.yml b/.github/workflows/perf.yml index 79ac4aaf8..7aa320386 100644 --- a/.github/workflows/perf.yml +++ b/.github/workflows/perf.yml @@ -47,7 +47,8 @@ jobs: - name: Setup dotnet uses: actions/setup-dotnet@v3 with: - dotnet-version: '6.x' + dotnet-version: '8.x' + dotnet-quality: 'preview' - name: Release Build run: dotnet build -c Release tests/NATS.Client.Perf/NATS.Client.Perf.csproj diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 5cedd1759..fccc3a489 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -39,9 +39,8 @@ jobs: name: Setup dotnet uses: actions/setup-dotnet@v3 with: - dotnet-version: | - 6.x - 7.x + dotnet-version: '8.x' + dotnet-quality: 'preview' - if: ${{ fromJSON(steps.tag.outputs.create) }} name: Pack diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d2c913653..94aaba1af 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -39,7 +39,8 @@ jobs: - name: Setup dotnet uses: actions/setup-dotnet@v3 with: - dotnet-version: '6.x' + dotnet-version: '8.x' + dotnet-quality: 'preview' - name: Build run: dotnet build -c Debug @@ -59,6 +60,15 @@ jobs: - name: Test Services run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.Services.Tests/NATS.Client.Services.Tests.csproj + - name: Check Native AOT + run: | + cd tests/NATS.Client.CheckNativeAot + dotnet publish -r linux-x64 -c Release -o dist + cd dist + ls -lh + file NATS.Client.CheckNativeAot + ./NATS.Client.CheckNativeAot + memory_test: name: memory test strategy: @@ -82,7 +92,8 @@ jobs: - name: Setup dotnet uses: actions/setup-dotnet@v3 with: - dotnet-version: '6.x' + dotnet-version: '8.x' + dotnet-quality: 'preview' - name: Get nats-server shell: bash diff --git a/NATS.Client.sln b/NATS.Client.sln index 8f57c7bd6..d33a223cf 100644 --- a/NATS.Client.sln +++ b/NATS.Client.sln @@ -87,6 +87,14 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Services", "sandbox EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.TlsFirst", "sandbox\Example.TlsFirst\Example.TlsFirst.csproj", "{88625045-978F-417F-9F51-A4E3A9718945}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.NativeAot", "sandbox\Example.NativeAot\Example.NativeAot.csproj", "{51362D87-49C8-414C-AAB7-E51B946231E7}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.CheckNativeAot", "tests\NATS.Client.CheckNativeAot\NATS.Client.CheckNativeAot.csproj", "{CF44A42E-C075-4C5C-BE8B-3DF266FC617B}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.ProtoBufMessages", "sandbox\Example.ProtoBufMessages\Example.ProtoBufMessages.csproj", "{9FCD9377-FE5F-4D94-BDCF-54427DB6487D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Serializers.Json", "src\NATS.Client.Serializers.Json\NATS.Client.Serializers.Json.csproj", "{5B4B30A5-941B-44F9-98C6-06F0BB2242AB}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -225,6 +233,22 @@ Global {88625045-978F-417F-9F51-A4E3A9718945}.Debug|Any CPU.Build.0 = Debug|Any CPU {88625045-978F-417F-9F51-A4E3A9718945}.Release|Any CPU.ActiveCfg = Release|Any CPU {88625045-978F-417F-9F51-A4E3A9718945}.Release|Any CPU.Build.0 = Release|Any CPU + {51362D87-49C8-414C-AAB7-E51B946231E7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {51362D87-49C8-414C-AAB7-E51B946231E7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {51362D87-49C8-414C-AAB7-E51B946231E7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {51362D87-49C8-414C-AAB7-E51B946231E7}.Release|Any CPU.Build.0 = Release|Any CPU + {CF44A42E-C075-4C5C-BE8B-3DF266FC617B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CF44A42E-C075-4C5C-BE8B-3DF266FC617B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CF44A42E-C075-4C5C-BE8B-3DF266FC617B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CF44A42E-C075-4C5C-BE8B-3DF266FC617B}.Release|Any CPU.Build.0 = Release|Any CPU + {9FCD9377-FE5F-4D94-BDCF-54427DB6487D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9FCD9377-FE5F-4D94-BDCF-54427DB6487D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9FCD9377-FE5F-4D94-BDCF-54427DB6487D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9FCD9377-FE5F-4D94-BDCF-54427DB6487D}.Release|Any CPU.Build.0 = Release|Any CPU + {5B4B30A5-941B-44F9-98C6-06F0BB2242AB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5B4B30A5-941B-44F9-98C6-06F0BB2242AB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5B4B30A5-941B-44F9-98C6-06F0BB2242AB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5B4B30A5-941B-44F9-98C6-06F0BB2242AB}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -264,6 +288,10 @@ Global {749CAE39-4C1E-4627-9E31-A36B987BC453} = {C526E8AB-739A-48D7-8FC4-048978C9B650} {DD0AB72A-D6CD-4054-A9C9-0DCA3EDBA00F} = {95A69671-16CA-4133-981C-CC381B7AAA30} {88625045-978F-417F-9F51-A4E3A9718945} = {95A69671-16CA-4133-981C-CC381B7AAA30} + {51362D87-49C8-414C-AAB7-E51B946231E7} = {95A69671-16CA-4133-981C-CC381B7AAA30} + {CF44A42E-C075-4C5C-BE8B-3DF266FC617B} = {C526E8AB-739A-48D7-8FC4-048978C9B650} + {9FCD9377-FE5F-4D94-BDCF-54427DB6487D} = {95A69671-16CA-4133-981C-CC381B7AAA30} + {5B4B30A5-941B-44F9-98C6-06F0BB2242AB} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA} diff --git a/docs/documentation/serialization.md b/docs/documentation/serialization.md new file mode 100644 index 000000000..e24f8c6d1 --- /dev/null +++ b/docs/documentation/serialization.md @@ -0,0 +1,277 @@ +# Serialization + +NATS .NET Client supports serialization of messages using a simple interface [`INatsSerializer`](xref:NATS.Client.Core.INatsSerializer). + +```csharp +public interface INatsSerializer +{ + // Serialize the value to the buffer. + void Serialize(IBufferWriter bufferWriter, T value); + + // Deserialize the value from the buffer. + T? Deserialize(in ReadOnlySequence buffer); +} +``` + +By default, the client uses the [`NatsDefaultSerializer`](xref:NATS.Client.Core.NatsDefaultSerializer) which can handle binary data, UTF8 strings and numbers. You can provide your own +serializer by implementing the [`INatsSerializer`](xref:NATS.Client.Core.INatsSerializer) interface or using the [`NatsJsonContextSerializer`](xref:NATS.Client.Core.NatsJsonContextSerializer) for generated +JSON serialization. Serializers can also be chained together to provide multiple serialization formats typically +depending on the types being used. + +## Default Serializer + +Default serializer is used when no serializer is provided to the connection options. It can handle binary data, UTF8 +strings and numbers. It uses the following rules to determine the type of the data: + +- If the data is a byte array, [`Memory`](https://learn.microsoft.com/dotnet/api/system.memory-1), [`IMemoryOwner`](https://learn.microsoft.com/dotnet/api/system.buffers.imemoryowner-1) or similar it is treated as binary data. +- If the data is a string or similar it is treated as UTF8 string. +- If the data is a primitive (for example `DateTime`, `int` or `double`. See also [`NatsUtf8PrimitivesSerializer`](xref:NATS.Client.Core.NatsUtf8PrimitivesSerializer)) it is treated as the primitive encoded as a UTF8 string. +- For any other type, the serializer will throw an exception. + +```csharp +// Same as not specifying a serializer. +var natsOpts = NatsOpts.Default with { Serializer = NatsDefaultSerializer.Default }; + +await using var nats = new NatsConnection(natsOpts); + +await using INatsSub sub = await nats.SubscribeAsync(subject: "foo"); + +// Flush the the network buffers to make sure the subscription request has been processed. +await nats.PingAsync(); + +await nats.PublishAsync(subject: "foo", data: "Hello World"); + +NatsMsg msg = await sub.Msgs.ReadAsync(); + +// Outputs 'Hello World' +Console.WriteLine(msg.Data); +``` + +The default serializer is designed to be used by developers who want to only work with binary data, and provide an out +of the box experience for basic use cases like sending and receiving UTF8 strings. + +### Using JSON Serialization with Reflection + +If you're not using [Native AOT deployments](https://learn.microsoft.com/dotnet/core/deploying/native-aot) you can use +the [`NatsJsonSerializer`](xref:NATS.Client.Core.Serializers.Json.NatsJsonSerializer) to serialize and deserialize +messages. [`NatsJsonSerializer`](xref:NATS.Client.Core.Serializers.Json.NatsJsonSerializer) uses [`System.Text.Json`](https://learn.microsoft.com/dotnet/api/system.text.json) +APIs that can work with types that are not registered to generate serialization code. + +Using this serializer is most useful for use cases where you want to send and receive JSON messages and you don't want to +worry about registering types. It's also useful for prototyping and testing. To use the serializer you need to install +the `NATS.Client.Serializers.Json` Nuget package. + +```shell +$ dotnet add package NATS.Client.Serializers.Json --prerelease +``` + +Then set the serializer as the default for the connection: + +```csharp +using NATS.Client.Serializers.Json; + +var natsOpts = NatsOpts.Default with { Serializer = NatsJsonSerializer.Default }; + +await using var nats = new NatsConnection(natsOpts); +``` + +## Using JSON Serializer Context + +The [`NatsJsonContextSerializer`](xref:NATS.Client.Core.NatsJsonContextSerializer) uses the [`System.Text.Json`](https://learn.microsoft.com/dotnet/api/system.text.json) serializer to serialize and deserialize messages. It relies +on the [`System.Text.Json` source generator](https://devblogs.microsoft.com/dotnet/try-the-new-system-text-json-source-generator/) +to generate the serialization code at compile time. This is the recommended JSON serializer for most use cases and it's +required for [Native AOT deployments](https://learn.microsoft.com/dotnet/core/deploying/native-aot). + +First you need to define your JSON classes and a context to generate the serialization code: +```csharp +public record MyData +{ + [JsonPropertyName("id")] + public int Id { get; set; } + + [JsonPropertyName("name")] + public string? Name { get; set; } +} + +[JsonSerializable(typeof(MyData))] +internal partial class MyJsonContext : JsonSerializerContext; +``` + +Then you can use the [`NatsJsonContextSerializer`](xref:NATS.Client.Core.NatsJsonContextSerializer) to serialize and deserialize messages: +```csharp +// Set the custom serializer as the default for the connection. +var natsOpts = NatsOpts.Default with { Serializer = new NatsJsonContextSerializer(MyJsonContext.Default) }; + +await using var nats = new NatsConnection(natsOpts); + +await using INatsSub sub = await nats.SubscribeAsync(subject: "foo"); + +// Flush the the network buffers to make sure the subscription request has been processed. +await nats.PingAsync(); + +await nats.PublishAsync(subject: "foo", data: new MyData { Id = 1, Name = "bar" }); + +NatsMsg msg = await sub.Msgs.ReadAsync(); + +// Outputs 'MyData { Id = 1, Name = bar }' +Console.WriteLine(msg.Data); +``` + +You can also set the serializer for a specific subscription or publish call: +```csharp +await using var nats = new NatsConnection(); + +var natsSubOpts = new NatsSubOpts { Serializer = new NatsJsonContextSerializer(MyJsonContext.Default) }; +await using INatsSub sub = await nats.SubscribeAsync(subject: "foo", opts: natsSubOpts); + +// Flush the the network buffers to make sure the subscription request has been processed. +await nats.PingAsync(); + +var natsPubOpts = new NatsPubOpts { Serializer = new NatsJsonContextSerializer(MyJsonContext.Default) }; +await nats.PublishAsync(subject: "foo", data: new MyData { Id = 1, Name = "bar" }, opts: natsPubOpts); + +NatsMsg msg = await sub.Msgs.ReadAsync(); + +// Outputs 'MyData { Id = 1, Name = bar }' +Console.WriteLine(msg.Data); +``` + +## Using Custom Serializer + +You can also provide your own serializer by implementing the [`INatsSerializer`](xref:NATS.Client.Core.INatsSerializer) interface. This is useful if you need to +support a custom serialization format or if you need to support multiple serialization formats. + +Here is an example of a custom serializer that uses the Google ProtoBuf serializer to serialize and deserialize: + +```csharp +public class MyProtoBufSerializer : INatsSerializer +{ + public static readonly INatsSerializer Default = new MyProtoBufSerializer(); + + public INatsSerializer? Next => default; + + public void Serialize(IBufferWriter bufferWriter, T value) + { + if (value is IMessage message) + { + message.WriteTo(bufferWriter); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}"); + } + } + + public T? Deserialize(in ReadOnlySequence buffer) + { + if (typeof(T) == typeof(Greeting)) + { + return (T)(object)Greeting.Parser.ParseFrom(buffer); + } + + throw new NatsException($"Can't deserialize {typeof(T)}"); + } +} +``` + +You can then use the custom serializer as the default for the connection: + +```csharp +var natsOpts = NatsOpts.Default with { Serializer = MyProtoBufSerializer.Default }; + +await using var nats = new NatsConnection(natsOpts); + +await using var sub = await nats.SubscribeAsync(subject: "foo"); + +// Flush the the network buffers to make sure the subscription request has been processed. +await nats.PingAsync(); + +await nats.PublishAsync(subject: "foo", data: new Greeting { Id = 42, Name = "Marvin" }); + +var msg = await sub.Msgs.ReadAsync(); + +// Outputs '{ "id": 42, "name": "Marvin" }' +Console.WriteLine(msg.Data); +``` + +## Using Multiple Serializers (chaining) + +You can also chain multiple serializers together to support multiple serialization formats. The first serializer in the +chain that can handle the data will be used. This is useful if you need to support multiple serialization formats and +reuse them. + +Note that chaining serializers is implemented by convention and not enforced by the [`INatsSerializer`](xref:NATS.Client.Core.INatsSerializer) +interface since the next serializer would not be exposed to external users of the interface. + +Here is an example of a serializer that uses the Google ProtoBuf serializer and the [`NatsJsonContextSerializer`](xref:NATS.Client.Core.NatsJsonContextSerializer) to +serialize and deserialize messages based on the type: + +```csharp +var serializers = new NatsJsonContextSerializer(MyJsonContext.Default, next: MyProtoBufSerializer.Default); +var natsOpts = NatsOpts.Default with { Serializer = serializers }; + +await using var nats = new NatsConnection(natsOpts); + +await using var sub1 = await nats.SubscribeAsync(subject: "greet"); +await using var sub2 = await nats.SubscribeAsync(subject: "data"); + +// Flush the the network buffers to make sure the subscription request has been processed. +await nats.PingAsync(); + +await nats.PublishAsync(subject: "greet", data: new Greeting { Id = 42, Name = "Marvin" }); +await nats.PublishAsync(subject: "data", data: new MyData { Id = 1, Name = "Bob" }); + +var msg1 = await sub1.Msgs.ReadAsync(); +var msg2 = await sub2.Msgs.ReadAsync(); + +// Outputs '{ "id": 42, "name": "Marvin" }' +Console.WriteLine(msg1.Data); + +// Outputs 'MyData { Id = 1, Name = bar }' +Console.WriteLine(msg2.Data); +``` + +## Dealing with Binary Data and Buffers + +The default serializer can handle binary data and buffers. This is typically archived by using [`IMemoryOwner`](https://learn.microsoft.com/dotnet/api/system.buffers.imemoryowner-1) +implementations. NATS .NET Client provides a [`NatsMemoryOwner`](xref:NATS.Client.Core.NatsMemoryOwner`1) implementation that can be used to allocate buffers. +The [`NatsMemoryOwner`](xref:NATS.Client.Core.NatsMemoryOwner`1) and [`NatsBufferWriter`](xref:NATS.Client.Core.NatsBufferWriter`1) (adapted from [.NET Community Toolkit](https://learn.microsoft.com/en-us/dotnet/communitytoolkit/high-performance/memoryowner)) +are [`IMemoryOwner`](https://learn.microsoft.com/dotnet/api/system.buffers.imemoryowner-1) and [`IBufferWriter`](https://learn.microsoft.com/dotnet/api/system.buffers.ibufferwriter-1) implementations that use the [`ArrayPool`](https://learn.microsoft.com/dotnet/api/system.buffers.arraypool-1) +to allocate buffers. They can be used with the default serializer. + +```csharp +// Same as not specifying a serializer. +var natsOpts = NatsOpts.Default with { Serializer = NatsDefaultSerializer.Default }; + +await using var nats = new NatsConnection(natsOpts); + +await using var sub = await nats.SubscribeAsync>(subject: "foo"); + +// Flush the the network buffers to make sure the subscription request has been processed. +await nats.PingAsync(); + +// Don't reuse NatsBufferWriter, it's disposed and returned to the pool +// by the publisher after being written to network. +var bw = new NatsBufferWriter(); +var memory = bw.GetMemory(2); +memory.Span[0] = (byte)'H'; +memory.Span[1] = (byte)'i'; +bw.Advance(2); + +await nats.PublishAsync(subject: "foo", data: bw); + +var msg = await sub.Msgs.ReadAsync(); + +// Dispose the memory owner after using it so it can be retunrned to the pool. +using (var memoryOwner = msg.Data) +{ + // Outputs 'Hi' + Console.WriteLine(Encoding.ASCII.GetString(memoryOwner.Memory.Span)); +} +``` + +Advantage of using [`NatsMemoryOwner`](xref:NATS.Client.Core.NatsMemoryOwner`1) and [`NatsBufferWriter`](xref:NATS.Client.Core.NatsBufferWriter`1) is that they can be used with the default serializer and +they can be used to allocate buffers from the [`ArrayPool`](https://learn.microsoft.com/dotnet/api/system.buffers.arraypool-1) which can be reused. This is useful if you need to allocate +buffers for binary data and you want to avoid allocating buffers on for every operation (e.g. `new byte[]`) reducing +garbage collection pressure. They may also be useful for example, if your subscription may receive messages with +different formats and the only way to determine the format is by reading the message. diff --git a/docs/documentation/toc.yml b/docs/documentation/toc.yml index 51d58d936..562361587 100644 --- a/docs/documentation/toc.yml +++ b/docs/documentation/toc.yml @@ -27,5 +27,11 @@ - name: Object Store href: object-store/intro.md +- name: Services + href: services/intro.md + +- name: Serialization + href: serialization.md + - name: Updating Documentation href: update-docs.md diff --git a/docs/index.md b/docs/index.md index bfb43d241..69568ccf8 100644 --- a/docs/index.md +++ b/docs/index.md @@ -13,8 +13,11 @@ The NATS.NET V2 client is in preview and not recommended for production use yet. - [x] KV initial support - [x] Object Store initial support - [x] Service API initial support -- [ ] .NET 8.0 support (e.g. Native AOT) +- [x] .NET 8.0 support (Native AOT) +- [ ] Implementation of missing major features (e.g. JetStream ordered consumers) - [ ] Beta phase +- [ ] Testing and bug fixing +- [ ] General Availability ## What's Next diff --git a/sandbox/Example.NativeAot/.gitignore b/sandbox/Example.NativeAot/.gitignore new file mode 100644 index 000000000..849ddff3b --- /dev/null +++ b/sandbox/Example.NativeAot/.gitignore @@ -0,0 +1 @@ +dist/ diff --git a/sandbox/Example.NativeAot/Example.NativeAot.csproj b/sandbox/Example.NativeAot/Example.NativeAot.csproj new file mode 100644 index 000000000..8383b0121 --- /dev/null +++ b/sandbox/Example.NativeAot/Example.NativeAot.csproj @@ -0,0 +1,16 @@ + + + + Exe + net8.0 + enable + enable + true + + + + + + + + diff --git a/sandbox/Example.NativeAot/Program.cs b/sandbox/Example.NativeAot/Program.cs new file mode 100644 index 000000000..abeca1a28 --- /dev/null +++ b/sandbox/Example.NativeAot/Program.cs @@ -0,0 +1,178 @@ +using System.Buffers; +using System.Text; +using System.Text.Json.Serialization; +using Google.Protobuf; +using NATS.Client.Core; + +// string +{ + // Same as not specifying a serializer. + var natsOpts = NatsOpts.Default with { Serializer = NatsDefaultSerializer.Default }; + + await using var nats = new NatsConnection(natsOpts); + + await using var sub = await nats.SubscribeAsync(subject: "foo"); + + // Flush the the network buffers to make sure the subscription request has been processed. + await nats.PingAsync(); + + await nats.PublishAsync(subject: "foo", data: "Hello World"); + + var msg = await sub.Msgs.ReadAsync(); + + // Outputs 'Hello World' + Console.WriteLine(msg.Data); +} + +// custom JSON +{ + var natsOpts = NatsOpts.Default with { Serializer = new NatsJsonContextSerializer(MyJsonContext.Default) }; + + await using var nats = new NatsConnection(natsOpts); + + await using var sub = await nats.SubscribeAsync(subject: "foo"); + + // Flush the the network buffers to make sure the subscription request has been processed. + await nats.PingAsync(); + + await nats.PublishAsync(subject: "foo", data: new MyData { Id = 1, Name = "bar" }); + + var msg = await sub.Msgs.ReadAsync(); + + // Outputs 'MyData { Id = 1, Name = bar }' + Console.WriteLine(msg.Data); +} + +// custom JSON +{ + await using var nats = new NatsConnection(); + + var natsSubOpts = new NatsSubOpts { Serializer = new NatsJsonContextSerializer(MyJsonContext.Default) }; + await using var sub = await nats.SubscribeAsync(subject: "foo", opts: natsSubOpts); + + // Flush the the network buffers to make sure the subscription request has been processed. + await nats.PingAsync(); + + var natsPubOpts = new NatsPubOpts { Serializer = new NatsJsonContextSerializer(MyJsonContext.Default) }; + await nats.PublishAsync(subject: "foo", data: new MyData { Id = 1, Name = "bar" }, opts: natsPubOpts); + + var msg = await sub.Msgs.ReadAsync(); + + // Outputs 'MyData { Id = 1, Name = bar }' + Console.WriteLine(msg.Data); +} + +// Protobuf +{ + var natsOpts = NatsOpts.Default with { Serializer = MyProtoBufSerializer.Default }; + + await using var nats = new NatsConnection(natsOpts); + + await using var sub = await nats.SubscribeAsync(subject: "foo"); + + // Flush the the network buffers to make sure the subscription request has been processed. + await nats.PingAsync(); + + await nats.PublishAsync(subject: "foo", data: new Greeting { Id = 42, Name = "Marvin" }); + + var msg = await sub.Msgs.ReadAsync(); + + // Outputs '{ "id": 42, "name": "Marvin" }' + Console.WriteLine(msg.Data); +} + +// Protobuf/JSON +{ + var serializers = new NatsJsonContextSerializer(MyJsonContext.Default, next: MyProtoBufSerializer.Default); + var natsOpts = NatsOpts.Default with { Serializer = serializers }; + + await using var nats = new NatsConnection(natsOpts); + + await using var sub1 = await nats.SubscribeAsync(subject: "greet"); + await using var sub2 = await nats.SubscribeAsync(subject: "data"); + + // Flush the the network buffers to make sure the subscription request has been processed. + await nats.PingAsync(); + + await nats.PublishAsync(subject: "greet", data: new Greeting { Id = 42, Name = "Marvin" }); + await nats.PublishAsync(subject: "data", data: new MyData { Id = 1, Name = "Bob" }); + + var msg1 = await sub1.Msgs.ReadAsync(); + var msg2 = await sub2.Msgs.ReadAsync(); + + // Outputs '{ "id": 42, "name": "Marvin" }' + Console.WriteLine(msg1.Data); + + // Outputs 'MyData { Id = 1, Name = bar }' + Console.WriteLine(msg2.Data); +} + +// Binary +{ + // Same as not specifying a serializer. + var natsOpts = NatsOpts.Default with { Serializer = NatsDefaultSerializer.Default }; + + await using var nats = new NatsConnection(natsOpts); + + await using var sub = await nats.SubscribeAsync>(subject: "foo"); + + // Flush the the network buffers to make sure the subscription request has been processed. + await nats.PingAsync(); + + var bw = new NatsBufferWriter(); + var memory = bw.GetMemory(2); + memory.Span[0] = (byte)'H'; + memory.Span[1] = (byte)'i'; + bw.Advance(2); + + await nats.PublishAsync(subject: "foo", data: bw); + + var msg = await sub.Msgs.ReadAsync(); + + using (var memoryOwner = msg.Data) + { + // Outputs 'Hi' + Console.WriteLine(Encoding.ASCII.GetString(memoryOwner.Memory.Span)); + } +} + +public class MyProtoBufSerializer : INatsSerializer +{ + public static readonly INatsSerializer Default = new MyProtoBufSerializer(); + + public INatsSerializer? Next => default; + + public void Serialize(IBufferWriter bufferWriter, T value) + { + if (value is IMessage message) + { + message.WriteTo(bufferWriter); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}"); + } + } + + public T? Deserialize(in ReadOnlySequence buffer) + { + if (typeof(T) == typeof(Greeting)) + { + return (T)(object)Greeting.Parser.ParseFrom(buffer); + } + + throw new NatsException($"Can't deserialize {typeof(T)}"); + } +} + +public record MyData +{ + [JsonPropertyName("id")] + public int Id { get; set; } + + [JsonPropertyName("name")] + public string? Name { get; set; } +} + +[JsonSerializable(typeof(MyData))] +internal partial class MyJsonContext : JsonSerializerContext; diff --git a/sandbox/Example.NativeAot/run.bat b/sandbox/Example.NativeAot/run.bat new file mode 100644 index 000000000..831272fa1 --- /dev/null +++ b/sandbox/Example.NativeAot/run.bat @@ -0,0 +1,3 @@ +del /q /s .\dist +dotnet publish -r win-x64 -c Release -o dist +.\dist\Example.NativeAot.exe diff --git a/sandbox/Example.NativeAot/run.sh b/sandbox/Example.NativeAot/run.sh new file mode 100644 index 000000000..31a7b8377 --- /dev/null +++ b/sandbox/Example.NativeAot/run.sh @@ -0,0 +1,3 @@ +rm -rf ./dist +dotnet publish -r linux-x64 -c Release -o dist +./dist/Example.NativeAot diff --git a/sandbox/Example.ProtoBufMessages/Example.ProtoBufMessages.csproj b/sandbox/Example.ProtoBufMessages/Example.ProtoBufMessages.csproj new file mode 100644 index 000000000..6d3ff0f78 --- /dev/null +++ b/sandbox/Example.ProtoBufMessages/Example.ProtoBufMessages.csproj @@ -0,0 +1,22 @@ + + + + net6.0 + enable + enable + Example.ProtoMessages + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + diff --git a/sandbox/Example.ProtoBufMessages/example/greeting.proto b/sandbox/Example.ProtoBufMessages/example/greeting.proto new file mode 100644 index 000000000..55f483233 --- /dev/null +++ b/sandbox/Example.ProtoBufMessages/example/greeting.proto @@ -0,0 +1,6 @@ +syntax = "proto3"; + +message Greeting { + int32 id = 1; + string name = 2; +} diff --git a/src/NATS.Client.Core/INatsSerializer.cs b/src/NATS.Client.Core/INatsSerializer.cs index 4d93073f6..a1e2c9357 100644 --- a/src/NATS.Client.Core/INatsSerializer.cs +++ b/src/NATS.Client.Core/INatsSerializer.cs @@ -1,52 +1,539 @@ using System.Buffers; +using System.Buffers.Text; +using System.Text; using System.Text.Json; using System.Text.Json.Serialization; +using System.Text.Json.Serialization.Metadata; namespace NATS.Client.Core; +/// +/// Serializer interface for NATS messages. +/// public interface INatsSerializer { - public INatsSerializer? Next { get; } - - int Serialize(ICountableBufferWriter bufferWriter, T? value); - + /// + /// Serialize value to buffer. + /// + /// Buffer to write the serialized data. + /// Object to be serialized. + /// Serialized object type + void Serialize(IBufferWriter bufferWriter, T value); + + /// + /// Deserialize value from buffer. + /// + /// Buffer with the serialized data. + /// Serialized object type. + /// Deserialized object T? Deserialize(in ReadOnlySequence buffer); } -public interface ICountableBufferWriter : IBufferWriter +/// +/// Default serializer for NATS messages. +/// +public static class NatsDefaultSerializer { - int WrittenCount { get; } + /// + /// Combined serializer of and set + /// as the default serializer for NATS messages. + /// + public static readonly INatsSerializer Default = new NatsRawSerializer(new NatsUtf8PrimitivesSerializer(default)); } -public static class NatsDefaultSerializer +/// +/// UTF8 serializer for strings and all the primitives. +/// +/// +/// Supported types are string, DateTime, DateTimeOffset, Guid, +/// TimeSpan, bool, byte, decimal, double, float, +/// int, long, sbyte, short, uint and ulong. +/// +public class NatsUtf8PrimitivesSerializer : INatsSerializer { - public static readonly INatsSerializer Default = new NatsRawSerializer(NatsJsonSerializer.Default); + private readonly INatsSerializer? _next; + + /// + /// Creates a new instance of . + /// + /// The next serializer in chain. + public NatsUtf8PrimitivesSerializer(INatsSerializer? next) => _next = next; + + /// + public void Serialize(IBufferWriter bufferWriter, T value) + { + if (value is string str) + { + Encoding.UTF8.GetBytes(str, bufferWriter); + return; + } + + var span = bufferWriter.GetSpan(128); + + // DateTime + { + if (value is DateTime input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // DateTimeOffset + { + if (value is DateTimeOffset input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // Guid + { + if (value is Guid input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // TimeSpan + { + if (value is TimeSpan input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // bool + { + if (value is bool input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // byte + { + if (value is byte input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // decimal + { + if (value is decimal input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // double + { + if (value is double input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // float + { + if (value is float input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // int + { + if (value is int input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // long + { + if (value is long input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // sbyte + { + if (value is sbyte input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // short + { + if (value is short input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // uint + { + if (value is uint input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + // ulong + { + if (value is ulong input) + { + if (Utf8Formatter.TryFormat(input, span, out var written)) + { + bufferWriter.Advance(written); + } + else + { + throw new NatsException($"Can't serialize {typeof(T)}, format error"); + } + + return; + } + } + + if (_next == null) + { + throw new NatsException($"Can't serialize {typeof(T)}"); + } + + _next.Serialize(bufferWriter, value); + } + + /// + public T? Deserialize(in ReadOnlySequence buffer) + { + if (typeof(T) == typeof(string)) + { + return (T)(object)Encoding.UTF8.GetString(buffer); + } + + var span = buffer.IsSingleSegment ? buffer.FirstSpan : buffer.ToArray(); + + if (typeof(T) == typeof(DateTime) || typeof(T) == typeof(DateTime?)) + { + if (Utf8Parser.TryParse(span, out DateTime value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(DateTimeOffset) || typeof(T) == typeof(DateTimeOffset?)) + { + if (Utf8Parser.TryParse(span, out DateTimeOffset value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(Guid) || typeof(T) == typeof(Guid?)) + { + if (Utf8Parser.TryParse(span, out Guid value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(TimeSpan) || typeof(T) == typeof(TimeSpan?)) + { + if (Utf8Parser.TryParse(span, out TimeSpan value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(bool) || typeof(T) == typeof(bool?)) + { + if (Utf8Parser.TryParse(span, out bool value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(byte) || typeof(T) == typeof(byte?)) + { + if (Utf8Parser.TryParse(span, out byte value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(decimal) || typeof(T) == typeof(decimal?)) + { + if (Utf8Parser.TryParse(span, out decimal value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(double) || typeof(T) == typeof(double?)) + { + if (Utf8Parser.TryParse(span, out double value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(float) || typeof(T) == typeof(float?)) + { + if (Utf8Parser.TryParse(span, out float value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(int) || typeof(T) == typeof(int?)) + { + if (Utf8Parser.TryParse(span, out int value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(long) || typeof(T) == typeof(long?)) + { + if (Utf8Parser.TryParse(span, out long value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(sbyte) || typeof(T) == typeof(sbyte?)) + { + if (Utf8Parser.TryParse(span, out sbyte value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(short) || typeof(T) == typeof(short?)) + { + if (Utf8Parser.TryParse(span, out short value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(uint) || typeof(T) == typeof(uint?)) + { + if (Utf8Parser.TryParse(span, out uint value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (typeof(T) == typeof(ulong) || typeof(T) == typeof(ulong?)) + { + if (Utf8Parser.TryParse(span, out ulong value, out _)) + { + return (T)(object)value; + } + + throw new NatsException($"Can't deserialize {typeof(T)}. Parsing error"); + } + + if (_next == null) + { + throw new NatsException($"Can't deserialize {typeof(T)}"); + } + + return _next.Deserialize(buffer); + } } +/// +/// Serializer for binary data. +/// public class NatsRawSerializer : INatsSerializer { - public NatsRawSerializer(INatsSerializer? next) => Next = next; + private readonly INatsSerializer? _next; - public INatsSerializer? Next { get; } + /// + /// Creates a new instance of . + /// + /// Next serializer in chain. + public NatsRawSerializer(INatsSerializer? next) => _next = next; - public int Serialize(ICountableBufferWriter bufferWriter, T? value) + /// + public void Serialize(IBufferWriter bufferWriter, T? value) { if (value is byte[] bytes) { bufferWriter.Write(bytes); - return bytes.Length; + return; } if (value is Memory memory) { bufferWriter.Write(memory.Span); - return memory.Length; + return; } if (value is ReadOnlyMemory readOnlyMemory) { bufferWriter.Write(readOnlyMemory.Span); - return readOnlyMemory.Length; + return; } if (value is ReadOnlySequence readOnlySequence) @@ -63,7 +550,7 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) } } - return (int)readOnlySequence.Length; + return; } if (value is IMemoryOwner memoryOwner) @@ -77,16 +564,19 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) bufferWriter.Advance(length); - return length; + return; } } - if (Next != null) - return Next.Serialize(bufferWriter, value); + if (_next == null) + { + throw new NatsException($"Can't serialize {typeof(T)}"); + } - throw new NatsException($"Can't serialize {typeof(T)}"); + _next.Serialize(bufferWriter, value); } + /// public T? Deserialize(in ReadOnlySequence buffer) { if (typeof(T) == typeof(byte[])) @@ -116,65 +606,94 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) return (T)(object)memoryOwner; } - if (Next != null) - return Next.Deserialize(buffer); + if (_next == null) + { + throw new NatsException($"Can't deserialize {typeof(T)}"); + } - throw new NatsException($"Can't deserialize {typeof(T)}"); + return _next.Deserialize(buffer); } } -public sealed class NatsJsonSerializer : INatsSerializer +/// +/// Serializer with support for . +/// +public sealed class NatsJsonContextSerializer : INatsSerializer { - private static readonly JsonWriterOptions JsonWriterOpts = new JsonWriterOptions { Indented = false, SkipValidation = true, }; + private static readonly JsonWriterOptions JsonWriterOpts = new() { Indented = false, SkipValidation = true }; [ThreadStatic] private static Utf8JsonWriter? _jsonWriter; - private readonly JsonSerializerOptions _opts; - - public NatsJsonSerializer(JsonSerializerOptions opts) => _opts = opts; - - public static NatsJsonSerializer Default { get; } = - new(new JsonSerializerOptions { DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, }); + private readonly JsonSerializerContext _context; + private readonly INatsSerializer? _next; - public INatsSerializer? Next => default; + /// + /// Creates a new instance of . + /// + /// Context to use for serialization. + /// Next serializer in chain. + public NatsJsonContextSerializer(JsonSerializerContext context, INatsSerializer? next = default) + { + _context = context; + _next = next; + } - public int Serialize(ICountableBufferWriter bufferWriter, T? value) + /// + public void Serialize(IBufferWriter bufferWriter, T value) { - Utf8JsonWriter writer; - if (_jsonWriter == null) + if (_context.GetTypeInfo(typeof(T)) is JsonTypeInfo jsonTypeInfo) { - writer = _jsonWriter = new Utf8JsonWriter(bufferWriter, JsonWriterOpts); + Utf8JsonWriter writer; + if (_jsonWriter == null) + { + writer = _jsonWriter = new Utf8JsonWriter(bufferWriter, JsonWriterOpts); + } + else + { + writer = _jsonWriter; + writer.Reset(bufferWriter); + } + + JsonSerializer.Serialize(writer, value, jsonTypeInfo); + + writer.Reset(NullBufferWriter.Instance); + return; } - else + + if (_next == null) { - writer = _jsonWriter; - writer.Reset(bufferWriter); + throw new NatsException($"Can't serialize {typeof(T)}"); } - JsonSerializer.Serialize(writer, value, _opts); - - var bytesCommitted = (int)writer.BytesCommitted; - writer.Reset(NullBufferWriter.Instance); - return bytesCommitted; + _next.Serialize(bufferWriter, value); } + /// public T? Deserialize(in ReadOnlySequence buffer) { - var reader = new Utf8JsonReader(buffer); // Utf8JsonReader is ref struct, no allocate. - return JsonSerializer.Deserialize(ref reader, _opts); - } - - private sealed class NullBufferWriter : IBufferWriter - { - internal static readonly IBufferWriter Instance = new NullBufferWriter(); - - public void Advance(int count) + if (_context.GetTypeInfo(typeof(T)) is JsonTypeInfo jsonTypeInfo) { + var reader = new Utf8JsonReader(buffer); // Utf8JsonReader is ref struct, no allocate. + return JsonSerializer.Deserialize(ref reader, jsonTypeInfo); } - public Memory GetMemory(int sizeHint = 0) => Array.Empty(); + if (_next != null) + return _next.Deserialize(buffer); - public Span GetSpan(int sizeHint = 0) => Array.Empty(); + throw new NatsException($"Can't deserialize {typeof(T)}"); } } + +internal sealed class NullBufferWriter : IBufferWriter +{ + internal static readonly IBufferWriter Instance = new NullBufferWriter(); + + public void Advance(int count) + { + } + + public Memory GetMemory(int sizeHint = 0) => Array.Empty(); + + public Span GetSpan(int sizeHint = 0) => Array.Empty(); +} diff --git a/src/NATS.Client.Core/Internal/FixedArrayBufferWriter.cs b/src/NATS.Client.Core/Internal/FixedArrayBufferWriter.cs index adc8138ac..4a868f5f9 100644 --- a/src/NATS.Client.Core/Internal/FixedArrayBufferWriter.cs +++ b/src/NATS.Client.Core/Internal/FixedArrayBufferWriter.cs @@ -4,7 +4,7 @@ namespace NATS.Client.Core.Internal; // similar as ArrayBufferWriter but adds more functional for ProtocolWriter -internal sealed class FixedArrayBufferWriter : ICountableBufferWriter +internal sealed class FixedArrayBufferWriter : IBufferWriter { private byte[] _buffer; private int _written; diff --git a/src/NATS.Client.Core/NATS.Client.Core.csproj b/src/NATS.Client.Core/NATS.Client.Core.csproj index 7ced64843..9dfdc402e 100644 --- a/src/NATS.Client.Core/NATS.Client.Core.csproj +++ b/src/NATS.Client.Core/NATS.Client.Core.csproj @@ -5,6 +5,7 @@ enable enable true + true pubsub;messaging diff --git a/src/NATS.Client.Core/NatsBufferWriter.cs b/src/NATS.Client.Core/NatsBufferWriter.cs new file mode 100644 index 000000000..3e1326891 --- /dev/null +++ b/src/NATS.Client.Core/NatsBufferWriter.cs @@ -0,0 +1,420 @@ +// adapted from https://github.com/CommunityToolkit/dotnet/blob/main/src/CommunityToolkit.HighPerformance/Buffers/NatsBufferWriter%7BT%7D.cs + +using System.Buffers; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using BitOperations = System.Numerics.BitOperations; + +namespace NATS.Client.Core; + +/// +/// Represents a heap-based, array-backed output sink into which data can be written. +/// +/// The type of items to write to the current instance. +public sealed class NatsBufferWriter : IBufferWriter, IMemoryOwner +{ + /// + /// The default buffer size to use to expand empty arrays. + /// + private const int DefaultInitialBufferSize = 256; + + /// + /// The instance used to rent . + /// + private readonly ArrayPool _pool; + + /// + /// The underlying array. + /// + private T[]? _array; + +#pragma warning disable IDE0032 // Use field over auto-property (clearer and faster) + /// + /// The starting offset within . + /// + private int _index; +#pragma warning restore IDE0032 + + /// + /// Initializes a new instance of the class. + /// + public NatsBufferWriter() + : this(ArrayPool.Shared, DefaultInitialBufferSize) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The instance to use. + public NatsBufferWriter(ArrayPool pool) + : this(pool, DefaultInitialBufferSize) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The minimum capacity with which to initialize the underlying buffer. + /// Thrown when is not valid. + public NatsBufferWriter(int initialCapacity) + : this(ArrayPool.Shared, initialCapacity) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The instance to use. + /// The minimum capacity with which to initialize the underlying buffer. + /// Thrown when is not valid. + public NatsBufferWriter(ArrayPool pool, int initialCapacity) + { + // Since we're using pooled arrays, we can rent the buffer with the + // default size immediately, we don't need to use lazy initialization + // to save unnecessary memory allocations in this case. + // Additionally, we don't need to manually throw the exception if + // the requested size is not valid, as that'll be thrown automatically + // by the array pool in use when we try to rent an array with that size. + _pool = pool; + _array = pool.Rent(initialCapacity); + _index = 0; + } + + /// + Memory IMemoryOwner.Memory + { + // This property is explicitly implemented so that it's hidden + // under normal usage, as the name could be confusing when + // displayed besides WrittenMemory and GetMemory(). + // The IMemoryOwner interface is implemented primarily + // so that the AsStream() extension can be used on this type, + // allowing users to first create a NatsBufferWriter + // instance to write data to, then get a stream through the + // extension and let it take care of returning the underlying + // buffer to the shared pool when it's no longer necessary. + // Inlining is not needed here since this will always be a callvirt. + get => MemoryMarshal.AsMemory(WrittenMemory); + } + + /// + /// Gets the data written to the underlying buffer so far, as a . + /// + public ReadOnlyMemory WrittenMemory + { + get + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array!.AsMemory(0, _index); + } + } + + /// + /// Gets the data written to the underlying buffer so far, as a . + /// + public ReadOnlySpan WrittenSpan + { + get + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array!.AsSpan(0, _index); + } + } + + /// + /// Gets the amount of data written to the underlying buffer so far. + /// + public int WrittenCount + { + get => _index; + } + + /// + /// Gets the total amount of space within the underlying buffer. + /// + public int Capacity + { + get + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array!.Length; + } + } + + /// + /// Gets the amount of space available that can still be written into without forcing the underlying buffer to grow. + /// + public int FreeCapacity + { + get + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return array!.Length - _index; + } + } + + /// + /// Clears the data written to the underlying buffer. + /// + /// + /// You must clear the buffer instance before trying to re-use it. + /// + public void Clear() + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + array.AsSpan(0, _index).Clear(); + + _index = 0; + } + + /// + public void Advance(int count) + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + if (count < 0) + { + ThrowArgumentOutOfRangeExceptionForNegativeCount(); + } + + if (_index > array!.Length - count) + { + ThrowArgumentExceptionForAdvancedTooFar(); + } + + _index += count; + } + + /// + public Memory GetMemory(int sizeHint = 0) + { + CheckBufferAndEnsureCapacity(sizeHint); + + return _array.AsMemory(_index); + } + + /// + public Span GetSpan(int sizeHint = 0) + { + CheckBufferAndEnsureCapacity(sizeHint); + + return _array.AsSpan(_index); + } + + /// + /// Gets an instance wrapping the underlying array in use. + /// + /// An instance wrapping the underlying array in use. + /// Thrown when the buffer in use has already been disposed. + /// + /// This method is meant to be used when working with APIs that only accept an array as input, and should be used with caution. + /// In particular, the returned array is rented from an array pool, and it is responsibility of the caller to ensure that it's + /// not used after the current instance is disposed. Doing so is considered undefined + /// behavior, as the same array might be in use within another instance. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public ArraySegment DangerousGetArray() + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + return new(array!, 0, _index); + } + + /// + public void Dispose() + { + var array = _array; + + if (array is null) + { + return; + } + + _array = null; + + _pool.Return(array); + } + + /// + public override string ToString() + { + // See comments in MemoryOwner about this + if (typeof(T) == typeof(char) && + _array is char[] chars) + { + return new(chars, 0, _index); + } + + // Same representation used in Span + return $"CommunityToolkit.HighPerformance.Buffers.NatsBufferWriter<{typeof(T)}>[{_index}]"; + } + + /// + /// Throws an when the requested count is negative. + /// + private static void ThrowArgumentOutOfRangeExceptionForNegativeCount() + { + throw new ArgumentOutOfRangeException("count", "The count can't be a negative value."); + } + + /// + /// Throws an when the size hint is negative. + /// + private static void ThrowArgumentOutOfRangeExceptionForNegativeSizeHint() + { + throw new ArgumentOutOfRangeException("sizeHint", "The size hint can't be a negative value."); + } + + /// + /// Throws an when the requested count is negative. + /// + private static void ThrowArgumentExceptionForAdvancedTooFar() + { + throw new ArgumentException("The buffer writer has advanced too far."); + } + + /// + /// Throws an when is . + /// + private static void ThrowObjectDisposedException() + { + throw new ObjectDisposedException("The current buffer has already been disposed."); + } + + /// + /// Ensures that has enough free space to contain a given number of new items. + /// + /// The minimum number of items to ensure space for in . + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void CheckBufferAndEnsureCapacity(int sizeHint) + { + var array = _array; + + if (array is null) + { + ThrowObjectDisposedException(); + } + + if (sizeHint < 0) + { + ThrowArgumentOutOfRangeExceptionForNegativeSizeHint(); + } + + if (sizeHint == 0) + { + sizeHint = 1; + } + + if (sizeHint > array!.Length - _index) + { + ResizeBuffer(sizeHint); + } + } + + /// + /// Resizes to ensure it can fit the specified number of new items. + /// + /// The minimum number of items to ensure space for in . + [MethodImpl(MethodImplOptions.NoInlining)] + private void ResizeBuffer(int sizeHint) + { + var minimumSize = (uint)_index + (uint)sizeHint; + + // The ArrayPool class has a maximum threshold of 1024 * 1024 for the maximum length of + // pooled arrays, and once this is exceeded it will just allocate a new array every time + // of exactly the requested size. In that case, we manually round up the requested size to + // the nearest power of two, to ensure that repeated consecutive writes when the array in + // use is bigger than that threshold don't end up causing a resize every single time. + if (minimumSize > 1024 * 1024) + { + minimumSize = BitOperations.RoundUpToPowerOf2(minimumSize); + } + + _pool.Resize(ref _array, (int)minimumSize); + } +} + +internal static class NatsBufferWriterExtensions +{ + /// + /// Changes the number of elements of a rented one-dimensional array to the specified new size. + /// + /// The type of items into the target array to resize. + /// The target instance to use to resize the array. + /// The rented array to resize, or to create a new array. + /// The size of the new array. + /// Indicates whether the contents of the array should be cleared before reuse. + /// Thrown when is less than 0. + /// When this method returns, the caller must not use any references to the old array anymore. + public static void Resize(this ArrayPool pool, [NotNull] ref T[]? array, int newSize, bool clearArray = false) + { + // If the old array is null, just create a new one with the requested size + if (array is null) + { + array = pool.Rent(newSize); + + return; + } + + // If the new size is the same as the current size, do nothing + if (array.Length == newSize) + { + return; + } + + // Rent a new array with the specified size, and copy as many items from the current array + // as possible to the new array. This mirrors the behavior of the Array.Resize API from + // the BCL: if the new size is greater than the length of the current array, copy all the + // items from the original array into the new one. Otherwise, copy as many items as possible, + // until the new array is completely filled, and ignore the remaining items in the first array. + var newArray = pool.Rent(newSize); + var itemsToCopy = Math.Min(array.Length, newSize); + + Array.Copy(array, 0, newArray, 0, itemsToCopy); + + pool.Return(array, clearArray); + + array = newArray; + } +} diff --git a/src/NATS.Client.Hosting/NATS.Client.Hosting.csproj b/src/NATS.Client.Hosting/NATS.Client.Hosting.csproj index a412f3a4d..ac3ead9d1 100644 --- a/src/NATS.Client.Hosting/NATS.Client.Hosting.csproj +++ b/src/NATS.Client.Hosting/NATS.Client.Hosting.csproj @@ -5,6 +5,7 @@ enable enable true + true pubsub;messaging diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index eee5a9dd1..50481fcc3 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -1,5 +1,6 @@ using System.Buffers; using System.Text; +using System.Text.Json.Serialization; using System.Threading.Channels; using Microsoft.Extensions.Logging; using NATS.Client.Core; @@ -153,7 +154,7 @@ public ValueTask CallMsgNextAsync(string origin, ConsumerGetnextRequest request, return Connection.PubModelAsync( subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", data: request, - serializer: NatsJsonSerializer.Default, + serializer: NatsJSJsonSerializer.Default, replyTo: Subject, headers: default, cancellationToken); @@ -193,7 +194,7 @@ internal override IEnumerable GetReconnectCommands(int sid) replyTo: Subject, headers: default, value: request, - serializer: NatsJsonSerializer.Default, + serializer: NatsJSJsonSerializer.Default, errorHandler: default, cancellationToken: default); } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs b/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs index f72ef1cd9..a25b3a397 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSErrorAwareJsonSerializer.cs @@ -11,7 +11,7 @@ internal sealed class NatsJSErrorAwareJsonSerializer : INatsSerializer public INatsSerializer? Next => default; - public int Serialize(ICountableBufferWriter bufferWriter, T? value) => + public void Serialize(IBufferWriter bufferWriter, T? value) => throw new NotSupportedException(); public T? Deserialize(in ReadOnlySequence buffer) @@ -23,11 +23,11 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) => var jsonDocument = JsonDocument.Parse(buffer); if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement)) { - var error = errorElement.Deserialize() ?? throw new NatsJSException("Can't parse JetStream error JSON payload"); + var error = errorElement.Deserialize(NatsJSJsonSerializerContext.Default.ApiError) ?? throw new NatsJSException("Can't parse JetStream error JSON payload"); throw new NatsJSApiErrorException(error); } - return jsonDocument.Deserialize(); + return NatsJSJsonSerializer.Default.Deserialize(buffer); } } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index f01bd171d..bf30c89b4 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -120,7 +120,7 @@ public ValueTask CallMsgNextAsync(ConsumerGetnextRequest request, CancellationTo Connection.PubModelAsync( subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", data: request, - serializer: NatsJsonSerializer.Default, + serializer: NatsJSJsonSerializer.Default, replyTo: Subject, headers: default, cancellationToken); @@ -153,7 +153,7 @@ internal override IEnumerable GetReconnectCommands(int sid) replyTo: Subject, headers: default, value: request, - serializer: NatsJsonSerializer.Default, + serializer: NatsJSJsonSerializer.Default, errorHandler: default, cancellationToken: default); } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSJsonSerializer.cs b/src/NATS.Client.JetStream/Internal/NatsJSJsonSerializer.cs new file mode 100644 index 000000000..3b679c6da --- /dev/null +++ b/src/NATS.Client.JetStream/Internal/NatsJSJsonSerializer.cs @@ -0,0 +1,89 @@ +using System.Text.Json.Serialization; +using NATS.Client.Core; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream.Internal; + +internal static class NatsJSJsonSerializer +{ + public static readonly INatsSerializer Default = new NatsJsonContextSerializer(NatsJSJsonSerializerContext.Default); +} + +[JsonSerializable(typeof(AccountInfoResponse))] +[JsonSerializable(typeof(AccountLimits))] +[JsonSerializable(typeof(AccountPurgeResponse))] +[JsonSerializable(typeof(AccountStats))] +[JsonSerializable(typeof(ApiError))] +[JsonSerializable(typeof(ApiStats))] +[JsonSerializable(typeof(ClusterInfo))] +[JsonSerializable(typeof(ConsumerConfiguration))] +[JsonSerializable(typeof(ConsumerCreateRequest))] +[JsonSerializable(typeof(ConsumerCreateResponse))] +[JsonSerializable(typeof(ConsumerDeleteResponse))] +[JsonSerializable(typeof(ConsumerGetnextRequest))] +[JsonSerializable(typeof(ConsumerInfo))] +[JsonSerializable(typeof(ConsumerInfoResponse))] +[JsonSerializable(typeof(ConsumerLeaderStepdownResponse))] +[JsonSerializable(typeof(ConsumerListRequest))] +[JsonSerializable(typeof(ConsumerListResponse))] +[JsonSerializable(typeof(ConsumerNamesRequest))] +[JsonSerializable(typeof(ConsumerNamesResponse))] +[JsonSerializable(typeof(ErrorResponse))] +[JsonSerializable(typeof(ExternalStreamSource))] +[JsonSerializable(typeof(IterableRequest))] +[JsonSerializable(typeof(IterableResponse))] +[JsonSerializable(typeof(LostStreamData))] +[JsonSerializable(typeof(MetaLeaderStepdownRequest))] +[JsonSerializable(typeof(MetaLeaderStepdownResponse))] +[JsonSerializable(typeof(MetaServerRemoveRequest))] +[JsonSerializable(typeof(MetaServerRemoveResponse))] +[JsonSerializable(typeof(PeerInfo))] +[JsonSerializable(typeof(Placement))] +[JsonSerializable(typeof(PubAckResponse))] +[JsonSerializable(typeof(Republish))] +[JsonSerializable(typeof(SequenceInfo))] +[JsonSerializable(typeof(SequencePair))] +[JsonSerializable(typeof(StoredMessage))] +[JsonSerializable(typeof(StreamAlternate))] +[JsonSerializable(typeof(StreamConfiguration))] +[JsonSerializable(typeof(StreamCreateRequest))] +[JsonSerializable(typeof(StreamCreateResponse))] +[JsonSerializable(typeof(StreamDeleteResponse))] +[JsonSerializable(typeof(StreamInfo))] +[JsonSerializable(typeof(StreamInfoRequest))] +[JsonSerializable(typeof(StreamInfoResponse))] +[JsonSerializable(typeof(StreamLeaderStepdownResponse))] +[JsonSerializable(typeof(StreamListRequest))] +[JsonSerializable(typeof(StreamListResponse))] +[JsonSerializable(typeof(StreamMsgDeleteRequest))] +[JsonSerializable(typeof(StreamMsgDeleteResponse))] +[JsonSerializable(typeof(StreamMsgGetRequest))] +[JsonSerializable(typeof(StreamMsgGetResponse))] +[JsonSerializable(typeof(StreamNamesRequest))] +[JsonSerializable(typeof(StreamNamesResponse))] +[JsonSerializable(typeof(StreamPurgeRequest))] +[JsonSerializable(typeof(StreamPurgeResponse))] +[JsonSerializable(typeof(StreamRemovePeerRequest))] +[JsonSerializable(typeof(StreamRemovePeerResponse))] +[JsonSerializable(typeof(StreamRestoreRequest))] +[JsonSerializable(typeof(StreamRestoreResponse))] +[JsonSerializable(typeof(StreamSnapshotRequest))] +[JsonSerializable(typeof(StreamSnapshotResponse))] +[JsonSerializable(typeof(StreamSource))] +[JsonSerializable(typeof(StreamSourceInfo))] +[JsonSerializable(typeof(StreamState))] +[JsonSerializable(typeof(StreamTemplateConfiguration))] +[JsonSerializable(typeof(StreamTemplateCreateRequest))] +[JsonSerializable(typeof(StreamTemplateCreateResponse))] +[JsonSerializable(typeof(StreamTemplateDeleteResponse))] +[JsonSerializable(typeof(StreamTemplateInfo))] +[JsonSerializable(typeof(StreamTemplateInfoResponse))] +[JsonSerializable(typeof(StreamTemplateNamesRequest))] +[JsonSerializable(typeof(StreamTemplateNamesResponse))] +[JsonSerializable(typeof(StreamUpdateRequest))] +[JsonSerializable(typeof(StreamUpdateResponse))] +[JsonSerializable(typeof(SubjectTransform))] +[JsonSerializable(typeof(Tier))] +internal partial class NatsJSJsonSerializerContext : JsonSerializerContext +{ +} diff --git a/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj b/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj index 474feaa55..697593be2 100644 --- a/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj +++ b/src/NATS.Client.JetStream/NATS.Client.JetStream.csproj @@ -5,6 +5,7 @@ enable enable true + true pubsub;messaging;persistance diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs index 78b08cd8f..38234ee70 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -109,7 +109,7 @@ public async ValueTask UpdateStreamAsync( StreamUpdateRequest request, CancellationToken cancellationToken = default) { - var response = await JSRequestResponseAsync( + var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.STREAM.UPDATE.{request.Name}", request: request, cancellationToken); diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index e614a0b50..1682b8afe 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -86,7 +86,7 @@ public async ValueTask PublishAsync( data: data, headers: headers, requestOpts: opts, - replyOpts: default, + replyOpts: new NatsSubOpts { Serializer = NatsJSJsonSerializer.Default }, cancellationToken) .ConfigureAwait(false); @@ -129,7 +129,8 @@ internal async ValueTask> JSRequestAsync> JSRequestAsyncenable enable true + true pubsub;messaging;persistance;key-value;storage diff --git a/src/NATS.Client.ObjectStore/Internal/NatsObjJsonSerializer.cs b/src/NATS.Client.ObjectStore/Internal/NatsObjJsonSerializer.cs new file mode 100644 index 000000000..f81821370 --- /dev/null +++ b/src/NATS.Client.ObjectStore/Internal/NatsObjJsonSerializer.cs @@ -0,0 +1,16 @@ +using System.Text.Json.Serialization; +using NATS.Client.Core; +using NATS.Client.ObjectStore.Models; + +namespace NATS.Client.ObjectStore.Internal; + +internal static class NatsObjJsonSerializer +{ + public static readonly INatsSerializer Default = new NatsJsonContextSerializer(NatsObjJsonSerializerContext.Default); +} + +[JsonSerializable(typeof(ObjectMetadata))] +[JsonSerializable(typeof(MetaDataOptions))] +internal partial class NatsObjJsonSerializerContext : JsonSerializerContext +{ +} diff --git a/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs index ba5509bef..14b75e564 100644 --- a/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs +++ b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs @@ -68,10 +68,10 @@ public record ObjectMetadata /// [JsonPropertyName("options")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] - public Options Options { get; set; } = default!; + public MetaDataOptions Options { get; set; } = default!; } -public record Options +public record MetaDataOptions { /// /// Max chunk size diff --git a/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj b/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj index d0ce22d99..f97d97a80 100644 --- a/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj +++ b/src/NATS.Client.ObjectStore/NATS.Client.ObjectStore.csproj @@ -5,6 +5,7 @@ enable enable true + true pubsub;messaging;persistance;storage diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 8736d2e4f..578a0c058 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -182,7 +182,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre if (meta.Options == null!) { - meta.Options = new Options { MaxChunkSize = DefaultChunkSize }; + meta.Options = new MetaDataOptions { MaxChunkSize = DefaultChunkSize }; } if (meta.Options.MaxChunkSize == 0) @@ -296,7 +296,7 @@ public async ValueTask GetInfoAsync(string key, bool showDeleted { var response = await _stream.GetAsync(request, cancellationToken); - var data = NatsJsonSerializer.Default.Deserialize(new ReadOnlySequence(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsObjException("Can't deserialize object metadata"); + var data = NatsObjJsonSerializer.Default.Deserialize(new ReadOnlySequence(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsObjException("Can't deserialize object metadata"); if (!showDeleted && data.Deleted) { @@ -350,7 +350,7 @@ public async ValueTask DeleteAsync(string key, CancellationToken cancellationTok private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken) { - var ack = await _context.PublishAsync(GetMetaSubject(meta.Name), meta, headers: NatsRollupHeaders, cancellationToken: cancellationToken); + var ack = await _context.PublishAsync(GetMetaSubject(meta.Name), meta, opts: new NatsPubOpts { Serializer = NatsObjJsonSerializer.Default }, headers: NatsRollupHeaders, cancellationToken: cancellationToken); ack.EnsureSuccess(); } diff --git a/src/NATS.Client.Serializers.Json/NATS.Client.Serializers.Json.csproj b/src/NATS.Client.Serializers.Json/NATS.Client.Serializers.Json.csproj new file mode 100644 index 000000000..fc32cdb7b --- /dev/null +++ b/src/NATS.Client.Serializers.Json/NATS.Client.Serializers.Json.csproj @@ -0,0 +1,19 @@ + + + + net6.0 + enable + enable + true + + + pubsub;messaging + NATS client generic JSON serializer. Not suitable for native AOT deployments. + true + + + + + + + diff --git a/src/NATS.Client.Serializers.Json/NatsJsonSerializer.cs b/src/NATS.Client.Serializers.Json/NatsJsonSerializer.cs new file mode 100644 index 000000000..96122f154 --- /dev/null +++ b/src/NATS.Client.Serializers.Json/NatsJsonSerializer.cs @@ -0,0 +1,72 @@ +using System.Buffers; +using System.Text.Json; +using System.Text.Json.Serialization; +using NATS.Client.Core; + +namespace NATS.Client.Serializers.Json; + +/// +/// Reflection based JSON serializer for NATS. +/// +/// +/// This serializer is not suitable for native AOT deployments since it might rely on reflection +/// +public sealed class NatsJsonSerializer : INatsSerializer +{ + private static readonly JsonWriterOptions JsonWriterOpts = new() { Indented = false, SkipValidation = true, }; + + [ThreadStatic] + private static Utf8JsonWriter? _jsonWriter; + + private readonly JsonSerializerOptions _opts; + + /// + /// Creates a new instance of with the specified options. + /// + /// Serialization options + public NatsJsonSerializer(JsonSerializerOptions opts) => _opts = opts; + + /// + /// Default instance of with option set to ignore null values when writing. + /// + public static NatsJsonSerializer Default { get; } = new(new JsonSerializerOptions { DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull }); + + /// + public void Serialize(IBufferWriter bufferWriter, T? value) + { + Utf8JsonWriter writer; + if (_jsonWriter == null) + { + writer = _jsonWriter = new Utf8JsonWriter(bufferWriter, JsonWriterOpts); + } + else + { + writer = _jsonWriter; + writer.Reset(bufferWriter); + } + + JsonSerializer.Serialize(writer, value, _opts); + + writer.Reset(NullBufferWriter.Instance); + } + + /// + public T? Deserialize(in ReadOnlySequence buffer) + { + var reader = new Utf8JsonReader(buffer); // Utf8JsonReader is ref struct, no allocate. + return JsonSerializer.Deserialize(ref reader, _opts); + } + + private sealed class NullBufferWriter : IBufferWriter + { + internal static readonly IBufferWriter Instance = new NullBufferWriter(); + + public void Advance(int count) + { + } + + public Memory GetMemory(int sizeHint = 0) => Array.Empty(); + + public Span GetSpan(int sizeHint = 0) => Array.Empty(); + } +} diff --git a/src/NATS.Client.Services/Internal/NatsSrvJsonSerializer.cs b/src/NATS.Client.Services/Internal/NatsSrvJsonSerializer.cs new file mode 100644 index 000000000..293638892 --- /dev/null +++ b/src/NATS.Client.Services/Internal/NatsSrvJsonSerializer.cs @@ -0,0 +1,19 @@ +using System.Text.Json.Serialization; +using NATS.Client.Core; +using NATS.Client.Services.Models; + +namespace NATS.Client.Services.Internal; + +internal class NatsSrvJsonSerializer +{ + public static readonly INatsSerializer Default = new NatsJsonContextSerializer(NatsSrvJsonSerializerContext.Default); +} + +[JsonSerializable(typeof(InfoResponse))] +[JsonSerializable(typeof(EndpointInfo))] +[JsonSerializable(typeof(PingResponse))] +[JsonSerializable(typeof(StatsResponse))] +[JsonSerializable(typeof(EndpointStats))] +internal partial class NatsSrvJsonSerializerContext : JsonSerializerContext +{ +} diff --git a/src/NATS.Client.Services/NATS.Client.Services.csproj b/src/NATS.Client.Services/NATS.Client.Services.csproj index 5a9b4940f..6ae07c76e 100644 --- a/src/NATS.Client.Services/NATS.Client.Services.csproj +++ b/src/NATS.Client.Services/NATS.Client.Services.csproj @@ -5,6 +5,7 @@ enable enable true + true pubsub;messaging;microservices;services @@ -14,6 +15,8 @@ + + diff --git a/src/NATS.Client.Services/NatsSvcServer.cs b/src/NATS.Client.Services/NatsSvcServer.cs index b77d656aa..e4eaa324f 100644 --- a/src/NATS.Client.Services/NatsSvcServer.cs +++ b/src/NATS.Client.Services/NatsSvcServer.cs @@ -162,6 +162,7 @@ private async Task MsgLoop() await svcMsg.Msg.ReplyAsync( new PingResponse { Name = _config.Name, Id = _id, Version = _config.Version, }, + opts: new NatsPubOpts { Serializer = NatsSrvJsonSerializer.Default }, cancellationToken: _cancellationToken); } else if (type == SvcMsgType.Info) @@ -189,6 +190,7 @@ await svcMsg.Msg.ReplyAsync( Metadata = _config.Metadata!, Endpoints = endPoints, }, + opts: new NatsPubOpts { Serializer = NatsSrvJsonSerializer.Default }, cancellationToken: _cancellationToken); } else if (type == SvcMsgType.Stats) @@ -237,6 +239,7 @@ await svcMsg.Msg.ReplyAsync( await svcMsg.Msg.ReplyAsync( response, + opts: new NatsPubOpts { Serializer = NatsSrvJsonSerializer.Default }, cancellationToken: _cancellationToken); } } diff --git a/tests/NATS.Client.CheckNativeAot/NATS.Client.CheckNativeAot.csproj b/tests/NATS.Client.CheckNativeAot/NATS.Client.CheckNativeAot.csproj new file mode 100644 index 000000000..9b9ae815c --- /dev/null +++ b/tests/NATS.Client.CheckNativeAot/NATS.Client.CheckNativeAot.csproj @@ -0,0 +1,26 @@ + + + + Exe + net8.0 + enable + enable + true + + + + + + + + + + + + + + + + + + diff --git a/tests/NATS.Client.CheckNativeAot/Program.cs b/tests/NATS.Client.CheckNativeAot/Program.cs new file mode 100644 index 000000000..239beb9a9 --- /dev/null +++ b/tests/NATS.Client.CheckNativeAot/Program.cs @@ -0,0 +1,481 @@ +using System.Text; +using System.Text.Json.Nodes; +using NATS.Client.Core; +using NATS.Client.Core.Tests; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; +using NATS.Client.KeyValueStore; +using NATS.Client.ObjectStore; +using NATS.Client.ObjectStore.Models; +using NATS.Client.Services; +using NATS.Client.Services.Internal; +using NATS.Client.Services.Models; + +Log("Starting..."); + +await RequestReplyTests(); +await JetStreamTests(); +await KVTests(); +await ObjectStoreTests(); +await ServicesTests(); +await ServicesTests2(); + +Log("Bye"); + +void Log(string message) +{ + Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} {message}"); +} + +async Task RequestReplyTests() +{ + Log("Request reply tests..."); + + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(); + + var sub = await nats.SubscribeAsync("foo"); + var reg = sub.Register(async msg => + { + await msg.ReplyAsync(msg.Data * 2); + await msg.ReplyAsync(msg.Data * 3); + await msg.ReplyAsync(msg.Data * 4); + await msg.ReplyAsync(null); // sentinel + }); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var results = new[] { 2, 3, 4 }; + var count = 0; + await foreach (var msg in nats.RequestManyAsync("foo", 1, cancellationToken: cts.Token)) + { + Assert.Equal(results[count++], msg.Data); + } + + Assert.Equal(3, count); + + await sub.DisposeAsync(); + await reg; + + Log("OK"); +} + +async Task JetStreamTests() +{ + Log("JetStream tests..."); + + await using var server = NatsServer.StartJS(); + var nats = server.CreateClientConnection(); + + // Happy user + { + var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var js = new NatsJSContext(nats); + + // Create stream + var stream = await js.CreateStreamAsync( + request: new StreamConfiguration { Name = "events", Subjects = new[] { "events.*" }, }, + cancellationToken: cts1.Token); + Assert.Equal("events", stream.Info.Config.Name); + + // Create consumer + var consumer = await js.CreateConsumerAsync( + new ConsumerCreateRequest + { + StreamName = "events", + Config = new ConsumerConfiguration + { + Name = "consumer1", + DurableName = "consumer1", + + // Turn on ACK so we can test them below + AckPolicy = ConsumerConfigurationAckPolicy.@explicit, + }, + }, + cts1.Token); + Assert.Equal("events", consumer.Info.StreamName); + Assert.Equal("consumer1", consumer.Info.Config.Name); + + // Publish + var ack = await js.PublishAsync("events.foo", new TestData { Test = 1 }, opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts1.Token); + Assert.Null(ack.Error); + Assert.Equal("events", ack.Stream); + Assert.Equal(1, ack.Seq); + Assert.False(ack.Duplicate); + + // Message ID + ack = await js.PublishAsync( + "events.foo", + new TestData { Test = 2 }, + opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, + headers: new NatsHeaders { { "Nats-Msg-Id", "test2" } }, + cancellationToken: cts1.Token); + Assert.Null(ack.Error); + Assert.Equal("events", ack.Stream); + Assert.Equal(2, ack.Seq); + Assert.False(ack.Duplicate); + + // Duplicate + ack = await js.PublishAsync( + "events.foo", + new TestData { Test = 2 }, + opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, + headers: new NatsHeaders { { "Nats-Msg-Id", "test2" } }, + cancellationToken: cts1.Token); + Assert.Null(ack.Error); + Assert.Equal("events", ack.Stream); + Assert.Equal(2, ack.Seq); + Assert.True(ack.Duplicate); + + // Consume + var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var messages = new List>(); + var cc = await consumer.ConsumeAsync( + new NatsJSConsumeOpts { MaxMsgs = 100, Serializer = TestDataJsonSerializer.Default }, + cancellationToken: cts2.Token); + await foreach (var msg in cc.Msgs.ReadAllAsync(cts2.Token)) + { + messages.Add(msg); + + // Only ACK one message so we can consume again + if (messages.Count == 1) + { + await msg.AckAsync(new AckOpts(WaitUntilSent: true), cancellationToken: cts2.Token); + } + + if (messages.Count == 2) + { + break; + } + } + + Assert.Equal(2, messages.Count); + Assert.Equal("events.foo", messages[0].Subject); + Assert.Equal("events.foo", messages[1].Subject); + } + + // Handle errors + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var js = new NatsJSContext(nats); + var exception = await Assert.ThrowsAsync(async () => + { + await js.CreateStreamAsync( + request: new StreamConfiguration + { + Name = "events2", + Subjects = new[] { "events.*" }, + }, + cancellationToken: cts.Token); + }); + Assert.Equal(400, exception.Error.Code); + + // subjects overlap with an existing stream + Assert.Equal(10065, exception.Error.ErrCode); + } + + // Delete stream + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var js = new NatsJSContext(nats); + + // Success + await js.DeleteStreamAsync("events", cts.Token); + + // Error + var exception = await Assert.ThrowsAsync(async () => + { + await js.DeleteStreamAsync("events2", cts.Token); + }); + + Assert.Equal(404, exception.Error.Code); + + // stream not found + Assert.Equal(10059, exception.Error.ErrCode); + } + + Log("OK"); +} + +async Task KVTests() +{ + Log("KV tests..."); + + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + + var js = new NatsJSContext(nats); + var kv = new NatsKVContext(js); + + var store = await kv.CreateStoreAsync("b1"); + + await store.PutAsync("k1", "v1"); + + var entry = await store.GetEntryAsync("k1"); + Assert.Equal("v1", entry.Value); + + Log("OK"); +} + +async Task ObjectStoreTests() +{ + Log("Object store tests..."); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + var ob = new NatsObjContext(js); + + var store = await ob.CreateObjectStore(new NatsObjConfig("b1"), cancellationToken); + + var stringBuilder = new StringBuilder(); + for (var i = 0; i < 9; i++) + { + stringBuilder.Append($"{i:D2}-4567890"); + } + + var buffer90 = stringBuilder.ToString(); + + // square buffer: all chunks are the same size + { + var meta = new ObjectMetadata { Name = "k1", Options = new MetaDataOptions { MaxChunkSize = 10 }, }; + var buffer = Encoding.ASCII.GetBytes(buffer90); + var stream = new MemoryStream(buffer); + await store.PutAsync(meta, stream, cancellationToken: cancellationToken); + } + + { + var memoryStream = new MemoryStream(); + await store.GetAsync("k1", memoryStream, cancellationToken: cancellationToken); + await memoryStream.FlushAsync(cancellationToken); + var buffer = memoryStream.ToArray(); + Assert.Equal(buffer90, Encoding.ASCII.GetString(buffer)); + } + + // buffer with smaller last chunk + { + var meta = new ObjectMetadata { Name = "k2", Options = new MetaDataOptions { MaxChunkSize = 10 }, }; + var buffer = Encoding.ASCII.GetBytes(buffer90 + "09-45"); + var stream = new MemoryStream(buffer); + await store.PutAsync(meta, stream, cancellationToken: cancellationToken); + } + + { + var memoryStream = new MemoryStream(); + await store.GetAsync("k2", memoryStream, cancellationToken: cancellationToken); + await memoryStream.FlushAsync(cancellationToken); + var buffer = memoryStream.ToArray(); + Assert.Equal(buffer90 + "09-45", Encoding.ASCII.GetString(buffer)); + } + + Log("OK"); +} + +async Task ServicesTests() +{ + Log("Services tests..."); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(); + var svc = new NatsSvcContext(nats); + + await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken); + + await s1.AddEndpointAsync( + name: "baz", + subject: "foo.baz", + handler: m => ValueTask.CompletedTask, + cancellationToken: cancellationToken); + + await s1.AddEndpointAsync( + subject: "foo.bar1", + handler: m => ValueTask.CompletedTask, + cancellationToken: cancellationToken); + + var grp1 = await s1.AddGroupAsync("grp1", cancellationToken: cancellationToken); + + await grp1.AddEndpointAsync( + name: "e1", + handler: m => ValueTask.CompletedTask, + cancellationToken: cancellationToken); + + await grp1.AddEndpointAsync( + name: "e2", + subject: "foo.bar2", + handler: m => ValueTask.CompletedTask, + cancellationToken: cancellationToken); + + var grp2 = await s1.AddGroupAsync(string.Empty, queueGroup: "q_empty", cancellationToken: cancellationToken); + + await grp2.AddEndpointAsync( + name: "empty1", + subject: "foo.empty1", + handler: m => ValueTask.CompletedTask, + cancellationToken: cancellationToken); + + // Check that the endpoints are registered correctly + { + var info = (await FindServices(nats, "$SRV.INFO.s1", 1, cancellationToken)).First(); + Assert.Equal(5, info.Endpoints.Count); + var endpoints = info.Endpoints.ToList(); + + Assert.Equal("foo.baz", info.Endpoints.First(e => e.Name == "baz").Subject); + Assert.Equal("q", info.Endpoints.First(e => e.Name == "baz").QueueGroup); + + Assert.Equal("foo.bar1", info.Endpoints.First(e => e.Name == "foo.bar1").Subject); + Assert.Equal("q", info.Endpoints.First(e => e.Name == "foo.bar1").QueueGroup); + + Assert.Equal("grp1.e1", info.Endpoints.First(e => e.Name == "grp1.e1").Subject); + Assert.Equal("q", info.Endpoints.First(e => e.Name == "grp1.e1").QueueGroup); + + Assert.Equal("grp1.foo.bar2", info.Endpoints.First(e => e.Name == "grp1.e2").Subject); + Assert.Equal("q", info.Endpoints.First(e => e.Name == "grp1.e2").QueueGroup); + + Assert.Equal("foo.empty1", info.Endpoints.First(e => e.Name == "empty1").Subject); + Assert.Equal("q_empty", info.Endpoints.First(e => e.Name == "empty1").QueueGroup); + } + + await using var s2 = await svc.AddServiceAsync( + new NatsSvcConfig("s2", "2.0.0") + { + Description = "es-two", + QueueGroup = "q2", + Metadata = new Dictionary { { "k1", "v1" }, { "k2", "v2" }, }, + StatsHandler = () => JsonNode.Parse("{\"stat-k1\":\"stat-v1\",\"stat-k2\":\"stat-v2\"}")!, + }, + cancellationToken: cancellationToken); + + await s2.AddEndpointAsync( + name: "s2baz", + subject: "s2foo.baz", + handler: m => ValueTask.CompletedTask, + metadata: new Dictionary { { "ep-k1", "ep-v1" } }, + cancellationToken: cancellationToken); + + // Check default queue group and stats handler + { + var info = (await FindServices(nats, "$SRV.INFO.s2", 1, cancellationToken)).First(); + Assert.Single(info.Endpoints); + var epi = info.Endpoints.First(); + + Assert.Equal("s2baz", epi.Name); + Assert.Equal("s2foo.baz", epi.Subject); + Assert.Equal("q2", epi.QueueGroup); + Assert.Equal("ep-v1", epi.Metadata["ep-k1"]); + + var stat = (await FindServices(nats, "$SRV.STATS.s2", 1, cancellationToken)).First(); + Assert.Equal("v1", stat.Metadata["k1"]); + Assert.Equal("v2", stat.Metadata["k2"]); + Assert.Single(stat.Endpoints); + var eps = stat.Endpoints.First(); + Assert.Equal("stat-v1", eps.Data["stat-k1"]?.GetValue()); + Assert.Equal("stat-v2", eps.Data["stat-k2"]?.GetValue()); + } + + Log("OK"); +} + +async Task ServicesTests2() +{ + Log("Services tests 2..."); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10_0000)); + var cancellationToken = cts.Token; + + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(); + var svc = new NatsSvcContext(nats); + + await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken); + + await s1.AddEndpointAsync( + name: "e1", + handler: async m => + { + if (m.Data == 7) + { + await m.ReplyErrorAsync(m.Data, $"Error{m.Data}", cancellationToken: cancellationToken); + return; + } + + if (m.Data == 8) + { + throw new NatsSvcEndpointException(m.Data, $"Error{m.Data}"); + } + + if (m.Data == 9) + { + throw new Exception("this won't be exposed"); + } + + await m.ReplyAsync(m.Data * m.Data, cancellationToken: cancellationToken); + }, + cancellationToken: cancellationToken); + + var info = (await FindServices(nats, "$SRV.INFO", 1, cancellationToken)).First(); + Assert.Single(info.Endpoints); + var endpointInfo = info.Endpoints.First(); + Assert.Equal("e1", endpointInfo.Name); + + for (var i = 0; i < 10; i++) + { + var response = await nats.RequestAsync(endpointInfo.Subject, i, cancellationToken: cancellationToken); + if (i is 7 or 8) + { + Assert.Equal($"{i}", response?.Headers?["Nats-Service-Error-Code"]); + Assert.Equal($"Error{i}", response?.Headers?["Nats-Service-Error"]); + } + else if (i is 9) + { + Assert.Equal("999", response?.Headers?["Nats-Service-Error-Code"]); + Assert.Equal("Handler error", response?.Headers?["Nats-Service-Error"]); + } + else + { + Assert.Equal(i * i, response?.Data); + Assert.Null(response?.Headers); + } + } + + var stat = (await FindServices(nats, "$SRV.STATS", 1, cancellationToken)).First(); + Assert.Single(stat.Endpoints); + var endpointStats = stat.Endpoints.First(); + Assert.Equal("e1", endpointStats.Name); + Assert.Equal(10, endpointStats.NumRequests); + Assert.Equal(3, endpointStats.NumErrors); + Assert.Equal("Handler error (999)", endpointStats.LastError); + Assert.True(endpointStats.ProcessingTime > 0); + Assert.True(endpointStats.AverageProcessingTime > 0); + + Log("OK"); +} + +static async Task> FindServices(NatsConnection nats, string subject, int limit, CancellationToken ct) +{ + var replyOpts = new NatsSubOpts + { + Timeout = TimeSpan.FromSeconds(2), + Serializer = NatsSrvJsonSerializer.Default, + }; + var responses = new List(); + + var count = 0; + await foreach (var msg in nats.RequestManyAsync(subject, null, replyOpts: replyOpts, cancellationToken: ct).ConfigureAwait(false)) + { + responses.Add(msg.Data!); + if (++count == limit) + break; + } + + if (count != limit) + { + throw new Exception($"Find service error: Expected {limit} responses but got {count}"); + } + + return responses; +} diff --git a/tests/NATS.Client.CheckNativeAot/TestDataJsonSerializer.cs b/tests/NATS.Client.CheckNativeAot/TestDataJsonSerializer.cs new file mode 100644 index 000000000..b2747a361 --- /dev/null +++ b/tests/NATS.Client.CheckNativeAot/TestDataJsonSerializer.cs @@ -0,0 +1,17 @@ +using System.Text.Json.Serialization; +using NATS.Client.Core; + +public static class TestDataJsonSerializer +{ + public static readonly INatsSerializer Default = new NatsJsonContextSerializer(TestDataJsonSerializerContext.Default); +} + +public record TestData +{ + public int Test { get; set; } +} + +[JsonSerializable(typeof(TestData))] +public partial class TestDataJsonSerializerContext : JsonSerializerContext +{ +} diff --git a/tests/NATS.Client.Core.Tests/JsonSerializerTests.cs b/tests/NATS.Client.Core.Tests/JsonSerializerTests.cs new file mode 100644 index 000000000..349070668 --- /dev/null +++ b/tests/NATS.Client.Core.Tests/JsonSerializerTests.cs @@ -0,0 +1,51 @@ +using NATS.Client.Serializers.Json; + +namespace NATS.Client.Core.Tests; + +public class JsonSerializerTests +{ + private readonly ITestOutputHelper _output; + + public JsonSerializerTests(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Serialize_any_type() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + var natsOpts = NatsOpts.Default with + { + Serializer = NatsJsonSerializer.Default, + }; + + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(natsOpts); + + await using var sub = await nats.SubscribeAsync("foo", cancellationToken: cancellationToken); + await nats.PingAsync(cancellationToken); + await nats.PublishAsync("foo", new SomeTestData { Name = "bar" }, cancellationToken: cancellationToken); + + var msg = await sub.Msgs.ReadAsync(cancellationToken); + Assert.Equal("bar", msg.Data?.Name); + + // Default serializer won't work with random types + await using var nats1 = server.CreateClientConnection(); + + var signal = new WaitSignal(); + + await nats1.PublishAsync( + subject: "would.not.work", + data: new SomeTestData { Name = "won't work" }, + opts: new NatsPubOpts { ErrorHandler = e => signal.Pulse(e) }, + cancellationToken: cancellationToken); + + var exception = await signal; + Assert.Matches(@"Can't serialize .*SomeTestData", exception.Message); + } + + private class SomeTestData + { + public string? Name { get; set; } + } +} diff --git a/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs b/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs index 11de5c7c6..d9a4dea92 100644 --- a/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs +++ b/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs @@ -28,7 +28,7 @@ await Retry.Until( for (var i = 0; i < 10; i++) { var headers = new NatsHeaders { { "X-Test", $"value-{i}" } }; - await nats.PubModelAsync($"foo.data{i}", i, NatsJsonSerializer.Default, "bar", headers); + await nats.PubModelAsync($"foo.data{i}", i, NatsDefaultSerializer.Default, "bar", headers); } await nats.PubAsync("foo.done"); diff --git a/tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj b/tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj index d6a42e910..f6ac07860 100644 --- a/tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj +++ b/tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj @@ -35,6 +35,7 @@ + diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs index b31aeee8b..905a3f415 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.cs @@ -1,5 +1,6 @@ using System.Diagnostics; using System.Text; +using System.Text.Json.Serialization; using System.Threading.Channels; using Xunit.Sdk; @@ -57,7 +58,7 @@ public async Task EncodingTest() { await using var server = NatsServer.Start(_output, _transportType); - var serializer1 = NatsOpts.Default.Serializer; + var serializer1 = new NatsJsonContextSerializer(SimpleClassJsonSerializerContext.Default); foreach (var serializer in new INatsSerializer[] { serializer1 }) { @@ -324,6 +325,11 @@ await Retry.Until( } } +[JsonSerializable(typeof(SampleClass))] +public partial class SimpleClassJsonSerializerContext : JsonSerializerContext +{ +} + public class SampleClass : IEquatable { public SampleClass(int id, string name) diff --git a/tests/NATS.Client.Core.Tests/SerializerTest.cs b/tests/NATS.Client.Core.Tests/SerializerTest.cs index 8128b4905..12bad03fe 100644 --- a/tests/NATS.Client.Core.Tests/SerializerTest.cs +++ b/tests/NATS.Client.Core.Tests/SerializerTest.cs @@ -62,7 +62,7 @@ public class TestSerializer : INatsSerializer { public INatsSerializer? Next => default; - public int Serialize(ICountableBufferWriter bufferWriter, T? value) => throw new TestSerializerException(); + public void Serialize(IBufferWriter bufferWriter, T? value) => throw new TestSerializerException(); public T? Deserialize(in ReadOnlySequence buffer) => throw new TestSerializerException(); } diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs index 6136ed588..0709b937e 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -30,11 +30,11 @@ public async Task Consume_msgs_test() for (var i = 0; i < 30; i++) { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token); ack.EnsureSuccess(); } - var consumerOpts = new NatsJSConsumeOpts { MaxMsgs = 10 }; + var consumerOpts = new NatsJSConsumeOpts { MaxMsgs = 10, Serializer = TestDataJsonSerializer.Default }; var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; await using var cc = await consumer.ConsumeAsync(consumerOpts, cancellationToken: cts.Token); @@ -91,7 +91,7 @@ public async Task Consume_idle_heartbeat_test() await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); - var ack = await js.PublishAsync("s1.foo", new TestData { Test = 0 }, cancellationToken: cts.Token); + var ack = await js.PublishAsync("s1.foo", new TestData { Test = 0 }, opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token); ack.EnsureSuccess(); var signal = new WaitSignal(TimeSpan.FromSeconds(30)); @@ -107,6 +107,7 @@ public async Task Consume_idle_heartbeat_test() { MaxMsgs = 10, IdleHeartbeat = TimeSpan.FromSeconds(5), + Serializer = TestDataJsonSerializer.Default, }; var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; @@ -169,6 +170,7 @@ public async Task Consume_reconnect_test() var consumerOpts = new NatsJSConsumeOpts { MaxMsgs = 10, + Serializer = TestDataJsonSerializer.Default, }; var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); @@ -195,7 +197,7 @@ public async Task Consume_reconnect_test() // Send a message before reconnect { - var ack = await js2.PublishAsync("s1.foo", new TestData { Test = 0 }, cancellationToken: cts.Token); + var ack = await js2.PublishAsync("s1.foo", new TestData { Test = 0 }, opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token); ack.EnsureSuccess(); } @@ -215,7 +217,7 @@ await Retry.Until( // Send a message to be received after reconnect { - var ack = await js2.PublishAsync("s1.foo", new TestData { Test = 1 }, cancellationToken: cts.Token); + var ack = await js2.PublishAsync("s1.foo", new TestData { Test = 1 }, opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token); ack.EnsureSuccess(); } @@ -244,11 +246,12 @@ public async Task Consume_dispose_test() MaxMsgs = 10, IdleHeartbeat = TimeSpan.FromSeconds(5), Expires = TimeSpan.FromSeconds(10), + Serializer = TestDataJsonSerializer.Default, }; for (var i = 0; i < 100; i++) { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token); ack.EnsureSuccess(); } @@ -300,11 +303,12 @@ public async Task Consume_stop_test() MaxMsgs = 10, IdleHeartbeat = TimeSpan.FromSeconds(2), Expires = TimeSpan.FromSeconds(4), + Serializer = TestDataJsonSerializer.Default, }; for (var i = 0; i < 100; i++) { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token); ack.EnsureSuccess(); } @@ -335,9 +339,4 @@ public async Task Consume_stop_test() Assert.True(infos[0].NumAckPending == 0); } - - private record TestData - { - public int Test { get; init; } - } } diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs index f369dbc92..cc891ea8d 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs @@ -21,14 +21,14 @@ public async Task Fetch_test() for (var i = 0; i < 10; i++) { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token); ack.EnsureSuccess(); } var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; await using var fc = - await consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token); + await consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 10, Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token); await foreach (var msg in fc.Msgs.ReadAllAsync(cts.Token)) { await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); @@ -51,13 +51,13 @@ public async Task FetchNoWait_test() for (var i = 0; i < 10; i++) { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token); ack.EnsureSuccess(); } var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; - await foreach (var msg in consumer.FetchAllNoWaitAsync(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token)) + await foreach (var msg in consumer.FetchAllNoWaitAsync(new NatsJSFetchOpts { MaxMsgs = 10, Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token)) { await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); Assert.Equal(count, msg.Data!.Test); @@ -84,11 +84,12 @@ public async Task Fetch_dispose_test() MaxMsgs = 10, IdleHeartbeat = TimeSpan.FromSeconds(5), Expires = TimeSpan.FromSeconds(10), + Serializer = TestDataJsonSerializer.Default, }; for (var i = 0; i < 100; i++) { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token); ack.EnsureSuccess(); } @@ -122,9 +123,4 @@ public async Task Fetch_dispose_test() Assert.True(infos[0].NumAckPending > 0); } - - private record TestData - { - public int Test { get; init; } - } } diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerNextTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerNextTest.cs index c615b2ba9..c2c163df5 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerNextTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerNextTest.cs @@ -21,9 +21,9 @@ public async Task Next_test() for (var i = 0; i < 10; i++) { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token); ack.EnsureSuccess(); - var next = await consumer.NextAsync(new NatsJSNextOpts(), cts.Token); + var next = await consumer.NextAsync(new NatsJSNextOpts { Serializer = TestDataJsonSerializer.Default }, cts.Token); if (next is { } msg) { await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); @@ -31,9 +31,4 @@ public async Task Next_test() } } } - - private record TestData - { - public int Test { get; init; } - } } diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs index 14c73fde7..ae487ccfd 100644 --- a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs @@ -52,7 +52,7 @@ public async Task Create_stream_test() Assert.Equal("consumer1", consumer.Info.Config.Name); // Publish - var ack = await js.PublishAsync("events.foo", new TestData { Test = 1 }, cancellationToken: cts1.Token); + var ack = await js.PublishAsync("events.foo", new TestData { Test = 1 }, opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts1.Token); Assert.Null(ack.Error); Assert.Equal("events", ack.Stream); Assert.Equal(1, ack.Seq); @@ -62,6 +62,7 @@ public async Task Create_stream_test() ack = await js.PublishAsync( "events.foo", new TestData { Test = 2 }, + opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, headers: new NatsHeaders { { "Nats-Msg-Id", "test2" } }, cancellationToken: cts1.Token); Assert.Null(ack.Error); @@ -73,6 +74,7 @@ public async Task Create_stream_test() ack = await js.PublishAsync( "events.foo", new TestData { Test = 2 }, + opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, headers: new NatsHeaders { { "Nats-Msg-Id", "test2" } }, cancellationToken: cts1.Token); Assert.Null(ack.Error); @@ -84,7 +86,7 @@ public async Task Create_stream_test() var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var messages = new List>(); var cc = await consumer.ConsumeAsync( - new NatsJSConsumeOpts { MaxMsgs = 100 }, + new NatsJSConsumeOpts { MaxMsgs = 100, Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts2.Token); await foreach (var msg in cc.Msgs.ReadAllAsync(cts2.Token)) { @@ -149,9 +151,4 @@ await js.CreateStreamAsync( Assert.Equal(10059, exception.Error.ErrCode); } } - - private record TestData - { - public int Test { get; init; } - } } diff --git a/tests/NATS.Client.JetStream.Tests/PublishTest.cs b/tests/NATS.Client.JetStream.Tests/PublishTest.cs index dffa40110..598c76e2e 100644 --- a/tests/NATS.Client.JetStream.Tests/PublishTest.cs +++ b/tests/NATS.Client.JetStream.Tests/PublishTest.cs @@ -1,3 +1,4 @@ +using System.Text.Json.Serialization; using NATS.Client.Core.Tests; namespace NATS.Client.JetStream.Tests; @@ -21,7 +22,17 @@ public async Task Publish_test() // Publish { - var ack = await js.PublishAsync("s1.foo", new TestData { Test = 1 }, cancellationToken: cts.Token); + var ack = await js.PublishAsync( + "s1.foo", + new TestData + { + Test = 1, + }, + opts: new NatsPubOpts + { + Serializer = TestDataJsonSerializer.Default, + }, + cancellationToken: cts.Token); Assert.Null(ack.Error); Assert.Equal(1, ack.Seq); Assert.Equal("s1", ack.Stream); @@ -33,6 +44,7 @@ public async Task Publish_test() var ack1 = await js.PublishAsync( subject: "s1.foo", data: new TestData { Test = 2 }, + opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, headers: new NatsHeaders { { "Nats-Msg-Id", "2" } }, cancellationToken: cts.Token); Assert.Null(ack1.Error); @@ -42,15 +54,11 @@ public async Task Publish_test() var ack2 = await js.PublishAsync( subject: "s1.foo", data: new TestData { Test = 2 }, + opts: new NatsPubOpts { Serializer = TestDataJsonSerializer.Default }, headers: new NatsHeaders { { "Nats-Msg-Id", "2" } }, cancellationToken: cts.Token); Assert.Null(ack2.Error); Assert.True(ack2.Duplicate); } } - - private record TestData - { - public int Test { get; init; } - } } diff --git a/tests/NATS.Client.JetStream.Tests/TestDataJsonSerializer.cs b/tests/NATS.Client.JetStream.Tests/TestDataJsonSerializer.cs new file mode 100644 index 000000000..b7efb022e --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/TestDataJsonSerializer.cs @@ -0,0 +1,18 @@ +using System.Text.Json.Serialization; + +namespace NATS.Client.JetStream.Tests; + +public static class TestDataJsonSerializer +{ + public static readonly INatsSerializer Default = new NatsJsonContextSerializer(TestDataJsonSerializerContext.Default); +} + +public record TestData +{ + public int Test { get; set; } +} + +[JsonSerializable(typeof(TestData))] +public partial class TestDataJsonSerializerContext : JsonSerializerContext +{ +} diff --git a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs index daa77578b..44545fcf8 100644 --- a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs @@ -62,7 +62,7 @@ public async Task Put_chunks() // square buffer: all chunks are the same size { - var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, }; + var meta = new ObjectMetadata { Name = "k1", Options = new MetaDataOptions { MaxChunkSize = 10 }, }; var buffer = Encoding.ASCII.GetBytes(buffer90); var stream = new MemoryStream(buffer); @@ -82,7 +82,7 @@ public async Task Put_chunks() // buffer with smaller last chunk { - var meta = new ObjectMetadata { Name = "k2", Options = new Options { MaxChunkSize = 10 }, }; + var meta = new ObjectMetadata { Name = "k2", Options = new MetaDataOptions { MaxChunkSize = 10 }, }; var buffer = Encoding.ASCII.GetBytes(buffer90 + "09-45"); var stream = new MemoryStream(buffer); @@ -124,7 +124,7 @@ public async Task Get_chunks() // square buffer: all chunks are the same size { - var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, }; + var meta = new ObjectMetadata { Name = "k1", Options = new MetaDataOptions { MaxChunkSize = 10 }, }; var buffer = Encoding.ASCII.GetBytes(buffer90); var stream = new MemoryStream(buffer); await store.PutAsync(meta, stream, cancellationToken: cancellationToken); @@ -140,7 +140,7 @@ public async Task Get_chunks() // buffer with smaller last chunk { - var meta = new ObjectMetadata { Name = "k2", Options = new Options { MaxChunkSize = 10 }, }; + var meta = new ObjectMetadata { Name = "k2", Options = new MetaDataOptions { MaxChunkSize = 10 }, }; var buffer = Encoding.ASCII.GetBytes(buffer90 + "09-45"); var stream = new MemoryStream(buffer); await store.PutAsync(meta, stream, cancellationToken: cancellationToken); @@ -158,7 +158,7 @@ public async Task Get_chunks() [Fact] public async Task Delete_object() { - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10_000)); var cancellationToken = cts.Token; await using var server = NatsServer.StartJS(); diff --git a/tests/NATS.Client.Services.Tests/ServicesTests.cs b/tests/NATS.Client.Services.Tests/ServicesTests.cs index ebb777fc2..9b6d6acad 100644 --- a/tests/NATS.Client.Services.Tests/ServicesTests.cs +++ b/tests/NATS.Client.Services.Tests/ServicesTests.cs @@ -1,5 +1,6 @@ using System.Text.Json.Nodes; using NATS.Client.Core.Tests; +using NATS.Client.Services.Internal; using NATS.Client.Services.Models; namespace NATS.Client.Services.Tests; @@ -22,9 +23,9 @@ public async Task Add_service_listeners_ping_info_and_stats() await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken); - var pingsTask = FindServices(server, "$SRV.PING", 1, cancellationToken); - var infosTask = FindServices(server, "$SRV.INFO", 1, cancellationToken); - var statsTask = FindServices(server, "$SRV.STATS", 1, cancellationToken); + var pingsTask = FindServices(nats, "$SRV.PING", 1, cancellationToken); + var infosTask = FindServices(nats, "$SRV.INFO", 1, cancellationToken); + var statsTask = FindServices(nats, "$SRV.STATS", 1, cancellationToken); var pings = await pingsTask; pings.ForEach(x => _output.WriteLine($"{x}")); @@ -48,7 +49,7 @@ public async Task Add_service_listeners_ping_info_and_stats() [Fact] public async Task Add_end_point() { - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10_0000)); var cancellationToken = cts.Token; await using var server = NatsServer.Start(); @@ -81,7 +82,7 @@ await s1.AddEndpointAsync( }, cancellationToken: cancellationToken); - var info = (await FindServices(server, "$SRV.INFO", 1, cancellationToken)).First(); + var info = (await FindServices(nats, "$SRV.INFO", 1, cancellationToken)).First(); Assert.Single(info.Endpoints); var endpointInfo = info.Endpoints.First(); Assert.Equal("e1", endpointInfo.Name); @@ -106,7 +107,7 @@ await s1.AddEndpointAsync( } } - var stat = (await FindServices(server, "$SRV.STATS", 1, cancellationToken)).First(); + var stat = (await FindServices(nats, "$SRV.STATS", 1, cancellationToken)).First(); Assert.Single(stat.Endpoints); var endpointStats = stat.Endpoints.First(); Assert.Equal("e1", endpointStats.Name); @@ -163,7 +164,7 @@ await grp2.AddEndpointAsync( // Check that the endpoints are registered correctly { - var info = (await FindServices(server, "$SRV.INFO.s1", 1, cancellationToken)).First(); + var info = (await FindServices(nats, "$SRV.INFO.s1", 1, cancellationToken)).First(); Assert.Equal(5, info.Endpoints.Count); var endpoints = info.Endpoints.ToList(); @@ -202,7 +203,7 @@ await s2.AddEndpointAsync( // Check default queue group and stats handler { - var info = (await FindServices(server, "$SRV.INFO.s2", 1, cancellationToken)).First(); + var info = (await FindServices(nats, "$SRV.INFO.s2", 1, cancellationToken)).First(); Assert.Single(info.Endpoints); var epi = info.Endpoints.First(); @@ -211,7 +212,7 @@ await s2.AddEndpointAsync( Assert.Equal("q2", epi.QueueGroup); Assert.Equal("ep-v1", epi.Metadata["ep-k1"]); - var stat = (await FindServices(server, "$SRV.STATS.s2", 1, cancellationToken)).First(); + var stat = (await FindServices(nats, "$SRV.STATS.s2", 1, cancellationToken)).First(); Assert.Equal("v1", stat.Metadata["k1"]); Assert.Equal("v2", stat.Metadata["k2"]); Assert.Single(stat.Endpoints); @@ -221,10 +222,13 @@ await s2.AddEndpointAsync( } } - private static async Task> FindServices(NatsServer server, string subject, int limit, CancellationToken ct) + private static async Task> FindServices(NatsConnection nats, string subject, int limit, CancellationToken ct) { - await using var nats = server.CreateClientConnection(); - var replyOpts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(2) }; + var replyOpts = new NatsSubOpts + { + Timeout = TimeSpan.FromSeconds(2), + Serializer = NatsSrvJsonSerializer.Default, + }; var responses = new List(); var count = 0; diff --git a/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj b/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj index b31f2182f..771b3f7b6 100644 --- a/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj +++ b/tests/NATS.Client.TestUtilities/NATS.Client.TestUtilities.csproj @@ -6,6 +6,7 @@ enable false $(NoWarn);CS8002 + true