Skip to content

Commit

Permalink
Merge branch 'main' into 341-send-buffer-refinement
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/NATS.Client.Core/Commands/ProtocolWriter.cs
#	src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs
#	tests/NATS.Client.TestUtilities/InMemoryTestLoggerFactory.cs
  • Loading branch information
mtmk committed Jan 23, 2024
2 parents abf863c + 7b930ea commit d607449
Show file tree
Hide file tree
Showing 17 changed files with 271 additions and 69 deletions.
18 changes: 18 additions & 0 deletions src/NATS.Client.Core/AsyncEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace NATS.Client.Core;

/// <summary>
/// An asynchronous event handler.
/// </summary>
/// <param name="sender">The sender of the event.</param>
/// <param name="args">Event arguments.</param>
/// <returns>A value task whose completion signals handling is finished.</returns>
public delegate ValueTask AsyncEventHandler(object? sender, EventArgs args);

/// <summary>
/// An asynchronous event handler.
/// </summary>
/// <typeparam name="TEventArgs">The type of event arguments.</typeparam>
/// <param name="sender">The sender of the event.</param>
/// <param name="args">Event arguments.</param>
/// <returns>A value task whose completion signals handling is finished.</returns>
public delegate ValueTask AsyncEventHandler<in TEventArgs>(object? sender, TEventArgs args);
34 changes: 0 additions & 34 deletions src/NATS.Client.Core/INatsError.cs

This file was deleted.

94 changes: 94 additions & 0 deletions src/NATS.Client.Core/Internal/AsyncEventExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// origin: https://github.com/microsoft/vs-threading/blob/9065e6e4b5593e6ed6e3ff0a9159d4e2765430d6/src/Microsoft.VisualStudio.Threading/TplExtensions.cs
// license: MIT
//
// Microsoft.VisualStudio.Threading
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

namespace NATS.Client.Core.Internal;

internal static class AsyncEventExtensions
{
/// <summary>
/// Invokes asynchronous event handlers, returning a task that completes when all event handlers have been invoked.
/// Each handler is fully executed (including continuations) before the next handler in the list is invoked.
/// </summary>
/// <param name="handlers">The event handlers. May be <see langword="null" />.</param>
/// <param name="sender">The event source.</param>
/// <param name="args">The event argument.</param>
/// <returns>The task that completes when all handlers have completed.</returns>
/// <exception cref="AggregateException">Thrown if any handlers fail. It contains a collection of all failures.</exception>
public static async Task InvokeAsync(this AsyncEventHandler? handlers, object? sender, EventArgs args)
{
if (handlers != null)
{
var individualHandlers = handlers.GetInvocationList();
List<Exception>? exceptions = null;
foreach (var asyncHandler in individualHandlers)
{
var handler = (AsyncEventHandler)asyncHandler;
try
{
await handler(sender, args).ConfigureAwait(true);
}
catch (Exception ex)
{
exceptions ??= new List<Exception>(2);
exceptions.Add(ex);
}
}

if (exceptions != null)
{
throw new AggregateException(exceptions);
}
}
}

/// <summary>
/// Invokes asynchronous event handlers, returning a task that completes when all event handlers have been invoked.
/// Each handler is fully executed (including continuations) before the next handler in the list is invoked.
/// </summary>
/// <typeparam name="TEventArgs">The type of argument passed to each handler.</typeparam>
/// <param name="handlers">The event handlers. May be <see langword="null" />.</param>
/// <param name="sender">The event source.</param>
/// <param name="args">The event argument.</param>
/// <returns>The task that completes when all handlers have completed. The task is faulted if any handlers throw an exception.</returns>
/// <exception cref="AggregateException">Thrown if any handlers fail. It contains a collection of all failures.</exception>
public static async Task InvokeAsync<TEventArgs>(this AsyncEventHandler<TEventArgs>? handlers, object? sender, TEventArgs args)
{
if (handlers != null)
{
var individualHandlers = handlers.GetInvocationList();
List<Exception>? exceptions = null;
foreach (var asyncHandler in individualHandlers)
{
var handler = (AsyncEventHandler<TEventArgs>)asyncHandler;
try
{
await handler(sender, args).ConfigureAwait(true);
}
catch (Exception ex)
{
exceptions ??= new List<Exception>(2);
exceptions.Add(ex);
}
}

if (exceptions != null)
{
throw new AggregateException(exceptions);
}
}
}
}
77 changes: 66 additions & 11 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ public enum NatsConnectionState
Reconnecting,
}

internal enum NatsEvent
{
ConnectionOpened,
ConnectionDisconnected,
ReconnectFailed,
MessageDropped,
}

