diff --git a/src/NATS.Client.Core/AsyncEventHandler.cs b/src/NATS.Client.Core/AsyncEventHandler.cs new file mode 100644 index 000000000..d999f20ba --- /dev/null +++ b/src/NATS.Client.Core/AsyncEventHandler.cs @@ -0,0 +1,18 @@ +namespace NATS.Client.Core; + +/// +/// An asynchronous event handler. +/// +/// The sender of the event. +/// Event arguments. +/// A value task whose completion signals handling is finished. +public delegate ValueTask AsyncEventHandler(object? sender, EventArgs args); + +/// +/// An asynchronous event handler. +/// +/// The type of event arguments. +/// The sender of the event. +/// Event arguments. +/// A value task whose completion signals handling is finished. +public delegate ValueTask AsyncEventHandler(object? sender, TEventArgs args); diff --git a/src/NATS.Client.Core/INatsError.cs b/src/NATS.Client.Core/INatsError.cs deleted file mode 100644 index 943742a82..000000000 --- a/src/NATS.Client.Core/INatsError.cs +++ /dev/null @@ -1,34 +0,0 @@ -namespace NATS.Client.Core; - -public interface INatsError -{ - string Message { get; } -} - -public sealed class MessageDroppedError : INatsError -{ - public MessageDroppedError(NatsSubBase subscription, int pending, string subject, string? replyTo, NatsHeaders? headers, object? data) - { - Subscription = subscription; - Pending = pending; - Subject = subject; - ReplyTo = replyTo; - Headers = headers; - Data = data; - Message = $"Dropped message from {subject} with {pending} pending messages"; - } - - public NatsSubBase Subscription { get; } - - public int Pending { get; } - - public string Subject { get; } - - public string? ReplyTo { get; } - - public NatsHeaders? Headers { get; } - - public object? Data { get; } - - public string Message { get; } -} diff --git a/src/NATS.Client.Core/Internal/AsyncEventExtensions.cs b/src/NATS.Client.Core/Internal/AsyncEventExtensions.cs new file mode 100644 index 000000000..833af470d --- /dev/null +++ b/src/NATS.Client.Core/Internal/AsyncEventExtensions.cs @@ -0,0 +1,94 @@ +// origin: https://github.com/microsoft/vs-threading/blob/9065e6e4b5593e6ed6e3ff0a9159d4e2765430d6/src/Microsoft.VisualStudio.Threading/TplExtensions.cs +// license: MIT +// +// Microsoft.VisualStudio.Threading +// Copyright (c) Microsoft Corporation +// All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +namespace NATS.Client.Core.Internal; + +internal static class AsyncEventExtensions +{ + /// + /// Invokes asynchronous event handlers, returning a task that completes when all event handlers have been invoked. + /// Each handler is fully executed (including continuations) before the next handler in the list is invoked. + /// + /// The event handlers. May be . + /// The event source. + /// The event argument. + /// The task that completes when all handlers have completed. + /// Thrown if any handlers fail. It contains a collection of all failures. + public static async Task InvokeAsync(this AsyncEventHandler? handlers, object? sender, EventArgs args) + { + if (handlers != null) + { + var individualHandlers = handlers.GetInvocationList(); + List? exceptions = null; + foreach (var asyncHandler in individualHandlers) + { + var handler = (AsyncEventHandler)asyncHandler; + try + { + await handler(sender, args).ConfigureAwait(true); + } + catch (Exception ex) + { + exceptions ??= new List(2); + exceptions.Add(ex); + } + } + + if (exceptions != null) + { + throw new AggregateException(exceptions); + } + } + } + + /// + /// Invokes asynchronous event handlers, returning a task that completes when all event handlers have been invoked. + /// Each handler is fully executed (including continuations) before the next handler in the list is invoked. + /// + /// The type of argument passed to each handler. + /// The event handlers. May be . + /// The event source. + /// The event argument. + /// The task that completes when all handlers have completed. The task is faulted if any handlers throw an exception. + /// Thrown if any handlers fail. It contains a collection of all failures. + public static async Task InvokeAsync(this AsyncEventHandler? handlers, object? sender, TEventArgs args) + { + if (handlers != null) + { + var individualHandlers = handlers.GetInvocationList(); + List? exceptions = null; + foreach (var asyncHandler in individualHandlers) + { + var handler = (AsyncEventHandler)asyncHandler; + try + { + await handler(sender, args).ConfigureAwait(true); + } + catch (Exception ex) + { + exceptions ??= new List(2); + exceptions.Add(ex); + } + } + + if (exceptions != null) + { + throw new AggregateException(exceptions); + } + } + } +} diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index 8dbfe3a81..97b2674a9 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -1,6 +1,5 @@ using System.Buffers; using System.Diagnostics; -using System.IO.Pipelines; using System.Threading.Channels; using Microsoft.Extensions.Logging; using NATS.Client.Core.Commands; @@ -16,6 +15,14 @@ public enum NatsConnectionState Reconnecting, } +internal enum NatsEvent +{ + ConnectionOpened, + ConnectionDisconnected, + ReconnectFailed, + MessageDropped, +} + public partial class NatsConnection : INatsConnection { #pragma warning disable SA1401 @@ -37,7 +44,9 @@ public partial class NatsConnection : INatsConnection private readonly string _name; private readonly TimeSpan _socketComponentDisposeTimeout = TimeSpan.FromSeconds(5); private readonly BoundedChannelOptions _defaultSubscriptionChannelOpts; + private readonly Channel<(NatsEvent, NatsEventArgs)> _eventChannel; private readonly ClientOpts _clientOpts; + private int _pongCount; private int _connectionState; @@ -84,16 +93,25 @@ public NatsConnection(NatsOpts opts) SingleReader = false, AllowSynchronousContinuations = false, }; + + // push consumer events to a channel so handlers can be awaited (also prevents user code from blocking us) + _eventChannel = Channel.CreateUnbounded<(NatsEvent, NatsEventArgs)>(new UnboundedChannelOptions + { + AllowSynchronousContinuations = false, + SingleWriter = false, + SingleReader = true, + }); + _ = Task.Run(PublishEventsAsync, _disposedCancellationTokenSource.Token); } // events - public event EventHandler? ConnectionDisconnected; + public event AsyncEventHandler? ConnectionDisconnected; - public event EventHandler? ConnectionOpened; + public event AsyncEventHandler? ConnectionOpened; - public event EventHandler? ReconnectFailed; + public event AsyncEventHandler? ReconnectFailed; - public event EventHandler? OnError; + public event AsyncEventHandler? MessageDropped; public NatsOpts Opts { get; } @@ -213,11 +231,11 @@ internal ValueTask UnsubscribeAsync(int sid) return ValueTask.CompletedTask; } - internal void MessageDropped(NatsSubBase natsSub, int pending, NatsMsg msg) + internal void OnMessageDropped(NatsSubBase natsSub, int pending, NatsMsg msg) { var subject = msg.Subject; _logger.LogWarning("Dropped message from {Subject} with {Pending} pending messages", subject, pending); - OnError?.Invoke(this, new MessageDroppedError(natsSub, pending, subject, msg.ReplyTo, msg.Headers, msg.Data)); + _eventChannel.Writer.TryWrite((NatsEvent.MessageDropped, new NatsMessageDroppedEventArgs(natsSub, pending, subject, msg.ReplyTo, msg.Headers, msg.Data))); } internal BoundedChannelOptions GetChannelOpts(NatsOpts connectionOpts, NatsSubChannelOpts? subChannelOpts) @@ -345,7 +363,7 @@ private async ValueTask InitialConnectAsync() StartPingTimer(_pingTimerCancellationTokenSource.Token); _waitForOpenConnection.TrySetResult(); _ = Task.Run(ReconnectLoop); - ConnectionOpened?.Invoke(this, url?.ToString() ?? string.Empty); + _eventChannel.Writer.TryWrite((NatsEvent.ConnectionOpened, new NatsEventArgs(url?.ToString() ?? string.Empty))); } } @@ -488,8 +506,8 @@ private async void ReconnectLoop() _pingTimerCancellationTokenSource?.Cancel(); } - // Invoke after state changed - ConnectionDisconnected?.Invoke(this, _currentConnectUri?.ToString() ?? string.Empty); + // Invoke event after state changed + _eventChannel.Writer.TryWrite((NatsEvent.ConnectionDisconnected, new NatsEventArgs(_currentConnectUri?.ToString() ?? string.Empty))); // Cleanup current socket await DisposeSocketAsync(true).ConfigureAwait(false); @@ -567,7 +585,7 @@ private async void ReconnectLoop() _logger.LogWarning(NatsLogEvents.Connection, ex, "Failed to connect NATS {Url}", url); } - ReconnectFailed?.Invoke(this, url?.ToString() ?? string.Empty); + _eventChannel.Writer.TryWrite((NatsEvent.ReconnectFailed, new NatsEventArgs(url?.ToString() ?? string.Empty))); await WaitWithJitterAsync().ConfigureAwait(false); goto CONNECT_AGAIN; } @@ -582,7 +600,7 @@ private async void ReconnectLoop() StartPingTimer(_pingTimerCancellationTokenSource.Token); _waitForOpenConnection.TrySetResult(); _ = Task.Run(ReconnectLoop); - ConnectionOpened?.Invoke(this, url?.ToString() ?? string.Empty); + _eventChannel.Writer.TryWrite((NatsEvent.ConnectionOpened, new NatsEventArgs(url.ToString()))); } } catch (Exception ex) @@ -594,6 +612,42 @@ private async void ReconnectLoop() } } + private async Task PublishEventsAsync() + { + try + { + while (!_disposedCancellationTokenSource.IsCancellationRequested) + { + var hasData = await _eventChannel.Reader.WaitToReadAsync(_disposedCancellationTokenSource.Token).ConfigureAwait(false); + while (hasData && _eventChannel.Reader.TryRead(out var eventArgs)) + { + var (natsEvent, args) = eventArgs; + switch (natsEvent) + { + case NatsEvent.ConnectionOpened when ConnectionOpened != null: + await ConnectionOpened.InvokeAsync(this, args).ConfigureAwait(false); + break; + case NatsEvent.ConnectionDisconnected when ConnectionDisconnected != null: + await ConnectionDisconnected.InvokeAsync(this, args).ConfigureAwait(false); + break; + case NatsEvent.ReconnectFailed when ReconnectFailed != null: + await ReconnectFailed.InvokeAsync(this, args).ConfigureAwait(false); + break; + case NatsEvent.MessageDropped when MessageDropped != null && args is NatsMessageDroppedEventArgs error: + await MessageDropped.InvokeAsync(this, error).ConfigureAwait(false); + break; + } + } + } + } + catch (Exception ex) + { + _logger.LogError(NatsLogEvents.Connection, ex, "Error occured when publishing events"); + if (!_disposedCancellationTokenSource.IsCancellationRequested) + _ = Task.Run(PublishEventsAsync, _disposedCancellationTokenSource.Token); + } + } + private NatsUri FixTlsHost(NatsUri uri) { var lastSeedConnectUri = _lastSeedConnectUri; diff --git a/src/NATS.Client.Core/NatsEventArgs.cs b/src/NATS.Client.Core/NatsEventArgs.cs new file mode 100644 index 000000000..254930288 --- /dev/null +++ b/src/NATS.Client.Core/NatsEventArgs.cs @@ -0,0 +1,35 @@ +// ReSharper disable UnusedAutoPropertyAccessor.Global - properties are used by consumers outside of this library +namespace NATS.Client.Core; + +public class NatsEventArgs : EventArgs +{ + public NatsEventArgs(string message) => Message = message; + + public string Message { get; } +} + +public class NatsMessageDroppedEventArgs : NatsEventArgs +{ + public NatsMessageDroppedEventArgs(NatsSubBase subscription, int pending, string subject, string? replyTo, NatsHeaders? headers, object? data) + : base($"Dropped message from {subject} with {pending} pending messages") + { + Subscription = subscription; + Pending = pending; + Subject = subject; + ReplyTo = replyTo; + Headers = headers; + Data = data; + } + + public NatsSubBase Subscription { get; } + + public int Pending { get; } + + public string Subject { get; } + + public string? ReplyTo { get; } + + public NatsHeaders? Headers { get; } + + public object? Data { get; } +} diff --git a/src/NATS.Client.Core/NatsOpts.cs b/src/NATS.Client.Core/NatsOpts.cs index c44ffda56..4056427e8 100644 --- a/src/NATS.Client.Core/NatsOpts.cs +++ b/src/NATS.Client.Core/NatsOpts.cs @@ -108,7 +108,7 @@ public sealed record NatsOpts /// /// If the client reaches this internal limit (bounded channel capacity), by default it will drop messages /// and continue to process new messages. This is aligned with NATS at most once delivery. It is up to - /// the application to detect the missing messages () and recover + /// the application to detect the missing messages () and recover /// from this condition or set a different default such as BoundedChannelFullMode.Wait in which /// case it might risk server disconnecting the client as a slow consumer. /// diff --git a/src/NATS.Client.Core/NatsSub.cs b/src/NATS.Client.Core/NatsSub.cs index b10439411..853408e83 100644 --- a/src/NATS.Client.Core/NatsSub.cs +++ b/src/NATS.Client.Core/NatsSub.cs @@ -21,7 +21,7 @@ internal NatsSub( { _msgs = Channel.CreateBounded>( connection.GetChannelOpts(connection.Opts, opts?.ChannelOpts), - msg => Connection.MessageDropped(this, _msgs?.Reader.Count ?? 0, msg)); + msg => Connection.OnMessageDropped(this, _msgs?.Reader.Count ?? 0, msg)); Serializer = serializer; } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs index 2b6d2d17f..e1a698320 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs @@ -4,7 +4,6 @@ using Microsoft.Extensions.Logging; using NATS.Client.Core; using NATS.Client.Core.Commands; -using NATS.Client.Core.Internal; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream.Internal; @@ -97,7 +96,7 @@ public NatsJSOrderedConsume( // This channel is used to pass messages to the user from the subscription. _userMsgs = Channel.CreateBounded>( Connection.GetChannelOpts(Connection.Opts, opts?.ChannelOpts), - msg => Connection.MessageDropped(this, _userMsgs?.Reader.Count ?? 0, msg.Msg)); + msg => Connection.OnMessageDropped(this, _userMsgs?.Reader.Count ?? 0, msg.Msg)); Msgs = _userMsgs.Reader; // Pull request channel is set as unbounded because we don't want to drop @@ -317,10 +316,11 @@ protected override void TryComplete() _userMsgs.Writer.TryComplete(); } - private void ConnectionOnConnectionDisconnected(object? sender, string e) + private ValueTask ConnectionOnConnectionDisconnected(object? sender, NatsEventArgs args) { - _logger.LogWarning(NatsJSLogEvents.Connection, "Disconnected {Reason}", e); - _userMsgs.Writer.TryComplete(new NatsJSConnectionException(e)); + _logger.LogWarning(NatsJSLogEvents.Connection, "Disconnected {Reason}", args.Message); + _userMsgs.Writer.TryComplete(new NatsJSConnectionException(args.Message)); + return default; } private void ResetPending() diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs index 48401196a..04c762ffc 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs @@ -163,7 +163,11 @@ internal void Done() _msgChannel.Writer.TryComplete(); } - private void OnDisconnected(object? sender, string e) => StopHeartbeatTimer(); + private ValueTask OnDisconnected(object? sender, NatsEventArgs args) + { + StopHeartbeatTimer(); + return default; + } private async Task CommandLoop() { @@ -446,6 +450,10 @@ protected override void TryComplete() { } - private void OnConnectionOpened(object? sender, string e) => - _commands.TryWrite(new NatsJSOrderedPushConsumerMsg { Command = NatsJSOrderedPushConsumerCommand.Ready }); + private ValueTask OnConnectionOpened(object? sender, NatsEventArgs args) + { + // result is discarded, so this code is assumed to not be failing + _ = _commands.TryWrite(new NatsJSOrderedPushConsumerMsg { Command = NatsJSOrderedPushConsumerCommand.Ready }); + return default; + } } diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs index faf15fda8..fb0d2c7eb 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs @@ -62,8 +62,10 @@ protected override void TryComplete() { } - private void OnConnectionOpened(object? sender, string e) + private ValueTask OnConnectionOpened(object? sender, NatsEventArgs args) { - _commands.TryWrite(new NatsKVWatchCommandMsg { Command = NatsKVWatchCommand.Ready }); + // result is discarded, so this code is assumed to not be failing + _ = _commands.TryWrite(new NatsKVWatchCommandMsg { Command = NatsKVWatchCommand.Ready }); + return default; } } diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs index 241a7c4d9..5871ff24c 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs @@ -142,7 +142,11 @@ internal ValueTask InitAsync() return CreatePushConsumer("init"); } - private void OnDisconnected(object? sender, string e) => StopHeartbeatTimer(); + private ValueTask OnDisconnected(object? sender, NatsEventArgs args) + { + StopHeartbeatTimer(); + return default; + } private async Task CommandLoop() { diff --git a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs index f9954623b..dc1107b31 100644 --- a/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs +++ b/tests/NATS.Client.Core.Tests/ConnectionRetryTest.cs @@ -18,7 +18,11 @@ public async Task Max_retry_reached_after_disconnect() }); var signal = new WaitSignal(); - nats.ReconnectFailed += (_, e) => signal.Pulse(); + nats.ReconnectFailed += (_, _) => + { + signal.Pulse(); + return default; + }; var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); @@ -41,7 +45,11 @@ public async Task Retry_and_connect_after_disconnected() }); var signal = new WaitSignal(); - nats.ReconnectFailed += (_, e) => signal.Pulse(); + nats.ReconnectFailed += (_, _) => + { + signal.Pulse(); + return default; + }; var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index e97fef5b3..35a64e266 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -315,7 +315,11 @@ await Retry.Until( async () => await nats.PublishAsync(subject, 1)); var disconnected = new WaitSignal(); - nats.ConnectionDisconnected += (_, _) => disconnected.Pulse(); + nats.ConnectionDisconnected += (_, _) => + { + disconnected.Pulse(); + return default; + }; proxy.Reset(); diff --git a/tests/NATS.Client.Core.Tests/SlowConsumerTest.cs b/tests/NATS.Client.Core.Tests/SlowConsumerTest.cs index 5b0645c55..618e53c24 100644 --- a/tests/NATS.Client.Core.Tests/SlowConsumerTest.cs +++ b/tests/NATS.Client.Core.Tests/SlowConsumerTest.cs @@ -13,13 +13,15 @@ public async Task Slow_consumer() var nats = server.CreateClientConnection(new NatsOpts { SubPendingChannelCapacity = 3 }); var lost = 0; - nats.OnError += (_, e) => + nats.MessageDropped += (_, e) => { - if (e is MessageDroppedError dropped) + if (e is { } dropped) { Interlocked.Increment(ref lost); _output.WriteLine($"LOST {dropped.Data}"); } + + return default; }; var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs index 21e632f22..d48498ba8 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs @@ -56,7 +56,11 @@ public async Task Watcher_reconnect_with_history() } var signal = new WaitSignal(); - nats2.ConnectionDisconnected += (_, _) => signal.Pulse(); + nats2.ConnectionDisconnected += (_, _) => + { + signal.Pulse(); + return default; + }; proxy.Reset(); diff --git a/tests/NATS.Client.TestUtilities/WaitSignal.cs b/tests/NATS.Client.TestUtilities/WaitSignal.cs index b5814ba92..8a81ed97c 100644 --- a/tests/NATS.Client.TestUtilities/WaitSignal.cs +++ b/tests/NATS.Client.TestUtilities/WaitSignal.cs @@ -1,5 +1,4 @@ using System.Runtime.CompilerServices; -using NATS.Client.Core; namespace NATS.Client.Core.Tests; @@ -8,9 +7,10 @@ public static class WaitSignalExtensions public static Task ConnectionDisconnectedAsAwaitable(this NatsConnection connection) { var signal = new WaitSignal(); - connection.ConnectionDisconnected += (sender, e) => + connection.ConnectionDisconnected += (_, _) => { signal.Pulse(); + return default; }; return signal.Task.WaitAsync(signal.Timeout); } @@ -18,9 +18,10 @@ public static Task ConnectionDisconnectedAsAwaitable(this NatsConnection connect public static Task ConnectionOpenedAsAwaitable(this NatsConnection connection) { var signal = new WaitSignal(); - connection.ConnectionOpened += (sender, e) => + connection.ConnectionOpened += (_, _) => { signal.Pulse(); + return default; }; return signal.Task.WaitAsync(signal.Timeout); } @@ -28,9 +29,10 @@ public static Task ConnectionOpenedAsAwaitable(this NatsConnection connection) public static Task ReconnectFailedAsAwaitable(this NatsConnection connection) { var signal = new WaitSignal(); - connection.ReconnectFailed += (sender, e) => + connection.ReconnectFailed += (_, _) => { signal.Pulse(); + return default; }; return signal.Task.WaitAsync(signal.Timeout); }