diff --git a/livekit-agents/livekit/agents/ipc/job_executor.py b/livekit-agents/livekit/agents/ipc/job_executor.py
index 8fe9b9848..19704791a 100644
--- a/livekit-agents/livekit/agents/ipc/job_executor.py
+++ b/livekit-agents/livekit/agents/ipc/job_executor.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+from enum import Enum
 from typing import Any, Protocol
 
 from ..job import RunningJobInfo
@@ -18,6 +19,12 @@ def start_arguments(self, value: Any | None) -> None: ...
     @property
     def running_job(self) -> RunningJobInfo | None: ...
 
+    @property
+    def run_status(self) -> RunStatus: ...
+
+    @property
+    def exception(self) -> Exception | None: ...
+
     async def start(self) -> None: ...
 
     async def join(self) -> None: ...
@@ -27,3 +34,29 @@ async def initialize(self) -> None: ...
     async def aclose(self) -> None: ...
 
     async def launch_job(self, info: RunningJobInfo) -> None: ...
+
+
+class RunStatus(Enum):
+    """coarse lifecycle state of a job executor, as returned by `run_status`"""
+
+    STARTING = "STARTING"
+    WAITING_FOR_JOB = "WAITING_FOR_JOB"
+    RUNNING_JOB = "RUNNING_JOB"
+    FINISHED_FAILED = "FINISHED_FAILED"
+    FINISHED_CLEAN = "FINISHED_CLEAN"
+
+
+class JobExecutorError(Exception):
+    """base class of the errors an executor records in its `exception` property"""
+
+
+class JobExecutorError_ShutdownTimeout(JobExecutorError):
+    """the job did not stop within the close timeout"""
+
+
+class JobExecutorError_Unresponsive(JobExecutorError):
+    """the job did not answer pings before the pong timeout elapsed"""
+
+
+class JobExecutorError_Runtime(JobExecutorError):
+    """the job process failed to initialize or exited with a non-zero code"""
diff --git a/livekit-agents/livekit/agents/ipc/proc_job_executor.py b/livekit-agents/livekit/agents/ipc/proc_job_executor.py
index f5f846130..2a956d947 100644
--- a/livekit-agents/livekit/agents/ipc/proc_job_executor.py
+++ b/livekit-agents/livekit/agents/ipc/proc_job_executor.py
@@ -16,6 +16,12 @@
 from ..log import logger
 from ..utils.aio import duplex_unix
 from . import channel, job_main, proc_lazy_main, proto
+from .job_executor import (
+    JobExecutorError_Runtime,
+    JobExecutorError_ShutdownTimeout,
+    JobExecutorError_Unresponsive,
+    RunStatus,
+)
 
 
 class LogQueueListener:
