Skip to content

Commit

Permalink
Async event handlers (#324)
Browse files Browse the repository at this point in the history
* Async event handlers

Write events to channel and publish from a background task. This allows handlers to be async and also prevents event listeners from blocking the NatsConnection code.

* Added WaitToReadAsync return value check

* Added NatsEventArgs and renamed MessageDroppedError to NatsMessageDroppedEventArgs. Changed events to use EventArgs and adjusted internal subscribers. Renamed OnError to MessageDropped, as that better reflects what error is signaled and also is naming-wise more consistent with the other events.

* Use inner while to read all queued messages before waiting again

* Changed AsyncEventHandler to return ValueTask. Removed license blurp from AsyncEventHandler as it  now different from the original delegate. Adjusted code to match new signature. Note that "default" for ValueTask is a completed task.

* Fixed build

* dotnet format

---------

Co-authored-by: Ziya Suzen <[email protected]>
  • Loading branch information
mnmr and mtmk authored Jan 23, 2024
1 parent f44a527 commit 7b930ea
Show file tree
Hide file tree
Showing 16 changed files with 270 additions and 69 deletions.
18 changes: 18 additions & 0 deletions src/NATS.Client.Core/AsyncEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace NATS.Client.Core;

/// <summary>
/// An asynchronous event handler.
/// </summary>
/// <param name="sender">The sender of the event.</param>
/// <param name="args">Event arguments.</param>
/// <returns>A value task whose completion signals handling is finished.</returns>
public delegate ValueTask AsyncEventHandler(object? sender, EventArgs args);

/// <summary>
/// An asynchronous event handler.
/// </summary>
/// <typeparam name="TEventArgs">The type of event arguments.</typeparam>
/// <param name="sender">The sender of the event.</param>
/// <param name="args">Event arguments.</param>
/// <returns>A value task whose completion signals handling is finished.</returns>
public delegate ValueTask AsyncEventHandler<in TEventArgs>(object? sender, TEventArgs args);
34 changes: 0 additions & 34 deletions src/NATS.Client.Core/INatsError.cs

This file was deleted.

94 changes: 94 additions & 0 deletions src/NATS.Client.Core/Internal/AsyncEventExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// origin: https://github.com/microsoft/vs-threading/blob/9065e6e4b5593e6ed6e3ff0a9159d4e2765430d6/src/Microsoft.VisualStudio.Threading/TplExtensions.cs
// license: MIT
//
// Microsoft.VisualStudio.Threading
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

namespace NATS.Client.Core.Internal;

internal static class AsyncEventExtensions
{
/// <summary>
/// Invokes asynchronous event handlers, returning a task that completes when all event handlers have been invoked.
/// Each handler is fully executed (including continuations) before the next handler in the list is invoked.
/// </summary>
/// <param name="handlers">The event handlers. May be <see langword="null" />.</param>
/// <param name="sender">The event source.</param>
/// <param name="args">The event argument.</param>
/// <returns>The task that completes when all handlers have completed.</returns>
/// <exception cref="AggregateException">Thrown if any handlers fail. It contains a collection of all failures.</exception>
public static async Task InvokeAsync(this AsyncEventHandler? handlers, object? sender, EventArgs args)
{
if (handlers != null)
{
var individualHandlers = handlers.GetInvocationList();
List<Exception>? exceptions = null;
foreach (var asyncHandler in individualHandlers)
{
var handler = (AsyncEventHandler)asyncHandler;
try
{
await handler(sender, args).ConfigureAwait(true);
}
catch (Exception ex)
{
exceptions ??= new List<Exception>(2);
exceptions.Add(ex);
}
}

if (exceptions != null)
{
throw new AggregateException(exceptions);
}
}
}

/// <summary>
/// Invokes asynchronous event handlers, returning a task that completes when all event handlers have been invoked.
/// Each handler is fully executed (including continuations) before the next handler in the list is invoked.
/// </summary>
/// <typeparam name="TEventArgs">The type of argument passed to each handler.</typeparam>
/// <param name="handlers">The event handlers. May be <see langword="null" />.</param>
/// <param name="sender">The event source.</param>
/// <param name="args">The event argument.</param>
/// <returns>The task that completes when all handlers have completed. The task is faulted if any handlers throw an exception.</returns>
/// <exception cref="AggregateException">Thrown if any handlers fail. It contains a collection of all failures.</exception>
public static async Task InvokeAsync<TEventArgs>(this AsyncEventHandler<TEventArgs>? handlers, object? sender, TEventArgs args)
{
if (handlers != null)
{
var individualHandlers = handlers.GetInvocationList();
List<Exception>? exceptions = null;
foreach (var asyncHandler in individualHandlers)
{
var handler = (AsyncEventHandler<TEventArgs>)asyncHandler;
try
{
await handler(sender, args).ConfigureAwait(true);
}
catch (Exception ex)
{
exceptions ??= new List<Exception>(2);
exceptions.Add(ex);
}
}

if (exceptions != null)
{
throw new AggregateException(exceptions);
}
}
}
}
78 changes: 66 additions & 12 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using NATS.Client.Core.Commands;
Expand All @@ -16,6 +15,14 @@ public enum NatsConnectionState
Reconnecting,
}

