Skip to content

Commit

Permalink
Merge pull request #638 from nats-io/consumer-no-giveup-streamdelete
Browse files Browse the repository at this point in the history
[FIX] added a status notification if the stream is deleted while a consumer in a disconnect cycle
  • Loading branch information
aricart authored Dec 4, 2023
2 parents 37f5bd6 + 907b979 commit 2aeb0e1
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions jetstream/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ export enum ConsumerEvents {
*/
HeartbeatsMissed = "heartbeats_missed",
/**
* Notification that the consumer was not found. Consumers that yielded at least
* one message will be retried for more messages regardless of the not being found
* Notification that the consumer was not found. Consumers that were accessible at
* least once, will be retried for more messages regardless of the not being found
* or timeouts etc. This notification includes a count of consecutive attempts to
* find the consumer. Note that if you get this notification possibly your code should
* attempt to recreate the consumer. Note that this notification is only informational
Expand All @@ -171,6 +171,16 @@ export enum ConsumerEvents {
ConsumerNotFound = "consumer_not_found",

/**
* Notification that the stream was not found. Consumers were accessible at least once,
* will be retried for more messages regardless of the not being found
* or timeouts etc. This notification includes a count of consecutive attempts to
* find the consumer. Note that if you get this notification possibly your code should
* attempt to recreate the consumer. Note that this notification is only informational
* for ordered consumers, as the consumer will be created in those cases automatically.
*/
StreamNotFound = "stream_not_found",

/*
* Notification that the consumer was deleted. This notification
* means the consumer will not get messages unless it is recreated. The client
* will continue to attempt to pull messages. Ordered consumer will recreate it.
Expand Down Expand Up @@ -508,6 +518,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>

async resetPending(): Promise<boolean> {
let notFound = 0;
let streamNotFound = 0;
const bo = backoff();
let attempt = 0;
while (true) {
Expand All @@ -528,7 +539,10 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
return true;
} catch (err) {
// game over
if (err.message === "consumer not found") {
if (err.message === "stream not found") {
streamNotFound++;
this.notify(ConsumerEvents.StreamNotFound, streamNotFound);
} else if (err.message === "consumer not found") {
notFound++;
this.notify(ConsumerEvents.ConsumerNotFound, notFound);
if (this.resetHandler) {
Expand All @@ -543,6 +557,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
} else {
notFound = 0;
streamNotFound = 0;
}
const to = bo.backoff(attempt);
// wait for delay or till the client closes
Expand Down

0 comments on commit 2aeb0e1

Please sign in to comment.