Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pg-sync #435

Merged
merged 16 commits into from
Nov 26, 2024
Merged
9 changes: 3 additions & 6 deletions docs/docs/sync.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ You can then use the `syncShapeToTable` method to sync a table from Electric:

```ts
const shape = await pg.electric.syncShapeToTable({
shape: { url: 'http://localhost:3000/v1/shape/todo' },
shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' },
table: 'todo',
primaryKey: ['id'],
})
Expand Down Expand Up @@ -97,12 +97,9 @@ The returned `shape` object from the `syncShapeToTable` call has the following m
- `shapeId: string`<br>
The server side `shapeId`

- `subscribeOnceToUpToDate(cb: () => void, error: (err: FetchError | Error) => void)`<br>
- `subscribe(cb: () => void, error: (err: FetchError | Error) => void)`<br>
A callback to indicate that the shape caught up to the main Postgres.

- `unsubscribeAllUpToDateSubscribers()`<br>
Unsubscribe all `subscribeOnceToUpToDate` listeners.

- `subscribeMustRefresh(cb: () => void)`<br>
A callback that is called when the stream emits a `must-refresh` message.

Expand All @@ -115,7 +112,7 @@ The returned `shape` object from the `syncShapeToTable` call has the following m
### `ShapeStreamOptions`

- `url: string`<br>
The full URL to where the Shape is hosted. This can either be the Electric server directly, or a proxy. E.g. for a local Electric instance, you might set `http://localhost:3000/v1/shape/foo`
The full URL to where the Shape is hosted. This can either be the Electric server directly, or a proxy. E.g. for a local Electric instance, you might set `http://localhost:3000/v1/shape?table=table_name`
yacineb marked this conversation as resolved.
Show resolved Hide resolved

yacineb marked this conversation as resolved.
Show resolved Hide resolved
- `where?: string`<br>
Where clauses for the shape.
Expand Down
6 changes: 4 additions & 2 deletions packages/pglite-react/src/hooks.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { useEffect, useState, useRef } from 'react'
import type { LiveQuery, LiveQueryResults } from '@electric-sql/pglite/live'
import { usePGlite } from './provider'
import { query as buildQuery } from '@electric-sql/pglite/template'
import { useEffect, useRef, useState } from 'react'
import { usePGlite } from './provider'

