Skip to content

Commit

Permalink
ipc: implement job isolation
Browse files Browse the repository at this point in the history
  • Loading branch information
nbsp committed Apr 24, 2024
1 parent eadd5eb commit 0351b6d
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 78 deletions.
84 changes: 47 additions & 37 deletions agents/src/ipc/job_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,58 @@
//
// SPDX-License-Identifier: Apache-2.0

import { IPC_MESSAGE, JobMainArgs, Message, Ping, StartJobRequest } from './protocol';
import { IPC_MESSAGE, JobMainArgs, Message, Ping } from './protocol';
import { Room } from '@livekit/rtc-node';
import { EventEmitter, once } from 'events';
import { JobContext } from '../job_context';
import { log } from '../log';
import { AgentEntry } from '../job_request';
import { ChildProcess, fork } from 'child_process';
import { JobAssignment, ServerMessage } from '@livekit/protocol';

/**
 * Fork this module as a child process that will run a single job.
 *
 * The child is given three command-line arguments, in this order: the raw
 * JSON message, the stringified entry function, and the fallback URL.
 *
 * @param args - the strings to forward to the child process
 * @returns the handle of the forked child process
 */
export const runJob = (args: JobMainArgs): ChildProcess => {
  const { raw, entry, fallbackURL } = args;
  const childArgv = [raw, entry, fallbackURL];
  return fork(__filename, childArgv);
};

if (process.send) {
// process.argv:
// [0] `node' or `bun'
// [1] __filename
// [2] proto.ServerMessage carrying the JobAssignment, serialized to JSON string
// [3] entry function, serialized to string
// [4] fallback URL in case JobAssignment.url is empty

const msg = new ServerMessage();
msg.fromJsonString(process.argv[2]);
const args = msg.message.value as JobAssignment;

export const runJob = async (event: EventEmitter, args: JobMainArgs) => {
const room = new Room();
const conn = room.connect(args.url, args.token);
let request: StartJobRequest | undefined = undefined;
const closeEvent = new EventEmitter();
let shuttingDown = false;
let closed = false;
let task: AgentEntry | undefined = undefined;
let context: JobContext | undefined = undefined;

const start = () => {
if (request && room.isConnected && !closed) {
event.emit('msg', { type: IPC_MESSAGE.StartJobResponse });
process.on('message', (msg: Message) => {
if (msg.type === IPC_MESSAGE.ShutdownRequest) {
shuttingDown = true;
closed = true;
closeEvent.emit('close');
} else if (msg.type === IPC_MESSAGE.Ping) {
process.send!({
type: IPC_MESSAGE.Pong,
lastTimestamp: (msg as Ping).timestamp,
timestamp: Date.now(),
});
}
});

task = args.acceptData.entry;
context = new JobContext(event, request.job, room);
// don't do anything on C-c
process.on('SIGINT', () => {});

task(context);
const conn = room.connect(args.url || process.argv[4], args.token);

const start = () => {
if (room.isConnected && !closed) {
process.send!({ type: IPC_MESSAGE.StartJobResponse });
new Function('return ' + process.argv[3])()(new JobContext(closeEvent, args.job!, room));
}
};

Expand All @@ -35,35 +63,17 @@ export const runJob = async (event: EventEmitter, args: JobMainArgs) => {
if (!closed) start();
})
.catch((err) => {
if (!closed) event.emit('msg', { type: IPC_MESSAGE.StartJobResponse, err });
});
});

event.on('msg', (msg: Message) => {
if (msg.type === IPC_MESSAGE.ShutdownRequest) {
shuttingDown = true;
closed = true;
event.emit('close')
} else if (msg.type === IPC_MESSAGE.StartJobRequest) {
request = msg as StartJobRequest;
start();
} else if (msg.type === IPC_MESSAGE.Ping) {
event.emit('msg', {
type: IPC_MESSAGE.Pong,
lastTimestamp: (msg as Ping).timestamp,
timestamp: Date.now(),
if (!closed) process.send!({ type: IPC_MESSAGE.StartJobResponse, err });
});
}
});

await once(event, 'close');
await once(closeEvent, 'close');
log.debug('disconnecting from room');
await room.disconnect();
if (shuttingDown) {
event.emit('msg', { type: IPC_MESSAGE.ShutdownResponse });
process.send({ type: IPC_MESSAGE.ShutdownResponse });
} else {
event.emit('msg', { type: IPC_MESSAGE.UserExit });
closed = true;
process.send({ type: IPC_MESSAGE.UserExit });
}
event.emit('exit');
};
process.exit();
}
41 changes: 21 additions & 20 deletions agents/src/ipc/job_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import { Job } from '@livekit/protocol';
import { IPC_MESSAGE, JobMainArgs, Message, Pong, StartJobResponse } from './protocol';
import { runJob } from './job_main';
import { EventEmitter, once } from 'events';
import { once } from 'events';
import { log } from '../log';
import { AcceptData } from '../job_request';
import { Logger } from 'pino';
import { AcceptData } from '../job_request';
import { ChildProcess } from 'child_process';

