From a6b4aba99c2c5971ad98dc6a07ad191d96a840cf Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Tue, 17 Oct 2023 17:12:53 -0500 Subject: [PATCH] [FEAT] added `last()` to MsgHdrs to retrieve the last header value if multiple headers share the same name. [FIX] changed direct header implementation to read last header value instead of first. If message was republished, the last header value will be the correct one for the message --- jetstream/jsm.ts | 8 +++---- jetstream/tests/consumers_test.ts | 8 +------ jetstream/tests/kv_test.ts | 38 +++++++++++++++++++++++++++++++ nats-base-client/core.ts | 2 ++ nats-base-client/headers.ts | 11 +++++++++ tests/reconnect_test.ts | 1 - 6 files changed, 56 insertions(+), 12 deletions(-) diff --git a/jetstream/jsm.ts b/jetstream/jsm.ts index c743cd47..1671986b 100644 --- a/jetstream/jsm.ts +++ b/jetstream/jsm.ts @@ -100,11 +100,11 @@ export class DirectMsgImpl implements DirectMsg { } get subject(): string { - return this.header.get(DirectMsgHeaders.Subject); + return this.header.last(DirectMsgHeaders.Subject); } get seq(): number { - const v = this.header.get(DirectMsgHeaders.Sequence); + const v = this.header.last(DirectMsgHeaders.Sequence); return typeof v === "string" ? parseInt(v) : 0; } @@ -113,11 +113,11 @@ export class DirectMsgImpl implements DirectMsg { } get timestamp(): string { - return this.header.get(DirectMsgHeaders.TimeStamp); + return this.header.last(DirectMsgHeaders.TimeStamp); } get stream(): string { - return this.header.get(DirectMsgHeaders.Stream); + return this.header.last(DirectMsgHeaders.Stream); } json(reviver?: ReviverFn): T { diff --git a/jetstream/tests/consumers_test.ts b/jetstream/tests/consumers_test.ts index 21e6f339..46b8893a 100644 --- a/jetstream/tests/consumers_test.ts +++ b/jetstream/tests/consumers_test.ts @@ -19,13 +19,7 @@ import { assertRejects, assertStringIncludes, } from "https://deno.land/std@0.200.0/assert/mod.ts"; -import { - deferred, - Empty, - NatsConnection, - nuid, - StringCodec, -} from "../../nats-base-client/mod.ts"; +import { deferred, Empty, StringCodec } from "../../nats-base-client/mod.ts"; import { AckPolicy, Consumer, diff --git a/jetstream/tests/kv_test.ts b/jetstream/tests/kv_test.ts index 486a4eb7..02ed85a0 100644 --- a/jetstream/tests/kv_test.ts +++ b/jetstream/tests/kv_test.ts @@ -1894,3 +1894,41 @@ Deno.test("kv - watch history no deletes", async () => { await cleanup(ns, nc); }); + +Deno.test("kv - republish header handling", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + const jsm = await nc.jetstreamManager(); + const n = nuid.next(); + await jsm.streams.add({ + name: n, + subjects: ["A.>"], + storage: StorageType.Memory, + republish: { + src: ">", + dest: `$KV.${n}.>`, + }, + }); + + const js = nc.jetstream(); + const kv = await js.views.kv(n); + + nc.publish("A.orange", "hey"); + await js.publish("A.tomato", "hello"); + await kv.put("A.potato", "yo"); + + async function check(allow_direct = false): Promise { + const B = await js.views.kv(n, { allow_direct }); + let e = await B.get("A.orange"); + assertExists(e); + + e = await B.get("A.tomato"); + assertExists(e); + + e = await B.get("A.potato"); + assertExists(e); + } + await check(); + await check(true); + + await cleanup(ns, nc); +}); diff --git a/nats-base-client/core.ts b/nats-base-client/core.ts index 0e6fe3fc..cb0ff888 100644 --- a/nats-base-client/core.ts +++ b/nats-base-client/core.ts @@ -384,6 +384,8 @@ export interface MsgHdrs extends Iterable<[string, string[]]> { values(k: string, match?: Match): string[]; delete(k: string, match?: Match): void; + + last(k: string, match?: Match): string; } export interface RequestOptions { diff --git a/nats-base-client/headers.ts b/nats-base-client/headers.ts index fe0398c8..815ab4a7 100644 --- a/nats-base-client/headers.ts +++ b/nats-base-client/headers.ts @@ -217,6 +217,17 @@ export class MsgHdrsImpl implements MsgHdrs { return ""; } + last(k: string, match = Match.Exact): string { + const keys = this.findKeys(k, match); + if (keys.length) { + const v = this.headers.get(keys[0]); + if (v) { + return Array.isArray(v) ? v[v.length - 1] : v; + } + } + return ""; + } + has(k: string, match = Match.Exact): boolean { return this.findKeys(k, match).length > 0; } diff --git a/tests/reconnect_test.ts b/tests/reconnect_test.ts index 2a858ac2..913eec17 100644 --- a/tests/reconnect_test.ts +++ b/tests/reconnect_test.ts @@ -14,7 +14,6 @@ */ import { assert, - assertAlmostEquals, assertEquals, fail, } from "https://deno.land/std@0.200.0/assert/mod.ts";