From 77e20e638a2aef712eed635d501b6d1eb2534469 Mon Sep 17 00:00:00 2001 From: Tim Bradgate Date: Wed, 11 Feb 2026 15:06:48 +0000 Subject: [PATCH 01/14] Add ScriptDraft model and dependencies for collaborative editing Adds the backend foundation for collaborative script editing (Phase 1 Batch 1): - pycrdt dependency for CRDT/Yjs-compatible document sync - ScriptDraft model following CompiledScript file-pointer pattern - draft_script_path setting for draft file storage - ERROR_SCRIPT_DRAFT_ACTIVE constant for 409 conflict responses Co-Authored-By: Claude Opus 4.6 --- server/controllers/api/constants.py | 3 ++ server/digi_server/settings.py | 8 +++++ server/models/script_draft.py | 47 +++++++++++++++++++++++++++++ server/requirements.txt | 3 +- 4 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 server/models/script_draft.py diff --git a/server/controllers/api/constants.py b/server/controllers/api/constants.py index b487758e..97446687 100644 --- a/server/controllers/api/constants.py +++ b/server/controllers/api/constants.py @@ -111,3 +111,6 @@ ERROR_NAME_ALREADY_TAKEN = "Name already taken" ERROR_TAG_NAME_EXISTS = "Tag name already exists (case-insensitive)" +ERROR_SCRIPT_DRAFT_ACTIVE = ( + "409 cannot modify script while collaborative edit in progress" +) diff --git a/server/digi_server/settings.py b/server/digi_server/settings.py index aface3a7..b5a9befa 100644 --- a/server/digi_server/settings.py +++ b/server/digi_server/settings.py @@ -225,6 +225,14 @@ def __init__(self, application: DigiScriptServer, settings_path=None): display_name="Compiled Script Path", help_text="Directory used to store compiled scripts.", ) + self.define( + "draft_script_path", + str, + os.path.join(self._base_path, "draft_scripts"), + False, + display_name="Draft Script Path", + help_text="Directory used to store collaborative editing draft files.", + ) self.define( "mdns_advertising", bool, diff --git a/server/models/script_draft.py b/server/models/script_draft.py new file mode 100644 index 00000000..6a73c99c --- /dev/null +++ b/server/models/script_draft.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import datetime +from functools import partial +from typing import TYPE_CHECKING + +from sqlalchemy import DateTime, ForeignKey, String +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from models.models import db + + +if TYPE_CHECKING: + from models.script import ScriptRevision + from models.user import User + + +class ScriptDraft(db.Model): + """Tracks an active collaborative editing draft for a script revision. + + :param id: Primary key. + :param revision_id: Unique FK to the script revision being edited. + :param data_path: Filesystem path to the serialized Y.Doc (.yjs file). + :param created_at: When the draft was first created. + :param last_modified: When the draft was last checkpointed to disk. + :param last_editor_id: FK to the user who last made an edit. + """ + + __tablename__ = "script_drafts" + + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + revision_id: Mapped[int] = mapped_column( + ForeignKey("script_revisions.id", ondelete="CASCADE"), unique=True + ) + data_path: Mapped[str | None] = mapped_column(String) + created_at: Mapped[datetime.datetime | None] = mapped_column( + DateTime, default=partial(datetime.datetime.now, tz=datetime.timezone.utc) + ) + last_modified: Mapped[datetime.datetime | None] = mapped_column( + DateTime, + default=partial(datetime.datetime.now, tz=datetime.timezone.utc), + onupdate=partial(datetime.datetime.now, tz=datetime.timezone.utc), + ) + last_editor_id: Mapped[int | None] = mapped_column(ForeignKey("users.id")) + + script_revision: Mapped[ScriptRevision] = relationship(foreign_keys=[revision_id]) + last_editor: Mapped[User] = relationship(foreign_keys=[last_editor_id]) diff --git a/server/requirements.txt b/server/requirements.txt index cb7fec8a..3ba8c350 100644 --- a/server/requirements.txt +++ b/server/requirements.txt @@ -11,4 +11,5 @@ marshmallow<5 pyjwt[crypto]==2.11.0 setuptools==80.10.2 xkcdpass==1.30.0 -zeroconf==0.148.0 \ No newline at end of file +zeroconf==0.148.0 +pycrdt>=0.12.0,<1.0.0 \ No newline at end of file From ce0698833b474d5370f31c76a001368671519477 Mon Sep 17 00:00:00 2001 From: Tim Bradgate Date: Wed, 11 Feb 2026 15:08:53 +0000 Subject: [PATCH 02/14] Add Alembic migration for script_drafts table Chains from fbb1b6bd8707 (CrewAssignment). Creates script_drafts table with unique constraint on revision_id and CASCADE delete from script_revisions. Also fixes FK reference in ScriptDraft model (user table, not users). Co-Authored-By: Claude Opus 4.6 --- .../888565843a87_add_script_draft_table.py | 52 +++++++++++++++++++ server/models/script_draft.py | 2 +- 2 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 server/alembic_config/versions/888565843a87_add_script_draft_table.py diff --git a/server/alembic_config/versions/888565843a87_add_script_draft_table.py b/server/alembic_config/versions/888565843a87_add_script_draft_table.py new file mode 100644 index 00000000..da68c229 --- /dev/null +++ b/server/alembic_config/versions/888565843a87_add_script_draft_table.py @@ -0,0 +1,52 @@ +"""add script draft table + +Revision ID: 888565843a87 +Revises: fbb1b6bd8707 +Create Date: 2026-02-11 15:08:34.017143 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + + +# revision identifiers, used by Alembic. +revision: str = "888565843a87" +down_revision: Union[str, None] = "fbb1b6bd8707" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "script_drafts", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("revision_id", sa.Integer(), nullable=False), + sa.Column("data_path", sa.String(), nullable=True), + sa.Column("created_at", sa.DateTime(), nullable=True), + sa.Column("last_modified", sa.DateTime(), nullable=True), + sa.Column("last_editor_id", sa.Integer(), nullable=True), + sa.ForeignKeyConstraint( + ["last_editor_id"], + ["user.id"], + name=op.f("fk_script_drafts_last_editor_id_user"), + ), + sa.ForeignKeyConstraint( + ["revision_id"], + ["script_revisions.id"], + name=op.f("fk_script_drafts_revision_id_script_revisions"), + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk_script_drafts")), + sa.UniqueConstraint("revision_id", name=op.f("uq_script_drafts_revision_id")), + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table("script_drafts") + # ### end Alembic commands ### diff --git a/server/models/script_draft.py b/server/models/script_draft.py index 6a73c99c..4206eb09 100644 --- a/server/models/script_draft.py +++ b/server/models/script_draft.py @@ -41,7 +41,7 @@ class ScriptDraft(db.Model): default=partial(datetime.datetime.now, tz=datetime.timezone.utc), onupdate=partial(datetime.datetime.now, tz=datetime.timezone.utc), ) - last_editor_id: Mapped[int | None] = mapped_column(ForeignKey("users.id")) + last_editor_id: Mapped[int | None] = mapped_column(ForeignKey("user.id")) script_revision: Mapped[ScriptRevision] = relationship(foreign_keys=[revision_id]) last_editor: Mapped[User] = relationship(foreign_keys=[last_editor_id]) From 67302a76aa6e89b54271b5a8684bfe1da79e5252 Mon Sep 17 00:00:00 2001 From: Tim Bradgate Date: Wed, 11 Feb 2026 15:12:51 +0000 Subject: [PATCH 03/14] Add ScriptLine-to-YDoc conversion and room manager Phase 1 Batch 3 of collaborative editing: - line_to_ydoc.py: Two-phase conversion (main thread DB query + background thread Y.Doc construction) with selectinload for N+1 avoidance - script_room_manager.py: ScriptRoom (Y.Doc + client tracking + save lock) and RoomManager (lazy room creation, periodic checkpointing with atomic writes, idle eviction, stale draft cleanup) Co-Authored-By: Claude Opus 4.6 --- server/utils/script/__init__.py | 0 server/utils/script/line_to_ydoc.py | 182 +++++++++++ server/utils/script_room_manager.py | 471 ++++++++++++++++++++++++++++ 3 files changed, 653 insertions(+) create mode 100644 server/utils/script/__init__.py create mode 100644 server/utils/script/line_to_ydoc.py create mode 100644 server/utils/script_room_manager.py diff --git a/server/utils/script/__init__.py b/server/utils/script/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/server/utils/script/line_to_ydoc.py b/server/utils/script/line_to_ydoc.py new file mode 100644 index 00000000..f9e6fcff --- /dev/null +++ b/server/utils/script/line_to_ydoc.py @@ -0,0 +1,182 @@ +"""Converts ScriptLine models to a pycrdt Y.Doc for collaborative editing. + +Two-phase approach for thread safety: +- Phase A (main thread): DB query + extract to plain Python dicts +- Phase B (background thread): CPU-bound Y.Doc construction from plain data + +SQLAlchemy Sessions must not cross thread boundaries, so Phase A extracts +all necessary data before Phase B runs in an executor. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pycrdt +from sqlalchemy import select +from sqlalchemy.orm import selectinload + +from models.script import ScriptLine, ScriptLineRevisionAssociation + + +if TYPE_CHECKING: + from sqlalchemy.orm import Session + + +def fetch_script_line_data(session: Session, revision_id: int) -> list[dict]: + """Phase A: Query DB and extract script line data as plain dicts. + + Must run on the main thread where the SQLAlchemy session is valid. + Uses selectinload to eagerly load all associations in a single query + batch, avoiding N+1 queries. + + :param session: Active SQLAlchemy session. + :param revision_id: The script revision to extract. + :returns: List of dicts with line and part data, safe to pass across threads. + """ + assocs = session.scalars( + select(ScriptLineRevisionAssociation) + .where(ScriptLineRevisionAssociation.revision_id == revision_id) + .options( + selectinload(ScriptLineRevisionAssociation.line).selectinload( + ScriptLine.line_parts + ) + ) + ).all() + + return [ + { + "line_id": a.line_id, + "next_line_id": a.next_line_id, + "previous_line_id": a.previous_line_id, + "line": { + "id": a.line.id, + "act_id": a.line.act_id, + "scene_id": a.line.scene_id, + "page": a.line.page, + "line_type": a.line.line_type.value if a.line.line_type else None, + "stage_direction_style_id": a.line.stage_direction_style_id, + "line_parts": sorted( + [ + { + "id": p.id, + "part_index": p.part_index, + "character_id": p.character_id, + "character_group_id": p.character_group_id, + "line_text": p.line_text or "", + } + for p in a.line.line_parts + ], + key=lambda p: p["part_index"] or 0, + ), + }, + } + for a in assocs + ] + + +def build_ydoc(script_data: list[dict], revision_id: int) -> pycrdt.Doc: + """Phase B: Build a Y.Doc from plain script line data. + + CPU-bound — safe to run in a background thread via run_in_executor. + No SQLAlchemy Session or ORM objects are used. + + :param script_data: List of dicts from fetch_script_line_data. + :param revision_id: The revision ID for metadata. + :returns: A pycrdt.Doc representing the full script. + """ + doc = pycrdt.Doc() + + # Initialize top-level shared types + meta = doc.get("meta", type=pycrdt.Map) + pages = doc.get("pages", type=pycrdt.Map) + doc.get("deleted_line_ids", type=pycrdt.Array) + + meta["revision_id"] = revision_id + meta["last_saved_at"] = "" + + # Handle empty script + if not script_data: + return doc + + # Build in-memory index for O(1) linked list traversal + data_by_line_id = {d["line_id"]: d for d in script_data} + + # Find head of linked list (the line with no previous_line_id) + head = None + for d in script_data: + if d["previous_line_id"] is None: + head = d + break + + if head is None: + return doc + + # Walk linked list, grouping lines by page + current = head + current_page = None + current_page_array = None + + while current is not None: + line_data = current["line"] + page = line_data["page"] + + # Create new page array if page changed + page_key = str(page) if page is not None else "0" + if page_key != current_page: + current_page = page_key + current_page_array = pycrdt.Array() + pages[current_page] = current_page_array + + # Create line Y.Map + line_map = pycrdt.Map() + current_page_array.append(line_map) + + line_map["_id"] = str(line_data["id"]) + line_map["act_id"] = ( + line_data["act_id"] if line_data["act_id"] is not None else 0 + ) + line_map["scene_id"] = ( + line_data["scene_id"] if line_data["scene_id"] is not None else 0 + ) + line_map["line_type"] = ( + line_data["line_type"] if line_data["line_type"] is not None else 0 + ) + line_map["stage_direction_style_id"] = ( + line_data["stage_direction_style_id"] + if line_data["stage_direction_style_id"] is not None + else 0 + ) + + # Create line_parts Y.Array + parts_array = pycrdt.Array() + line_map["line_parts"] = parts_array + + for part_data in line_data["line_parts"]: + part_map = pycrdt.Map() + parts_array.append(part_map) + + part_map["_id"] = str(part_data["id"]) + part_map["part_index"] = ( + part_data["part_index"] if part_data["part_index"] is not None else 0 + ) + part_map["character_id"] = ( + part_data["character_id"] + if part_data["character_id"] is not None + else 0 + ) + part_map["character_group_id"] = ( + part_data["character_group_id"] + if part_data["character_group_id"] is not None + else 0 + ) + + # Y.Text for concurrent text editing + text = pycrdt.Text(part_data["line_text"]) + part_map["line_text"] = text + + # Advance to next line in linked list + next_id = current["next_line_id"] + current = data_by_line_id.get(next_id) if next_id is not None else None + + return doc diff --git a/server/utils/script_room_manager.py b/server/utils/script_room_manager.py new file mode 100644 index 00000000..f50524c5 --- /dev/null +++ b/server/utils/script_room_manager.py @@ -0,0 +1,471 @@ +"""Manages collaborative editing rooms for script revisions. + +Each ScriptRoom holds a pycrdt Y.Doc and tracks connected WebSocket clients. +The RoomManager handles room lifecycle: creation, persistence, and eviction. +""" + +from __future__ import annotations + +import asyncio +import base64 +import datetime +import glob +import os +import tempfile +import time +from typing import TYPE_CHECKING + +import pycrdt +from sqlalchemy import select +from tornado.ioloop import IOLoop + +from digi_server.logger import get_logger +from models.script_draft import ScriptDraft +from utils.script.line_to_ydoc import build_ydoc, fetch_script_line_data + + +if TYPE_CHECKING: + from tornado.websocket import WebSocketHandler + + from digi_server.app_server import DigiScriptServer + +# How long a room stays alive with no clients before eviction (seconds) +ROOM_IDLE_TIMEOUT = 300 # 5 minutes + +# How often to checkpoint the Y.Doc to disk (seconds) +CHECKPOINT_INTERVAL = 30 + + +class ScriptRoom: + """A collaborative editing room for a single script revision. + + Holds the in-memory Y.Doc and tracks connected WebSocket clients. + Each client is stored as a tuple of (ws_handler, role) where role + is 'editor' or 'viewer'. + """ + + def __init__(self, revision_id: int, doc: pycrdt.Doc): + self.revision_id = revision_id + self.doc = doc + self.clients: dict[WebSocketHandler, str] = {} # ws -> role + self.save_lock = asyncio.Lock() + self.last_activity = time.monotonic() + self._last_checkpoint = time.monotonic() + self._dirty = False + self._doc_subscription = None + + def start_observing(self): + """Start observing doc changes to track dirty state.""" + self._doc_subscription = self.doc.observe(self._on_doc_update) + + def stop_observing(self): + """Stop observing doc changes.""" + if self._doc_subscription is not None: + del self._doc_subscription + self._doc_subscription = None + + def _on_doc_update(self, event): + """Called when the Y.Doc is modified.""" + self._dirty = True + self.last_activity = time.monotonic() + + def add_client(self, ws: WebSocketHandler, role: str = "editor"): + """Add a WebSocket client to this room. + + :param ws: The WebSocket handler. + :param role: 'editor' or 'viewer'. + """ + self.clients[ws] = role + self.last_activity = time.monotonic() + get_logger().info( + f"Client joined room for revision {self.revision_id} " + f"as {role} ({len(self.clients)} total)" + ) + + def remove_client(self, ws: WebSocketHandler): + """Remove a WebSocket client from this room. + + :param ws: The WebSocket handler to remove. + """ + self.clients.pop(ws, None) + get_logger().info( + f"Client left room for revision {self.revision_id} " + f"({len(self.clients)} remaining)" + ) + + async def broadcast_update( + self, update: bytes, sender: WebSocketHandler | None = None + ): + """Broadcast a Y.Doc update to all clients except the sender. + + :param update: The binary update to broadcast. + :param sender: The client that originated the update (excluded from broadcast). + """ + payload = base64.b64encode(update).decode("ascii") + message = { + "OP": "YJS_UPDATE", + "DATA": { + "payload": payload, + "room_id": f"draft_{self.revision_id}", + }, + } + + for ws in list(self.clients.keys()): + if ws is sender: + continue + try: + await ws.write_message(message) + except Exception: + get_logger().debug( + f"Failed to send update to client in room {self.revision_id}" + ) + + async def broadcast_awareness( + self, data: bytes, sender: WebSocketHandler | None = None + ): + """Broadcast awareness state to all clients except the sender. + + :param data: The binary awareness data to broadcast. + :param sender: The client that originated the update (excluded from broadcast). + """ + payload = base64.b64encode(data).decode("ascii") + message = { + "OP": "YJS_AWARENESS", + "DATA": { + "payload": payload, + "room_id": f"draft_{self.revision_id}", + }, + } + + for ws in list(self.clients.keys()): + if ws is sender: + continue + try: + await ws.write_message(message) + except Exception: + get_logger().debug( + f"Failed to send awareness to client in room {self.revision_id}" + ) + + def apply_update(self, update: bytes): + """Apply a binary update to the Y.Doc. + + :param update: The binary update from a client. + """ + self.doc.apply_update(update) + + def get_sync_state(self) -> bytes: + """Get the full document state for initial sync. + + :returns: The complete Y.Doc state as bytes. + """ + return self.doc.get_update() + + def get_state_vector(self) -> bytes: + """Get the state vector for incremental sync. + + :returns: The state vector as bytes. + """ + return self.doc.get_state() + + def get_update_for(self, state_vector: bytes) -> bytes: + """Get a diff update for a client with the given state vector. + + :param state_vector: The client's state vector. + :returns: The diff update as bytes. + """ + return self.doc.get_update(state_vector) + + @property + def is_empty(self) -> bool: + """Whether the room has no connected clients.""" + return len(self.clients) == 0 + + @property + def needs_checkpoint(self) -> bool: + """Whether the room needs a checkpoint to disk.""" + return self._dirty and ( + time.monotonic() - self._last_checkpoint > CHECKPOINT_INTERVAL + ) + + def mark_checkpointed(self): + """Mark the room as having been checkpointed.""" + self._dirty = False + self._last_checkpoint = time.monotonic() + + +class RoomManager: + """Manages ScriptRoom instances and their lifecycle. + + Handles room creation (lazy), persistence (checkpointing to disk), + and eviction (after idle timeout). + """ + + def __init__(self, application: DigiScriptServer): + self._application = application + self._rooms: dict[int, ScriptRoom] = {} # revision_id -> ScriptRoom + self._eviction_handle = None + + def start(self): + """Start the periodic eviction and checkpoint loop.""" + self._schedule_maintenance() + + def stop(self): + """Stop the maintenance loop and clean up all rooms.""" + if self._eviction_handle is not None: + IOLoop.current().remove_timeout(self._eviction_handle) + self._eviction_handle = None + + for room in self._rooms.values(): + room.stop_observing() + self._rooms.clear() + + def _schedule_maintenance(self): + """Schedule the next maintenance cycle.""" + self._eviction_handle = IOLoop.current().call_later( + CHECKPOINT_INTERVAL, self._run_maintenance + ) + + async def _run_maintenance(self): + """Run periodic maintenance: checkpoint dirty rooms, evict idle ones.""" + try: + evict_ids = [] + + for revision_id, room in list(self._rooms.items()): + # Checkpoint dirty rooms + if room.needs_checkpoint: + await self._checkpoint_room(room) + + # Mark idle empty rooms for eviction + if room.is_empty and ( + time.monotonic() - room.last_activity > ROOM_IDLE_TIMEOUT + ): + evict_ids.append(revision_id) + + # Evict idle rooms + for revision_id in evict_ids: + await self._evict_room(revision_id) + except Exception: + get_logger().exception("Error during room maintenance") + finally: + self._schedule_maintenance() + + async def get_or_create_room(self, revision_id: int) -> ScriptRoom: + """Get an existing room or create a new one for the given revision. + + If a draft file exists on disk, the Y.Doc is loaded from it. + Otherwise, the Y.Doc is built from the ScriptLine models. + + :param revision_id: The script revision ID. + :returns: The ScriptRoom for this revision. + """ + if revision_id in self._rooms: + return self._rooms[revision_id] + + doc = await self._load_or_build_doc(revision_id) + room = ScriptRoom(revision_id, doc) + room.start_observing() + self._rooms[revision_id] = room + + get_logger().info(f"Created room for revision {revision_id}") + return room + + def get_room(self, revision_id: int) -> ScriptRoom | None: + """Get an existing room without creating one. + + :param revision_id: The script revision ID. + :returns: The ScriptRoom if it exists, else None. + """ + return self._rooms.get(revision_id) + + def get_room_for_client(self, ws: WebSocketHandler) -> ScriptRoom | None: + """Find the room that a WebSocket client belongs to. + + :param ws: The WebSocket handler. + :returns: The ScriptRoom if found, else None. + """ + for room in self._rooms.values(): + if ws in room.clients: + return room + return None + + async def _load_or_build_doc(self, revision_id: int) -> pycrdt.Doc: + """Load a Y.Doc from draft file or build from ScriptLine models. + + :param revision_id: The script revision ID. + :returns: A pycrdt.Doc instance. + """ + with self._application.get_db().sessionmaker() as session: + draft: ScriptDraft | None = session.scalar( + select(ScriptDraft).where(ScriptDraft.revision_id == revision_id) + ) + + if draft and draft.data_path and os.path.exists(draft.data_path): + # Load from existing draft file + try: + with open(draft.data_path, "rb") as f: + data = f.read() + doc = pycrdt.Doc() + doc.get("meta", type=pycrdt.Map) + doc.get("pages", type=pycrdt.Map) + doc.get("deleted_line_ids", type=pycrdt.Array) + doc.apply_update(data) + get_logger().info( + f"Loaded draft for revision {revision_id} " + f"from {draft.data_path}" + ) + return doc + except Exception: + get_logger().exception( + f"Failed to load draft file {draft.data_path}, " + f"rebuilding from ScriptLine models" + ) + # Delete stale DB record + session.delete(draft) + session.commit() + elif draft and ( + not draft.data_path or not os.path.exists(draft.data_path or "") + ): + # Draft record exists but file is missing — clean up + get_logger().warning( + f"Draft record for revision {revision_id} has missing file, " + f"removing stale record" + ) + session.delete(draft) + session.commit() + + # Build from ScriptLine models + script_data = fetch_script_line_data(session, revision_id) + + # Phase B: CPU-bound Y.Doc construction in background thread + doc = await IOLoop.current().run_in_executor( + None, build_ydoc, script_data, revision_id + ) + get_logger().info( + f"Built Y.Doc for revision {revision_id} " + f"from {len(script_data)} line associations" + ) + return doc + + async def _checkpoint_room(self, room: ScriptRoom): + """Checkpoint a room's Y.Doc to disk using atomic write. + + Uses the pattern: write to tmp → fsync → rename for crash safety. + + :param room: The ScriptRoom to checkpoint. + """ + draft_path = self._get_draft_path(room.revision_id) + draft_dir = os.path.dirname(draft_path) + + try: + # Get the full doc state + state = room.get_sync_state() + + # Atomic write: tmp → fsync → rename + fd, tmp_path = tempfile.mkstemp(dir=draft_dir, suffix=".yjs.tmp") + try: + os.write(fd, state) + os.fsync(fd) + finally: + os.close(fd) + os.rename(tmp_path, draft_path) + + # Update DB record + with self._application.get_db().sessionmaker() as session: + draft: ScriptDraft | None = session.scalar( + select(ScriptDraft).where( + ScriptDraft.revision_id == room.revision_id + ) + ) + now = datetime.datetime.now(tz=datetime.timezone.utc) + if draft: + draft.data_path = draft_path + draft.last_modified = now + else: + session.add( + ScriptDraft( + revision_id=room.revision_id, + data_path=draft_path, + created_at=now, + last_modified=now, + ) + ) + session.commit() + + room.mark_checkpointed() + get_logger().debug(f"Checkpointed room {room.revision_id} to {draft_path}") + except Exception: + get_logger().exception(f"Failed to checkpoint room {room.revision_id}") + + async def _evict_room(self, revision_id: int): + """Evict an idle room, checkpointing first if dirty. + + :param revision_id: The revision ID of the room to evict. + """ + room = self._rooms.get(revision_id) + if room is None: + return + + # Checkpoint before evicting if there are unsaved changes + if room._dirty: + await self._checkpoint_room(room) + + room.stop_observing() + del self._rooms[revision_id] + get_logger().info(f"Evicted idle room for revision {revision_id}") + + def _get_draft_path(self, revision_id: int) -> str: + """Get the filesystem path for a draft file. + + :param revision_id: The script revision ID. + :returns: The full path to the draft .yjs file. + """ + draft_dir = self._application.digi_settings.settings.get( + "draft_script_path" + ).get_value() + return os.path.join(draft_dir, f"draft_{revision_id}.yjs") + + async def cleanup_stale_drafts(self): + """Clean up stale draft files on startup. + + Removes DB records where the file is missing, and removes files + that have no corresponding DB record. + """ + draft_dir = self._application.digi_settings.settings.get( + "draft_script_path" + ).get_value() + + with self._application.get_db().sessionmaker() as session: + # Remove DB records with missing files + removed_drafts = [] + drafts: list[ScriptDraft] = session.scalars(select(ScriptDraft)).all() + for draft in drafts: + if not draft.data_path or not os.path.exists(draft.data_path): + get_logger().info( + f"Removing draft record for revision {draft.revision_id} " + f"— data file not found at {draft.data_path}" + ) + session.delete(draft) + removed_drafts.append(draft) + if removed_drafts: + session.commit() + get_logger().info(f"Removed {len(removed_drafts)} stale draft records") + + # Remove unreferenced draft files + if os.path.isdir(draft_dir): + for yjs_file in glob.glob(os.path.join(draft_dir, "*.yjs")): + found = any( + d.data_path == yjs_file + for d in drafts + if d not in removed_drafts + ) + if not found: + get_logger().info( + f"Removing unreferenced draft file: {yjs_file}" + ) + try: + os.remove(yjs_file) + except Exception: + get_logger().exception( + f"Failed to remove draft file: {yjs_file}" + ) From 029ab2adde4d1dedc4716381f40a3a9a629462e8 Mon Sep 17 00:00:00 2001 From: Tim Bradgate Date: Wed, 11 Feb 2026 15:15:33 +0000 Subject: [PATCH 04/14] Add WebSocket handlers for Yjs sync and integrate RoomManager Phase 1 Batch 4 of collaborative editing: - WebSocket: JOIN_SCRIPT_ROOM, LEAVE_SCRIPT_ROOM, YJS_SYNC (2-step), YJS_UPDATE, YJS_AWARENESS handlers with base64 binary transport - on_close: Auto-remove client from collaborative editing rooms - App server: Initialize RoomManager on startup, create draft_script_path directory, clean up stale draft records and unreferenced files Co-Authored-By: Claude Opus 4.6 --- server/controllers/ws_controller.py | 130 ++++++++++++++++++++++++++++ server/digi_server/app_server.py | 46 ++++++++++ 2 files changed, 176 insertions(+) diff --git a/server/controllers/ws_controller.py b/server/controllers/ws_controller.py index d202f753..3b7b48cc 100644 --- a/server/controllers/ws_controller.py +++ b/server/controllers/ws_controller.py @@ -1,5 +1,6 @@ from __future__ import annotations +import base64 import datetime import json from typing import TYPE_CHECKING, Any, Awaitable, Dict, Optional, Union @@ -82,6 +83,12 @@ def on_close(self) -> None: if self in self.application.clients: self.application.clients.remove(self) + # Remove from any collaborative editing room + if hasattr(self.application, "room_manager") and self.application.room_manager: + room = self.application.room_manager.get_room_for_client(self) + if room: + room.remove_client(self) + notify_editor_change = False elect_live_leader = False @@ -229,6 +236,17 @@ async def on_message(self, message: Union[str, bytes]): ) return + # Handle collaborative editing operations (Yjs sync protocol) + if ws_op in ( + "JOIN_SCRIPT_ROOM", + "LEAVE_SCRIPT_ROOM", + "YJS_SYNC", + "YJS_UPDATE", + "YJS_AWARENESS", + ): + await self._handle_collab_op(ws_op, message) + return + with self.make_session() as session: entry: Session = session.get(Session, self.__getattribute__("internal_id")) current_show = await self.application.digi_settings.get("current_show") @@ -391,6 +409,118 @@ async def on_message(self, message: Union[str, bytes]): f"WebSocket connection {self.request.remote_ip}" ) + async def _handle_collab_op(self, ws_op: str, message: dict): + """Handle collaborative editing WebSocket operations. + + :param ws_op: The operation code. + :param message: The full parsed message dict. + """ + room_manager = getattr(self.application, "room_manager", None) + if not room_manager: + get_logger().warning("RoomManager not available, ignoring collab OP") + return + + data = message.get("DATA", {}) + + if ws_op == "JOIN_SCRIPT_ROOM": + revision_id = data.get("revision_id") + if not revision_id: + await self.write_message( + {"OP": "COLLAB_ERROR", "DATA": {"error": "revision_id required"}} + ) + return + + role = data.get("role", "editor") + room = await room_manager.get_or_create_room(revision_id) + room.add_client(self, role) + + # Send initial sync: full document state + sync_state = room.get_sync_state() + await self.write_message( + { + "OP": "YJS_SYNC", + "DATA": { + "step": 0, + "payload": base64.b64encode(sync_state).decode("ascii"), + "room_id": f"draft_{revision_id}", + }, + } + ) + + # Notify all clients about the new collaborator + await self.application.ws_send_to_all( + "NOOP", "GET_SCRIPT_CONFIG_STATUS", {} + ) + + elif ws_op == "LEAVE_SCRIPT_ROOM": + room = room_manager.get_room_for_client(self) + if room: + room.remove_client(self) + await self.application.ws_send_to_all( + "NOOP", "GET_SCRIPT_CONFIG_STATUS", {} + ) + + elif ws_op == "YJS_SYNC": + room = room_manager.get_room_for_client(self) + if not room: + return + + payload = data.get("payload", "") + step = data.get("step", 1) + + try: + decoded = base64.b64decode(payload) + except Exception: + get_logger().warning("Invalid base64 in YJS_SYNC message") + return + + if step == 1: + # Client sends its state vector; server responds with diff + diff = room.get_update_for(decoded) + await self.write_message( + { + "OP": "YJS_SYNC", + "DATA": { + "step": 2, + "payload": base64.b64encode(diff).decode("ascii"), + "room_id": f"draft_{room.revision_id}", + }, + } + ) + elif step == 2: + # Client sends its diff; server applies it + room.apply_update(decoded) + await room.broadcast_update(decoded, sender=self) + + elif ws_op == "YJS_UPDATE": + room = room_manager.get_room_for_client(self) + if not room: + return + + payload = data.get("payload", "") + try: + decoded = base64.b64decode(payload) + except Exception: + get_logger().warning("Invalid base64 in YJS_UPDATE message") + return + + room.apply_update(decoded) + await room.broadcast_update(decoded, sender=self) + + elif ws_op == "YJS_AWARENESS": + room = room_manager.get_room_for_client(self) + if not room: + return + + payload = data.get("payload", "") + try: + decoded = base64.b64decode(payload) + except Exception: + get_logger().warning("Invalid base64 in YJS_AWARENESS message") + return + + await room.broadcast_awareness(decoded, sender=self) + def on_pong(self, data: bytes) -> None: self._last_pong = IOLoop.current().time() self.update_session() diff --git a/server/digi_server/app_server.py b/server/digi_server/app_server.py index 8359ac09..d02917c9 100644 --- a/server/digi_server/app_server.py +++ b/server/digi_server/app_server.py @@ -22,6 +22,7 @@ from models import models from models.cue import CueType from models.script import CompiledScript, Script +from models.script_draft import ScriptDraft from models.session import Session, ShowSession from models.settings import SystemSettings from models.show import Show @@ -32,6 +33,7 @@ from utils.exceptions import DatabaseTypeException, DatabaseUpgradeRequired from utils.mdns_service import MDNSAdvertiser from utils.module_discovery import get_resource_path, is_frozen +from utils.script_room_manager import RoomManager from utils.version_checker import VersionChecker from utils.web.jwt_service import JWTService from utils.web.route import Route @@ -60,6 +62,7 @@ def __init__( self._db: DigiSQLAlchemy = models.db self.jwt_service: JWTService = None + self.room_manager: Optional[RoomManager] = None self.mdns_advertiser: Optional[MDNSAdvertiser] = None self.version_checker: Optional[VersionChecker] = None @@ -222,11 +225,54 @@ class AlembicVersion(self._db.Model): f"Failed to remove compiled script file: {ds_file}" ) + # 4.5. Tidy up draft script files (same pattern as compiled scripts) + draft_script_path = self.digi_settings.settings.get( + "draft_script_path" + ).get_value() + os.makedirs(draft_script_path, exist_ok=True) + + removed_drafts = [] + drafts: List[ScriptDraft] = session.scalars(select(ScriptDraft)).all() + for draft in drafts: + if not draft.data_path or not os.path.exists(draft.data_path): + get_logger().info( + f"Removing draft record for revision {draft.revision_id} " + f"as data file not found at {draft.data_path}" + ) + session.delete(draft) + removed_drafts.append(draft) + if removed_drafts: + session.commit() + get_logger().info( + f"Removed {len(removed_drafts)} stale draft records from the database." + ) + + for draft_file in glob.glob(f"{draft_script_path}/*.yjs"): + found = False + for draft in drafts: + if draft in removed_drafts: + continue + if draft.data_path == draft_file: + found = True + break + if not found: + get_logger().info(f"Removing unreferenced draft file: {draft_file}") + try: + os.remove(draft_file) + except Exception: + get_logger().exception( + f"Failed to remove draft file: {draft_file}" + ) + # 5. Clear out all sessions since we are starting the app up get_logger().debug("Emptying out sessions table!") session.execute(delete(Session)) session.commit() + # Initialize the RoomManager for collaborative editing + self.room_manager = RoomManager(self) + self.room_manager.start() + # Get static files path - adjust for PyInstaller if needed if is_frozen(): static_files_path = get_resource_path(os.path.join("static", "assets")) From c211ae708af6c86a1ec8541a607811543b474b6a Mon Sep 17 00:00:00 2001 From: Tim Bradgate Date: Wed, 11 Feb 2026 15:19:51 +0000 Subject: [PATCH 05/14] Add backend tests for collaborative editing (Phase 1) 30 tests covering: - build_ydoc: empty script, single line, linked list traversal, multi-page, unordered input, multiple line parts, null handling - Base64 round-trip: full state and incremental updates - CRDT convergence: concurrent field edits, concurrent text inserts, offline edit and reconnect - ScriptRoom: client add/remove, sync state, apply update, broadcast with sender exclusion, failed write resilience, dirty tracking Co-Authored-By: Claude Opus 4.6 --- server/test/utils/script/__init__.py | 0 server/test/utils/script/test_line_to_ydoc.py | 360 ++++++++++++++++++ server/test/utils/script/test_script_room.py | 189 +++++++++ 3 files changed, 549 insertions(+) create mode 100644 server/test/utils/script/__init__.py create mode 100644 server/test/utils/script/test_line_to_ydoc.py create mode 100644 server/test/utils/script/test_script_room.py diff --git a/server/test/utils/script/__init__.py b/server/test/utils/script/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/server/test/utils/script/test_line_to_ydoc.py b/server/test/utils/script/test_line_to_ydoc.py new file mode 100644 index 00000000..dba0b61c --- /dev/null +++ b/server/test/utils/script/test_line_to_ydoc.py @@ -0,0 +1,360 @@ +"""Tests for ScriptLine → Y.Doc conversion (build_ydoc). + +Tests the Phase B (CPU-bound) conversion that takes plain dict data +and produces a pycrdt Y.Doc. No database needed — uses synthetic data. +""" + +import base64 + +import pycrdt + +from utils.script.line_to_ydoc import build_ydoc + + +def _make_line_data( + line_id, + next_line_id=None, + previous_line_id=None, + page=1, + act_id=1, + scene_id=1, + line_type=1, + stage_direction_style_id=None, + parts=None, +): + """Helper to build a single line data dict matching fetch_script_line_data output.""" + if parts is None: + parts = [ + { + "id": line_id * 10, + "part_index": 0, + "character_id": 1, + "character_group_id": None, + "line_text": f"Line {line_id} text", + } + ] + return { + "line_id": line_id, + "next_line_id": next_line_id, + "previous_line_id": previous_line_id, + "line": { + "id": line_id, + "act_id": act_id, + "scene_id": scene_id, + "page": page, + "line_type": line_type, + "stage_direction_style_id": stage_direction_style_id, + "line_parts": parts, + }, + } + + +class TestBuildYdocEmpty: + def test_empty_script_returns_doc_with_empty_pages(self): + doc = build_ydoc([], revision_id=42) + + meta = doc.get("meta", type=pycrdt.Map) + pages = doc.get("pages", type=pycrdt.Map) + + assert meta["revision_id"] == 42 + assert len(pages) == 0 + + def test_empty_script_has_deleted_line_ids_array(self): + doc = build_ydoc([], revision_id=1) + deleted = doc.get("deleted_line_ids", type=pycrdt.Array) + assert len(deleted) == 0 + + +class TestBuildYdocSingleLine: + def test_single_line_creates_one_page(self): + data = [_make_line_data(line_id=1, page=1)] + doc = build_ydoc(data, revision_id=10) + + pages = doc.get("pages", type=pycrdt.Map) + assert "1" in pages + page_lines = pages["1"] + assert len(page_lines) == 1 + + def test_single_line_has_correct_fields(self): + data = [ + _make_line_data( + line_id=5, + page=2, + act_id=3, + scene_id=4, + line_type=2, + stage_direction_style_id=7, + ) + ] + doc = build_ydoc(data, revision_id=1) + + pages = doc.get("pages", type=pycrdt.Map) + line = pages["2"][0] + + assert line["_id"] == "5" + assert line["act_id"] == 3 + assert line["scene_id"] == 4 + assert line["line_type"] == 2 + assert line["stage_direction_style_id"] == 7 + + +class TestBuildYdocLinkedList: + def test_three_lines_on_same_page_in_order(self): + """Lines are linked: 1 → 2 → 3, all on page 1.""" + data = [ + _make_line_data(line_id=1, next_line_id=2, previous_line_id=None, page=1), + _make_line_data(line_id=2, next_line_id=3, previous_line_id=1, page=1), + _make_line_data(line_id=3, next_line_id=None, previous_line_id=2, page=1), + ] + doc = build_ydoc(data, revision_id=1) + + pages = doc.get("pages", type=pycrdt.Map) + page_lines = pages["1"] + assert len(page_lines) == 3 + assert page_lines[0]["_id"] == "1" + assert page_lines[1]["_id"] == "2" + assert page_lines[2]["_id"] == "3" + + def test_lines_across_two_pages(self): + """Lines span two pages: 1,2 on page 1; 3 on page 2.""" + data = [ + _make_line_data(line_id=1, next_line_id=2, previous_line_id=None, page=1), + _make_line_data(line_id=2, next_line_id=3, previous_line_id=1, page=1), + _make_line_data(line_id=3, next_line_id=None, previous_line_id=2, page=2), + ] + doc = build_ydoc(data, revision_id=1) + + pages = doc.get("pages", type=pycrdt.Map) + assert len(pages["1"]) == 2 + assert len(pages["2"]) == 1 + assert pages["2"][0]["_id"] == "3" + + def test_unordered_input_still_traverses_correctly(self): + """Input order doesn't matter — linked list is traversed by next_line_id.""" + data = [ + _make_line_data(line_id=3, next_line_id=None, previous_line_id=2, page=1), + _make_line_data(line_id=1, next_line_id=2, previous_line_id=None, page=1), + _make_line_data(line_id=2, next_line_id=3, previous_line_id=1, page=1), + ] + doc = build_ydoc(data, revision_id=1) + + pages = doc.get("pages", type=pycrdt.Map) + ids = [pages["1"][i]["_id"] for i in range(3)] + assert ids == ["1", "2", "3"] + + +class TestBuildYdocLineParts: + def test_line_with_multiple_parts(self): + parts = [ + { + "id": 10, + "part_index": 0, + "character_id": 1, + "character_group_id": None, + "line_text": "Hello", + }, + { + "id": 11, + "part_index": 1, + "character_id": 2, + "character_group_id": None, + "line_text": " World", + }, + ] + data = [_make_line_data(line_id=1, page=1, parts=parts)] + doc = build_ydoc(data, revision_id=1) + + pages = doc.get("pages", type=pycrdt.Map) + line_parts = pages["1"][0]["line_parts"] + assert len(line_parts) == 2 + assert str(line_parts[0]["line_text"]) == "Hello" + assert str(line_parts[1]["line_text"]) == " World" + assert line_parts[0]["character_id"] == 1 + assert line_parts[1]["character_id"] == 2 + + def test_line_text_is_ytext_type(self): + data = [_make_line_data(line_id=1, page=1)] + doc = build_ydoc(data, revision_id=1) + + pages = doc.get("pages", type=pycrdt.Map) + line_text = pages["1"][0]["line_parts"][0]["line_text"] + assert isinstance(line_text, pycrdt.Text) + + +class TestBuildYdocNullHandling: + def test_null_page_becomes_page_zero(self): + data = [_make_line_data(line_id=1, page=None)] + doc = build_ydoc(data, revision_id=1) + + pages = doc.get("pages", type=pycrdt.Map) + assert "0" in pages + assert len(pages["0"]) == 1 + + def test_null_act_id_becomes_zero(self): + data = [_make_line_data(line_id=1, act_id=None)] + doc = build_ydoc(data, revision_id=1) + + pages = doc.get("pages", type=pycrdt.Map) + assert pages["1"][0]["act_id"] == 0 + + +class TestBase64RoundTrip: + """Test that Y.Doc updates survive base64 encoding/decoding.""" + + def test_full_state_base64_round_trip(self): + data = [ + _make_line_data(line_id=1, next_line_id=2, previous_line_id=None, page=1), + _make_line_data(line_id=2, next_line_id=None, previous_line_id=1, page=1), + ] + doc = build_ydoc(data, revision_id=99) + + # Encode to base64 (simulating WebSocket transport) + state = doc.get_update() + encoded = base64.b64encode(state).decode("ascii") + + # Decode and apply to fresh doc + decoded = base64.b64decode(encoded) + doc2 = pycrdt.Doc() + doc2.get("meta", type=pycrdt.Map) + doc2.get("pages", type=pycrdt.Map) + doc2.get("deleted_line_ids", type=pycrdt.Array) + doc2.apply_update(decoded) + + pages = doc2.get("pages", type=pycrdt.Map) + assert pages["1"][0]["_id"] == "1" + assert pages["1"][1]["_id"] == "2" + assert doc2.get("meta", type=pycrdt.Map)["revision_id"] == 99 + + def test_incremental_update_base64_round_trip(self): + doc = build_ydoc([], revision_id=1) + meta = doc.get("meta", type=pycrdt.Map) + + # Capture an incremental update + updates = [] + sub = doc.observe(lambda e: updates.append(e.update)) + meta["last_saved_at"] = "2026-01-01T00:00:00Z" + del sub + + assert len(updates) == 1 + encoded = base64.b64encode(updates[0]).decode("ascii") + decoded = base64.b64decode(encoded) + + # Apply to a fresh synced doc + doc2 = pycrdt.Doc() + doc2.get("meta", type=pycrdt.Map) + doc2.apply_update(doc.get_update()) + doc2.apply_update(decoded) + + meta2 = doc2.get("meta", type=pycrdt.Map) + assert meta2["last_saved_at"] == "2026-01-01T00:00:00Z" + + +class TestCrdtConvergence: + """Test that concurrent edits on separate Y.Doc instances converge.""" + + def test_concurrent_text_edits_converge(self): + """Two docs edit different fields on the same line → both changes present.""" + data = [_make_line_data(line_id=1, page=1)] + doc1 = build_ydoc(data, revision_id=1) + + # Sync doc1 → doc2 + doc2 = pycrdt.Doc() + doc2.get("pages", type=pycrdt.Map) + doc2.get("meta", type=pycrdt.Map) + doc2.get("deleted_line_ids", type=pycrdt.Array) + doc2.apply_update(doc1.get_update()) + + # Doc1 changes act_id + doc1.get("pages", type=pycrdt.Map)["1"][0]["act_id"] = 99 + + # Doc2 changes scene_id + doc2.get("pages", type=pycrdt.Map)["1"][0]["scene_id"] = 88 + + # Cross-sync + sv1 = doc1.get_state() + sv2 = doc2.get_state() + doc1.apply_update(doc2.get_update(sv1)) + doc2.apply_update(doc1.get_update(sv2)) + + # Both should have both changes + p1 = doc1.get("pages", type=pycrdt.Map)["1"][0] + p2 = doc2.get("pages", type=pycrdt.Map)["1"][0] + assert p1["act_id"] == 99 + assert p1["scene_id"] == 88 + assert p2["act_id"] == 99 + assert p2["scene_id"] == 88 + + def test_concurrent_text_inserts_converge(self): + """Two docs insert text at the same position → both texts present.""" + data = [ + _make_line_data( + line_id=1, + page=1, + parts=[ + { + "id": 10, + "part_index": 0, + "character_id": 1, + "character_group_id": None, + "line_text": "", + } + ], + ) + ] + doc1 = build_ydoc(data, revision_id=1) + + doc2 = pycrdt.Doc() + doc2.get("pages", type=pycrdt.Map) + doc2.get("meta", type=pycrdt.Map) + doc2.get("deleted_line_ids", type=pycrdt.Array) + doc2.apply_update(doc1.get_update()) + + # Both type at position 0 + text1 = doc1.get("pages", type=pycrdt.Map)["1"][0]["line_parts"][0]["line_text"] + text2 = doc2.get("pages", type=pycrdt.Map)["1"][0]["line_parts"][0]["line_text"] + + text1 += "Hello" + text2 += "World" + + # Cross-sync + sv1 = doc1.get_state() + sv2 = doc2.get_state() + doc1.apply_update(doc2.get_update(sv1)) + doc2.apply_update(doc1.get_update(sv2)) + + # Both should converge to same value (order may vary) + result1 = str( + doc1.get("pages", type=pycrdt.Map)["1"][0]["line_parts"][0]["line_text"] + ) + result2 = str( + doc2.get("pages", type=pycrdt.Map)["1"][0]["line_parts"][0]["line_text"] + ) + assert result1 == result2 + assert "Hello" in result1 + assert "World" in result1 + + def test_offline_edit_and_reconnect(self): + """Doc1 goes offline, both make edits, then sync → convergence.""" + data = [_make_line_data(line_id=1, page=1)] + doc1 = build_ydoc(data, revision_id=1) + + doc2 = pycrdt.Doc() + doc2.get("pages", type=pycrdt.Map) + doc2.get("meta", type=pycrdt.Map) + doc2.get("deleted_line_ids", type=pycrdt.Array) + doc2.apply_update(doc1.get_update()) + + # "Offline" period — both make independent changes + doc1.get("pages", type=pycrdt.Map)["1"][0]["act_id"] = 10 + doc2.get("pages", type=pycrdt.Map)["1"][0]["scene_id"] = 20 + + # "Reconnect" — exchange state + sv1 = doc1.get_state() + sv2 = doc2.get_state() + doc1.apply_update(doc2.get_update(sv1)) + doc2.apply_update(doc1.get_update(sv2)) + + p1 = doc1.get("pages", type=pycrdt.Map)["1"][0] + p2 = doc2.get("pages", type=pycrdt.Map)["1"][0] + assert p1["act_id"] == p2["act_id"] == 10 + assert p1["scene_id"] == p2["scene_id"] == 20 diff --git a/server/test/utils/script/test_script_room.py b/server/test/utils/script/test_script_room.py new file mode 100644 index 00000000..cd77bcbc --- /dev/null +++ b/server/test/utils/script/test_script_room.py @@ -0,0 +1,189 @@ +"""Tests for ScriptRoom class. + +Tests the in-memory room that holds a Y.Doc and tracks connected clients. +Uses mock WebSocket handlers since we don't need a real server. +""" + +from unittest.mock import AsyncMock + +import pycrdt +import pytest + +from utils.script_room_manager import ScriptRoom + + +def _make_mock_ws(): + """Create a mock WebSocket handler.""" + ws = AsyncMock() + ws.write_message = AsyncMock() + return ws + + +def _make_room(revision_id=1): + """Create a ScriptRoom with a simple Y.Doc.""" + doc = pycrdt.Doc() + doc.get("meta", type=pycrdt.Map) + doc.get("pages", type=pycrdt.Map) + doc.get("deleted_line_ids", type=pycrdt.Array) + return ScriptRoom(revision_id, doc) + + +class TestScriptRoomClients: + def test_add_client(self): + room = _make_room() + ws = _make_mock_ws() + + room.add_client(ws, "editor") + + assert ws in room.clients + assert room.clients[ws] == "editor" + assert not room.is_empty + + def test_add_viewer_client(self): + room = _make_room() + ws = _make_mock_ws() + + room.add_client(ws, "viewer") + + assert room.clients[ws] == "viewer" + + def test_remove_client(self): + room = _make_room() + ws = _make_mock_ws() + room.add_client(ws, "editor") + + room.remove_client(ws) + + assert ws not in room.clients + assert room.is_empty + + def test_remove_nonexistent_client_is_noop(self): + room = _make_room() + ws = _make_mock_ws() + + room.remove_client(ws) # Should not raise + + assert room.is_empty + + +class TestScriptRoomSync: + def test_get_sync_state_returns_bytes(self): + room = _make_room() + state = room.get_sync_state() + assert isinstance(state, bytes) + assert len(state) > 0 + + def test_get_state_vector_returns_bytes(self): + room = _make_room() + sv = room.get_state_vector() + assert isinstance(sv, bytes) + + def test_apply_update(self): + room = _make_room() + meta = room.doc.get("meta", type=pycrdt.Map) + meta["revision_id"] = 42 + + # Create a second doc, apply state, modify, get update + doc2 = pycrdt.Doc() + doc2.get("meta", type=pycrdt.Map) + doc2.apply_update(room.get_sync_state()) + doc2.get("meta", type=pycrdt.Map)["revision_id"] = 99 + + # Get the diff and apply + update = doc2.get_update(room.get_state_vector()) + room.apply_update(update) + + assert meta["revision_id"] == 99 + + def test_get_update_for_state_vector(self): + room = _make_room() + room.doc.get("meta", type=pycrdt.Map) + + # Empty state vector should return full state + empty_sv = pycrdt.Doc().get_state() + diff = room.get_update_for(empty_sv) + assert isinstance(diff, bytes) + assert len(diff) > 0 + + +class TestScriptRoomBroadcast: + @pytest.mark.asyncio + async def test_broadcast_update_excludes_sender(self): + room = _make_room() + ws1 = _make_mock_ws() + ws2 = _make_mock_ws() + ws3 = _make_mock_ws() + room.add_client(ws1, "editor") + room.add_client(ws2, "editor") + room.add_client(ws3, "viewer") + + update = b"test_update_data" + await room.broadcast_update(update, sender=ws1) + + ws1.write_message.assert_not_called() + assert ws2.write_message.call_count == 1 + assert ws3.write_message.call_count == 1 + + # Verify message format + sent_msg = ws2.write_message.call_args[0][0] + assert sent_msg["OP"] == "YJS_UPDATE" + assert "payload" in sent_msg["DATA"] + assert "room_id" in sent_msg["DATA"] + + @pytest.mark.asyncio + async def test_broadcast_awareness_excludes_sender(self): + room = _make_room() + ws1 = _make_mock_ws() + ws2 = _make_mock_ws() + room.add_client(ws1, "editor") + room.add_client(ws2, "editor") + + await room.broadcast_awareness(b"awareness_data", sender=ws1) + + ws1.write_message.assert_not_called() + assert ws2.write_message.call_count == 1 + + sent_msg = ws2.write_message.call_args[0][0] + assert sent_msg["OP"] == "YJS_AWARENESS" + + @pytest.mark.asyncio + async def test_broadcast_handles_failed_write(self): + """If write_message fails for one client, others still receive.""" + room = _make_room() + ws1 = _make_mock_ws() + ws2 = _make_mock_ws() + ws3 = _make_mock_ws() + ws2.write_message.side_effect = Exception("Connection closed") + room.add_client(ws1, "editor") + room.add_client(ws2, "editor") + room.add_client(ws3, "editor") + + # Should not raise + await room.broadcast_update(b"data", sender=ws1) + + # ws3 should still receive even though ws2 failed + assert ws3.write_message.call_count == 1 + + +class TestScriptRoomDirtyTracking: + def test_room_starts_clean(self): + room = _make_room() + assert not room._dirty + assert not room.needs_checkpoint + + def test_doc_update_marks_dirty(self): + room = _make_room() + room.start_observing() + + meta = room.doc.get("meta", type=pycrdt.Map) + meta["test"] = "value" + + assert room._dirty + + def test_mark_checkpointed_clears_dirty(self): + room = _make_room() + room._dirty = True + + room.mark_checkpointed() + + assert not room._dirty From 202949b613b9a14ba7812295ae3fe7505287aca1 Mon Sep 17 00:00:00 2001 From: Tim Bradgate Date: Wed, 11 Feb 2026 15:26:16 +0000 Subject: [PATCH 06/14] Add frontend Yjs infrastructure for collaborative editing Phase 2 Steps 2.1-2.4: - yjs and lib0 dependencies - ScriptDocProvider: custom Yjs provider using existing WebSocket connection with base64 binary transport and OP code messages - useYjsBinding: Vue 2.7 reactive bindings for Y.Map, Y.Text, Y.Array - scriptDraft Vuex module: room state, Y.Doc lifecycle, provider management, collaborator tracking - WebSocket message routing: Yjs OPs handled in SOCKET_ONMESSAGE and dispatched to HANDLE_DRAFT_MESSAGE Co-Authored-By: Claude Opus 4.6 --- client/package-lock.json | 52 ++++- client/package.json | 4 +- client/src/main.js | 4 + client/src/store/modules/scriptDraft.js | 207 +++++++++++++++++ client/src/store/modules/websocket.js | 6 + client/src/store/store.js | 2 + client/src/utils/yjs/ScriptDocProvider.js | 267 ++++++++++++++++++++++ client/src/utils/yjs/useYjsBinding.js | 200 ++++++++++++++++ 8 files changed, 740 insertions(+), 2 deletions(-) create mode 100644 client/src/store/modules/scriptDraft.js create mode 100644 client/src/utils/yjs/ScriptDocProvider.js create mode 100644 client/src/utils/yjs/useYjsBinding.js diff --git a/client/package-lock.json b/client/package-lock.json index 9e313305..6c0f6282 100644 --- a/client/package-lock.json +++ b/client/package-lock.json @@ -20,6 +20,7 @@ "dompurify": "3.3.1", "fuse.js": "7.1.0", "jquery": "3.7.1", + "lib0": "^0.2.117", "lodash": "4.17.23", "loglevel": "1.9.2", "marked": "11.2.0", @@ -31,7 +32,8 @@ "vue-toast-notification": "0.6.3", "vuelidate": "0.7.7", "vuex": "3.6.2", - "vuex-persistedstate": "3.2.1" + "vuex-persistedstate": "3.2.1", + "yjs": "^13.6.29" }, "devDependencies": { "@babel/core": "7.29.0", @@ -5245,6 +5247,16 @@ "dev": true, "license": "ISC" }, + "node_modules/isomorphic.js": { + "version": "0.2.5", + "resolved": "https://registry.npmjs.org/isomorphic.js/-/isomorphic.js-0.2.5.tgz", + "integrity": "sha512-PIeMbHqMt4DnUP3MA/Flc0HElYjMXArsw1qwJZcm9sqR8mq3l8NYizFMty0pWwE/tzIGH3EKK5+jes5mAr85yw==", + "license": "MIT", + "funding": { + "type": "GitHub Sponsors ❤", + "url": "https://github.com/sponsors/dmonad" + } + }, "node_modules/jackspeak": { "version": "3.4.3", "resolved": "https://registry.npmjs.org/jackspeak/-/jackspeak-3.4.3.tgz", @@ -5440,6 +5452,27 @@ "node": ">= 0.8.0" } }, + "node_modules/lib0": { + "version": "0.2.117", + "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.117.tgz", + "integrity": "sha512-DeXj9X5xDCjgKLU/7RR+/HQEVzuuEUiwldwOGsHK/sfAfELGWEyTcf0x+uOvCvK3O2zPmZePXWL85vtia6GyZw==", + "license": "MIT", + "dependencies": { + "isomorphic.js": "^0.2.4" + }, + "bin": { + "0ecdsa-generate-keypair": "bin/0ecdsa-generate-keypair.js", + "0gentesthtml": "bin/gentesthtml.js", + "0serve": "bin/0serve.js" + }, + "engines": { + "node": ">=16" + }, + "funding": { + "type": "GitHub Sponsors ❤", + "url": "https://github.com/sponsors/dmonad" + } + }, "node_modules/locate-path": { "version": "6.0.0", "resolved": "https://registry.npmjs.org/locate-path/-/locate-path-6.0.0.tgz", @@ -15037,6 +15070,23 @@ "dev": true, "license": "ISC" }, + "node_modules/yjs": { + "version": "13.6.29", + "resolved": "https://registry.npmjs.org/yjs/-/yjs-13.6.29.tgz", + "integrity": "sha512-kHqDPdltoXH+X4w1lVmMtddE3Oeqq48nM40FD5ojTd8xYhQpzIDcfE2keMSU5bAgRPJBe225WTUdyUgj1DtbiQ==", + "license": "MIT", + "dependencies": { + "lib0": "^0.2.99" + }, + "engines": { + "node": ">=16.0.0", + "npm": ">=8.0.0" + }, + "funding": { + "type": "GitHub Sponsors ❤", + "url": "https://github.com/sponsors/dmonad" + } + }, "node_modules/yocto-queue": { "version": "0.1.0", "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-0.1.0.tgz", diff --git a/client/package.json b/client/package.json index 7557e2e8..131cd133 100644 --- a/client/package.json +++ b/client/package.json @@ -38,6 +38,7 @@ "dompurify": "3.3.1", "fuse.js": "7.1.0", "jquery": "3.7.1", + "lib0": "^0.2.117", "lodash": "4.17.23", "loglevel": "1.9.2", "marked": "11.2.0", @@ -49,7 +50,8 @@ "vue-toast-notification": "0.6.3", "vuelidate": "0.7.7", "vuex": "3.6.2", - "vuex-persistedstate": "3.2.1" + "vuex-persistedstate": "3.2.1", + "yjs": "^13.6.29" }, "devDependencies": { "@babel/core": "7.29.0", diff --git a/client/src/main.js b/client/src/main.js index 9463ed85..286f54da 100644 --- a/client/src/main.js +++ b/client/src/main.js @@ -88,6 +88,10 @@ function initializeWebSocket() { ); } } + // Route collaborative editing messages to the draft provider + if (['YJS_SYNC', 'YJS_UPDATE', 'YJS_AWARENESS', 'COLLAB_ERROR'].includes(msg.OP)) { + this.store.dispatch('HANDLE_DRAFT_MESSAGE', msg); + } return; } } diff --git a/client/src/store/modules/scriptDraft.js b/client/src/store/modules/scriptDraft.js new file mode 100644 index 00000000..f0bd7e6f --- /dev/null +++ b/client/src/store/modules/scriptDraft.js @@ -0,0 +1,207 @@ +/** + * Vuex module for collaborative script editing draft state. + * + * Tracks the connection state to a collaborative editing room, + * the Yjs document and provider instances, and collaborator presence. + */ + +import * as Y from 'yjs'; +import log from 'loglevel'; + +import ScriptDocProvider from '@/utils/yjs/ScriptDocProvider'; + +export default { + state: { + /** @type {number|null} The revision ID of the active room */ + roomId: null, + + /** @type {boolean} Whether we are connected to a collab room */ + isConnected: false, + + /** @type {boolean} Whether the initial sync from the server is complete */ + isSynced: false, + + /** @type {boolean} Whether there are unsaved changes in the draft */ + isDraft: false, + + /** @type {string|null} ISO timestamp of last save */ + lastSavedAt: null, + + /** @type {Array<{userId: number, username: string, role: string}>} */ + collaborators: [], + + /** + * @type {import('yjs').Doc|null} + * The Yjs document instance. Not persisted to localStorage. + */ + ydoc: null, + + /** + * @type {ScriptDocProvider|null} + * The Yjs provider instance. Not persisted to localStorage. + */ + provider: null, + }, + + mutations: { + SET_DRAFT_ROOM(state, { roomId, ydoc, provider }) { + state.roomId = roomId; + state.ydoc = ydoc; + state.provider = provider; + }, + + SET_DRAFT_CONNECTED(state, value) { + state.isConnected = value; + }, + + SET_DRAFT_SYNCED(state, value) { + state.isSynced = value; + }, + + SET_DRAFT_DIRTY(state, value) { + state.isDraft = value; + }, + + SET_DRAFT_LAST_SAVED(state, timestamp) { + state.lastSavedAt = timestamp; + }, + + SET_DRAFT_COLLABORATORS(state, collaborators) { + state.collaborators = collaborators; + }, + + CLEAR_DRAFT_STATE(state) { + state.roomId = null; + state.isConnected = false; + state.isSynced = false; + state.isDraft = false; + state.lastSavedAt = null; + state.collaborators = []; + state.ydoc = null; + state.provider = null; + }, + }, + + actions: { + /** + * Join a collaborative editing room for a script revision. + * Creates a Y.Doc and ScriptDocProvider, connects to the server. + * + * @param {object} context - Vuex action context + * @param {object} params + * @param {number} params.revisionId - Script revision to edit + * @param {string} [params.role='editor'] - 'editor' or 'viewer' + */ + async JOIN_DRAFT_ROOM(context, { revisionId, role = 'editor' }) { + // Leave existing room first + if (context.state.provider) { + await context.dispatch('LEAVE_DRAFT_ROOM'); + } + + const ydoc = new Y.Doc(); + const provider = new ScriptDocProvider(ydoc, revisionId, { role }); + + context.commit('SET_DRAFT_ROOM', { + roomId: revisionId, + ydoc, + provider, + }); + + // Listen for sync completion + const checkSynced = setInterval(() => { + if (provider.synced) { + context.commit('SET_DRAFT_SYNCED', true); + context.commit('SET_DRAFT_CONNECTED', true); + clearInterval(checkSynced); + } + }, 100); + + // Stop checking after 10 seconds (timeout) + setTimeout(() => { + clearInterval(checkSynced); + if (!provider.synced) { + log.error('ScriptDraft: Sync timeout after 10 seconds'); + } + }, 10000); + + provider.connect(); + log.info(`ScriptDraft: Joined room for revision ${revisionId} as ${role}`); + }, + + /** + * Leave the current collaborative editing room. + */ + async LEAVE_DRAFT_ROOM(context) { + const { provider } = context.state; + if (provider) { + provider.destroy(); + } + + context.commit('CLEAR_DRAFT_STATE'); + log.info('ScriptDraft: Left draft room'); + }, + + /** + * Handle an incoming WebSocket message that might be for the draft provider. + * Called from the SOCKET_ONMESSAGE mutation or action. + * + * @param {object} context + * @param {object} message - The WebSocket message + * @returns {boolean} Whether the message was handled + */ + HANDLE_DRAFT_MESSAGE(context, message) { + const { provider } = context.state; + if (!provider) return false; + + const handled = provider.handleMessage(message); + + // Check if sync status changed + if (handled && provider.synced && !context.state.isSynced) { + context.commit('SET_DRAFT_SYNCED', true); + context.commit('SET_DRAFT_CONNECTED', true); + } + + return handled; + }, + }, + + getters: { + /** @returns {boolean} Whether a collaborative editing session is active */ + IS_DRAFT_ACTIVE(state) { + return state.roomId !== null && state.isConnected; + }, + + /** @returns {import('yjs').Doc|null} The Y.Doc instance */ + DRAFT_YDOC(state) { + return state.ydoc; + }, + + /** @returns {import('yjs').Map|null} The Y.Doc pages map */ + DRAFT_PAGES(state) { + if (!state.ydoc) return null; + return state.ydoc.getMap('pages'); + }, + + /** @returns {import('yjs').Map|null} The Y.Doc meta map */ + DRAFT_META(state) { + if (!state.ydoc) return null; + return state.ydoc.getMap('meta'); + }, + + /** @returns {import('yjs').Array|null} The deleted line IDs array */ + DRAFT_DELETED_LINE_IDS(state) { + if (!state.ydoc) return null; + return state.ydoc.getArray('deleted_line_ids'); + }, + + /** @returns {boolean} Whether initial sync is complete */ + IS_DRAFT_SYNCED(state) { + return state.isSynced; + }, + + /** @returns {Array} List of collaborators in the room */ + DRAFT_COLLABORATORS(state) { + return state.collaborators; + }, + }, +}; diff --git a/client/src/store/modules/websocket.js b/client/src/store/modules/websocket.js index 2cb17370..728c807b 100644 --- a/client/src/store/modules/websocket.js +++ b/client/src/store/modules/websocket.js @@ -100,6 +100,12 @@ export default { case 'RELOAD_CLIENT': window.location.reload(); break; + // Collaborative editing Yjs messages — handled by HANDLE_DRAFT_MESSAGE action + case 'YJS_SYNC': + case 'YJS_UPDATE': + case 'YJS_AWARENESS': + case 'COLLAB_ERROR': + break; default: log.error(`Unknown OP received from websocket: ${message.OP}`); } diff --git a/client/src/store/store.js b/client/src/store/store.js index 06c40d90..a9224752 100644 --- a/client/src/store/store.js +++ b/client/src/store/store.js @@ -11,6 +11,7 @@ import system from './modules/system'; import show from './modules/show'; import script from './modules/script'; import scriptConfig from './modules/scriptConfig'; +import scriptDraft from './modules/scriptDraft'; import help from './modules/help'; import stage from './modules/stage'; @@ -203,6 +204,7 @@ export default new Vuex.Store({ stage, script, scriptConfig, + scriptDraft, user, help, }, diff --git a/client/src/utils/yjs/ScriptDocProvider.js b/client/src/utils/yjs/ScriptDocProvider.js new file mode 100644 index 00000000..17988e6a --- /dev/null +++ b/client/src/utils/yjs/ScriptDocProvider.js @@ -0,0 +1,267 @@ +/** + * Custom Yjs provider that uses DigiScript's existing WebSocket connection. + * + * Instead of opening a separate WebSocket (like y-websocket would), + * this provider sends Yjs sync messages via the existing managed + * connection using custom OP codes. + * + * Message flow: + * JOIN_SCRIPT_ROOM → server creates/loads room → YJS_SYNC step 0 (full state) + * YJS_UPDATE ←→ incremental document updates + * YJS_AWARENESS ←→ presence/cursor state + * LEAVE_SCRIPT_ROOM → server removes client from room + */ + +import Vue from 'vue'; +import * as Y from 'yjs'; +import log from 'loglevel'; + +/** + * Encode a Uint8Array to base64 string for JSON transport. + * @param {Uint8Array} uint8Array + * @returns {string} + */ +function encodeBase64(uint8Array) { + let binary = ''; + for (let i = 0; i < uint8Array.length; i++) { + binary += String.fromCharCode(uint8Array[i]); + } + return btoa(binary); +} + +/** + * Decode a base64 string to Uint8Array. + * @param {string} base64 + * @returns {Uint8Array} + */ +function decodeBase64(base64) { + const binary = atob(base64); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < binary.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + return bytes; +} + +export default class ScriptDocProvider { + /** + * @param {Y.Doc} doc - The Yjs document to sync + * @param {number} revisionId - The script revision ID for the room + * @param {object} options + * @param {string} [options.role='editor'] - 'editor' or 'viewer' + */ + constructor(doc, revisionId, options = {}) { + this.doc = doc; + this.revisionId = revisionId; + this.roomId = `draft_${revisionId}`; + this.role = options.role || 'editor'; + + this._connected = false; + this._synced = false; + this._destroyed = false; + this._updateHandler = null; + + // Bind the update handler + this._onDocUpdate = this._onDocUpdate.bind(this); + } + + /** + * Get the WebSocket instance. + * @returns {WebSocket|null} + */ + get _socket() { + return Vue.prototype.$socket || null; + } + + /** + * Connect to the collaborative editing room. + * Sends JOIN_SCRIPT_ROOM and starts listening for updates. + */ + connect() { + if (this._destroyed) return; + + const socket = this._socket; + if (!socket || socket.readyState !== WebSocket.OPEN) { + log.warn('ScriptDocProvider: WebSocket not ready, deferring connect'); + return; + } + + // Join the room + socket.sendObj({ + OP: 'JOIN_SCRIPT_ROOM', + DATA: { + revision_id: this.revisionId, + role: this.role, + }, + }); + + // Listen for local doc changes to broadcast + this.doc.on('update', this._onDocUpdate); + + this._connected = true; + log.info(`ScriptDocProvider: Joining room ${this.roomId} as ${this.role}`); + } + + /** + * Disconnect from the collaborative editing room. + */ + disconnect() { + if (!this._connected) return; + + const socket = this._socket; + if (socket && socket.readyState === WebSocket.OPEN) { + socket.sendObj({ + OP: 'LEAVE_SCRIPT_ROOM', + DATA: { room_id: this.roomId }, + }); + } + + this.doc.off('update', this._onDocUpdate); + this._connected = false; + this._synced = false; + log.info(`ScriptDocProvider: Left room ${this.roomId}`); + } + + /** + * Permanently destroy this provider. Cannot be reconnected after. + */ + destroy() { + this.disconnect(); + this._destroyed = true; + } + + /** + * Handle an incoming WebSocket message from the server. + * Should be called from the Vuex SOCKET_ONMESSAGE handler. + * + * @param {object} message - The parsed WebSocket message + * @returns {boolean} true if this message was handled + */ + handleMessage(message) { + if (!this._connected && message.OP !== 'YJS_SYNC') return false; + + const data = message.DATA || {}; + if (data.room_id && data.room_id !== this.roomId) return false; + + switch (message.OP) { + case 'YJS_SYNC': + return this._handleSync(data); + case 'YJS_UPDATE': + return this._handleUpdate(data); + case 'YJS_AWARENESS': + return this._handleAwareness(data); + default: + return false; + } + } + + /** + * Handle YJS_SYNC messages from the server. + * @param {object} data + * @returns {boolean} + */ + _handleSync(data) { + const payload = data.payload; + if (!payload) return false; + + try { + const decoded = decodeBase64(payload); + + if (data.step === 0) { + // Initial full state from server + Y.applyUpdate(this.doc, decoded, 'server'); + this._synced = true; + log.info(`ScriptDocProvider: Synced with room ${this.roomId}`); + + // Send our state vector so server knows what we have + const stateVector = Y.encodeStateVector(this.doc); + this._sendToServer('YJS_SYNC', { + step: 1, + payload: encodeBase64(stateVector), + room_id: this.roomId, + }); + } else if (data.step === 2) { + // Server's diff response to our state vector + Y.applyUpdate(this.doc, decoded, 'server'); + } + } catch (e) { + log.error('ScriptDocProvider: Failed to handle sync message', e); + } + + return true; + } + + /** + * Handle YJS_UPDATE messages from the server (other clients' changes). + * @param {object} data + * @returns {boolean} + */ + _handleUpdate(data) { + const payload = data.payload; + if (!payload) return false; + + try { + const decoded = decodeBase64(payload); + Y.applyUpdate(this.doc, decoded, 'server'); + } catch (e) { + log.error('ScriptDocProvider: Failed to apply update', e); + } + + return true; + } + + /** + * Handle YJS_AWARENESS messages from the server. + * @param {object} data + * @returns {boolean} + */ + _handleAwareness(data) { + // Awareness handling will be implemented in Phase 3 + return true; + } + + /** + * Called when the local Y.Doc is updated. + * Broadcasts the update to the server for other clients. + * + * @param {Uint8Array} update + * @param {*} origin - 'server' if from remote, otherwise local + */ + _onDocUpdate(update, origin) { + // Don't echo back updates that came from the server + if (origin === 'server') return; + if (!this._connected) return; + + this._sendToServer('YJS_UPDATE', { + payload: encodeBase64(update), + room_id: this.roomId, + }); + } + + /** + * Send a message to the server via the existing WebSocket. + * @param {string} op - The OP code + * @param {object} data - The DATA payload + */ + _sendToServer(op, data) { + const socket = this._socket; + if (!socket || socket.readyState !== WebSocket.OPEN) { + log.warn('ScriptDocProvider: Cannot send, WebSocket not connected'); + return; + } + + socket.sendObj({ OP: op, DATA: data }); + } + + /** @returns {boolean} Whether the provider is connected to a room */ + get connected() { + return this._connected; + } + + /** @returns {boolean} Whether the initial sync is complete */ + get synced() { + return this._synced; + } +} + +export { encodeBase64, decodeBase64 }; diff --git a/client/src/utils/yjs/useYjsBinding.js b/client/src/utils/yjs/useYjsBinding.js new file mode 100644 index 00000000..a7444535 --- /dev/null +++ b/client/src/utils/yjs/useYjsBinding.js @@ -0,0 +1,200 @@ +/** + * Vue 2.7 ↔ Yjs reactive bindings. + * + * These utilities create reactive Vue objects that stay in sync with + * Yjs shared types (Y.Map, Y.Array, Y.Text). Changes from remote + * clients are reflected in Vue reactivity, and local changes update + * the Yjs types. + * + * Pattern: + * Yjs type → observe → Vue.set() on reactive proxy + * User input → update Yjs type → observe fires → other clients see change + */ + +import Vue from 'vue'; + +/** + * Create a reactive object bound to a Y.Map. + * + * Returns a plain reactive object whose properties mirror the Y.Map. + * Remote changes update the reactive object automatically. + * + * @param {import('yjs').Map} ymap - The Y.Map to bind + * @param {string[]} [keys] - Specific keys to observe (default: all) + * @returns {{ data: object, destroy: Function }} + */ +export function useYMap(ymap, keys = null) { + const data = Vue.observable({}); + + // Initialize from current state + if (keys) { + keys.forEach((key) => { + Vue.set(data, key, ymap.get(key)); + }); + } else { + ymap.forEach((value, key) => { + Vue.set(data, key, _unwrapYjsValue(value)); + }); + } + + // Observe Y.Map changes + const observer = (event) => { + event.changes.keys.forEach((change, key) => { + if (keys && !keys.includes(key)) return; + + if (change.action === 'add' || change.action === 'update') { + Vue.set(data, key, _unwrapYjsValue(ymap.get(key))); + } else if (change.action === 'delete') { + Vue.delete(data, key); + } + }); + }; + + ymap.observe(observer); + + return { + data, + /** + * Set a value on the Y.Map (triggers sync to other clients). + * @param {string} key + * @param {*} value + */ + set(key, value) { + ymap.set(key, value); + }, + /** + * Stop observing the Y.Map. Call on component destroy. + */ + destroy() { + ymap.unobserve(observer); + }, + }; +} + +/** + * Create a reactive string bound to a Y.Text. + * + * Returns a reactive object with a `value` property that mirrors the Y.Text. + * Remote changes update the reactive value automatically. + * + * @param {import('yjs').Text} ytext - The Y.Text to bind + * @returns {{ data: { value: string }, set: Function, destroy: Function }} + */ +export function useYText(ytext) { + const data = Vue.observable({ value: ytext.toString() }); + + const observer = () => { + data.value = ytext.toString(); + }; + + ytext.observe(observer); + + return { + data, + /** + * Replace the entire text content. + * @param {string} newValue + */ + set(newValue) { + const doc = ytext.doc; + if (!doc) return; + + doc.transact(() => { + ytext.delete(0, ytext.length); + if (newValue) { + ytext.insert(0, newValue); + } + }); + }, + destroy() { + ytext.unobserve(observer); + }, + }; +} + +/** + * Create a reactive array bound to a Y.Array. + * + * Returns a reactive array that mirrors the Y.Array contents. + * Each element is unwrapped: Y.Map → plain object, Y.Text → string. + * + * @param {import('yjs').Array} yarray - The Y.Array to bind + * @returns {{ data: Array, destroy: Function }} + */ +export function useYArray(yarray) { + const data = Vue.observable([]); + + // Initialize from current state + _syncArrayData(yarray, data); + + const observer = () => { + _syncArrayData(yarray, data); + }; + + yarray.observe(observer); + + return { + data, + destroy() { + yarray.unobserve(observer); + }, + }; +} + +/** + * Sync Y.Array contents to a reactive array. + * @param {import('yjs').Array} yarray + * @param {Array} target + */ +function _syncArrayData(yarray, target) { + // Clear and rebuild — simpler than diffing for array changes + target.splice(0, target.length); + yarray.forEach((item) => { + target.push(_unwrapYjsValue(item)); + }); +} + +/** + * Unwrap a Yjs shared type to a plain JS value. + * Y.Map → plain object, Y.Text → string, Y.Array → array. + * Primitive values pass through unchanged. + * + * @param {*} value + * @returns {*} + */ +function _unwrapYjsValue(value) { + if (value == null) return value; + + // Check for Y.Text (has toString and insert methods) + if ( + typeof value === 'object' && + typeof value.insert === 'function' && + typeof value.toString === 'function' && + value.doc !== undefined + ) { + return value.toString(); + } + + // Check for Y.Map (has entries method and _map property) + if ( + typeof value === 'object' && + typeof value.entries === 'function' && + typeof value.set === 'function' && + value.doc !== undefined + ) { + const obj = {}; + value.forEach((v, k) => { + obj[k] = _unwrapYjsValue(v); + }); + return obj; + } + + // Check for Y.Array (has toArray method) + if (typeof value === 'object' && typeof value.toArray === 'function' && value.doc !== undefined) { + return value.toArray().map(_unwrapYjsValue); + } + + return value; +} + +export { _unwrapYjsValue }; From fa51291f4df6ffa6ea5aa20eaff52b18792cda8f Mon Sep 17 00:00:00 2001 From: Tim Bradgate Date: Wed, 11 Feb 2026 15:48:47 +0000 Subject: [PATCH 07/14] Integrate Y.Doc into ScriptEditor for collaborative editing (Phase 2.5-2.6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a dual-write bridge between Y.Doc and TMP_SCRIPT Vuex state so existing ScriptLineEditor/ScriptLinePart components continue to work unchanged while collaborative sync happens through the Y.Doc. - Create yjsBridge.js with Y.Doc↔TMP_SCRIPT conversion utilities - Join/leave draft room in ScriptEditor lifecycle hooks - Set up observeDeep on Y.Doc pages for remote change propagation - Use 'local-bridge' transaction origin to prevent observer loops - Wire add/delete/update line operations to Y.Doc - Sync from Y.Doc on page navigation Co-Authored-By: Claude Opus 4.6 --- client/src/utils/yjs/yjsBridge.js | 238 ++++++++++++++++++ .../show/config/script/ScriptEditor.vue | 144 +++++++++++ 2 files changed, 382 insertions(+) create mode 100644 client/src/utils/yjs/yjsBridge.js diff --git a/client/src/utils/yjs/yjsBridge.js b/client/src/utils/yjs/yjsBridge.js new file mode 100644 index 00000000..20bc6a8d --- /dev/null +++ b/client/src/utils/yjs/yjsBridge.js @@ -0,0 +1,238 @@ +/** + * Bridge utilities for converting between Y.Doc and TMP_SCRIPT formats. + * + * The Y.Doc uses a slightly different schema than TMP_SCRIPT: + * - `_id` instead of `id` + * - `parts` instead of `line_parts` + * - `0` as sentinel for null on FK fields + * - Y.Text for line_text instead of plain strings + * + * These utilities handle the conversion in both directions. + */ + +import * as Y from 'yjs'; + +/** + * Convert 0 → null for FK fields stored as 0 in the Y.Doc. + * @param {*} val + * @returns {*} + */ +function zeroToNull(val) { + return val === 0 ? null : val; +} + +/** + * Convert null → 0 for FK fields that need a non-null value in the Y.Doc. + * @param {*} val + * @returns {*} + */ +function nullToZero(val) { + return val == null ? 0 : val; +} + +/** + * Convert a Y.Map line from the Y.Doc to a plain object compatible with TMP_SCRIPT. + * + * @param {import('yjs').Map} lineYMap - A Y.Map representing a script line + * @param {number|string} pageNo - The page number for this line + * @returns {object} A plain line object for TMP_SCRIPT + */ +export function ydocLineToPlain(lineYMap, pageNo) { + const lineId = zeroToNull(lineYMap.get('_id')); + const partsArray = lineYMap.get('parts'); + const lineParts = []; + + if (partsArray) { + for (let i = 0; i < partsArray.length; i++) { + const partYMap = partsArray.get(i); + const lineText = partYMap.get('line_text'); + lineParts.push({ + id: zeroToNull(partYMap.get('_id')), + line_id: lineId, + part_index: partYMap.get('part_index'), + character_id: zeroToNull(partYMap.get('character_id')), + character_group_id: zeroToNull(partYMap.get('character_group_id')), + line_text: lineText ? lineText.toString() : '', + }); + } + } + + return { + id: lineId, + act_id: zeroToNull(lineYMap.get('act_id')), + scene_id: zeroToNull(lineYMap.get('scene_id')), + page: parseInt(pageNo, 10), + line_type: lineYMap.get('line_type'), + line_parts: lineParts, + stage_direction_style_id: zeroToNull(lineYMap.get('stage_direction_style_id')), + }; +} + +/** + * Convert all lines on a Y.Doc page to an array of plain objects for TMP_SCRIPT. + * + * @param {import('yjs').Doc} ydoc - The Y.Doc instance + * @param {number|string} pageNo - The page number to read + * @returns {Array} Array of plain line objects, or empty array if page doesn't exist + */ +export function syncPageFromYDoc(ydoc, pageNo) { + const pages = ydoc.getMap('pages'); + const pageKey = pageNo.toString(); + const pageArray = pages.get(pageKey); + if (!pageArray) return []; + + const lines = []; + for (let i = 0; i < pageArray.length; i++) { + lines.push(ydocLineToPlain(pageArray.get(i), pageNo)); + } + return lines; +} + +/** + * Update a single existing line in the Y.Doc from a TMP_SCRIPT line object. + * Uses 'local-bridge' as the transaction origin so observers can + * distinguish local UI changes from remote changes. + * + * @param {import('yjs').Doc} ydoc - The Y.Doc instance + * @param {number|string} pageNo - The page number + * @param {number} lineIndex - Index of the line within the page array + * @param {object} lineObj - The TMP_SCRIPT line object + */ +export function updateYDocLine(ydoc, pageNo, lineIndex, lineObj) { + const pages = ydoc.getMap('pages'); + const pageKey = pageNo.toString(); + const pageArray = pages.get(pageKey); + if (!pageArray || lineIndex >= pageArray.length) return; + + const lineMap = pageArray.get(lineIndex); + + ydoc.transact(() => { + lineMap.set('act_id', nullToZero(lineObj.act_id)); + lineMap.set('scene_id', nullToZero(lineObj.scene_id)); + lineMap.set('line_type', lineObj.line_type); + lineMap.set('stage_direction_style_id', nullToZero(lineObj.stage_direction_style_id)); + + const partsArray = lineMap.get('parts'); + if (!partsArray) return; + + // Update existing parts + const minLen = Math.min(partsArray.length, lineObj.line_parts.length); + for (let i = 0; i < minLen; i++) { + const part = lineObj.line_parts[i]; + const partMap = partsArray.get(i); + + partMap.set('character_id', nullToZero(part.character_id)); + partMap.set('character_group_id', nullToZero(part.character_group_id)); + partMap.set('part_index', part.part_index ?? i); + + const ytext = partMap.get('line_text'); + if (ytext) { + const currentText = ytext.toString(); + if (currentText !== (part.line_text || '')) { + ytext.delete(0, ytext.length); + if (part.line_text) { + ytext.insert(0, part.line_text); + } + } + } + } + + // Add new parts that don't exist in Y.Doc yet + for (let i = partsArray.length; i < lineObj.line_parts.length; i++) { + const part = lineObj.line_parts[i]; + const newPartMap = new Y.Map(); + partsArray.push([newPartMap]); + + newPartMap.set('_id', 0); + newPartMap.set('character_id', nullToZero(part.character_id)); + newPartMap.set('character_group_id', nullToZero(part.character_group_id)); + newPartMap.set('part_index', part.part_index ?? i); + + const ytext = new Y.Text(); + newPartMap.set('line_text', ytext); + if (part.line_text) { + ytext.insert(0, part.line_text); + } + } + + // Remove extra parts from Y.Doc (if user deleted parts) + while (partsArray.length > lineObj.line_parts.length) { + partsArray.delete(partsArray.length - 1, 1); + } + }, 'local-bridge'); +} + +/** + * Add a new line to a page in the Y.Doc. + * Creates the necessary Y.Map, Y.Array, and Y.Text structures. + * + * @param {import('yjs').Doc} ydoc - The Y.Doc instance + * @param {number|string} pageNo - The page number + * @param {object} lineObj - The TMP_SCRIPT line object to add + * @param {number} [insertAt] - Index to insert at. If omitted, appends to end. + */ +export function addYDocLine(ydoc, pageNo, lineObj, insertAt) { + const pages = ydoc.getMap('pages'); + const pageKey = pageNo.toString(); + let pageArray = pages.get(pageKey); + + ydoc.transact(() => { + // Create page array if it doesn't exist + if (!pageArray) { + pageArray = new Y.Array(); + pages.set(pageKey, pageArray); + } + + const lineMap = new Y.Map(); + if (insertAt !== undefined && insertAt < pageArray.length) { + pageArray.insert(insertAt, [lineMap]); + } else { + pageArray.push([lineMap]); + } + + lineMap.set('_id', nullToZero(lineObj.id)); + lineMap.set('act_id', nullToZero(lineObj.act_id)); + lineMap.set('scene_id', nullToZero(lineObj.scene_id)); + lineMap.set('line_type', lineObj.line_type); + lineMap.set('stage_direction_style_id', nullToZero(lineObj.stage_direction_style_id)); + + const partsArray = new Y.Array(); + lineMap.set('parts', partsArray); + + if (lineObj.line_parts) { + lineObj.line_parts.forEach((part, i) => { + const partMap = new Y.Map(); + partsArray.push([partMap]); + + partMap.set('_id', nullToZero(part.id)); + partMap.set('character_id', nullToZero(part.character_id)); + partMap.set('character_group_id', nullToZero(part.character_group_id)); + partMap.set('part_index', part.part_index ?? i); + + const ytext = new Y.Text(); + partMap.set('line_text', ytext); + if (part.line_text) { + ytext.insert(0, part.line_text); + } + }); + } + }, 'local-bridge'); +} + +/** + * Delete a line from a page in the Y.Doc. + * + * @param {import('yjs').Doc} ydoc - The Y.Doc instance + * @param {number|string} pageNo - The page number + * @param {number} lineIndex - Index of the line to delete + */ +export function deleteYDocLine(ydoc, pageNo, lineIndex) { + const pages = ydoc.getMap('pages'); + const pageKey = pageNo.toString(); + const pageArray = pages.get(pageKey); + if (!pageArray || lineIndex >= pageArray.length) return; + + ydoc.transact(() => { + pageArray.delete(lineIndex, 1); + }, 'local-bridge'); +} diff --git a/client/src/vue_components/show/config/script/ScriptEditor.vue b/client/src/vue_components/show/config/script/ScriptEditor.vue index 9eb7386f..4fafe863 100644 --- a/client/src/vue_components/show/config/script/ScriptEditor.vue +++ b/client/src/vue_components/show/config/script/ScriptEditor.vue @@ -206,6 +206,12 @@ import ScriptLineViewer from '@/vue_components/show/config/script/ScriptLineView import { makeURL, randInt } from '@/js/utils'; import { notNull, notNullAndGreaterThanZero } from '@/js/customValidators'; import { LINE_TYPES } from '@/constants/lineTypes'; +import { + syncPageFromYDoc, + updateYDocLine, + addYDocLine, + deleteYDocLine, +} from '@/utils/yjs/yjsBridge'; export default { name: 'ScriptConfig', @@ -238,6 +244,10 @@ export default { autoSaveInterval: null, isAutoSaving: false, navbarHeight: 0, + /** @type {Function|null} Deep observer cleanup for Y.Doc pages */ + ydocObserverCleanup: null, + /** @type {boolean} Guard flag to prevent observer → TMP_SCRIPT → Y.Doc loops */ + syncingFromYDoc: false, }; }, validations: { @@ -311,6 +321,10 @@ export default { 'STAGE_DIRECTION_STYLE_OVERRIDES', 'USER_SETTINGS', 'IS_SCRIPT_EDITOR', + 'CURRENT_REVISION', + 'IS_DRAFT_ACTIVE', + 'IS_DRAFT_SYNCED', + 'DRAFT_YDOC', ]), }, watch: { @@ -323,6 +337,11 @@ export default { CURRENT_EDITOR() { this.setupAutoSave(); }, + IS_DRAFT_SYNCED(synced) { + if (synced) { + this.setupYDocBridge(); + } + }, }, async beforeMount() { await Promise.all([ @@ -356,6 +375,14 @@ export default { this.currentEditPage = parseInt(storedPage, 10); } await this.goToPageInner(this.currentEditPage); + + // Join collaborative editing room if a revision is active + if (this.CURRENT_REVISION) { + this.JOIN_DRAFT_ROOM({ + revisionId: this.CURRENT_REVISION, + role: this.IS_SCRIPT_EDITOR ? 'editor' : 'viewer', + }); + } }, mounted() { this.loaded = true; @@ -369,6 +396,8 @@ export default { if (this.autoSaveInterval != null) { clearInterval(this.autoSaveInterval); } + this.teardownYDocBridge(); + this.LEAVE_DRAFT_ROOM(); }, methods: { async getMaxScriptPage() { @@ -459,6 +488,7 @@ export default { this.TMP_SCRIPT[this.currentEditPageKey][lineIndex].act_id = prevLine.act_id; this.TMP_SCRIPT[this.currentEditPageKey][lineIndex].scene_id = prevLine.scene_id; } + this.addLineToYDoc(this.currentEditPage, this.TMP_SCRIPT[this.currentEditPageKey][lineIndex]); }, async addStageDirection() { const stageDirectionObject = JSON.parse(JSON.stringify(this.blankLineObj)); @@ -474,6 +504,7 @@ export default { this.TMP_SCRIPT[this.currentEditPageKey][lineIndex].act_id = prevLine.act_id; this.TMP_SCRIPT[this.currentEditPageKey][lineIndex].scene_id = prevLine.scene_id; } + this.addLineToYDoc(this.currentEditPage, this.TMP_SCRIPT[this.currentEditPageKey][lineIndex]); }, async addCueLine() { const cueLineObject = JSON.parse(JSON.stringify(this.blankLineObj)); @@ -490,6 +521,7 @@ export default { this.TMP_SCRIPT[this.currentEditPageKey][lineIndex].act_id = prevLine.act_id; this.TMP_SCRIPT[this.currentEditPageKey][lineIndex].scene_id = prevLine.scene_id; } + this.addLineToYDoc(this.currentEditPage, this.TMP_SCRIPT[this.currentEditPageKey][lineIndex]); }, async addSpacing() { const spacingObject = JSON.parse(JSON.stringify(this.blankLineObj)); @@ -506,6 +538,7 @@ export default { this.TMP_SCRIPT[this.currentEditPageKey][lineIndex].act_id = prevLine.act_id; this.TMP_SCRIPT[this.currentEditPageKey][lineIndex].scene_id = prevLine.scene_id; } + this.addLineToYDoc(this.currentEditPage, this.TMP_SCRIPT[this.currentEditPageKey][lineIndex]); }, async getPreviousLineForIndex(lineIndex) { // Search backwards from lineIndex - 1 on the current page, skipping deleted lines @@ -590,6 +623,10 @@ export default { lineIndex: index, lineObj: line, }); + // Propagate to Y.Doc for collaborative sync + if (this.IS_DRAFT_ACTIVE && this.DRAFT_YDOC && !this.syncingFromYDoc) { + updateYDocLine(this.DRAFT_YDOC, this.currentEditPage, index, line); + } }, beginEditingLine(pageIndex, lineIndex) { const index = this.editPages.indexOf(`page_${pageIndex}_line_${lineIndex}`); @@ -615,6 +652,7 @@ export default { pageNo: pageIndex, lineIndex, }); + this.deleteLineFromYDoc(pageIndex, lineIndex); this.doneEditingLine(pageIndex, lineIndex); this.editPages.forEach(function updateEditPage(editPage, index) { @@ -695,6 +733,11 @@ export default { this.TMP_SCRIPT[this.currentEditPageKey][newLineIndex].act_id = prevLine.act_id; this.TMP_SCRIPT[this.currentEditPageKey][newLineIndex].scene_id = prevLine.scene_id; } + this.addLineToYDoc( + this.currentEditPage, + this.TMP_SCRIPT[this.currentEditPageKey][newLineIndex], + newLineIndex + ); }, async insertDialogueAt(pageIndex, lineIndex) { await this.insertLineAt(pageIndex, lineIndex, LINE_TYPES.DIALOGUE); @@ -799,6 +842,11 @@ export default { this.ADD_BLANK_PAGE(this.currentEditPage); } await this.LOAD_SCRIPT_PAGE(parseInt(pageNo, 10) + 1); + + // If Y.Doc is synced, overlay collaborative data onto loaded pages + if (this.IS_DRAFT_SYNCED && this.DRAFT_YDOC) { + this.syncCurrentPageFromYDoc(); + } }, setupAutoSave() { const autoSaveInterval = Math.max( @@ -911,6 +959,100 @@ export default { } this.isAutoSaving = false; }, + /** + * Set up the Y.Doc ↔ TMP_SCRIPT bridge after initial sync completes. + * Installs a deep observer on the Y.Doc pages map that updates + * TMP_SCRIPT when remote changes arrive. + */ + setupYDocBridge() { + const ydoc = this.DRAFT_YDOC; + if (!ydoc) return; + + const pages = ydoc.getMap('pages'); + + // Sync the current page from Y.Doc → TMP_SCRIPT on initial connect + this.syncCurrentPageFromYDoc(); + + // Observe deep changes on the pages map + const observer = (events, transaction) => { + // Skip changes we made ourselves (from lineChange → updateYDocLine) + if (transaction.origin === 'local-bridge') return; + + // Determine which pages were affected + const affectedPages = new Set(); + events.forEach((event) => { + // Walk up to find the page key + const path = event.path; + if (path.length >= 1) { + affectedPages.add(path[0].toString()); + } else { + // Top-level pages map changed — sync all loaded pages + Object.keys(this.TMP_SCRIPT).forEach((p) => affectedPages.add(p)); + } + }); + + // Sync affected pages that are currently loaded + this.syncingFromYDoc = true; + affectedPages.forEach((pageKey) => { + if (Object.keys(this.TMP_SCRIPT).includes(pageKey)) { + const lines = syncPageFromYDoc(ydoc, pageKey); + this.$store.commit('ADD_PAGE', { pageNo: pageKey, pageContents: lines }); + } + }); + this.syncingFromYDoc = false; + }; + + pages.observeDeep(observer); + this.ydocObserverCleanup = () => pages.unobserveDeep(observer); + + log.info('ScriptEditor: Y.Doc bridge established'); + }, + /** + * Remove the Y.Doc observer. + */ + teardownYDocBridge() { + if (this.ydocObserverCleanup) { + this.ydocObserverCleanup(); + this.ydocObserverCleanup = null; + } + }, + /** + * Sync all currently loaded TMP_SCRIPT pages from Y.Doc data. + */ + syncCurrentPageFromYDoc() { + const ydoc = this.DRAFT_YDOC; + if (!ydoc) return; + + this.syncingFromYDoc = true; + Object.keys(this.TMP_SCRIPT).forEach((pageKey) => { + const lines = syncPageFromYDoc(ydoc, pageKey); + if (lines.length > 0) { + this.$store.commit('ADD_PAGE', { pageNo: pageKey, pageContents: lines }); + } + }); + this.syncingFromYDoc = false; + }, + /** + * Propagate a new line addition to the Y.Doc. + * @param {number} pageNo + * @param {object} lineObj + * @param {number} [insertAt] - Index to insert at + */ + addLineToYDoc(pageNo, lineObj, insertAt) { + if (this.IS_DRAFT_ACTIVE && this.DRAFT_YDOC) { + addYDocLine(this.DRAFT_YDOC, pageNo, lineObj, insertAt); + } + }, + /** + * Propagate a line deletion to the Y.Doc. + * @param {number} pageNo + * @param {number} lineIndex + */ + deleteLineFromYDoc(pageNo, lineIndex) { + if (this.IS_DRAFT_ACTIVE && this.DRAFT_YDOC) { + deleteYDocLine(this.DRAFT_YDOC, pageNo, lineIndex); + } + }, calculateNavbarHeight() { const navbar = document.querySelector('.navbar'); if (navbar) { @@ -947,6 +1089,8 @@ export default { 'GET_STAGE_DIRECTION_STYLE_OVERRIDES', 'GET_CUE_COLOUR_OVERRIDES', 'GET_USER_SETTINGS', + 'JOIN_DRAFT_ROOM', + 'LEAVE_DRAFT_ROOM', ]), }, }; From 56ff9bbbf8ecdefd091596d2f5f4f7e85c0dae46 Mon Sep 17 00:00:00 2001 From: Tim Bradgate Date: Wed, 11 Feb 2026 16:23:14 +0000 Subject: [PATCH 08/14] Fix CURRENT_REVISION null when navigating directly to Script page ScriptEditor.vue assumed CURRENT_REVISION was already populated in Vuex, but it's only loaded by the ScriptRevisions component on a different route. Adding GET_SCRIPT_REVISIONS to ScriptEditor's beforeMount ensures the revision ID is available for joining the collaborative editing room. Co-Authored-By: Claude Opus 4.6 --- client/src/vue_components/show/config/script/ScriptEditor.vue | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/src/vue_components/show/config/script/ScriptEditor.vue b/client/src/vue_components/show/config/script/ScriptEditor.vue index 4fafe863..373e49a6 100644 --- a/client/src/vue_components/show/config/script/ScriptEditor.vue +++ b/client/src/vue_components/show/config/script/ScriptEditor.vue @@ -356,6 +356,7 @@ export default { } return Promise.resolve(); }), + this.GET_SCRIPT_REVISIONS(), this.GET_SCRIPT_CONFIG_STATUS(), this.GET_ACT_LIST(), this.GET_SCENE_LIST(), @@ -1089,6 +1090,7 @@ export default { 'GET_STAGE_DIRECTION_STYLE_OVERRIDES', 'GET_CUE_COLOUR_OVERRIDES', 'GET_USER_SETTINGS', + 'GET_SCRIPT_REVISIONS', 'JOIN_DRAFT_ROOM', 'LEAVE_DRAFT_ROOM', ]), From 35d20e7211d9ea6f4581f2fb0ee2d73ceca0abb0 Mon Sep 17 00:00:00 2001 From: Tim Bradgate Date: Wed, 11 Feb 2026 17:00:56 +0000 Subject: [PATCH 09/14] Add direct Y.Doc binding to ScriptLinePart (Rework R1/R4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove updateYDocLine bridge function — components now write directly to Y.Map/Y.Text. ScriptLinePart gains yPartMap prop with: - @input handler for keystroke-level Y.Text sync - Y.Map writes for character/group dropdown changes - Y.Text and Y.Map observers for remote change handling - Lifecycle setup/teardown for observers Export nullToZero/zeroToNull from yjsBridge for component use. Co-Authored-By: Claude Opus 4.6 --- client/src/utils/yjs/yjsBridge.js | 91 +++---------------- .../show/config/script/ScriptEditor.vue | 12 +-- .../show/config/script/ScriptLinePart.vue | 87 ++++++++++++++++++ 3 files changed, 100 insertions(+), 90 deletions(-) diff --git a/client/src/utils/yjs/yjsBridge.js b/client/src/utils/yjs/yjsBridge.js index 20bc6a8d..626ee67c 100644 --- a/client/src/utils/yjs/yjsBridge.js +++ b/client/src/utils/yjs/yjsBridge.js @@ -1,13 +1,18 @@ /** - * Bridge utilities for converting between Y.Doc and TMP_SCRIPT formats. + * Bridge utilities for Y.Doc ↔ TMP_SCRIPT format conversion. * - * The Y.Doc uses a slightly different schema than TMP_SCRIPT: + * Y.Doc is the source of truth during collaborative editing. TMP_SCRIPT is a + * read-only view cache populated one-way from Y.Doc via observers. + * Components write directly to Y.Map/Y.Text; this module provides: + * - Y.Doc → plain object conversion (for the TMP_SCRIPT view cache) + * - Structural helpers (add/delete lines in Y.Doc) + * - Sentinel conversion (nullToZero / zeroToNull) + * + * Schema differences between Y.Doc and TMP_SCRIPT: * - `_id` instead of `id` * - `parts` instead of `line_parts` * - `0` as sentinel for null on FK fields * - Y.Text for line_text instead of plain strings - * - * These utilities handle the conversion in both directions. */ import * as Y from 'yjs'; @@ -17,7 +22,7 @@ import * as Y from 'yjs'; * @param {*} val * @returns {*} */ -function zeroToNull(val) { +export function zeroToNull(val) { return val === 0 ? null : val; } @@ -26,7 +31,7 @@ function zeroToNull(val) { * @param {*} val * @returns {*} */ -function nullToZero(val) { +export function nullToZero(val) { return val == null ? 0 : val; } @@ -88,80 +93,6 @@ export function syncPageFromYDoc(ydoc, pageNo) { return lines; } -/** - * Update a single existing line in the Y.Doc from a TMP_SCRIPT line object. - * Uses 'local-bridge' as the transaction origin so observers can - * distinguish local UI changes from remote changes. - * - * @param {import('yjs').Doc} ydoc - The Y.Doc instance - * @param {number|string} pageNo - The page number - * @param {number} lineIndex - Index of the line within the page array - * @param {object} lineObj - The TMP_SCRIPT line object - */ -export function updateYDocLine(ydoc, pageNo, lineIndex, lineObj) { - const pages = ydoc.getMap('pages'); - const pageKey = pageNo.toString(); - const pageArray = pages.get(pageKey); - if (!pageArray || lineIndex >= pageArray.length) return; - - const lineMap = pageArray.get(lineIndex); - - ydoc.transact(() => { - lineMap.set('act_id', nullToZero(lineObj.act_id)); - lineMap.set('scene_id', nullToZero(lineObj.scene_id)); - lineMap.set('line_type', lineObj.line_type); - lineMap.set('stage_direction_style_id', nullToZero(lineObj.stage_direction_style_id)); - - const partsArray = lineMap.get('parts'); - if (!partsArray) return; - - // Update existing parts - const minLen = Math.min(partsArray.length, lineObj.line_parts.length); - for (let i = 0; i < minLen; i++) { - const part = lineObj.line_parts[i]; - const partMap = partsArray.get(i); - - partMap.set('character_id', nullToZero(part.character_id)); - partMap.set('character_group_id', nullToZero(part.character_group_id)); - partMap.set('part_index', part.part_index ?? i); - - const ytext = partMap.get('line_text'); - if (ytext) { - const currentText = ytext.toString(); - if (currentText !== (part.line_text || '')) { - ytext.delete(0, ytext.length); - if (part.line_text) { - ytext.insert(0, part.line_text); - } - } - } - } - - // Add new parts that don't exist in Y.Doc yet - for (let i = partsArray.length; i < lineObj.line_parts.length; i++) { - const part = lineObj.line_parts[i]; - const newPartMap = new Y.Map(); - partsArray.push([newPartMap]); - - newPartMap.set('_id', 0); - newPartMap.set('character_id', nullToZero(part.character_id)); - newPartMap.set('character_group_id', nullToZero(part.character_group_id)); - newPartMap.set('part_index', part.part_index ?? i); - - const ytext = new Y.Text(); - newPartMap.set('line_text', ytext); - if (part.line_text) { - ytext.insert(0, part.line_text); - } - } - - // Remove extra parts from Y.Doc (if user deleted parts) - while (partsArray.length > lineObj.line_parts.length) { - partsArray.delete(partsArray.length - 1, 1); - } - }, 'local-bridge'); -} - /** * Add a new line to a page in the Y.Doc. * Creates the necessary Y.Map, Y.Array, and Y.Text structures. diff --git a/client/src/vue_components/show/config/script/ScriptEditor.vue b/client/src/vue_components/show/config/script/ScriptEditor.vue index 373e49a6..ccb03bd2 100644 --- a/client/src/vue_components/show/config/script/ScriptEditor.vue +++ b/client/src/vue_components/show/config/script/ScriptEditor.vue @@ -206,12 +206,7 @@ import ScriptLineViewer from '@/vue_components/show/config/script/ScriptLineView import { makeURL, randInt } from '@/js/utils'; import { notNull, notNullAndGreaterThanZero } from '@/js/customValidators'; import { LINE_TYPES } from '@/constants/lineTypes'; -import { - syncPageFromYDoc, - updateYDocLine, - addYDocLine, - deleteYDocLine, -} from '@/utils/yjs/yjsBridge'; +import { syncPageFromYDoc, addYDocLine, deleteYDocLine } from '@/utils/yjs/yjsBridge'; export default { name: 'ScriptConfig', @@ -624,10 +619,7 @@ export default { lineIndex: index, lineObj: line, }); - // Propagate to Y.Doc for collaborative sync - if (this.IS_DRAFT_ACTIVE && this.DRAFT_YDOC && !this.syncingFromYDoc) { - updateYDocLine(this.DRAFT_YDOC, this.currentEditPage, index, line); - } + // Note: Y.Doc sync now handled by direct component bindings (R3 will rework this) }, beginEditingLine(pageIndex, lineIndex) { const index = this.editPages.indexOf(`page_${pageIndex}_line_${lineIndex}`); diff --git a/client/src/vue_components/show/config/script/ScriptLinePart.vue b/client/src/vue_components/show/config/script/ScriptLinePart.vue index 10bc9c36..cdcb3110 100644 --- a/client/src/vue_components/show/config/script/ScriptLinePart.vue +++ b/client/src/vue_components/show/config/script/ScriptLinePart.vue @@ -42,6 +42,7 @@ ref="partInput" v-model="$v.state.line_text.$model" :state="validateState('line_text')" + @input="onTextInput" @change="stateChange" @keydown.enter.native="handleEnterPress" /> @@ -62,6 +63,7 @@ + + diff --git a/client/src/vue_components/show/config/script/ScriptEditor.vue b/client/src/vue_components/show/config/script/ScriptEditor.vue index e67ae1e4..22f1722f 100644 --- a/client/src/vue_components/show/config/script/ScriptEditor.vue +++ b/client/src/vue_components/show/config/script/ScriptEditor.vue @@ -53,6 +53,14 @@ + + + + + Act Scene @@ -99,6 +107,7 @@ :line-part-cuts="linePartCuts" :stage-direction-styles="STAGE_DIRECTION_STYLES" :stage-direction-style-overrides="STAGE_DIRECTION_STYLE_OVERRIDES" + :editing-users="editingUsersForLine(index)" @editLine="beginEditingLine(currentEditPage, index)" @cutLinePart="cutLinePart" @insertDialogue="insertDialogueAt(currentEditPage, index)" @@ -204,6 +213,7 @@ import { sample } from 'lodash'; import ScriptLineEditor from '@/vue_components/show/config/script/ScriptLineEditor.vue'; import ScriptLineViewer from '@/vue_components/show/config/script/ScriptLineViewer.vue'; +import CollaboratorPanel from '@/vue_components/show/config/script/CollaboratorPanel.vue'; import { makeURL, randInt } from '@/js/utils'; import { notNull, notNullAndGreaterThanZero } from '@/js/customValidators'; import { LINE_TYPES } from '@/constants/lineTypes'; @@ -211,7 +221,7 @@ import { syncPageFromYDoc, addYDocLine, deleteYDocLine } from '@/utils/yjs/yjsBr export default { name: 'ScriptConfig', - components: { ScriptLineViewer, ScriptLineEditor }, + components: { ScriptLineViewer, ScriptLineEditor, CollaboratorPanel }, data() { return { currentEditPage: 1, @@ -321,6 +331,7 @@ export default { 'DRAFT_YDOC', 'DRAFT_COLLABORATORS', 'DRAFT_LINE_EDITORS', + 'DRAFT_AWARENESS_STATES', ]), }, watch: { diff --git a/client/src/vue_components/show/config/script/ScriptLineViewer.vue b/client/src/vue_components/show/config/script/ScriptLineViewer.vue index 1b3bfd25..c9936934 100644 --- a/client/src/vue_components/show/config/script/ScriptLineViewer.vue +++ b/client/src/vue_components/show/config/script/ScriptLineViewer.vue @@ -3,7 +3,9 @@ :class="{ 'stage-direction': line.line_type === LINE_TYPES.STAGE_DIRECTION, 'heading-padding': line.line_type === LINE_TYPES.DIALOGUE && needsHeadingsAll, + 'editing-indicator': editingUsers.length > 0, }" + :style="editingBorderStyle" >

@@ -119,6 +121,9 @@ + + {{ editingUsers.map((u) => u.username).join(', ') }} + [], + }, }, data() { return { @@ -222,6 +231,26 @@ export default { }; }, computed: { + editingBorderStyle() { + if (this.editingUsers.length === 0) return {}; + const COLLAB_COLORS = [ + '#e74c3c', + '#3498db', + '#2ecc71', + '#f39c12', + '#9b59b6', + '#1abc9c', + '#e67e22', + '#e91e63', + ]; + const color = COLLAB_COLORS[this.editingUsers[0].userId % COLLAB_COLORS.length]; + return { borderLeft: `3px solid ${color}`, paddingLeft: '5px' }; + }, + editingTooltip() { + if (this.editingUsers.length === 0) return ''; + const names = this.editingUsers.map((u) => u.username).join(', '); + return `${names} ${this.editingUsers.length === 1 ? 'is' : 'are'} editing this line`; + }, needsHeadings() { let { previousLine } = this; let previousLineIndex = this.lineIndex - 1; @@ -345,4 +374,16 @@ export default { .cut-line-part { text-decoration: line-through; } +.editing-indicator { + transition: border-left 0.2s ease; +} +.editing-badge { + display: block; + font-size: 0.7rem; + opacity: 0.8; + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; + cursor: default; +} From 5ef047e46eea6cee7292bdef6af23b0416f51e14 Mon Sep 17 00:00:00 2001 From: Tim Bradgate Date: Thu, 12 Feb 2026 23:22:14 +0000 Subject: [PATCH 14/14] Fix infinite render loop when editing line after deletion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move Y.Doc and ScriptDocProvider out of Vuex reactive state into module-level variables. Vue 2 deeply observes all state objects, which caused it to track Y.Doc internal properties as reactive dependencies — triggering infinite re-renders after Yjs transactions. Also adds loop guards to nextActs/nextScenes linked-list traversals and updates _broadcastAwareness to use the new DRAFT_PROVIDER getter. Co-Authored-By: Claude Opus 4.6 --- client/src/store/modules/scriptDraft.js | 92 ++++++++++--------- .../show/config/script/ScriptEditor.vue | 17 ++-- .../show/config/script/ScriptLineEditor.vue | 6 ++ 3 files changed, 66 insertions(+), 49 deletions(-) diff --git a/client/src/store/modules/scriptDraft.js b/client/src/store/modules/scriptDraft.js index 58400641..8fab5a86 100644 --- a/client/src/store/modules/scriptDraft.js +++ b/client/src/store/modules/scriptDraft.js @@ -3,6 +3,13 @@ * * Tracks the connection state to a collaborative editing room, * the Yjs document and provider instances, and collaborator presence. + * + * IMPORTANT: The Y.Doc and ScriptDocProvider instances are stored outside + * of Vuex reactive state (as module-level variables). Vue 2's reactivity + * system deeply observes all objects in state, adding getters/setters to + * every property. For complex library objects like Y.Doc, this causes Vue + * to track internal Yjs properties as reactive dependencies — leading to + * infinite render loops when Y.Doc internals change during transactions. */ import Vue from 'vue'; @@ -11,6 +18,18 @@ import log from 'loglevel'; import ScriptDocProvider from '@/utils/yjs/ScriptDocProvider'; +/** + * Non-reactive storage for Y.Doc and provider instances. + * These must NOT be stored in Vuex state because Vue 2 would make them + * deeply reactive, breaking Yjs internal state management. + * + * @type {import('yjs').Doc|null} + */ +let _ydoc = null; + +/** @type {ScriptDocProvider|null} */ +let _provider = null; + export default { state: { /** @type {number|null} The revision ID of the active room */ @@ -33,25 +52,11 @@ export default { /** @type {Object} */ awarenessStates: {}, - - /** - * @type {import('yjs').Doc|null} - * The Yjs document instance. Not persisted to localStorage. - */ - ydoc: null, - - /** - * @type {ScriptDocProvider|null} - * The Yjs provider instance. Not persisted to localStorage. - */ - provider: null, }, mutations: { - SET_DRAFT_ROOM(state, { roomId, ydoc, provider }) { + SET_DRAFT_ROOM(state, { roomId }) { state.roomId = roomId; - state.ydoc = ydoc; - state.provider = provider; }, SET_DRAFT_CONNECTED(state, value) { @@ -90,8 +95,8 @@ export default { state.lastSavedAt = null; state.collaborators = []; state.awarenessStates = {}; - state.ydoc = null; - state.provider = null; + _ydoc = null; + _provider = null; }, }, @@ -107,18 +112,18 @@ export default { */ async JOIN_DRAFT_ROOM(context, { revisionId, role = 'editor' }) { // Leave existing room first - if (context.state.provider) { + if (_provider) { await context.dispatch('LEAVE_DRAFT_ROOM'); } const ydoc = new Y.Doc(); const provider = new ScriptDocProvider(ydoc, revisionId, { role }); - context.commit('SET_DRAFT_ROOM', { - roomId: revisionId, - ydoc, - provider, - }); + // Store instances outside reactive state + _ydoc = ydoc; + _provider = provider; + + context.commit('SET_DRAFT_ROOM', { roomId: revisionId }); // Listen for sync completion const checkSynced = setInterval(() => { @@ -145,9 +150,8 @@ export default { * Leave the current collaborative editing room. */ async LEAVE_DRAFT_ROOM(context) { - const { provider } = context.state; - if (provider) { - provider.destroy(); + if (_provider) { + _provider.destroy(); } context.commit('CLEAR_DRAFT_STATE'); @@ -163,13 +167,12 @@ export default { * @returns {boolean} Whether the message was handled */ HANDLE_DRAFT_MESSAGE(context, message) { - const { provider } = context.state; - if (!provider) return false; + if (!_provider) return false; - const handled = provider.handleMessage(message); + const handled = _provider.handleMessage(message); // Check if sync status changed - if (handled && provider.synced && !context.state.isSynced) { + if (handled && _provider.synced && !context.state.isSynced) { context.commit('SET_DRAFT_SYNCED', true); context.commit('SET_DRAFT_CONNECTED', true); } @@ -203,27 +206,32 @@ export default { return state.roomId !== null && state.isConnected; }, - /** @returns {import('yjs').Doc|null} The Y.Doc instance */ - DRAFT_YDOC(state) { - return state.ydoc; + /** @returns {import('yjs').Doc|null} The Y.Doc instance (non-reactive) */ + DRAFT_YDOC() { + return _ydoc; + }, + + /** @returns {ScriptDocProvider|null} The provider instance (non-reactive) */ + DRAFT_PROVIDER() { + return _provider; }, /** @returns {import('yjs').Map|null} The Y.Doc pages map */ - DRAFT_PAGES(state) { - if (!state.ydoc) return null; - return state.ydoc.getMap('pages'); + DRAFT_PAGES() { + if (!_ydoc) return null; + return _ydoc.getMap('pages'); }, /** @returns {import('yjs').Map|null} The Y.Doc meta map */ - DRAFT_META(state) { - if (!state.ydoc) return null; - return state.ydoc.getMap('meta'); + DRAFT_META() { + if (!_ydoc) return null; + return _ydoc.getMap('meta'); }, /** @returns {import('yjs').Array|null} The deleted line IDs array */ - DRAFT_DELETED_LINE_IDS(state) { - if (!state.ydoc) return null; - return state.ydoc.getArray('deleted_line_ids'); + DRAFT_DELETED_LINE_IDS() { + if (!_ydoc) return null; + return _ydoc.getArray('deleted_line_ids'); }, /** @returns {boolean} Whether initial sync is complete */ diff --git a/client/src/vue_components/show/config/script/ScriptEditor.vue b/client/src/vue_components/show/config/script/ScriptEditor.vue index 22f1722f..61a77b1d 100644 --- a/client/src/vue_components/show/config/script/ScriptEditor.vue +++ b/client/src/vue_components/show/config/script/ScriptEditor.vue @@ -330,6 +330,7 @@ export default { 'IS_DRAFT_SYNCED', 'DRAFT_YDOC', 'DRAFT_COLLABORATORS', + 'DRAFT_PROVIDER', 'DRAFT_LINE_EDITORS', 'DRAFT_AWARENESS_STATES', ]), @@ -350,7 +351,7 @@ export default { } }, }, - async beforeMount() { + async mounted() { await Promise.all([ this.GET_CURRENT_USER() .then(() => this.GET_USER_SETTINGS()) @@ -391,10 +392,10 @@ export default { role: this.IS_SCRIPT_EDITOR ? 'editor' : 'viewer', }); } - }, - mounted() { + + // All data loaded — now safe to render this.loaded = true; - this.calculateNavbarHeight(); + this.$nextTick(() => this.calculateNavbarHeight()); }, created() { window.addEventListener('resize', this.calculateNavbarHeight); @@ -1022,7 +1023,9 @@ export default { if (!this.IS_DRAFT_ACTIVE || !this.DRAFT_YDOC) return null; const pages = this.DRAFT_YDOC.getMap('pages'); const pageArray = pages.get(this.currentEditPageKey); - if (!pageArray || index >= pageArray.length) return null; + if (!pageArray || index >= pageArray.length) { + return null; + } return pageArray.get(index); }, /** @@ -1031,9 +1034,9 @@ export default { * @param {number|null} lineIndex - The line index, or null if no line is expanded */ _broadcastAwareness(page, lineIndex) { - if (!this.$store.state.scriptDraft.provider) return; + if (!this.DRAFT_PROVIDER) return; const user = this.CURRENT_USER; - this.$store.state.scriptDraft.provider.setLocalAwareness({ + this.DRAFT_PROVIDER.setLocalAwareness({ userId: user ? user.id : null, username: user ? user.username : 'Unknown', page, diff --git a/client/src/vue_components/show/config/script/ScriptLineEditor.vue b/client/src/vue_components/show/config/script/ScriptLineEditor.vue index 2fb3e2f0..a67d3a51 100644 --- a/client/src/vue_components/show/config/script/ScriptLineEditor.vue +++ b/client/src/vue_components/show/config/script/ScriptLineEditor.vue @@ -242,9 +242,12 @@ export default { } const validActs = []; let nextAct = startAct; + let loopCount = 0; // Find all valid acts, if there is no next line then this is all acts after the start act. // If there is a next line, this is all acts up to and including the act of the next line while (nextAct != null) { + loopCount++; + if (loopCount > this.acts.length) break; validActs.push(JSON.parse(JSON.stringify(nextAct))); if (this.nextLine != null && this.nextLine.act_id === nextAct.id) { break; @@ -272,10 +275,13 @@ export default { } const validScenes = []; let nextScene = startScene; + let loopCount = 0; // Find all valid scenes, if there is no next line then this is all scenes after the start // scene. If there is a next line, this is all scenes up to and including the scene of the // next line while (nextScene != null) { + loopCount++; + if (loopCount > scenes.length) break; validScenes.push(JSON.parse(JSON.stringify(nextScene))); if (this.nextLine != null && this.nextLine.scene_id === nextScene.id) { break;