Skip to content

Commit

Permalink
[FEAT] cluster tool support for websocket and tls
Browse files Browse the repository at this point in the history
[FIX] nats cluster port for first node now on standard 4225
[FIX] cluster listen for cluster members, websocket, and http monitoring is now stable (selected ports were random for all cluster nodes on every restart)
  • Loading branch information
aricart committed Dec 3, 2023
1 parent 37f5bd6 commit 60ad741
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 17 deletions.
58 changes: 52 additions & 6 deletions tests/helpers/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import { rgb24 } from "https://deno.land/[email protected]/fmt/colors.ts";
const defaults = {
c: 3,
p: 4222,
w: 80,
chaos: false,
cert: "",
key: "",
};

const argv = parse(
Expand All @@ -31,6 +34,7 @@ const argv = parse(
"c": ["count"],
"d": ["debug"],
"j": ["jetstream"],
"w": ["websocket"],
},
default: defaults,
boolean: ["debug", "jetstream", "server-debug", "chaos"],
Expand All @@ -39,18 +43,60 @@ const argv = parse(

if (argv.h || argv.help) {
console.log(
"usage: cluster [--count 3] [--port 4222] [--debug] [--jetstream] [--chaos]\n",
"usage: cluster [--count 3] [--port 4222] [--key path] [--cert path] [--websocket 80] [--debug] [--jetstream] [--chaos]\n",
);
Deno.exit(0);
}

const count = argv["count"] as number || 3;
const port = argv["port"] as number ?? 4222;
const cert = argv["cert"] as string || undefined;
const key = argv["key"] as string || undefined;

if (cert?.length) {
await Deno.stat(cert).catch((err) => {
console.error(`error loading certificate: ${err.message}`);
Deno.exit(1);
});
}
if (key?.length) {
await Deno.stat(key).catch((err) => {
console.error(`error loading certificate key: ${err.message}`);
Deno.exit(1);
});
}
let wsport = argv["websocket"] as number;
if (wsport === 80 && cert?.length) {
wsport = 443;
}

let chaosTimer: number | undefined;
let cluster: NatsServer[];

try {
const base = { debug: false };
const base = {
debug: false,
tls: {},
websocket: {
port: wsport,
no_tls: true,
compression: true,
tls: {},
},
};

if (cert) {
base.tls = {
cert_file: cert,
key_file: key,
};
base.websocket.no_tls = false;
base.websocket.tls = {
cert_file: cert,
key_file: key,
};
}

const serverDebug = argv["debug"];
if (serverDebug) {
base.debug = true;
Expand All @@ -76,12 +122,12 @@ try {

cluster.forEach((s) => {
const pid = rgb24(`[${s.process.pid}]`, s.rgb);
console.log(
`${pid} ${s.configFile} at nats://${s.hostname}:${s.port}
console.log(`${pid} config ${s.configFile}
\tnats://${s.hostname}:${s.port}
\tws${cert ? "s" : ""}://${s.hostname}:${s.websocket}
\tcluster://${s.hostname}:${s.cluster}
\thttp://127.0.0.1:${s.monitoring}
\tstore: ${s.config?.jetstream?.store_dir}`,
);
${argv.jetstream ? `\tstore: ${s.config?.jetstream?.store_dir}` : ""}`);
});

if (argv.chaos === true && confirm("start chaos?")) {
Expand Down
40 changes: 29 additions & 11 deletions tests/helpers/launcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ export interface JSZ {
};
}

function parseHostport(s?: string) {
function parseHostport(
s?: string,
): { hostname: string; port: number } | undefined {
if (!s) {
return;
}
Expand Down Expand Up @@ -192,12 +194,10 @@ export class NatsServer implements PortInfo {
})();
}

restart(): Promise<NatsServer> {
return this.stop().then(() => {
const conf = JSON.parse(JSON.stringify(this.config));
conf.port = this.port;
return NatsServer.start(conf, this.debug);
});
async restart(): Promise<NatsServer> {
await this.stop();
const conf = JSON.parse(JSON.stringify(this.config));
return await NatsServer.start(conf, this.debug);
}

pid(): number {
Expand Down Expand Up @@ -427,7 +427,7 @@ export class NatsServer implements PortInfo {
conf = Object.assign({}, conf);
conf.cluster = conf.cluster || {};
conf.cluster.name = nuid.next();
conf.cluster.listen = conf.cluster.listen || "127.0.0.1:-1";
conf.cluster.listen = conf.cluster.listen || "127.0.0.1:4225";

const ns = await NatsServer.start(conf, debug);
const cluster = [ns];
Expand Down Expand Up @@ -489,10 +489,13 @@ export class NatsServer implements PortInfo {
conf = conf || {};
conf = JSON.parse(JSON.stringify(conf));
conf.port = -1;
if (conf.websocket) {
conf.websocket.port = -1;
}
conf.http = "127.0.0.1:-1";
conf.cluster = conf.cluster || {};
conf.cluster.name = ns.clusterName;
conf.cluster.listen = conf.cluster.listen || "127.0.0.1:-1";
conf.cluster.listen = "127.0.0.1:-1";
conf.cluster.routes = [`nats://${ns.hostname}:${ns.cluster}`];

return NatsServer.start(conf, debug);
Expand All @@ -513,7 +516,6 @@ export class NatsServer implements PortInfo {
/**
* this is only expecting authentication type changes
* @param conf
* @param debug
*/
async reload(conf?: any): Promise<void> {
conf = NatsServer.confDefaults(conf);
Expand All @@ -535,7 +537,7 @@ export class NatsServer implements PortInfo {
conf = NatsServer.confDefaults(conf);
conf.ports_file_dir = tmp;

const confFile = await Deno.makeTempFileSync();
const confFile = Deno.makeTempFileSync();
await Deno.writeFile(confFile, new TextEncoder().encode(toConf(conf)));
if (debug) {
console.info(`${exe} -c ${confFile}`);
Expand Down Expand Up @@ -584,6 +586,22 @@ export class NatsServer implements PortInfo {
if (conf.cluster?.name) {
ports.clusterName = conf.cluster.name;
}

// if we have -1 ports, lets freeze them
conf.port = ports.port;
if (ports.cluster) {
conf.cluster.listen = `${ports.hostname}:${ports.cluster}`;
conf.cluster.name = ports.clusterName;
}
if (ports.monitoring) {
conf.http = ports.monitoring;
}
if (ports.websocket) {
conf.websocket = conf.websocket || {};
conf.websocket.port = ports.websocket;
}
await Deno.writeFile(confFile, new TextEncoder().encode(toConf(conf)));

try {
await check(
async () => {
Expand Down

0 comments on commit 60ad741

Please sign in to comment.