Skip to content

Commit

Permalink
Merge pull request #630 from nats-io/term-reason
Browse files Browse the repository at this point in the history
[FEAT] added `reason` to JsMsg#term - this feature requires 2.11.0
  • Loading branch information
aricart authored Nov 2, 2023
2 parents bd61b18 + 6a4069c commit 4225414
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 17 deletions.
12 changes: 9 additions & 3 deletions jetstream/jsmsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ export interface JsMsg {
/**
* Indicate to the JetStream server that processing of the message
* failed and that the message should not be sent to the consumer again.
* @param reason is a string describing why the message was termed. Note
* that `reason` is only available on servers 2.11.0 or better.
*/
term(): void;
term(reason?: string): void;

/**
* Indicate to the JetStream server that the message was processed
Expand Down Expand Up @@ -281,8 +283,12 @@ export class JsMsgImpl implements JsMsg {
this.msg.respond(payload, reqOpts);
}

term() {
this.doAck(TERM);
term(reason = "") {
let term = TERM;
if (reason?.length > 0) {
term = StringCodec().encode(`+TERM ${reason}`);
}
this.doAck(term);
}

json<T = unknown>(): T {
Expand Down
2 changes: 1 addition & 1 deletion jetstream/tests/jetstream_fetchconsumer_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import { Js409Errors } from "../jsutil.ts";
import { nuid } from "../../nats-base-client/nuid.ts";
import { deferred } from "../../nats-base-client/util.ts";
import { assertExists } from "https://deno.land/[email protected]/assert/assert_exists.ts";
import { consume } from "./jetstream_test.ts";
import { consume } from "./jstest_util.ts";

Deno.test("jetstream - fetch expires waits", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
Expand Down
73 changes: 61 additions & 12 deletions jetstream/tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { NatsServer } from "../../tests/helpers/launcher.ts";
import { initStream } from "./jstest_util.ts";
import {
AckPolicy,
Advisory,
consumerOpts,
DeliverPolicy,
JsHeaders,
Expand All @@ -42,7 +43,6 @@ import {
JSONCodec,
NatsError,
nuid,
QueuedIterator,
StringCodec,
} from "../../nats-base-client/mod.ts";
import {
Expand Down Expand Up @@ -95,17 +95,6 @@ export function callbackConsume(debug = false): JsMsgCallback {
};
}

export async function consume(iter: QueuedIterator<JsMsg>): Promise<JsMsg[]> {
const buf: JsMsg[] = [];
await (async () => {
for await (const m of iter) {
m.ack();
buf.push(m);
}
})();
return buf;
}

Deno.test("jetstream - default options", () => {
const opts = defaultJsOptions();
assertEquals(opts, { apiPrefix: "$JS.API", timeout: 5000 });
Expand Down Expand Up @@ -1626,3 +1615,63 @@ Deno.test("jetstream - source transforms", async () => {

await cleanup(ns, nc);
});

Deno.test("jetstream - term reason", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
if (await notCompatible(ns, nc, "2.11.0")) {
return;
}
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
name: "foos",
subjects: ["foo.*"],
});

const js = nc.jetstream();

await Promise.all(
[
js.publish("foo.1"),
js.publish("foo.2"),
js.publish("foo.term"),
],
);

await jsm.consumers.add("foos", {
name: "bar",
ack_policy: AckPolicy.Explicit,
});

const termed = deferred<Advisory>();
const advisories = jsm.advisories();
(async () => {
for await (const a of advisories) {
if (a.kind === "terminated") {
termed.resolve(a);
break;
}
}
})().catch((err) => {
console.log(err);
});

const c = await js.consumers.get("foos", "bar");
const iter = await c.consume();
await (async () => {
for await (const m of iter) {
if (m.subject.endsWith(".term")) {
m.term("requested termination");
break;
} else {
m.ack();
}
}
})().catch();

const s = await termed;
const d = s.data as Record<string, unknown>;
assertEquals(d.type, "io.nats.jetstream.advisory.v1.terminated");
assertEquals(d.reason, "requested termination");

await cleanup(ns, nc);
});
20 changes: 19 additions & 1 deletion jetstream/tests/jstest_util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,32 @@
* limitations under the License.
*/

import { AckPolicy, nanos, PubAck, StreamConfig } from "../../src/mod.ts";
import {
AckPolicy,
JsMsg,
nanos,
PubAck,
QueuedIterator,
StreamConfig,
} from "../../src/mod.ts";
import { assert } from "https://deno.land/[email protected]/assert/mod.ts";
import {
Empty,
NatsConnection,
nuid,
} from "../../nats-base-client/internal_mod.ts";

export async function consume(iter: QueuedIterator<JsMsg>): Promise<JsMsg[]> {
const buf: JsMsg[] = [];
await (async () => {
for await (const m of iter) {
m.ack();
buf.push(m);
}
})();
return buf;
}

export async function initStream(
nc: NatsConnection,
stream: string = nuid.next(),
Expand Down

0 comments on commit 4225414

Please sign in to comment.