diff --git a/examples/yjs-provider/app/api/operation/route.ts b/examples/yjs-provider/app/api/operation/route.ts index 5a170417fa..03d1072663 100644 --- a/examples/yjs-provider/app/api/operation/route.ts +++ b/examples/yjs-provider/app/api/operation/route.ts @@ -1,12 +1,83 @@ import { db } from "../../db" import { NextResponse } from "next/server" +// TODO: still loading yjs twice +import * as Y from "yjs" +import * as syncProtocol from "y-protocols/sync" + +import * as encoding from "lib0/encoding" +import * as decoding from "lib0/decoding" + +import { toBase64, fromBase64 } from "lib0/buffer" + +const maxRowCount = 50 + export async function POST(request: Request) { const body = await request.json() + + const errorResponse = validateRequest(body) + if (errorResponse) { + return errorResponse + } + await db.query( `INSERT INTO ydoc_operations (room, op) VALUES ($1, $2)`, [body.room, body.op] ) + await maybeCompact(body.room) + return NextResponse.json({}) } + +function validateRequest({ room, op }: { room: string; op: string }) { + if (!room) { + return NextResponse.json({ error: `'room' is required` }, { status: 400 }) + } + + if (!op) { + return NextResponse.json({ error: `'op' is required` }, { status: 400 }) + } +} + +// naive implementation of compaction +async function maybeCompact(room: string) { + const ydoc = new Y.Doc() + + const res0 = await db.query( + `SELECT COUNT(*) as count FROM ydoc_operations`, + [] + ) + if (res0.rows[0].count < maxRowCount) { + return + } + + console.log(`compaction`) + const res1 = await db.query( + `SELECT id, op FROM ydoc_operations + WHERE room = $1 + ORDER BY id DESC`, + [room] + ) + res1.rows.map(({ op }) => { + const buf = fromBase64(op) + const decoder = decoding.createDecoder(buf) + syncProtocol.readSyncMessage( + decoder, + encoding.createEncoder(), + ydoc, + `server` + ) + }) + + const encoder = encoding.createEncoder() + syncProtocol.writeUpdate(encoder, Y.encodeStateAsUpdate(ydoc)) + const encoded = toBase64(encoding.toUint8Array(encoder)) + + await db.query(`TRUNCATE ydoc_operations`) + await db.query( + `INSERT INTO ydoc_operations (room, op) + VALUES ($1, $2)`, + [room, encoded] + ) +} diff --git a/examples/yjs-provider/app/page.tsx b/examples/yjs-provider/app/page.tsx index 09d2bcd360..3db067219c 100644 --- a/examples/yjs-provider/app/page.tsx +++ b/examples/yjs-provider/app/page.tsx @@ -44,14 +44,12 @@ const theme = EditorView.theme( const ydoc = new Y.Doc() let network: ElectricProvider | null = null -if (typeof window !== `undefined`) { - const opts = { - connect: true, - awareness: new awarenessProtocol.Awareness(ydoc), - // persistence: new IndexeddbPersistence(room, ydoc), - } - network = new ElectricProvider(`http://localhost:3000/`, room, ydoc, opts) +const opts = { + connect: true, + awareness: new awarenessProtocol.Awareness(ydoc), + persistence: new IndexeddbPersistence(room, ydoc), } +network = new ElectricProvider(`http://localhost:3000/`, room, ydoc, opts) export default function Home() { const editor = useRef(null) @@ -69,8 +67,6 @@ export default function Home() { } useEffect(() => { - if (typeof window === `undefined`) return - const ytext = ydoc.getText(room) network!.awareness.setLocalStateField(`user`, { diff --git a/examples/yjs-provider/app/y-electric.js b/examples/yjs-provider/app/y-electric.js index 948ab25955..5ae159e5e0 100644 --- a/examples/yjs-provider/app/y-electric.js +++ b/examples/yjs-provider/app/y-electric.js @@ -45,7 +45,25 @@ const setupShapeStream = (provider) => { }) } - const handleSyncMessage = (messages) => + // Should handle multiple clients + const updateShapeState = (name, offset, shapeId) => { + if (provider.persistence === null) { + return + } + provider.persistence.set(name, { offset, shape_id: shapeId }) + } + + const handleSyncMessage = (messages) => { + if (messages.length < 2) { + return + } + const { offset } = messages[messages.length - 2] + updateShapeState( + `operations_state`, + Number(offset.split(`_`)[0]), + provider.operationsStream.shapeId + ) + handleMessages(messages).forEach((decoder) => { const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageSync) @@ -62,8 +80,19 @@ const setupShapeStream = (provider) => { provider.synced = true } }) + } + + const handleAwarenessMessage = (messages) => { + if (messages.length < 2) { + return + } + const { offset } = messages[messages.length - 2] + updateShapeState( + `awareness_state`, + Number(offset.split(`_`)[0]), + provider.awarenessStream.shapeId + ) - const handleAwarenessMessage = (messages) => handleMessages(messages).forEach((decoder) => { awarenessProtocol.applyAwarenessUpdate( provider.awareness, @@ -71,6 +100,7 @@ const setupShapeStream = (provider) => { provider ) }) + } // TODO: need to improve error handling const handleError = (event) => { @@ -94,7 +124,7 @@ const setupShapeStream = (provider) => { provider.connecting = false if (provider.connected) { provider.connected = false - + provider.synced = false awarenessProtocol.removeAwarenessStates( @@ -196,13 +226,13 @@ export class ElectricProvider extends Observable { * @param {boolean} [opts.connect] * @param {awarenessProtocol.Awareness} [opts.awareness] * @param {IndexeddbPersistence} [opts.persistence] - * @param {Object} [opts.params] specify url parameters + * @param {Object} [opts.resume] */ constructor( serverUrl, roomname, doc, - { connect = false, awareness = null, persistence = null } = {} + { connect = false, awareness = null, persistence = null, resume = {} } = {} ) { super() @@ -221,13 +251,27 @@ export class ElectricProvider extends Observable { this.awarenessStream = null this.pending = [] + this.resume = resume ?? {} this.closeHandler = null + this.persistence = persistence this.loaded = persistence === null + persistence?.on(`synced`, () => { - this.loaded = true - this.connect() + persistence + .get(`operations_state`) + .then((opsState) => { + this.resume.operations = opsState + return persistence.get(`awareness_state`) + }) + .then((awarenessState) => { + this.resume.awareness = awarenessState + }) + .then(() => { + this.loaded = true + this.connect() + }) }) /** @@ -271,14 +315,22 @@ export class ElectricProvider extends Observable { } get operationsUrl() { - const params = { where: `room = '${this.roomname}'` } + const params = { + where: `room = '${this.roomname}'`, + ...this.resume.operations, + } const encodedParams = url.encodeQueryParams(params) + console.log(params) return this.serverUrl + `/v1/shape/ydoc_operations?` + encodedParams } get awarenessUrl() { - const params = { where: `room = '${this.roomname}'` } + const params = { + where: `room = '${this.roomname}'`, + ...this.resume.awareness, + } const encodedParams = url.encodeQueryParams(params) + console.log(params) return this.serverUrl + `/v1/shape/ydoc_awareness?` + encodedParams }