Skip to content

Commit

Permalink
Merge pull request #738 from nats-io/fix-redelivery
Browse files Browse the repository at this point in the history
Update message metadata to use `deliveryCount` consistently
  • Loading branch information
aricart authored Jan 10, 2025
2 parents 04a6476 + 0b538b9 commit f1a7438
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 16 deletions.
2 changes: 1 addition & 1 deletion examples/legacy_js/07_pullsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const psub = await js.pullSubscribe(subj, {
for await (const m of psub) {
console.log(
`[${m.seq}] ${
m.redelivered ? `- redelivery ${m.info.redeliveryCount}` : ""
m.redelivered ? `- redelivery ${m.info.deliveryCount}` : ""
}`,
);
if (m.seq % 2 === 0) {
Expand Down
8 changes: 6 additions & 2 deletions jetstream/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ export interface ConsumerUpdateConfig {
*/
"ack_wait"?: Nanos;
/**
* The number of times a message will be redelivered to consumers if not acknowledged in time
* The maximum number of times a message will be delivered to consumers.
*/
"max_deliver"?: number;
/**
Expand Down Expand Up @@ -1083,7 +1083,11 @@ export interface DeliveryInfo {
*/
consumer: string;
/**
* The number of times the message has been redelivered.
* The number of times the message has been delivered.
*/
deliveryCount: number;
/**
* @deprecated: use deliveryCount
*/
redeliveryCount: number;
/**
Expand Down
11 changes: 6 additions & 5 deletions jetstream/jsmsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,18 @@ export function parseInfo(s: string): DeliveryInfo {
}

// old
// "$JS.ACK.<stream>.<consumer>.<redeliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
// "$JS.ACK.<stream>.<consumer>.<deliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
// new
// $JS.ACK.<domain>.<accounthash>.<stream>.<consumer>.<redeliveryCount>.<streamSeq>.<deliverySequence>.<timestamp>.<pending>.<random>
// $JS.ACK.<domain>.<accounthash>.<stream>.<consumer>.<deliveryCount>.<streamSeq>.<deliverySequence>.<timestamp>.<pending>.<random>
const di = {} as DeliveryInfo;
// if domain is "_", replace with blank
di.domain = tokens[2] === "_" ? "" : tokens[2];
di.account_hash = tokens[3];
di.stream = tokens[4];
di.consumer = tokens[5];
di.redeliveryCount = parseInt(tokens[6], 10);
di.redelivered = di.redeliveryCount > 1;
di.deliveryCount = parseInt(tokens[6], 10);
di.redeliveryCount = di.deliveryCount;
di.redelivered = di.deliveryCount > 1;
di.streamSequence = parseInt(tokens[7], 10);
di.deliverySequence = parseInt(tokens[8], 10);
di.timestampNanos = parseInt(tokens[9], 10);
Expand Down Expand Up @@ -196,7 +197,7 @@ export class JsMsgImpl implements JsMsg {
}

get redelivered(): boolean {
return this.info.redeliveryCount > 1;
return this.info.deliveryCount > 1;
}

get reply(): string {
Expand Down
2 changes: 1 addition & 1 deletion jetstream/tests/fetch_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ Deno.test("fetch - listener leaks", async () => {
for await (const m of iter) {
assertEquals(nci.protocol.listeners.length, base);
m?.nak();
if (m.info.redeliveryCount > 100) {
if (m.info.deliveryCount > 100) {
done = true;
}
}
Expand Down
2 changes: 1 addition & 1 deletion jetstream/tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ Deno.test("jetstream - backoff", async () => {
start = Date.now();
}
arrive.push(Date.now());
if (m.info.redeliveryCount === 4) {
if (m.info.deliveryCount === 4) {
break;
}
}
Expand Down
10 changes: 5 additions & 5 deletions jetstream/tests/jsmsg_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ import { JetStreamManagerImpl } from "../jsm.ts";
import { MsgImpl } from "../../nats-base-client/msg.ts";

Deno.test("jsmsg - parse", () => {
// "$JS.ACK.<stream>.<consumer>.<redeliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
// "$JS.ACK.<stream>.<consumer>.<deliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
const rs = `$JS.ACK.streamname.consumername.2.3.4.${nanos(Date.now())}.100`;
const info = parseInfo(rs);
assertEquals(info.stream, "streamname");
assertEquals(info.consumer, "consumername");
assertEquals(info.redeliveryCount, 2);
assertEquals(info.deliveryCount, 2);
assertEquals(info.streamSequence, 3);
assertEquals(info.pending, 100);
});

Deno.test("jsmsg - parse long", () => {
// $JS.ACK.<domain>.<accounthash>.<stream>.<consumer>.<redeliveryCount>.<streamSeq>.<deliverySequence>.<timestamp>.<pending>.<random>
// $JS.ACK.<domain>.<accounthash>.<stream>.<consumer>.<deliveryCount>.<streamSeq>.<deliverySequence>.<timestamp>.<pending>.<random>
const rs = `$JS.ACK.domain.account.streamname.consumername.2.3.4.${
nanos(Date.now())
}.100.rand`;
Expand All @@ -58,7 +58,7 @@ Deno.test("jsmsg - parse long", () => {
assertEquals(info.account_hash, "account");
assertEquals(info.stream, "streamname");
assertEquals(info.consumer, "consumername");
assertEquals(info.redeliveryCount, 2);
assertEquals(info.deliveryCount, 2);
assertEquals(info.streamSequence, 3);
assertEquals(info.pending, 100);
});
Expand Down Expand Up @@ -95,7 +95,7 @@ Deno.test("jsmsg - acks", async () => {
fail(err.message);
}
msg.respond(Empty, {
// "$JS.ACK.<stream>.<consumer>.<redeliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
// "$JS.ACK.<stream>.<consumer>.<deliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
reply:
`MY.TEST.streamname.consumername.1.${counter}.${counter}.${Date.now()}.0`,
});
Expand Down
2 changes: 1 addition & 1 deletion jetstream/tests/next_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ Deno.test("next - listener leaks", async () => {
const m = await consumer.next();
if (m) {
m.nak();
if (m.info?.redeliveryCount > 100) {
if (m.info?.deliveryCount > 100) {
break;
}
}
Expand Down

0 comments on commit f1a7438

Please sign in to comment.