From 6a4069c357bb76d2f964ee22a6e5e9eb32c0aff4 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Mon, 30 Oct 2023 16:46:17 -0500 Subject: [PATCH] [FEAT] added `reason` to JsMsg#term - this feature requires 2.11.0 [TEST] moved util to different test file otherwise some tests run multiple times --- README.md | 56 +++++++------- jetstream/jsmsg.ts | 12 ++- .../tests/jetstream_fetchconsumer_test.ts | 2 +- jetstream/tests/jetstream_test.ts | 73 ++++++++++++++++--- jetstream/tests/jstest_util.ts | 20 ++++- 5 files changed, 118 insertions(+), 45 deletions(-) diff --git a/README.md b/README.md index 46492584..e4a7f029 100644 --- a/README.md +++ b/README.md @@ -775,35 +775,35 @@ more performant or appropriate. The following is the list of connection options and default values. -| Option | Default | Description | -| ----------------------- | ------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `authenticator` | none | Specifies the authenticator function that sets the client credentials. | -| `debug` | `false` | If `true`, the client prints protocol interactions to the console. Useful for debugging. | -| `ignoreClusterUpdates` | `false` | If `true` the client will ignore any cluster updates provided by the server. | -| `ignoreAuthErrorAbort` | `false` | Prevents client connection aborts if the client fails more than twice in a row with an authentication error | -| `inboxPrefix` | `"_INBOX"` | Sets de prefix for automatically created inboxes - `createInbox(prefix)` | -| `maxPingOut` | `2` | Max number of pings the client will allow unanswered before raising a stale connection error. | -| `maxReconnectAttempts` | `10` | Sets the maximum number of reconnect attempts. The value of `-1` specifies no limit. | -| `name` | | Optional client name - recommended to be set to a unique client name. | -| `noAsyncTraces` | `false` | When `true` the client will not add additional context to errors associated with request operations. Setting this option to `true` will greatly improve performance of request/reply and JetStream publishers. | +| Option | Default | Description | +| ----------------------- | ------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `authenticator` | none | Specifies the authenticator function that sets the client credentials. | +| `debug` | `false` | If `true`, the client prints protocol interactions to the console. Useful for debugging. | +| `ignoreClusterUpdates` | `false` | If `true` the client will ignore any cluster updates provided by the server. | +| `ignoreAuthErrorAbort` | `false` | Prevents client connection aborts if the client fails more than twice in a row with an authentication error | +| `inboxPrefix` | `"_INBOX"` | Sets de prefix for automatically created inboxes - `createInbox(prefix)` | +| `maxPingOut` | `2` | Max number of pings the client will allow unanswered before raising a stale connection error. | +| `maxReconnectAttempts` | `10` | Sets the maximum number of reconnect attempts. The value of `-1` specifies no limit. | +| `name` | | Optional client name - recommended to be set to a unique client name. | +| `noAsyncTraces` | `false` | When `true` the client will not add additional context to errors associated with request operations. Setting this option to `true` will greatly improve performance of request/reply and JetStream publishers. | | `noEcho` | `false` | Subscriptions receive messages published by the client. Requires server support (1.2.0). If set to true, and the server does not support the feature, an error with code `NO_ECHO_NOT_SUPPORTED` is emitted, and the connection is aborted. Note that it is possible for this error to be emitted on reconnect when the server reconnects to a server that does not support the feature. | -| `noRandomize` | `false` | If set, the order of user-specified servers is randomized. | -| `pass` | | Sets the password for a connection. | -| `pedantic` | `false` | Turns on strict subject format checks. | -| `pingInterval` | `120000` | Number of milliseconds between client-sent pings. | -| `port` | `4222` | Port to connect to (only used if `servers` is not specified). | -| `reconnect` | `true` | If false, client will not attempt reconnecting. | -| `reconnectDelayHandler` | Generated function | A function that returns the number of millis to wait before the next connection to a server it connected to `()=>number`. | -| `reconnectJitter` | `100` | Number of millis to randomize after `reconnectTimeWait`. | -| `reconnectJitterTLS` | `1000` | Number of millis to randomize after `reconnectTimeWait` when TLS options are specified. | -| `reconnectTimeWait` | `2000` | If disconnected, the client will wait the specified number of milliseconds between reconnect attempts. | -| `servers` | `"localhost:4222"` | String or Array of hostport for servers. | -| `timeout` | 20000 | Number of milliseconds the client will wait for a connection to be established. If it fails it will emit a `connection_timeout` event with a NatsError that provides the hostport of the server where the connection was attempted. | -| `tls` | TlsOptions | A configuration object for requiring a TLS connection (not applicable to nats.ws). | -| `token` | | Sets a authorization token for a connection. | -| `user` | | Sets the username for a connection. | -| `verbose` | `false` | Turns on `+OK` protocol acknowledgements. | -| `waitOnFirstConnect` | `false` | If `true` the client will fall back to a reconnect mode if it fails its first connection attempt. | +| `noRandomize` | `false` | If set, the order of user-specified servers is randomized. | +| `pass` | | Sets the password for a connection. | +| `pedantic` | `false` | Turns on strict subject format checks. | +| `pingInterval` | `120000` | Number of milliseconds between client-sent pings. | +| `port` | `4222` | Port to connect to (only used if `servers` is not specified). | +| `reconnect` | `true` | If false, client will not attempt reconnecting. | +| `reconnectDelayHandler` | Generated function | A function that returns the number of millis to wait before the next connection to a server it connected to `()=>number`. | +| `reconnectJitter` | `100` | Number of millis to randomize after `reconnectTimeWait`. | +| `reconnectJitterTLS` | `1000` | Number of millis to randomize after `reconnectTimeWait` when TLS options are specified. | +| `reconnectTimeWait` | `2000` | If disconnected, the client will wait the specified number of milliseconds between reconnect attempts. | +| `servers` | `"localhost:4222"` | String or Array of hostport for servers. | +| `timeout` | 20000 | Number of milliseconds the client will wait for a connection to be established. If it fails it will emit a `connection_timeout` event with a NatsError that provides the hostport of the server where the connection was attempted. | +| `tls` | TlsOptions | A configuration object for requiring a TLS connection (not applicable to nats.ws). | +| `token` | | Sets a authorization token for a connection. | +| `user` | | Sets the username for a connection. | +| `verbose` | `false` | Turns on `+OK` protocol acknowledgements. | +| `waitOnFirstConnect` | `false` | If `true` the client will fall back to a reconnect mode if it fails its first connection attempt. | ### TlsOptions diff --git a/jetstream/jsmsg.ts b/jetstream/jsmsg.ts index 91aa5288..acfdd02f 100644 --- a/jetstream/jsmsg.ts +++ b/jetstream/jsmsg.ts @@ -99,8 +99,10 @@ export interface JsMsg { /** * Indicate to the JetStream server that processing of the message * failed and that the message should not be sent to the consumer again. + * @param reason is a string describing why the message was termed. Note + * that `reason` is only available on servers 2.11.0 or better. */ - term(): void; + term(reason?: string): void; /** * Indicate to the JetStream server that the message was processed @@ -281,8 +283,12 @@ export class JsMsgImpl implements JsMsg { this.msg.respond(payload, reqOpts); } - term() { - this.doAck(TERM); + term(reason = "") { + let term = TERM; + if (reason?.length > 0) { + term = StringCodec().encode(`+TERM ${reason}`); + } + this.doAck(term); } json(): T { diff --git a/jetstream/tests/jetstream_fetchconsumer_test.ts b/jetstream/tests/jetstream_fetchconsumer_test.ts index 85a78e9c..d19f727c 100644 --- a/jetstream/tests/jetstream_fetchconsumer_test.ts +++ b/jetstream/tests/jetstream_fetchconsumer_test.ts @@ -38,7 +38,7 @@ import { Js409Errors } from "../jsutil.ts"; import { nuid } from "../../nats-base-client/nuid.ts"; import { deferred } from "../../nats-base-client/util.ts"; import { assertExists } from "https://deno.land/std@0.200.0/assert/assert_exists.ts"; -import { consume } from "./jetstream_test.ts"; +import { consume } from "./jstest_util.ts"; Deno.test("jetstream - fetch expires waits", async () => { const { ns, nc } = await setup(jetstreamServerConf({}, true)); diff --git a/jetstream/tests/jetstream_test.ts b/jetstream/tests/jetstream_test.ts index 9bc48724..e596fcb0 100644 --- a/jetstream/tests/jetstream_test.ts +++ b/jetstream/tests/jetstream_test.ts @@ -17,6 +17,7 @@ import { NatsServer } from "../../tests/helpers/launcher.ts"; import { initStream } from "./jstest_util.ts"; import { AckPolicy, + Advisory, consumerOpts, DeliverPolicy, JsHeaders, @@ -42,7 +43,6 @@ import { JSONCodec, NatsError, nuid, - QueuedIterator, StringCodec, } from "../../nats-base-client/mod.ts"; import { @@ -95,17 +95,6 @@ export function callbackConsume(debug = false): JsMsgCallback { }; } -export async function consume(iter: QueuedIterator): Promise { - const buf: JsMsg[] = []; - await (async () => { - for await (const m of iter) { - m.ack(); - buf.push(m); - } - })(); - return buf; -} - Deno.test("jetstream - default options", () => { const opts = defaultJsOptions(); assertEquals(opts, { apiPrefix: "$JS.API", timeout: 5000 }); @@ -1626,3 +1615,63 @@ Deno.test("jetstream - source transforms", async () => { await cleanup(ns, nc); }); + +Deno.test("jetstream - term reason", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + if (await notCompatible(ns, nc, "2.11.0")) { + return; + } + const jsm = await nc.jetstreamManager(); + await jsm.streams.add({ + name: "foos", + subjects: ["foo.*"], + }); + + const js = nc.jetstream(); + + await Promise.all( + [ + js.publish("foo.1"), + js.publish("foo.2"), + js.publish("foo.term"), + ], + ); + + await jsm.consumers.add("foos", { + name: "bar", + ack_policy: AckPolicy.Explicit, + }); + + const termed = deferred(); + const advisories = jsm.advisories(); + (async () => { + for await (const a of advisories) { + if (a.kind === "terminated") { + termed.resolve(a); + break; + } + } + })().catch((err) => { + console.log(err); + }); + + const c = await js.consumers.get("foos", "bar"); + const iter = await c.consume(); + await (async () => { + for await (const m of iter) { + if (m.subject.endsWith(".term")) { + m.term("requested termination"); + break; + } else { + m.ack(); + } + } + })().catch(); + + const s = await termed; + const d = s.data as Record; + assertEquals(d.type, "io.nats.jetstream.advisory.v1.terminated"); + assertEquals(d.reason, "requested termination"); + + await cleanup(ns, nc); +}); diff --git a/jetstream/tests/jstest_util.ts b/jetstream/tests/jstest_util.ts index 8897b9cf..fd62357c 100644 --- a/jetstream/tests/jstest_util.ts +++ b/jetstream/tests/jstest_util.ts @@ -13,7 +13,14 @@ * limitations under the License. */ -import { AckPolicy, nanos, PubAck, StreamConfig } from "../../src/mod.ts"; +import { + AckPolicy, + JsMsg, + nanos, + PubAck, + QueuedIterator, + StreamConfig, +} from "../../src/mod.ts"; import { assert } from "https://deno.land/std@0.200.0/assert/mod.ts"; import { Empty, @@ -21,6 +28,17 @@ import { nuid, } from "../../nats-base-client/internal_mod.ts"; +export async function consume(iter: QueuedIterator): Promise { + const buf: JsMsg[] = []; + await (async () => { + for await (const m of iter) { + m.ack(); + buf.push(m); + } + })(); + return buf; +} + export async function initStream( nc: NatsConnection, stream: string = nuid.next(),