diff --git a/jetstream/tests/consumers_test.ts b/jetstream/tests/consumers_test.ts index 7ea6983f..455ffd15 100644 --- a/jetstream/tests/consumers_test.ts +++ b/jetstream/tests/consumers_test.ts @@ -284,7 +284,7 @@ Deno.test("consumers - consume", async () => { })); const count = 50_000; - const conf = await memStream(nc, count); + const conf = await memStream(nc, count, 0, 5000); const jsm = await nc.jetstreamManager(); await jsm.consumers.add(conf.stream, { diff --git a/jetstream/tests/jetstream_test.ts b/jetstream/tests/jetstream_test.ts index 240c22ae..55a6e60d 100644 --- a/jetstream/tests/jetstream_test.ts +++ b/jetstream/tests/jetstream_test.ts @@ -1132,7 +1132,9 @@ Deno.test("jetstream - fetch one - no wait breaks fast", async () => { await done; sw.mark(); - assert(25 > sw.duration()); + console.log({ duration: sw.duration() }); + const duration = sw.duration(); + assert(25 > duration, `${duration}`); assertEquals(batch.getReceived(), 1); await cleanup(ns, nc); }); @@ -2287,6 +2289,9 @@ Deno.test("jetstream - source", async () => { ], }); + // source will not process right away? + await delay(1000); + const ci = await jsm.consumers.add("work", { ack_policy: AckPolicy.Explicit, durable_name: "worker", diff --git a/jetstream/tests/jstest_util.ts b/jetstream/tests/jstest_util.ts index e71d1ddf..8897b9cf 100644 --- a/jetstream/tests/jstest_util.ts +++ b/jetstream/tests/jstest_util.ts @@ -127,8 +127,8 @@ export class Mark { } assertInRange(target: number) { - const min = .8 * target; - const max = 1.2 * target; + const min = .50 * target; + const max = 1.50 * target; const d = this.duration(); assert( d >= min && max >= d, diff --git a/nats-base-client/bench.ts b/nats-base-client/bench.ts index 78439bc2..d0e93227 100644 --- a/nats-base-client/bench.ts +++ b/nats-base-client/bench.ts @@ -402,11 +402,15 @@ export class Bench { } } -function throughput(bytes: number, seconds: number): string { - return humanizeBytes(bytes / seconds); +export function throughput(bytes: number, seconds: number): string { + return `${humanizeBytes(bytes / seconds)}/sec`; } -function humanizeBytes(bytes: number, si = false): string { +export function msgThroughput(msgs: number, seconds: number): string { + return `${(Math.floor(msgs / seconds))} msgs/sec`; +} + +export function humanizeBytes(bytes: number, si = false): string { const base = si ? 1000 : 1024; const pre = si ? ["k", "M", "G", "T", "P", "E"] @@ -414,11 +418,11 @@ function humanizeBytes(bytes: number, si = false): string { const post = si ? "iB" : "B"; if (bytes < base) { - return `${bytes.toFixed(2)} ${post}/sec`; + return `${bytes.toFixed(2)} ${post}`; } const exp = parseInt(Math.log(bytes) / Math.log(base) + ""); const index = parseInt((exp - 1) + ""); - return `${(bytes / Math.pow(base, exp)).toFixed(2)} ${pre[index]}${post}/sec`; + return `${(bytes / Math.pow(base, exp)).toFixed(2)} ${pre[index]}${post}`; } function humanizeNumber(n: number) { diff --git a/tests/idleheartbeats_test.ts b/tests/idleheartbeats_test.ts index d4ada3d4..48dc722a 100644 --- a/tests/idleheartbeats_test.ts +++ b/tests/idleheartbeats_test.ts @@ -65,15 +65,15 @@ Deno.test("idleheartbeat - timeout recover", async () => { return true; }, { maxOut: 5 }); - const interval = setInterval(() => { - h.work(); - }, 1000); - setTimeout(() => { h.cancel(); d.resolve(); clearInterval(interval); - }, 1650); + }, 1730); + + const interval = setInterval(() => { + h.work(); + }, 1000); await d; assertEquals(h.missed, 2); diff --git a/tests/reconnect_test.ts b/tests/reconnect_test.ts index 3e2116bc..1150f84b 100644 --- a/tests/reconnect_test.ts +++ b/tests/reconnect_test.ts @@ -14,6 +14,7 @@ */ import { assert, + assertAlmostEquals, assertEquals, fail, } from "https://deno.land/std@0.200.0/assert/mod.ts"; @@ -124,15 +125,15 @@ Deno.test("reconnect - reconnecting after proper delay", async () => { reconnectTimeWait: 500, maxReconnectAttempts: 1, }) as NatsConnectionImpl; - const serverLastConnect = nc.protocol.servers.getCurrentServer().lastConnect; + const first = nc.protocol.servers.getCurrentServer().lastConnect; const dt = deferred(); (async () => { for await (const e of nc.status()) { - switch (e.type) { + switch (e.type as string) { case DebugEvents.Reconnecting: { - const elapsed = Date.now() - serverLastConnect; - dt.resolve(elapsed); + const last = nc.protocol.servers.getCurrentServer().lastConnect; + dt.resolve(last - first); break; } } @@ -140,7 +141,7 @@ Deno.test("reconnect - reconnecting after proper delay", async () => { })().then(); await srv.stop(); const elapsed = await dt; - assert(elapsed >= 500 && elapsed <= 700, `elapsed was ${elapsed}`); + assert(elapsed >= 500 && elapsed <= 800, `elapsed was ${elapsed}`); await nc.closed(); }); diff --git a/tests/service_test.ts b/tests/service_test.ts index 1926d411..b6b3e365 100644 --- a/tests/service_test.ts +++ b/tests/service_test.ts @@ -344,7 +344,7 @@ Deno.test("service - callback error", async () => { await cleanup(ns, nc); }); -Deno.test("service -service error is headers", async () => { +Deno.test("service - service error is headers", async () => { const { ns, nc } = await setup(); const srv = await nc.services.add({ name: "test",