Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipelineagent: expose timing metrics & api errors wip #957

Merged
merged 24 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .changeset/hot-toys-wash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
"livekit-plugins-elevenlabs": patch
"livekit-plugins-anthropic": patch
"livekit-plugins-cartesia": patch
"livekit-plugins-deepgram": patch
"livekit-plugins-browser": patch
"livekit-plugins-google": patch
"livekit-plugins-openai": patch
"livekit-plugins-playht": patch
"livekit-plugins-silero": patch
"livekit-plugins-azure": patch
"livekit-agents": patch
---

pipelineagent: expose timing metrics & api errors wip
12 changes: 11 additions & 1 deletion livekit-agents/livekit/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@
voice_assistant,
)
from ._constants import ATTRIBUTE_AGENT_STATE
from ._exceptions import AssignmentTimeoutError
from ._exceptions import (
APIConnectionError,
APIError,
APIStatusError,
APITimeoutError,
AssignmentTimeoutError,
)
from ._types import AgentState
from .job import AutoSubscribe, JobContext, JobExecutorType, JobProcess, JobRequest
from .plugin import Plugin
Expand Down Expand Up @@ -60,6 +66,10 @@
"voice_assistant",
"cli",
"AssignmentTimeoutError",
"APIConnectionError",
"APIError",
"APIStatusError",
"APITimeoutError",
"ATTRIBUTE_AGENT_STATE",
"AgentState",
]
Expand Down
67 changes: 67 additions & 0 deletions livekit-agents/livekit/agents/_exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,72 @@
from __future__ import annotations


class AssignmentTimeoutError(Exception):
    """Raised when a job was accepted but no assignment arrived within the
    specified timeout; the server may have chosen another worker for this job.
    """


# errors used by our plugins


class APIError(Exception):
    """Raised when an API request failed.

    Base error type used by our TTS/STT/LLM plugins.
    """

    message: str
    """The error message returned by the API."""

    body: object | None
    """The API response body, if available.

    If the API returned valid JSON, this holds the decoded result.
    """

    def __init__(self, message: str, *, body: object | None) -> None:
        super().__init__(message)
        self.message = message
        self.body = body


class APIStatusError(APIError):
    """Raised when an API response carries a 4xx or 5xx status code."""

    status_code: int
    """HTTP status code returned by the API."""

    request_id: str | None
    """Request ID attached to the response, when the API provided one."""

    def __init__(
        self,
        message: str,
        *,
        status_code: int,
        request_id: str | None,
        body: object | None,
    ) -> None:
        # Message and body are handled by the APIError base class.
        super().__init__(message, body=body)
        self.status_code = status_code
        self.request_id = request_id


class APIConnectionError(APIError):
    """Raised when an API request failed due to a connection error."""

    def __init__(self, message: str = "Connection error.") -> None:
        # A connection-level failure never carries a response payload.
        super().__init__(message, body=None)


class APITimeoutError(APIConnectionError):
    """Raised when an API request timed out.

    A timeout is treated as a connection-level failure, hence the
    APIConnectionError base.
    """

    def __init__(self, message: str = "Request timed out.") -> None:
        super().__init__(message)
3 changes: 2 additions & 1 deletion livekit-agents/livekit/agents/llm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
TypeInfo,
ai_callable,
)
from .llm import LLM, ChatChunk, Choice, ChoiceDelta, LLMStream
from .llm import LLM, ChatChunk, Choice, ChoiceDelta, LLMMetrics, LLMStream