internal enum NatsEvent
{
ConnectionOpened,
ConnectionDisconnected,
ReconnectFailed,
MessageDropped,
}

public partial class NatsConnection : INatsConnection
{
#pragma warning disable SA1401
Expand All @@ -37,7 +44,9 @@ public partial class NatsConnection : INatsConnection
private readonly string _name;
private readonly TimeSpan _socketComponentDisposeTimeout = TimeSpan.FromSeconds(5);
private readonly BoundedChannelOptions _defaultSubscriptionChannelOpts;
private readonly Channel<(NatsEvent, NatsEventArgs)> _eventChannel;
private readonly ClientOpts _clientOpts;

private int _pongCount;
private int _connectionState;

Expand Down Expand Up @@ -84,16 +93,25 @@ public NatsConnection(NatsOpts opts)
SingleReader = false,
AllowSynchronousContinuations = false,
};

// push consumer events to a channel so handlers can be awaited (also prevents user code from blocking us)
_eventChannel = Channel.CreateUnbounded<(NatsEvent, NatsEventArgs)>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
SingleWriter = false,
SingleReader = true,
});
_ = Task.Run(PublishEventsAsync, _disposedCancellationTokenSource.Token);
}

// events
public event EventHandler<string>? ConnectionDisconnected;
public event AsyncEventHandler<NatsEventArgs>? ConnectionDisconnected;

public event EventHandler<string>? ConnectionOpened;
public event AsyncEventHandler<NatsEventArgs>? ConnectionOpened;

public event EventHandler<string>? ReconnectFailed;
public event AsyncEventHandler<NatsEventArgs>? ReconnectFailed;

public event EventHandler<INatsError>? OnError;
public event AsyncEventHandler<NatsMessageDroppedEventArgs>? MessageDropped;

public NatsOpts Opts { get; }

Expand Down Expand Up @@ -213,11 +231,11 @@ internal ValueTask UnsubscribeAsync(int sid)
return ValueTask.CompletedTask;
}

internal void MessageDropped<T>(NatsSubBase natsSub, int pending, NatsMsg<T> msg)
internal void OnMessageDropped<T>(NatsSubBase natsSub, int pending, NatsMsg<T> msg)
{
var subject = msg.Subject;
_logger.LogWarning("Dropped message from {Subject} with {Pending} pending messages", subject, pending);
OnError?.Invoke(this, new MessageDroppedError(natsSub, pending, subject, msg.ReplyTo, msg.Headers, msg.Data));
_eventChannel.Writer.TryWrite((NatsEvent.MessageDropped, new NatsMessageDroppedEventArgs(natsSub, pending, subject, msg.ReplyTo, msg.Headers, msg.Data)));
}

