Skip to content

Commit

Permalink
Inbox subscription options fix
Browse files Browse the repository at this point in the history
Internal inbox subscription options was being used to create the real
mux-inbox subscription (on first subscription attempt), which I noticed
when trying to use timeouts with request-reply calls. Timeout was ending
the real mux-inbox which was causing the rest of the request-reply calls
to silently fail.
  • Loading branch information
mtmk committed Oct 24, 2023
1 parent 7e77562 commit 835414c
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 3 deletions.
11 changes: 8 additions & 3 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public async ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancell
throw new NatsException("Inbox subscriptions don't support queue groups");
}

await SubscribeInboxAsync(sub.Subject, sub.Opts, sub, cancellationToken).ConfigureAwait(false);
await SubscribeInboxAsync(sub, cancellationToken).ConfigureAwait(false);
}
else
{
Expand Down Expand Up @@ -171,7 +171,7 @@ public ISubscriptionManager GetManagerFor(string subject)
return this;
}

private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
private async ValueTask SubscribeInboxAsync(NatsSubBase sub, CancellationToken cancellationToken)
{
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
{
Expand All @@ -181,7 +181,12 @@ private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, N
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
{
var inboxSubject = $"{_inboxPrefix}.*";
_inboxSub = InboxSubBuilder.Build(inboxSubject, opts, _connection, manager: this);

// We need to subscribe to the real inbox subject before we can register the internal subject.
// We use 'default' options here since options provided by the user are for the internal subscription.
// For example if the user provides a timeout, we don't want to timeout the real inbox subscription
// since it must live duration of the connection.
_inboxSub = InboxSubBuilder.Build(inboxSubject, opts: default, _connection, manager: this);
await SubscribeInternalAsync(
inboxSubject,
queueGroup: default,
Expand Down
62 changes: 62 additions & 0 deletions tests/NATS.Client.Core.Tests/RequestReplyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,66 @@ static string ToStr(ReadOnlyMemory<byte> input)
await sub.DisposeAsync();
await reg;
}

[Fact]
public async Task Request_reply_many_multiple_with_timeout_test()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

const string subject = "foo";
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var cancellationToken = cts.Token;

var sub = await nats.SubscribeAsync<int>(subject, cancellationToken: cancellationToken);
var reg = sub.Register(async msg =>
{
await msg.ReplyAsync(msg.Data * 2, cancellationToken: cancellationToken);
});

var opts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(2) };

List<Task<(int index, int data)>> tasks = new();

for (var i = 0; i < 10; i++)
{
var index = i;

tasks.Add(Task.Run(
async () =>
{
var data = -1;

await foreach (var msg in nats.RequestManyAsync<int, int>(subject, index, replyOpts: opts, cancellationToken: cancellationToken))
{
data = msg.Data;
}

return (index, data);
},
cancellationToken));
}

foreach (var task in tasks)
{
var (index, data) = await task;
Assert.Equal(index * 2, data);
}

// Make sure timeout isn't affecting the real inbox subscription
// by waiting double the timeout period which should be enough
for (var i = 1; i <= 2; i++)
{
var data = -1;
await foreach (var msg in nats.RequestManyAsync<int, int>(subject, i * 100, replyOpts: opts, cancellationToken: cancellationToken))
{
data = msg.Data;
}

Assert.Equal(i * 200, data);
}

await sub.DisposeAsync();
await reg;
}
}

0 comments on commit 835414c

Please sign in to comment.