From 8e3b21ddcc46cde76a901fc3b42bc250e760a6e6 Mon Sep 17 00:00:00 2001 From: Robin Ole Heinemann Date: Tue, 23 May 2023 23:05:00 +0200 Subject: [PATCH] add presence --- backend/transcribee_backend/helpers/sync.py | 67 ++++++++++++++----- backend/transcribee_backend/models/task.py | 14 +++- .../transcribee_backend/routers/document.py | 2 +- .../src/editor/automerge_websocket_editor.ts | 37 +++++++++- frontend/src/editor/transcription_editor.tsx | 3 +- 5 files changed, 103 insertions(+), 20 deletions(-) diff --git a/backend/transcribee_backend/helpers/sync.py b/backend/transcribee_backend/helpers/sync.py index 01a78e19..a1079fad 100644 --- a/backend/transcribee_backend/helpers/sync.py +++ b/backend/transcribee_backend/helpers/sync.py @@ -1,8 +1,10 @@ import asyncio from asyncio import Queue from typing import Callable +from collections import defaultdict +import logging -from fastapi import WebSocket, WebSocketDisconnect +from fastapi import WebSocket from sqlmodel import Session, select from starlette.websockets import WebSocketState from transcribee_backend.helpers.time import now_tz_aware @@ -13,18 +15,16 @@ class DocumentSyncManager: def __init__(self): - self.handlers: dict[str, set[Callable]] = {} + self.handlers: defaultdict[str, set[Callable]] = defaultdict(set) - async def broadcast(self, channel: str, message: bytes): + async def broadcast(self, channel: str, message: bytes | str): for handler in self.handlers[channel]: await handler(channel, message) def subscribe(self, channel: str, handler: Callable): - self.handlers.setdefault(channel, set()) self.handlers[channel].add(handler) - def unsubscribe(self, channel: str, handler): - self.handlers.setdefault(channel, set()) + def unsubscribe(self, channel: str, handler: Callable): self.handlers[channel].remove(handler) @@ -37,21 +37,29 @@ def __init__(self, document: Document, websocket: WebSocket, session: Session): self._ws = websocket self._session = session self._subscribed = set() - self._msg_queue = Queue() + self._msg_queue_sync = Queue() + self._msg_queue_presence = Queue() def subscribe(self, channel: str): self._subscribed.add(channel) sync_manager.subscribe(channel, self.handle_incoming_broadcast) - async def handle_incoming_broadcast(self, channel: str, message: bytes): + async def handle_incoming_broadcast(self, channel: str, message: bytes | str): if channel in self._subscribed: - await self._msg_queue.put(message) + if isinstance(message, bytes): + await self._msg_queue_sync.put(message) + else: + await self._msg_queue_presence.put(message) async def listener(self): while True: - await self.on_message(await self._ws.receive_bytes()) + message = await self._ws.receive() + if "text" in message: + await self.on_presence_message(message["text"]) + else: + await self.on_sync_message(message["bytes"]) - async def broadcast_sender(self): + async def broadcast_sender_sync(self): statement = select(DocumentUpdate).where(DocumentUpdate.document == self._doc) for update in self._session.exec(statement): await self._ws.send_bytes( @@ -59,22 +67,30 @@ async def broadcast_sender(self): ) await self._ws.send_bytes(bytes([SyncMessageType.CHANGE_BACKLOG_COMPLETE])) while True: - msg = await self._msg_queue.get() + msg = await self._msg_queue_sync.get() await self._ws.send_bytes(bytes([SyncMessageType.CHANGE]) + msg) + async def broadcast_sender_presence(self): + while True: + msg = await self._msg_queue_presence.get() + await self._ws.send_text(msg) + async def run(self): await self._ws.accept() self.subscribe(str(self._doc.id)) pending = { asyncio.create_task(self.listener()), - asyncio.create_task(self.broadcast_sender()), + # asyncio.create_task(self.listener_presence()), + asyncio.create_task(self.broadcast_sender_sync()), + asyncio.create_task(self.broadcast_sender_presence()), } while pending: done, pending = await asyncio.wait( pending, return_when=asyncio.FIRST_COMPLETED ) for d in done: - if d.exception() and isinstance(d.exception(), WebSocketDisconnect): + if d.exception():# and isinstance(d.exception(), WebSocketDisconnect): + logging.error(f"exception: {d.exception()!r}") for task in pending: task.cancel() pending = set() @@ -91,7 +107,7 @@ async def on_broadcast(self, channel: str, message: bytes): if channel == str(self._doc.id): await self._ws.send_bytes(message) - async def on_message(self, message: bytes): + async def on_sync_message(self, message: bytes): update = DocumentUpdate(change_bytes=message, document_id=self._doc.id) self._session.add(update) @@ -100,3 +116,24 @@ async def on_message(self, message: bytes): self._session.commit() await sync_manager.broadcast(str(self._doc.id), message) + + async def on_presence_message(self, message: str): + await sync_manager.broadcast(str(self._doc.id), message) + + +# class PresenceManager: +# def __init__(self): +# self.connections: defaultdict[str, set[Callable]] = defaultdict(set) + +# async def broadcast(self, channel: str, message: bytes): +# await asyncio.wait( +# websocket.send_bytes(message) for websocket in self.connections[channel] +# ) + +# def subscribe(self, channel: str, websocket: WebSocket): +# self.connections[channel].add(websocket) + +# def unsubscribe(self, channel: str, websocket: WebSocket): +# self.connections[channel].remove(websocket) + +# presence_manager = PresenceManager() diff --git a/backend/transcribee_backend/models/task.py b/backend/transcribee_backend/models/task.py index 82c9decd..4d4c263c 100644 --- a/backend/transcribee_backend/models/task.py +++ b/backend/transcribee_backend/models/task.py @@ -5,6 +5,7 @@ from sqlmodel import JSON, Column, Field, ForeignKey, Relationship, SQLModel from sqlmodel.sql.sqltypes import GUID from transcribee_proto.api import TaskType +from transcribee_proto.api import Document as ApiDocument from typing_extensions import Self from .document import Document @@ -91,19 +92,30 @@ class TaskResponse(TaskBase): dependencies: List[uuid.UUID] @classmethod - def from_orm(cls, task: Task) -> Self: + def from_orm(cls, task: Task, update = {}) -> Self: return super().from_orm( task, update={ "dependencies": [x.id for x in task.dependencies], + **update }, ) class AssignedTaskResponse(TaskResponse): assigned_worker: WorkerBase + document: ApiDocument last_keepalive: datetime.datetime + @classmethod + def from_orm(cls, task: Task) -> Self: + return super().from_orm( + task, + update={ + "document": task.document.as_api_document(), + }, + ) + # TODO: Better typing, combine with types from proto class SpeakerIdentificationTask(TaskBase): diff --git a/backend/transcribee_backend/routers/document.py b/backend/transcribee_backend/routers/document.py index 73268c30..a21c7bb5 100644 --- a/backend/transcribee_backend/routers/document.py +++ b/backend/transcribee_backend/routers/document.py @@ -229,7 +229,7 @@ def get_document_tasks( @document_router.websocket("/sync/{document_id}/") -async def websocket_endpoint( +async def sync_websocket_endpoint( websocket: WebSocket, document: Document = Depends(ws_get_document_from_url), session: Session = Depends(get_session), diff --git a/frontend/src/editor/automerge_websocket_editor.ts b/frontend/src/editor/automerge_websocket_editor.ts index fd5ad617..bcb4bdd0 100644 --- a/frontend/src/editor/automerge_websocket_editor.ts +++ b/frontend/src/editor/automerge_websocket_editor.ts @@ -12,6 +12,14 @@ enum MessageSyncType { FullDoc = 3, } +const PRESENCE_INTERVAL = 27 * 1000; +const PRESENCE_TIMEOUT = 60 * 1000; + +interface PresenceMessage { + actorID: string, + selection: BaseSelection +} + export function useAutomergeWebsocketEditor( url: string | URL, { onInitialSyncComplete }: { onInitialSyncComplete: () => void }, @@ -26,19 +34,44 @@ export function useAutomergeWebsocketEditor( const wsRef = useRef(null); useEffect(() => { + const actorID = Automerge.getActorId(editor.doc); + let presenceTimer: number | null = null; + + function sendPresence() { + const message: PresenceMessage = { + actorID, + selection: editor.selection + }; + console.log("sending", message); + ws.send(JSON.stringify(message)); + + presenceTimer && window.clearTimeout(presenceTimer); + presenceTimer = window.setTimeout(sendPresence, PRESENCE_INTERVAL); + } + const ws = new ReconnectingWebSocket(url.toString(), [], { debug }); + ws.binaryType = "arraybuffer"; const start = Date.now(); let initialMessages: Uint8Array[] = []; let initialSync = true; + sendPresence(); + + const onChange = editor.onChange; + editor.onChange = (...args) => { + sendPresence() // TODO(robin): debounce + onChange && onChange(...args); + }; + const onMessage = async (event: MessageEvent) => { - const msg_data = new Uint8Array(await event.data.arrayBuffer()); + console.log(event); + const msg_data = new Uint8Array(await event.data); const msg_type = msg_data[0]; const msg = msg_data.slice(1); if (msg_type === MessageSyncType.Change) { // skip own changes // TODO: filter own changes in backend? - if (Automerge.decodeChange(msg).actor == Automerge.getActorId(editor.doc)) return; + if (Automerge.decodeChange(msg).actor == actorID) return; if (initialSync) { initialMessages.push(msg); diff --git a/frontend/src/editor/transcription_editor.tsx b/frontend/src/editor/transcription_editor.tsx index 3bb36640..0a43d83a 100644 --- a/frontend/src/editor/transcription_editor.tsx +++ b/frontend/src/editor/transcription_editor.tsx @@ -1,10 +1,11 @@ -import { Editor } from 'slate'; +import { Editor, NodeEntry } from 'slate'; import { Slate, Editable, RenderElementProps, RenderLeafProps } from 'slate-react'; import { SpeakerDropdown } from './speaker_dropdown'; import { useEvent } from '../utils/use_event'; import { TextClickEvent } from './types'; import { startTimeToClassName } from './player'; import clsx from 'clsx'; +import { useCallback, useEffect } from 'react'; export function formattedTime(sec: number | undefined): string { if (sec === undefined) {