internal BoundedChannelOptions GetChannelOpts(NatsOpts connectionOpts, NatsSubChannelOpts? subChannelOpts)
Expand Down Expand Up @@ -345,7 +363,7 @@ private async ValueTask InitialConnectAsync()
StartPingTimer(_pingTimerCancellationTokenSource.Token);
_waitForOpenConnection.TrySetResult();
_ = Task.Run(ReconnectLoop);
ConnectionOpened?.Invoke(this, url?.ToString() ?? string.Empty);
_eventChannel.Writer.TryWrite((NatsEvent.ConnectionOpened, new NatsEventArgs(url?.ToString() ?? string.Empty)));
}
}

Expand Down Expand Up @@ -488,8 +506,8 @@ private async void ReconnectLoop()
_pingTimerCancellationTokenSource?.Cancel();
}

// Invoke after state changed
ConnectionDisconnected?.Invoke(this, _currentConnectUri?.ToString() ?? string.Empty);
// Invoke event after state changed
_eventChannel.Writer.TryWrite((NatsEvent.ConnectionDisconnected, new NatsEventArgs(_currentConnectUri?.ToString() ?? string.Empty)));

// Cleanup current socket
await DisposeSocketAsync(true).ConfigureAwait(false);
Expand Down Expand Up @@ -567,7 +585,7 @@ private async void ReconnectLoop()
_logger.LogWarning(NatsLogEvents.Connection, ex, "Failed to connect NATS {Url}", url);
}

ReconnectFailed?.Invoke(this, url?.ToString() ?? string.Empty);
_eventChannel.Writer.TryWrite((NatsEvent.ReconnectFailed, new NatsEventArgs(url?.ToString() ?? string.Empty)));
await WaitWithJitterAsync().ConfigureAwait(false);
goto CONNECT_AGAIN;
}
Expand All @@ -582,7 +600,7 @@ private async void ReconnectLoop()
StartPingTimer(_pingTimerCancellationTokenSource.Token);
_waitForOpenConnection.TrySetResult();
_ = Task.Run(ReconnectLoop);
ConnectionOpened?.Invoke(this, url?.ToString() ?? string.Empty);
_eventChannel.Writer.TryWrite((NatsEvent.ConnectionOpened, new NatsEventArgs(url.ToString())));
}
}
catch (Exception ex)
Expand All @@ -594,6 +612,42 @@ private async void ReconnectLoop()
}
}

private async Task PublishEventsAsync()
{
try
{
while (!_disposedCancellationTokenSource.IsCancellationRequested)
{
var hasData = await _eventChannel.Reader.WaitToReadAsync(_disposedCancellationTokenSource.Token).ConfigureAwait(false);
while (hasData && _eventChannel.Reader.TryRead(out var eventArgs))
{
var (natsEvent, args) = eventArgs;
switch (natsEvent)
{
case NatsEvent.ConnectionOpened when ConnectionOpened != null:
await ConnectionOpened.InvokeAsync(this, args).ConfigureAwait(false);
break;
case NatsEvent.ConnectionDisconnected when ConnectionDisconnected != null:
await ConnectionDisconnected.InvokeAsync(this, args).ConfigureAwait(false);
break;
case NatsEvent.ReconnectFailed when ReconnectFailed != null:
await ReconnectFailed.InvokeAsync(this, args).ConfigureAwait(false);
break;
case NatsEvent.MessageDropped when MessageDropped != null && args is NatsMessageDroppedEventArgs error:
await MessageDropped.InvokeAsync(this, error).ConfigureAwait(false);
break;
}
}
}
}
catch (Exception ex)
{
_logger.LogError(NatsLogEvents.Connection, ex, "Error occured when publishing events");
if (!_disposedCancellationTokenSource.IsCancellationRequested)
_ = Task.Run(PublishEventsAsync, _disposedCancellationTokenSource.Token);
}
}

