From f44a52787131bfcede41361872772b6f3dfb31ff Mon Sep 17 00:00:00 2001 From: Caleb Lloyd <2414837+caleblloyd@users.noreply.github.com> Date: Tue, 23 Jan 2024 07:50:09 -0500 Subject: [PATCH 1/2] pipe reader: don't mark commands as consumed until pending=0 (#347) * Send buffer refinements * pipe reader: don't mark commands as consumed until pending=0 Signed-off-by: Caleb Lloyd --------- Signed-off-by: Caleb Lloyd Co-authored-by: Ziya Suzen --- .../Commands/ProtocolWriter.cs | 4 +- .../NatsPipeliningWriteProtocolProcessor.cs | 137 ++++++++++-------- tests/NATS.Client.Core.Tests/ProtocolTest.cs | 48 ++++++ .../InMemoryTestLoggerFactory.cs | 55 +++++++ 4 files changed, 179 insertions(+), 65 deletions(-) create mode 100644 tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs diff --git a/src/NATS.Client.Core/Commands/ProtocolWriter.cs b/src/NATS.Client.Core/Commands/ProtocolWriter.cs index 4ddc439d3..fe7bd9e6e 100644 --- a/src/NATS.Client.Core/Commands/ProtocolWriter.cs +++ b/src/NATS.Client.Core/Commands/ProtocolWriter.cs @@ -91,12 +91,12 @@ public int WritePublish(string subject, T? value, NatsHeaders? headers, strin int ctrlLen; if (headers == null) { - // 'PUB ' + subject +' '+ payload len +'\r\n' + // 'PUB ' + subject +' '+ payload len +'\r\n' ctrlLen = PubSpaceLength + _subjectEncoding.GetByteCount(subject) + 1 + MaxIntStringLength + NewLineLength; } else { - // 'HPUB ' + subject +' '+ header len +' '+ payload len +'\r\n' + // 'HPUB ' + subject +' '+ header len +' '+ payload len +'\r\n' ctrlLen = HpubSpaceLength + _subjectEncoding.GetByteCount(subject) + 1 + MaxIntStringLength + 1 + MaxIntStringLength + NewLineLength; } diff --git a/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs index b26e144ec..50661adaa 100644 --- a/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs @@ -61,6 +61,7 @@ private async Task WriteLoopAsync() var pending = 0; var trimming = 0; var examinedOffset = 0; + var sent = 0; // memory segment used to consolidate multiple small memory chunks // should <= (minimumSegmentSize * 0.5) in CommandWriter @@ -79,9 +80,11 @@ private async Task WriteLoopAsync() { while (true) { + // read data from pipe reader var result = await _pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false); if (result.IsCanceled) { + // if the pipe has been canceled, break break; } @@ -89,7 +92,8 @@ private async Task WriteLoopAsync() var examinedPos = result.Buffer.Start; try { - var buffer = result.Buffer.Slice(examinedOffset); + // move from _queuedCommandReader to _inFlightCommands until the total size + // of all _inFlightCommands is >= result.Buffer.Length while (inFlightSum < result.Buffer.Length) { QueuedCommand queuedCommand; @@ -102,8 +106,15 @@ private async Task WriteLoopAsync() inFlightSum += queuedCommand.Size; } + // examinedOffset was processed last iteration, so slice it off the buffer + var buffer = result.Buffer.Slice(examinedOffset); + + // iterate until buffer is empty + // any time buffer sliced and re-assigned, continue should be called + // so that this conditional is checked while (buffer.Length > 0) { + // if there are no pending bytes to send, set to next command if (pending == 0) { var peek = _inFlightCommands.Peek(); @@ -111,24 +122,73 @@ private async Task WriteLoopAsync() trimming = peek.Trim; } + // from this point forward, pending != 0 + // any operation that decrements pending should check if it is 0, + // and dequeue from _inFlightCommands if it is + + // trim any bytes that should not be sent if (trimming > 0) { var trimmed = Math.Min(trimming, (int)buffer.Length); - consumedPos = buffer.GetPosition(trimmed); - examinedPos = buffer.GetPosition(trimmed); - examinedOffset = 0; - buffer = buffer.Slice(trimmed); pending -= trimmed; trimming -= trimmed; if (pending == 0) { // the entire command was trimmed (canceled) inFlightSum -= _inFlightCommands.Dequeue().Size; + consumedPos = buffer.GetPosition(trimmed); + examinedPos = consumedPos; + examinedOffset = 0; + buffer = buffer.Slice(trimmed); + + // iterate in case buffer is now empty + continue; + } + + // the command was partially trimmed + examinedPos = buffer.GetPosition(trimmed); + examinedOffset += trimmed; + buffer = buffer.Slice(trimmed); + + // iterate in case buffer is now empty + continue; + } + + if (sent > 0) + { + if (pending <= sent) + { + // the entire command was sent + inFlightSum -= _inFlightCommands.Dequeue().Size; + Interlocked.Add(ref _counter.PendingMessages, -1); + Interlocked.Add(ref _counter.SentMessages, 1); + + // mark the bytes as consumed, and reset pending + sent -= pending; + consumedPos = buffer.GetPosition(pending); + examinedPos = consumedPos; + examinedOffset = 0; + buffer = buffer.Slice(pending); + pending = 0; + + // iterate in case buffer is now empty + continue; } + // the command was partially sent + // decrement pending by the number of bytes that were sent + pending -= sent; + examinedPos = buffer.GetPosition(sent); + examinedOffset += sent; + buffer = buffer.Slice(sent); + sent = 0; + + // iterate in case buffer is now empty continue; } + // loop through _inFlightCommands to determine whether any commands + // in the first memory segment need trimming var sendMem = buffer.First; var maxSize = 0; var maxSizeCap = Math.Max(sendMem.Length, consolidateMem.Length); @@ -158,14 +218,17 @@ private async Task WriteLoopAsync() maxSize += command.Size; } + // adjust the first memory segment to end on a command boundary if (sendMem.Length > maxSize) { sendMem = sendMem[..maxSize]; } - var bufferIter = buffer; - if (doTrim || (bufferIter.Length > sendMem.Length && sendMem.Length < consolidateMem.Length)) + // if trimming is required or the first memory segment is smaller than consolidateMem + // consolidate bytes that need to be sent into consolidateMem + if (doTrim || (buffer.Length > sendMem.Length && sendMem.Length < consolidateMem.Length)) { + var bufferIter = buffer; var memIter = consolidateMem; var trimmedSize = 0; foreach (var command in _inFlightCommands) @@ -215,77 +278,25 @@ private async Task WriteLoopAsync() // perform send _stopwatch.Restart(); - var sent = await _socketConnection.SendAsync(sendMem).ConfigureAwait(false); + sent = await _socketConnection.SendAsync(sendMem).ConfigureAwait(false); _stopwatch.Stop(); Interlocked.Add(ref _counter.SentBytes, sent); if (isEnabledTraceLogging) { logger.LogTrace("Socket.SendAsync. Size: {0} Elapsed: {1}ms", sent, _stopwatch.Elapsed.TotalMilliseconds); } - - var consumed = 0; - var sentAndTrimmed = sent; - while (consumed < sentAndTrimmed) - { - if (pending == 0) - { - var peek = _inFlightCommands.Peek(); - pending = peek.Size - peek.Trim; - consumed += peek.Trim; - sentAndTrimmed += peek.Trim; - - if (pending == 0) - { - // the entire command was trimmed (canceled) - inFlightSum -= _inFlightCommands.Dequeue().Size; - continue; - } - } - - if (pending <= sentAndTrimmed - consumed) - { - // the entire command was sent - inFlightSum -= _inFlightCommands.Dequeue().Size; - Interlocked.Add(ref _counter.PendingMessages, -1); - Interlocked.Add(ref _counter.SentMessages, 1); - - // mark the bytes as consumed, and reset pending - consumed += pending; - pending = 0; - } - else - { - // the entire command was not sent; decrement pending by - // the number of bytes from the command that was sent - pending += consumed - sentAndTrimmed; - break; - } - } - - if (consumed > 0) - { - // mark fully sent commands as consumed - consumedPos = buffer.GetPosition(consumed); - examinedOffset = sentAndTrimmed - consumed; - } - else - { - // no commands were consumed - examinedOffset += sentAndTrimmed; - } - - // lop off sent bytes for next iteration - examinedPos = buffer.GetPosition(sentAndTrimmed); - buffer = buffer.Slice(sentAndTrimmed); } } finally { + // _pipeReader.AdvanceTo must be called exactly once for every + // _pipeReader.ReadAsync, which is why it is in the finally block _pipeReader.AdvanceTo(consumedPos, examinedPos); } if (result.IsCompleted) { + // if the pipe has been completed, break break; } } diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index 030ece422..e97fef5b3 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -1,5 +1,7 @@ using System.Buffers; using System.Text; +using Microsoft.Extensions.Logging; +using NATS.Client.TestUtilities; namespace NATS.Client.Core.Tests; @@ -338,6 +340,52 @@ await Retry.Until( await nats.DisposeAsync(); } + [Fact] + public async Task Protocol_parser_under_load() + { + await using var server = NatsServer.Start(); + var logger = new InMemoryTestLoggerFactory(LogLevel.Error); + var opts = server.ClientOpts(NatsOpts.Default) with { LoggerFactory = logger }; + var nats = new NatsConnection(opts); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var signal = new WaitSignal(); + + _ = Task.Run( + async () => + { + var count = 0; + await foreach (var unused in nats.SubscribeAsync("x", cancellationToken: cts.Token)) + { + if (++count > 10_000) + signal.Pulse(); + } + }, + cts.Token); + + _ = Task.Run( + async () => + { + while (!cts.Token.IsCancellationRequested) + await nats.PublishAsync("x", "x", cancellationToken: cts.Token); + }, + cts.Token); + + await signal; + + for (var i = 0; i < 3; i++) + { + await Task.Delay(1_000, cts.Token); + await server.RestartAsync(); + } + + foreach (var log in logger.Logs.Where(x => x.EventId == NatsLogEvents.Protocol && x.LogLevel == LogLevel.Error)) + { + Assert.DoesNotContain("Unknown Protocol Operation", log.Message); + } + } + private sealed class NatsSubReconnectTest : NatsSubBase { private readonly Action _callback; diff --git a/tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs b/tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs new file mode 100644 index 000000000..fddde99e8 --- /dev/null +++ b/tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs @@ -0,0 +1,55 @@ +using Microsoft.Extensions.Logging; + +namespace NATS.Client.TestUtilities; + +public class InMemoryTestLoggerFactory(LogLevel level) : ILoggerFactory +{ + private readonly List _messages = new(); + + public IReadOnlyList Logs + { + get + { + lock (_messages) + return _messages.ToList(); + } + } + + public ILogger CreateLogger(string categoryName) => new TestLogger(categoryName, level, this); + + public void AddProvider(ILoggerProvider provider) + { + } + + public void Dispose() + { + } + + private void Log(string categoryName, LogLevel logLevel, EventId eventId, Exception? exception, string message) + { + lock (_messages) + _messages.Add(new LogMessage(categoryName, logLevel, eventId, exception, message)); + } + + public record LogMessage(string Category, LogLevel LogLevel, EventId EventId, Exception? Exception, string Message); + + private class TestLogger(string categoryName, LogLevel level, InMemoryTestLoggerFactory logger) : ILogger + { + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) + { + if (logLevel >= level) + logger.Log(categoryName, logLevel, eventId, exception, formatter(state, exception)); + } + + public bool IsEnabled(LogLevel logLevel) => logLevel >= level; + + public IDisposable BeginScope(TState state) => new NullDisposable(); + + private class NullDisposable : IDisposable + { + public void Dispose() + { + } + } + } +} From 7b930ea6478579f0265e45f4d82e73d1a69ac44f Mon Sep 17 00:00:00 2001 From: Morten Mertner Date: Tue, 23 Jan 2024 14:39:03 +0100 Subject: [PATCH 2/2] Async event handlers (#324) * Async event handlers Write events to channel and publish from a background task. This allows handlers to be async and also prevents event listeners from blocking the NatsConnection code. * Added WaitToReadAsync return value check * Added NatsEventArgs and renamed MessageDroppedError to NatsMessageDroppedEventArgs. Changed events to use EventArgs and adjusted internal subscribers. Renamed OnError to MessageDropped, as that better reflects what error is signaled and also is naming-wise more consistent with the other events. * Use inner while to read all queued messages before waiting again * Changed AsyncEventHandler to return ValueTask. Removed license blurp from AsyncEventHandler as it now different from the original delegate. Adjusted code to match new signature. Note that "default" for ValueTask is a completed task. * Fixed build * dotnet format --------- Co-authored-by: Ziya Suzen --- src/NATS.Client.Core/AsyncEventHandler.cs | 18 ++++ src/NATS.Client.Core/INatsError.cs | 34 ------- .../Internal/AsyncEventExtensions.cs | 94 +++++++++++++++++++ src/NATS.Client.Core/NatsConnection.cs | 78 ++++++++++++--- src/NATS.Client.Core/NatsEventArgs.cs | 35 +++++++ src/NATS.Client.Core/NatsOpts.cs | 2 +- src/NATS.Client.Core/NatsSub.cs | 2 +- .../Internal/NatsJSOrderedConsume.cs | 10 +- .../Internal/NatsJSOrderedPushConsumer.cs | 14 ++- .../Internal/NatsKVWatchSub.cs | 6 +- .../Internal/NatsKVWatcher.cs | 6 +- .../ConnectionRetryTest.cs | 12 ++- tests/NATS.Client.Core.Tests/ProtocolTest.cs | 6 +- .../SlowConsumerTest.cs | 6 +- .../NatsKVWatcherTest.cs | 6 +- tests/NATS.Client.TestUtilities/WaitSignal.cs | 10 +- 16 files changed, 270 insertions(+), 69 deletions(-) create mode 100644 src/NATS.Client.Core/AsyncEventHandler.cs delete mode 100644 src/NATS.Client.Core/INatsError.cs create mode 100644 src/NATS.Client.Core/Internal/AsyncEventExtensions.cs create mode 100644 src/NATS.Client.Core/NatsEventArgs.cs diff --git a/src/NATS.Client.Core/AsyncEventHandler.cs b/src/NATS.Client.Core/AsyncEventHandler.cs new file mode 100644 index 000000000..d999f20ba --- /dev/null +++ b/src/NATS.Client.Core/AsyncEventHandler.cs @@ -0,0 +1,18 @@ +namespace NATS.Client.Core; + +/// +/// An asynchronous event handler. +/// +/// The sender of the event. +/// Event arguments. +/// A value task whose completion signals handling is finished. +public delegate ValueTask AsyncEventHandler(object? sender, EventArgs args); + +/// +/// An asynchronous event handler. +/// +/// The type of event arguments. +/// The sender of the event. +/// Event arguments. +/// A value task whose completion signals handling is finished. +public delegate ValueTask AsyncEventHandler(object? sender, TEventArgs args); diff --git a/src/NATS.Client.Core/INatsError.cs b/src/NATS.Client.Core/INatsError.cs deleted file mode 100644 index 943742a82..000000000 --- a/src/NATS.Client.Core/INatsError.cs +++ /dev/null @@ -1,34 +0,0 @@ -namespace NATS.Client.Core; - -public interface INatsError -{ - string Message { get; } -} - -public sealed class MessageDroppedError : INatsError -{ - public MessageDroppedError(NatsSubBase subscription, int pending, string subject, string? replyTo, NatsHeaders? headers, object? data) - { - Subscription = subscription; - Pending = pending; - Subject = subject; - ReplyTo = replyTo; - Headers = headers; - Data = data; - Message = $"Dropped message from {subject} with {pending} pending messages"; - } - - public NatsSubBase Subscription { get; } - - public int Pending { get; } - - public string Subject { get; } - - public string? ReplyTo { get; } - - public NatsHeaders? Headers { get; } - - public object? Data { get; } - - public string Message { get; } -} diff --git a/src/NATS.Client.Core/Internal/AsyncEventExtensions.cs b/src/NATS.Client.Core/Internal/AsyncEventExtensions.cs new file mode 100644 index 000000000..833af470d --- /dev/null +++ b/src/NATS.Client.Core/Internal/AsyncEventExtensions.cs @@ -0,0 +1,94 @@ +// origin: https://github.com/microsoft/vs-threading/blob/9065e6e4b5593e6ed6e3ff0a9159d4e2765430d6/src/Microsoft.VisualStudio.Threading/TplExtensions.cs +// license: MIT +// +// Microsoft.VisualStudio.Threading +// Copyright (c) Microsoft Corporation +// All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +namespace NATS.Client.Core.Internal; + +internal static class AsyncEventExtensions +{ + /// + /// Invokes asynchronous event handlers, returning a task that completes when all event handlers have been invoked. + /// Each handler is fully executed (including continuations) before the next handler in the list is invoked. + /// + /// The event handlers. May be . + /// The event source. + /// The event argument. + /// The task that completes when all handlers have completed. + /// Thrown if any handlers fail. It contains a collection of all failures. + public static async Task InvokeAsync(this AsyncEventHandler? handlers, object? sender, EventArgs args) + { + if (handlers != null) + { + var individualHandlers = handlers.GetInvocationList(); + List? exceptions = null; + foreach (var asyncHandler in individualHandlers) + { + var handler = (AsyncEventHandler)asyncHandler; + try + { + await handler(sender, args).ConfigureAwait(true); + } + catch (Exception ex) + { + exceptions ??= new List(2); + exceptions.Add(ex); + } + } + + if (exceptions != null) + { + throw new AggregateException(exceptions); + } + } + } + + /// + /// Invokes asynchronous event handlers, returning a task that completes when all event handlers have been invoked. + /// Each handler is fully executed (including continuations) before the next handler in the list is invoked. + /// + /// The type of argument passed to each handler. + /// The event handlers. May be . + /// The event source. + /// The event argument. + /// The task that completes when all handlers have completed. The task is faulted if any handlers throw an exception. + /// Thrown if any handlers fail. It contains a collection of all failures. + public static async Task InvokeAsync(this AsyncEventHandler? handlers, object? sender, TEventArgs args) + { + if (handlers != null) + { + var individualHandlers = handlers.GetInvocationList(); + List? exceptions = null; + foreach (var asyncHandler in individualHandlers) + { + var handler = (AsyncEventHandler)asyncHandler; + try + { + await handler(sender, args).ConfigureAwait(true); + } + catch (Exception ex) + { + exceptions ??= new List(2); + exceptions.Add(ex); + } + } + + if (exceptions != null) + { + throw new AggregateException(exceptions); + } + } + } +} diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index 8dbfe3a81..97b2674a9 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -1,6 +1,5 @@ using System.Buffers; using System.Diagnostics; -using System.IO.Pipelines; using System.Threading.Channels; using Microsoft.Extensions.Logging; using NATS.Client.Core.Commands; @@ -16,6 +15,14 @@ public enum NatsConnectionState Reconnecting, } +internal enum NatsEvent +{ + ConnectionOpened, + ConnectionDisconnected, + ReconnectFailed, + MessageDropped, +} + public partial class NatsConnection : INatsConnection { #pragma warning disable SA1401 @@ -37,7 +44,9 @@ public partial class NatsConnection : INatsConnection private readonly string _name; private readonly TimeSpan _socketComponentDisposeTimeout = TimeSpan.FromSeconds(5); private readonly BoundedChannelOptions _defaultSubscriptionChannelOpts; + private readonly Channel<(NatsEvent, NatsEventArgs)> _eventChannel; private readonly ClientOpts _clientOpts; + private int _pongCount; private int _connectionState; @@ -84,16 +93,25 @@ public NatsConnection(NatsOpts opts) SingleReader = false, AllowSynchronousContinuations = false, }; + + // push consumer events to a channel so handlers can be awaited (also prevents user code from blocking us) + _eventChannel = Channel.CreateUnbounded<(NatsEvent, NatsEventArgs)>(new UnboundedChannelOptions + { + AllowSynchronousContinuations = false, + SingleWriter = false, + SingleReader = true, + }); + _ = Task.Run(PublishEventsAsync, _disposedCancellationTokenSource.Token); } // events - public event EventHandler? ConnectionDisconnected; + public event AsyncEventHandler? ConnectionDisconnected; - public event EventHandler? ConnectionOpened; + public event AsyncEventHandler? ConnectionOpened; - public event EventHandler? ReconnectFailed; + public event AsyncEventHandler? ReconnectFailed; - public event EventHandler? OnError; + public event AsyncEventHandler? MessageDropped; public NatsOpts Opts { get; } @@ -213,11 +231,11 @@ internal ValueTask UnsubscribeAsync(int sid) return ValueTask.CompletedTask; } - internal void MessageDropped(NatsSubBase natsSub, int pending, NatsMsg msg) + internal void OnMessageDropped(NatsSubBase natsSub, int pending, NatsMsg msg) { var subject = msg.Subject; _logger.LogWarning("Dropped message from {Subject} with {Pending} pending messages", subject, pending); - OnError?.Invoke(this, new MessageDroppedError(natsSub, pending, subject, msg.ReplyTo, msg.Headers, msg.Data)); + _eventChannel.Writer.TryWrite((NatsEvent.MessageDropped, new NatsMessageDroppedEventArgs(natsSub, pending, subject, msg.ReplyTo, msg.Headers, msg.Data))); } internal BoundedChannelOptions GetChannelOpts(NatsOpts connectionOpts, NatsSubChannelOpts? subChannelOpts) @@ -345,7 +363,7 @@ private async ValueTask InitialConnectAsync() StartPingTimer(_pingTimerCancellationTokenSource.Token); _waitForOpenConnection.TrySetResult(); _ = Task.Run(ReconnectLoop); - ConnectionOpened?.Invoke(this, url?.ToString() ?? string.Empty); + _eventChannel.Writer.TryWrite((NatsEvent.ConnectionOpened, new NatsEventArgs(url?.ToString() ?? string.Empty))); } } @@ -488,8 +506,8 @@ private async void ReconnectLoop() _pingTimerCancellationTokenSource?.Cancel(); } - // Invoke after state changed - ConnectionDisconnected?.Invoke(this, _currentConnectUri?.ToString() ?? string.Empty); + // Invoke event after state changed + _eventChannel.Writer.TryWrite((NatsEvent.ConnectionDisconnected, new NatsEventArgs(_currentConnectUri?.ToString() ?? string.Empty))); // Cleanup current socket await DisposeSocketAsync(true).ConfigureAwait(false); @@ -567,7 +585,7 @@ private async void ReconnectLoop() _logger.LogWarning(NatsLogEvents.Connection, ex, "Failed to connect NATS {Url}", url); } - ReconnectFailed?.Invoke(this, url?.ToString() ?? string.Empty); + _eventChannel.Writer.TryWrite((NatsEvent.ReconnectFailed, new NatsEventArgs(url?.ToString() ?? string.Empty))); await WaitWithJitterAsync().ConfigureAwait(false); goto CONNECT_AGAIN; } @@ -582,7 +600,7 @@ private async void ReconnectLoop() StartPingTimer(_pingTimerCancellationTokenSource.Token); _waitForOpenConnection.TrySetResult(); _ = Task.Run(ReconnectLoop); - ConnectionOpened?.Invoke(this, url?.ToString() ?? string.Empty); + _eventChannel.Writer.TryWrite((NatsEvent.ConnectionOpened, new NatsEventArgs(url.ToString()))); } } catch (Exception ex) @@ -594,6 +612,42 @@ private async void ReconnectLoop() } } + private async Task PublishEventsAsync() + { + try + { + while (!_disposedCancellationTokenSource.IsCancellationRequested) + { + var hasData = await _eventChannel.Reader.WaitToReadAsync(_disposedCancellationTokenSource.Token).ConfigureAwait(false); + while (hasData && _eventChannel.Reader.TryRead(out var eventArgs)) + { + var (natsEvent, args) = eventArgs; + switch (natsEvent) + { + case NatsEvent.ConnectionOpened when ConnectionOpened != null: + await ConnectionOpened.InvokeAsync(this, args).ConfigureAwait(false); + break; + case NatsEvent.ConnectionDisconnected when ConnectionDisconnected != null: + await ConnectionDisconnected.InvokeAsync(this, args).ConfigureAwait(false); + break; + case NatsEvent.ReconnectFailed when ReconnectFailed != null: + await ReconnectFailed.InvokeAsync(this, args).ConfigureAwait(false); + break; + case NatsEvent.MessageDropped when MessageDropped != null && args is NatsMessageDroppedEventArgs error: + await MessageDropped.InvokeAsync(this, error).ConfigureAwait(false); + break; + } + } + } + } + catch (Exception ex) + { + _logger.LogError(NatsLogEvents.Connection, ex, "Error occured when publishing events"); + if (!_disposedCancellationTokenSource.IsCancellationRequested) + _ = Task.Run(PublishEventsAsync, _disposedCancellationTokenSource.Token); + } + } + private NatsUri FixTlsHost(NatsUri uri) { var lastSeedConnectUri = _lastSeedConnectUri; diff --git a/src/NATS.Client.Core/NatsEventArgs.cs b/src/NATS.Client.Core/NatsEventArgs.cs new file mode 100644 index 000000000..254930288 --- /dev/null +++ b/src/NATS.Client.Core/NatsEventArgs.cs @@ -0,0 +1,35 @@ +// ReSharper disable UnusedAutoPropertyAccessor.Global - properties are used by consumers outside of this library +namespace NATS.Client.Core; + +public class NatsEventArgs : EventArgs +{ + public NatsEventArgs(string message) => Message = message; + + public string Message { get; } +} + +public class NatsMessageDroppedEventArgs : NatsEventArgs +{ + public NatsMessageDroppedEventArgs(NatsSubBase subscription, int pending, string subject, string? replyTo, NatsHeaders? headers, object? data) + : base($"Dropped message from {subject} with {pending} pending messages") + { + Subscription = subscription; + Pending = pending; + Subject = subject; + ReplyTo = replyTo; + Headers = headers; + Data = data; + } + + public NatsSubBase Subscription { get; } + + public int Pending { get; } + + public string Subject { get; } + + public string? ReplyTo { get; } + + public NatsHeaders? Headers { get; } + + public object? Data { get; } +} diff --git a/src/NATS.Client.Core/NatsOpts.cs b/src/NATS.Client.Core/NatsOpts.cs index c44ffda56..4056427e8 100644 --- a/src/NATS.Client.Core/NatsOpts.cs +++ b/src/NATS.Client.Core/NatsOpts.cs @@ -108,7 +108,7 @@ public sealed record NatsOpts /// /// If the client reaches this internal limit (bounded channel capacity), by default it will drop messages /// and continue to process new messages. This is aligned with NATS at most once delivery. It is up to - /// the application to detect the missing messages () and recover + /// the application to detect the missing messages () and recover /// from this condition or set a different default such as BoundedChannelFullMode.Wait in which /// case it might risk server disconnecting the client as a slow consumer. /// diff --git a/src/NATS.Client.Core/NatsSub.cs b/src/NATS.Client.Core/NatsSub.cs index b10439411..853408e83 100644 --- a/src/NATS.Client.Core/NatsSub.cs +++ b/src/NATS.Client.Core/NatsSub.cs @@ -21,7 +21,7 @@ internal NatsSub( { _msgs = Channel.CreateBounded>( connection.GetChannelOpts(connection.Opts, opts?.ChannelOpts), - msg => Connection.MessageDropped(this, _msgs?.Reader.Count ?? 0, msg)); + msg => Connection.OnMessageDropped(this, _msgs?.Reader.Count ?? 0, msg)); Serializer = serializer; } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs index 2b6d2d17f..e1a698320 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs @@ -4,7 +4,6 @@ using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Core.Commands; -using NATS.Client.Core.Internal; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream.Internal; @@ -97,7 +96,7 @@ public NatsJSOrderedConsume( // This channel is used to pass messages to the user from the subscription. _userMsgs = Channel.CreateBounded>( Connection.GetChannelOpts(Connection.Opts, opts?.ChannelOpts), - msg => Connection.MessageDropped(this, _userMsgs?.Reader.Count ?? 0, msg.Msg)); + msg => Connection.OnMessageDropped(this, _userMsgs?.Reader.Count ?? 0, msg.Msg)); Msgs = _userMsgs.Reader; // Pull request channel is set as unbounded because we don't want to drop @@ -317,10 +316,11 @@ protected override void TryComplete() _userMsgs.Writer.TryComplete(); } - private void ConnectionOnConnectionDisconnected(object? sender, string e) + private ValueTask ConnectionOnConnectionDisconnected(object? sender, NatsEventArgs args) { - _logger.LogWarning(NatsJSLogEvents.Connection, "Disconnected {Reason}", e); - _userMsgs.Writer.TryComplete(new NatsJSConnectionException(e)); + _logger.LogWarning(NatsJSLogEvents.Connection, "Disconnected {Reason}", args.Message); + _userMsgs.Writer.TryComplete(new NatsJSConnectionException(args.Message)); + return default; } private void ResetPending() diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs index 48401196a..04c762ffc 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs @@ -163,7 +163,11 @@ internal void Done() _msgChannel.Writer.TryComplete(); } - private void OnDisconnected(object? sender, string e) => StopHeartbeatTimer(); + private ValueTask OnDisconnected(object? sender, NatsEventArgs args) + { + StopHeartbeatTimer(); + return default; + } private async Task CommandLoop() { @@ -446,6 +450,10 @@ protected override void TryComplete() { } - private void OnConnectionOpened(object? sender, string e) => - _commands.TryWrite(new NatsJSOrderedPushConsumerMsg { Command = NatsJSOrderedPushConsumerCommand.Ready }); + private ValueTask OnConnectionOpened(object? sender, NatsEventArgs args) + { + // result is discarded, so this code is assumed to not be failing + _ = _commands.TryWrite(new NatsJSOrderedPushConsumerMsg { Command = NatsJSOrderedPushConsumerCommand.Ready }); + return default; + } } diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs index faf15fda8..fb0d2c7eb 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs @@ -62,8 +62,10 @@ protected override void TryComplete() { } - private void OnConnectionOpened(object? sender, string e) + private ValueTask OnConnectionOpened(object? sender, NatsEventArgs args) { - _commands.TryWrite(new NatsKVWatchCommandMsg { Command = NatsKVWatchCommand.Ready }); + // result is discarded, so this code is assumed to not be failing + _ = _commands.TryWrite(new NatsKVWatchCommandMsg { Command = NatsKVWatchCommand.Ready }); + return default; } } diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs index 241a7c4d9..5871ff24c 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs @@ -142,7 +142,11 @@ internal ValueTask InitAsync() return CreatePushConsumer("init"); } - private void OnDisconnected(object? sender, string e) => StopHeartbeatTimer(); + private ValueTask OnDisconnected(object? sender, NatsEventArgs args) + { + StopHeartbeatTimer(); + return default; + } private async Task CommandLoop() { diff --git a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs index f9954623b..dc1107b31 100644 --- a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs +++ b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs @@ -18,7 +18,11 @@ public async Task Max_retry_reached_after_disconnect() }); var signal = new WaitSignal(); - nats.ReconnectFailed += (_, e) => signal.Pulse(); + nats.ReconnectFailed += (_, _) => + { + signal.Pulse(); + return default; + }; var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); @@ -41,7 +45,11 @@ public async Task Retry_and_connect_after_disconnected() }); var signal = new WaitSignal(); - nats.ReconnectFailed += (_, e) => signal.Pulse(); + nats.ReconnectFailed += (_, _) => + { + signal.Pulse(); + return default; + }; var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index e97fef5b3..35a64e266 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -315,7 +315,11 @@ await Retry.Until( async () => await nats.PublishAsync(subject, 1)); var disconnected = new WaitSignal(); - nats.ConnectionDisconnected += (_, _) => disconnected.Pulse(); + nats.ConnectionDisconnected += (_, _) => + { + disconnected.Pulse(); + return default; + }; proxy.Reset(); diff --git a/tests/NATS.Client.Core.Tests/SlowConsumerTest.cs b/tests/NATS.Client.Core.Tests/SlowConsumerTest.cs index 5b0645c55..618e53c24 100644 --- a/tests/NATS.Client.Core.Tests/SlowConsumerTest.cs +++ b/tests/NATS.Client.Core.Tests/SlowConsumerTest.cs @@ -13,13 +13,15 @@ public async Task Slow_consumer() var nats = server.CreateClientConnection(new NatsOpts { SubPendingChannelCapacity = 3 }); var lost = 0; - nats.OnError += (_, e) => + nats.MessageDropped += (_, e) => { - if (e is MessageDroppedError dropped) + if (e is { } dropped) { Interlocked.Increment(ref lost); _output.WriteLine($"LOST {dropped.Data}"); } + + return default; }; var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs index 21e632f22..d48498ba8 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs @@ -56,7 +56,11 @@ public async Task Watcher_reconnect_with_history() } var signal = new WaitSignal(); - nats2.ConnectionDisconnected += (_, _) => signal.Pulse(); + nats2.ConnectionDisconnected += (_, _) => + { + signal.Pulse(); + return default; + }; proxy.Reset(); diff --git a/tests/NATS.Client.TestUtilities/WaitSignal.cs b/tests/NATS.Client.TestUtilities/WaitSignal.cs index b5814ba92..8a81ed97c 100644 --- a/tests/NATS.Client.TestUtilities/WaitSignal.cs +++ b/tests/NATS.Client.TestUtilities/WaitSignal.cs @@ -1,5 +1,4 @@ using System.Runtime.CompilerServices; -using NATS.Client.Core; namespace NATS.Client.Core.Tests; @@ -8,9 +7,10 @@ public static class WaitSignalExtensions public static Task ConnectionDisconnectedAsAwaitable(this NatsConnection connection) { var signal = new WaitSignal(); - connection.ConnectionDisconnected += (sender, e) => + connection.ConnectionDisconnected += (_, _) => { signal.Pulse(); + return default; }; return signal.Task.WaitAsync(signal.Timeout); } @@ -18,9 +18,10 @@ public static Task ConnectionDisconnectedAsAwaitable(this NatsConnection connect public static Task ConnectionOpenedAsAwaitable(this NatsConnection connection) { var signal = new WaitSignal(); - connection.ConnectionOpened += (sender, e) => + connection.ConnectionOpened += (_, _) => { signal.Pulse(); + return default; }; return signal.Task.WaitAsync(signal.Timeout); } @@ -28,9 +29,10 @@ public static Task ConnectionOpenedAsAwaitable(this NatsConnection connection) public static Task ReconnectFailedAsAwaitable(this NatsConnection connection) { var signal = new WaitSignal(); - connection.ReconnectFailed += (sender, e) => + connection.ReconnectFailed += (_, _) => { signal.Pulse(); + return default; }; return signal.Task.WaitAsync(signal.Timeout); }