Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify entry file heuristics #9

Merged
merged 2 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions agents/src/generator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { JobContext } from './job_context.js';

/** Signature of an agent's entry point: called with the job's `JobContext` and resolves with no value. */
type entryFunction = (job: JobContext) => Promise<void>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it return `Promise<void | (() => void)>` to reflect that it could return a function?

Copy link
Member Author

@nbsp nbsp May 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it return anything? all of the examples on Python's agents return void, and regardless we don't do anything with the potential return value of this function, we just run it.
there's no reason to return anything because there's nowhere to return it to

if we really want to allow returning things for whatever reason we should replace void with unknown to prevent type errors

Copy link
Contributor

@lukasIO lukasIO May 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the confusion is my fault, I thought it'd be nice to provide some sort of resource cleanup functionality, but I thought about it just in terms of API and not in terms of what's already implemented... I guess that we should just remove the comment that mentions the return cleanup function and leave the return type as Promise<void>.


/** Shape of the default export that an agent entry file is expected to provide. */
export interface Agent {
/** Function called with the job's `JobContext` after the entry file has been imported. */
entry: entryFunction;
}

/**
 * Wraps an entry function in an object that satisfies the `Agent` interface.
 *
 * @param entry - function to expose as the agent's `entry` member
 * @returns an `Agent` whose only property, `entry`, is the given function
 * @example `export default defineAgent(myAgent);`
 */
export function defineAgent(entry: entryFunction): Agent {
  const agent: Agent = { entry: entry };
  return agent;
}
1 change: 1 addition & 0 deletions agents/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ export * from './job_request.js';
export * from './worker.js';
export * from './utils.js';
export * from './log.js';
export * from './generator.js';

export { cli, stt, tts };
14 changes: 7 additions & 7 deletions agents/src/ipc/job_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import { log } from '../log.js';
import { IPC_MESSAGE, type JobMainArgs, type Message, type Ping } from './protocol.js';

export const runJob = (args: JobMainArgs): ChildProcess => {
return fork(__filename, [args.raw, args.entry, args.fallbackURL]);
return fork(import.meta.filename, [args.raw, args.entry, args.fallbackURL]);
};

