From bfd20a471bcd6a76f03b34678e93a776c92b6dad Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Wed, 25 Oct 2023 11:05:06 +0100 Subject: [PATCH] Version bump and FetchNoWait docs --- .../Example.JetStream.PullConsumer/Program.cs | 44 ++++++++++++++++- src/NATS.Client.JetStream/NatsJSConsumer.cs | 48 ++++++++++++++++++- .../ConsumerFetchTest.cs | 2 +- version.txt | 2 +- 4 files changed, 91 insertions(+), 5 deletions(-) diff --git a/sandbox/Example.JetStream.PullConsumer/Program.cs b/sandbox/Example.JetStream.PullConsumer/Program.cs index cfcef3f33..27a398806 100644 --- a/sandbox/Example.JetStream.PullConsumer/Program.cs +++ b/sandbox/Example.JetStream.PullConsumer/Program.cs @@ -1,4 +1,3 @@ -using System.Buffers; using System.Diagnostics; using System.Text; using Microsoft.Extensions.Logging; @@ -98,6 +97,49 @@ void Report(int i, Stopwatch sw, string data) } } } + else if (cmd == "fetch-all-no-wait") + { + while (!cts.Token.IsCancellationRequested) + { + try + { + const int max = 10; + Console.WriteLine($"___\nFETCH-NO-WAIT {max}"); + await consumer.RefreshAsync(cts.Token); + + var fetchNoWaitOpts = new NatsJSFetchOpts { MaxMsgs = max }; + var fetchMsgCount = 0; + + await foreach (var msg in consumer.FetchAllNoWaitAsync>(fetchNoWaitOpts, cts.Token)) + { + fetchMsgCount++; + using (msg.Data) + { + var message = Encoding.ASCII.GetString(msg.Data.Span); + Console.WriteLine($"Received: {message}"); + } + + await msg.AckAsync(cancellationToken: cts.Token); + Report(++count, stopwatch, $"data: {msg.Data}"); + } + + if (fetchMsgCount < fetchNoWaitOpts.MaxMsgs) + { + Console.WriteLine("No more messages. Pause for more..."); + await Task.Delay(TimeSpan.FromSeconds(5)); + } + } + catch (NatsJSProtocolException e) + { + Console.WriteLine(e.Message); + } + catch (NatsJSException e) + { + Console.WriteLine(e.Message); + await Task.Delay(1000); + } + } + } else if (cmd == "fetch-all") { while (!cts.Token.IsCancellationRequested) diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 7dc1df3d0..b3031ea96 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -197,7 +197,51 @@ await sub.CallMsgNextAsync( } } - public async IAsyncEnumerable> FetchNoWait( + /// + /// Consume a set number of messages from the stream using this consumer. + /// Returns immediately if no messages are available. + /// + /// Fetch options. (default: MaxMsgs 1,000 and timeout is ignored) + /// A used to cancel the call. + /// Message type to deserialize. + /// Async enumerable of messages which can be used in a await foreach loop. + /// Consumer is deleted, it's push based or request sent to server is invalid. + /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. + /// + /// + /// This method will return immediately if no messages are available. + /// + /// + /// Using this method is discouraged because it might create an unnecessary load on your cluster. + /// Use Consume or Fetch instead. + /// + /// + /// + /// + /// However, there are scenarios where this method is useful. For example if your application is + /// processing messages in batches infrequently (for example every 5 minutes) you might want to + /// consider FetchNoWait. You must make sure to count your messages and stop fetching + /// if you received all of them in one call, meaning when count < MaxMsgs. + /// + /// + /// const int max = 10; + /// var count = 0; + /// + /// await foreach (var msg in consumer.FetchAllNoWaitAsync<int>(new NatsJSFetchOpts { MaxMsgs = max })) + /// { + /// count++; + /// Process(msg); + /// await msg.AckAsync(); + /// } + /// + /// if (count < max) + /// { + /// // No more messages. Pause for more. + /// await Task.Delay(TimeSpan.FromMinutes(5)); + /// } + /// + /// + public async IAsyncEnumerable> FetchAllNoWaitAsync( NatsJSFetchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { @@ -254,7 +298,7 @@ await sub.CallMsgNextAsync( // When no wait is set we don't need to send the idle heartbeat and expiration // If no message is available the server will respond with a 404 immediately // If messages are available the server will send a 408 direct after the last message - ? new ConsumerGetnextRequest {Batch = max.MaxMsgs, MaxBytes = max.MaxBytes, NoWait = opts.NoWait} + ? new ConsumerGetnextRequest { Batch = max.MaxMsgs, MaxBytes = max.MaxBytes, NoWait = opts.NoWait } : new ConsumerGetnextRequest { Batch = max.MaxMsgs, diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs index ceeb15245..f369dbc92 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs @@ -57,7 +57,7 @@ public async Task FetchNoWait_test() var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; - await foreach (var msg in consumer.FetchNoWait(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token)) + await foreach (var msg in consumer.FetchAllNoWaitAsync(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token)) { await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); Assert.Equal(count, msg.Data!.Test); diff --git a/version.txt b/version.txt index fe5c16b9b..e5e3d83eb 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -2.0.0-alpha.5 +2.0.0-alpha.6