Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consume clean exit fixes #161

Merged
merged 3 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 72 additions & 20 deletions sandbox/Example.JetStream.PullConsumer/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Buffers;
using System.Diagnostics;
using Example.JetStream.PullConsumer;
using System.Text;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
Expand All @@ -20,12 +21,12 @@

var consumer = await js.CreateConsumerAsync("s1", "c1");

var idle = TimeSpan.FromSeconds(15);
var expires = TimeSpan.FromSeconds(30);
var idle = TimeSpan.FromSeconds(5);
var expires = TimeSpan.FromSeconds(10);

// int? maxMsgs = null;
// int? maxBytes = 128;
int? maxMsgs = 1000;
int? maxMsgs = 10;
int? maxBytes = null;

void Report(int i, Stopwatch sw, string data)
Expand All @@ -41,7 +42,6 @@ void Report(int i, Stopwatch sw, string data)
MaxBytes = maxBytes,
Expires = expires,
IdleHeartbeat = idle,
Serializer = new RawDataSerializer(),
};

var fetchOpts = new NatsJSFetchOpts
Expand All @@ -50,32 +50,39 @@ void Report(int i, Stopwatch sw, string data)
MaxBytes = maxBytes,
Expires = expires,
IdleHeartbeat = idle,
Serializer = new RawDataSerializer(),
};

var nextOpts = new NatsJSNextOpts
{
Expires = expires,
IdleHeartbeat = idle,
Serializer = new RawDataSerializer(),
};

var stopwatch = Stopwatch.StartNew();
var count = 0;

var cmd = args.Length > 0 ? args[0] : "consume";
var cmdOpt = args.Length > 1 ? args[1] : "none";

try
{
if (args.Length > 0 && args[0] == "fetch")
if (cmd == "fetch")
{
while (!cts.Token.IsCancellationRequested)
{
try
{
Console.WriteLine($"___\nFETCH {maxMsgs}");
await consumer.RefreshAsync(cts.Token);
await using var sub = await consumer.FetchAsync<RawData>(fetchOpts, cts.Token);
await using var sub = await consumer.FetchAsync<NatsMemoryOwner<byte>>(fetchOpts, cts.Token);
await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
{
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
Console.WriteLine($"Received: {message}");
}

await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
Expand All @@ -91,16 +98,22 @@ void Report(int i, Stopwatch sw, string data)
}
}
}
else if (args.Length > 0 && args[0] == "fetch-all")
else if (cmd == "fetch-all")
{
while (!cts.Token.IsCancellationRequested)
{
try
{
Console.WriteLine($"___\nFETCH {maxMsgs}");
await consumer.RefreshAsync(cts.Token);
await foreach (var msg in consumer.FetchAllAsync<RawData>(fetchOpts, cts.Token))
await foreach (var msg in consumer.FetchAllAsync<NatsMemoryOwner<byte>>(fetchOpts, cts.Token))
{
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
Console.WriteLine($"Received: {message}");
}

await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
Expand All @@ -116,16 +129,22 @@ void Report(int i, Stopwatch sw, string data)
}
}
}
else if (args.Length > 0 && args[0] == "next")
else if (cmd == "next")
{
while (!cts.Token.IsCancellationRequested)
{
try
{
Console.WriteLine("___\nNEXT");
var next = await consumer.NextAsync<RawData>(nextOpts, cts.Token);
var next = await consumer.NextAsync<NatsMemoryOwner<byte>>(nextOpts, cts.Token);
if (next is { } msg)
{
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
Console.WriteLine($"Received: {message}");
}

await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
Expand All @@ -141,21 +160,48 @@ void Report(int i, Stopwatch sw, string data)
}
}
}
else if (args.Length > 0 && args[0] == "consume")
else if (cmd == "consume")
{
while (!cts.Token.IsCancellationRequested)
{
try
{
Console.WriteLine("___\nCONSUME");
await using var sub = await consumer.ConsumeAsync<RawData>(
consumeOpts,
cts.Token);
await using var sub = await consumer.ConsumeAsync<NatsMemoryOwner<byte>>(consumeOpts);

await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
cts.Token.Register(() =>
{
sub.DisposeAsync().GetAwaiter().GetResult();
});

var stopped = false;
await foreach (var msg in sub.Msgs.ReadAllAsync())
{
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
Console.WriteLine($"Received: {message}");
if (message == "stop")
{
Console.WriteLine("Stopping consumer...");
sub.Stop();
stopped = true;
}
}

await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");

if (cmdOpt == "with-pause")
{
await Task.Delay(1_000);
}
}

if (stopped)
{
Console.WriteLine("Stopped consumer.");
break;
}
}
catch (NatsJSProtocolException e)
Expand All @@ -169,15 +215,21 @@ void Report(int i, Stopwatch sw, string data)
}
}
}
else if (args.Length > 0 && args[0] == "consume-all")
else if (cmd == "consume-all")
{
while (!cts.Token.IsCancellationRequested)
{
try
{
Console.WriteLine("___\nCONSUME-ALL");
await foreach (var msg in consumer.ConsumeAllAsync<RawData>(consumeOpts, cts.Token))
await foreach (var msg in consumer.ConsumeAllAsync<NatsMemoryOwner<byte>>(consumeOpts, cts.Token))
{
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
Console.WriteLine($"Received: {message}");
}

await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
Expand Down
12 changes: 0 additions & 12 deletions sandbox/Example.JetStream.PullConsumer/RawData.cs

This file was deleted.

28 changes: 0 additions & 28 deletions sandbox/Example.JetStream.PullConsumer/RawDataSerializer.cs

This file was deleted.

28 changes: 19 additions & 9 deletions src/NATS.Client.JetStream/INatsJSConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,28 @@

namespace NATS.Client.JetStream;

/// <summary>
/// Interface to manage a <c>consume()</c> operation on a consumer.
/// </summary>
public interface INatsJSConsume : IAsyncDisposable
{
void Stop();
}

/// <summary>
/// Interface to extract messages from a <c>consume()</c> operation on a consumer.
/// </summary>
public interface INatsJSConsume<T> : INatsJSConsume
public interface INatsJSConsume<T> : IAsyncDisposable
{
/// <summary>
/// Messages received from the consumer.
/// </summary>
ChannelReader<NatsJSMsg<T?>> Msgs { get; }

/// <summary>
/// Stop the consumer gracefully.
/// </summary>
/// <remarks>
/// <para>
/// This will wait for any inflight messages to be processed before stopping.
/// </para>
/// <para>
/// Disposing would stop consuming immediately. This might leave messages behind
/// without acknowledgement. Which is fine, messages will be scheduled for redelivery,
/// however, it might not be the desired behavior.
/// </para>
/// </remarks>
void Stop();
}
13 changes: 4 additions & 9 deletions src/NATS.Client.JetStream/INatsJSFetch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,13 @@

namespace NATS.Client.JetStream;

/// <summary>
/// Interface to manage a <c>fetch()</c> operation on a consumer.
/// </summary>
public interface INatsJSFetch : IAsyncDisposable
{
void Stop();
}

/// <summary>
/// Interface to extract messages from a <c>fetch()</c> operation on a consumer.
/// </summary>
public interface INatsJSFetch<T> : INatsJSFetch
public interface INatsJSFetch<T> : IAsyncDisposable
{
/// <summary>
/// User messages received from the consumer.
/// </summary>
ChannelReader<NatsJSMsg<T?>> Msgs { get; }
}
Loading
Loading