Skip to content

Commit

Permalink
Send JobStatus Updates (#998)
Browse files Browse the repository at this point in the history
  • Loading branch information
keepingitneil authored Nov 5, 2024
1 parent 22f06bf commit 3b58ee5
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 49 deletions.
31 changes: 31 additions & 0 deletions livekit-agents/livekit/agents/ipc/job_executor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from enum import Enum
from typing import Any, Protocol

from ..job import RunningJobInfo
Expand All @@ -18,6 +19,12 @@ def start_arguments(self, value: Any | None) -> None: ...
# RunningJobInfo currently associated with this executor, or None when no
# job is associated with it.
@property
def running_job(self) -> RunningJobInfo | None: ...

# Lifecycle state of the executor, expressed as a RunStatus member
# (STARTING, WAITING_FOR_JOB, RUNNING_JOB, FINISHED_FAILED or FINISHED_CLEAN).
@property
def run_status(self) -> RunStatus: ...

# Error recorded for this executor, or None when no error was recorded.
@property
def exception(self) -> Exception | None: ...

# Start the executor.
async def start(self) -> None: ...

# NOTE(review): presumably waits until the executor has finished running;
# confirm against the concrete implementations.
async def join(self) -> None: ...
Expand All @@ -27,3 +34,27 @@ async def initialize(self) -> None: ...
# Close the executor.
# NOTE(review): the visible implementations bound the shutdown wait with a
# close timeout; confirm the exact contract there.
async def aclose(self) -> None: ...

# Hand the job described by `info` to this executor.
async def launch_job(self, info: RunningJobInfo) -> None: ...


class RunStatus(Enum):
    """Lifecycle states reported by a job executor's ``run_status`` property.

    Each member's value is the same string as its name.
    """

    # the executor has not been started yet, or has no main task yet
    STARTING = "STARTING"
    # the executor is started but no job is associated with it
    WAITING_FOR_JOB = "WAITING_FOR_JOB"
    # a job is associated with the executor and its main task is still running
    RUNNING_JOB = "RUNNING_JOB"
    # the main task is done and an error was recorded on the executor
    FINISHED_FAILED = "FINISHED_FAILED"
    # the main task is done and no error was recorded
    FINISHED_CLEAN = "FINISHED_CLEAN"


class JobExecutorError(Exception):
    """Base class for errors recorded on a job executor.

    The executors construct these errors without arguments, and the recorded
    error is later converted with ``str()`` to build a job status report.
    A bare ``Exception()`` stringifies to ``""``, so every class carries a
    ``default_message`` that is used whenever no arguments are supplied.
    Explicit arguments are passed through unchanged.
    """

    # Text used for str(exc) when the error is constructed without arguments.
    default_message = "job executor error"

    def __init__(self, *args: object) -> None:
        # Fall back to the class default so str(exc) is never empty.
        super().__init__(*(args or (self.default_message,)))


class JobExecutorError_ShutdownTimeout(JobExecutorError):
    """The executor did not finish shutting down within the close timeout."""

    default_message = "job executor did not shut down in time"


class JobExecutorError_Unresponsive(JobExecutorError):
    """The executor stopped answering pings (pong timeout elapsed)."""

    default_message = "job executor is unresponsive"


class JobExecutorError_Runtime(JobExecutorError):
    """The executor failed while initializing or running."""

    default_message = "job executor failed at runtime"
34 changes: 34 additions & 0 deletions livekit-agents/livekit/agents/ipc/proc_job_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
from ..log import logger
from ..utils.aio import duplex_unix
from . import channel, job_main, proc_lazy_main, proto
from .job_executor import (
JobExecutorError_Runtime,
JobExecutorError_ShutdownTimeout,
JobExecutorError_Unresponsive,
RunStatus,
)


class LogQueueListener:
Expand Down Expand Up @@ -93,6 +99,7 @@ def __init__(
self._running_job: RunningJobInfo | None = None
self._exitcode: int | None = None
self._pid: int | None = None
self._exception: Exception | None = None

self._main_atask: asyncio.Task[None] | None = None
self._closing = False
Expand Down Expand Up @@ -129,6 +136,29 @@ def start_arguments(self, value: Any | None) -> None:
def running_job(self) -> RunningJobInfo | None:
return self._running_job

@property
def exception(self) -> Exception | None:
    """Return the error recorded on this executor, or ``None`` if there is none.

    The value is whatever was last stored in ``self._exception``.
    """
    return self._exception

@property
def run_status(self) -> RunStatus:
    """Derive the executor's lifecycle state from its current fields.

    Returns:
        ``WAITING_FOR_JOB`` or ``STARTING`` while no job is associated with
        the executor (depending on ``self.started``), ``STARTING`` when a job
        is set but no main task exists, ``RUNNING_JOB`` while the main task
        is still running, and ``FINISHED_FAILED`` / ``FINISHED_CLEAN`` once
        it is done, depending on whether an error was recorded.
    """
    # No job yet: the only question is whether the executor has been started.
    if not self._running_job:
        return RunStatus.WAITING_FOR_JOB if self.started else RunStatus.STARTING

    if not self._main_atask:
        return RunStatus.STARTING

    if not self._main_atask.done():
        return RunStatus.RUNNING_JOB

    # Main task finished: a recorded error marks the run as failed.
    return RunStatus.FINISHED_FAILED if self.exception else RunStatus.FINISHED_CLEAN

async def start(self) -> None:
"""start the job process"""
if self.started:
Expand Down Expand Up @@ -222,6 +252,7 @@ async def initialize(self) -> None:
self._send_kill_signal()
raise
except Exception as e: # should be channel.ChannelClosed most of the time
self._exception = JobExecutorError_Runtime()
self._initialize_fut.set_exception(e)
raise
else:
Expand All @@ -245,6 +276,7 @@ async def aclose(self) -> None:
logger.error(
"process did not exit in time, killing job", extra=self.logging_extra()
)
self._exception = JobExecutorError_ShutdownTimeout()
self._send_kill_signal()

async with self._lock:
Expand Down Expand Up @@ -312,6 +344,7 @@ async def _main_task(self) -> None:
await self._pch.aclose()

if self._exitcode != 0 and not self._kill_sent:
self._exception = JobExecutorError_Runtime()
logger.error(
f"job process exited with non-zero exit code {self.exitcode}",
extra=self.logging_extra(),
Expand Down Expand Up @@ -358,6 +391,7 @@ async def _send_ping_co():
# Waits on `pong_timeout`; once that await completes the job is treated as
# unresponsive: the condition is logged, an Unresponsive error is recorded on
# the executor and a kill signal is sent.
# NOTE(review): `pong_timeout` is defined outside this view; presumably it
# completes when no pong arrived in time. Confirm.
async def _pong_timeout_co():
await pong_timeout
logger.error("job is unresponsive, killing job", extra=self.logging_extra())
self._exception = JobExecutorError_Unresponsive()
self._send_kill_signal()

tasks = [
Expand Down
7 changes: 6 additions & 1 deletion livekit-agents/livekit/agents/ipc/proc_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
from .job_executor import JobExecutor

EventTypes = Literal[
"process_created", "process_started", "process_ready", "process_closed"
"process_created",
"process_started",
"process_ready",
"process_closed",
"process_job_launched",
]

MAX_CONCURRENT_INITIALIZATIONS = 1
Expand Down Expand Up @@ -85,6 +89,7 @@ async def launch_job(self, info: RunningJobInfo) -> None:
self._proc_needed_sem.release() # notify that a new process can be warmed/started

await proc.launch_job(info)
self.emit("process_job_launched", proc)

@utils.log_exceptions(logger=logger)
async def _proc_watch_task(self) -> None:
Expand Down
31 changes: 31 additions & 0 deletions livekit-agents/livekit/agents/ipc/thread_job_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
from ..log import logger
from ..utils.aio import duplex_unix
from . import channel, job_main, proto
from .job_executor import (
JobExecutorError_ShutdownTimeout,
JobExecutorError_Unresponsive,
RunStatus,
)


@dataclass
Expand Down Expand Up @@ -42,6 +47,7 @@ def __init__(

self._user_args: Any | None = None
self._running_job: RunningJobInfo | None = None
self._exception: Exception | None = None

self._main_atask: asyncio.Task[None] | None = None
self._closing = False
Expand All @@ -65,6 +71,29 @@ def start_arguments(self, value: Any | None) -> None:
def running_job(self) -> RunningJobInfo | None:
return self._running_job

@property
def exception(self) -> Exception | None:
    """Error stored in ``self._exception``, or ``None`` when nothing was recorded."""
    return self._exception

@property
def run_status(self) -> RunStatus:
    """Report where this executor is in its lifecycle.

    The state is computed on every access from ``self._running_job``,
    ``self.started``, ``self._main_atask`` and ``self.exception``.
    """
    if not self._running_job:
        # Without a job the executor is either idle or not started at all.
        if self.started:
            return RunStatus.WAITING_FOR_JOB
        return RunStatus.STARTING

    if not self._main_atask:
        return RunStatus.STARTING

    if not self._main_atask.done():
        return RunStatus.RUNNING_JOB

    # The main task has completed; a recorded error means the run failed.
    if self.exception:
        return RunStatus.FINISHED_FAILED
    return RunStatus.FINISHED_CLEAN

async def start(self) -> None:
if self.started:
raise RuntimeError("runner already started")
Expand Down Expand Up @@ -158,6 +187,7 @@ async def aclose(self) -> None:
asyncio.shield(self._main_atask), timeout=self._opts.close_timeout
)
except asyncio.TimeoutError:
self._exception = JobExecutorError_ShutdownTimeout()
logger.error(
"job shutdown is taking too much time..", extra=self.logging_extra()
)
Expand Down Expand Up @@ -235,6 +265,7 @@ async def _send_ping_co():

# Waits on `pong_timeout`; once that await completes an Unresponsive error is
# recorded on the executor and the condition is logged. No kill signal is
# sent here.
# NOTE(review): `pong_timeout` is defined outside this view; presumably it
# completes when no pong arrived in time. Confirm.
async def _pong_timeout_co():
await pong_timeout
self._exception = JobExecutorError_Unresponsive()
logger.error("job is unresponsive..", extra=self.logging_extra())

tasks = [
Expand Down
137 changes: 89 additions & 48 deletions livekit-agents/livekit/agents/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ def __init__(
initialize_timeout=opts.initialize_process_timeout,
close_timeout=opts.shutdown_process_timeout,
)
self._proc_pool.on("process_started", self._on_process_started)
self._proc_pool.on("process_closed", self._on_process_closed)
self._proc_pool.on("process_job_launched", self._on_process_job_launched)

self._previous_status = agent.WorkerStatus.WS_AVAILABLE

self._api: api.LiveKitAPI | None = None
self._http_session: aiohttp.ClientSession | None = None
Expand Down Expand Up @@ -307,12 +312,7 @@ async def drain(self, timeout: int | None = None) -> None:

logger.info("draining worker", extra={"id": self.id, "timeout": timeout})
self._draining = True

# exit the queue
update_worker = agent.WorkerMessage(
update_worker=agent.UpdateWorkerStatus(status=agent.WorkerStatus.WS_FULL)
)
await self._queue_msg(update_worker)
await self._update_worker_status()

async def _join_jobs():
for proc in self._proc_pool.processes:
Expand Down Expand Up @@ -469,50 +469,9 @@ async def _run_ws(self, ws: aiohttp.ClientWebSocketResponse):
async def _load_task():
"""periodically check load and update worker status"""
interval = utils.aio.interval(UPDATE_LOAD_INTERVAL)
current_status = agent.WorkerStatus.WS_AVAILABLE
while True:
await interval.tick()

old_status = current_status
current_load = await asyncio.get_event_loop().run_in_executor(
None, self._opts.load_fnc
)

is_full = current_load >= _WorkerEnvOption.getvalue(
self._opts.load_threshold, self._devmode
)
currently_available = not is_full and not self._draining

current_status = (
agent.WorkerStatus.WS_AVAILABLE
if currently_available
else agent.WorkerStatus.WS_FULL
)

update = agent.UpdateWorkerStatus(
load=current_load, status=current_status
)

# only log if status has changed
if old_status != current_status and not self._draining:
extra = {
"load": current_load,
"threshold": self._opts.load_threshold,
}
if is_full:
logger.info(
"worker is at full capacity, marking as unavailable",
extra=extra,
)
else:
logger.info(
"worker is below capacity, marking as available",
extra=extra,
)

msg = agent.WorkerMessage(update_worker=update)
with contextlib.suppress(utils.aio.ChanClosed):
await self._queue_msg(msg)
await self._update_worker_status()

async def _send_task():
nonlocal closing_ws
Expand Down Expand Up @@ -713,3 +672,85 @@ async def _handle_termination(self, msg: agent.JobTermination):
# safe to ignore
return
await proc.aclose()

def _on_process_closed(self, proc: ipc.job_executor.JobExecutor) -> None:
    """Handle the pool's ``process_closed`` event by scheduling a job status update for *proc*."""
    self._update_job_status_sync(proc)

def _on_process_started(self, proc: ipc.job_executor.JobExecutor) -> None:
    """Handle the pool's ``process_started`` event by scheduling a job status update for *proc*."""
    self._update_job_status_sync(proc)

def _on_process_job_launched(self, proc: ipc.job_executor.JobExecutor) -> None:
    """Handle the pool's ``process_job_launched`` event by scheduling a job status update for *proc*."""
    self._update_job_status_sync(proc)

async def _update_worker_status(self):
    """Compute the worker's availability and queue an ``UpdateWorkerStatus``.

    While the worker is draining it always reports ``WS_FULL`` and no load
    value is computed. Otherwise ``load_fnc`` is run in the default executor;
    the worker is reported ``WS_FULL`` when the load reaches the configured
    threshold (or draining began in the meantime) and ``WS_AVAILABLE``
    otherwise. A log line is emitted only when the status differs from the
    last one logged.

    Sending is best-effort on both paths: a closed message channel is
    ignored, so neither the periodic load task nor ``drain()`` fails merely
    because the connection is already gone.
    """
    if self._draining:
        update = agent.UpdateWorkerStatus(status=agent.WorkerStatus.WS_FULL)
        msg = agent.WorkerMessage(update_worker=update)
        # Same best-effort handling as the non-draining path below.
        with contextlib.suppress(utils.aio.ChanClosed):
            await self._queue_msg(msg)
        return

    # load_fnc is a plain callable; run it off the event loop.
    current_load = await asyncio.get_running_loop().run_in_executor(
        None, self._opts.load_fnc
    )

    is_full = current_load >= _WorkerEnvOption.getvalue(
        self._opts.load_threshold, self._devmode
    )
    # _draining is re-read on purpose: it may have been set while load_fnc
    # was running in the executor.
    currently_available = not is_full and not self._draining

    status = (
        agent.WorkerStatus.WS_AVAILABLE
        if currently_available
        else agent.WorkerStatus.WS_FULL
    )

    update = agent.UpdateWorkerStatus(load=current_load, status=status)

    # only log if status has changed
    if self._previous_status != status and not self._draining:
        self._previous_status = status
        extra = {
            "load": current_load,
            "threshold": self._opts.load_threshold,
        }
        if is_full:
            logger.info(
                "worker is at full capacity, marking as unavailable",
                extra=extra,
            )
        else:
            logger.info(
                "worker is below capacity, marking as available",
                extra=extra,
            )

    msg = agent.WorkerMessage(update_worker=update)
    with contextlib.suppress(utils.aio.ChanClosed):
        await self._queue_msg(msg)

def _update_job_status_sync(self, proc: ipc.job_executor.JobExecutor) -> None:
    """Schedule ``_update_job_status(proc)`` on ``self._loop`` from synchronous code.

    The created task is added to ``self._tasks`` and discards itself from
    that collection when it completes.
    """
    status_task = self._loop.create_task(self._update_job_status(proc))
    self._tasks.add(status_task)
    status_task.add_done_callback(self._tasks.discard)

async def _update_job_status(self, proc: ipc.job_executor.JobExecutor) -> None:
    """Queue an ``UpdateJobStatus`` message describing *proc*'s current job.

    The executor's ``run_status`` is mapped to a job status:
    ``FINISHED_FAILED`` -> ``JS_FAILED``, ``FINISHED_CLEAN`` -> ``JS_SUCCESS``,
    ``STARTING`` -> ``JS_PENDING`` and anything else -> ``JS_RUNNING``.
    If the executor recorded an exception, its text is sent as ``error``.

    Nothing is sent when the executor has no job. The send is best-effort:
    a closed message channel is ignored.
    """
    job_info = proc.running_job
    if not job_info:
        # The process_started / process_closed callbacks also reach this
        # method for executors that never received a job. That is a normal
        # state, not an error, so it is only logged at debug level.
        logger.debug("job_info not found for process")
        return

    # Read the computed property once so all comparisons see the same value.
    run_status = proc.run_status
    status: agent.JobStatus = agent.JobStatus.JS_RUNNING
    if run_status == ipc.job_executor.RunStatus.FINISHED_FAILED:
        status = agent.JobStatus.JS_FAILED
    elif run_status == ipc.job_executor.RunStatus.FINISHED_CLEAN:
        status = agent.JobStatus.JS_SUCCESS
    elif run_status == ipc.job_executor.RunStatus.STARTING:
        status = agent.JobStatus.JS_PENDING

    error: str | None = None
    exc = proc.exception
    if exc:
        # An exception built without arguments stringifies to "", which would
        # report a failure with no explanation; fall back to the class name.
        error = str(exc) or type(exc).__name__

    update = agent.UpdateJobStatus(
        job_id=job_info.job.id, status=status, error=error
    )
    msg = agent.WorkerMessage(update_job=update)
    # This coroutine runs as a fire-and-forget task; a closed channel must not
    # surface as an unhandled task exception.
    with contextlib.suppress(utils.aio.ChanClosed):
        await self._queue_msg(msg)

0 comments on commit 3b58ee5

Please sign in to comment.