Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add presence #201

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 52 additions & 15 deletions backend/transcribee_backend/helpers/sync.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
from asyncio import Queue
from typing import Callable
from collections import defaultdict
import logging

from fastapi import WebSocket, WebSocketDisconnect
from fastapi import WebSocket
from sqlmodel import Session, select
from starlette.websockets import WebSocketState
from transcribee_backend.helpers.time import now_tz_aware
Expand All @@ -13,18 +15,16 @@

class DocumentSyncManager:
def __init__(self):
self.handlers: dict[str, set[Callable]] = {}
self.handlers: defaultdict[str, set[Callable]] = defaultdict(set)

async def broadcast(self, channel: str, message: bytes):
async def broadcast(self, channel: str, message: bytes | str):
for handler in self.handlers[channel]:
await handler(channel, message)

def subscribe(self, channel: str, handler: Callable):
self.handlers.setdefault(channel, set())
self.handlers[channel].add(handler)

def unsubscribe(self, channel: str, handler):
self.handlers.setdefault(channel, set())
def unsubscribe(self, channel: str, handler: Callable):
self.handlers[channel].remove(handler)


Expand All @@ -37,44 +37,60 @@ def __init__(self, document: Document, websocket: WebSocket, session: Session):
self._ws = websocket
self._session = session
self._subscribed = set()
self._msg_queue = Queue()
self._msg_queue_sync = Queue()
self._msg_queue_presence = Queue()

def subscribe(self, channel: str):
    """Start receiving broadcasts sent on *channel* for this connection."""
    # Record the channel locally; handle_incoming_broadcast only accepts
    # messages for channels present in this set.
    self._subscribed.add(channel)
    callback = self.handle_incoming_broadcast
    sync_manager.subscribe(channel, callback)

async def handle_incoming_broadcast(self, channel: str, message: bytes):
async def handle_incoming_broadcast(self, channel: str, message: bytes | str):
if channel in self._subscribed:
await self._msg_queue.put(message)
if isinstance(message, bytes):
await self._msg_queue_sync.put(message)
else:
await self._msg_queue_presence.put(message)

async def listener(self):
    """Receive websocket frames forever and dispatch them by frame kind.

    Text frames are handed to ``on_presence_message`` and binary frames to
    ``on_sync_message``.

    Raises:
        WebSocketDisconnect: When the client disconnects.
    """
    while True:
        message = await self._ws.receive()
        if message.get("type") == "websocket.disconnect":
            # ``receive()`` returns the disconnect frame instead of raising,
            # and that frame has neither "text" nor "bytes".
            # Imported here because the module-level import is not guaranteed.
            from starlette.websockets import WebSocketDisconnect

            raise WebSocketDisconnect(message.get("code", 1000))

        # A frame may carry both keys with one of them set to None, so check
        # the value rather than key presence.
        text = message.get("text")
        if text is not None:
            await self.on_presence_message(text)
            continue

        payload = message.get("bytes")
        if payload is not None:
            await self.on_sync_message(payload)

async def broadcast_sender_sync(self):
    """Send the stored update backlog, then forward live sync messages.

    Every ``DocumentUpdate`` selected for this document is sent as a CHANGE
    frame, followed by one CHANGE_BACKLOG_COMPLETE frame. After that the
    method loops forever, sending each message taken from the sync queue as
    a CHANGE frame.
    """
    # One-byte message-type prefix shared by every CHANGE frame.
    change_prefix = bytes([SyncMessageType.CHANGE])

    backlog = select(DocumentUpdate).where(DocumentUpdate.document == self._doc)
    for update in self._session.exec(backlog):
        await self._ws.send_bytes(change_prefix + update.change_bytes)
    await self._ws.send_bytes(bytes([SyncMessageType.CHANGE_BACKLOG_COMPLETE]))

    while True:
        change = await self._msg_queue_sync.get()
        await self._ws.send_bytes(change_prefix + change)

async def broadcast_sender_presence(self):
    """Forward queued presence messages to the websocket as text, forever."""
    while True:
        # Block until the next presence message is queued, then send it on.
        await self._ws.send_text(await self._msg_queue_presence.get())

async def run(self):
await self._ws.accept()
self.subscribe(str(self._doc.id))
pending = {
asyncio.create_task(self.listener()),
asyncio.create_task(self.broadcast_sender()),
# asyncio.create_task(self.listener_presence()),
asyncio.create_task(self.broadcast_sender_sync()),
asyncio.create_task(self.broadcast_sender_presence()),
}
while pending:
done, pending = await asyncio.wait(
pending, return_when=asyncio.FIRST_COMPLETED
)
for d in done:
if d.exception() and isinstance(d.exception(), WebSocketDisconnect):
if d.exception():# and isinstance(d.exception(), WebSocketDisconnect):
logging.error(f"exception: {d.exception()!r}")
for task in pending:
task.cancel()
pending = set()
Expand All @@ -91,7 +107,7 @@ async def on_broadcast(self, channel: str, message: bytes):
if channel == str(self._doc.id):
await self._ws.send_bytes(message)

async def on_message(self, message: bytes):
async def on_sync_message(self, message: bytes):
update = DocumentUpdate(change_bytes=message, document_id=self._doc.id)
self._session.add(update)

Expand All @@ -100,3 +116,24 @@ async def on_message(self, message: bytes):

self._session.commit()
await sync_manager.broadcast(str(self._doc.id), message)

async def on_presence_message(self, message: str):
    """Broadcast a presence message on this document's channel.

    The channel name is the string form of the document id.
    """
    channel = str(self._doc.id)
    await sync_manager.broadcast(channel, message)


# class PresenceManager:
# def __init__(self):
# self.connections: defaultdict[str, set[Callable]] = defaultdict(set)

# async def broadcast(self, channel: str, message: bytes):
# await asyncio.wait(
# websocket.send_bytes(message) for websocket in self.connections[channel]
# )

# def subscribe(self, channel: str, websocket: WebSocket):
# self.connections[channel].add(websocket)

# def unsubscribe(self, channel: str, websocket: WebSocket):
# self.connections[channel].remove(websocket)

# presence_manager = PresenceManager()
14 changes: 13 additions & 1 deletion backend/transcribee_backend/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from sqlmodel import JSON, Column, Field, ForeignKey, Relationship, SQLModel
from sqlmodel.sql.sqltypes import GUID
from transcribee_proto.api import TaskType
from transcribee_proto.api import Document as ApiDocument
from typing_extensions import Self

from .document import Document
Expand Down Expand Up @@ -91,19 +92,30 @@ class TaskResponse(TaskBase):
dependencies: List[uuid.UUID]

@classmethod
def from_orm(cls, task: Task, update=None) -> Self:
    """Build the response model from a ``Task``.

    ``dependencies`` is filled with the ids of ``task.dependencies``.

    Args:
        task: The task to convert.
        update: Optional mapping of extra field values; its entries take
            precedence over the computed ``dependencies`` value.
    """
    # ``None`` replaces the former ``{}`` default: a mutable default is one
    # object shared by every call.
    fields = {"dependencies": [x.id for x in task.dependencies]}
    if update:
        fields.update(update)
    return super().from_orm(task, update=fields)


# Task response that additionally carries the assigned worker, the task's
# document in its API representation, and the last keepalive timestamp.
class AssignedTaskResponse(TaskResponse):
    assigned_worker: WorkerBase
    document: ApiDocument
    last_keepalive: datetime.datetime

    @classmethod
    def from_orm(cls, task: Task, update=None) -> Self:
        # Accepts the same optional ``update`` mapping as the parent
        # ``from_orm`` so the override keeps a compatible signature; caller
        # entries take precedence over the computed ``document`` value.
        fields = {"document": task.document.as_api_document()}
        if update:
            fields.update(update)
        return super().from_orm(task, update=fields)


