Skip to content

Commit

Permalink
Consumer notifications (#265)
Browse files Browse the repository at this point in the history
* Consumer terminal errors and notifications

Consume and fetch methods now generates notifications for unhandled
errors. Also the unrecoverable 409 errors will stop the consume loop
with an exception in addition to 400 and 404 errors.

* Format fixed
  • Loading branch information
mtmk authored Nov 30, 2023
1 parent 7766619 commit 87b897b
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 7 deletions.
4 changes: 3 additions & 1 deletion src/NATS.Client.JetStream/INatsJSNotification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ public interface INatsJSNotification
string Name { get; }
}

public class NatsJSTimeoutNotification : INatsJSNotification
public record NatsJSTimeoutNotification : INatsJSNotification
{
public string Name => "Timeout";
}

public record NatsJSProtocolNotification(string Name, int HeaderCode, string HeaderMessageText) : INatsJSNotification;
1 change: 1 addition & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ protected override async ValueTask ReceiveInternalAsync(
}
else
{
_notificationChannel?.Notify(new NatsJSProtocolNotification("Unhandled protocol message", headers.Code, headers.MessageText));
_logger.LogWarning(NatsJSLogEvents.ProtocolMessage, "Unhandled protocol message: {Code} {Description}", headers.Code, headers.MessageText);
}
}
Expand Down
26 changes: 21 additions & 5 deletions src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,25 @@ internal static class NatsJSExtensionsInternal
{
public static long ToNanos(this TimeSpan timeSpan) => (long)(timeSpan.TotalMilliseconds * 1_000_000);

public static bool HasTerminalJSError(this NatsHeaders headers) => headers
is { Code: 400 }
or { Code: 404 }
or { Code: 409, Message: NatsHeaders.Messages.ConsumerDeleted }
or { Code: 409, Message: NatsHeaders.Messages.ConsumerIsPushBased };
public static bool HasTerminalJSError(this NatsHeaders headers)
{
// terminal codes
if (headers is { Code: 400 } or { Code: 404 })
return true;

// sometimes terminal 409s
if (headers is { Code: 409 })
{
if (headers is { Message: NatsHeaders.Messages.ConsumerDeleted } or { Message: NatsHeaders.Messages.ConsumerIsPushBased })
return true;

if (headers.MessageText.StartsWith("Exceeded MaxRequestBatch", StringComparison.OrdinalIgnoreCase)
|| headers.MessageText.StartsWith("Exceeded MaxRequestExpires", StringComparison.OrdinalIgnoreCase)
|| headers.MessageText.StartsWith("Exceeded MaxRequestMaxBytes", StringComparison.OrdinalIgnoreCase)
|| headers.MessageText.StartsWith("Exceeded MaxWaiting", StringComparison.OrdinalIgnoreCase))
return true;
}

return false;
}
}
1 change: 1 addition & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSFetch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ protected override async ValueTask ReceiveInternalAsync(
}
else
{
_notificationChannel?.Notify(new NatsJSProtocolNotification("Unhandled protocol message", headers.Code, headers.MessageText));
_logger.LogWarning(NatsJSLogEvents.ProtocolMessage, "Unhandled protocol message: {Code} {Description}", headers.Code, headers.MessageText);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.JetStream/NatsJSOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public record NatsJSFetchOpts
/// <summary>
/// Maximum number of messages to return
/// </summary>
public int? MaxMsgs { get; init; }
public int? MaxMsgs { get; init; }

/// <summary>
/// Amount of time to wait for the request to expire
Expand Down
165 changes: 165 additions & 0 deletions tests/NATS.Client.JetStream.Tests/ConsumerNotificationTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
using System.Diagnostics;
using NATS.Client.Core.Tests;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream.Tests;

public class ConsumerNotificationTest
{
[SkipOnPlatform("WINDOWS", "doesn't support signals")]
public async Task Non_terminal_errors_sent_as_notifications()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
(await js.PublishAsync("s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess();

var consumer1 = await js.CreateOrUpdateConsumerAsync(stream: "s1", config: new ConsumerConfig("c1"), cancellationToken: cts.Token);
var consumer2 = await js.CreateOrUpdateConsumerAsync(stream: "s1", config: new ConsumerConfig("c2"), cancellationToken: cts.Token);

var cts1 = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
var natsJSConsumeOpts = new NatsJSConsumeOpts
{
MaxMsgs = 10,
NotificationHandler = (notification, _) =>
{
if (notification is NatsJSProtocolNotification { HeaderCode: 409, HeaderMessageText: "Server Shutdown" })
{
cts1.Cancel();
}

return Task.CompletedTask;
},
};

var cts2 = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
var natsJSFetchOpts = new NatsJSFetchOpts
{
MaxMsgs = 10,
NotificationHandler = (notification, _) =>
{
if (notification is NatsJSProtocolNotification { HeaderCode: 409, HeaderMessageText: "Server Shutdown" })
{
cts2.Cancel();
}

return Task.CompletedTask;
},
};

var signal1 = new WaitSignal();

var consumeTask = Task.Run(async () =>
{
await foreach (var unused in consumer1.ConsumeAsync<int>(opts: natsJSConsumeOpts, cancellationToken: cts1.Token))
{
signal1.Pulse();
}
});

var signal2 = new WaitSignal();

var fetchTask = Task.Run(async () =>
{
await foreach (var unused in consumer2.FetchAsync<int>(opts: natsJSFetchOpts, cancellationToken: cts2.Token))
{
signal2.Pulse();
}
});

await signal1;
await signal2;

// SIGTERM: Stops the server gracefully
Process.Start("kill", $"-TERM {server.ServerProcess!.Id}");

await Task.WhenAll(consumeTask, fetchTask);
}

[Fact]
public async Task Exceeded_max_errors()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);

// 409 Exceeded MaxRequestBatch
await ConsumeAndFetchTerminatesAsync(
js,
new ConsumerConfig("c1") { MaxBatch = 10, },
new NatsJSConsumeOpts { MaxMsgs = 20, },
new NatsJSFetchOpts { MaxMsgs = 20, },
expectedCode: 409,
expectedMessage: "Exceeded MaxRequestBatch of 10",
cts.Token);

// 409 Exceeded MaxRequestExpires
await ConsumeAndFetchTerminatesAsync(
js,
new ConsumerConfig("c2") { MaxExpires = TimeSpan.FromSeconds(10), },
new NatsJSConsumeOpts { MaxMsgs = 20, Expires = TimeSpan.FromSeconds(20) },
new NatsJSFetchOpts { MaxMsgs = 20, Expires = TimeSpan.FromSeconds(20) },
expectedCode: 409,
expectedMessage: "Exceeded MaxRequestExpires of 10s",
cts.Token);

// 409 Exceeded MaxRequestMaxBytes
await ConsumeAndFetchTerminatesAsync(
js,
new ConsumerConfig("c3") { MaxBytes = 1024, },
new NatsJSConsumeOpts { MaxBytes = 2048, },
new NatsJSFetchOpts { MaxBytes = 2048, },
expectedCode: 409,
expectedMessage: "Exceeded MaxRequestMaxBytes of 1024",
cts.Token);
}

private async Task ConsumeAndFetchTerminatesAsync(
NatsJSContext js,
ConsumerConfig consumerConfig,
NatsJSConsumeOpts natsJSConsumeOpts,
NatsJSFetchOpts natsJSFetchOpts,
int expectedCode,
string expectedMessage,
CancellationToken cancellationToken)
{
var consumer = await js.CreateOrUpdateConsumerAsync(
stream: "s1",
config: consumerConfig,
cancellationToken: cancellationToken);

// consume
{
var e = await Assert.ThrowsAsync<NatsJSProtocolException>(async () =>
{
await foreach (var unused in consumer.ConsumeAsync<int>(opts: natsJSConsumeOpts, cancellationToken: cancellationToken))
{
}
});

Assert.Equal(expectedCode, e.HeaderCode);
Assert.Equal(expectedMessage, e.HeaderMessageText);
}

// fetch
{
var e = await Assert.ThrowsAsync<NatsJSProtocolException>(async () =>
{
await foreach (var unused in consumer.FetchAsync<int>(opts: natsJSFetchOpts, cancellationToken: cancellationToken))
{
}
});

Assert.Equal(expectedCode, e.HeaderCode);
Assert.Equal(expectedMessage, e.HeaderMessageText);
}
}
}

0 comments on commit 87b897b

Please sign in to comment.