From 6747ef28c5844599973037547a45f31ca015b6ed Mon Sep 17 00:00:00 2001 From: Valter Balegas Date: Wed, 14 Aug 2024 18:13:56 +0100 Subject: [PATCH] Cherry picked broadcast impl from y-websocket --- examples/yjs-provider/.eslintrc.cjs | 1 + examples/yjs-provider/app/page.tsx | 27 ++- examples/yjs-provider/app/y-broadcast.js | 264 +++++++++++++++++++++++ pnpm-lock.yaml | 8 +- 4 files changed, 291 insertions(+), 9 deletions(-) create mode 100644 examples/yjs-provider/app/y-broadcast.js diff --git a/examples/yjs-provider/.eslintrc.cjs b/examples/yjs-provider/.eslintrc.cjs index 915fca4da9..1e2f4aa287 100644 --- a/examples/yjs-provider/.eslintrc.cjs +++ b/examples/yjs-provider/.eslintrc.cjs @@ -31,6 +31,7 @@ module.exports = { caughtErrorsIgnorePattern: `^_`, }, ], + "@next/next/no-img-element": "off" }, ignorePatterns: [ `**/node_modules/**`, diff --git a/examples/yjs-provider/app/page.tsx b/examples/yjs-provider/app/page.tsx index 8fe5dbee89..5f2dd25e36 100644 --- a/examples/yjs-provider/app/page.tsx +++ b/examples/yjs-provider/app/page.tsx @@ -7,6 +7,7 @@ import * as Y from "yjs" import { yCollab, yUndoManagerKeymap } from "y-codemirror.next" import { ElectricProvider } from "./y-electric" import { IndexeddbPersistence } from "y-indexeddb" +import { BroadcastProvider } from "./y-broadcast" import * as awarenessProtocol from "y-protocols/awareness" import { EditorState, EditorView, basicSetup } from "@codemirror/basic-setup" @@ -42,13 +43,21 @@ const theme = EditorView.theme( { dark: true } ) const ydoc = new Y.Doc() +let network: ElectricProvider | null = null + +if (typeof window !== `undefined`) { + const awareness = new awarenessProtocol.Awareness(ydoc) + const opts = { + connect: true, + awareness, + persistence: new IndexeddbPersistence(room, ydoc), + } + network = new ElectricProvider(`http://localhost:3000/`, room, ydoc, opts) -const opts = { - connect: true, - awareness: new awarenessProtocol.Awareness(ydoc), - persistence: new IndexeddbPersistence(room, ydoc), + new BroadcastProvider(room, ydoc, { + awareness, + }) } -const network = new ElectricProvider(`http://localhost:3000/`, room, ydoc, opts) export default function Home() { const editor = useRef(null) @@ -66,9 +75,13 @@ export default function Home() { } useEffect(() => { + if (typeof window === `undefined`) { + return + } + const ytext = ydoc.getText(room) - network.awareness.setLocalStateField(`user`, { + network?.awareness.setLocalStateField(`user`, { name: userColor.color, color: userColor.color, colorLight: userColor.light, @@ -81,7 +94,7 @@ export default function Home() { basicSetup, javascript(), EditorView.lineWrapping, - yCollab(ytext, network.awareness), + yCollab(ytext, network?.awareness), theme, ], }) diff --git a/examples/yjs-provider/app/y-broadcast.js b/examples/yjs-provider/app/y-broadcast.js new file mode 100644 index 0000000000..5698ed62b3 --- /dev/null +++ b/examples/yjs-provider/app/y-broadcast.js @@ -0,0 +1,264 @@ +/** + * Extracted this from y-websocket + */ + +/* eslint-env browser */ + +import * as bc from "lib0/broadcastchannel" +import * as encoding from "lib0/encoding" +import * as decoding from "lib0/decoding" +import * as syncProtocol from "y-protocols/sync" +import * as awarenessProtocol from "y-protocols/awareness" +import { Observable } from "lib0/observable" +export const messageSync = 0 +export const messageQueryAwareness = 3 +export const messageAwareness = 1 + +const messageHandlers = [] + +messageHandlers[messageSync] = ( + encoder, + decoder, + provider, + emitSynced, + _messageType +) => { + encoding.writeVarUint(encoder, messageSync) + const syncMessageType = syncProtocol.readSyncMessage( + decoder, + encoder, + provider.doc, + provider + ) + if ( + emitSynced && + syncMessageType === syncProtocol.messageYjsSyncStep2 && + !provider.synced + ) { + provider.synced = true + } +} + +messageHandlers[messageQueryAwareness] = ( + encoder, + _decoder, + provider, + _emitSynced, + _messageType +) => { + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate( + provider.awareness, + Array.from(provider.awareness.getStates().keys()) + ) + ) +} + +messageHandlers[messageAwareness] = ( + _encoder, + decoder, + provider, + _emitSynced, + _messageType +) => { + awarenessProtocol.applyAwarenessUpdate( + provider.awareness, + decoding.readVarUint8Array(decoder), + provider + ) +} + +/** + * @param {BroadcastProvider} provider + * @param {Uint8Array} buf + * @param {boolean} emitSynced + * @return {encoding.Encoder} + */ +const readMessage = (provider, buf, emitSynced) => { + const decoder = decoding.createDecoder(buf) + const encoder = encoding.createEncoder() + const messageType = decoding.readVarUint(decoder) + const messageHandler = provider.messageHandlers[messageType] + if (/** @type {any} */ (messageHandler)) { + messageHandler(encoder, decoder, provider, emitSynced, messageType) + } else { + console.error(`Unable to compute message`) + } + return encoder +} + +/** + * @param {BroadcastProvider} provider + */ + +/** + * @param {BroadcastProvider} provider + * @param {ArrayBuffer} buf + */ +const broadcastMessage = (provider, buf) => { + if (provider.bcconnected) { + bc.publish(provider.bcChannel, buf, provider) + } +} + +export class BroadcastProvider extends Observable { + constructor( + roomname, + doc, + { + connect = true, + disableBc = false, + awareness = new awarenessProtocol.Awareness(doc), + } = {} + ) { + super() + + this.bcChannel = roomname + this.awareness = awareness + this.roomname = roomname + this.doc = doc + this.bcconnected = false + this.disableBc = disableBc + this.messageHandlers = messageHandlers.slice() + + this._synced = false + this.wsLastMessageReceived = 0 + + this._bcSubscriber = (data, origin) => { + if (origin !== this) { + const encoder = readMessage(this, new Uint8Array(data), false) + if (encoding.length(encoder) > 1) { + bc.publish(this.bcChannel, encoding.toUint8Array(encoder), this) + } + } + } + + this._updateHandler = (update, origin) => { + if (origin !== this) { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeUpdate(encoder, update) + broadcastMessage(this, encoding.toUint8Array(encoder)) + } + } + this.doc.on(`update`, this._updateHandler) + + this._awarenessUpdateHandler = ({ added, updated, removed }, _origin) => { + const changedClients = added.concat(updated).concat(removed) + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients) + ) + broadcastMessage(this, encoding.toUint8Array(encoder)) + } + this._exitHandler = () => { + awarenessProtocol.removeAwarenessStates( + this.awareness, + [doc.clientID], + `app closed` + ) + } + + awareness.on(`update`, this._awarenessUpdateHandler) + + if (connect) { + this.connect() + } + } + + /** + * @type {boolean} + */ + get synced() { + return this._synced + } + + set synced(state) { + if (this._synced !== state) { + this._synced = state + this.emit(`synced`, [state]) + this.emit(`sync`, [state]) + } + } + + destroy() { + this.disconnect() + this.awareness.off(`update`, this._awarenessUpdateHandler) + this.doc.off(`update`, this._updateHandler) + super.destroy() + } + + connectBc() { + if (this.disableBc) { + return + } + if (!this.bcconnected) { + bc.subscribe(this.bcChannel, this._bcSubscriber) + this.bcconnected = true + } + // send sync step1 to bc + // write sync step 1 + const encoderSync = encoding.createEncoder() + encoding.writeVarUint(encoderSync, messageSync) + syncProtocol.writeSyncStep1(encoderSync, this.doc) + bc.publish(this.bcChannel, encoding.toUint8Array(encoderSync), this) + // broadcast local state + const encoderState = encoding.createEncoder() + encoding.writeVarUint(encoderState, messageSync) + syncProtocol.writeSyncStep2(encoderState, this.doc) + bc.publish(this.bcChannel, encoding.toUint8Array(encoderState), this) + // write queryAwareness + const encoderAwarenessQuery = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness) + bc.publish( + this.bcChannel, + encoding.toUint8Array(encoderAwarenessQuery), + this + ) + // broadcast local awareness state + const encoderAwarenessState = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessState, messageAwareness) + encoding.writeVarUint8Array( + encoderAwarenessState, + awarenessProtocol.encodeAwarenessUpdate(this.awareness, [ + this.doc.clientID, + ]) + ) + bc.publish( + this.bcChannel, + encoding.toUint8Array(encoderAwarenessState), + this + ) + } + + disconnectBc() { + // broadcast message with local awareness state set to null (indicating disconnect) + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate( + this.awareness, + [this.doc.clientID], + new Map() + ) + ) + broadcastMessage(this, encoding.toUint8Array(encoder)) + if (this.bcconnected) { + bc.unsubscribe(this.bcChannel, this._bcSubscriber) + this.bcconnected = false + } + } + + disconnect() { + this.disconnectBc() + } + + connect() { + this.connectBc() + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index be4240cd9f..6e5fcafab5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -8128,7 +8128,7 @@ snapshots: '@babel/traverse': 7.25.3 '@babel/types': 7.25.2 convert-source-map: 2.0.0 - debug: 4.3.6(supports-color@5.5.0) + debug: 4.3.6 gensync: 1.0.0-beta.2 json5: 2.2.3 semver: 6.3.1 @@ -8363,7 +8363,7 @@ snapshots: '@babel/parser': 7.25.3 '@babel/template': 7.25.0 '@babel/types': 7.25.2 - debug: 4.3.6(supports-color@5.5.0) + debug: 4.3.6 globals: 11.12.0 transitivePeerDependencies: - supports-color @@ -11845,6 +11845,10 @@ snapshots: dependencies: ms: 2.1.2 + debug@4.3.6: + dependencies: + ms: 2.1.2 + debug@4.3.6(supports-color@5.5.0): dependencies: ms: 2.1.2