Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: move to node:stream/web from homegrown API #194

Draft
wants to merge 7 commits into
base: next
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/curvy-pumpkins-rush.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@livekit/agents": patch
"@livekit/agents-plugin-deepgram": patch
"@livekit/agents-plugin-elevenlabs": patch
"@livekit/agents-plugin-openai": patch
"@livekit/agents-plugin-silero": patch
---

chore: move to node:stream/web from homegrown API
29 changes: 17 additions & 12 deletions agents/src/llm/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
// SPDX-License-Identifier: Apache-2.0
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import { EventEmitter } from 'node:events';
import type { ReadableStream } from 'node:stream/web';
import { TransformStream } from 'node:stream/web';
import type { LLMMetrics } from '../metrics/base.js';
import { AsyncIterableQueue } from '../utils.js';
import type { ChatContext, ChatRole } from './chat_context.js';
import type { FunctionCallInfo, FunctionContext } from './function_context.js';

Expand Down Expand Up @@ -59,31 +60,32 @@ export abstract class LLM extends (EventEmitter as new () => TypedEmitter<LLMCal
}

export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
protected output = new AsyncIterableQueue<ChatChunk>();
protected queue = new AsyncIterableQueue<ChatChunk>();
protected output = new TransformStream<ChatChunk, ChatChunk>();
protected closed = false;
protected _functionCalls: FunctionCallInfo[] = [];
abstract label: string;

#llm: LLM;
#chatCtx: ChatContext;
#fncCtx?: FunctionContext;
#reader: ReadableStreamDefaultReader<ChatChunk>;

constructor(llm: LLM, chatCtx: ChatContext, fncCtx?: FunctionContext) {
this.#llm = llm;
this.#chatCtx = chatCtx;
this.#fncCtx = fncCtx;
this.monitorMetrics();
const [r1, r2] = this.output.readable.tee();
this.#reader = r1.getReader();
this.monitorMetrics(r2);
}

