Skip to content

Commit

Permalink
Merge pull request #619 from nats-io/fix-586
Browse files Browse the repository at this point in the history
Fix #586
  • Loading branch information
aricart authored Oct 19, 2023
2 parents fbff584 + a6b4aba commit bb1bd03
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 12 deletions.
8 changes: 4 additions & 4 deletions jetstream/jsm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ export class DirectMsgImpl implements DirectMsg {
}

get subject(): string {
return this.header.get(DirectMsgHeaders.Subject);
return this.header.last(DirectMsgHeaders.Subject);
}

get seq(): number {
const v = this.header.get(DirectMsgHeaders.Sequence);
const v = this.header.last(DirectMsgHeaders.Sequence);
return typeof v === "string" ? parseInt(v) : 0;
}

Expand All @@ -113,11 +113,11 @@ export class DirectMsgImpl implements DirectMsg {
}

get timestamp(): string {
return this.header.get(DirectMsgHeaders.TimeStamp);
return this.header.last(DirectMsgHeaders.TimeStamp);
}

get stream(): string {
return this.header.get(DirectMsgHeaders.Stream);
return this.header.last(DirectMsgHeaders.Stream);
}

json<T = unknown>(reviver?: ReviverFn): T {
Expand Down
8 changes: 1 addition & 7 deletions jetstream/tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,7 @@ import {
assertRejects,
assertStringIncludes,
} from "https://deno.land/[email protected]/assert/mod.ts";
import {
deferred,
Empty,
NatsConnection,
nuid,
StringCodec,
} from "../../nats-base-client/mod.ts";
import { deferred, Empty, StringCodec } from "../../nats-base-client/mod.ts";
import {
AckPolicy,
Consumer,
Expand Down
38 changes: 38 additions & 0 deletions jetstream/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1894,3 +1894,41 @@ Deno.test("kv - watch history no deletes", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - republish header handling", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
const n = nuid.next();
await jsm.streams.add({
name: n,
subjects: ["A.>"],
storage: StorageType.Memory,
republish: {
src: ">",
dest: `$KV.${n}.>`,
},
});

const js = nc.jetstream();
const kv = await js.views.kv(n);

nc.publish("A.orange", "hey");
await js.publish("A.tomato", "hello");
await kv.put("A.potato", "yo");

async function check(allow_direct = false): Promise<void> {
const B = await js.views.kv(n, { allow_direct });
let e = await B.get("A.orange");
assertExists(e);

e = await B.get("A.tomato");
assertExists(e);

e = await B.get("A.potato");
assertExists(e);
}
await check();
await check(true);

await cleanup(ns, nc);
});
2 changes: 2 additions & 0 deletions nats-base-client/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ export interface MsgHdrs extends Iterable<[string, string[]]> {
values(k: string, match?: Match): string[];

delete(k: string, match?: Match): void;

last(k: string, match?: Match): string;
}

export interface RequestOptions {
Expand Down
11 changes: 11 additions & 0 deletions nats-base-client/headers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,17 @@ export class MsgHdrsImpl implements MsgHdrs {
return "";
}

last(k: string, match = Match.Exact): string {
const keys = this.findKeys(k, match);
if (keys.length) {
const v = this.headers.get(keys[0]);
if (v) {
return Array.isArray(v) ? v[v.length - 1] : v;
}
}
return "";
}

has(k: string, match = Match.Exact): boolean {
return this.findKeys(k, match).length > 0;
}
Expand Down
1 change: 0 additions & 1 deletion tests/reconnect_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
*/
import {
assert,
assertAlmostEquals,
assertEquals,
fail,
} from "https://deno.land/[email protected]/assert/mod.ts";
Expand Down

0 comments on commit bb1bd03

Please sign in to comment.