From 8ae6b9d5e9a139616e433890e0cb71cd287da04e Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 27 Nov 2023 16:41:48 +0000 Subject: [PATCH] Increase consume and fetch channel size (#246) Increase consume and fetch channel size to avoid blocking the socket reads for other operations. I think we have done this change before but somehow reverted back during merges perhaps. --- .../Internal/NatsJSConsume.cs | 6 +- .../Internal/NatsJSFetch.cs | 9 +-- .../DoubleAckTest.cs | 67 +++++++++++++++++++ 3 files changed, 74 insertions(+), 8 deletions(-) create mode 100644 tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index 11cdf9e2d..800c725c1 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -130,13 +130,11 @@ public NatsJSConsume( Timeout.Infinite, Timeout.Infinite); - // Keep user channel small to avoid blocking the user code - // when disposed otherwise channel reader will continue delivering messages - // if there are messages queued up already. This channel is used to pass messages + // This channel is used to pass messages // to the user from the subscription channel (which should be set to a // sufficiently large value to avoid blocking socket reads in the // NATS connection). - _userMsgs = Channel.CreateBounded>(1); + _userMsgs = Channel.CreateBounded>(1000); Msgs = _userMsgs.Reader; // Capacity as 1 is enough here since it's used for signaling only. diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index cfb52faf2..a91559589 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -67,10 +67,11 @@ public NatsJSFetch( _pendingMsgs = _maxMsgs; _pendingBytes = _maxBytes; - // Keep user channel small to avoid blocking the user code when disposed, - // otherwise channel reader will continue delivering messages even after - // this 'fetch' object being disposed. - _userMsgs = Channel.CreateBounded>(1); + // This channel is used to pass messages + // to the user from the subscription channel (which should be set to a + // sufficiently large value to avoid blocking socket reads in the + // NATS connection). + _userMsgs = Channel.CreateBounded>(1000); Msgs = _userMsgs.Reader; if (_debug) diff --git a/tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs b/tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs new file mode 100644 index 000000000..04c762d6c --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs @@ -0,0 +1,67 @@ +using NATS.Client.Core.Tests; + +namespace NATS.Client.JetStream.Tests; + +public class DoubleAckTest +{ + [Fact] + public async Task Fetch_should_not_block_socket() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + await using var server = NatsServer.StartJS(); + + await using var nats = server.CreateClientConnection(); + + var js = new NatsJSContext(nats); + await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + + for (var i = 0; i < 100; i++) + { + var ack = await js.PublishAsync("s1.foo", i, cancellationToken: cts.Token); + ack.EnsureSuccess(); + } + + // fetch loop + { + var consumer = (NatsJSConsumer) await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + + var fetchOpts = new NatsJSFetchOpts + { + MaxMsgs = 100, Expires = TimeSpan.FromSeconds(5), + }; + + var count = 0; + await foreach (var msg in consumer.FetchAsync(opts: fetchOpts, cancellationToken: cts.Token)) + { + // double ack will use the same TCP stream to wait for the ACK from the server + // fetch must not block the socket so that the ACK can be received + await msg.AckAsync(new AckOpts(DoubleAck: true), cts.Token); + count++; + } + + Assert.Equal(100, count); + } + + // consume loop + { + var consumer = (NatsJSConsumer) await js.CreateConsumerAsync("s1", "c2", cancellationToken: cts.Token); + + var opts = new NatsJSConsumeOpts + { + MaxMsgs = 100, Expires = TimeSpan.FromSeconds(5), + }; + + var count = 0; + await foreach (var msg in consumer.ConsumeAsync(opts: opts, cancellationToken: cts.Token)) + { + // double ack will use the same TCP stream to wait for the ACK from the server + // fetch must not block the socket so that the ACK can be received + await msg.AckAsync(new AckOpts(DoubleAck: true), cts.Token); + count++; + } + + Assert.Equal(100, count); + } + + } +}