const START_TIMEOUT = 90 * 1000;
const PING_INTERVAL = 5 * 1000;
Expand All @@ -18,16 +19,15 @@ const HIGH_PING_THRESHOLD = 10;
export class JobProcess {
#job: Job;
args: JobMainArgs;
event: EventEmitter;
logger: Logger;
process?: ChildProcess;
startTimeout?: Timer;
pongTimeout?: Timer;
pingInterval?: Timer;

constructor(job: Job, url: string, token: string, acceptData: AcceptData) {
constructor(job: Job, acceptData: AcceptData, raw: string, fallbackURL: string) {
this.#job = job;
this.args = { jobID: job.id, url, token, acceptData };
this.event = new EventEmitter();
this.args = { entry: acceptData.entry.toString(), raw: raw, fallbackURL };
this.logger = log.child({ job_id: this.#job.id });
}

Expand All @@ -37,41 +37,42 @@ export class JobProcess {

async close() {
this.logger.info('closing job process');
this.event.emit('msg', { type: IPC_MESSAGE.ShutdownRequest });
await this.clear();
await once(this.event, 'exit')
this.process!.send({ type: IPC_MESSAGE.ShutdownRequest });
await once(this.process!, 'disconnect');
this.logger.info('job process closed');
}

async clear() {
clearTimeout(this.startTimeout)
clearTimeout(this.pongTimeout)
clearTimeout(this.pingInterval)
clearTimeout(this.startTimeout);
clearTimeout(this.pongTimeout);
clearInterval(this.pingInterval);
}

async run() {
let resp: StartJobResponse | undefined = undefined;

runJob(this.event, this.args);
this.event.emit('msg', { type: IPC_MESSAGE.StartJobRequest, job: this.job });

this.startTimeout = setTimeout(() => {
if (resp === undefined) {
this.logger.error('process start timed out, killing job');
this.close();
this.process?.kill();
this.clear();
}
}, START_TIMEOUT);

this.pingInterval = setInterval(() => {
this.event.emit('msg', { type: IPC_MESSAGE.Ping, timestamp: Date.now() });
this.process?.send({ type: IPC_MESSAGE.Ping, timestamp: Date.now() });
}, PING_INTERVAL);

this.pongTimeout = setTimeout(() => {
this.logger.error('job ping timed out, killing job');
this.close();
this.process?.kill();
this.clear();
}, PING_TIMEOUT);

this.event.on('msg', (msg: Message) => {
this.process = runJob(this.args);

this.process.on('message', (msg: Message) => {
if (msg.type === IPC_MESSAGE.StartJobResponse) {
resp = msg as StartJobResponse;
} else if (msg.type === IPC_MESSAGE.Pong) {
Expand All @@ -83,10 +84,10 @@ export class JobProcess {
this.pongTimeout.refresh();
} else if (msg.type === IPC_MESSAGE.UserExit || msg.type === IPC_MESSAGE.ShutdownResponse) {
this.logger.info('job exiting');
this.close();
this.clear();
}
});

await once(this.event, 'exit');
await once(this.process, 'disconnect');
}
}
15 changes: 3 additions & 12 deletions agents/src/ipc/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,16 @@
//
// SPDX-License-Identifier: Apache-2.0

import { Job } from '@livekit/protocol';
import { AcceptData } from '../job_request';

export type JobMainArgs = {
jobID: string;
url: string;
token: string;
acceptData: AcceptData;
entry: string;
raw: string;
fallbackURL: string;
};

/**
 * Base shape shared by every IPC message.
 * More specific messages (e.g. StartJobResponse) extend this interface.
 */
export interface Message {
// discriminator that receivers compare against IPC_MESSAGE members to pick a handler
type: IPC_MESSAGE;
}

export interface StartJobRequest extends Message {
type: IPC_MESSAGE.StartJobRequest;
job: Job;
}

export interface StartJobResponse extends Message {
type: IPC_MESSAGE.StartJobResponse;
err?: Error;
Expand Down
27 changes: 18 additions & 9 deletions agents/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import {
WorkerMessage,
ParticipantPermission,
ServerMessage,
JobAssignment,
AvailabilityRequest,
JobAssignment,
} from '@livekit/protocol';
import { AcceptData } from './job_request';
import { HTTPServer } from './http_server';
Expand Down Expand Up @@ -120,11 +120,18 @@ class ActiveJob {
}
}

/** A job assignment paired with the JSON text of the message that carried it. */
type AssignmentPair = {
// this string is the result of toJsonString() on the whole message that carried the
// assignment (not on the JobAssignment alone).
// we keep it around to unpack it again in the child, because we can't pass Job directly.
raw: string;
// the assignment itself, taken from that message's `message.value`
asgn: JobAssignment;
};

class PendingAssignment {
promise = new Promise<JobAssignment>((resolve) => {
promise = new Promise<AssignmentPair>((resolve) => {
this.resolve = resolve; // oh, JavaScript.
});
resolve(arg: JobAssignment) {
resolve(arg: AssignmentPair) {
arg;
}
}
Expand Down Expand Up @@ -201,8 +208,8 @@ export class Worker {
await Promise.all([workerWS(), this.httpServer.run()]);
}

startProcess(job: Job, url: string, token: string, acceptData: AcceptData) {
const proc = new JobProcess(job, url, token, acceptData);
startProcess(job: Job, acceptData: AcceptData, raw: string) {
const proc = new JobProcess(job, acceptData, raw, this.opts.wsURL);
this.processes[job.id] = { proc, activeJob: new ActiveJob(job, acceptData) };
proc
.run()
Expand Down Expand Up @@ -256,7 +263,10 @@ export class Worker {
if (job.id in this.pending) {
const task = this.pending[job.id];
delete this.pending[job.id];
task.value.resolve(msg.message.value);
task.value.resolve({
asgn: msg.message.value,
raw: msg.toJsonString(),
});
} else {
log.child({ job }).warn('received assignment for unknown job ' + job.id);
}
Expand Down Expand Up @@ -328,10 +338,9 @@ export class Worker {
log.child({ req }).warn(`assignment for job ${req.id} timed out`);
return;
}, ASSIGNMENT_TIMEOUT);
this.pending[req.id].value.promise.then((value) => {
this.pending[req.id].value.promise.then(({ asgn, raw }) => {
clearTimeout(timer);
const url = value.url || this.opts.wsURL;
this.startProcess(value!.job!, url, value.token, av.data!);
this.startProcess(asgn!.job!, av.data!, raw);
});
});

Expand Down

0 comments on commit 0351b6d

Please sign in to comment.