__all__ = [
"LLM",
"LLMMetrics",
"LLMStream",
"ChatContext",
"ChatRole",
Expand Down
92 changes: 78 additions & 14 deletions livekit-agents/livekit/agents/llm/llm.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
from __future__ import annotations

import abc
import asyncio
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, AsyncIterator
from typing import Any, AsyncIterable, AsyncIterator, Literal, TypedDict

from livekit import rtc

from .. import utils
from ..utils import aio
from . import function_context
from .chat_context import ChatContext, ChatRole


class LLMMetrics(TypedDict):
    """Timing metrics for a single LLM request, emitted via the LLM's
    "metrics_collected" event."""

    # Identifier of the request these metrics describe (taken from the
    # last ChatChunk observed for the stream).
    request_id: str
    # Wall-clock Unix timestamp (seconds) at which the metrics were recorded.
    timestamp: float
    # Time to first token, in seconds; stays -1.0 if no chunk was received.
    ttft: float
    # Total stream duration in seconds, from stream start to last chunk.
    duration: float
    # Label of the LLM instance, formatted as "module.ClassName".
    label: str
    # Whether the stream's main task was cancelled before completing.
    cancelled: bool
davidzhao marked this conversation as resolved.
Show resolved Hide resolved


@dataclass
class ChoiceDelta:
role: ChatRole
Expand All @@ -25,11 +38,16 @@ class Choice:

@dataclass
class ChatChunk:
    """A single incremental chunk of an LLM chat response."""

    # Identifier of the API request that produced this chunk; used to tag
    # the timing metrics collected for the stream.
    request_id: str
    # Completion choices delivered in this chunk (empty by default).
    choices: list[Choice] = field(default_factory=list)


class LLM(abc.ABC):
@abc.abstractmethod
class LLM(ABC, rtc.EventEmitter[Literal["metrics_collected"]]):
def __init__(self) -> None:
super().__init__()
self._label = f"{type(self).__module__}.{type(self).__name__}"

@abstractmethod
def chat(
self,
*,
Expand All @@ -41,15 +59,56 @@ def chat(
) -> "LLMStream": ...


class LLMStream(abc.ABC):
class LLMStream(ABC):
def __init__(
self, *, chat_ctx: ChatContext, fnc_ctx: function_context.FunctionContext | None
self,
llm: LLM,
*,
chat_ctx: ChatContext,
fnc_ctx: function_context.FunctionContext | None,
) -> None:
self._function_calls_info: list[function_context.FunctionCallInfo] = []
self._tasks = set[asyncio.Task[Any]]()
self._llm = llm
self._chat_ctx = chat_ctx
self._fnc_ctx = fnc_ctx

self._event_ch = aio.Chan[ChatChunk]()
self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)
self._metrics_task = asyncio.create_task(
self._metrics_monitor_task(monitor_aiter), name="LLM._metrics_task"
)

self._task = asyncio.create_task(self._main_task())
self._task.add_done_callback(lambda _: self._event_ch.close())

self._function_calls_info: list[function_context.FunctionCallInfo] = []
self._function_tasks = set[asyncio.Task[Any]]()

@abstractmethod
async def _main_task(self) -> None: ...

async def _metrics_monitor_task(
self, event_aiter: AsyncIterable[ChatChunk]
) -> None:
start_time = time.perf_counter()
ttft = -1.0
request_id = ""

async for ev in event_aiter:
request_id = ev.request_id
if ttft == -1.0:
ttft = time.perf_counter() - start_time

duration = time.perf_counter() - start_time
metrics: LLMMetrics = {
"timestamp": time.time(),
"request_id": request_id,
"ttft": ttft,
"duration": duration,
"cancelled": self._task.cancelled(),
"label": self._llm._label,
}
self._llm.emit("metrics_collected", metrics)

@property
def function_calls(self) -> list[function_context.FunctionCallInfo]:
"""List of called functions from this stream."""
Expand All @@ -70,17 +129,22 @@ def execute_functions(self) -> list[function_context.CalledFunction]:
called_functions: list[function_context.CalledFunction] = []
for fnc_info in self._function_calls_info:
called_fnc = fnc_info.execute()
self._tasks.add(called_fnc.task)
called_fnc.task.add_done_callback(self._tasks.remove)
self._function_tasks.add(called_fnc.task)
called_fnc.task.add_done_callback(self._function_tasks.remove)
called_functions.append(called_fnc)

return called_functions

async def aclose(self) -> None:
await utils.aio.gracefully_cancel(*self._tasks)
await aio.gracefully_cancel(self._task)
await utils.aio.gracefully_cancel(*self._function_tasks)
await self._metrics_task

async def __anext__(self) -> ChatChunk:
if self._task.done() and (exc := self._task.exception()):
raise exc

return await self._event_aiter.__anext__()

def __aiter__(self) -> AsyncIterator[ChatChunk]:
return self

@abc.abstractmethod
async def __anext__(self) -> ChatChunk: ...
Loading
Loading