From 7811b2362a4bd54b014ce286796a8989568a37a0 Mon Sep 17 00:00:00 2001 From: Stijn Tintel Date: Wed, 1 Nov 2023 20:14:39 +0200 Subject: [PATCH] WIP: NotifyQueue On Willow, notifications run in a task. Starting such a task when there is already a task running causes undefined behaviour. We therefore want to avoid this situation. At the same time, we don't want a user to miss an important notification, so we cannot simply drop a notification if there is already an active notification. Enter NotifyQueue. Instead of passing notifications received on the API to Willow clients directly, we add them to a queue. This queue is a dict, with the key being the MAC address of the Willow client. We cannot use the WebSocket as key, like we do in ConnMgr, as this changes every time Willow reconnects. And since we're on 2.4GHz Wi-Fi, reconnections will occur. NotifyQueue creates an asyncio task that loops forever, with a 1s delay between iterations. If a notification is queued for a client, there is no active notification, and the notification ID (UNIX timestamp in ms) is not in the future, the notification will be sent, and the client will be marked as having an active notification. When the notification is done, Willow sends a notify_done to WAS, and WAS cleans up the notification from the queue. One problem with this approach is that if Willow is restarted or crashes during a notification, the notification will never be cleaned up in WAS. Another problem is that when a long-running notification is active, important notifications like a timer or an alarm will not be sent until it finishes. A solution for this could be to have notification priorities: if a notification is active, and a notification with a higher priority arrives, we cancel the running notification and start the new one. One more problem is that we don't have persistent state, so restarting WAS will result in the queued notifications being lost. We want this to be reliable, as people might use this for an alarm clock. 
Missing an alarm might result in someone getting in trouble at work. Missing a timer while cooking might result in having to throw away food. --- api.py | 126 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 109 insertions(+), 17 deletions(-) diff --git a/api.py b/api.py index 7615298..196624f 100644 --- a/api.py +++ b/api.py @@ -28,7 +28,7 @@ from websockets.exceptions import ConnectionClosed from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field -from typing import Literal, Optional +from typing import Dict, List, Literal, Optional from command_endpoints.ha_rest import HomeAssistantRestEndpoint from command_endpoints.ha_ws import HomeAssistantWebSocketEndpoint, HomeAssistantWebSocketEndpointNotSupportedException @@ -115,6 +115,7 @@ def __init__(self, ua): self.platform = "unknown" self.mac_addr = "unknown" self.ua = ua + self.notification_active = 0 def set_hostname(self, hostname): self.hostname = hostname @@ -125,6 +126,12 @@ def set_platform(self, platform): def set_mac_addr(self, mac_addr): self.mac_addr = mac_addr + def is_notification_active(self): + return self.notification_active != 0 + + def set_notification_active(self, id): + self.notification_active = id + class ConnMgr: def __init__(self): @@ -153,20 +160,28 @@ def get_client_by_hostname(self, hostname): if v.hostname == hostname: return k - async def notify(self, data): - try: - msg = NotifyMsg.model_validate_json(json.dumps(data)) - msg = msg.model_dump_json(exclude={'hostname'}, exclude_none=True) - log.info(msg) - if 'hostname' in data: - ws = self.get_client_by_hostname(data['hostname']) - await ws.send_text(msg) - else: - await self.broadcast(msg) - return "Success" - except Exception as e: - log.error(f"Failed to send notify command ({e})") - return "Error" + def get_client_by_ws(self, ws): + return self.connected_clients[ws] + + def get_mac_by_hostname(self, hostname): + for k, v in self.connected_clients.items(): + if v.hostname == 
hostname: + return v.mac_addr + + def get_ws_by_mac(self, mac): + for k, v in self.connected_clients.items(): + # log.debug(f"get_ws_by_mac: {k} {v.mac_addr}") + if v.mac_addr == mac: + return k + + log.debug("get_ws_by_mac: returning None") + return None + + def is_notification_active(self, ws): + return self.connected_clients[ws].is_notification_active() + + def set_notification_active(self, ws, id): + self.connected_clients[ws].set_notification_active(id) def update_client(self, ws, key, value): if key == "hostname": @@ -194,6 +209,73 @@ class NotifyMsg(BaseModel): hostname: Optional[str] = None +class NotifyQueue(): + notifications: Dict[str, List[NotifyData]] + + def __init__(self): + self.notifications = {} + + loop = asyncio.get_event_loop() + self.task = loop.create_task(self.dequeue()) + + + def add(self, msg): + msg = NotifyMsg.model_validate_json(json.dumps(msg)) + + log.debug(msg) + + if msg.hostname is not None: + mac_addr = connmgr.get_mac_by_hostname(msg.hostname) + if mac_addr in self.notifications: + self.notifications[mac_addr].append(msg.data) + else: + self.notifications.update({mac_addr: [msg.data]}) + + else: + for _, client in connmgr.connected_clients.items(): + mac_addr = client.mac_addr + if mac_addr in self.notifications: + self.notifications[mac_addr].append(msg.data) + else: + self.notifications.update({mac_addr: [msg.data]}) + + def done(self, ws, id): + client = connmgr.get_client_by_ws(ws) + for i, notification in enumerate(self.notifications.get(client.mac_addr, [])): + if notification.id == id: + connmgr.set_notification_active(ws, 0) + self.notifications[client.mac_addr].pop(i) + break + + async def dequeue(self): + while True: + try: + for mac_addr, notifications in self.notifications.items(): + # log.debug(f"dequeueing notifications for {mac_addr}: {notifications} (len={len(notifications)})") + if len(notifications) > 0: + ws = connmgr.get_ws_by_mac(mac_addr) + if ws is None: + continue + if 
connmgr.is_notification_active(ws): + log.debug(f"{mac_addr} has active notification") + continue + + for notification in notifications: + if notification.id > int(time.time() * 1000): + continue + + connmgr.set_notification_active(ws, notification.id) + log.debug(f"dequeueing notification for {mac_addr}: {notification}") + msg = NotifyMsg(data=notification) + asyncio.ensure_future(ws.send_text(msg.model_dump_json(exclude={'hostname'}, exclude_none=True))) + # don't send more than one notification at once + break + except Exception as e: + log.error(f"exception during dequeue: {e}") + + await asyncio.sleep(1) + + class WakeEvent: def __init__(self, client, volume): self.client = client @@ -519,6 +601,8 @@ async def startup_event(): app.command_endpoint = None log.error(f"failed to initialize command endpoint ({e})") + app.notify_queue = NotifyQueue() + @app.get("/", response_class=RedirectResponse) def api_redirect_admin(): @@ -697,7 +781,7 @@ async def api_get_release(release: GetRelease = Depends()): return JSONResponse(content=releases) class GetStatus(BaseModel): - type: Literal['asyncio_tasks'] = Field (Query(..., description='Status type')) + type: Literal['asyncio_tasks', 'notify_queue'] = Field (Query(..., description='Status type')) @app.get("/api/status") @@ -710,6 +794,11 @@ async def api_get_status(status: GetStatus = Depends()): for task in tasks: res.append(f"{task.get_name()}: {task.get_coro()}") + elif status.type == "notify_queue": + for mac, notifications in app.notify_queue.notifications.items(): + log.debug(f"{mac}: {notifications}") + res.append(f"{mac}: {notifications}") + return JSONResponse(res) @@ -764,7 +853,7 @@ async def api_post_client(request: Request, device: PostClient = Depends()): json.dump(devices, devices_file) devices_file.close() elif device.action == 'notify': - return await connmgr.notify(data) + app.notify_queue.add(data) else: # Catch all assuming anything else is a device command return await device_command(data, 
device.action) @@ -833,6 +922,9 @@ async def websocket_endpoint( elif "wake_end" in msg: pass + elif "notify_done" in msg: + app.notify_queue.done(websocket, msg["notify_done"]) + elif "cmd" in msg: if msg["cmd"] == "endpoint": if app.command_endpoint is not None: