From 9f0aed972b47fd1dbf23de13530bf71ef71248ca Mon Sep 17 00:00:00 2001 From: cacosandon Date: Fri, 29 Mar 2024 19:05:16 -0300 Subject: [PATCH 01/14] refactor(django_channels): move to its own folder to not be confused with other files --- .../{ => django_channels}/django_channels_consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename pycrdt_websocket/{ => django_channels}/django_channels_consumer.py (98%) diff --git a/pycrdt_websocket/django_channels_consumer.py b/pycrdt_websocket/django_channels/django_channels_consumer.py similarity index 98% rename from pycrdt_websocket/django_channels_consumer.py rename to pycrdt_websocket/django_channels/django_channels_consumer.py index 9f917b4..a0e6efd 100644 --- a/pycrdt_websocket/django_channels_consumer.py +++ b/pycrdt_websocket/django_channels/django_channels_consumer.py @@ -6,8 +6,8 @@ from channels.generic.websocket import AsyncWebsocketConsumer # type: ignore[import-not-found] from pycrdt import Doc -from .websocket import Websocket -from .yutils import YMessageType, process_sync_message, sync +from ..websocket import Websocket +from ..yutils import YMessageType, process_sync_message, sync logger = getLogger(__name__) From 61a80eee5d450c820e70ddb0ca48dc09231646f5 Mon Sep 17 00:00:00 2001 From: cacosandon Date: Fri, 29 Mar 2024 19:05:20 -0300 Subject: [PATCH 02/14] refactor(django_channels): rename consumer file to yjs_consumer --- .../{django_channels_consumer.py => yjs_consumer.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename pycrdt_websocket/django_channels/{django_channels_consumer.py => yjs_consumer.py} (100%) diff --git a/pycrdt_websocket/django_channels/django_channels_consumer.py b/pycrdt_websocket/django_channels/yjs_consumer.py similarity index 100% rename from pycrdt_websocket/django_channels/django_channels_consumer.py rename to pycrdt_websocket/django_channels/yjs_consumer.py From 2aafc497fbbead0b3139dae0cb103b3b6e6b70ee Mon Sep 17 00:00:00 2001 From: cacosandon Date: Fri, 29 Mar 2024 19:05:22 -0300 Subject: [PATCH 03/14] feat(django_channels): add base YRoomStorage and optionally add it to YjsConsumer --- .../django_channels/yjs_consumer.py | 50 ++++++-- .../django_channels/yroom_storage.py | 116 ++++++++++++++++++ 2 files changed, 156 insertions(+), 10 deletions(-) create mode 100644 pycrdt_websocket/django_channels/yroom_storage.py diff --git a/pycrdt_websocket/django_channels/yjs_consumer.py b/pycrdt_websocket/django_channels/yjs_consumer.py index a0e6efd..01f3ee5 100644 --- a/pycrdt_websocket/django_channels/yjs_consumer.py +++ b/pycrdt_websocket/django_channels/yjs_consumer.py @@ -6,8 +6,16 @@ from channels.generic.websocket import AsyncWebsocketConsumer # type: ignore[import-not-found] from pycrdt import Doc +from pycrdt_websocket.django_channels.yroom_storage import BaseYRoomStorage + from ..websocket import Websocket -from ..yutils import YMessageType, process_sync_message, sync +from ..yutils import ( + YMessageType, + YSyncMessageType, + process_sync_message, + read_message, + sync, +) logger = getLogger(__name__) @@ -126,6 +134,13 @@ def __init__(self): self.room_name = None self.ydoc = None self._websocket_shim = None + self.room_storage: BaseYRoomStorage | None = None + + def make_room_storage(self) -> BaseYRoomStorage | None: + """Make the room storage for a new channel to persist the YDoc permanently. + Defaults to not using any (just broadcast updates between consumers). + """ + return None def make_room_name(self) -> str: """Make the room name for a new channel. @@ -137,15 +152,10 @@ def make_room_name(self) -> str: """ return self.scope["url_route"]["kwargs"]["room"] - async def make_ydoc(self) -> Doc: - """Make the YDoc for a new channel. + async def _make_ydoc(self) -> Doc: + if self.room_storage: + return await self.room_storage.get_document() - Override to customize the YDoc when a channel is created - (useful to initialize it with data from your database, or to add observers to it). - - Returns: - The YDoc for a new channel. Defaults to a new empty YDoc. - """ return Doc() def _make_websocket_shim(self, path: str) -> _WebsocketShim: @@ -153,7 +163,9 @@ def _make_websocket_shim(self, path: str) -> _WebsocketShim: async def connect(self) -> None: self.room_name = self.make_room_name() - self.ydoc = await self.make_ydoc() + self.room_storage = self.make_room_storage() + + self.ydoc = await self._make_ydoc() self._websocket_shim = self._make_websocket_shim(self.scope["path"]) await self.channel_layer.group_add(self.room_name, self.channel_name) @@ -162,14 +174,32 @@ async def connect(self) -> None: await sync(self.ydoc, self._websocket_shim, logger) async def disconnect(self, code) -> None: + if self.room_storage: + await self.room_storage.close() + + if not self.room_name: + return + await self.channel_layer.group_discard(self.room_name, self.channel_name) async def receive(self, text_data=None, bytes_data=None): if bytes_data is None: return + await self.group_send_message(bytes_data) + if bytes_data[0] != YMessageType.SYNC: return + + # If it's an update message, apply it to the storage document + if self.room_storage and bytes_data[1] == YSyncMessageType.SYNC_UPDATE: + update = read_message(bytes_data[2:]) + + if update != b"\x00\x00": + await self.room_storage.update_document(update) + + return + await process_sync_message(bytes_data[1:], self.ydoc, self._websocket_shim, logger) class WrappedMessage(TypedDict): diff --git a/pycrdt_websocket/django_channels/yroom_storage.py b/pycrdt_websocket/django_channels/yroom_storage.py new file mode 100644 index 0000000..19f8de6 --- /dev/null +++ b/pycrdt_websocket/django_channels/yroom_storage.py @@ -0,0 +1,116 @@ +import time +from typing import Optional + +from pycrdt import Doc + + +class BaseYRoomStorage: + """Base class for YRoom storage. + This class is responsible for storing, retrieving, updating and persisting the Ypy document. + Each Django Channels Consumer should have its own YRoomStorage instance, although all consumers + and rooms with the same room name will be connected to the same document in the end. + Updates to the document should be sent to the shared storage, instead of each + consumer having its own version of the YDoc. + + A full example of a Redis as temporary storage and Postgres as persistent storage is: + ```py + from typing import Optional + from django.db import models + from ypy_websocket.django_channels.yroom_storage import RedisYRoomStorage + + class YDocSnapshotManager(models.Manager): + async def aget_snapshot(self, name) -> Optional[bytes]: + try: + instance: YDocSnapshot = await self.aget(name=name) + result = instance.data + if not isinstance(result, bytes): + # Postgres on psycopg2 returns memoryview + return bytes(result) + except YDocSnapshot.DoesNotExist: + return None + else: + return result + + async def asave_snapshot(self, name, data): + return await self.aupdate_or_create(name=name, defaults={"data": data}) + + class YDocSnapshot(models.Model): + name = models.CharField(max_length=255, primary_key=True) + data = models.BinaryField() + objects = YDocSnapshotManager() + + class CustomRoomStorage(RedisYRoomStorage): + async def load_snapshot(self) -> Optional[bytes]: + return await YDocSnapshot.objects.aget_snapshot(self.room_name) + + async def save_snapshot(self): + current_snapshot = await self.redis.get(self.redis_key) + if not current_snapshot: + return + await YDocSnapshot.objects.asave_snapshot( + self.room_name, + current_snapshot, + ) + ``` + """ + + def __init__(self, room_name: str) -> None: + self.room_name = room_name + + self.last_saved_at = time.time() + self.save_throttle_interval = 5 + + async def get_document(self) -> Doc: + """Gets the document from the storage. + Ideally it should be retrieved first from temporary storage (e.g. Redis) and then from + persistent storage (e.g. a database). + Returns: + The document with the latest changes. + """ + + raise NotImplementedError + + async def update_document(self, update: bytes): + """Updates the document in the storage. + Updates could be received by Yjs client (e.g. from a WebSocket) or from the server + (e.g. from a Django Celery job). + Args: + update: The update to apply to the document. + """ + + raise NotImplementedError + + async def load_snapshot(self) -> Optional[bytes]: + """Gets the document from the database. Override this method to + implement a persistent storage. + Defaults to None. + Returns: + The latest document snapshot. + """ + return None + + async def save_snapshot(self) -> None: + """Saves a snapshot of the document to the storage. + If you need to persist the document to a database, you should do it here. + Default implementation does nothing. + """ + + pass + + async def throttled_save_snapshot(self) -> None: + """Saves a snapshot of the document to the storage, debouncing the calls.""" + + if time.time() - self.last_saved_at <= self.save_throttle_interval: + return + + await self.save_snapshot() + + self.last_saved_at = time.time() + + async def close(self): + """Closes the storage. + Default implementation does nothing. + """ + + pass + From 9d3b3f6cda8365603991b6a990ef997e834b8225 Mon Sep 17 00:00:00 2001 From: cacosandon Date: Fri, 29 Mar 2024 19:05:24 -0300 Subject: [PATCH 04/14] feat(yroom_storage): add Redis storage as an example --- .../django_channels/yjs_consumer.py | 3 + .../django_channels/yroom_storage.py | 71 +++++++++++++++++++ pyproject.toml | 3 + 3 files changed, 77 insertions(+) diff --git a/pycrdt_websocket/django_channels/yjs_consumer.py b/pycrdt_websocket/django_channels/yjs_consumer.py index 01f3ee5..c6d2b2f 100644 --- a/pycrdt_websocket/django_channels/yjs_consumer.py +++ b/pycrdt_websocket/django_channels/yjs_consumer.py @@ -139,6 +139,9 @@ def __init__(self): def make_room_storage(self) -> BaseYRoomStorage | None: """Make the room storage for a new channel to persist the YDoc permanently. Defaults to not using any (just broadcast updates between consumers). + + Example: + self.room_storage = RedisYRoomStorage(self.room_name) """ return None diff --git a/pycrdt_websocket/django_channels/yroom_storage.py b/pycrdt_websocket/django_channels/yroom_storage.py index 19f8de6..fe05f29 100644 --- a/pycrdt_websocket/django_channels/yroom_storage.py +++ b/pycrdt_websocket/django_channels/yroom_storage.py @@ -1,6 +1,7 @@ import time from typing import Optional +import redis.asyncio as redis from pycrdt import Doc @@ -114,3 +115,73 @@ async def close(self): pass + +class RedisYRoomStorage(BaseYRoomStorage): + """A YRoom storage that uses Redis as main storage, without + persistent storage. + Args: + room_name: The name of the room. + """ + + def __init__(self, room_name: str) -> None: + super().__init__(room_name) + + self.redis_key = f"document:{self.room_name}" + self.redis = self.make_redis() + + def make_redis(self): + """Makes a Redis client. + Defaults to a local client""" + + return redis.Redis(host="localhost", port=6379, db=0) + + async def get_document(self) -> Doc: + snapshot = await self.redis.get(self.redis_key) + + if not snapshot: + snapshot = await self.load_snapshot() + + document = Doc() + + if snapshot: + document.apply_update(snapshot) + + return document + + async def update_document(self, update: bytes): + await self.redis.watch(self.redis_key) + + try: + current_document = await self.get_document() + updated_snapshot = self._apply_update_to_snapshot(current_document, update) + + async with self.redis.pipeline() as pipe: + while True: + try: + pipe.multi() + pipe.set(self.redis_key, updated_snapshot) + + await pipe.execute() + + break + except redis.WatchError: + current_snapshot = await self.get_document() + updated_snapshot = self._apply_update_to_snapshot( + current_snapshot, + update, + ) + + continue + finally: + await self.redis.unwatch() + + await self.throttled_save_snapshot() + + async def close(self): + await self.save_snapshot() + await self.redis.close() + + def _apply_update_to_snapshot(self, document: Doc, update: bytes) -> bytes: + document.apply_update(update) + + return document.get_update() diff --git a/pyproject.toml b/pyproject.toml index ebc5bad..e85dc60 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,9 @@ docs = [ django = [ "channels", ] +redis = [ + "redis", +] [project.urls] Homepage = "https://github.com/jupyter-server/pycrdt-websocket" From 341d3017b3ea447e90f83f3d1d60fa88541370b2 Mon Sep 17 00:00:00 2001 From: cacosandon Date: Fri, 29 Mar 2024 19:05:26 -0300 Subject: [PATCH 05/14] refactor: create EMPTY_UPDATE constant --- pycrdt_websocket/django_channels/yjs_consumer.py | 3 ++- pycrdt_websocket/yutils.py | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pycrdt_websocket/django_channels/yjs_consumer.py b/pycrdt_websocket/django_channels/yjs_consumer.py index c6d2b2f..59274fb 100644 --- a/pycrdt_websocket/django_channels/yjs_consumer.py +++ b/pycrdt_websocket/django_channels/yjs_consumer.py @@ -10,6 +10,7 @@ from ..websocket import Websocket from ..yutils import ( + EMPTY_UPDATE, YMessageType, YSyncMessageType, process_sync_message, @@ -198,7 +199,7 @@ async def receive(self, text_data=None, bytes_data=None): if self.room_storage and bytes_data[1] == YSyncMessageType.SYNC_UPDATE: update = read_message(bytes_data[2:]) - if update != b"\x00\x00": + if update != EMPTY_UPDATE: await self.room_storage.update_document(update) return diff --git a/pycrdt_websocket/yutils.py b/pycrdt_websocket/yutils.py index 2d363b4..9142707 100644 --- a/pycrdt_websocket/yutils.py +++ b/pycrdt_websocket/yutils.py @@ -18,6 +18,8 @@ class YSyncMessageType(IntEnum): SYNC_STEP2 = 1 SYNC_UPDATE = 2 +# Empty updates (see https://github.com/y-crdt/ypy/issues/98) +EMPTY_UPDATE = b"\x00\x00" def write_var_uint(num: int) -> bytes: res = [] @@ -128,7 +130,7 @@ async def process_sync_message(message: bytes, ydoc: Doc, websocket, log) -> Non YSyncMessageType.SYNC_UPDATE, ): update = read_message(msg) - if update != b"\x00\x00": + if update != EMPTY_UPDATE: ydoc.apply_update(update) From 31224125ba6fc3e2155467cf8522136e3d093daa Mon Sep 17 00:00:00 2001 From: cacosandon Date: Fri, 29 Mar 2024 19:05:37 -0300 Subject: [PATCH 06/14] docs(yjs_consumer): add typing and docs for YjsConsumer --- .../django_channels/yjs_consumer.py | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/pycrdt_websocket/django_channels/yjs_consumer.py b/pycrdt_websocket/django_channels/yjs_consumer.py index 59274fb..1fb6cda 100644 --- a/pycrdt_websocket/django_channels/yjs_consumer.py +++ b/pycrdt_websocket/django_channels/yjs_consumer.py @@ -79,14 +79,17 @@ class YjsConsumer(AsyncWebsocketConsumer): In particular, - Override `make_room_name` to customize the room name. - - Override `make_ydoc` to initialize the YDoc. This is useful to initialize it with data - from your database, or to add observers to it). + - Override `make_room_storage` to initialize the room storage. Create your own storage class + by subclassing `BaseYRoomStorage` and implementing the methods. - Override `connect` to do custom validation (like auth) on connect, but be sure to call `await super().connect()` in the end. - Call `group_send_message` to send a message to an entire group/room. - Call `send_message` to send a message to a single client, although this is not recommended. - A full example of a custom consumer showcasing all of these options is: + A full example of a custom consumer showcasing all of these options is below. The example also + includes an example function `propagate_document_update_from_external` that demonstrates how to + send a message to all connected clients from an external source (like a Celery job). + ```py from pycrdt import Doc from asgiref.sync import async_to_sync @@ -96,49 +99,51 @@ class YjsConsumer(AsyncWebsocketConsumer): class DocConsumer(YjsConsumer): + def make_room_storage(self) -> BaseYRoomStorage: + # Modify the room storage here + + return RedisYRoomStorage(self.room_name) + def make_room_name(self) -> str: - # modify the room name here - return self.scope["url_route"]["kwargs"]["room"] + # Modify the room name here - async def make_ydoc(self) -> Doc: - doc = Doc() - # fill doc with data from DB here - doc.observe(self.on_update_event) - return doc + return self.scope["url_route"]["kwargs"]["room"] async def connect(self): user = self.scope["user"] + if user is None or user.is_anonymous: await self.close() return - await super().connect() - def on_update_event(self, event): - # process event here - ... + await super().connect() - async def doc_update(self, update_wrapper): + async def propagate_document_update(self, update_wrapper): update = update_wrapper["update"] - self.ydoc.apply_update(update) - await self.group_send_message(create_update_message(update)) + await self.send(create_update_message(update)) - def send_doc_update(room_name, update): - layer = get_channel_layer() - async_to_sync(layer.group_send)(room_name, {"type": "doc_update", "update": update}) - ``` + async def propagate_document_update_from_external(room_name, update): + channel_layer = get_channel_layer() + + await channel_layer.group_send( + room_name, + {"type": "propagate_document_update", "update": update}, + ) + ``` """ def __init__(self): super().__init__() - self.room_name = None - self.ydoc = None - self._websocket_shim = None + self.room_name: str | None = None + self.ydoc: Doc | None = None self.room_storage: BaseYRoomStorage | None = None + self._websocket_shim: _WebsocketShim | None = None def make_room_storage(self) -> BaseYRoomStorage | None: """Make the room storage for a new channel to persist the YDoc permanently. + Defaults to not using any (just broadcast updates between consumers). Example: From 446e9a2e7de1d13086946727d229cb65ce72c29b Mon Sep 17 00:00:00 2001 From: cacosandon Date: Fri, 29 Mar 2024 19:06:07 -0300 Subject: [PATCH 07/14] docs: create section for Django Channels --- docs/reference/Django_Channels.md | 11 +++++++++++ docs/reference/Django_Channels_consumer.md | 1 - mkdocs.yml | 2 +- pycrdt_websocket/django_channels/__init__.py | 2 ++ 4 files changed, 14 insertions(+), 2 deletions(-) create mode 100644 docs/reference/Django_Channels.md delete mode 100644 docs/reference/Django_Channels_consumer.md create mode 100644 pycrdt_websocket/django_channels/__init__.py diff --git a/docs/reference/Django_Channels.md b/docs/reference/Django_Channels.md new file mode 100644 index 0000000..979d1ca --- /dev/null +++ b/docs/reference/Django_Channels.md @@ -0,0 +1,11 @@ +## Consumer + +::: pycrdt_websocket.django_channels.yjs_consumer.YjsConsumer + +## Storage + +### BaseYRoomStorage +::: pycrdt_websocket.django_channels.yroom_storage.BaseYRoomStorage + +### RedisYRoomStorage +::: pycrdt_websocket.django_channels.yroom_storage.RedisYRoomStorage \ No newline at end of file diff --git a/docs/reference/Django_Channels_consumer.md b/docs/reference/Django_Channels_consumer.md deleted file mode 100644 index 8548b4e..0000000 --- a/docs/reference/Django_Channels_consumer.md +++ /dev/null @@ -1 +0,0 @@ -::: pycrdt_websocket.django_channels_consumer.YjsConsumer diff --git a/mkdocs.yml b/mkdocs.yml index efa55a9..872d860 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -39,7 +39,7 @@ nav: - reference/WebSocket_provider.md - reference/WebSocket_server.md - reference/ASGI_server.md - - reference/Django_Channels_consumer.md + - reference/Django_Channels.md - reference/WebSocket.md - reference/Room.md - reference/Store.md diff --git a/pycrdt_websocket/django_channels/__init__.py b/pycrdt_websocket/django_channels/__init__.py new file mode 100644 index 0000000..f4f5b05 --- /dev/null +++ b/pycrdt_websocket/django_channels/__init__.py @@ -0,0 +1,2 @@ +from .yjs_consumer import YjsConsumer +from .yroom_storage import BaseYRoomStorage, RedisYRoomStorage \ No newline at end of file From 37a50a16521c316ea24018df0af6c9e95507a64a Mon Sep 17 00:00:00 2001 From: cacosandon Date: Fri, 29 Mar 2024 19:09:42 -0300 Subject: [PATCH 08/14] lint: fix ruff issues by adding same import format as main __init__ --- pycrdt_websocket/django_channels/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pycrdt_websocket/django_channels/__init__.py b/pycrdt_websocket/django_channels/__init__.py index f4f5b05..04b431a 100644 --- a/pycrdt_websocket/django_channels/__init__.py +++ b/pycrdt_websocket/django_channels/__init__.py @@ -1,2 +1,3 @@ -from .yjs_consumer import YjsConsumer -from .yroom_storage import BaseYRoomStorage, RedisYRoomStorage \ No newline at end of file +from .yjs_consumer import YjsConsumer as YjsConsumer +from .yroom_storage import BaseYRoomStorage as BaseYRoomStorage +from .yroom_storage import RedisYRoomStorage as RedisYRoomStorage From cbd1a916d23844b3dde2d9f0f578b53f9ae71681 Mon Sep 17 00:00:00 2001 From: cacosandon Date: Fri, 29 Mar 2024 19:11:02 -0300 Subject: [PATCH 09/14] lint: run ruff autofix --- docs/reference/Django_Channels.md | 2 +- pycrdt_websocket/yutils.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/reference/Django_Channels.md b/docs/reference/Django_Channels.md index 979d1ca..5aee0ca 100644 --- a/docs/reference/Django_Channels.md +++ b/docs/reference/Django_Channels.md @@ -8,4 +8,4 @@ ::: pycrdt_websocket.django_channels.yroom_storage.BaseYRoomStorage ### RedisYRoomStorage -::: pycrdt_websocket.django_channels.yroom_storage.RedisYRoomStorage \ No newline at end of file +::: pycrdt_websocket.django_channels.yroom_storage.RedisYRoomStorage diff --git a/pycrdt_websocket/yutils.py b/pycrdt_websocket/yutils.py index 9142707..4f609c8 100644 --- a/pycrdt_websocket/yutils.py +++ b/pycrdt_websocket/yutils.py @@ -18,9 +18,11 @@ class YSyncMessageType(IntEnum): SYNC_STEP2 = 1 SYNC_UPDATE = 2 + # Empty updates (see https://github.com/y-crdt/ypy/issues/98) EMPTY_UPDATE = b"\x00\x00" + def write_var_uint(num: int) -> bytes: res = [] while num > 127: From 889db49ff06ea4384152ca4801f8a032cfd35b97 Mon Sep 17 00:00:00 2001 From: cacosandon Date: Sat, 30 Mar 2024 20:45:29 -0300 Subject: [PATCH 10/14] build(pyproject.toml): add types-redis to test dependencies --- pycrdt_websocket/django_channels/yjs_consumer.py | 8 ++++---- pyproject.toml | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pycrdt_websocket/django_channels/yjs_consumer.py b/pycrdt_websocket/django_channels/yjs_consumer.py index 1fb6cda..e4999ea 100644 --- a/pycrdt_websocket/django_channels/yjs_consumer.py +++ b/pycrdt_websocket/django_channels/yjs_consumer.py @@ -136,10 +136,10 @@ async def propagate_document_update_from_external(room_name, update): def __init__(self): super().__init__() - self.room_name: str | None = None - self.ydoc: Doc | None = None - self.room_storage: BaseYRoomStorage | None = None - self._websocket_shim: _WebsocketShim | None = None + self.room_name = None + self.ydoc = None + self.room_storage = None + self._websocket_shim = None def make_room_storage(self) -> BaseYRoomStorage | None: """Make the room storage for a new channel to persist the YDoc permanently. diff --git a/pyproject.toml b/pyproject.toml index e85dc60..33a7133 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ test = [ "hypercorn >=0.16.0", "trio >=0.25.0", "sniffio", + "types-redis", ] docs = [ "mkdocs", From bfd11c489ef794f26faa8504360683784c9b5830 Mon Sep 17 00:00:00 2001 From: cacosandon Date: Sat, 6 Apr 2024 16:01:06 -0300 Subject: [PATCH 11/14] refactor(yroom_storage): make base class abstract and optional throttling save --- pycrdt_websocket/django_channels/__init__.py | 1 - .../django_channels/yroom_storage.py | 80 +++++++++---------- 2 files changed, 40 insertions(+), 41 deletions(-) diff --git a/pycrdt_websocket/django_channels/__init__.py b/pycrdt_websocket/django_channels/__init__.py index 04b431a..0f860bd 100644 --- a/pycrdt_websocket/django_channels/__init__.py +++ b/pycrdt_websocket/django_channels/__init__.py @@ -1,3 +1,2 @@ from .yjs_consumer import YjsConsumer as YjsConsumer from .yroom_storage import BaseYRoomStorage as BaseYRoomStorage -from .yroom_storage import RedisYRoomStorage as RedisYRoomStorage diff --git a/pycrdt_websocket/django_channels/yroom_storage.py b/pycrdt_websocket/django_channels/yroom_storage.py index fe05f29..f41f08c 100644 --- a/pycrdt_websocket/django_channels/yroom_storage.py +++ b/pycrdt_websocket/django_channels/yroom_storage.py @@ -1,11 +1,12 @@ import time +from abc import ABC, abstractmethod from typing import Optional import redis.asyncio as redis from pycrdt import Doc -class BaseYRoomStorage: +class BaseYRoomStorage(ABC): """Base class for YRoom storage. This class is responsible for storing, retrieving, updating and persisting the Ypy document. Each Django Channels Consumer should have its own YRoomStorage instance, although all consumers @@ -55,12 +56,12 @@ async def save_snapshot(self): ``` """ - def __init__(self, room_name: str) -> None: + def __init__(self, room_name: str, save_throttle_interval: int | None) -> None: self.room_name = room_name - + self.save_throttle_interval = save_throttle_interval self.last_saved_at = time.time() - self.save_throttle_interval = 5 + @abstractmethod async def get_document(self) -> Doc: """Gets the document from the storage. Ideally it should be retrieved first from temporary storage (e.g. Redis) and then from @@ -68,53 +69,46 @@ async def get_document(self) -> Doc: Returns: The document with the latest changes. """ + ... - raise NotImplementedError - - async def update_document(self, update: bytes): + @abstractmethod + async def update_document(self, update: bytes) -> None: """Updates the document in the storage. Updates could be received by Yjs client (e.g. from a WebSocket) or from the server (e.g. from a Django Celery job). Args: update: The update to apply to the document. """ + ... - raise NotImplementedError - + @abstractmethod async def load_snapshot(self) -> Optional[bytes]: - """Gets the document from the database. Override this method to + """Gets the document encoded as update from the database. Override this method to implement a persistent storage. Defaults to None. Returns: The latest document snapshot. """ - return None + ... + @abstractmethod async def save_snapshot(self) -> None: - """Saves a snapshot of the document to the storage. - If you need to persist the document to a database, you should do it here. - Default implementation does nothing. - """ - - pass + """Saves the document encoded as update to the database.""" + ... async def throttled_save_snapshot(self) -> None: - """Saves a snapshot of the document to the storage, debouncing the calls.""" + """Saves the document encoded as update to the database, throttled.""" - if time.time() - self.last_saved_at <= self.save_throttle_interval: + if ( + not self.save_throttle_interval + or time.time() - self.last_saved_at <= self.save_throttle_interval + ): return await self.save_snapshot() self.last_saved_at = time.time() - async def close(self): - """Closes the storage. - Default implementation does nothing. - """ - - pass - class RedisYRoomStorage(BaseYRoomStorage): """A YRoom storage that uses Redis as main storage, without @@ -123,17 +117,11 @@ class RedisYRoomStorage(BaseYRoomStorage): room_name: The name of the room. """ - def __init__(self, room_name: str) -> None: - super().__init__(room_name) + def __init__(self, room_name: str, save_throttle_interval: int | None = None) -> None: + super().__init__(room_name, save_throttle_interval) self.redis_key = f"document:{self.room_name}" - self.redis = self.make_redis() - - def make_redis(self): - """Makes a Redis client. - Defaults to a local client""" - - return redis.Redis(host="localhost", port=6379, db=0) + self.redis = self._make_redis() async def get_document(self) -> Doc: snapshot = await self.redis.get(self.redis_key) @@ -153,7 +141,7 @@ async def update_document(self, update: bytes): try: current_document = await self.get_document() - updated_snapshot = self._apply_update_to_snapshot(current_document, update) + updated_snapshot = self._apply_update_to_document(current_document, update) async with self.redis.pipeline() as pipe: while True: @@ -165,9 +153,9 @@ async def update_document(self, update: bytes): break except redis.WatchError: - current_snapshot = await self.get_document() - updated_snapshot = self._apply_update_to_snapshot( - current_snapshot, + current_document = await self.get_document() + updated_snapshot = self._apply_update_to_document( + current_document, update, ) @@ -177,11 +165,23 @@ async def update_document(self, update: bytes): await self.throttled_save_snapshot() + async def load_snapshot(self) -> Optional[bytes]: + return None + + async def save_snapshot(self) -> Optional[bytes]: + return None + async def close(self): await self.save_snapshot() await self.redis.close() - def _apply_update_to_snapshot(self, document: Doc, update: bytes) -> bytes: + def _apply_update_to_document(self, document: Doc, update: bytes) -> bytes: document.apply_update(update) return document.get_update() + + def _make_redis(self): + """Makes a Redis client. + Defaults to a local client""" + + return redis.Redis(host="localhost", port=6379, db=0) From 6c03d23056847ab2d465652b49a2559033dcdae1 Mon Sep 17 00:00:00 2001 From: cacosandon Date: Sat, 6 Apr 2024 16:18:40 -0300 Subject: [PATCH 12/14] refactor(django_channels): move redis storage to its own file so redis its not required and update imports --- pycrdt_websocket/django_channels/__init__.py | 2 +- .../base_yroom_storage.py} | 78 ----------------- .../storage/redis_yroom_storage.py | 83 +++++++++++++++++++ .../django_channels/yjs_consumer.py | 10 ++- 4 files changed, 91 insertions(+), 82 deletions(-) rename pycrdt_websocket/django_channels/{yroom_storage.py => storage/base_yroom_storage.py} (62%) create mode 100644 pycrdt_websocket/django_channels/storage/redis_yroom_storage.py diff --git a/pycrdt_websocket/django_channels/__init__.py b/pycrdt_websocket/django_channels/__init__.py index 0f860bd..7be8da3 100644 --- a/pycrdt_websocket/django_channels/__init__.py +++ b/pycrdt_websocket/django_channels/__init__.py @@ -1,2 +1,2 @@ +from .storage.base_yroom_storage import BaseYRoomStorage as BaseYRoomStorage from .yjs_consumer import YjsConsumer as YjsConsumer -from .yroom_storage import BaseYRoomStorage as BaseYRoomStorage diff --git a/pycrdt_websocket/django_channels/yroom_storage.py b/pycrdt_websocket/django_channels/storage/base_yroom_storage.py similarity index 62% rename from pycrdt_websocket/django_channels/yroom_storage.py rename to pycrdt_websocket/django_channels/storage/base_yroom_storage.py index f41f08c..427b5af 100644 --- a/pycrdt_websocket/django_channels/yroom_storage.py +++ b/pycrdt_websocket/django_channels/storage/base_yroom_storage.py @@ -2,7 +2,6 @@ from abc import ABC, abstractmethod from typing import Optional -import redis.asyncio as redis from pycrdt import Doc @@ -108,80 +107,3 @@ async def throttled_save_snapshot(self) -> None: await self.save_snapshot() self.last_saved_at = time.time() - - -class RedisYRoomStorage(BaseYRoomStorage): - """A YRoom storage that uses Redis as main storage, without - persistent storage. - Args: - room_name: The name of the room. - """ - - def __init__(self, room_name: str, save_throttle_interval: int | None = None) -> None: - super().__init__(room_name, save_throttle_interval) - - self.redis_key = f"document:{self.room_name}" - self.redis = self._make_redis() - - async def get_document(self) -> Doc: - snapshot = await self.redis.get(self.redis_key) - - if not snapshot: - snapshot = await self.load_snapshot() - - document = Doc() - - if snapshot: - document.apply_update(snapshot) - - return document - - async def update_document(self, update: bytes): - await self.redis.watch(self.redis_key) - - try: - current_document = await self.get_document() - updated_snapshot = self._apply_update_to_document(current_document, update) - - async with self.redis.pipeline() as pipe: - while True: - try: - pipe.multi() - pipe.set(self.redis_key, updated_snapshot) - - await pipe.execute() - - break - except redis.WatchError: - current_document = await self.get_document() - updated_snapshot = self._apply_update_to_document( - current_document, - update, - ) - - continue - finally: - await self.redis.unwatch() - - await self.throttled_save_snapshot() - - async def load_snapshot(self) -> Optional[bytes]: - return None - - async def save_snapshot(self) -> Optional[bytes]: - return None - - async def close(self): - await self.save_snapshot() - await self.redis.close() - - def _apply_update_to_document(self, document: Doc, update: bytes) -> bytes: - document.apply_update(update) - - return document.get_update() - - def _make_redis(self): - """Makes a Redis client. - Defaults to a local client""" - - return redis.Redis(host="localhost", port=6379, db=0) diff --git a/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py b/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py new file mode 100644 index 0000000..51d1fb3 --- /dev/null +++ b/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py @@ -0,0 +1,83 @@ +from typing import Optional + +import redis.asyncio as redis +from pycrdt import Doc + +from .base_yroom_storage import BaseYRoomStorage + + +class RedisYRoomStorage(BaseYRoomStorage): + """A YRoom storage that uses Redis as main storage, without + persistent storage. + Args: + room_name: The name of the room. + """ + + def __init__(self, room_name: str, save_throttle_interval: int | None = None) -> None: + super().__init__(room_name, save_throttle_interval) + + self.redis_key = f"document:{self.room_name}" + self.redis = self._make_redis() + + async def get_document(self) -> Doc: + snapshot = await self.redis.get(self.redis_key) + + if not snapshot: + snapshot = await self.load_snapshot() + + document = Doc() + + if snapshot: + document.apply_update(snapshot) + + return document + + async def update_document(self, update: bytes): + await self.redis.watch(self.redis_key) + + try: + current_document = await self.get_document() + updated_snapshot = self._apply_update_to_document(current_document, update) + + async with self.redis.pipeline() as pipe: + while True: + try: + pipe.multi() + pipe.set(self.redis_key, updated_snapshot) + + await pipe.execute() + + break + except redis.WatchError: + current_document = await self.get_document() + updated_snapshot = self._apply_update_to_document( + current_document, + update, + ) + + continue + finally: + await self.redis.unwatch() + + await self.throttled_save_snapshot() + + async def load_snapshot(self) -> Optional[bytes]: + return None + + async def save_snapshot(self) -> Optional[bytes]: + return None + + async def close(self): + await self.save_snapshot() + await self.redis.close() + + def _apply_update_to_document(self, document: Doc, update: bytes) -> bytes: + document.apply_update(update) + + return document.get_update() + + def _make_redis(self): + """Makes a Redis client. + Defaults to a local client""" + + return redis.Redis(host="localhost", port=6379, db=0) diff --git a/pycrdt_websocket/django_channels/yjs_consumer.py b/pycrdt_websocket/django_channels/yjs_consumer.py index e4999ea..ff750df 100644 --- a/pycrdt_websocket/django_channels/yjs_consumer.py +++ b/pycrdt_websocket/django_channels/yjs_consumer.py @@ -6,7 +6,7 @@ from channels.generic.websocket import AsyncWebsocketConsumer # type: ignore[import-not-found] from pycrdt import Doc -from pycrdt_websocket.django_channels.yroom_storage import BaseYRoomStorage +from pycrdt_websocket.django_channels.storage.base_yroom_storage import BaseYRoomStorage from ..websocket import Websocket from ..yutils import ( @@ -96,13 +96,14 @@ class YjsConsumer(AsyncWebsocketConsumer): from channels.layers import get_channel_layer from pycrdt_websocket.django_channels_consumer import YjsConsumer from pycrdt_websocket.yutils import create_update_message + from pycrdt_websocket.django_channels.storage.redis_yroom_storage import RedisYRoomStorage class DocConsumer(YjsConsumer): def make_room_storage(self) -> BaseYRoomStorage: # Modify the room storage here - return RedisYRoomStorage(self.room_name) + return RedisYRoomStorage(room_name=self.room_name) def make_room_name(self) -> str: # Modify the room name here @@ -147,7 +148,10 @@ def make_room_storage(self) -> BaseYRoomStorage | None: Defaults to not using any (just broadcast updates between consumers). Example: - self.room_storage = RedisYRoomStorage(self.room_name) + self.room_storage = YourCustomRedisYRoomStorage( + room_name=self.room_name, + save_throttle_interval=5 + ) """ return None From f7c6f268a40a203daa44af981ff4f896e5dab702 Mon Sep 17 00:00:00 2001 From: cacosandon Date: Sat, 6 Apr 2024 16:27:50 -0300 Subject: [PATCH 13/14] refactor(django-channels@storage): move throttling methods to redis storage and close method to base style: fix linter wrong return type of save_snapshot --- .../storage/base_yroom_storage.py | 22 ++++++------------- .../storage/redis_yroom_storage.py | 21 ++++++++++++++++-- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/pycrdt_websocket/django_channels/storage/base_yroom_storage.py b/pycrdt_websocket/django_channels/storage/base_yroom_storage.py index 427b5af..7a4c272 100644 --- a/pycrdt_websocket/django_channels/storage/base_yroom_storage.py +++ b/pycrdt_websocket/django_channels/storage/base_yroom_storage.py @@ -1,4 +1,3 @@ -import time from abc import ABC, abstractmethod from typing import Optional @@ -55,10 +54,8 @@ async def save_snapshot(self): ``` """ - def __init__(self, room_name: str, save_throttle_interval: int | None) -> None: + def __init__(self, room_name: str) -> None: self.room_name = room_name - self.save_throttle_interval = save_throttle_interval - self.last_saved_at = time.time() @abstractmethod async def get_document(self) -> Doc: @@ -95,15 +92,10 @@ async def save_snapshot(self) -> None: """Saves the document encoded as update to the database.""" ... - async def throttled_save_snapshot(self) -> None: - """Saves the document encoded as update to the database, throttled.""" + async def close(self) -> None: + """Closes the storage connection. - if ( - not self.save_throttle_interval - or time.time() - self.last_saved_at <= self.save_throttle_interval - ): - return - - await self.save_snapshot() - - self.last_saved_at = time.time() + Useful for cleaning up resources like closing a database + connection or saving the document before exiting. + """ + pass diff --git a/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py b/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py index 51d1fb3..c0bd024 100644 --- a/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py +++ b/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py @@ -1,3 +1,4 @@ +import time from typing import Optional import redis.asyncio as redis @@ -14,7 +15,10 @@ class RedisYRoomStorage(BaseYRoomStorage): """ def __init__(self, room_name: str, save_throttle_interval: int | None = None) -> None: - super().__init__(room_name, save_throttle_interval) + super().__init__(room_name) + + self.save_throttle_interval = save_throttle_interval + self.last_saved_at = time.time() self.redis_key = f"document:{self.room_name}" self.redis = self._make_redis() @@ -64,9 +68,22 @@ async def update_document(self, update: bytes): async def load_snapshot(self) -> Optional[bytes]: return None - async def save_snapshot(self) -> Optional[bytes]: + async def save_snapshot(self) -> None: return None + async def throttled_save_snapshot(self) -> None: + """Saves the document encoded as update to the database, throttled.""" + + if ( + not self.save_throttle_interval + or time.time() - self.last_saved_at <= self.save_throttle_interval + ): + return + + await self.save_snapshot() + + self.last_saved_at = time.time() + async def close(self): await self.save_snapshot() await self.redis.close() From 36da1db88c50f55da2613d3cabfe9feb42ca79de Mon Sep 17 00:00:00 2001 From: cacosandon Date: Sun, 7 Apr 2024 17:56:17 -0400 Subject: [PATCH 14/14] feat(django-channels@redis-storage): add expiration time to values in Redis and make make_redis public --- .../storage/redis_yroom_storage.py | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py b/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py index c0bd024..3b02919 100644 --- a/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py +++ b/pycrdt_websocket/django_channels/storage/redis_yroom_storage.py @@ -14,14 +14,20 @@ class RedisYRoomStorage(BaseYRoomStorage): room_name: The name of the room. """ - def __init__(self, room_name: str, save_throttle_interval: int | None = None) -> None: + def __init__( + self, + room_name: str, + save_throttle_interval: int | None = None, + redis_expiration_seconds: int | None = 60 * 10, # 10 minutes, + ): super().__init__(room_name) self.save_throttle_interval = save_throttle_interval self.last_saved_at = time.time() self.redis_key = f"document:{self.room_name}" - self.redis = self._make_redis() + self.redis = self.make_redis() + self.redis_expiration_seconds = redis_expiration_seconds async def get_document(self) -> Doc: snapshot = await self.redis.get(self.redis_key) @@ -47,7 +53,11 @@ async def update_document(self, update: bytes): while True: try: pipe.multi() - pipe.set(self.redis_key, updated_snapshot) + pipe.set( + name=self.redis_key, + value=updated_snapshot, + ex=self.redis_expiration_seconds, + ) await pipe.execute() @@ -84,6 +94,12 @@ async def throttled_save_snapshot(self) -> None: self.last_saved_at = time.time() + def make_redis(self): + """Makes a Redis client. + Defaults to a local client""" + + return redis.Redis(host="localhost", port=6379, db=0) + async def close(self): await self.save_snapshot() await self.redis.close() @@ -92,9 +108,3 @@ def _apply_update_to_document(self, document: Doc, update: bytes) -> bytes: document.apply_update(update) return document.get_update() - - def _make_redis(self): - """Makes a Redis client. - Defaults to a local client""" - - return redis.Redis(host="localhost", port=6379, db=0)