Skip to content

Commit

Permalink
WIP: NotifyQueue
Browse files Browse the repository at this point in the history
On Willow notifications run in a task. Starting such a task when there
is already a task running causes undefined behaviour. We therefore want
to avoid this situation.

At the same time, we don't want a user to miss an important
notification, so we cannot simply drop a notification if there is
already an active notification.

Enter NotifyQueue. Instead of passing a notification received on the API
to Willow clients directly, we add them in a queue. This queue is a
dict, with key being the MAC address of the Willow client. We cannot use
the WebSocket as key, like we do in ConnMgr, as this changes every time
Willow reconnects. And since we're on 2.4GHz Wi-Fi, reconnections will
occur.

NotifyQueue creates an asyncio task that loops forever, with a 1s delay
between iterations. If a notification is queued for a client, there is
no active notification, and the notification ID (UNIX timestamp in ms)
is not in the future, the notification will be sent, and the client will
be marked having an active notification. When the notification is done,
Willow sends a notify_done to WAS, and WAS cleans up the notification
from the queue.

One problem with this approach is that if Willow is restarted or crashes
during a notification, the notification will never be cleaned up in WAS.

Another problem is that when a long running notification is active,
important notifications like a timer or an alarm will not be sent. A
solution for this could be to have notification priorities:
if a notification is active, and a notification with a higher priority
arrives, we cancel the running notification and start the new one.

One more problem is that we don't have persistent state, so restarting
WAS will result in the queued notifications being lost.

We want this to be reliable, as people might use this for an alarm
clock. Missing an alarm might result in someone getting in trouble at
work. Missing a timer while cooking might result in having to throw away
food.
  • Loading branch information
stintel committed Nov 1, 2023
1 parent 166e3a5 commit 7811b23
Showing 1 changed file with 109 additions and 17 deletions.
126 changes: 109 additions & 17 deletions api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from websockets.exceptions import ConnectionClosed
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Literal, Optional
from typing import List, Literal, Optional

from command_endpoints.ha_rest import HomeAssistantRestEndpoint
from command_endpoints.ha_ws import HomeAssistantWebSocketEndpoint, HomeAssistantWebSocketEndpointNotSupportedException
Expand Down Expand Up @@ -115,6 +115,7 @@ def __init__(self, ua):
self.platform = "unknown"
self.mac_addr = "unknown"
self.ua = ua
self.notification_active = 0

def set_hostname(self, hostname):
self.hostname = hostname
Expand All @@ -125,6 +126,12 @@ def set_platform(self, platform):
def set_mac_addr(self, mac_addr):
self.mac_addr = mac_addr

def is_notification_active(self):
return self.notification_active != 0

def set_notification_active(self, id):
self.notification_active = id


class ConnMgr:
def __init__(self):
Expand Down Expand Up @@ -153,20 +160,28 @@ def get_client_by_hostname(self, hostname):
if v.hostname == hostname:
return k

async def notify(self, data):
try:
msg = NotifyMsg.model_validate_json(json.dumps(data))
msg = msg.model_dump_json(exclude={'hostname'}, exclude_none=True)
log.info(msg)
if 'hostname' in data:
ws = self.get_client_by_hostname(data['hostname'])
await ws.send_text(msg)
else:
await self.broadcast(msg)
return "Success"
except Exception as e:
log.error(f"Failed to send notify command ({e})")
return "Error"
def get_client_by_ws(self, ws):
return self.connected_clients[ws]

def get_mac_by_hostname(self, hostname):
for k, v in self.connected_clients.items():
if v.hostname == hostname:
return self.connected_clients[k]['mac_addr']

def get_ws_by_mac(self, mac):
for k, v in self.connected_clients.items():
# log.debug(f"get_ws_by_mac: {k} {v.mac_addr}")
if v.mac_addr == mac:
return k

log.debug("get_ws_by_mac: returning None")
return None

def is_notification_active(self, ws):
return self.connected_clients[ws].is_notification_active()

def set_notification_active(self, ws, id):
self.connected_clients[ws].set_notification_active(id)

