Skip to content

Commit

Permalink
Merge branch 'main' into implement-max-parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
mandel-macaque committed Oct 7, 2024
2 parents 318f33a + b0d83a1 commit c7a82ca
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/Marille.Tests/Marille.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1"/>
<PackageReference Include="xunit" Version="2.9.0"/>
<PackageReference Include="xunit" Version="2.9.2"/>
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
Expand Down
47 changes: 7 additions & 40 deletions src/Marille/Hub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,7 @@ void DeliverAtLeastOnceAsync<T> (string name, Channel<Message<T>> channel, IWork
{
logger?.LogTraceDeliverAtLeastOnceAsync (item.Payload, name, typeof(T));
Parallel.ForEachAsync (workersArray, async (worker, _) => {
CancellationToken token = default;
if (timeout.HasValue) {
logger?.LogTraceTimeoutCreation (timeout.Value);
var cts = new CancellationTokenSource ();
cts.CancelAfter (timeout.Value);
token = cts.Token;
}
var token = timeout.GetCancellationToken (logger);
var task = worker.ConsumeAsync (item.Payload, token);
await HandleConsumerError (name, task, channel, item);
});
Expand All @@ -66,25 +60,10 @@ Task DeliverAtLeastOnceSync<T> (string name, Channel<Message<T>> channel, IWorke
logger?.LogTraceDeliverAtLeastOnceSync (item.Payload, name, typeof(T));
// we just need to execute all the provider workers with the same message and return the
// task when all are done
CancellationToken token = default;
if (timeout.HasValue) {
logger?.LogTraceTimeoutCreation (timeout.Value);
var cts = new CancellationTokenSource ();
cts.CancelAfter (timeout.Value);
token = cts.Token;
}

var token = timeout.GetCancellationToken (logger);
var tasks = new Task [workersArray.Length];
for(var index = 0; index < workersArray.Length; index++) {
var worker = workersArray [index];
_ = worker.TryGetUseBackgroundThread (out var useBackgroundThread);
if (useBackgroundThread) {
tasks [index] = Task.Run (async () => {
await worker.ConsumeAsync (item.Payload, token).ConfigureAwait (false);
}, token);
} else {
tasks [index] = worker.ConsumeAsync (item.Payload, token);
}
tasks [index] = workersArray [index].ConsumeThreadAsync (item.Payload, token);
}
return HandleConsumerError (name, Task.WhenAll (tasks), channel, item);
}
Expand All @@ -97,20 +76,8 @@ Task DeliverAtMostOnceAsync<T> (string name, Channel<Message<T>> channel, IWorke
// we do know we are not empty, and in the AtMostOnce mode we will only use the first worker
// present
var worker = workersArray [0];
CancellationToken token = default;
if (timeout.HasValue) {
logger?.LogTraceTimeoutCreation (timeout.Value);
var cts = new CancellationTokenSource ();
cts.CancelAfter (timeout.Value);
token = cts.Token;
}

_ = worker.TryGetUseBackgroundThread (out var useBackgroundThread);
var task = useBackgroundThread ?
Task.Run (async () => {
await worker.ConsumeAsync (item.Payload, token).ConfigureAwait (false);
}, token) :
worker.ConsumeAsync (item.Payload, token);
var token = timeout.GetCancellationToken (logger);
var task = worker.ConsumeThreadAsync (item.Payload, token);
return HandleConsumerError (name, task, channel, item);
}

Expand Down Expand Up @@ -148,14 +115,14 @@ async Task ConsumeChannel<T> (string name, TopicConfiguration configuration, Cha
await parallelSemaphore.WaitAsync (cancellationToken);
await Task.Run (async () => {
try {
await errorWorker.ConsumeAsync (
await errorWorker.ConsumeThreadAsync (
item.Payload, item.Exception, cancellationToken).ConfigureAwait (false);
} finally {
parallelSemaphore?.Release ();
}
}, cancellationToken);
} else {
await errorWorker.ConsumeAsync (
await errorWorker.ConsumeThreadAsync (
item.Payload, item.Exception, cancellationToken).ConfigureAwait (false);
}
} catch {
Expand Down
2 changes: 1 addition & 1 deletion src/Marille/Marille.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="DotNet.ReproducibleBuilds" Version="1.2.4">
<PackageReference Include="DotNet.ReproducibleBuilds" Version="1.2.25">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
20 changes: 20 additions & 0 deletions src/Marille/TimeSpanExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Microsoft.Extensions.Logging;

namespace Marille;

internal static class TimeSpanExtensions {

public static CancellationToken GetCancellationToken (this TimeSpan? timeout, ILogger? logger)
{
if (!timeout.HasValue)
return default;

CancellationToken token = default;
logger?.LogTraceTimeoutCreation (timeout.Value);
var cts = new CancellationTokenSource ();
cts.CancelAfter (timeout.Value);
token = cts.Token;

return token;
}
}
44 changes: 44 additions & 0 deletions src/Marille/WorkerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,48 @@ public static bool TryGetUseBackgroundThread<T> (this IErrorWorker<T> worker, ou
return false;
}
}

public static async Task ConsumeThreadAsync<T> (this IWorker<T> worker, T message,
CancellationToken cancellationToken = default,
SemaphoreSlim? parallelSemaphore = null) where T : struct
{
_ = worker.TryGetUseBackgroundThread (out var useBackgroundThread);
if (useBackgroundThread) {
if (parallelSemaphore is not null)
await parallelSemaphore.WaitAsync (cancellationToken).ConfigureAwait (false);
// spawn a new thread to consume the message
await Task.Run (async () => {
try {
await worker.ConsumeAsync (message, cancellationToken).ConfigureAwait (false);
} finally {
parallelSemaphore?.Release ();
}
}, cancellationToken);
} else {
await worker.ConsumeAsync (message, cancellationToken).ConfigureAwait (false);
}
}

public static async Task ConsumeThreadAsync<T> (this IErrorWorker<T> worker, T message,
Exception exception,
CancellationToken cancellationToken = default,
SemaphoreSlim? parallelSemaphore = null) where T : struct
{
_ = worker.TryGetUseBackgroundThread (out var useBackgroundThread);
if (useBackgroundThread) {
if (parallelSemaphore is not null)
await parallelSemaphore.WaitAsync (cancellationToken).ConfigureAwait (false);
// spawn a new thread to consume the message
await Task.Run (async () => {
try {
await worker.ConsumeAsync (message, exception, cancellationToken).ConfigureAwait (false);
} finally {
parallelSemaphore?.Release ();
}
}, cancellationToken);
} else {
await worker.ConsumeAsync (message, exception, cancellationToken).ConfigureAwait (false);
}
}

}

0 comments on commit c7a82ca

Please sign in to comment.