function paramsEqual(
a1: unknown[] | undefined | null,
Expand Down Expand Up @@ -42,6 +42,7 @@ function useLiveQueryImpl<T = { [key: string]: unknown }>(
currentParams = params
}

/* eslint-disable @eslint-react/hooks-extra/no-direct-set-state-in-use-effect */
useEffect(() => {
let cancelled = false
const cb = (results: LiveQueryResults<T>) => {
Expand Down Expand Up @@ -80,6 +81,7 @@ function useLiveQueryImpl<T = { [key: string]: unknown }>(
throw new Error('Should never happen')
}
}, [db, key, query, currentParams, liveQuery])
/* eslint-enable @eslint-react/hooks-extra/no-direct-set-state-in-use-effect */

if (liveQueryChanged && liveQuery) {
return liveQuery.initialResults
Expand Down
2 changes: 1 addition & 1 deletion packages/pglite-sync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ You can then use the syncShapeToTable method to sync a table from Electric:

```ts
const shape = await pg.electric.syncShapeToTable({
url: 'http://localhost:3000/v1/shape/todo',
shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' },
table: 'todo',
primaryKey: ['id'],
})
Expand Down
10 changes: 4 additions & 6 deletions packages/pglite-sync/example/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: "electric_quickstart"

services:
postgres:
image: postgres:16-alpine
image: postgres:16
environment:
POSTGRES_DB: electric
POSTGRES_USER: postgres
Expand All @@ -22,12 +22,10 @@ services:
- wal_level=logical

electric:
image: electricsql/electric:0.2.8
image: electricsql/electric
environment:
DATABASE_URL: postgresql://postgres:password@postgres:5432/electric
DATABASE_URL: postgresql://postgres:password@postgres:5432/electric?sslmode=disable
ports:
- "3000:3000"
build:
context: ../packages/sync-service/
depends_on:
- postgres
- postgres
2 changes: 1 addition & 1 deletion packages/pglite-sync/example/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ <h1>PGlite Electric Sync Example</h1>
window.pg = pg;

await pg.electric.syncShapeToTable({
url: "http://localhost:3000/v1/shape/test",
shape: { url: "http://localhost:3000/v1/shape?table=todo" },
yacineb marked this conversation as resolved.
Show resolved Hide resolved
table: "test",
primaryKey: ["id"],
});
Expand Down
4 changes: 2 additions & 2 deletions packages/pglite-sync/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@electric-sql/pglite-sync",
"version": "0.2.14",
"version": "0.2.15",
"description": "ElectricSQL Sync for PGlite",
"type": "module",
"private": false,
Expand Down Expand Up @@ -45,7 +45,7 @@
"dist"
],
"dependencies": {
"@electric-sql/client": "^0.6.2"
"@electric-sql/client": "~0.8.0"
},
"devDependencies": {
"@electric-sql/pglite": "workspace:*",
Expand Down
46 changes: 22 additions & 24 deletions packages/pglite-sync/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import type {
Extension,
PGliteInterface,
Transaction,
} from '@electric-sql/pglite'
import type { Offset, ShapeStreamOptions } from '@electric-sql/client'
import {
ShapeStream,
ChangeMessage,
ShapeStream,
isChangeMessage,
isControlMessage,
} from '@electric-sql/client'
import type { Offset, ShapeStreamOptions } from '@electric-sql/client'
import type {
Extension,
PGliteInterface,
Transaction,
} from '@electric-sql/pglite'

export type MapColumnsMap = Record<string, string>
export type MapColumnsFn = (message: ChangeMessage<any>) => Record<string, any>
Expand Down Expand Up @@ -205,13 +205,13 @@ async function createPlugin(
if (
options.shapeKey &&
messageAggregator.length > 0 &&
stream.shapeId !== undefined
stream.shapeHandle !== undefined
) {
await updateShapeSubscriptionState({
pg: tx,
metadataSchema,
shapeKey: options.shapeKey,
shapeId: stream.shapeId,
shapeId: stream.shapeHandle,
lastOffset:
messageAggregator[messageAggregator.length - 1].offset,
})
Expand All @@ -238,16 +238,14 @@ async function createPlugin(
return stream.isUpToDate
},
get shapeId() {
return stream.shapeId
return stream.shapeHandle
},
subscribeOnceToUpToDate: (
cb: () => void,
error: (err: Error) => void,
) => {
return stream.subscribeOnceToUpToDate(cb, error)
},
unsubscribeAllUpToDateSubscribers: () => {
stream.unsubscribeAllUpToDateSubscribers()
subscribe: (cb: () => void, error: (err: Error) => void) => {
return stream.subscribe(() => {
if (stream.isUpToDate) {
cb()
}
}, error)
},
}
},
Expand Down Expand Up @@ -446,12 +444,12 @@ async function applyMessagesToTableWithCopy({
}

interface GetShapeSubscriptionStateOptions {
pg: PGliteInterface | Transaction
metadataSchema: string
shapeKey: ShapeKey
readonly pg: PGliteInterface | Transaction
readonly metadataSchema: string
readonly shapeKey: ShapeKey
}

type ShapeSubscriptionState = Pick<ShapeStreamOptions, 'shapeId' | 'offset'>
type ShapeSubscriptionState = Pick<ShapeStreamOptions, 'handle' | 'offset'>

async function getShapeSubscriptionState({
pg,
Expand All @@ -469,9 +467,9 @@ async function getShapeSubscriptionState({

if (result.rows.length === 0) return null

const { shape_id: shapeId, last_offset: offset } = result.rows[0]
const { shape_id: handle, last_offset: offset } = result.rows[0]
return {
shapeId,
handle,
offset: offset as Offset,
}
}
Expand Down
Loading