Skip to content

Commit

Permalink
Cherry picked broadcast impl from y-websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
balegas committed Aug 14, 2024
1 parent bcc141c commit 6747ef2
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 9 deletions.
1 change: 1 addition & 0 deletions examples/yjs-provider/.eslintrc.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module.exports = {
caughtErrorsIgnorePattern: `^_`,
},
],
"@next/next/no-img-element": "off"
},
ignorePatterns: [
`**/node_modules/**`,
Expand Down
27 changes: 20 additions & 7 deletions examples/yjs-provider/app/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as Y from "yjs"
import { yCollab, yUndoManagerKeymap } from "y-codemirror.next"
import { ElectricProvider } from "./y-electric"
import { IndexeddbPersistence } from "y-indexeddb"
import { BroadcastProvider } from "./y-broadcast"
import * as awarenessProtocol from "y-protocols/awareness"

import { EditorState, EditorView, basicSetup } from "@codemirror/basic-setup"
Expand Down Expand Up @@ -42,13 +43,21 @@ const theme = EditorView.theme(
{ dark: true }
)
const ydoc = new Y.Doc()
let network: ElectricProvider | null = null

if (typeof window !== `undefined`) {
const awareness = new awarenessProtocol.Awareness(ydoc)
const opts = {
connect: true,
awareness,
persistence: new IndexeddbPersistence(room, ydoc),
}
network = new ElectricProvider(`http://localhost:3000/`, room, ydoc, opts)

const opts = {
connect: true,
awareness: new awarenessProtocol.Awareness(ydoc),
persistence: new IndexeddbPersistence(room, ydoc),
new BroadcastProvider(room, ydoc, {
awareness,
})
}
const network = new ElectricProvider(`http://localhost:3000/`, room, ydoc, opts)

export default function Home() {
const editor = useRef(null)
Expand All @@ -66,9 +75,13 @@ export default function Home() {
}

useEffect(() => {
if (typeof window === `undefined`) {
return
}

const ytext = ydoc.getText(room)

network.awareness.setLocalStateField(`user`, {
network?.awareness.setLocalStateField(`user`, {
name: userColor.color,
color: userColor.color,
colorLight: userColor.light,
Expand All @@ -81,7 +94,7 @@ export default function Home() {
basicSetup,
javascript(),
EditorView.lineWrapping,
yCollab(ytext, network.awareness),
yCollab(ytext, network?.awareness),
theme,
],
})
Expand Down
264 changes: 264 additions & 0 deletions examples/yjs-provider/app/y-broadcast.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
/**
* Extracted this from y-websocket
*/

/* eslint-env browser */

import * as bc from "lib0/broadcastchannel"
import * as encoding from "lib0/encoding"
import * as decoding from "lib0/decoding"
import * as syncProtocol from "y-protocols/sync"
import * as awarenessProtocol from "y-protocols/awareness"
import { Observable } from "lib0/observable"
export const messageSync = 0
export const messageQueryAwareness = 3
export const messageAwareness = 1

const messageHandlers = []

messageHandlers[messageSync] = (
encoder,
decoder,
provider,
emitSynced,
_messageType
) => {
encoding.writeVarUint(encoder, messageSync)
const syncMessageType = syncProtocol.readSyncMessage(
decoder,
encoder,
provider.doc,
provider
)
if (
emitSynced &&
syncMessageType === syncProtocol.messageYjsSyncStep2 &&
!provider.synced
) {
provider.synced = true
}
}

messageHandlers[messageQueryAwareness] = (
encoder,
_decoder,
provider,
_emitSynced,
_messageType
) => {
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(
provider.awareness,
Array.from(provider.awareness.getStates().keys())
)
)
}

messageHandlers[messageAwareness] = (
_encoder,
decoder,
provider,
_emitSynced,
_messageType
) => {
awarenessProtocol.applyAwarenessUpdate(
provider.awareness,
decoding.readVarUint8Array(decoder),
provider
)
}

/**
* @param {BroadcastProvider} provider
* @param {Uint8Array} buf
* @param {boolean} emitSynced
* @return {encoding.Encoder}
*/
const readMessage = (provider, buf, emitSynced) => {
const decoder = decoding.createDecoder(buf)
const encoder = encoding.createEncoder()
const messageType = decoding.readVarUint(decoder)
const messageHandler = provider.messageHandlers[messageType]
if (/** @type {any} */ (messageHandler)) {
messageHandler(encoder, decoder, provider, emitSynced, messageType)
} else {
console.error(`Unable to compute message`)
}
return encoder
}

/**
* @param {BroadcastProvider} provider
*/

/**
* @param {BroadcastProvider} provider
* @param {ArrayBuffer} buf
*/
const broadcastMessage = (provider, buf) => {
if (provider.bcconnected) {
bc.publish(provider.bcChannel, buf, provider)
}
}

export class BroadcastProvider extends Observable {
constructor(
roomname,
doc,
{
connect = true,
disableBc = false,
awareness = new awarenessProtocol.Awareness(doc),
} = {}
) {
super()

this.bcChannel = roomname
this.awareness = awareness
this.roomname = roomname
this.doc = doc
this.bcconnected = false
this.disableBc = disableBc
this.messageHandlers = messageHandlers.slice()

this._synced = false
this.wsLastMessageReceived = 0

this._bcSubscriber = (data, origin) => {
if (origin !== this) {
const encoder = readMessage(this, new Uint8Array(data), false)
if (encoding.length(encoder) > 1) {
bc.publish(this.bcChannel, encoding.toUint8Array(encoder), this)
}
}
}

this._updateHandler = (update, origin) => {
if (origin !== this) {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeUpdate(encoder, update)
broadcastMessage(this, encoding.toUint8Array(encoder))
}
}
this.doc.on(`update`, this._updateHandler)

this._awarenessUpdateHandler = ({ added, updated, removed }, _origin) => {
const changedClients = added.concat(updated).concat(removed)
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients)
)
broadcastMessage(this, encoding.toUint8Array(encoder))
}
this._exitHandler = () => {
awarenessProtocol.removeAwarenessStates(
this.awareness,
[doc.clientID],
`app closed`
)
}

awareness.on(`update`, this._awarenessUpdateHandler)

if (connect) {
this.connect()
}
}

/**
* @type {boolean}
*/
get synced() {
return this._synced
}

set synced(state) {
if (this._synced !== state) {
this._synced = state
this.emit(`synced`, [state])
this.emit(`sync`, [state])
}
}

destroy() {
this.disconnect()
this.awareness.off(`update`, this._awarenessUpdateHandler)
this.doc.off(`update`, this._updateHandler)
super.destroy()
}

connectBc() {
if (this.disableBc) {
return
}
if (!this.bcconnected) {
bc.subscribe(this.bcChannel, this._bcSubscriber)
this.bcconnected = true
}
// send sync step1 to bc
// write sync step 1
const encoderSync = encoding.createEncoder()
encoding.writeVarUint(encoderSync, messageSync)
syncProtocol.writeSyncStep1(encoderSync, this.doc)
bc.publish(this.bcChannel, encoding.toUint8Array(encoderSync), this)
// broadcast local state
const encoderState = encoding.createEncoder()
encoding.writeVarUint(encoderState, messageSync)
syncProtocol.writeSyncStep2(encoderState, this.doc)
bc.publish(this.bcChannel, encoding.toUint8Array(encoderState), this)
// write queryAwareness
const encoderAwarenessQuery = encoding.createEncoder()
encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness)
bc.publish(
this.bcChannel,
encoding.toUint8Array(encoderAwarenessQuery),
this
)
// broadcast local awareness state
const encoderAwarenessState = encoding.createEncoder()
encoding.writeVarUint(encoderAwarenessState, messageAwareness)
encoding.writeVarUint8Array(
encoderAwarenessState,
awarenessProtocol.encodeAwarenessUpdate(this.awareness, [
this.doc.clientID,
])
)
bc.publish(
this.bcChannel,
encoding.toUint8Array(encoderAwarenessState),
this
)
}

disconnectBc() {
// broadcast message with local awareness state set to null (indicating disconnect)
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(
this.awareness,
[this.doc.clientID],
new Map()
)
)
broadcastMessage(this, encoding.toUint8Array(encoder))
if (this.bcconnected) {
bc.unsubscribe(this.bcChannel, this._bcSubscriber)
this.bcconnected = false
}
}

disconnect() {
this.disconnectBc()
}

connect() {
this.connectBc()
}
}
8 changes: 6 additions & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 6747ef2

Please sign in to comment.