# TODO: Better typing, combine with types from proto
class SpeakerIdentificationTask(TaskBase):
Expand Down
2 changes: 1 addition & 1 deletion backend/transcribee_backend/routers/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def get_document_tasks(


@document_router.websocket("/sync/{document_id}/")
async def websocket_endpoint(
async def sync_websocket_endpoint(
websocket: WebSocket,
document: Document = Depends(ws_get_document_from_url),
session: Session = Depends(get_session),
Expand Down
37 changes: 35 additions & 2 deletions frontend/src/editor/automerge_websocket_editor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ enum MessageSyncType {
FullDoc = 3,
}

/** Delay in milliseconds after which the presence message is sent again. */
const PRESENCE_INTERVAL = 27 * 1000;
/**
 * Presence timeout in milliseconds.
 * NOTE(review): not referenced in the visible code; presumably the age after
 * which a peer's presence is considered stale. Confirm against its users.
 */
const PRESENCE_TIMEOUT = 60 * 1000;

/** Payload that is JSON-encoded and sent over the websocket as presence. */
interface PresenceMessage {
  /** Automerge actor id of the sending editor. */
  actorID: string;
  /** The sending editor's current selection. */
  selection: BaseSelection;
}

export function useAutomergeWebsocketEditor(
url: string | URL,
{ onInitialSyncComplete }: { onInitialSyncComplete: () => void },
Expand All @@ -26,19 +34,44 @@ export function useAutomergeWebsocketEditor(

const wsRef = useRef<ReconnectingWebSocket | null>(null);
useEffect(() => {
const actorID = Automerge.getActorId(editor.doc);
let presenceTimer: number | null = null;

/**
 * Sends this editor's actor id and current selection as a JSON text message,
 * then re-arms the timer so the message is sent again after PRESENCE_INTERVAL.
 */
function sendPresence() {
  const message: PresenceMessage = { actorID, selection: editor.selection };
  console.log("sending", message);
  ws.send(JSON.stringify(message));

  // Cancel any pending resend before scheduling the next one, so only a
  // single timer is ever active.
  if (presenceTimer) {
    window.clearTimeout(presenceTimer);
  }
  presenceTimer = window.setTimeout(sendPresence, PRESENCE_INTERVAL);
}

const ws = new ReconnectingWebSocket(url.toString(), [], { debug });
ws.binaryType = "arraybuffer";
const start = Date.now();
let initialMessages: Uint8Array[] = [];
let initialSync = true;

sendPresence();

const onChange = editor.onChange;
editor.onChange = (...args) => {
sendPresence() // TODO(robin): debounce
onChange && onChange(...args);
};

const onMessage = async (event: MessageEvent) => {
const msg_data = new Uint8Array(await event.data.arrayBuffer());
console.log(event);
const msg_data = new Uint8Array(await event.data);
const msg_type = msg_data[0];
const msg = msg_data.slice(1);
if (msg_type === MessageSyncType.Change) {
// skip own changes
// TODO: filter own changes in backend?
if (Automerge.decodeChange(msg).actor == Automerge.getActorId(editor.doc)) return;
if (Automerge.decodeChange(msg).actor == actorID) return;

if (initialSync) {
initialMessages.push(msg);
Expand Down
3 changes: 2 additions & 1 deletion frontend/src/editor/transcription_editor.tsx
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Editor } from 'slate';
import { Editor, NodeEntry } from 'slate';
import { Slate, Editable, RenderElementProps, RenderLeafProps } from 'slate-react';
import { SpeakerDropdown } from './speaker_dropdown';
import { useEvent } from '../utils/use_event';
import { TextClickEvent } from './types';
import { startTimeToClassName } from './player';
import clsx from 'clsx';
import { useCallback, useEffect } from 'react';

export function formattedTime(sec: number | undefined): string {
if (sec === undefined) {
Expand Down