private NatsUri FixTlsHost(NatsUri uri)
{
var lastSeedConnectUri = _lastSeedConnectUri;
Expand Down
35 changes: 35 additions & 0 deletions src/NATS.Client.Core/NatsEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// ReSharper disable UnusedAutoPropertyAccessor.Global - properties are used by consumers outside of this library
namespace NATS.Client.Core;

public class NatsEventArgs : EventArgs
{
public NatsEventArgs(string message) => Message = message;

public string Message { get; }
}

public class NatsMessageDroppedEventArgs : NatsEventArgs
{
public NatsMessageDroppedEventArgs(NatsSubBase subscription, int pending, string subject, string? replyTo, NatsHeaders? headers, object? data)
: base($"Dropped message from {subject} with {pending} pending messages")
{
Subscription = subscription;
Pending = pending;
Subject = subject;
ReplyTo = replyTo;
Headers = headers;
Data = data;
}

public NatsSubBase Subscription { get; }

public int Pending { get; }

public string Subject { get; }

public string? ReplyTo { get; }

public NatsHeaders? Headers { get; }

public object? Data { get; }
}
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public sealed record NatsOpts
/// <remarks>
/// If the client reaches this internal limit (bounded channel capacity), by default it will drop messages
/// and continue to process new messages. This is aligned with NATS at most once delivery. It is up to
/// the application to detect the missing messages (<seealso cref="NatsConnection.OnError"/>) and recover
/// the application to detect the missing messages (<seealso cref="NatsConnection.MessageDropped"/>) and recover
/// from this condition or set a different default such as <c>BoundedChannelFullMode.Wait</c> in which
/// case it might risk server disconnecting the client as a slow consumer.
/// </remarks>
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal NatsSub(
{
_msgs = Channel.CreateBounded<NatsMsg<T>>(
connection.GetChannelOpts(connection.Opts, opts?.ChannelOpts),
msg => Connection.MessageDropped(this, _msgs?.Reader.Count ?? 0, msg));
msg => Connection.OnMessageDropped(this, _msgs?.Reader.Count ?? 0, msg));

Serializer = serializer;
}
Expand Down
10 changes: 5 additions & 5 deletions src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.Core.Commands;
using NATS.Client.Core.Internal;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream.Internal;
Expand Down Expand Up @@ -97,7 +96,7 @@ public NatsJSOrderedConsume(
// This channel is used to pass messages to the user from the subscription.
_userMsgs = Channel.CreateBounded<NatsJSMsg<TMsg>>(
Connection.GetChannelOpts(Connection.Opts, opts?.ChannelOpts),
msg => Connection.MessageDropped(this, _userMsgs?.Reader.Count ?? 0, msg.Msg));
msg => Connection.OnMessageDropped(this, _userMsgs?.Reader.Count ?? 0, msg.Msg));
Msgs = _userMsgs.Reader;

// Pull request channel is set as unbounded because we don't want to drop
Expand Down Expand Up @@ -317,10 +316,11 @@ protected override void TryComplete()
_userMsgs.Writer.TryComplete();
}

private void ConnectionOnConnectionDisconnected(object? sender, string e)
private ValueTask ConnectionOnConnectionDisconnected(object? sender, NatsEventArgs args)
{
_logger.LogWarning(NatsJSLogEvents.Connection, "Disconnected {Reason}", e);
_userMsgs.Writer.TryComplete(new NatsJSConnectionException(e));
_logger.LogWarning(NatsJSLogEvents.Connection, "Disconnected {Reason}", args.Message);
_userMsgs.Writer.TryComplete(new NatsJSConnectionException(args.Message));
return default;
}

private void ResetPending()
Expand Down
Loading

0 comments on commit 7b930ea

Please sign in to comment.