diff --git a/jetstream/jsmsg.ts b/jetstream/jsmsg.ts index 91aa5288..acfdd02f 100644 --- a/jetstream/jsmsg.ts +++ b/jetstream/jsmsg.ts @@ -99,8 +99,10 @@ export interface JsMsg { /** * Indicate to the JetStream server that processing of the message * failed and that the message should not be sent to the consumer again. + * @param reason is a string describing why the message was termed. Note + * that `reason` is only available on servers 2.11.0 or better. */ - term(): void; + term(reason?: string): void; /** * Indicate to the JetStream server that the message was processed @@ -281,8 +283,12 @@ export class JsMsgImpl implements JsMsg { this.msg.respond(payload, reqOpts); } - term() { - this.doAck(TERM); + term(reason = "") { + let term = TERM; + if (reason?.length > 0) { + term = StringCodec().encode(`+TERM ${reason}`); + } + this.doAck(term); } json(): T { diff --git a/jetstream/tests/jetstream_fetchconsumer_test.ts b/jetstream/tests/jetstream_fetchconsumer_test.ts index 85a78e9c..d19f727c 100644 --- a/jetstream/tests/jetstream_fetchconsumer_test.ts +++ b/jetstream/tests/jetstream_fetchconsumer_test.ts @@ -38,7 +38,7 @@ import { Js409Errors } from "../jsutil.ts"; import { nuid } from "../../nats-base-client/nuid.ts"; import { deferred } from "../../nats-base-client/util.ts"; import { assertExists } from "https://deno.land/std@0.200.0/assert/assert_exists.ts"; -import { consume } from "./jetstream_test.ts"; +import { consume } from "./jstest_util.ts"; Deno.test("jetstream - fetch expires waits", async () => { const { ns, nc } = await setup(jetstreamServerConf({}, true)); diff --git a/jetstream/tests/jetstream_test.ts b/jetstream/tests/jetstream_test.ts index 9bc48724..e596fcb0 100644 --- a/jetstream/tests/jetstream_test.ts +++ b/jetstream/tests/jetstream_test.ts @@ -17,6 +17,7 @@ import { NatsServer } from "../../tests/helpers/launcher.ts"; import { initStream } from "./jstest_util.ts"; import { AckPolicy, + Advisory, consumerOpts, DeliverPolicy, JsHeaders, @@ -42,7 +43,6 @@ import { JSONCodec, NatsError, nuid, - QueuedIterator, StringCodec, } from "../../nats-base-client/mod.ts"; import { @@ -95,17 +95,6 @@ export function callbackConsume(debug = false): JsMsgCallback { }; } -export async function consume(iter: QueuedIterator): Promise { - const buf: JsMsg[] = []; - await (async () => { - for await (const m of iter) { - m.ack(); - buf.push(m); - } - })(); - return buf; -} - Deno.test("jetstream - default options", () => { const opts = defaultJsOptions(); assertEquals(opts, { apiPrefix: "$JS.API", timeout: 5000 }); @@ -1626,3 +1615,63 @@ Deno.test("jetstream - source transforms", async () => { await cleanup(ns, nc); }); + +Deno.test("jetstream - term reason", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.11.0")) { + return; + } + const jsm = await nc.jetstreamManager(); + await jsm.streams.add({ + name: "foos", + subjects: ["foo.*"], + }); + + const js = nc.jetstream(); + + await Promise.all( + [ + js.publish("foo.1"), + js.publish("foo.2"), + js.publish("foo.term"), + ], + ); + + await jsm.consumers.add("foos", { + name: "bar", + ack_policy: AckPolicy.Explicit, + }); + + const termed = deferred(); + const advisories = jsm.advisories(); + (async () => { + for await (const a of advisories) { + if (a.kind === "terminated") { + termed.resolve(a); + break; + } + } + })().catch((err) => { + console.log(err); + }); + + const c = await js.consumers.get("foos", "bar"); + const iter = await c.consume(); + await (async () => { + for await (const m of iter) { + if (m.subject.endsWith(".term")) { + m.term("requested termination"); + break; + } else { + m.ack(); + } + } + })().catch(); + + const s = await termed; + const d = s.data as Record; + assertEquals(d.type, "io.nats.jetstream.advisory.v1.terminated"); + assertEquals(d.reason, "requested termination"); + + await cleanup(ns, nc); +}); diff --git a/jetstream/tests/jstest_util.ts b/jetstream/tests/jstest_util.ts index 8897b9cf..fd62357c 100644 --- a/jetstream/tests/jstest_util.ts +++ b/jetstream/tests/jstest_util.ts @@ -13,7 +13,14 @@ * limitations under the License. */ -import { AckPolicy, nanos, PubAck, StreamConfig } from "../../src/mod.ts"; +import { + AckPolicy, + JsMsg, + nanos, + PubAck, + QueuedIterator, + StreamConfig, +} from "../../src/mod.ts"; import { assert } from "https://deno.land/std@0.200.0/assert/mod.ts"; import { Empty, @@ -21,6 +28,17 @@ import { nuid, } from "../../nats-base-client/internal_mod.ts"; +export async function consume(iter: QueuedIterator): Promise { + const buf: JsMsg[] = []; + await (async () => { + for await (const m of iter) { + m.ack(); + buf.push(m); + } + })(); + return buf; +} + export async function initStream( nc: NatsConnection, stream: string = nuid.next(),