From 4f6ba9e3b2451094aee0389f056fe1a1ce845e68 Mon Sep 17 00:00:00 2001 From: Manuel de la Pena Date: Wed, 18 Sep 2024 11:13:14 -0400 Subject: [PATCH 1/3] [Internal API] Clean Hub.cs to be prepare for the parallem limit impl. (#77) Clean the code a little and remove some duplicated code. This will make it simpler to understand the following changes around the use of a SemaphoreSlim to improve the resource usage. --- src/Marille/Hub.cs | 53 ++++--------------------------- src/Marille/TimeSpanExtensions.cs | 20 ++++++++++++ src/Marille/WorkerExtensions.cs | 44 +++++++++++++++++++++++++ 3 files changed, 70 insertions(+), 47 deletions(-) create mode 100644 src/Marille/TimeSpanExtensions.cs diff --git a/src/Marille/Hub.cs b/src/Marille/Hub.cs index 15ea17a..967e4a3 100644 --- a/src/Marille/Hub.cs +++ b/src/Marille/Hub.cs @@ -47,13 +47,7 @@ void DeliverAtLeastOnceAsync (string name, Channel> channel, IWork { logger?.LogTraceDeliverAtLeastOnceAsync (item.Payload, name, typeof(T)); Parallel.ForEachAsync (workersArray, async (worker, _) => { - CancellationToken token = default; - if (timeout.HasValue) { - logger?.LogTraceTimeoutCreation (timeout.Value); - var cts = new CancellationTokenSource (); - cts.CancelAfter (timeout.Value); - token = cts.Token; - } + var token = timeout.GetCancellationToken (logger); var task = worker.ConsumeAsync (item.Payload, token); await HandleConsumerError (name, task, channel, item); }); @@ -66,25 +60,10 @@ Task DeliverAtLeastOnceSync (string name, Channel> channel, IWorke logger?.LogTraceDeliverAtLeastOnceSync (item.Payload, name, typeof(T)); // we just need to execute all the provider workers with the same message and return the // task when all are done - CancellationToken token = default; - if (timeout.HasValue) { - logger?.LogTraceTimeoutCreation (timeout.Value); - var cts = new CancellationTokenSource (); - cts.CancelAfter (timeout.Value); - token = cts.Token; - } - + var token = timeout.GetCancellationToken (logger); var tasks = new Task [workersArray.Length]; for(var index = 0; index < workersArray.Length; index++) { - var worker = workersArray [index]; - _ = worker.TryGetUseBackgroundThread (out var useBackgroundThread); - if (useBackgroundThread) { - tasks [index] = Task.Run (async () => { - await worker.ConsumeAsync (item.Payload, token).ConfigureAwait (false); - }, token); - } else { - tasks [index] = worker.ConsumeAsync (item.Payload, token); - } + tasks [index] = workersArray [index].ConsumeThreadAsync (item.Payload, token); } return HandleConsumerError (name, Task.WhenAll (tasks), channel, item); } @@ -97,20 +76,8 @@ Task DeliverAtMostOnceAsync (string name, Channel> channel, IWorke // we do know we are not empty, and in the AtMostOnce mode we will only use the first worker // present var worker = workersArray [0]; - CancellationToken token = default; - if (timeout.HasValue) { - logger?.LogTraceTimeoutCreation (timeout.Value); - var cts = new CancellationTokenSource (); - cts.CancelAfter (timeout.Value); - token = cts.Token; - } - - _ = worker.TryGetUseBackgroundThread (out var useBackgroundThread); - var task = useBackgroundThread ? - Task.Run (async () => { - await worker.ConsumeAsync (item.Payload, token).ConfigureAwait (false); - }, token) : - worker.ConsumeAsync (item.Payload, token); + var token = timeout.GetCancellationToken (logger); + var task = worker.ConsumeThreadAsync (item.Payload, token); return HandleConsumerError (name, task, channel, item); } @@ -139,15 +106,7 @@ async Task ConsumeChannel (string name, TopicConfiguration configuration, Cha // the error task in a try/catch to make sure that if the user did raise an exception, we do not // crash the whole consuming task. Sometimes java was right when adding exceptions to a method signature try { - var _ = errorWorker.TryGetUseBackgroundThread (out var useBackgroundThread); - if (useBackgroundThread) - await Task.Run (async () => { - await errorWorker.ConsumeAsync ( - item.Payload, item.Exception, cancellationToken).ConfigureAwait (false); - }, cancellationToken); - else - await errorWorker.ConsumeAsync ( - item.Payload, item.Exception, cancellationToken).ConfigureAwait (false); + await errorWorker.ConsumeThreadAsync (item.Payload, item.Exception, cancellationToken); } catch { // should we log the exception we are ignoring? logger?.LogErrorConsumerException (errorWorker.GetType (), item.Payload, item.Exception, name, diff --git a/src/Marille/TimeSpanExtensions.cs b/src/Marille/TimeSpanExtensions.cs new file mode 100644 index 0000000..dd2171e --- /dev/null +++ b/src/Marille/TimeSpanExtensions.cs @@ -0,0 +1,20 @@ +using Microsoft.Extensions.Logging; + +namespace Marille; + +internal static class TimeSpanExtensions { + + public static CancellationToken GetCancellationToken (this TimeSpan? timeout, ILogger? logger) + { + if (!timeout.HasValue) + return default; + + CancellationToken token = default; + logger?.LogTraceTimeoutCreation (timeout.Value); + var cts = new CancellationTokenSource (); + cts.CancelAfter (timeout.Value); + token = cts.Token; + + return token; + } +} diff --git a/src/Marille/WorkerExtensions.cs b/src/Marille/WorkerExtensions.cs index f752589..865cef9 100644 --- a/src/Marille/WorkerExtensions.cs +++ b/src/Marille/WorkerExtensions.cs @@ -25,4 +25,48 @@ public static bool TryGetUseBackgroundThread (this IErrorWorker worker, ou return false; } } + + public static async Task ConsumeThreadAsync (this IWorker worker, T message, + CancellationToken cancellationToken = default, + SemaphoreSlim? parallelSemaphore = null) where T : struct + { + _ = worker.TryGetUseBackgroundThread (out var useBackgroundThread); + if (useBackgroundThread) { + if (parallelSemaphore is not null) + await parallelSemaphore.WaitAsync (cancellationToken).ConfigureAwait (false); + // spawn a new thread to consume the message + await Task.Run (async () => { + try { + await worker.ConsumeAsync (message, cancellationToken).ConfigureAwait (false); + } finally { + parallelSemaphore?.Release (); + } + }, cancellationToken); + } else { + await worker.ConsumeAsync (message, cancellationToken).ConfigureAwait (false); + } + } + + public static async Task ConsumeThreadAsync (this IErrorWorker worker, T message, + Exception exception, + CancellationToken cancellationToken = default, + SemaphoreSlim? parallelSemaphore = null) where T : struct + { + _ = worker.TryGetUseBackgroundThread (out var useBackgroundThread); + if (useBackgroundThread) { + if (parallelSemaphore is not null) + await parallelSemaphore.WaitAsync (cancellationToken).ConfigureAwait (false); + // spawn a new thread to consume the message + await Task.Run (async () => { + try { + await worker.ConsumeAsync (message, exception, cancellationToken).ConfigureAwait (false); + } finally { + parallelSemaphore?.Release (); + } + }, cancellationToken); + } else { + await worker.ConsumeAsync (message, exception, cancellationToken).ConfigureAwait (false); + } + } + } From ea782314aa8fcafaeedb766acabd97b6ef93118c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 7 Oct 2024 10:26:26 -0400 Subject: [PATCH 2/3] Bump DotNet.ReproducibleBuilds from 1.2.4 to 1.2.25 in /src (#78) Bumps [DotNet.ReproducibleBuilds](https://github.com/dotnet/reproducible-builds) from 1.2.4 to 1.2.25. - [Release notes](https://github.com/dotnet/reproducible-builds/releases) - [Commits](https://github.com/dotnet/reproducible-builds/compare/v1.2.4...v1.2.25) --- updated-dependencies: - dependency-name: DotNet.ReproducibleBuilds dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- src/Marille/Marille.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Marille/Marille.csproj b/src/Marille/Marille.csproj index 4a8c25f..9e29cd3 100644 --- a/src/Marille/Marille.csproj +++ b/src/Marille/Marille.csproj @@ -45,7 +45,7 @@ - + runtime; build; native; contentfiles; analyzers; buildtransitive all From b0d83a1e70387a27223bde22896065e4c7844a36 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 7 Oct 2024 10:26:35 -0400 Subject: [PATCH 3/3] Bump xunit from 2.9.0 to 2.9.2 in /src (#80) Bumps [xunit](https://github.com/xunit/xunit) from 2.9.0 to 2.9.2. - [Commits](https://github.com/xunit/xunit/compare/v2-2.9.0...v2-2.9.2) --- updated-dependencies: - dependency-name: xunit dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- src/Marille.Tests/Marille.Tests.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Marille.Tests/Marille.Tests.csproj b/src/Marille.Tests/Marille.Tests.csproj index b672cdd..6dad90b 100644 --- a/src/Marille.Tests/Marille.Tests.csproj +++ b/src/Marille.Tests/Marille.Tests.csproj @@ -11,7 +11,7 @@ - + runtime; build; native; contentfiles; analyzers; buildtransitive all