pipelineagent: add metrics example (#999)
theomonnom authored Oct 29, 2024
1 parent 850f8e0 commit 22e4fa5
Showing 6 changed files with 147 additions and 5 deletions.
122 changes: 122 additions & 0 deletions examples/voice-pipeline-agent/pipeline_metrics.py
@@ -0,0 +1,122 @@
import logging

from dotenv import load_dotenv
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    JobProcess,
    WorkerOptions,
    cli,
    llm,
)
from livekit.agents.pipeline import PipelineMetrics, VoicePipelineAgent
from livekit.plugins import deepgram, openai, silero

load_dotenv()
logger = logging.getLogger("metrics-example")


OPENAI_LLM_INPUT_PRICE = 2.50 / (10**6)  # $2.50 per million tokens
OPENAI_LLM_OUTPUT_PRICE = 10 / (10**6)  # $10 per million tokens
OPENAI_TTS_PRICE = 15 / (10**6)  # $15 per million characters
DEEPGRAM_STT_PRICE = 0.0043  # $0.0043 per minute


def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()


async def entrypoint(ctx: JobContext):
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
            "You should use short and concise responses, and avoid using unpronounceable punctuation."
        ),
    )

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    participant = await ctx.wait_for_participant()
    agent = VoicePipelineAgent(
        vad=ctx.proc.userdata["vad"],
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
    )

    total_llm_prompt_tokens = 0
    total_llm_completion_tokens = 0
    total_tts_characters_count = 0
    total_stt_audio_duration = 0

    @agent.on("metrics_collected")
    def on_metrics_collected(metrics: PipelineMetrics):
        nonlocal \
            total_llm_prompt_tokens, \
            total_llm_completion_tokens, \
            total_tts_characters_count, \
            total_stt_audio_duration

        if metrics["type"] == "vad_metrics":
            return  # don't log VAD metrics, they are noisy

        if metrics["type"] == "llm_metrics":
            total_llm_prompt_tokens += metrics["prompt_tokens"]
            total_llm_completion_tokens += metrics["completion_tokens"]

            sequence_id = metrics["sequence_id"]
            ttft = metrics["ttft"]
            tokens_per_second = metrics["tokens_per_second"]

            logger.info(
                f"LLM metrics: sequence_id={sequence_id}, ttft={ttft:.2f}, tokens_per_second={tokens_per_second:.2f}"
            )

        elif metrics["type"] == "tts_metrics":
            total_tts_characters_count += metrics["characters_count"]

            sequence_id = metrics["sequence_id"]
            ttfb = metrics["ttfb"]
            audio_duration = metrics["audio_duration"]

            logger.info(
                f"TTS metrics: sequence_id={sequence_id}, ttfb={ttfb:.2f}, audio_duration={audio_duration:.2f}"
            )

        elif metrics["type"] == "eou_metrics":
            sequence_id = metrics["sequence_id"]
            end_of_utterance_delay = metrics["end_of_utterance_delay"]
            transcription_delay = metrics["transcription_delay"]

            logger.info(
                f"EOU metrics: sequence_id={sequence_id}, end_of_utterance_delay={end_of_utterance_delay:.2f}, transcription_delay={transcription_delay:.2f}"
            )

        elif metrics["type"] == "stt_metrics":
            total_stt_audio_duration += metrics["audio_duration"]
            logger.info(f"STT metrics: audio_duration={metrics['audio_duration']:.2f}")

    async def log_session_cost():
        llm_cost = (
            total_llm_prompt_tokens * OPENAI_LLM_INPUT_PRICE
            + total_llm_completion_tokens * OPENAI_LLM_OUTPUT_PRICE
        )
        tts_cost = total_tts_characters_count * OPENAI_TTS_PRICE
        stt_cost = total_stt_audio_duration * DEEPGRAM_STT_PRICE / 60

        total_cost = llm_cost + tts_cost + stt_cost

        logger.info(
            f"Total cost: ${total_cost:.4f} (LLM: ${llm_cost:.4f}, TTS: ${tts_cost:.4f}, STT: ${stt_cost:.4f})"
        )

    ctx.add_shutdown_callback(log_session_cost)

    agent.start(ctx.room, participant)
    await agent.say("Hey, how can I help you today?", allow_interruptions=True)


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))
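As a quick sanity check of the pricing math in log_session_cost, here is the same formula evaluated with hypothetical per-session totals (the token, character, and duration numbers below are made up for illustration; the prices are the constants defined at the top of the example):

# Hypothetical session totals, for illustration only
total_llm_prompt_tokens = 1_000
total_llm_completion_tokens = 500
total_tts_characters_count = 200
total_stt_audio_duration = 120  # seconds

llm_cost = (
    total_llm_prompt_tokens * (2.50 / 10**6)
    + total_llm_completion_tokens * (10 / 10**6)
)  # 0.0025 + 0.0050 = 0.0075
tts_cost = total_tts_characters_count * (15 / 10**6)  # 0.0030
stt_cost = total_stt_audio_duration * 0.0043 / 60  # 0.0086
print(f"${llm_cost + tts_cost + stt_cost:.4f}")  # $0.0191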
10 changes: 10 additions & 0 deletions livekit-agents/livekit/agents/stt/stream_adapter.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
from typing import AsyncIterable

from .. import utils
from ..log import logger
@@ -16,6 +17,10 @@ def __init__(self, *, stt: STT, vad: VAD) -> None:
        self._vad = vad
        self._stt = stt

        @self._stt.on("metrics_collected")
        def _forward_metrics(*args, **kwargs):
            self.emit("metrics_collected", *args, **kwargs)

    @property
    def wrapped_stt(self) -> STT:
        return self._stt
@@ -41,6 +46,11 @@ def __init__(
        self._vad_stream = self._vad.stream()
        self._language = language

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SpeechEvent]
    ) -> None:
        pass  # do nothing; metrics come from the wrapped STT and are forwarded by the adapter

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        async def _forward_input():
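The adapter surfaces the wrapped STT's metrics by re-emitting the metrics_collected event on itself (the TTS adapter below does the same). A minimal, self-contained sketch of that forwarding pattern follows; the Emitter class is a hypothetical stand-in for the framework's event-emitter base class, not the actual livekit API:

from typing import Any, Callable


class Emitter:
    """Hypothetical stand-in for an event-emitter base class."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[..., Any]]] = {}

    def on(self, event: str):
        # Used as a decorator: @obj.on("event")
        def register(fn: Callable[..., Any]) -> Callable[..., Any]:
            self._handlers.setdefault(event, []).append(fn)
            return fn

        return register

    def emit(self, event: str, *args: Any, **kwargs: Any) -> None:
        for fn in self._handlers.get(event, []):
            fn(*args, **kwargs)


class Adapter(Emitter):
    def __init__(self, wrapped: Emitter) -> None:
        super().__init__()
        self._wrapped = wrapped

        # Re-emit the wrapped object's metrics so callers only need to
        # subscribe to the adapter, as the StreamAdapter classes do above.
        @self._wrapped.on("metrics_collected")
        def _forward_metrics(*args: Any, **kwargs: Any) -> None:
            self.emit("metrics_collected", *args, **kwargs)


if __name__ == "__main__":
    stt = Emitter()
    adapter = Adapter(stt)

    @adapter.on("metrics_collected")
    def _log(metrics: dict) -> None:
        print("got metrics:", metrics)

    stt.emit("metrics_collected", {"type": "stt_metrics", "audio_duration": 1.2})
    # prints: got metrics: {'type': 'stt_metrics', 'audio_duration': 1.2}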
11 changes: 11 additions & 0 deletions livekit-agents/livekit/agents/tts/stream_adapter.py
@@ -1,12 +1,14 @@
from __future__ import annotations

import asyncio
from typing import AsyncIterable

from .. import tokenize, utils
from ..log import logger
from .tts import (
    TTS,
    ChunkedStream,
    SynthesizedAudio,
    SynthesizeStream,
    TTSCapabilities,
)
@@ -29,6 +31,10 @@ def __init__(
        self._tts = tts
        self._sentence_tokenizer = sentence_tokenizer

        @self._tts.on("metrics_collected")
        def _forward_metrics(*args, **kwargs):
            self.emit("metrics_collected", *args, **kwargs)

    def synthesize(self, text: str) -> ChunkedStream:
        return self._tts.synthesize(text=text)

@@ -52,6 +58,11 @@ def __init__(
        self._wrapped_tts = wrapped_tts
        self._sent_stream = sentence_tokenizer.stream()

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        pass  # do nothing; metrics come from the wrapped TTS and are forwarded by the adapter

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        async def _forward_input():
6 changes: 3 additions & 3 deletions livekit-agents/livekit/agents/tts/tts.py
@@ -18,7 +18,7 @@ class TTSMetrics(TypedDict):
    duration: float
    audio_duration: float
    cancelled: bool
-   num_characters: int
+   characters_count: int
    label: str
    streamed: bool

@@ -114,7 +114,7 @@ async def _metrics_monitor_task(
                "request_id": request_id,
                "ttfb": ttfb,
                "duration": duration,
-               "num_characters": len(self._input_text),
+               "characters_count": len(self._input_text),
                "audio_duration": audio_duration,
                "cancelled": self._task.cancelled(),
                "label": self._tts._label,
@@ -190,7 +190,7 @@ def _emit_metrics():
                "request_id": request_id,
                "ttfb": ttfb,
                "duration": duration,
-               "num_characters": len(text),
+               "characters_count": len(text),
                "audio_duration": audio_duration,
                "cancelled": self._task.cancelled(),
                "label": self._tts._label,
1 change: 1 addition & 0 deletions livekit-plugins/livekit-plugins-deepgram/livekit/plugins/deepgram/stt.py
@@ -383,6 +383,7 @@ async def send_task():
                audio_duration=self._pushed_audio_duration
            ),
        )
        self._pushed_audio_duration = 0.0
        self._event_ch.send_nowait(usage_event)

        # tell deepgram we are done sending audio/inputs
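The one-line addition above resets the accumulated audio duration right after the usage event is built, so the next usage event only reports audio pushed since the previous one. The accumulate-then-reset pattern in isolation (class and method names here are hypothetical, for illustration only):

class UsageAccumulator:
    """Hypothetical illustration of the accumulate-then-reset pattern."""

    def __init__(self) -> None:
        self._pushed_audio_duration = 0.0

    def push(self, seconds: float) -> None:
        self._pushed_audio_duration += seconds

    def flush(self) -> float:
        # Report the accumulated duration, then reset it so the next
        # report does not double-count audio that was already reported.
        reported = self._pushed_audio_duration
        self._pushed_audio_duration = 0.0
        return reported


acc = UsageAccumulator()
acc.push(1.5)
acc.push(2.0)
print(acc.flush())  # 3.5
print(acc.flush())  # 0.0 -- nothing is counted twice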
2 changes: 0 additions & 2 deletions tests/test_tts.py
@@ -41,7 +41,6 @@ async def _assert_valid_synthesized_audio(
    google.TTS(),
    azure.TTS(),
    cartesia.TTS(),
-   cartesia.TTS(speed="fastest", emotion=["surprise:highest"]),
]


@@ -62,7 +61,6 @@ async def test_synthesize(tts: agents.tts.TTS):
    elevenlabs.TTS(),
    elevenlabs.TTS(encoding="pcm_44100"),
    cartesia.TTS(),
-   cartesia.TTS(speed="fastest", emotion=["surprise:highest"]),
    agents.tts.StreamAdapter(
        tts=openai.TTS(), sentence_tokenizer=STREAM_SENT_TOKENIZER
    ),
