From 22e4fa5fa1ad6d1d8f3fe4be5350105860a345bb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Th=C3=A9o=20Monnom?=
Date: Tue, 29 Oct 2024 11:51:26 -0700
Subject: [PATCH] pipelineagent: add metrics example (#999)

---
Reviewer note: the pipeline_metrics.py hunk below was edited after export
(docstrings added, STT total initialised as a float, ttfb formatted like the
other timings). The diffstat and hunk line counts were updated to match; the
blob id on its `index` line was not recomputed.

 .../voice-pipeline-agent/pipeline_metrics.py  | 133 ++++++++++++++++++
 .../livekit/agents/stt/stream_adapter.py      |  10 ++
 .../livekit/agents/tts/stream_adapter.py      |  11 ++
 livekit-agents/livekit/agents/tts/tts.py      |   6 +-
 .../livekit/plugins/deepgram/stt.py           |   1 +
 tests/test_tts.py                             |   2 -
 6 files changed, 158 insertions(+), 5 deletions(-)
 create mode 100644 examples/voice-pipeline-agent/pipeline_metrics.py

diff --git a/examples/voice-pipeline-agent/pipeline_metrics.py b/examples/voice-pipeline-agent/pipeline_metrics.py
new file mode 100644
index 000000000..6fe0c7cd9
--- /dev/null
+++ b/examples/voice-pipeline-agent/pipeline_metrics.py
@@ -0,0 +1,133 @@
+import logging
+
+from dotenv import load_dotenv
+from livekit.agents import (
+    AutoSubscribe,
+    JobContext,
+    JobProcess,
+    WorkerOptions,
+    cli,
+    llm,
+)
+from livekit.agents.pipeline import PipelineMetrics, VoicePipelineAgent
+from livekit.plugins import deepgram, openai, silero
+
+load_dotenv()
+logger = logging.getLogger("metrics-example")
+
+
+OPENAI_LLM_INPUT_PRICE = 2.50 / (10**6)  # $2.50 per million tokens
+OPENAI_LLM_OUTPUT_PRICE = 10 / (10**6)  # $10 per million tokens
+OPENAI_TTS_PRICE = 15 / (10**6)  # $15 per million characters
+DEEPGRAM_STT_PRICE = 0.0043  # $0.0043 per minute
+
+
+def prewarm(proc: JobProcess):
+    """Load the Silero VAD and store it in the process userdata for `entrypoint`."""
+    proc.userdata["vad"] = silero.VAD.load()
+
+
+async def entrypoint(ctx: JobContext):
+    """Run a voice pipeline agent, logging its metrics and an estimated session cost.
+
+    Usage reported by "metrics_collected" events is added to running totals, and a
+    shutdown callback converts those totals into a cost estimate using the price
+    constants defined at the top of this module.
+    """
+    initial_ctx = llm.ChatContext().append(
+        role="system",
+        text=(
+            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
+            "You should use short and concise responses, and avoiding usage of unpronouncable punctuation."
+        ),
+    )
+
+    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
+
+    participant = await ctx.wait_for_participant()
+    agent = VoicePipelineAgent(
+        vad=ctx.proc.userdata["vad"],
+        stt=deepgram.STT(),
+        llm=openai.LLM(),
+        tts=openai.TTS(),
+        chat_ctx=initial_ctx,
+    )
+
+    # running usage totals, updated by on_metrics_collected below
+    total_llm_prompt_tokens = 0
+    total_llm_completion_tokens = 0
+    total_tts_characters_count = 0
+    total_stt_audio_duration = 0.0
+
+    @agent.on("metrics_collected")
+    def on_metrics_collected(metrics: PipelineMetrics):
+        """Log one metrics event and add its usage to the running totals."""
+        nonlocal \
+            total_llm_prompt_tokens, \
+            total_llm_completion_tokens, \
+            total_tts_characters_count, \
+            total_stt_audio_duration
+
+        if metrics["type"] == "vad_metrics":
+            return  # VAD metrics are too noisy to log
+
+        if metrics["type"] == "llm_metrics":
+            total_llm_prompt_tokens += metrics["prompt_tokens"]
+            total_llm_completion_tokens += metrics["completion_tokens"]
+
+            sequence_id = metrics["sequence_id"]
+            ttft = metrics["ttft"]
+            tokens_per_second = metrics["tokens_per_second"]
+
+            logger.info(
+                f"LLM metrics: sequence_id={sequence_id}, ttft={ttft:.2f}, tokens_per_second={tokens_per_second:.2f}"
+            )
+
+        elif metrics["type"] == "tts_metrics":
+            total_tts_characters_count += metrics["characters_count"]
+
+            sequence_id = metrics["sequence_id"]
+            ttfb = metrics["ttfb"]
+            audio_duration = metrics["audio_duration"]
+
+            logger.info(
+                f"TTS metrics: sequence_id={sequence_id}, ttfb={ttfb:.2f}, audio_duration={audio_duration:.2f}"
+            )
+
+        elif metrics["type"] == "eou_metrics":
+            sequence_id = metrics["sequence_id"]
+            end_of_utterance_delay = metrics["end_of_utterance_delay"]
+            transcription_delay = metrics["transcription_delay"]
+
+            logger.info(
+                f"EOU metrics: sequence_id={sequence_id}, end_of_utterance_delay={end_of_utterance_delay:.2f}, transcription_delay={transcription_delay:.2f}"
+            )
+
+        elif metrics["type"] == "stt_metrics":
+            total_stt_audio_duration += metrics["audio_duration"]
+            logger.info(f"STT metrics: audio_duration={metrics['audio_duration']:.2f}")
+
+    async def log_session_cost():
+        """Log the estimated LLM, TTS and STT cost of the session from the totals."""
+        llm_cost = (
+            total_llm_prompt_tokens * OPENAI_LLM_INPUT_PRICE
+            + total_llm_completion_tokens * OPENAI_LLM_OUTPUT_PRICE
+        )
+        tts_cost = total_tts_characters_count * OPENAI_TTS_PRICE
+        # assumes audio_duration is in seconds; DEEPGRAM_STT_PRICE is per minute
+        stt_cost = total_stt_audio_duration * DEEPGRAM_STT_PRICE / 60
+
+        total_cost = llm_cost + tts_cost + stt_cost
+
+        logger.info(
+            f"Total cost: ${total_cost:.4f} (LLM: ${llm_cost:.4f}, TTS: ${tts_cost:.4f}, STT: ${stt_cost:.4f})"
+        )
+
+    ctx.add_shutdown_callback(log_session_cost)
+
+    agent.start(ctx.room, participant)
+    await agent.say("Hey, how can I help you today?", allow_interruptions=True)
+
+
+if __name__ == "__main__":
+    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))
diff --git a/livekit-agents/livekit/agents/stt/stream_adapter.py b/livekit-agents/livekit/agents/stt/stream_adapter.py index 011889e4d..39745d640 100644 --- a/livekit-agents/livekit/agents/stt/stream_adapter.py +++ b/livekit-agents/livekit/agents/stt/stream_adapter.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +from typing import AsyncIterable from ..
import utils from ..log import logger @@ -16,6 +17,10 @@ def __init__(self, *, stt: STT, vad: VAD) -> None: self._vad = vad self._stt = stt + @self._stt.on("metrics_collected") + def _forward_metrics(*args, **kwargs): + self.emit("metrics_collected", *args, **kwargs) + @property def wrapped_stt(self) -> STT: return self._stt @@ -41,6 +46,11 @@ def __init__( self._vad_stream = self._vad.stream() self._language = language + async def _metrics_monitor_task( + self, event_aiter: AsyncIterable[SpeechEvent] + ) -> None: + pass # do nothing + @utils.log_exceptions(logger=logger) async def _main_task(self) -> None: async def _forward_input(): diff --git a/livekit-agents/livekit/agents/tts/stream_adapter.py b/livekit-agents/livekit/agents/tts/stream_adapter.py index 81d8a0469..d8bc0cd69 100644 --- a/livekit-agents/livekit/agents/tts/stream_adapter.py +++ b/livekit-agents/livekit/agents/tts/stream_adapter.py @@ -1,12 +1,14 @@ from __future__ import annotations import asyncio +from typing import AsyncIterable from .. 
import tokenize, utils from ..log import logger from .tts import ( TTS, ChunkedStream, + SynthesizedAudio, SynthesizeStream, TTSCapabilities, ) @@ -29,6 +31,10 @@ def __init__( self._tts = tts self._sentence_tokenizer = sentence_tokenizer + @self._tts.on("metrics_collected") + def _forward_metrics(*args, **kwargs): + self.emit("metrics_collected", *args, **kwargs) + def synthesize(self, text: str) -> ChunkedStream: return self._tts.synthesize(text=text) @@ -52,6 +58,11 @@ def __init__( self._wrapped_tts = wrapped_tts self._sent_stream = sentence_tokenizer.stream() + async def _metrics_monitor_task( + self, event_aiter: AsyncIterable[SynthesizedAudio] + ) -> None: + pass # do nothing + @utils.log_exceptions(logger=logger) async def _main_task(self) -> None: async def _forward_input(): diff --git a/livekit-agents/livekit/agents/tts/tts.py b/livekit-agents/livekit/agents/tts/tts.py index 65587ad34..ef1deda07 100644 --- a/livekit-agents/livekit/agents/tts/tts.py +++ b/livekit-agents/livekit/agents/tts/tts.py @@ -18,7 +18,7 @@ class TTSMetrics(TypedDict): duration: float audio_duration: float cancelled: bool - num_characters: int + characters_count: int label: str streamed: bool @@ -114,7 +114,7 @@ async def _metrics_monitor_task( "request_id": request_id, "ttfb": ttfb, "duration": duration, - "num_characters": len(self._input_text), + "characters_count": len(self._input_text), "audio_duration": audio_duration, "cancelled": self._task.cancelled(), "label": self._tts._label, @@ -190,7 +190,7 @@ def _emit_metrics(): "request_id": request_id, "ttfb": ttfb, "duration": duration, - "num_characters": len(text), + "characters_count": len(text), "audio_duration": audio_duration, "cancelled": self._task.cancelled(), "label": self._tts._label, diff --git a/livekit-plugins/livekit-plugins-deepgram/livekit/plugins/deepgram/stt.py b/livekit-plugins/livekit-plugins-deepgram/livekit/plugins/deepgram/stt.py index 192d56959..277b26919 100644 --- 
a/livekit-plugins/livekit-plugins-deepgram/livekit/plugins/deepgram/stt.py +++ b/livekit-plugins/livekit-plugins-deepgram/livekit/plugins/deepgram/stt.py @@ -383,6 +383,7 @@ async def send_task(): audio_duration=self._pushed_audio_duration ), ) + self._pushed_audio_duration = 0.0 self._event_ch.send_nowait(usage_event) # tell deepgram we are done sending audio/inputs diff --git a/tests/test_tts.py b/tests/test_tts.py index cd1858607..5b2ebe1d4 100644 --- a/tests/test_tts.py +++ b/tests/test_tts.py @@ -41,7 +41,6 @@ async def _assert_valid_synthesized_audio( google.TTS(), azure.TTS(), cartesia.TTS(), - cartesia.TTS(speed="fastest", emotion=["surprise:highest"]), ] @@ -62,7 +61,6 @@ async def test_synthesize(tts: agents.tts.TTS): elevenlabs.TTS(), elevenlabs.TTS(encoding="pcm_44100"), cartesia.TTS(), - cartesia.TTS(speed="fastest", emotion=["surprise:highest"]), agents.tts.StreamAdapter( tts=openai.TTS(), sentence_tokenizer=STREAM_SENT_TOKENIZER ),