Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipelineagent: expose timing metrics & api errors wip #957

Merged
merged 24 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/neat-dogs-work.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"livekit-agents": patch
"livekit-plugins-silero": patch
---

pipelineagent: expose timing metrics
269 changes: 169 additions & 100 deletions livekit-agents/livekit/agents/pipeline/agent_output.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import inspect
import time
from typing import Any, AsyncIterable, Awaitable, Callable, Union

Expand All @@ -9,6 +10,7 @@
from .. import llm, tokenize, utils
from .. import transcription as agent_transcription
from .. import tts as text_to_speech
from . import metrics
from .agent_playout import AgentPlayout, PlayoutHandle
from .log import logger

Expand Down Expand Up @@ -80,7 +82,7 @@ def interrupt(self) -> None:
return

logger.debug(
"interrupting synthesis/playout",
"agent interrupted",
extra={"speech_id": self.speech_id},
)

Expand All @@ -98,12 +100,14 @@ def __init__(
agent_playout: AgentPlayout,
llm: llm.LLM,
tts: text_to_speech.TTS,
metrics_emitter: metrics.PipelineMetrics,
) -> None:
self._room, self._agent_playout, self._llm, self._tts = (
self._room, self._agent_playout, self._llm, self._tts, self._metrics_emitter = (
room,
agent_playout,
llm,
tts,
metrics_emitter,
)
self._tasks = set[asyncio.Task[Any]]()

Expand Down Expand Up @@ -174,9 +178,9 @@ async def _synthesize_task(self, handle: SynthesisHandle) -> None:
transcript_source = await transcript_source

if isinstance(tts_source, str):
co = _str_synthesis_task(tts_source, transcript_source, handle)
co = self._str_synthesis_task(tts_source, transcript_source, handle)
else:
co = _stream_synthesis_task(tts_source, transcript_source, handle)
co = self._stream_synthesis_task(tts_source, transcript_source, handle)

synth = asyncio.create_task(co)
synth.add_done_callback(lambda _: handle._buf_ch.close())
Expand All @@ -187,118 +191,183 @@ async def _synthesize_task(self, handle: SynthesisHandle) -> None:
finally:
await utils.aio.gracefully_cancel(synth)

@utils.log_exceptions(logger=logger)
async def _read_transcript_task(
self, transcript_source: AsyncIterable[str] | str, handle: SynthesisHandle
) -> None:
try:
if isinstance(transcript_source, str):
handle._tr_fwd.push_text(transcript_source)
else:
async for seg in transcript_source:
if not handle._tr_fwd.closed:
handle._tr_fwd.push_text(seg)

@utils.log_exceptions(logger=logger)
async def _read_transcript_task(
transcript_source: AsyncIterable[str] | str, handle: SynthesisHandle
) -> None:
if isinstance(transcript_source, str):
handle._tr_fwd.push_text(transcript_source)
else:
async for seg in transcript_source:
if not handle._tr_fwd.closed:
handle._tr_fwd.push_text(seg)

if not handle.tts_forwarder.closed:
handle.tts_forwarder.mark_text_segment_end()


@utils.log_exceptions(logger=logger)
async def _str_synthesis_task(
tts_text: str, transcript_source: AsyncIterable[str] | str, handle: SynthesisHandle
) -> None:
"""synthesize speech from a string"""
start_time = time.time()
first_frame = True
read_transcript_atask: asyncio.Task | None = None

try:
async for audio in handle._tts.synthesize(tts_text):
if first_frame:
first_frame = False
logger.debug(
"received first TTS frame",
extra={
"speech_id": handle.speech_id,
"elapsed": round(time.time() - start_time, 3),
"streamed": False,
},
)
read_transcript_atask = asyncio.create_task(
_read_transcript_task(transcript_source, handle)
)
if not handle.tts_forwarder.closed:
handle.tts_forwarder.mark_text_segment_end()
finally:
if inspect.isasyncgen(transcript_source):
await transcript_source.aclose()

@utils.log_exceptions(logger=logger)
async def _str_synthesis_task(
self,
tts_text: str,
transcript_source: AsyncIterable[str] | str,
handle: SynthesisHandle,
) -> None:
"""synthesize speech from a string"""
read_transcript_atask: asyncio.Task | None = None

frame = audio.frame
start_time = time.perf_counter()
theomonnom marked this conversation as resolved.
Show resolved Hide resolved
cancelled = False
audio_duration = 0.0
ttfb = -1.0

handle._buf_ch.send_nowait(frame)
if not handle.tts_forwarder.closed:
handle.tts_forwarder.push_audio(frame)
tts_stream = handle._tts.synthesize(tts_text)
try:
async for audio in tts_stream:
if ttfb == -1.0:
ttfb = time.perf_counter() - start_time

finally:
if not handle.tts_forwarder.closed:
handle.tts_forwarder.mark_audio_segment_end()
read_transcript_atask = asyncio.create_task(
self._read_transcript_task(transcript_source, handle)
)

if read_transcript_atask is not None:
await read_transcript_atask
audio_duration += audio.frame.duration

handle._buf_ch.send_nowait(audio.frame)
if not handle.tts_forwarder.closed:
handle.tts_forwarder.push_audio(audio.frame)

@utils.log_exceptions(logger=logger)
async def _stream_synthesis_task(
tts_source: AsyncIterable[str],
transcript_source: AsyncIterable[str] | str,
handle: SynthesisHandle,
) -> None:
"""synthesize speech from streamed text"""
if not handle.tts_forwarder.closed:
davidzhao marked this conversation as resolved.
Show resolved Hide resolved
handle.tts_forwarder.mark_audio_segment_end()

if read_transcript_atask is not None:
await read_transcript_atask
except asyncio.CancelledError:
cancelled = True
finally:
await tts_stream.aclose()

duration = time.perf_counter() - start_time

tts_metrics: metrics.TTSMetrics = {
theomonnom marked this conversation as resolved.
Show resolved Hide resolved
"type": "tts_metrics",
"timestamp": time.time(),
"speech_id": handle.speech_id,
"ttfb": ttfb,
"duration": duration,
"audio_duration": audio_duration,
"cancelled": cancelled,
"streamed": False,
}
self._metrics_emitter.emit("tts_metrics_collected", tts_metrics)

logger.debug(
"tts metrics collected",
extra={
"speech_id": handle.speech_id,
"ttfb": round(ttfb, 3),
"duration": round(duration, 3),
"audio_duration": round(audio_duration, 3),
"cancelled": cancelled,
"streamed": False,
},
)

if read_transcript_atask is not None:
await utils.aio.gracefully_cancel(read_transcript_atask)

@utils.log_exceptions(logger=logger)
async def _read_generated_audio_task():
start_time = time.time()
first_frame = True
async for audio in tts_stream:
if first_frame:
first_frame = False
async def _stream_synthesis_task(
self,
tts_source: AsyncIterable[str],
transcript_source: AsyncIterable[str] | str,
handle: SynthesisHandle,
) -> None:
"""synthesize speech from streamed text"""

@utils.log_exceptions(logger=logger)
async def _read_generated_audio_task(
tts_stream: text_to_speech.SynthesizeStream,
) -> None:
start_time = time.perf_counter()
cancelled = False
audio_duration = 0.0
ttfb = -1.0
try:
async for audio in tts_stream:
if ttfb == -1.0:
ttfb = time.perf_counter() - start_time

if not handle._tr_fwd.closed:
handle._tr_fwd.push_audio(audio.frame)

audio_duration += audio.frame.duration
handle._buf_ch.send_nowait(audio.frame)
except asyncio.CancelledError:
cancelled = True
finally:
if handle._tr_fwd and not handle._tr_fwd.closed:
handle._tr_fwd.mark_audio_segment_end()

await tts_stream.aclose()

duration = time.perf_counter() - start_time

tts_metrics: metrics.TTSMetrics = {
"type": "tts_metrics",
"timestamp": time.time(),
"speech_id": handle.speech_id,
"ttfb": ttfb,
"duration": duration,
"audio_duration": audio_duration,
"cancelled": cancelled,
"streamed": True,
}
self._metrics_emitter.emit("tts_metrics_collected", tts_metrics)
theomonnom marked this conversation as resolved.
Show resolved Hide resolved

logger.debug(
"received first TTS frame",
"tts metrics collected",
extra={
"speech_id": handle.speech_id,
"elapsed": round(time.time() - start_time, 3),
"ttfb": round(ttfb, 3),
"duration": round(duration, 3),
"audio_duration": round(audio_duration, 3),
"cancelled": cancelled,
"streamed": True,
},
)

if not handle._tr_fwd.closed:
handle._tr_fwd.push_audio(audio.frame)

handle._buf_ch.send_nowait(audio.frame)
tts_stream: text_to_speech.SynthesizeStream | None = None
read_tts_atask: asyncio.Task | None = None
read_transcript_atask: asyncio.Task | None = None

if handle._tr_fwd and not handle._tr_fwd.closed:
handle._tr_fwd.mark_audio_segment_end()

tts_stream = handle._tts.stream()
read_tts_atask: asyncio.Task | None = None
read_transcript_atask: asyncio.Task | None = None

try:
async for seg in tts_source:
if read_tts_atask is None:
# start the task when we receive the first text segment (so start_time is more accurate)
read_tts_atask = asyncio.create_task(_read_generated_audio_task())
read_transcript_atask = asyncio.create_task(
_read_transcript_task(transcript_source, handle)
)

tts_stream.push_text(seg)

tts_stream.end_input()

if read_tts_atask is not None:
assert read_transcript_atask is not None
await read_tts_atask
await read_transcript_atask
try:
async for seg in tts_source:
if tts_stream is None:
tts_stream = handle._tts.stream()
# start the task when we receive the first text segment (so start_time is more accurate)
read_tts_atask = asyncio.create_task(
_read_generated_audio_task(tts_stream)
)
read_transcript_atask = asyncio.create_task(
self._read_transcript_task(transcript_source, handle)
)

tts_stream.push_text(seg)

if tts_stream is not None:
tts_stream.end_input()
assert read_transcript_atask and read_tts_atask
await read_tts_atask
await read_transcript_atask

finally:
if read_tts_atask is not None:
assert read_transcript_atask is not None
await utils.aio.gracefully_cancel(read_tts_atask, read_transcript_atask)
finally:
if read_tts_atask is not None:
assert read_transcript_atask is not None
await utils.aio.gracefully_cancel(read_tts_atask, read_transcript_atask)

await tts_stream.aclose()
if inspect.isasyncgen(tts_source):
await tts_source.aclose()
4 changes: 2 additions & 2 deletions livekit-agents/livekit/agents/pipeline/agent_playout.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async def _capture_task():
handle._tr_fwd.segment_playout_started()

logger.debug(
"started playing the first frame",
"speech playout started",
extra={"speech_id": handle.speech_id},
)

Expand Down Expand Up @@ -176,7 +176,7 @@ async def _capture_task():
handle._done_fut.set_result(None)

logger.debug(
"playout finished",
"speech playout finished",
extra={
"speech_id": handle.speech_id,
"interrupted": handle.interrupted,
Expand Down
Loading
Loading