From 20cc5013a9160094db5563cbbfe2fb1c74adcceb Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Fri, 20 Oct 2023 00:09:03 +0100 Subject: [PATCH 1/2] Consume clean exit fixes * Enlarge subscription channel to avoid blocking other socket reads on the same connection. * Pass cancellation token to Consume() so it can be gracefully stopped. * 'Consume' can also be gracefully stopped when Stop() is called. * There is no graceful stop for 'Fetch' since we have no option but wait for the single pull request to complete. * Disposing the 'Consume' or 'Fetch' would exit loops a.s.a.p. --- .../Example.JetStream.PullConsumer/Program.cs | 108 +++++++++++++---- .../Example.JetStream.PullConsumer/RawData.cs | 12 -- .../RawDataSerializer.cs | 28 ----- src/NATS.Client.JetStream/INatsJSConsume.cs | 28 +++-- src/NATS.Client.JetStream/INatsJSFetch.cs | 13 +- .../Internal/NatsJSConsume.cs | 65 +++++++++- .../Internal/NatsJSFetch.cs | 16 ++- src/NATS.Client.JetStream/NatsJSConsumer.cs | 44 +++---- src/NATS.Client.JetStream/NatsJSLogEvents.cs | 1 + tests/NATS.Client.Core.Tests/TlsFirstTest.cs | 1 - .../ConsumerConsumeTest.cs | 112 +++++++++++++++++- .../ConsumerFetchTest.cs | 57 +++++++++ 12 files changed, 371 insertions(+), 114 deletions(-) delete mode 100644 sandbox/Example.JetStream.PullConsumer/RawData.cs delete mode 100644 sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs diff --git a/sandbox/Example.JetStream.PullConsumer/Program.cs b/sandbox/Example.JetStream.PullConsumer/Program.cs index 5ca11f716..ad3b0b2fc 100644 --- a/sandbox/Example.JetStream.PullConsumer/Program.cs +++ b/sandbox/Example.JetStream.PullConsumer/Program.cs @@ -1,5 +1,6 @@ +using System.Buffers; using System.Diagnostics; -using Example.JetStream.PullConsumer; +using System.Text; using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.JetStream; @@ -20,12 +21,12 @@ var consumer = await js.CreateConsumerAsync("s1", "c1"); -var idle = TimeSpan.FromSeconds(15); -var expires = TimeSpan.FromSeconds(30); +var idle = TimeSpan.FromSeconds(5); +var expires = TimeSpan.FromSeconds(10); // int? maxMsgs = null; // int? maxBytes = 128; -int? maxMsgs = 1000; +int? maxMsgs = 10; int? maxBytes = null; void Report(int i, Stopwatch sw, string data) @@ -41,7 +42,6 @@ void Report(int i, Stopwatch sw, string data) MaxBytes = maxBytes, Expires = expires, IdleHeartbeat = idle, - Serializer = new RawDataSerializer(), }; var fetchOpts = new NatsJSFetchOpts @@ -50,22 +50,23 @@ void Report(int i, Stopwatch sw, string data) MaxBytes = maxBytes, Expires = expires, IdleHeartbeat = idle, - Serializer = new RawDataSerializer(), }; var nextOpts = new NatsJSNextOpts { Expires = expires, IdleHeartbeat = idle, - Serializer = new RawDataSerializer(), }; var stopwatch = Stopwatch.StartNew(); var count = 0; +var cmd = args.Length > 0 ? args[0] : "consume"; +var cmdOpt = args.Length > 1 ? args[1] : "none"; + try { - if (args.Length > 0 && args[0] == "fetch") + if (cmd == "fetch") { while (!cts.Token.IsCancellationRequested) { @@ -73,9 +74,18 @@ void Report(int i, Stopwatch sw, string data) { Console.WriteLine($"___\nFETCH {maxMsgs}"); await consumer.RefreshAsync(cts.Token); - await using var sub = await consumer.FetchAsync(fetchOpts, cts.Token); + await using var sub = await consumer.FetchAsync>(fetchOpts, cts.Token); await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token)) { + if (msg.Data is { } memoryOwner) + { + using (memoryOwner) + { + var message = Encoding.ASCII.GetString(memoryOwner.Memory.Span); + Console.WriteLine($"Received: {message}"); + } + } + await msg.AckAsync(cancellationToken: cts.Token); Report(++count, stopwatch, $"data: {msg.Data}"); } @@ -91,7 +101,7 @@ void Report(int i, Stopwatch sw, string data) } } } - else if (args.Length > 0 && args[0] == "fetch-all") + else if (cmd == "fetch-all") { while (!cts.Token.IsCancellationRequested) { @@ -99,8 +109,17 @@ void Report(int i, Stopwatch sw, string data) { Console.WriteLine($"___\nFETCH {maxMsgs}"); await consumer.RefreshAsync(cts.Token); - await foreach (var msg in consumer.FetchAllAsync(fetchOpts, cts.Token)) + await foreach (var msg in consumer.FetchAllAsync>(fetchOpts, cts.Token)) { + if (msg.Data is { } memoryOwner) + { + using (memoryOwner) + { + var message = Encoding.ASCII.GetString(memoryOwner.Memory.Span); + Console.WriteLine($"Received: {message}"); + } + } + await msg.AckAsync(cancellationToken: cts.Token); Report(++count, stopwatch, $"data: {msg.Data}"); } @@ -116,16 +135,25 @@ void Report(int i, Stopwatch sw, string data) } } } - else if (args.Length > 0 && args[0] == "next") + else if (cmd == "next") { while (!cts.Token.IsCancellationRequested) { try { Console.WriteLine("___\nNEXT"); - var next = await consumer.NextAsync(nextOpts, cts.Token); + var next = await consumer.NextAsync>(nextOpts, cts.Token); if (next is { } msg) { + if (msg.Data is { } memoryOwner) + { + using (memoryOwner) + { + var message = Encoding.ASCII.GetString(memoryOwner.Memory.Span); + Console.WriteLine($"Received: {message}"); + } + } + await msg.AckAsync(cancellationToken: cts.Token); Report(++count, stopwatch, $"data: {msg.Data}"); } @@ -141,21 +169,52 @@ void Report(int i, Stopwatch sw, string data) } } } - else if (args.Length > 0 && args[0] == "consume") + else if (cmd == "consume") { while (!cts.Token.IsCancellationRequested) { try { Console.WriteLine("___\nCONSUME"); - await using var sub = await consumer.ConsumeAsync( - consumeOpts, - cts.Token); + await using var sub = await consumer.ConsumeAsync>(consumeOpts); - await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token)) + cts.Token.Register(() => + { + sub.DisposeAsync().GetAwaiter().GetResult(); + }); + + var stopped = false; + await foreach (var msg in sub.Msgs.ReadAllAsync()) { + Console.WriteLine($"CANCEL:{cts.Token.IsCancellationRequested}"); + if (msg.Data is { } memoryOwner) + { + using (memoryOwner) + { + var message = Encoding.ASCII.GetString(memoryOwner.Memory.Span); + Console.WriteLine($"Received: {message}"); + if (message == "stop") + { + Console.WriteLine("Stopping consumer..."); + sub.Stop(); + stopped = true; + } + } + } + await msg.AckAsync(cancellationToken: cts.Token); Report(++count, stopwatch, $"data: {msg.Data}"); + + if (cmdOpt == "with-pause") + { + await Task.Delay(1_000); + } + } + + if (stopped) + { + Console.WriteLine("Stopped consumer."); + break; } } catch (NatsJSProtocolException e) @@ -169,15 +228,24 @@ void Report(int i, Stopwatch sw, string data) } } } - else if (args.Length > 0 && args[0] == "consume-all") + else if (cmd == "consume-all") { while (!cts.Token.IsCancellationRequested) { try { Console.WriteLine("___\nCONSUME-ALL"); - await foreach (var msg in consumer.ConsumeAllAsync(consumeOpts, cts.Token)) + await foreach (var msg in consumer.ConsumeAllAsync>(consumeOpts, cts.Token)) { + if (msg.Data is { } memoryOwner) + { + using (memoryOwner) + { + var message = Encoding.ASCII.GetString(memoryOwner.Memory.Span); + Console.WriteLine($"Received: {message}"); + } + } + await msg.AckAsync(cancellationToken: cts.Token); Report(++count, stopwatch, $"data: {msg.Data}"); } diff --git a/sandbox/Example.JetStream.PullConsumer/RawData.cs b/sandbox/Example.JetStream.PullConsumer/RawData.cs deleted file mode 100644 index 89e12b76e..000000000 --- a/sandbox/Example.JetStream.PullConsumer/RawData.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System.Text; - -namespace Example.JetStream.PullConsumer; - -public class RawData -{ - public RawData(byte[] buffer) => Buffer = buffer; - - public byte[] Buffer { get; } - - public override string ToString() => Encoding.ASCII.GetString(Buffer); -} diff --git a/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs b/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs deleted file mode 100644 index efdfb15fa..000000000 --- a/sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs +++ /dev/null @@ -1,28 +0,0 @@ -using System.Buffers; -using NATS.Client.Core; - -namespace Example.JetStream.PullConsumer; - -public class RawDataSerializer : INatsSerializer -{ - public INatsSerializer? Next => default; - - public int Serialize(ICountableBufferWriter bufferWriter, T? value) - { - if (value is RawData data) - { - bufferWriter.Write(data.Buffer); - return data.Buffer.Length; - } - - throw new Exception($"Can only work with '{typeof(RawData)}'"); - } - - public T? Deserialize(in ReadOnlySequence buffer) - { - if (typeof(T) != typeof(RawData)) - throw new Exception($"Can only work with '{typeof(RawData)}'"); - - return (T)(object)new RawData(buffer.ToArray()); - } -} diff --git a/src/NATS.Client.JetStream/INatsJSConsume.cs b/src/NATS.Client.JetStream/INatsJSConsume.cs index bae608922..92302c7ef 100644 --- a/src/NATS.Client.JetStream/INatsJSConsume.cs +++ b/src/NATS.Client.JetStream/INatsJSConsume.cs @@ -2,18 +2,28 @@ namespace NATS.Client.JetStream; -/// -/// Interface to manage a consume() operation on a consumer. -/// -public interface INatsJSConsume : IAsyncDisposable -{ - void Stop(); -} - /// /// Interface to extract messages from a consume() operation on a consumer. /// -public interface INatsJSConsume : INatsJSConsume +public interface INatsJSConsume : IAsyncDisposable { + /// + /// Messages received from the consumer. + /// ChannelReader> Msgs { get; } + + /// + /// Stop the consumer gracefully. + /// + /// + /// + /// This will wait for any inflight messages to be processed before stopping. + /// + /// + /// Disposing would stop consuming immediately. This might leave messages behind + /// without acknowledgement. Which is fine, messages will be scheduled for redelivery, + /// however, it might not be the desired behavior. + /// + /// + void Stop(); } diff --git a/src/NATS.Client.JetStream/INatsJSFetch.cs b/src/NATS.Client.JetStream/INatsJSFetch.cs index e835678e2..8a551d541 100644 --- a/src/NATS.Client.JetStream/INatsJSFetch.cs +++ b/src/NATS.Client.JetStream/INatsJSFetch.cs @@ -2,18 +2,13 @@ namespace NATS.Client.JetStream; -/// -/// Interface to manage a fetch() operation on a consumer. -/// -public interface INatsJSFetch : IAsyncDisposable -{ - void Stop(); -} - /// /// Interface to extract messages from a fetch() operation on a consumer. /// -public interface INatsJSFetch : INatsJSFetch +public interface INatsJSFetch : IAsyncDisposable { + /// + /// User messages received from the consumer. + /// ChannelReader> Msgs { get; } } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index 081e50eb8..eee5a9dd1 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -19,11 +19,13 @@ internal class NatsJSConsume : NatsSubBase, INatsJSConsume { private readonly ILogger _logger; private readonly bool _debug; + private readonly CancellationTokenSource _cts; private readonly Channel> _userMsgs; private readonly Channel _pullRequests; private readonly NatsJSContext _context; private readonly string _stream; private readonly string _consumer; + private readonly CancellationToken _cancellationToken; private readonly INatsSerializer _serializer; private readonly Timer _timer; private readonly Task _pullTask; @@ -39,6 +41,7 @@ internal class NatsJSConsume : NatsSubBase, INatsJSConsume private readonly object _pendingGate = new(); private long _pendingMsgs; private long _pendingBytes; + private int _disposed; public NatsJSConsume( long maxMsgs, @@ -52,9 +55,12 @@ public NatsJSConsume( string consumer, string subject, string? queueGroup, - NatsSubOpts? opts) + NatsSubOpts? opts, + CancellationToken cancellationToken) : base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts) { + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _cancellationToken = _cts.Token; _logger = Connection.Opts.LoggerFactory.CreateLogger>(); _debug = _logger.IsEnabled(LogLevel.Debug); _context = context; @@ -91,6 +97,15 @@ public NatsJSConsume( static state => { var self = (NatsJSConsume)state!; + + if (self._cancellationToken.IsCancellationRequested) + { + // We complete stop here since heartbeat timeout would kick in + // when there are no pull requests or messages left in-flight. + self.CompleteStop(); + return; + } + self.Pull("heartbeat-timeout", self._maxMsgs, self._maxBytes); self.ResetPending(); if (self._debug) @@ -105,10 +120,17 @@ public NatsJSConsume( Timeout.Infinite, Timeout.Infinite); - _userMsgs = Channel.CreateBounded>(NatsSubUtils.GetChannelOpts(opts?.ChannelOpts)); + // Keep user channel small to avoid blocking the user code + // when disposed otherwise channel reader will continue delivering messages + // if there are messages queued up already. This channel is used to pass messages + // to the user from the subscription channel (which should be set to a + // sufficiently large value to avoid blocking socket reads in the + // NATS connection). + _userMsgs = Channel.CreateBounded>(1); Msgs = _userMsgs.Reader; - _pullRequests = Channel.CreateBounded(NatsSubUtils.GetChannelOpts(opts?.ChannelOpts)); + // Capacity as 1 is enough here since it's used for signaling only. + _pullRequests = Channel.CreateBounded(1); _pullTask = Task.Run(PullLoop); ResetPending(); @@ -116,10 +138,13 @@ public NatsJSConsume( public ChannelReader> Msgs { get; } - public void Stop() => EndSubscription(NatsSubEndReason.None); + public void Stop() => _cts.Cancel(); public ValueTask CallMsgNextAsync(string origin, ConsumerGetnextRequest request, CancellationToken cancellationToken = default) { + if (_cancellationToken.IsCancellationRequested) + return default; + if (_debug) { _logger.LogDebug("Sending pull request for {Origin} {Msgs}, {Bytes}", origin, request.Batch, request.MaxBytes); @@ -138,6 +163,7 @@ public ValueTask CallMsgNextAsync(string origin, ConsumerGetnextRequest request, public override async ValueTask DisposeAsync() { + Interlocked.Exchange(ref _disposed, 1); await base.DisposeAsync().ConfigureAwait(false); await _pullTask.ConfigureAwait(false); await _timer.DisposeAsync().ConfigureAwait(false); @@ -158,6 +184,9 @@ internal override IEnumerable GetReconnectCommands(int sid) Expires = _expires, }; + if (_cancellationToken.IsCancellationRequested) + yield break; + yield return PublishCommand.Create( pool: Connection.ObjectPool, subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", @@ -311,7 +340,15 @@ protected override async ValueTask ReceiveInternalAsync( } } - await _userMsgs.Writer.WriteAsync(msg).ConfigureAwait(false); + // Stop feeding the user if we are disposed. + // We need to exit as soon as possible. + if (Volatile.Read(ref _disposed) == 0) + { + // We can't pass cancellation token here because we need to hand + // the message to the user to be processed. Writer will be completed + // when the user calls Stop() or when the subscription is closed. + await _userMsgs.Writer.WriteAsync(msg).ConfigureAwait(false); + } } CheckPending(); @@ -355,6 +392,24 @@ private void CheckPending() } } + private void CompleteStop() + { + if (_debug) + { + _logger.LogDebug(NatsJSLogEvents.Stopping, "No more pull requests or messages in-flight, stopping"); + } + + // Schedule on the thread pool to avoid potential deadlocks. + ThreadPool.UnsafeQueueUserWorkItem( + state => + { + var self = (NatsJSConsume)state!; + self._userMsgs.Writer.TryComplete(); + self.EndSubscription(NatsSubEndReason.None); + }, + this); + } + private void Pull(string origin, long batch, long maxBytes) => _pullRequests.Writer.TryWrite(new PullRequest { Request = new ConsumerGetnextRequest diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index 08fcf32e9..2d69b663d 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -28,6 +28,7 @@ internal class NatsJSFetch : NatsSubBase, INatsJSFetch private long _pendingMsgs; private long _pendingBytes; + private int _disposed; public NatsJSFetch( long maxMsgs, @@ -57,7 +58,10 @@ public NatsJSFetch( _pendingMsgs = _maxMsgs; _pendingBytes = _maxBytes; - _userMsgs = Channel.CreateBounded>(NatsSubUtils.GetChannelOpts(opts?.ChannelOpts)); + // Keep user channel small to avoid blocking the user code when disposed, + // otherwise channel reader will continue delivering messages even after + // this 'fetch' object being disposed. + _userMsgs = Channel.CreateBounded>(1); Msgs = _userMsgs.Reader; if (_debug) @@ -112,8 +116,6 @@ public NatsJSFetch( public ChannelReader> Msgs { get; } - public void Stop() => EndSubscription(NatsSubEndReason.None); - public ValueTask CallMsgNextAsync(ConsumerGetnextRequest request, CancellationToken cancellationToken = default) => Connection.PubModelAsync( subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", @@ -127,6 +129,7 @@ public ValueTask CallMsgNextAsync(ConsumerGetnextRequest request, CancellationTo public override async ValueTask DisposeAsync() { + Interlocked.Exchange(ref _disposed, 1); await base.DisposeAsync().ConfigureAwait(false); await _hbTimer.DisposeAsync().ConfigureAwait(false); await _expiresTimer.DisposeAsync().ConfigureAwait(false); @@ -227,7 +230,12 @@ protected override async ValueTask ReceiveInternalAsync( _pendingMsgs--; _pendingBytes -= msg.Size; - await _userMsgs.Writer.WriteAsync(msg).ConfigureAwait(false); + // Stop feeding the user if we are disposed. + // We need to exit as soon as possible. + if (Volatile.Read(ref _disposed) == 0) + { + await _userMsgs.Writer.WriteAsync(msg).ConfigureAwait(false); + } } if (_maxBytes > 0 && _pendingBytes <= 0) diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 0b50f4359..de55fcd6d 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -85,18 +85,7 @@ public async ValueTask> ConsumeAsync(NatsJSConsumeOpts? opt var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes); var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); - var requestOpts = new NatsSubOpts - { - Serializer = opts.Serializer, - ChannelOpts = new NatsSubChannelOpts - { - // Keep capacity at 1 to make sure message acknowledgements are sent - // right after the message is processed and messages aren't queued up - // which might cause timeouts for acknowledgments. - Capacity = 1, - FullMode = BoundedChannelFullMode.Wait, - }, - }; + var requestOpts = BuildRequestOpts(opts.Serializer, opts.MaxMsgs); var sub = new NatsJSConsume( stream: _stream, @@ -110,7 +99,8 @@ public async ValueTask> ConsumeAsync(NatsJSConsumeOpts? opt thresholdMsgs: max.ThresholdMsgs, thresholdBytes: max.ThresholdBytes, expires: timeouts.Expires, - idle: timeouts.IdleHeartbeat); + idle: timeouts.IdleHeartbeat, + cancellationToken: cancellationToken); await _context.Connection.SubAsync(sub: sub, cancellationToken); @@ -228,18 +218,7 @@ public async ValueTask> FetchAsync( var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes); var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); - var requestOpts = new NatsSubOpts - { - Serializer = opts.Serializer, - ChannelOpts = new NatsSubChannelOpts - { - // Keep capacity at 1 to make sure message acknowledgements are sent - // right after the message is processed and messages aren't queued up - // which might cause timeouts for acknowledgments. - Capacity = 1, - FullMode = BoundedChannelFullMode.Wait, - }, - }; + var requestOpts = BuildRequestOpts(opts.Serializer, opts.MaxMsgs); var sub = new NatsJSFetch( stream: _stream, @@ -282,6 +261,21 @@ public async ValueTask RefreshAsync(CancellationToken cancellationToken = defaul request: null, cancellationToken).ConfigureAwait(false); + private static NatsSubOpts BuildRequestOpts(INatsSerializer? serializer, int? maxMsgs) => + new() + { + Serializer = serializer, + ChannelOpts = new NatsSubChannelOpts + { + // Keep capacity large enough not to block the socket reads. + // This might delay message acknowledgements on slow consumers + // but it's crucial to keep the reads flowing on the main + // NATS TCP connection. + Capacity = maxMsgs > 0 ? maxMsgs * 2 : 1_000, + FullMode = BoundedChannelFullMode.Wait, + }, + }; + private void ThrowIfDeleted() { if (_deleted) diff --git a/src/NATS.Client.JetStream/NatsJSLogEvents.cs b/src/NATS.Client.JetStream/NatsJSLogEvents.cs index af409a800..4a8660e72 100644 --- a/src/NATS.Client.JetStream/NatsJSLogEvents.cs +++ b/src/NATS.Client.JetStream/NatsJSLogEvents.cs @@ -17,4 +17,5 @@ public static class NatsJSLogEvents public static readonly EventId DeleteOldDeliverySubject = new(2011, nameof(DeleteOldDeliverySubject)); public static readonly EventId NewDeliverySubject = new(2012, nameof(NewDeliverySubject)); public static readonly EventId NewConsumerCreated = new(2013, nameof(NewConsumerCreated)); + public static readonly EventId Stopping = new(2014, nameof(Stopping)); } diff --git a/tests/NATS.Client.Core.Tests/TlsFirstTest.cs b/tests/NATS.Client.Core.Tests/TlsFirstTest.cs index 35f48921b..bbc624950 100644 --- a/tests/NATS.Client.Core.Tests/TlsFirstTest.cs +++ b/tests/NATS.Client.Core.Tests/TlsFirstTest.cs @@ -62,7 +62,6 @@ public async Task Implicit_TLS_fails_when_disabled() Assert.Matches(@"can not start to connect nats server", exception.Message); _output.WriteLine($"Implicit TLS connection rejected"); - } // Normal TLS connection should work diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs index 97fcf9106..6136ed588 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -1,6 +1,7 @@ using System.Text.RegularExpressions; using Microsoft.Extensions.Logging; using NATS.Client.Core.Tests; +using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream.Tests; @@ -154,7 +155,7 @@ await Retry.Until( [Fact] public async Task Consume_reconnect_test() { - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3000)); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); await using var server = NatsServer.StartJS(); var (nats, proxy) = server.CreateProxiedClientConnection(); @@ -226,6 +227,115 @@ await Retry.Until( await nats.DisposeAsync(); } + [Fact] + public async Task Consume_dispose_test() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + await using var server = NatsServer.StartJS(); + + await using var nats = server.CreateClientConnection(); + + var js = new NatsJSContext(nats); + var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + + var consumerOpts = new NatsJSConsumeOpts + { + MaxMsgs = 10, + IdleHeartbeat = TimeSpan.FromSeconds(5), + Expires = TimeSpan.FromSeconds(10), + }; + + for (var i = 0; i < 100; i++) + { + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + ack.EnsureSuccess(); + } + + var cc = await consumer.ConsumeAsync(consumerOpts, cancellationToken: cts.Token); + + var signal = new WaitSignal(); + var reader = Task.Run(async () => + { + await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token)) + { + await msg.AckAsync(cancellationToken: cts.Token); + signal.Pulse(); + + // Introduce delay to make sure not all messages will be acked. + await Task.Delay(1_000, cts.Token); + } + }); + + await signal; + await cc.DisposeAsync(); + + await reader; + + var infos = new List(); + await foreach (var natsJSConsumer in stream.ListConsumersAsync(cts.Token)) + { + infos.Add(natsJSConsumer.Info); + } + + Assert.Single(infos); + + Assert.True(infos[0].NumAckPending > 0); + } + + [Fact] + public async Task Consume_stop_test() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + await using var server = NatsServer.StartJS(); + + await using var nats = server.CreateClientConnection(); + + var js = new NatsJSContext(nats); + var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + + var consumerOpts = new NatsJSConsumeOpts + { + MaxMsgs = 10, + IdleHeartbeat = TimeSpan.FromSeconds(2), + Expires = TimeSpan.FromSeconds(4), + }; + + for (var i = 0; i < 100; i++) + { + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + ack.EnsureSuccess(); + } + + var cc = await consumer.ConsumeAsync(consumerOpts, cancellationToken: cts.Token); + + var signal = new WaitSignal(); + var reader = Task.Run(async () => + { + await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token)) + { + await msg.AckAsync(cancellationToken: cts.Token); + signal.Pulse(); + } + }); + + await signal; + cc.Stop(); + + await reader; + + var infos = new List(); + await foreach (var natsJSConsumer in stream.ListConsumersAsync(cts.Token)) + { + infos.Add(natsJSConsumer.Info); + } + + Assert.Single(infos); + + Assert.True(infos[0].NumAckPending == 0); + } + private record TestData { public int Test { get; init; } diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs index 05d8c19cc..bc0223c63 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs @@ -1,4 +1,5 @@ using NATS.Client.Core.Tests; +using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream.Tests; @@ -38,6 +39,62 @@ public async Task Fetch_test() Assert.Equal(10, count); } + [Fact] + public async Task Fetch_dispose_test() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + await using var server = NatsServer.StartJS(); + + await using var nats = server.CreateClientConnection(); + + var js = new NatsJSContext(nats); + var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + + var fetchOpts = new NatsJSFetchOpts + { + MaxMsgs = 10, + IdleHeartbeat = TimeSpan.FromSeconds(5), + Expires = TimeSpan.FromSeconds(10), + }; + + for (var i = 0; i < 100; i++) + { + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + ack.EnsureSuccess(); + } + + var fc = await consumer.FetchAsync(fetchOpts, cancellationToken: cts.Token); + + var signal = new WaitSignal(); + var reader = Task.Run(async () => + { + await foreach (var msg in fc.Msgs.ReadAllAsync(cts.Token)) + { + await msg.AckAsync(cancellationToken: cts.Token); + signal.Pulse(); + + // Introduce delay to make sure not all messages will be acked. + await Task.Delay(1_000, cts.Token); + } + }); + + await signal; + await fc.DisposeAsync(); + + await reader; + + var infos = new List(); + await foreach (var natsJSConsumer in stream.ListConsumersAsync(cts.Token)) + { + infos.Add(natsJSConsumer.Info); + } + + Assert.Single(infos); + + Assert.True(infos[0].NumAckPending > 0); + } + private record TestData { public int Test { get; init; } From 83dcf70c26ee611fecdae3d06db349bf3a273ae3 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Fri, 20 Oct 2023 06:14:07 +0100 Subject: [PATCH 2/2] Use new memory owner for example --- .../Example.JetStream.PullConsumer/Program.cs | 64 +++++++------------ 1 file changed, 24 insertions(+), 40 deletions(-) diff --git a/sandbox/Example.JetStream.PullConsumer/Program.cs b/sandbox/Example.JetStream.PullConsumer/Program.cs index ad3b0b2fc..cfcef3f33 100644 --- a/sandbox/Example.JetStream.PullConsumer/Program.cs +++ b/sandbox/Example.JetStream.PullConsumer/Program.cs @@ -74,16 +74,13 @@ void Report(int i, Stopwatch sw, string data) { Console.WriteLine($"___\nFETCH {maxMsgs}"); await consumer.RefreshAsync(cts.Token); - await using var sub = await consumer.FetchAsync>(fetchOpts, cts.Token); + await using var sub = await consumer.FetchAsync>(fetchOpts, cts.Token); await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token)) { - if (msg.Data is { } memoryOwner) + using (msg.Data) { - using (memoryOwner) - { - var message = Encoding.ASCII.GetString(memoryOwner.Memory.Span); - Console.WriteLine($"Received: {message}"); - } + var message = Encoding.ASCII.GetString(msg.Data.Span); + Console.WriteLine($"Received: {message}"); } await msg.AckAsync(cancellationToken: cts.Token); @@ -109,15 +106,12 @@ void Report(int i, Stopwatch sw, string data) { Console.WriteLine($"___\nFETCH {maxMsgs}"); await consumer.RefreshAsync(cts.Token); - await foreach (var msg in consumer.FetchAllAsync>(fetchOpts, cts.Token)) + await foreach (var msg in consumer.FetchAllAsync>(fetchOpts, cts.Token)) { - if (msg.Data is { } memoryOwner) + using (msg.Data) { - using (memoryOwner) - { - var message = Encoding.ASCII.GetString(memoryOwner.Memory.Span); - Console.WriteLine($"Received: {message}"); - } + var message = Encoding.ASCII.GetString(msg.Data.Span); + Console.WriteLine($"Received: {message}"); } await msg.AckAsync(cancellationToken: cts.Token); @@ -142,16 +136,13 @@ void Report(int i, Stopwatch sw, string data) try { Console.WriteLine("___\nNEXT"); - var next = await consumer.NextAsync>(nextOpts, cts.Token); + var next = await consumer.NextAsync>(nextOpts, cts.Token); if (next is { } msg) { - if (msg.Data is { } memoryOwner) + using (msg.Data) { - using (memoryOwner) - { - var message = Encoding.ASCII.GetString(memoryOwner.Memory.Span); - Console.WriteLine($"Received: {message}"); - } + var message = Encoding.ASCII.GetString(msg.Data.Span); + Console.WriteLine($"Received: {message}"); } await msg.AckAsync(cancellationToken: cts.Token); @@ -176,7 +167,7 @@ void Report(int i, Stopwatch sw, string data) try { Console.WriteLine("___\nCONSUME"); - await using var sub = await consumer.ConsumeAsync>(consumeOpts); + await using var sub = await consumer.ConsumeAsync>(consumeOpts); cts.Token.Register(() => { @@ -186,19 +177,15 @@ void Report(int i, Stopwatch sw, string data) var stopped = false; await foreach (var msg in sub.Msgs.ReadAllAsync()) { - Console.WriteLine($"CANCEL:{cts.Token.IsCancellationRequested}"); - if (msg.Data is { } memoryOwner) + using (msg.Data) { - using (memoryOwner) + var message = Encoding.ASCII.GetString(msg.Data.Span); + Console.WriteLine($"Received: {message}"); + if (message == "stop") { - var message = Encoding.ASCII.GetString(memoryOwner.Memory.Span); - Console.WriteLine($"Received: {message}"); - if (message == "stop") - { - Console.WriteLine("Stopping consumer..."); - sub.Stop(); - stopped = true; - } + Console.WriteLine("Stopping consumer..."); + sub.Stop(); + stopped = true; } } @@ -235,15 +222,12 @@ void Report(int i, Stopwatch sw, string data) try { Console.WriteLine("___\nCONSUME-ALL"); - await foreach (var msg in consumer.ConsumeAllAsync>(consumeOpts, cts.Token)) + await foreach (var msg in consumer.ConsumeAllAsync>(consumeOpts, cts.Token)) { - if (msg.Data is { } memoryOwner) + using (msg.Data) { - using (memoryOwner) - { - var message = Encoding.ASCII.GetString(memoryOwner.Memory.Span); - Console.WriteLine($"Received: {message}"); - } + var message = Encoding.ASCII.GetString(msg.Data.Span); + Console.WriteLine($"Received: {message}"); } await msg.AckAsync(cancellationToken: cts.Token);