Skip to content

Commit

Permalink
Persistence with resume
Browse files Browse the repository at this point in the history
  • Loading branch information
balegas committed Aug 14, 2024
1 parent 33e38a7 commit 10fd08b
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 18 deletions.
71 changes: 71 additions & 0 deletions examples/yjs-provider/app/api/operation/route.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,83 @@
import { db } from "../../db"
import { NextResponse } from "next/server"

// TODO: still loading yjs twice
import * as Y from "yjs"
import * as syncProtocol from "y-protocols/sync"

import * as encoding from "lib0/encoding"
import * as decoding from "lib0/decoding"

import { toBase64, fromBase64 } from "lib0/buffer"

const maxRowCount = 50

export async function POST(request: Request) {
const body = await request.json()

const errorResponse = validateRequest(body)
if (errorResponse) {
return errorResponse
}

await db.query(
`INSERT INTO ydoc_operations (room, op)
VALUES ($1, $2)`,
[body.room, body.op]
)
await maybeCompact(body.room)

return NextResponse.json({})
}

function validateRequest({ room, op }: { room: string; op: string }) {
if (!room) {
return NextResponse.json({ error: `'room' is required` }, { status: 400 })
}

if (!op) {
return NextResponse.json({ error: `'op' is required` }, { status: 400 })
}
}

// naive implementation of compaction
async function maybeCompact(room: string) {
const ydoc = new Y.Doc()

const res0 = await db.query(
`SELECT COUNT(*) as count FROM ydoc_operations`,
[]
)
if (res0.rows[0].count < maxRowCount) {
return
}

console.log(`compaction`)
const res1 = await db.query(
`SELECT id, op FROM ydoc_operations
WHERE room = $1
ORDER BY id DESC`,
[room]
)
res1.rows.map(({ op }) => {
const buf = fromBase64(op)
const decoder = decoding.createDecoder(buf)
syncProtocol.readSyncMessage(
decoder,
encoding.createEncoder(),
ydoc,
`server`
)
})

const encoder = encoding.createEncoder()
syncProtocol.writeUpdate(encoder, Y.encodeStateAsUpdate(ydoc))
const encoded = toBase64(encoding.toUint8Array(encoder))

await db.query(`TRUNCATE ydoc_operations`)
await db.query(
`INSERT INTO ydoc_operations (room, op)
VALUES ($1, $2)`,
[room, encoded]
)
}
14 changes: 5 additions & 9 deletions examples/yjs-provider/app/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,12 @@ const theme = EditorView.theme(
const ydoc = new Y.Doc()
let network: ElectricProvider | null = null

if (typeof window !== `undefined`) {
const opts = {
connect: true,
awareness: new awarenessProtocol.Awareness(ydoc),
// persistence: new IndexeddbPersistence(room, ydoc),
}
network = new ElectricProvider(`http://localhost:3000/`, room, ydoc, opts)
const opts = {
connect: true,
awareness: new awarenessProtocol.Awareness(ydoc),
persistence: new IndexeddbPersistence(room, ydoc),
}
network = new ElectricProvider(`http://localhost:3000/`, room, ydoc, opts)

export default function Home() {
const editor = useRef(null)
Expand All @@ -69,8 +67,6 @@ export default function Home() {
}

useEffect(() => {
if (typeof window === `undefined`) return

const ytext = ydoc.getText(room)

network!.awareness.setLocalStateField(`user`, {
Expand Down
70 changes: 61 additions & 9 deletions examples/yjs-provider/app/y-electric.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,25 @@ const setupShapeStream = (provider) => {
})
}

const handleSyncMessage = (messages) =>
// Should handle multiple clients
const updateShapeState = (name, offset, shapeId) => {
if (provider.persistence === null) {
return
}
provider.persistence.set(name, { offset, shape_id: shapeId })
}

const handleSyncMessage = (messages) => {
if (messages.length < 2) {
return
}
const { offset } = messages[messages.length - 2]
updateShapeState(
`operations_state`,
Number(offset.split(`_`)[0]),
provider.operationsStream.shapeId
)

handleMessages(messages).forEach((decoder) => {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
Expand All @@ -62,15 +80,27 @@ const setupShapeStream = (provider) => {
provider.synced = true
}
})
}

const handleAwarenessMessage = (messages) => {
if (messages.length < 2) {
return
}
const { offset } = messages[messages.length - 2]
updateShapeState(
`awareness_state`,
Number(offset.split(`_`)[0]),
provider.awarenessStream.shapeId
)

const handleAwarenessMessage = (messages) =>
handleMessages(messages).forEach((decoder) => {
awarenessProtocol.applyAwarenessUpdate(
provider.awareness,
decoding.readVarUint8Array(decoder),
provider
)
})
}

// TODO: need to improve error handling
const handleError = (event) => {
Expand All @@ -94,7 +124,7 @@ const setupShapeStream = (provider) => {
provider.connecting = false
if (provider.connected) {
provider.connected = false

provider.synced = false

awarenessProtocol.removeAwarenessStates(
Expand Down Expand Up @@ -196,13 +226,13 @@ export class ElectricProvider extends Observable {
* @param {boolean} [opts.connect]
* @param {awarenessProtocol.Awareness} [opts.awareness]
* @param {IndexeddbPersistence} [opts.persistence]
* @param {Object<string,string>} [opts.params] specify url parameters
* @param {Object<string,string>} [opts.resume]
*/
constructor(
serverUrl,
roomname,
doc,
{ connect = false, awareness = null, persistence = null } = {}
{ connect = false, awareness = null, persistence = null, resume = {} } = {}
) {
super()

Expand All @@ -221,13 +251,27 @@ export class ElectricProvider extends Observable {
this.awarenessStream = null

this.pending = []
this.resume = resume ?? {}

this.closeHandler = null

this.persistence = persistence
this.loaded = persistence === null

persistence?.on(`synced`, () => {
this.loaded = true
this.connect()
persistence
.get(`operations_state`)
.then((opsState) => {
this.resume.operations = opsState
return persistence.get(`awareness_state`)
})
.then((awarenessState) => {
this.resume.awareness = awarenessState
})
.then(() => {
this.loaded = true
this.connect()
})
})

/**
Expand Down Expand Up @@ -271,14 +315,22 @@ export class ElectricProvider extends Observable {
}

get operationsUrl() {
const params = { where: `room = '${this.roomname}'` }
const params = {
where: `room = '${this.roomname}'`,
...this.resume.operations,
}
const encodedParams = url.encodeQueryParams(params)
console.log(params)
return this.serverUrl + `/v1/shape/ydoc_operations?` + encodedParams
}

get awarenessUrl() {
const params = { where: `room = '${this.roomname}'` }
const params = {
where: `room = '${this.roomname}'`,
...this.resume.awareness,
}
const encodedParams = url.encodeQueryParams(params)
console.log(params)
return this.serverUrl + `/v1/shape/ydoc_awareness?` + encodedParams
}

Expand Down

0 comments on commit 10fd08b

Please sign in to comment.