protected async monitorMetrics() {
protected async monitorMetrics(readable: ReadableStream<ChatChunk>) {
const startTime = process.hrtime.bigint();
let ttft: bigint | undefined;
let requestId = '';
let usage: CompletionUsage | undefined;

for await (const ev of this.queue) {
this.output.put(ev);
for await (const ev of readable) {
requestId = ev.requestId;
if (!ttft) {
ttft = process.hrtime.bigint() - startTime;
Expand All @@ -92,7 +94,6 @@ export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
usage = ev.usage;
}
}
this.output.close();

const duration = process.hrtime.bigint() - startTime;
const metrics: LLMMetrics = {
Expand Down Expand Up @@ -138,13 +139,17 @@ export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
return this._functionCalls;
}

next(): Promise<IteratorResult<ChatChunk>> {
return this.output.next();
async next(): Promise<IteratorResult<ChatChunk>> {
return this.#reader.read().then(({ value }) => {
if (value) {
return { value, done: false };
} else {
return { value: undefined, done: true };
}
});
}

close() {
this.output.close();
this.queue.close();
this.closed = true;
}

Expand Down
15 changes: 8 additions & 7 deletions agents/src/multimodal/agent_playout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import type { AudioFrame } from '@livekit/rtc-node';
import { type AudioSource } from '@livekit/rtc-node';
import { EventEmitter } from 'node:events';
import type { TransformStream } from 'node:stream/web';
import { AudioByteStream } from '../audio.js';
import type { TranscriptionForwarder } from '../transcription.js';
import { type AsyncIterableQueue, CancellablePromise, Future, gracefullyCancel } from '../utils.js';
import { CancellablePromise, Future, gracefullyCancel } from '../utils.js';

export const proto = {};

Expand Down Expand Up @@ -112,8 +113,8 @@ export class AgentPlayout extends EventEmitter {
itemId: string,
contentIndex: number,
transcriptionFwd: TranscriptionForwarder,
textStream: AsyncIterableQueue<string>,
audioStream: AsyncIterableQueue<AudioFrame>,
textStream: TransformStream<string, string>,
audioStream: TransformStream<AudioFrame, AudioFrame>,
): PlayoutHandle {
const handle = new PlayoutHandle(
this.#audioSource,
Expand All @@ -129,8 +130,8 @@ export class AgentPlayout extends EventEmitter {
#makePlayoutTask(
oldTask: CancellablePromise<void> | null,
handle: PlayoutHandle,
textStream: AsyncIterableQueue<string>,
audioStream: AsyncIterableQueue<AudioFrame>,
textStream: TransformStream<string, string>,
audioStream: TransformStream<AudioFrame, AudioFrame>,
): CancellablePromise<void> {
return new CancellablePromise<void>((resolve, reject, onCancel) => {
let cancelled = false;
Expand All @@ -155,7 +156,7 @@ export class AgentPlayout extends EventEmitter {

(async () => {
try {
for await (const text of textStream) {
for await (const text of textStream.readable) {
if (cancelledText || cancelled) {
break;
}
Expand Down Expand Up @@ -184,7 +185,7 @@ export class AgentPlayout extends EventEmitter {
samplesPerChannel,
);

for await (const frame of audioStream) {
for await (const frame of audioStream.readable) {
if (cancelledCapture || cancelled) {
break;
}
Expand Down
24 changes: 17 additions & 7 deletions agents/src/pipeline/agent_output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
//
// SPDX-License-Identifier: Apache-2.0
import type { AudioFrame } from '@livekit/rtc-node';
import { TransformStream } from 'node:stream/web';
import { log } from '../log.js';
import { SynthesizeStream, type TTS } from '../tts/index.js';
import { AsyncIterableQueue, CancellablePromise, Future, gracefullyCancel } from '../utils.js';
import { CancellablePromise, Future, gracefullyCancel } from '../utils.js';
import type { AgentPlayout, PlayoutHandle } from './agent_playout.js';

export type SpeechSource = AsyncIterable<string> | string | Promise<string>;
Expand All @@ -17,7 +18,10 @@ export class SynthesisHandle {
ttsSource: SpeechSource;
#agentPlayout: AgentPlayout;
tts: TTS;
queue = new AsyncIterableQueue<AudioFrame | typeof SynthesisHandle.FLUSH_SENTINEL>();
queue = new TransformStream<
AudioFrame | typeof SynthesisHandle.FLUSH_SENTINEL,
AudioFrame | typeof SynthesisHandle.FLUSH_SENTINEL
>();
#playHandle?: PlayoutHandle;
intFut = new Future();
#logger = log();
Expand Down Expand Up @@ -51,7 +55,7 @@ export class SynthesisHandle {
throw new Error('synthesis was interrupted');
}

this.#playHandle = this.#agentPlayout.play(this.#speechId, this.queue);
this.#playHandle = this.#agentPlayout.play(this.#speechId, this.queue.readable);
return this.#playHandle;
}

Expand Down Expand Up @@ -134,6 +138,8 @@ const stringSynthesisTask = (text: string, handle: SynthesisHandle): Cancellable
cancelled = true;
});

const writer = handle.queue.writable.getWriter();

const ttsStream = handle.tts.stream();
ttsStream.pushText(text);
ttsStream.flush();
Expand All @@ -142,9 +148,10 @@ const stringSynthesisTask = (text: string, handle: SynthesisHandle): Cancellable
if (cancelled || audio === SynthesizeStream.END_OF_STREAM) {
break;
}
handle.queue.put(audio.frame);
await writer.write(audio.frame);
}
handle.queue.put(SynthesisHandle.FLUSH_SENTINEL);
await writer.write(SynthesisHandle.FLUSH_SENTINEL);
writer.releaseLock();

resolve(text);
});
Expand All @@ -162,16 +169,19 @@ const streamSynthesisTask = (
cancelled = true;
});

const writer = handle.queue.writable.getWriter();

const ttsStream = handle.tts.stream();
const readGeneratedAudio = async () => {
for await (const audio of ttsStream) {
if (cancelled) break;
if (audio === SynthesizeStream.END_OF_STREAM) {
break;
}
handle.queue.put(audio.frame);
await writer.write(audio.frame);
}
handle.queue.put(SynthesisHandle.FLUSH_SENTINEL);
await writer.write(SynthesisHandle.FLUSH_SENTINEL);
writer.releaseLock();
};
readGeneratedAudio();

Expand Down
7 changes: 4 additions & 3 deletions agents/src/pipeline/agent_playout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import type { AudioFrame, AudioSource } from '@livekit/rtc-node';
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import EventEmitter from 'node:events';
import type { ReadableStream } from 'node:stream/web';
import { log } from '../log.js';
import { CancellablePromise, Future, gracefullyCancel } from '../utils.js';
import { SynthesisHandle } from './agent_output.js';
Expand All @@ -21,7 +22,7 @@ export type AgentPlayoutCallbacks = {
export class PlayoutHandle {
#speechId: string;
#audioSource: AudioSource;
playoutSource: AsyncIterable<AudioFrame | typeof SynthesisHandle.FLUSH_SENTINEL>;
playoutSource: ReadableStream<AudioFrame | typeof SynthesisHandle.FLUSH_SENTINEL>;
totalPlayedTime?: number;
#interrupted = false;
pushedDuration = 0;
Expand All @@ -31,7 +32,7 @@ export class PlayoutHandle {
constructor(
speechId: string,
audioSource: AudioSource,
playoutSource: AsyncIterable<AudioFrame | typeof SynthesisHandle.FLUSH_SENTINEL>,
playoutSource: ReadableStream<AudioFrame | typeof SynthesisHandle.FLUSH_SENTINEL>,
) {
this.#speechId = speechId;
this.#audioSource = audioSource;
Expand Down Expand Up @@ -90,7 +91,7 @@ export class AgentPlayout extends (EventEmitter as new () => TypedEmitter<AgentP

play(
speechId: string,
playoutSource: AsyncIterable<AudioFrame | typeof SynthesisHandle.FLUSH_SENTINEL>,
playoutSource: ReadableStream<AudioFrame | typeof SynthesisHandle.FLUSH_SENTINEL>,
): PlayoutHandle {
if (this.#closed) {
throw new Error('source closed');
Expand Down
19 changes: 13 additions & 6 deletions agents/src/pipeline/pipeline_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
} from '@livekit/rtc-node';
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import EventEmitter from 'node:events';
import { TransformStream } from 'node:stream/web';
import type {
CallableFunctionResult,
FunctionCallInfo,
Expand All @@ -30,7 +31,7 @@ import {
import type { SentenceTokenizer, WordTokenizer } from '../tokenize/tokenizer.js';
import type { TTS } from '../tts/index.js';
import { TTSEvent, StreamAdapter as TTSStreamAdapter } from '../tts/index.js';
import { AsyncIterableQueue, CancellablePromise, Future, gracefullyCancel } from '../utils.js';
import { CancellablePromise, Future, gracefullyCancel } from '../utils.js';
import { type VAD, type VADEvent, VADEventType } from '../vad.js';
import type { SpeechSource, SynthesisHandle } from './agent_output.js';
import { AgentOutput } from './agent_output.js';
Expand Down Expand Up @@ -241,7 +242,10 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
#transcribedText = '';
#transcribedInterimText = '';
#speechQueueOpen = new Future();
#speechQueue = new AsyncIterableQueue<SpeechHandle | typeof VoicePipelineAgent.FLUSH_SENTINEL>();
#speechQueue = new TransformStream<
SpeechHandle | typeof VoicePipelineAgent.FLUSH_SENTINEL,
SpeechHandle | typeof VoicePipelineAgent.FLUSH_SENTINEL
>();
#updateStateTask?: CancellablePromise<void>;
#started = false;
#room?: Room;
Expand All @@ -251,6 +255,7 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
#agentPublication?: LocalTrackPublication;
#lastFinalTranscriptTime?: number;
#lastSpeechTime?: number;
#writer: WritableStreamDefaultWriter<SpeechHandle | typeof VoicePipelineAgent.FLUSH_SENTINEL>;

constructor(
/** Voice Activity Detection instance. */
Expand Down Expand Up @@ -285,6 +290,8 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
this.#validateReplyIfPossible.bind(this),
this.#opts.minEndpointingDelay,
);

this.#writer = this.#speechQueue.writable.getWriter();
}

get fncCtx(): FunctionContext | undefined {
Expand Down Expand Up @@ -545,7 +552,7 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<

while (true) {
await this.#speechQueueOpen.await;
for await (const speech of this.#speechQueue) {
for await (const speech of this.#speechQueue.readable) {
if (speech === VoicePipelineAgent.FLUSH_SENTINEL) break;
this.#playingSpeech = speech;
await this.#playSpeech(speech);
Expand Down Expand Up @@ -868,7 +875,7 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
// with unlucky timing, we could end up with two pushed agent replies inside the speech queue.
// so make sure we directly interrupt every reply when validating a new one
if (this.#speechQueueOpen.done) {
for await (const speech of this.#speechQueue) {
for await (const speech of this.#speechQueue.readable) {
if (speech === VoicePipelineAgent.FLUSH_SENTINEL) break;
if (!speech.isReply) continue;
if (speech.allowInterruptions) speech.interrupt();
Expand Down Expand Up @@ -920,8 +927,8 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
}

#addSpeechForPlayout(handle: SpeechHandle) {
this.#speechQueue.put(handle);
this.#speechQueue.put(VoicePipelineAgent.FLUSH_SENTINEL);
this.#writer.write(handle);
this.#writer.write(VoicePipelineAgent.FLUSH_SENTINEL);
this.#speechQueueOpen.resolve();
}

Expand Down
9 changes: 5 additions & 4 deletions agents/src/stt/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export class StreamAdapterWrapper extends SpeechStream {

async #run() {
const forwardInput = async () => {
for await (const input of this.input) {
for await (const input of this.input.readable) {
if (input === SpeechStream.FLUSH_SENTINEL) {
this.#vadStream.flush();
} else {
Expand All @@ -63,20 +63,21 @@ export class StreamAdapterWrapper extends SpeechStream {
};

const recognize = async () => {
const writer = this.output.writable.getWriter();
for await (const ev of this.#vadStream) {
switch (ev.type) {
case VADEventType.START_OF_SPEECH:
this.output.put({ type: SpeechEventType.START_OF_SPEECH });
writer.write({ type: SpeechEventType.START_OF_SPEECH });
break;
case VADEventType.END_OF_SPEECH:
this.output.put({ type: SpeechEventType.END_OF_SPEECH });
writer.write({ type: SpeechEventType.END_OF_SPEECH });

const event = await this.#stt.recognize(ev.frames);
if (!event.alternatives![0].text) {
continue;
}

this.output.put(event);
writer.write(event);
break;
}
}
Expand Down
Loading
Loading