Skip to content

Commit

Permalink
Merge pull request #612 from nats-io/test-fixes
Browse files Browse the repository at this point in the history
[TEST] tweaks to locally flapping tests
  • Loading branch information
aricart authored Oct 14, 2023
2 parents 6755d72 + 39f5cc5 commit 29ce5ec
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 88 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
deno-version: ${{ matrix.deno-version }}

- name: Set NATS Server Version
run: echo "NATS_VERSION=v2.10.1" >> $GITHUB_ENV
run: echo "NATS_VERSION=v2.10.3" >> $GITHUB_ENV

# this here because dns seems to be wedged on gha
# - name: Add hosts to /etc/hosts
Expand Down
90 changes: 18 additions & 72 deletions jetstream/tests/consumers_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import {
} from "../consumer.ts";
import { deadline } from "../../nats-base-client/util.ts";
import { syncIterator } from "../../nats-base-client/core.ts";
import { setupStreamAndConsumer } from "../../examples/jetstream/util.ts";

Deno.test("consumers - min supported server", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));
Expand Down Expand Up @@ -277,32 +278,20 @@ Deno.test("consumers - fetch exactly messages", async () => {
});

Deno.test("consumers - consume", async () => {
const { ns, nc } = await setup(jetstreamServerConf({
jetstream: {
max_memory_store: 1024 * 1024 * 1024,
},
}));

const count = 50_000;
const conf = await memStream(nc, count, 0, 5000);
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
await jsm.consumers.add(conf.stream, {
durable_name: "b",
ack_policy: AckPolicy.Explicit,
});
const count = 1000;
const { stream, consumer } = await setupStreamAndConsumer(nc, count);

const js = nc.jetstream({ timeout: 30_000 });
const c = await js.consumers.get(stream, consumer);
const ci = await c.info();
assertEquals(ci.num_pending, count);
const start = Date.now();
const js = nc.jetstream();
const consumer = await js.consumers.get(conf.stream, "b");
assertEquals((await consumer.info(true)).num_pending, count);
const iter = await consumer.consume({
expires: 10_000,
max_messages: 50_000,
});
const iter = await c.consume({ expires: 2_000, max_messages: 10 });
for await (const m of iter) {
m.ack();
if (m.seq === count) {
if (m.info.pending === 0) {
const millis = Date.now() - start;
console.log(
`consumer: ${millis}ms - ${count / (millis / 1000)} msgs/sec`,
Expand All @@ -312,33 +301,23 @@ Deno.test("consumers - consume", async () => {
}
assertEquals(iter.getReceived(), count);
assertEquals(iter.getProcessed(), count);
assertEquals((await consumer.info()).num_pending, 0);
assertEquals((await c.info()).num_pending, 0);
await cleanup(ns, nc);
});

Deno.test("consumers - consume callback rejects iter", async () => {
const { ns, nc } = await setup(jetstreamServerConf({
jetstream: {
max_memory_store: 1024 * 1024 * 1024,
},
}));

const conf = await memStream(nc, 0);
const jsm = await nc.jetstreamManager();
await jsm.consumers.add(conf.stream, {
durable_name: "b",
ack_policy: AckPolicy.Explicit,
});

const { ns, nc } = await setup(jetstreamServerConf());
const { stream, consumer } = await setupStreamAndConsumer(nc, 0);
const js = nc.jetstream();
const consumer = await js.consumers.get(conf.stream, "b");
const iter = await consumer.consume({
expires: 10_000,
max_messages: 50_000,
const c = await js.consumers.get(stream, consumer);
const iter = await c.consume({
expires: 5_000,
max_messages: 10_000,
callback: (m) => {
m.ack();
},
});

await assertRejects(
async () => {
for await (const _o of iter) {
Expand All @@ -361,39 +340,6 @@ Deno.test("consumers - fetch heartbeats", async () => {
await consumerHbTest(true);
});

async function memStream(
nc: NatsConnection,
msgs = 1000,
size = 0,
batch = 10000,
): Promise<{ millis: number; stream: string; subj: string }> {
const jsm = await nc.jetstreamManager();
const stream = nuid.next();
const subj = nuid.next();
await jsm.streams.add({
name: stream,
subjects: [subj],
storage: StorageType.Memory,
});
const payload = new Uint8Array(size);

const js = nc.jetstream();
const start = Date.now();
const buf: Promise<PubAck>[] = [];
for (let i = 0; i < msgs; i++) {
buf.push(js.publish(subj, payload));
if (buf.length === batch) {
await Promise.all(buf);
buf.length = 0;
}
}
if (buf.length) {
await Promise.all(buf);
buf.length = 0;
}
return { millis: Date.now() - start, subj, stream };
}

/**
* Setup a cluster that has N nodes with the first node being just a connection
* server - rest are JetStream - min number of servers is 3
Expand Down
1 change: 1 addition & 0 deletions jetstream/tests/objectstore_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,7 @@ Deno.test("objectstore - hashtests", async () => {
const oi = await os.put(
{ name: t.hash, options: { max_chunk_size: 9 } },
rs.stream(),
{ timeout: 20_000 },
);
assertEquals(oi.digest, `${digestType}${t.hash}`);
}
Expand Down
12 changes: 3 additions & 9 deletions tests/basics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1175,15 +1175,9 @@ Deno.test("basics - initial connect error", async () => {
Deno.test("basics - close promise resolves", async () => {
const ns = await NatsServer.start();
const nc = await connect({ port: ns.port, reconnect: false });
setTimeout(() => {
ns.stop();
});

await nc.closed().then((err) => {
assertEquals(err, undefined);
}).catch((err) => {
fail(err);
});
const results = await Promise.all([nc.closed(), nc.close()]);
assertEquals(results[0], undefined);
await ns.stop();
});

Deno.test("basics - inbox prefixes cannot have wildcards", async () => {
Expand Down
7 changes: 4 additions & 3 deletions tests/heartbeats_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ Deno.test("heartbeat - timers fire", async () => {
await delay(400);
assert(hb.timer);
hb.cancel();
// we can have a timer still running here - we need to wait for lag
await delay(50);
assertEquals(hb.timer, undefined);
assert(status.length >= 3, `status ${status.length} >= 3`);
assert(status.length >= 2, `status ${status.length} >= 2`);
assertEquals(status[0].type, DebugEvents.PingTimer);
});

Expand All @@ -82,7 +84,6 @@ Deno.test("heartbeat - errors fire on missed maxOut", async () => {

const hb = new Heartbeat(ph, 100, 3);
hb._schedule();

await disconnect;
assertEquals(hb.timer, undefined);
assert(status.length >= 7, `${status.length} >= 7`);
Expand All @@ -99,7 +100,7 @@ Deno.test("heartbeat - recovers from missed", async () => {

const hb = new Heartbeat(ph, 100, 3);
hb._schedule();
await delay(800);
await delay(850);
hb.cancel();
assertEquals(hb.timer, undefined);
assert(status.length >= 6, `${status.length} >= 6`);
Expand Down
2 changes: 1 addition & 1 deletion tests/idleheartbeats_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ Deno.test("idleheartbeat - timeout autocancel", async () => {
// and resource leaks for a timer if not cleared.

await d;
assert(h.count >= 7);
assertEquals(h.cancelAfter, 2000);
assertEquals(h.timer, 0);
assertEquals(h.autoCancelTimer, 0);
assert(h.count >= 7, `${h.count} >= 7`);
});
2 changes: 1 addition & 1 deletion tests/mrequest_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async function requestManyJitter(noMux = false): Promise<void> {
}
}
const time = Date.now() - start;
assert(500 > time);
assert(1000 > time);
assertEquals(iter.getProcessed(), 10);
await cleanup(ns, nc);
}
Expand Down
5 changes: 4 additions & 1 deletion tests/reconnect_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ Deno.test("reconnect - indefinite reconnects", async () => {
await srv.stop();
await lock;
await srv.stop();
assert(reconnects > 5);
assert(reconnects >= 5, `${reconnects} >= 5`);
assert(reconnect);
assertEquals(disconnects, 1);
});
Expand Down Expand Up @@ -334,6 +334,9 @@ Deno.test("reconnect - close stops reconnects", async () => {
// the promise will reject if deadline exceeds
fail(err);
});
// await some more, because the close could have a timer pending that
// didn't complete flapping the test on resource leak
await delay(750);
});

Deno.test("reconnect - stale connections don't close", async () => {
Expand Down

0 comments on commit 29ce5ec

Please sign in to comment.