@@ -93,6 +99,7 @@ def __init__(
         self._running_job: RunningJobInfo | None = None
         self._exitcode: int | None = None
         self._pid: int | None = None
+        self._exception: Exception | None = None
 
         self._main_atask: asyncio.Task[None] | None = None
         self._closing = False
@@ -129,6 +136,31 @@ def start_arguments(self, value: Any | None) -> None:
     def running_job(self) -> RunningJobInfo | None:
         return self._running_job
 
+    @property
+    def exception(self) -> Exception | None:
+        """error recorded when the job failed, None if no failure was recorded"""
+        return self._exception
+
+    @property
+    def run_status(self) -> RunStatus:
+        """lifecycle state derived from the assigned job and the main task"""
+        if not self._running_job:
+            if self.started:
+                return RunStatus.WAITING_FOR_JOB
+            else:
+                return RunStatus.STARTING
+
+        if not self._main_atask:
+            return RunStatus.STARTING
+
+        if self._main_atask.done():
+            if self.exception:
+                return RunStatus.FINISHED_FAILED
+            else:
+                return RunStatus.FINISHED_CLEAN
+        else:
+            return RunStatus.RUNNING_JOB
+
     async def start(self) -> None:
         """start the job process"""
         if self.started:
@@ -222,6 +254,7 @@ async def initialize(self) -> None:
             self._send_kill_signal()
             raise
         except Exception as e:  # should be channel.ChannelClosed most of the time
+            self._exception = JobExecutorError_Runtime()
             self._initialize_fut.set_exception(e)
             raise
         else:
@@ -245,6 +278,7 @@ async def aclose(self) -> None:
             logger.error(
                 "process did not exit in time, killing job", extra=self.logging_extra()
             )
+            self._exception = JobExecutorError_ShutdownTimeout()
             self._send_kill_signal()
 
         async with self._lock:
@@ -312,6 +346,7 @@ async def _main_task(self) -> None:
             await self._pch.aclose()
 
         if self._exitcode != 0 and not self._kill_sent:
+            self._exception = JobExecutorError_Runtime()
             logger.error(
                 f"job process exited with non-zero exit code {self.exitcode}",
                 extra=self.logging_extra(),
@@ -358,6 +393,7 @@ async def _send_ping_co():
         async def _pong_timeout_co():
             await pong_timeout
             logger.error("job is unresponsive, killing job", extra=self.logging_extra())
+            self._exception = JobExecutorError_Unresponsive()
             self._send_kill_signal()
 
         tasks = [
diff --git a/livekit-agents/livekit/agents/ipc/proc_pool.py b/livekit-agents/livekit/agents/ipc/proc_pool.py
index 307227876..d707987ab 100644
--- a/livekit-agents/livekit/agents/ipc/proc_pool.py
+++ b/livekit-agents/livekit/agents/ipc/proc_pool.py
@@ -12,7 +12,11 @@
 from .job_executor import JobExecutor
 
 EventTypes = Literal[
-    "process_created", "process_started", "process_ready", "process_closed"
+    "process_created",
+    "process_started",
+    "process_ready",
+    "process_closed",
+    "process_job_launched",
 ]
 
 MAX_CONCURRENT_INITIALIZATIONS = 1
@@ -85,6 +89,7 @@ async def launch_job(self, info: RunningJobInfo) -> None:
             self._proc_needed_sem.release()  # notify that a new process can be warmed/started
 
         await proc.launch_job(info)
+        self.emit("process_job_launched", proc)
 
     @utils.log_exceptions(logger=logger)
     async def _proc_watch_task(self) -> None:
diff --git a/livekit-agents/livekit/agents/ipc/thread_job_executor.py b/livekit-agents/livekit/agents/ipc/thread_job_executor.py
index 99e75f74c..b6908669d 100644
--- a/livekit-agents/livekit/agents/ipc/thread_job_executor.py
+++ b/livekit-agents/livekit/agents/ipc/thread_job_executor.py
@@ -12,6 +12,11 @@
 from ..log import logger
 from ..utils.aio import duplex_unix
 from . import channel, job_main, proto
+from .job_executor import (
+    JobExecutorError_ShutdownTimeout,
+    JobExecutorError_Unresponsive,
+    RunStatus,
+)
 
 
 @dataclass
@@ -42,6 +47,7 @@ def __init__(
 
         self._user_args: Any | None = None
         self._running_job: RunningJobInfo | None = None
+        self._exception: Exception | None = None
 
         self._main_atask: asyncio.Task[None] | None = None
         self._closing = False
@@ -65,6 +71,31 @@ def start_arguments(self, value: Any | None) -> None:
     def running_job(self) -> RunningJobInfo | None:
         return self._running_job
 
+    @property
+    def exception(self) -> Exception | None:
+        """error recorded when the job failed, None if no failure was recorded"""
+        return self._exception
+
+    @property
+    def run_status(self) -> RunStatus:
+        """lifecycle state derived from the assigned job and the main task"""
+        if not self._running_job:
+            if self.started:
+                return RunStatus.WAITING_FOR_JOB
+            else:
+                return RunStatus.STARTING
+
+        if not self._main_atask:
+            return RunStatus.STARTING
+
+        if self._main_atask.done():
+            if self.exception:
+                return RunStatus.FINISHED_FAILED
+            else:
+                return RunStatus.FINISHED_CLEAN
+        else:
+            return RunStatus.RUNNING_JOB
+
     async def start(self) -> None:
         if self.started:
             raise RuntimeError("runner already started")
@@ -158,6 +189,7 @@ async def aclose(self) -> None:
                     asyncio.shield(self._main_atask), timeout=self._opts.close_timeout
                 )
         except asyncio.TimeoutError:
+            self._exception = JobExecutorError_ShutdownTimeout()
             logger.error(
                 "job shutdown is taking too much time..", extra=self.logging_extra()
             )
@@ -235,6 +267,7 @@ async def _send_ping_co():
 
         async def _pong_timeout_co():
             await pong_timeout
+            self._exception = JobExecutorError_Unresponsive()
             logger.error("job is unresponsive..", extra=self.logging_extra())
 
         tasks = [
diff --git a/livekit-agents/livekit/agents/worker.py b/livekit-agents/livekit/agents/worker.py
index 635a6f280..1fb45ea89 100644
--- a/livekit-agents/livekit/agents/worker.py
+++ b/livekit-agents/livekit/agents/worker.py
@@ -250,6 +250,11 @@ def __init__(
             initialize_timeout=opts.initialize_process_timeout,
             close_timeout=opts.shutdown_process_timeout,
         )
+        self._proc_pool.on("process_started", self._on_process_started)
+        self._proc_pool.on("process_closed", self._on_process_closed)
+        self._proc_pool.on("process_job_launched", self._on_process_job_launched)
+
+        self._previous_status = agent.WorkerStatus.WS_AVAILABLE
 
         self._api: api.LiveKitAPI | None = None
         self._http_session: aiohttp.ClientSession | None = None
@@ -307,12 +312,7 @@ async def drain(self, timeout: int | None = None) -> None:
 
         logger.info("draining worker", extra={"id": self.id, "timeout": timeout})
         self._draining = True
-
-        # exit the queue
-        update_worker = agent.WorkerMessage(
-            update_worker=agent.UpdateWorkerStatus(status=agent.WorkerStatus.WS_FULL)
-        )
-        await self._queue_msg(update_worker)
+        await self._update_worker_status()
 
         async def _join_jobs():
             for proc in self._proc_pool.processes:
@@ -469,50 +469,9 @@ async def _run_ws(self, ws: aiohttp.ClientWebSocketResponse):
         async def _load_task():
             """periodically check load and update worker status"""
             interval = utils.aio.interval(UPDATE_LOAD_INTERVAL)
-            current_status = agent.WorkerStatus.WS_AVAILABLE
             while True:
                 await interval.tick()
-
-                old_status = current_status
-                current_load = await asyncio.get_event_loop().run_in_executor(
-                    None, self._opts.load_fnc
-                )
-
-                is_full = current_load >= _WorkerEnvOption.getvalue(
-                    self._opts.load_threshold, self._devmode
-                )
-                currently_available = not is_full and not self._draining
-
-                current_status = (
-                    agent.WorkerStatus.WS_AVAILABLE
-                    if currently_available
-                    else agent.WorkerStatus.WS_FULL
-                )
-
-                update = agent.UpdateWorkerStatus(
-                    load=current_load, status=current_status
-                )
-
-                # only log if status has changed
-                if old_status != current_status and not self._draining:
-                    extra = {
-                        "load": current_load,
-                        "threshold": self._opts.load_threshold,
-                    }
-                    if is_full:
-                        logger.info(
-                            "worker is at full capacity, marking as unavailable",
-                            extra=extra,
-                        )
-                    else:
-                        logger.info(
-                            "worker is below capacity, marking as available",
-                            extra=extra,
-                        )
-
-                msg = agent.WorkerMessage(update_worker=update)
-                with contextlib.suppress(utils.aio.ChanClosed):
-                    await self._queue_msg(msg)
+                await self._update_worker_status()
 
         async def _send_task():
             nonlocal closing_ws
@@ -713,3 +672,107 @@ async def _handle_termination(self, msg: agent.JobTermination):
             # safe to ignore
             return
         await proc.aclose()
+
+    def _on_process_closed(self, proc: ipc.job_executor.JobExecutor) -> None:
+        """proc pool callback: `proc` was closed, report its job status"""
+        self._update_job_status_sync(proc)
+
+    def _on_process_started(self, proc: ipc.job_executor.JobExecutor) -> None:
+        """proc pool callback: `proc` was started, report its job status"""
+        self._update_job_status_sync(proc)
+
+    def _on_process_job_launched(self, proc: ipc.job_executor.JobExecutor) -> None:
+        """proc pool callback: a job was launched on `proc`, report its status"""
+        self._update_job_status_sync(proc)
+
+    async def _update_worker_status(self):
+        """sample the load and queue an UpdateWorkerStatus message
+
+        A draining worker always reports WS_FULL and skips the load sampling.
+        ChanClosed is suppressed on both paths: this is called periodically by
+        the load task, which must survive the message channel being closed.
+        """
+        if self._draining:
+            update = agent.UpdateWorkerStatus(status=agent.WorkerStatus.WS_FULL)
+            msg = agent.WorkerMessage(update_worker=update)
+            with contextlib.suppress(utils.aio.ChanClosed):
+                await self._queue_msg(msg)
+            return
+
+        current_load = await asyncio.get_event_loop().run_in_executor(
+            None, self._opts.load_fnc
+        )
+
+        is_full = current_load >= _WorkerEnvOption.getvalue(
+            self._opts.load_threshold, self._devmode
+        )
+        # re-check _draining: drain() may have been called while the load was sampled
+        currently_available = not is_full and not self._draining
+
+        status = (
+            agent.WorkerStatus.WS_AVAILABLE
+            if currently_available
+            else agent.WorkerStatus.WS_FULL
+        )
+
+        update = agent.UpdateWorkerStatus(load=current_load, status=status)
+
+        # only log if status has changed
+        if self._previous_status != status and not self._draining:
+            self._previous_status = status
+            extra = {
+                "load": current_load,
+                "threshold": self._opts.load_threshold,
+            }
+            if is_full:
+                logger.info(
+                    "worker is at full capacity, marking as unavailable",
+                    extra=extra,
+                )
+            else:
+                logger.info(
+                    "worker is below capacity, marking as available",
+                    extra=extra,
+                )
+
+        msg = agent.WorkerMessage(update_worker=update)
+        with contextlib.suppress(utils.aio.ChanClosed):
+            await self._queue_msg(msg)
+
+    def _update_job_status_sync(self, proc: ipc.job_executor.JobExecutor) -> None:
+        """schedule _update_job_status as a task on the worker's event loop"""
+        t = self._loop.create_task(self._update_job_status(proc))
+        # hold a reference to the task until it completes
+        self._tasks.add(t)
+        t.add_done_callback(self._tasks.discard)
+
+    async def _update_job_status(self, proc: ipc.job_executor.JobExecutor) -> None:
+        """queue an UpdateJobStatus message for the job attached to `proc`"""
+        job_info = proc.running_job
+        if job_info is None:
+            # process_started/process_closed are also wired to this method and a
+            # process without a job has nothing to report, so this is not an error
+            return
+
+        # read the property once so every comparison below sees the same state
+        run_status = proc.run_status
+        status: agent.JobStatus = agent.JobStatus.JS_RUNNING
+        if run_status == ipc.job_executor.RunStatus.FINISHED_FAILED:
+            status = agent.JobStatus.JS_FAILED
+        elif run_status == ipc.job_executor.RunStatus.FINISHED_CLEAN:
+            status = agent.JobStatus.JS_SUCCESS
+        elif run_status == ipc.job_executor.RunStatus.STARTING:
+            status = agent.JobStatus.JS_PENDING
+
+        error: str | None = None
+        exc = proc.exception
+        if exc is not None:
+            # the JobExecutorError_* instances carry no message, so str(exc) is ""
+            error = str(exc) or type(exc).__name__
+        update = agent.UpdateJobStatus(
+            job_id=job_info.job.id, status=status, error=error
+        )
+        msg = agent.WorkerMessage(update_job=update)
+        # runs as a detached task: ignore a channel closed by shutdown
+        with contextlib.suppress(utils.aio.ChanClosed):
+            await self._queue_msg(msg)