Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release prep 2.0.0-alpha.6 #168

Merged
merged 1 commit into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion sandbox/Example.JetStream.PullConsumer/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Buffers;
using System.Diagnostics;
using System.Text;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -98,6 +97,49 @@ void Report(int i, Stopwatch sw, string data)
}
}
}
else if (cmd == "fetch-all-no-wait")
{
while (!cts.Token.IsCancellationRequested)
{
try
{
const int max = 10;
Console.WriteLine($"___\nFETCH-NO-WAIT {max}");
await consumer.RefreshAsync(cts.Token);

var fetchNoWaitOpts = new NatsJSFetchOpts { MaxMsgs = max };
var fetchMsgCount = 0;

await foreach (var msg in consumer.FetchAllNoWaitAsync<NatsMemoryOwner<byte>>(fetchNoWaitOpts, cts.Token))
{
fetchMsgCount++;
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
Console.WriteLine($"Received: {message}");
}

await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}

if (fetchMsgCount < fetchNoWaitOpts.MaxMsgs)
{
Console.WriteLine("No more messages. Pause for more...");
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
catch (NatsJSProtocolException e)
{
Console.WriteLine(e.Message);
}
catch (NatsJSException e)
{
Console.WriteLine(e.Message);
await Task.Delay(1000);
}
}
}
else if (cmd == "fetch-all")
{
while (!cts.Token.IsCancellationRequested)
Expand Down
48 changes: 46 additions & 2 deletions src/NATS.Client.JetStream/NatsJSConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,51 @@ await sub.CallMsgNextAsync(
}
}

public async IAsyncEnumerable<NatsJSMsg<T?>> FetchNoWait<T>(
/// <summary>
/// Consume a set number of messages from the stream using this consumer.
/// Returns immediately if no messages are available.
/// </summary>
/// <param name="opts">Fetch options. (default: <c>MaxMsgs</c> 1,000 and timeout is ignored)</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the call.</param>
/// <typeparam name="T">Message type to deserialize.</typeparam>
/// <returns>Async enumerable of messages which can be used in a <c>await foreach</c> loop.</returns>
/// <exception cref="NatsJSProtocolException">Consumer is deleted, it's push based or request sent to server is invalid.</exception>
/// <exception cref="NatsJSException">There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier.</exception>
/// <remarks>
/// <para>
/// This method will return immediately if no messages are available.
/// </para>
/// <para>
/// Using this method is discouraged because it might create an unnecessary load on your cluster.
/// Use <c>Consume</c> or <c>Fetch</c> instead.
/// </para>
/// </remarks>
/// <example>
/// <para>
/// However, there are scenarios where this method is useful. For example if your application is
/// processing messages in batches infrequently (for example every 5 minutes) you might want to
/// consider <c>FetchNoWait</c>. You must make sure to count your messages and stop fetching
/// if you received all of them in one call, meaning when <c>count &lt; MaxMsgs</c>.
/// </para>
/// <code>
/// const int max = 10;
/// var count = 0;
///
/// await foreach (var msg in consumer.FetchAllNoWaitAsync&lt;int&gt;(new NatsJSFetchOpts { MaxMsgs = max }))
/// {
/// count++;
/// Process(msg);
/// await msg.AckAsync();
/// }
///
/// if (count &lt; max)
/// {
/// // No more messages. Pause for more.
/// await Task.Delay(TimeSpan.FromMinutes(5));
/// }
/// </code>
/// </example>
public async IAsyncEnumerable<NatsJSMsg<T?>> FetchAllNoWaitAsync<T>(
NatsJSFetchOpts? opts = default,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -254,7 +298,7 @@ await sub.CallMsgNextAsync(
// When no wait is set we don't need to send the idle heartbeat and expiration
// If no message is available the server will respond with a 404 immediately
// If messages are available the server will send a 408 direct after the last message
? new ConsumerGetnextRequest {Batch = max.MaxMsgs, MaxBytes = max.MaxBytes, NoWait = opts.NoWait}
? new ConsumerGetnextRequest { Batch = max.MaxMsgs, MaxBytes = max.MaxBytes, NoWait = opts.NoWait }
: new ConsumerGetnextRequest
{
Batch = max.MaxMsgs,
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public async Task FetchNoWait_test()

var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token);
var count = 0;
await foreach (var msg in consumer.FetchNoWait<TestData>(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token))
await foreach (var msg in consumer.FetchAllNoWaitAsync<TestData>(new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token))
{
await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token);
Assert.Equal(count, msg.Data!.Test);
Expand Down
2 changes: 1 addition & 1 deletion version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.0-alpha.5
2.0.0-alpha.6
Loading