def update_client(self, ws, key, value):
if key == "hostname":
Expand Down Expand Up @@ -194,6 +209,73 @@ class NotifyMsg(BaseModel):
hostname: Optional[str] = None


class NotifyQueue():
notifications: Dict[str, List[NotifyData]]

def __init__(self):
self.notifications = {}

loop = asyncio.get_event_loop()
self.task = loop.create_task(self.dequeue())


def add(self, msg):
msg = NotifyMsg.model_validate_json(json.dumps(msg))

log.debug(msg)

if msg.hostname is not None:
mac_addr = connmgr.get_mac_by_hostname(msg.hostname)
if mac_addr in self.notifications:
self.notifications[mac_addr].append(msg.data)
else:
self.notifications.update({mac_addr: [msg.data]})

else:
for _, client in connmgr.connected_clients.items():
mac_addr = client.mac_addr
if mac_addr in self.notifications:
self.notifications[mac_addr].append(msg.data)
else:
self.notifications.update({mac_addr: [msg.data]})

def done(self, ws, id):
client = connmgr.get_client_by_ws(ws)
for i, notification in enumerate(self.notifications[client.mac_addr]):
if notification.id == id:
connmgr.set_notification_active(ws, 0)
self.notifications[client.mac_addr].pop(i)
break

async def dequeue(self):
while True:
try:
for mac_addr, notifications in self.notifications.items():
# log.debug(f"dequeueing notifications for {mac_addr}: {notifications} (len={len(notifications)})")
if len(notifications) > 0:
ws = connmgr.get_ws_by_mac(mac_addr)
if ws is None:
continue
if connmgr.is_notification_active(ws):
log.debug(f"{mac_addr} has active notification")
continue

for notification in notifications:
if notification.id > int(time.time() * 1000):
continue

connmgr.set_notification_active(ws, notification.id)
log.debug(f"dequeueing notification for {mac_addr}: {notification}")
msg = NotifyMsg(data=notification)
asyncio.ensure_future(ws.send_text(msg.model_dump_json(exclude={'hostname'}, exclude_none=True)))
# don't send more than one notification at once
break
except Exception as e:
log.debug(f"exception during dequeue: {e}")

await asyncio.sleep(1)


class WakeEvent:
def __init__(self, client, volume):
self.client = client
Expand Down Expand Up @@ -519,6 +601,8 @@ async def startup_event():
app.command_endpoint = None
log.error(f"failed to initialize command endpoint ({e})")

app.notify_queue = NotifyQueue()


@app.get("/", response_class=RedirectResponse)
def api_redirect_admin():
Expand Down Expand Up @@ -697,7 +781,7 @@ async def api_get_release(release: GetRelease = Depends()):
return JSONResponse(content=releases)

class GetStatus(BaseModel):
type: Literal['asyncio_tasks'] = Field (Query(..., description='Status type'))
type: Literal['asyncio_tasks', 'notify_queue'] = Field (Query(..., description='Status type'))


@app.get("/api/status")
Expand All @@ -710,6 +794,11 @@ async def api_get_status(status: GetStatus = Depends()):
for task in tasks:
res.append(f"{task.get_name()}: {task.get_coro()}")

elif status.type == "notify_queue":
for mac, notifications in app.notify_queue.notifications.items():
log.debug(f"{mac}: {notifications}")
res.append(f"{mac}: {notifications}")

return JSONResponse(res)


Expand Down Expand Up @@ -764,7 +853,7 @@ async def api_post_client(request: Request, device: PostClient = Depends()):
json.dump(devices, devices_file)
devices_file.close()
elif device.action == 'notify':
return await connmgr.notify(data)
app.notify_queue.add(data)
else:
# Catch all assuming anything else is a device command
return await device_command(data, device.action)
Expand Down Expand Up @@ -833,6 +922,9 @@ async def websocket_endpoint(
elif "wake_end" in msg:
pass

elif "notify_done" in msg:
app.notify_queue.done(websocket, msg["notify_done"])

elif "cmd" in msg:
if msg["cmd"] == "endpoint":
if app.command_endpoint is not None:
Expand Down

0 comments on commit 7811b23

Please sign in to comment.