public partial class NatsConnection : INatsConnection
{
#pragma warning disable SA1401
Expand All @@ -36,7 +44,9 @@ public partial class NatsConnection : INatsConnection
private readonly string _name;
private readonly TimeSpan _socketComponentDisposeTimeout = TimeSpan.FromSeconds(5);
private readonly BoundedChannelOptions _defaultSubscriptionChannelOpts;
private readonly Channel<(NatsEvent, NatsEventArgs)> _eventChannel;
private readonly ClientOpts _clientOpts;

private int _pongCount;
private int _connectionState;

Expand Down Expand Up @@ -82,16 +92,25 @@ public NatsConnection(NatsOpts opts)
SingleReader = false,
AllowSynchronousContinuations = false,
};

// push consumer events to a channel so handlers can be awaited (also prevents user code from blocking us)
_eventChannel = Channel.CreateUnbounded<(NatsEvent, NatsEventArgs)>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
SingleWriter = false,
SingleReader = true,
});
_ = Task.Run(PublishEventsAsync, _disposedCancellationTokenSource.Token);
}

// events
public event EventHandler<string>? ConnectionDisconnected;
public event AsyncEventHandler<NatsEventArgs>? ConnectionDisconnected;

public event EventHandler<string>? ConnectionOpened;
public event AsyncEventHandler<NatsEventArgs>? ConnectionOpened;

public event EventHandler<string>? ReconnectFailed;
public event AsyncEventHandler<NatsEventArgs>? ReconnectFailed;

public event EventHandler<INatsError>? OnError;
public event AsyncEventHandler<NatsMessageDroppedEventArgs>? MessageDropped;

public NatsOpts Opts { get; }

Expand Down Expand Up @@ -212,11 +231,11 @@ internal ValueTask UnsubscribeAsync(int sid)
return ValueTask.CompletedTask;
}

internal void MessageDropped<T>(NatsSubBase natsSub, int pending, NatsMsg<T> msg)
internal void OnMessageDropped<T>(NatsSubBase natsSub, int pending, NatsMsg<T> msg)
{
var subject = msg.Subject;
_logger.LogWarning("Dropped message from {Subject} with {Pending} pending messages", subject, pending);
OnError?.Invoke(this, new MessageDroppedError(natsSub, pending, subject, msg.ReplyTo, msg.Headers, msg.Data));
_eventChannel.Writer.TryWrite((NatsEvent.MessageDropped, new NatsMessageDroppedEventArgs(natsSub, pending, subject, msg.ReplyTo, msg.Headers, msg.Data)));
}

internal BoundedChannelOptions GetChannelOpts(NatsOpts connectionOpts, NatsSubChannelOpts? subChannelOpts)
Expand Down Expand Up @@ -344,7 +363,7 @@ private async ValueTask InitialConnectAsync()
StartPingTimer(_pingTimerCancellationTokenSource.Token);
_waitForOpenConnection.TrySetResult();
_ = Task.Run(ReconnectLoop);
ConnectionOpened?.Invoke(this, url?.ToString() ?? string.Empty);
_eventChannel.Writer.TryWrite((NatsEvent.ConnectionOpened, new NatsEventArgs(url?.ToString() ?? string.Empty)));
}
}

Expand Down Expand Up @@ -487,8 +506,8 @@ private async void ReconnectLoop()
_pingTimerCancellationTokenSource?.Cancel();
}

// Invoke after state changed
ConnectionDisconnected?.Invoke(this, _currentConnectUri?.ToString() ?? string.Empty);
// Invoke event after state changed
_eventChannel.Writer.TryWrite((NatsEvent.ConnectionDisconnected, new NatsEventArgs(_currentConnectUri?.ToString() ?? string.Empty)));

// Cleanup current socket
await DisposeSocketAsync(true).ConfigureAwait(false);
Expand Down Expand Up @@ -566,7 +585,7 @@ private async void ReconnectLoop()
_logger.LogWarning(NatsLogEvents.Connection, ex, "Failed to connect NATS {Url}", url);
}

ReconnectFailed?.Invoke(this, url?.ToString() ?? string.Empty);
_eventChannel.Writer.TryWrite((NatsEvent.ReconnectFailed, new NatsEventArgs(url?.ToString() ?? string.Empty)));
await WaitWithJitterAsync().ConfigureAwait(false);
goto CONNECT_AGAIN;
}
Expand All @@ -581,7 +600,7 @@ private async void ReconnectLoop()
StartPingTimer(_pingTimerCancellationTokenSource.Token);
_waitForOpenConnection.TrySetResult();
_ = Task.Run(ReconnectLoop);
ConnectionOpened?.Invoke(this, url?.ToString() ?? string.Empty);
_eventChannel.Writer.TryWrite((NatsEvent.ConnectionOpened, new NatsEventArgs(url.ToString())));
}
}
catch (Exception ex)
Expand All @@ -593,6 +612,42 @@ private async void ReconnectLoop()
}
}

private async Task PublishEventsAsync()
{
try
{
while (!_disposedCancellationTokenSource.IsCancellationRequested)
{
var hasData = await _eventChannel.Reader.WaitToReadAsync(_disposedCancellationTokenSource.Token).ConfigureAwait(false);
while (hasData && _eventChannel.Reader.TryRead(out var eventArgs))
{
var (natsEvent, args) = eventArgs;
switch (natsEvent)
{
case NatsEvent.ConnectionOpened when ConnectionOpened != null:
await ConnectionOpened.InvokeAsync(this, args).ConfigureAwait(false);
break;
case NatsEvent.ConnectionDisconnected when ConnectionDisconnected != null:
await ConnectionDisconnected.InvokeAsync(this, args).ConfigureAwait(false);
break;
case NatsEvent.ReconnectFailed when ReconnectFailed != null:
await ReconnectFailed.InvokeAsync(this, args).ConfigureAwait(false);
break;
case NatsEvent.MessageDropped when MessageDropped != null && args is NatsMessageDroppedEventArgs error:
await MessageDropped.InvokeAsync(this, error).ConfigureAwait(false);
break;
}
}
}
}
catch (Exception ex)
{
_logger.LogError(NatsLogEvents.Connection, ex, "Error occured when publishing events");
if (!_disposedCancellationTokenSource.IsCancellationRequested)
_ = Task.Run(PublishEventsAsync, _disposedCancellationTokenSource.Token);
}
}

private NatsUri FixTlsHost(NatsUri uri)
{
var lastSeedConnectUri = _lastSeedConnectUri;
Expand Down
35 changes: 35 additions & 0 deletions src/NATS.Client.Core/NatsEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// ReSharper disable UnusedAutoPropertyAccessor.Global - properties are used by consumers outside of this library
namespace NATS.Client.Core;

public class NatsEventArgs : EventArgs
{
public NatsEventArgs(string message) => Message = message;

public string Message { get; }
}

public class NatsMessageDroppedEventArgs : NatsEventArgs
{
public NatsMessageDroppedEventArgs(NatsSubBase subscription, int pending, string subject, string? replyTo, NatsHeaders? headers, object? data)
: base($"Dropped message from {subject} with {pending} pending messages")
{
Subscription = subscription;
Pending = pending;
Subject = subject;
ReplyTo = replyTo;
Headers = headers;
Data = data;
}

public NatsSubBase Subscription { get; }

public int Pending { get; }

public string Subject { get; }

public string? ReplyTo { get; }

public NatsHeaders? Headers { get; }

public object? Data { get; }
}
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public sealed record NatsOpts
/// <remarks>
/// If the client reaches this internal limit (bounded channel capacity), by default it will drop messages
/// and continue to process new messages. This is aligned with NATS at most once delivery. It is up to
/// the application to detect the missing messages (<seealso cref="NatsConnection.OnError"/>) and recover
/// the application to detect the missing messages (<seealso cref="NatsConnection.MessageDropped"/>) and recover
/// from this condition or set a different default such as <c>BoundedChannelFullMode.Wait</c> in which
/// case it might risk server disconnecting the client as a slow consumer.
/// </remarks>
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal NatsSub(
{
_msgs = Channel.CreateBounded<NatsMsg<T>>(
connection.GetChannelOpts(connection.Opts, opts?.ChannelOpts),
msg => Connection.MessageDropped(this, _msgs?.Reader.Count ?? 0, msg));
msg => Connection.OnMessageDropped(this, _msgs?.Reader.Count ?? 0, msg));

Serializer = serializer;
}
Expand Down
10 changes: 5 additions & 5 deletions src/NATS.Client.JetStream/Internal/NatsJSOrderedConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.Core.Commands;
using NATS.Client.Core.Internal;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream.Internal;
Expand Down Expand Up @@ -97,7 +96,7 @@ public NatsJSOrderedConsume(
// This channel is used to pass messages to the user from the subscription.
_userMsgs = Channel.CreateBounded<NatsJSMsg<TMsg>>(
Connection.GetChannelOpts(Connection.Opts, opts?.ChannelOpts),
msg => Connection.MessageDropped(this, _userMsgs?.Reader.Count ?? 0, msg.Msg));
msg => Connection.OnMessageDropped(this, _userMsgs?.Reader.Count ?? 0, msg.Msg));
Msgs = _userMsgs.Reader;

// Pull request channel is set as unbounded because we don't want to drop
Expand Down Expand Up @@ -317,10 +316,11 @@ protected override void TryComplete()
_userMsgs.Writer.TryComplete();
}

private void ConnectionOnConnectionDisconnected(object? sender, string e)
private ValueTask ConnectionOnConnectionDisconnected(object? sender, NatsEventArgs args)
{
_logger.LogWarning(NatsJSLogEvents.Connection, "Disconnected {Reason}", e);
_userMsgs.Writer.TryComplete(new NatsJSConnectionException(e));
_logger.LogWarning(NatsJSLogEvents.Connection, "Disconnected {Reason}", args.Message);
_userMsgs.Writer.TryComplete(new NatsJSConnectionException(args.Message));
return default;
}

private void ResetPending()
Expand Down
Loading

0 comments on commit d607449

Please sign in to comment.