Jetstream throws debug messages after resuming from Hibernation. #544

Open
alberk8 opened this issue Jul 5, 2024 · 11 comments

Comments

@alberk8

alberk8 commented Jul 5, 2024

Observed behavior

After resuming from hibernation (Windows 10), a process running a JetStream consumer sometimes logs the messages below. I cannot reproduce it reliably. The code that got triggered is here.

The NATS server and the Windows 10 client run on different machines on the same LAN.

[09:49:10.298  DBG][] NATS.Client.JetStream.Internal.NatsJSConsume: Idle heartbeat timeout after 00:00:15ns
[09:49:40.298  DBG][] NATS.Client.JetStream.Internal.NatsJSConsume: Idle heartbeat timeout after 00:00:15ns
[09:50:10.301  DBG][] NATS.Client.JetStream.Internal.NatsJSConsume: Idle heartbeat timeout after 00:00:15ns
[09:50:40.300  DBG][] NATS.Client.JetStream.Internal.NatsJSConsume: Idle heartbeat timeout after 00:00:15ns
[09:51:10.312  DBG][] NATS.Client.JetStream.Internal.NatsJSConsume: Idle heartbeat timeout after 00:00:15ns 

Expected behavior

Continue to work.

Server and client version

Nats Server v2.10.16
Nats Client v2.2.3

Host environment

NATS server running on Ubuntu Linux 20.04, up 24 hours a day.

Steps to reproduce

No response

@alberk8 alberk8 changed the title from "Jetstream throw debug message resuming from Hibernation." to "Jetstream throws debug messages after resuming from Hibernation." on Jul 5, 2024
@mtmk
Collaborator

mtmk commented Jul 5, 2024

So when it comes back, the consume method isn't yielding any more messages?

Not sure how to replicate this. I can maybe suspend the process and see what happens.

@alberk8
Author

alberk8 commented Jul 6, 2024

So when it comes back, the consume method isn't yielding any more messages?

Not sure how to replicate this. I can maybe suspend the process and see what happens.

Yes, no more messages.

Sample Code

 public class NatsHostedConsume : IHostedService, IAsyncDisposable
 {
     private CancellationTokenSource? _cts;
     private Task? _task;
     private NatsConnection? _natsConnection;
     private NatsJSContext? _natsJSContext;
     private INatsJSStream? _natsJSStream;

     private readonly IHostApplicationLifetime _lifetime;
     private readonly INatsConnection _inatsConnection;
     private readonly ILogger<NatsHostedConsume> _logger;

     public NatsHostedConsume
     (
          IHostApplicationLifetime lifetime,
          INatsConnection inatsConnection,
          ILogger<NatsHostedConsume> logger
     ) 
     {
         _lifetime = lifetime;
         _inatsConnection = inatsConnection;
         _logger = logger;
     }

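     public async ValueTask DisposeAsync()
     {
         _cts?.Cancel();
         if (_task is not null)
         {
             await _task;
         }

         // Dispose the connection created in StartAsync so it does not outlive the service.
         if (_natsConnection is not null)
         {
             await _natsConnection.DisposeAsync();
         }
     }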

     public async Task StartAsync(CancellationToken cancellationToken)
     {
         _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

         _natsConnection = new NatsConnection(_inatsConnection.Opts);
         _natsJSContext = new NatsJSContext(_natsConnection);

         Console.WriteLine("Starting Hosted");

         await SetupStream(_natsJSContext);

         _task = Task.Run(async () => await ProcessDataAsync(_cts.Token));

          Console.WriteLine("Task Status: " + _task!.Status);
     }

     public async Task StopAsync(CancellationToken cancellationToken)
     {
         _cts?.Cancel();
         if (_task is not null)
         {
             Console.WriteLine("Task Status: " + _task!.Status);
             await _task;
         }
         return;
     }

     private async Task SetupStream(NatsJSContext natsJSContext)
     {
         var streamConfig = new StreamConfig("TestStream", new[] { "TestSubject" })
         {
             MaxAge = TimeSpan.FromDays(7),
             Compression = StreamConfigCompression.S2
         };

         try
         {
             
             _natsJSStream = await natsJSContext.UpdateStreamAsync(streamConfig);  
         }
         catch (NatsJSApiException e) when (e.Error.Code is 404)
         {
             _natsJSStream = await natsJSContext.CreateStreamAsync(streamConfig);
         }
         catch (Exception ex)
         {
             _logger.LogDebug(ex.Message);
             throw;
         }
     }

     private async Task ProcessDataAsync(CancellationToken cancellationToken)
     {
         var consumerConfig = new ConsumerConfig("DataConsumer")
         {
           
             FilterSubject = "TestSubject",
             DeliverPolicy = ConsumerConfigDeliverPolicy.All,
             AckPolicy = ConsumerConfigAckPolicy.Explicit,
             AckWait = TimeSpan.FromSeconds(10),
         };
          
         var consumer = await _natsJSContext!.CreateOrUpdateConsumerAsync("TestStream", consumerConfig);

         while (!cancellationToken.IsCancellationRequested)
         {
             await consumer.RefreshAsync(cancellationToken);

             await foreach (NatsJSMsg<DataPayload> msg in consumer.ConsumeAsync<DataPayload>().WithCancellation(cancellationToken))
             
             {
                 if (msg.Data is null)
                 {
                     await msg.AckAsync();
                     continue;
                 }
                 try
                 {
                     // Do Processing
                     _logger.LogDebug("Process Data");

                     // Ack only after processing succeeds, so a message that was Nak'ed is not also Ack'ed.
                     await msg.AckAsync();
                 }
                 catch (NatsNoRespondersException ex)
                 {
                     _logger.LogDebug($"Nats Error: {ex.Message}");
                     await msg.NakAsync();
                 }
                 catch (Exception ex)
                 {
                     _logger.LogDebug($"Exception Error: {ex.Message}");
                     await msg.NakAsync();
                 }
             }
             try
             {
                 await Task.Delay(1_000, cancellationToken);
             }
             catch (TaskCanceledException) { }
         }
     }
 }

@mtmk
Collaborator

mtmk commented Jul 6, 2024

Are you seeing any exceptions? Could you wrap ProcessDataAsync() in a try/catch and log any exceptions it throws?
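
A minimal sketch of that suggestion, assuming the loop is started with Task.Run as in the sample above. The BackgroundLoop class, the RunLoggedAsync helper, and the log callback are hypothetical names introduced only for illustration and are not part of NATS.Client; the point is that a background task that ends or faults would otherwise go unnoticed until StopAsync awaits it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BackgroundLoop
{
    // Hypothetical helper: runs `loop` and reports how it ended, so a loop that
    // finished or faulted silently becomes visible in the logs.
    // `log` stands in for a real logger such as ILogger.
    public static async Task<string> RunLoggedAsync(
        Func<CancellationToken, Task> loop,
        Action<string> log,
        CancellationToken cancellationToken)
    {
        try
        {
            await loop(cancellationToken);
            log("Consume loop completed without an exception");
            return "completed";
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Expected during shutdown: the token passed in was canceled.
            log("Consume loop canceled");
            return "canceled";
        }
        catch (Exception ex)
        {
            // Anything else is unexpected; log the full exception, not just the message.
            log($"Consume loop faulted: {ex}");
            return "faulted";
        }
    }
}
```

In the sample above this could be wired up as `_task = Task.Run(() => BackgroundLoop.RunLoggedAsync(ProcessDataAsync, m => _logger.LogError(m), _cts.Token));`. If nothing is logged while messages have stopped arriving, the loop is still running and the problem lies inside the consume call rather than in an unobserved exception.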

@alberk8
Author

alberk8 commented Jul 6, 2024

No exception. I am adding a try/catch in ProcessDataAsync and will monitor and report back.
One other thing I noticed is that the log output I reported above appears only in the console and is not written to the log file, while other LogDebug output from NATS is written to the file. I am using Serilog for logging.

@mtmk
Collaborator

mtmk commented Jul 6, 2024

No exception. I am adding a try/catch in ProcessDataAsync and will monitor and report back.

👍

One other thing I noticed is that the log output I reported above appears only in the console and is not written to the log file, while other LogDebug output from NATS is written to the file. I am using Serilog for logging.

Depends on how you're injecting NatsConnection. Have a look at the NATS.Extensions.Microsoft.DependencyInjection package.

@alberk8
Author

alberk8 commented Jul 6, 2024

I am using that NuGet package already.

@mtmk
Collaborator

mtmk commented Jul 6, 2024

Is it maybe the order of your build statements, i.e. you add NATS first and then configure logging? Would that make a difference?

@alberk8
Author

alberk8 commented Jul 10, 2024

I added the try/catch around the foreach and am still getting the same error. I think the problem is caused by the state the client is left in when it disconnects from and reconnects to the NATS server after coming out of hibernation. This also happens with request/reply, where it reports no responders.

@mtmk
Collaborator

mtmk commented Jul 10, 2024

Coincidentally, there is a bug fix PR in review for something that might be the reason for this:

@mtmk
Collaborator

mtmk commented Jul 10, 2024

I hope the fix I mentioned above resolves this issue. We just released it in v2.3.2: https://github.com/nats-io/nats.net.v2/releases/tag/v2.3.2

@alberk8
Author

alberk8 commented Jul 11, 2024

Updated the NuGet package and am running tests. Will report back with the results later.


2 participants