Skip to content

Commit

Permalink
Added connection pool
Browse files Browse the repository at this point in the history
More cleanup
  • Loading branch information
balegas committed Aug 15, 2024
1 parent 6747ef2 commit 6164404
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 186 deletions.
20 changes: 12 additions & 8 deletions examples/yjs-provider/app/api/awareness/route.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import { db } from "../../db"
import { pool } from "../../db"
import { NextResponse } from "next/server"

export async function POST(request: Request) {
const body = await request.json()
const db = await pool.connect()
try {
const body = await request.json()

await db.query(
`INSERT INTO ydoc_awareness (client, room, op)
await db.query(
`INSERT INTO ydoc_awareness (client, room, op)
VALUES ($1, $2, $3)
ON CONFLICT (client, room)
DO UPDATE SET op = $3`,
[body.client, body.room, body.op]
)

return NextResponse.json({})
[body.client, body.room, body.op]
)
return NextResponse.json({})
} finally {
db.release()
}
}
58 changes: 0 additions & 58 deletions examples/yjs-provider/app/api/compaction/route.ts

This file was deleted.

82 changes: 46 additions & 36 deletions examples/yjs-provider/app/api/operation/route.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { db } from "../../db"
import { pool } from "../../db"
import { NextResponse } from "next/server"

// TODO: still loading yjs twice
Expand All @@ -9,57 +9,57 @@ import * as encoding from "lib0/encoding"
import * as decoding from "lib0/decoding"

import { toBase64, fromBase64 } from "lib0/buffer"
import { PoolClient } from "pg"

const maxRowCount = 50

export async function POST(request: Request) {
const body = await request.json()
const db = await pool.connect()

const errorResponse = validateRequest(body)
if (errorResponse) {
return errorResponse
}
try {
const body = await request.json()

await db.query(
`INSERT INTO ydoc_operations (room, op)
VALUES ($1, $2)`,
[body.room, body.op]
)
await maybeCompact(body.room)
const errorResponse = validateRequest(body)
if (errorResponse) {
return errorResponse
}

return NextResponse.json({})
}

function validateRequest({ room, op }: { room: string; op: string }) {
if (!room) {
return NextResponse.json({ error: `'room' is required` }, { status: 400 })
}
await db.query(`BEGIN`)
await db.query(
`INSERT INTO ydoc_operations (room, op)
VALUES ($1, $2)`,
[body.room, body.op]
)
await maybeCompact(db, body.room)
await db.query(`COMMIT`)

if (!op) {
return NextResponse.json({ error: `'op' is required` }, { status: 400 })
return NextResponse.json({})
} catch (e) {
await db.query(`ROLLBACK`)
throw e
} finally {
db.release()
}
}

// naive implementation of compaction
async function maybeCompact(room: string) {
const ydoc = new Y.Doc()

const res0 = await db.query(
`SELECT COUNT(*) as count FROM ydoc_operations`,
[]
)
if (res0.rows[0].count < maxRowCount) {
return
}

console.log(`compaction`)
const res1 = await db.query(
async function maybeCompact(db: PoolClient, room: string) {
const res = await db.query(
`SELECT id, op FROM ydoc_operations
WHERE room = $1
ORDER BY id DESC`,
[room]
)
res1.rows.map(({ op }) => {

if (res.rows.length < maxRowCount) {
return
}

console.log(`compaction`)

const ydoc = new Y.Doc()

res.rows.map(({ op }) => {
const buf = fromBase64(op)
const decoder = decoding.createDecoder(buf)
syncProtocol.readSyncMessage(
Expand All @@ -77,7 +77,17 @@ async function maybeCompact(room: string) {
await db.query(`TRUNCATE ydoc_operations`)
await db.query(
`INSERT INTO ydoc_operations (room, op)
VALUES ($1, $2)`,
VALUES ($1, $2)`,
[room, encoded]
)
}

function validateRequest({ room, op }: { room: string; op: string }) {
if (!room) {
return NextResponse.json({ error: `'room' is required` }, { status: 400 })
}

if (!op) {
return NextResponse.json({ error: `'op' is required` }, { status: 400 })
}
}
11 changes: 5 additions & 6 deletions examples/yjs-provider/app/db.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import pgPkg from "pg"
const { Client } = pgPkg
import { Pool } from "pg"

const db = new Client({
console.log("init pool")
const pool = new Pool({
host: `localhost`,
port: 54321,
password: `password`,
user: `postgres`,
database: `electric`,
max: 1,
})

db.connect()

export { db }
export { pool }
67 changes: 14 additions & 53 deletions examples/yjs-provider/app/y-broadcast.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
* Extracted this from y-websocket
*/

/* eslint-env browser */

import * as bc from "lib0/broadcastchannel"
import * as encoding from "lib0/encoding"
import * as decoding from "lib0/decoding"
import * as syncProtocol from "y-protocols/sync"
import * as awarenessProtocol from "y-protocols/awareness"
import { Observable } from "lib0/observable"
import * as env from "lib0/environment"

export const messageSync = 0
export const messageQueryAwareness = 3
export const messageAwareness = 1
Expand All @@ -19,32 +19,21 @@ const messageHandlers = []
messageHandlers[messageSync] = (
encoder,
decoder,
provider,
emitSynced,
_messageType
provider
) => {
encoding.writeVarUint(encoder, messageSync)
const syncMessageType = syncProtocol.readSyncMessage(
syncProtocol.readSyncMessage(
decoder,
encoder,
provider.doc,
provider
)
if (
emitSynced &&
syncMessageType === syncProtocol.messageYjsSyncStep2 &&
!provider.synced
) {
provider.synced = true
}
}

messageHandlers[messageQueryAwareness] = (
encoder,
_decoder,
provider,
_emitSynced,
_messageType
provider
) => {
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(
Expand All @@ -59,9 +48,7 @@ messageHandlers[messageQueryAwareness] = (
messageHandlers[messageAwareness] = (
_encoder,
decoder,
provider,
_emitSynced,
_messageType
provider
) => {
awarenessProtocol.applyAwarenessUpdate(
provider.awareness,
Expand All @@ -70,33 +57,19 @@ messageHandlers[messageAwareness] = (
)
}

/**
* @param {BroadcastProvider} provider
* @param {Uint8Array} buf
* @param {boolean} emitSynced
* @return {encoding.Encoder}
*/
const readMessage = (provider, buf, emitSynced) => {
const readMessage = (provider, buf) => {
const decoder = decoding.createDecoder(buf)
const encoder = encoding.createEncoder()
const messageType = decoding.readVarUint(decoder)
const messageHandler = provider.messageHandlers[messageType]
if (/** @type {any} */ (messageHandler)) {
messageHandler(encoder, decoder, provider, emitSynced, messageType)
messageHandler(encoder, decoder, provider, false, messageType)
} else {
console.error(`Unable to compute message`)
}
return encoder
}

/**
* @param {BroadcastProvider} provider
*/

/**
* @param {BroadcastProvider} provider
* @param {ArrayBuffer} buf
*/
const broadcastMessage = (provider, buf) => {
if (provider.bcconnected) {
bc.publish(provider.bcChannel, buf, provider)
Expand All @@ -123,9 +96,6 @@ export class BroadcastProvider extends Observable {
this.disableBc = disableBc
this.messageHandlers = messageHandlers.slice()

this._synced = false
this.wsLastMessageReceived = 0

this._bcSubscriber = (data, origin) => {
if (origin !== this) {
const encoder = readMessage(this, new Uint8Array(data), false)
Expand Down Expand Up @@ -162,6 +132,9 @@ export class BroadcastProvider extends Observable {
`app closed`
)
}
if (env.isNode && typeof process !== `undefined`) {
process.on(`exit`, this._exitHandler)
}

awareness.on(`update`, this._awarenessUpdateHandler)

Expand All @@ -170,23 +143,11 @@ export class BroadcastProvider extends Observable {
}
}

/**
* @type {boolean}
*/
get synced() {
return this._synced
}

set synced(state) {
if (this._synced !== state) {
this._synced = state
this.emit(`synced`, [state])
this.emit(`sync`, [state])
}
}

destroy() {
this.disconnect()
if (env.isNode && typeof process !== `undefined`) {
process.off(`exit`, this._exitHandler)
}
this.awareness.off(`update`, this._awarenessUpdateHandler)
this.doc.off(`update`, this._updateHandler)
super.destroy()
Expand Down
Loading

0 comments on commit 6164404

Please sign in to comment.