diff --git a/agents/src/ipc/job_main.ts b/agents/src/ipc/job_main.ts index c794dd5f..3a40f1b4 100644 --- a/agents/src/ipc/job_main.ts +++ b/agents/src/ipc/job_main.ts @@ -2,30 +2,58 @@ // // SPDX-License-Identifier: Apache-2.0 -import { IPC_MESSAGE, JobMainArgs, Message, Ping, StartJobRequest } from './protocol'; +import { IPC_MESSAGE, JobMainArgs, Message, Ping } from './protocol'; import { Room } from '@livekit/rtc-node'; import { EventEmitter, once } from 'events'; import { JobContext } from '../job_context'; import { log } from '../log'; -import { AgentEntry } from '../job_request'; +import { ChildProcess, fork } from 'child_process'; +import { JobAssignment, ServerMessage } from '@livekit/protocol'; + +export const runJob = (args: JobMainArgs): ChildProcess => { + return fork(__filename, [args.raw, args.entry, args.fallbackURL]); +}; + +if (process.send) { + // process.argv: + // [0] `node' or `bun' + // [1] __filename + // [2] proto.JobAssignment, serialized to JSON string + // [3] entry function, serialized to string + // [4] fallback URL in case JobAssignment.url is empty + + const msg = new ServerMessage(); + msg.fromJsonString(process.argv[2]); + const args = msg.message.value as JobAssignment; -export const runJob = async (event: EventEmitter, args: JobMainArgs) => { const room = new Room(); - const conn = room.connect(args.url, args.token); - let request: StartJobRequest | undefined = undefined; + const closeEvent = new EventEmitter(); let shuttingDown = false; let closed = false; - let task: AgentEntry | undefined = undefined; - let context: JobContext | undefined = undefined; - const start = () => { - if (request && room.isConnected && !closed) { - event.emit('msg', { type: IPC_MESSAGE.StartJobResponse }); + process.on('message', (msg: Message) => { + if (msg.type === IPC_MESSAGE.ShutdownRequest) { + shuttingDown = true; + closed = true; + closeEvent.emit('close'); + } else if (msg.type === IPC_MESSAGE.Ping) { + process.send!({ + type: IPC_MESSAGE.Pong, + lastTimestamp: (msg as Ping).timestamp, + timestamp: Date.now(), + }); + } + }); - task = args.acceptData.entry; - context = new JobContext(event, request.job, room); + // don't do anything on C-c + process.on('SIGINT', () => {}); - task(context); + const conn = room.connect(args.url || process.argv[4], args.token); + + const start = () => { + if (room.isConnected && !closed) { + process.send!({ type: IPC_MESSAGE.StartJobResponse }); + new Function('return ' + process.argv[3])()(new JobContext(closeEvent, args.job!, room)); } }; @@ -35,35 +63,17 @@ export const runJob = async (event: EventEmitter, args: JobMainArgs) => { if (!closed) start(); }) .catch((err) => { - if (!closed) event.emit('msg', { type: IPC_MESSAGE.StartJobResponse, err }); - }); - }); - - event.on('msg', (msg: Message) => { - if (msg.type === IPC_MESSAGE.ShutdownRequest) { - shuttingDown = true; - closed = true; - event.emit('close') - } else if (msg.type === IPC_MESSAGE.StartJobRequest) { - request = msg as StartJobRequest; - start(); - } else if (msg.type === IPC_MESSAGE.Ping) { - event.emit('msg', { - type: IPC_MESSAGE.Pong, - lastTimestamp: (msg as Ping).timestamp, - timestamp: Date.now(), + if (!closed) process.send!({ type: IPC_MESSAGE.StartJobResponse, err }); }); - } }); - await once(event, 'close'); + await once(closeEvent, 'close'); log.debug('disconnecting from room'); await room.disconnect(); if (shuttingDown) { - event.emit('msg', { type: IPC_MESSAGE.ShutdownResponse }); + process.send({ type: IPC_MESSAGE.ShutdownResponse }); } else { - event.emit('msg', { type: IPC_MESSAGE.UserExit }); - closed = true; + process.send({ type: IPC_MESSAGE.UserExit }); } - event.emit('exit'); -}; + process.exit(); +} diff --git a/agents/src/ipc/job_process.ts b/agents/src/ipc/job_process.ts index 6e3cdeca..b1b7346a 100644 --- a/agents/src/ipc/job_process.ts +++ b/agents/src/ipc/job_process.ts @@ -5,10 +5,11 @@ import { Job } from '@livekit/protocol'; import { IPC_MESSAGE, JobMainArgs, Message, Pong, StartJobResponse } from './protocol'; import { runJob } from './job_main'; -import { EventEmitter, once } from 'events'; +import { once } from 'events'; import { log } from '../log'; -import { AcceptData } from '../job_request'; import { Logger } from 'pino'; +import { AcceptData } from '../job_request'; +import { ChildProcess } from 'child_process'; const START_TIMEOUT = 90 * 1000; const PING_INTERVAL = 5 * 1000; @@ -18,16 +19,15 @@ const HIGH_PING_THRESHOLD = 10; export class JobProcess { #job: Job; args: JobMainArgs; - event: EventEmitter; logger: Logger; + process?: ChildProcess; startTimeout?: Timer; pongTimeout?: Timer; pingInterval?: Timer; - constructor(job: Job, url: string, token: string, acceptData: AcceptData) { + constructor(job: Job, acceptData: AcceptData, raw: string, fallbackURL: string) { this.#job = job; - this.args = { jobID: job.id, url, token, acceptData }; - this.event = new EventEmitter(); + this.args = { entry: acceptData.entry.toString(), raw: raw, fallbackURL }; this.logger = log.child({ job_id: this.#job.id }); } @@ -37,41 +37,42 @@ export class JobProcess { async close() { this.logger.info('closing job process'); - this.event.emit('msg', { type: IPC_MESSAGE.ShutdownRequest }); await this.clear(); - await once(this.event, 'exit') + this.process!.send({ type: IPC_MESSAGE.ShutdownRequest }); + await once(this.process!, 'disconnect'); this.logger.info('job process closed'); } async clear() { - clearTimeout(this.startTimeout) - clearTimeout(this.pongTimeout) - clearTimeout(this.pingInterval) + clearTimeout(this.startTimeout); + clearTimeout(this.pongTimeout); + clearInterval(this.pingInterval); } async run() { let resp: StartJobResponse | undefined = undefined; - runJob(this.event, this.args); - this.event.emit('msg', { type: IPC_MESSAGE.StartJobRequest, job: this.job }); - this.startTimeout = setTimeout(() => { if (resp === undefined) { this.logger.error('process start timed out, killing job'); - this.close(); + this.process?.kill(); + this.clear(); } }, START_TIMEOUT); this.pingInterval = setInterval(() => { - this.event.emit('msg', { type: IPC_MESSAGE.Ping, timestamp: Date.now() }); + this.process?.send({ type: IPC_MESSAGE.Ping, timestamp: Date.now() }); }, PING_INTERVAL); this.pongTimeout = setTimeout(() => { this.logger.error('job ping timed out, killing job'); - this.close(); + this.process?.kill(); + this.clear(); }, PING_TIMEOUT); - this.event.on('msg', (msg: Message) => { + this.process = runJob(this.args); + + this.process.on('message', (msg: Message) => { if (msg.type === IPC_MESSAGE.StartJobResponse) { resp = msg as StartJobResponse; } else if (msg.type === IPC_MESSAGE.Pong) { @@ -83,10 +84,10 @@ export class JobProcess { this.pongTimeout.refresh(); } else if (msg.type === IPC_MESSAGE.UserExit || msg.type === IPC_MESSAGE.ShutdownResponse) { this.logger.info('job exiting'); - this.close(); + this.clear(); } }); - await once(this.event, 'exit'); + await once(this.process, 'disconnect'); } } diff --git a/agents/src/ipc/protocol.ts b/agents/src/ipc/protocol.ts index 91e10632..0190e87e 100644 --- a/agents/src/ipc/protocol.ts +++ b/agents/src/ipc/protocol.ts @@ -2,25 +2,16 @@ // // SPDX-License-Identifier: Apache-2.0 -import { Job } from '@livekit/protocol'; -import { AcceptData } from '../job_request'; - export type JobMainArgs = { - jobID: string; - url: string; - token: string; - acceptData: AcceptData; + entry: string; + raw: string; + fallbackURL: string; }; export interface Message { type: IPC_MESSAGE; } -export interface StartJobRequest extends Message { - type: IPC_MESSAGE.StartJobRequest; - job: Job; -} - export interface StartJobResponse extends Message { type: IPC_MESSAGE.StartJobResponse; err?: Error; diff --git a/agents/src/worker.ts b/agents/src/worker.ts index cc37c067..9a02c67d 100644 --- a/agents/src/worker.ts +++ b/agents/src/worker.ts @@ -11,8 +11,8 @@ import { WorkerMessage, ParticipantPermission, ServerMessage, - JobAssignment, AvailabilityRequest, + JobAssignment, } from '@livekit/protocol'; import { AcceptData } from './job_request'; import { HTTPServer } from './http_server'; @@ -120,11 +120,18 @@ class ActiveJob { } } +type AssignmentPair = { + // this string is the JSON string version of the JobAssignment. + // we keep it around to unpack it again in the child, because we can't pass Job directly. + raw: string; + asgn: JobAssignment; +}; + class PendingAssignment { - promise = new Promise((resolve) => { + promise = new Promise((resolve) => { this.resolve = resolve; // oh, JavaScript. }); - resolve(arg: JobAssignment) { + resolve(arg: AssignmentPair) { arg; } } @@ -201,8 +208,8 @@ export class Worker { await Promise.all([workerWS(), this.httpServer.run()]); } - startProcess(job: Job, url: string, token: string, acceptData: AcceptData) { - const proc = new JobProcess(job, url, token, acceptData); + startProcess(job: Job, acceptData: AcceptData, raw: string) { + const proc = new JobProcess(job, acceptData, raw, this.opts.wsURL); this.processes[job.id] = { proc, activeJob: new ActiveJob(job, acceptData) }; proc .run() @@ -256,7 +263,10 @@ export class Worker { if (job.id in this.pending) { const task = this.pending[job.id]; delete this.pending[job.id]; - task.value.resolve(msg.message.value); + task.value.resolve({ + asgn: msg.message.value, + raw: msg.toJsonString(), + }); } else { log.child({ job }).warn('received assignment for unknown job ' + job.id); } @@ -328,10 +338,9 @@ export class Worker { log.child({ req }).warn(`assignment for job ${req.id} timed out`); return; }, ASSIGNMENT_TIMEOUT); - this.pending[req.id].value.promise.then((value) => { + this.pending[req.id].value.promise.then(({ asgn, raw }) => { clearTimeout(timer); - const url = value.url || this.opts.wsURL; - this.startProcess(value!.job!, url, value.token, av.data!); + this.startProcess(asgn!.job!, av.data!, raw); }); });