if (process.send) {
// process.argv:
// [0] `node' or `bun'
// [1] __filename
// [0] `node'
// [1] import.meta.filename
// [2] proto.JobAssignment, serialized to JSON string
// [3] __filename of function containing entry file
// [3] import.meta.filename of function containing entry file
// [4] fallback URL in case JobAssignment.url is empty

const msg = new ServerMessage();
Expand Down Expand Up @@ -54,9 +54,9 @@ if (process.send) {
process.send!({ type: IPC_MESSAGE.StartJobResponse });

// here we import the file containing the exported entry function, and call it.
// the function in that file /has/ to be called [entry] and /has/ to be exported.
import(process.argv[3]).then((ext) => {
ext.entry(new JobContext(closeEvent, args.job!, room));
// the file must export default an Agent, usually using defineAgent().
import(process.argv[3]).then((agent) => {
agent.default.entry(new JobContext(closeEvent, args.job!, room));
});
}
};
Expand Down
7 changes: 3 additions & 4 deletions agents/src/ipc/job_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { once } from 'events';
import type { Logger } from 'pino';
import type { AcceptData } from '../job_request.js';
import { log } from '../log.js';
import { runJob } from './job_main.js';
import {
IPC_MESSAGE,
type JobMainArgs,
Expand Down Expand Up @@ -41,7 +40,7 @@ export class JobProcess {
}

async close() {
this.logger.info('closing job process');
this.logger.debug('closing job process');
await this.clear();
this.process!.send({ type: IPC_MESSAGE.ShutdownRequest });
await once(this.process!, 'disconnect');
Expand Down Expand Up @@ -75,7 +74,7 @@ export class JobProcess {
this.clear();
}, PING_TIMEOUT);

this.process = runJob(this.args);
this.process = await import('./job_main.js').then((main) => main.runJob(this.args));

this.process.on('message', (msg: Message) => {
if (msg.type === IPC_MESSAGE.StartJobResponse) {
Expand All @@ -87,7 +86,7 @@ export class JobProcess {
}
this.pongTimeout?.refresh();
} else if (msg.type === IPC_MESSAGE.UserExit || msg.type === IPC_MESSAGE.ShutdownResponse) {
this.logger.info('job exiting');
this.logger.debug('job exiting');
this.clear();
}
});
Expand Down
2 changes: 2 additions & 0 deletions agents/src/tts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
import { StreamAdapter, StreamAdapterWrapper } from './stream_adapter.js';
import {
ChunkedStream,
SynthesisEvent,
SynthesisEventType,
SynthesizeStream,
Expand All @@ -18,4 +19,5 @@ export {
SynthesizeStream,
StreamAdapter,
StreamAdapterWrapper,
ChunkedStream,
};
20 changes: 8 additions & 12 deletions agents/src/tts/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0
import type { SentenceStream, SentenceTokenizer } from '../tokenize.js';
import {
SynthesisEvent,
SynthesisEventType,
SynthesizeStream,
type SynthesizedAudio,
TTS,
} from './tts.js';
import { ChunkedStream, SynthesisEvent, SynthesisEventType, SynthesizeStream, TTS } from './tts.js';

export class StreamAdapterWrapper extends SynthesizeStream {
closed: boolean;
Expand Down Expand Up @@ -41,10 +35,12 @@ export class StreamAdapterWrapper extends SynthesizeStream {
reject(new Error('cancelled'));
};
for await (const sentence of this.sentenceStream) {
const audio = await this.tts.synthesize(sentence.text);
this.eventQueue.push(new SynthesisEvent(SynthesisEventType.STARTED));
this.eventQueue.push(new SynthesisEvent(SynthesisEventType.AUDIO, audio));
this.eventQueue.push(new SynthesisEvent(SynthesisEventType.FINISHED));
const audio = await this.tts.synthesize(sentence.text).then((data) => data.next());
if (!audio.done) {
this.eventQueue.push(new SynthesisEvent(SynthesisEventType.STARTED));
this.eventQueue.push(new SynthesisEvent(SynthesisEventType.AUDIO, audio.value));
this.eventQueue.push(new SynthesisEvent(SynthesisEventType.FINISHED));
}
}
}
}
Expand Down Expand Up @@ -86,7 +82,7 @@ export class StreamAdapter extends TTS {
this.tokenizer = tokenizer;
}

synthesize(text: string): Promise<SynthesizedAudio> {
synthesize(text: string): Promise<ChunkedStream> {
return this.tts.synthesize(text);
}

Expand Down
24 changes: 23 additions & 1 deletion agents/src/tts/tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0
import type { AudioFrame } from '@livekit/rtc-node';
import { mergeFrames } from '../utils.js';

export interface SynthesizedAudio {
text: string;
Expand Down Expand Up @@ -46,11 +47,32 @@ export abstract class TTS {
this.#streamingSupported = streamingSupported;
}

abstract synthesize(text: string): Promise<SynthesizedAudio>;
abstract synthesize(text: string): Promise<ChunkedStream>;

abstract stream(): SynthesizeStream;

get streamingSupported(): boolean {
return this.#streamingSupported;
}
}

/**
 * Async iterator over `SynthesizedAudio` chunks.
 *
 * Subclasses supply `next()` and `close()`; this base class provides the
 * iterator protocol plumbing and a `collect()` convenience that drains the
 * stream into one merged frame.
 */
export abstract class ChunkedStream implements AsyncIterableIterator<SynthesizedAudio> {
  /** Produces the next chunk, or a result with `done: true` once the stream is exhausted. */
  abstract next(): Promise<IteratorResult<SynthesizedAudio>>;

  /** Closes the stream; the exact semantics are defined by the subclass. */
  abstract close(): Promise<void>;

  /** Lets the stream be consumed with `for await (... of stream)`. */
  [Symbol.asyncIterator](): ChunkedStream {
    return this;
  }

  // NOTE(review): `next()` returns a Promise, so consuming this object through the
  // synchronous protocol (`for...of`, spread) would never observe `done` — confirm
  // whether this member is intentional before relying on it.
  [Symbol.iterator](): ChunkedStream {
    return this;
  }

  /**
   * Drains the stream and merges the `data` of every chunk via `mergeFrames`.
   *
   * @returns the value returned by `mergeFrames` for the collected frames
   */
  async collect(): Promise<AudioFrame> {
    const collected: AudioFrame[] = [];
    for await (const chunk of this) {
      collected.push(chunk.data);
    }
    return mergeFrames(collected);
  }
}
6 changes: 3 additions & 3 deletions agents/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ export class Worker {
this.processes[job.id] = { proc, activeJob: new ActiveJob(job, acceptData) };
proc
.run()
.catch(() => {
proc.logger.error(`error running job process ${proc.job.id}`);
.catch((e) => {
proc.logger.error(`error running job process ${proc.job.id}: ${e}`);
})
.finally(() => {
proc.clear();
Expand Down Expand Up @@ -374,7 +374,7 @@ export class Worker {
async close() {
if (this.closed) return;
this.closed = true;
this.logger.info('shutting down worker');
this.logger.debug('shutting down worker');
await this.httpServer.close();
for await (const value of Object.values(this.processes)) {
await value.proc.close();
Expand Down
21 changes: 10 additions & 11 deletions examples/src/minimal.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { type JobContext, type JobRequest, WorkerOptions, cli } from '@livekit/agents';
import { type JobContext, type JobRequest, WorkerOptions, cli, defineAgent } from '@livekit/agents';
import { fileURLToPath } from 'url';

// your entry file *has* to include an exported function [entry].
// this file will be imported from inside the library, and this function
// will be called.
export const entry = async (job: JobContext) => {
console.log('starting voice assistant...');
job;
// etc
};

const requestFunc = async (req: JobRequest) => {
console.log('received request', req);
await req.accept(__filename);
await req.accept(import.meta.filename);
};

if (process.argv[1] === fileURLToPath(import.meta.url)) {
cli.runApp(new WorkerOptions({ requestFunc }));
}

// your entry file has to provide a default export of type Agent.
// use the defineAgent() helper function to generate your agent.
export default defineAgent(async (job: JobContext) => {
console.log('starting voice assistant...');
job;
// etc
});
47 changes: 30 additions & 17 deletions examples/src/tts.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,27 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { type JobContext, type JobRequest, WorkerOptions, cli, log } from '@livekit/agents';
import {
type JobContext,
type JobRequest,
WorkerOptions,
cli,
defineAgent,
log,
} from '@livekit/agents';
import { TTS } from '@livekit/agents-plugin-elevenlabs';
import { AudioSource, LocalAudioTrack, TrackPublishOptions, TrackSource } from '@livekit/rtc-node';
import { fileURLToPath } from 'url';

export const entry = async (job: JobContext) => {
const requestFunc = async (req: JobRequest) => {
await req.accept(import.meta.filename);
};

if (process.argv[1] === fileURLToPath(import.meta.url)) {
cli.runApp(new WorkerOptions({ requestFunc }));
}

export default defineAgent(async (job: JobContext) => {
log.info('starting TTS example agent');

const source = new AudioSource(24000, 1);
Expand All @@ -17,22 +32,20 @@ export const entry = async (job: JobContext) => {

const tts = new TTS();
log.info('speaking "Hello!"');
await tts.synthesize('Hello!').then((output) => {
source.captureFrame(output.data);
});
await tts
.synthesize('Hello!')
.then((output) => output.collect())
.then((output) => {
source.captureFrame(output);
});

await new Promise((resolve) => setTimeout(resolve, 1000));

log.info('speaking "Goodbye."');
await tts.synthesize('Goodbye.').then((output) => {
source.captureFrame(output.data);
});
};

const requestFunc = async (req: JobRequest) => {
await req.accept(__filename);
};

if (process.argv[1] === fileURLToPath(import.meta.url)) {
cli.runApp(new WorkerOptions({ requestFunc }));
}
await tts
.synthesize('Goodbye.')
.then((output) => output.collect())
.then((output) => {
source.captureFrame(output);
});
});
Loading
Loading