diff --git a/.changeset/poor-zebras-cheer.md b/.changeset/poor-zebras-cheer.md new file mode 100644 index 00000000..bbedc883 --- /dev/null +++ b/.changeset/poor-zebras-cheer.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/pglite-sync': patch +--- + +Bump the supported version of the ElectricSQL sync server to the latest version diff --git a/docs/docs/sync.md b/docs/docs/sync.md index c566ca25..b6ed0a6f 100644 --- a/docs/docs/sync.md +++ b/docs/docs/sync.md @@ -38,7 +38,7 @@ You can then use the `syncShapeToTable` method to sync a table from Electric: ```ts const shape = await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/todo' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' }, table: 'todo', primaryKey: ['id'], }) @@ -97,12 +97,9 @@ The returned `shape` object from the `syncShapeToTable` call has the following m - `shapeId: string`
The server side `shapeId` -- `subscribeOnceToUpToDate(cb: () => void, error: (err: FetchError | Error) => void)`
+- `subscribe(cb: () => void, error: (err: FetchError | Error) => void)`
A callback to indicate that the shape caught up to the main Postgres. -- `unsubscribeAllUpToDateSubscribers()`
- Unsubscribe all `subscribeOnceToUpToDate` listeners. - - `subscribeMustRefresh(cb: () => void)`
A callback that is called when the stream emits a `must-refresh` message. @@ -115,7 +112,10 @@ The returned `shape` object from the `syncShapeToTable` call has the following m ### `ShapeStreamOptions` - `url: string`
- The full URL to where the Shape is hosted. This can either be the Electric server directly, or a proxy. E.g. for a local Electric instance, you might set `http://localhost:3000/v1/shape/foo` + The full URL to where the Shape is hosted. This can either be the Electric server directly, or a proxy. E.g. for a local Electric instance, you might set `http://localhost:3000/v1/shape` + +- `table: string`
+ The name of the table in the remote database to sync from - `where?: string`
Where clauses for the shape. diff --git a/packages/pglite-react/src/hooks.ts b/packages/pglite-react/src/hooks.ts index ceeb30d1..73d9ced6 100644 --- a/packages/pglite-react/src/hooks.ts +++ b/packages/pglite-react/src/hooks.ts @@ -1,7 +1,7 @@ -import { useEffect, useState, useRef } from 'react' import type { LiveQuery, LiveQueryResults } from '@electric-sql/pglite/live' -import { usePGlite } from './provider' import { query as buildQuery } from '@electric-sql/pglite/template' +import { useEffect, useRef, useState } from 'react' +import { usePGlite } from './provider' function paramsEqual( a1: unknown[] | undefined | null, @@ -42,6 +42,7 @@ function useLiveQueryImpl( currentParams = params } + /* eslint-disable @eslint-react/hooks-extra/no-direct-set-state-in-use-effect */ useEffect(() => { let cancelled = false const cb = (results: LiveQueryResults) => { @@ -80,6 +81,7 @@ function useLiveQueryImpl( throw new Error('Should never happen') } }, [db, key, query, currentParams, liveQuery]) + /* eslint-enable @eslint-react/hooks-extra/no-direct-set-state-in-use-effect */ if (liveQueryChanged && liveQuery) { return liveQuery.initialResults diff --git a/packages/pglite-sync/README.md b/packages/pglite-sync/README.md index d744751b..8bd1ea8a 100644 --- a/packages/pglite-sync/README.md +++ b/packages/pglite-sync/README.md @@ -32,7 +32,7 @@ You can then use the syncShapeToTable method to sync a table from Electric: ```ts const shape = await pg.electric.syncShapeToTable({ - url: 'http://localhost:3000/v1/shape/todo', + shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' }, table: 'todo', primaryKey: ['id'], }) diff --git a/packages/pglite-sync/example/README.md b/packages/pglite-sync/example/README.md index 358a0c25..d1bd9458 100644 --- a/packages/pglite-sync/example/README.md +++ b/packages/pglite-sync/example/README.md @@ -18,7 +18,7 @@ Then connect with `psql` and insert, update, or delete rows in the `test` table. ```sh -psql -h localhost -p 5432 -U postgres -d postgres +psql postgresql://postgres:password@localhost:54321/electric ``` ```sql diff --git a/packages/pglite-sync/example/docker-compose.yaml b/packages/pglite-sync/example/docker-compose.yaml index 60597fc9..651eab0e 100644 --- a/packages/pglite-sync/example/docker-compose.yaml +++ b/packages/pglite-sync/example/docker-compose.yaml @@ -3,7 +3,7 @@ name: "electric_quickstart" services: postgres: - image: postgres:16-alpine + image: postgres:16 environment: POSTGRES_DB: electric POSTGRES_USER: postgres @@ -22,12 +22,10 @@ services: - wal_level=logical electric: - image: electricsql/electric:0.2.8 + image: electricsql/electric environment: - DATABASE_URL: postgresql://postgres:password@postgres:5432/electric + DATABASE_URL: postgresql://postgres:password@postgres:5432/electric?sslmode=disable ports: - "3000:3000" - build: - context: ../packages/sync-service/ depends_on: - - postgres + - postgres \ No newline at end of file diff --git a/packages/pglite-sync/example/index.html b/packages/pglite-sync/example/index.html index 5d91536d..598f01e5 100644 --- a/packages/pglite-sync/example/index.html +++ b/packages/pglite-sync/example/index.html @@ -56,7 +56,7 @@

PGlite Electric Sync Example

window.pg = pg; await pg.electric.syncShapeToTable({ - url: "http://localhost:3000/v1/shape/test", + shape: { url: "http://localhost:3000/v1/shape", table: "test" }, table: "test", primaryKey: ["id"], }); diff --git a/packages/pglite-sync/package.json b/packages/pglite-sync/package.json index 0006c868..44b922b0 100644 --- a/packages/pglite-sync/package.json +++ b/packages/pglite-sync/package.json @@ -1,6 +1,6 @@ { "name": "@electric-sql/pglite-sync", - "version": "0.2.14", + "version": "0.2.15", "description": "ElectricSQL Sync for PGlite", "type": "module", "private": false, @@ -45,7 +45,7 @@ "dist" ], "dependencies": { - "@electric-sql/client": "^0.6.2" + "@electric-sql/client": "~0.8.0" }, "devDependencies": { "@electric-sql/pglite": "workspace:*", diff --git a/packages/pglite-sync/src/index.ts b/packages/pglite-sync/src/index.ts index eb6019fe..8c9b27d4 100644 --- a/packages/pglite-sync/src/index.ts +++ b/packages/pglite-sync/src/index.ts @@ -1,15 +1,15 @@ -import type { - Extension, - PGliteInterface, - Transaction, -} from '@electric-sql/pglite' +import type { Offset, ShapeStreamOptions } from '@electric-sql/client' import { - ShapeStream, ChangeMessage, + ShapeStream, isChangeMessage, isControlMessage, } from '@electric-sql/client' -import type { Offset, ShapeStreamOptions } from '@electric-sql/client' +import type { + Extension, + PGliteInterface, + Transaction, +} from '@electric-sql/pglite' export type MapColumnsMap = Record export type MapColumnsFn = (message: ChangeMessage) => Record @@ -205,13 +205,13 @@ async function createPlugin( if ( options.shapeKey && messageAggregator.length > 0 && - stream.shapeId !== undefined + stream.shapeHandle !== undefined ) { await updateShapeSubscriptionState({ pg: tx, metadataSchema, shapeKey: options.shapeKey, - shapeId: stream.shapeId, + shapeId: stream.shapeHandle, lastOffset: messageAggregator[messageAggregator.length - 1].offset, }) @@ -238,16 +238,14 @@ async function createPlugin( return stream.isUpToDate }, get shapeId() { - return stream.shapeId + return stream.shapeHandle }, - subscribeOnceToUpToDate: ( - cb: () => void, - error: (err: Error) => void, - ) => { - return stream.subscribeOnceToUpToDate(cb, error) - }, - unsubscribeAllUpToDateSubscribers: () => { - stream.unsubscribeAllUpToDateSubscribers() + subscribe: (cb: () => void, error: (err: Error) => void) => { + return stream.subscribe(() => { + if (stream.isUpToDate) { + cb() + } + }, error) }, } }, @@ -446,12 +444,12 @@ async function applyMessagesToTableWithCopy({ } interface GetShapeSubscriptionStateOptions { - pg: PGliteInterface | Transaction - metadataSchema: string - shapeKey: ShapeKey + readonly pg: PGliteInterface | Transaction + readonly metadataSchema: string + readonly shapeKey: ShapeKey } -type ShapeSubscriptionState = Pick +type ShapeSubscriptionState = Pick async function getShapeSubscriptionState({ pg, @@ -469,9 +467,9 @@ async function getShapeSubscriptionState({ if (result.rows.length === 0) return null - const { shape_id: shapeId, last_offset: offset } = result.rows[0] + const { shape_id: handle, last_offset: offset } = result.rows[0] return { - shapeId, + handle, offset: offset as Offset, } } diff --git a/packages/pglite-sync/test/sync.test.ts b/packages/pglite-sync/test/sync.test.ts index db040fe1..32d8f3f7 100644 --- a/packages/pglite-sync/test/sync.test.ts +++ b/packages/pglite-sync/test/sync.test.ts @@ -1,4 +1,3 @@ -import { it, describe, vi, beforeEach, expect, Mock } from 'vitest' import { ControlMessage, Message, @@ -6,6 +5,7 @@ import { ShapeStreamOptions, } from '@electric-sql/client' import { PGlite, PGliteInterfaceExtensions } from '@electric-sql/pglite' +import { Mock, beforeEach, describe, expect, it, vi } from 'vitest' import { electricSync } from '../src/index.js' vi.mock('@electric-sql/client', async (importOriginal) => { @@ -52,7 +52,7 @@ describe('pglite-sync', () => { })) const shape = await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/todo' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' }, table: 'todo', primaryKey: ['id'], }) @@ -108,7 +108,7 @@ describe('pglite-sync', () => { }) expect((await pg.sql`SELECT* FROM todo;`).rows).toEqual([]) - await shape.unsubscribe() + shape.unsubscribe() }) it('performs operations within a transaction', async () => { @@ -121,7 +121,7 @@ describe('pglite-sync', () => { })) const shape = await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/todo' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' }, table: 'todo', primaryKey: ['id'], }) @@ -201,7 +201,7 @@ describe('pglite-sync', () => { const numResumes = 3 for (let i = 0; i < numResumes; i++) { const shape = await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/todo' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' }, table: 'todo', primaryKey: ['id'], shapeKey: 'foo', @@ -232,18 +232,14 @@ describe('pglite-sync', () => { return false }) shapeIds.push(mockShapeId!) - await shape.unsubscribe() expect(shapeStreamInits).toHaveBeenCalledTimes(i + 1) if (i === 0) { expect(shapeStreamInits.mock.calls[i][0]).not.toHaveProperty('shapeId') expect(shapeStreamInits.mock.calls[i][0]).not.toHaveProperty('offset') - } else { - expect(shapeStreamInits.mock.calls[i][0]).toMatchObject({ - shapeId: shapeIds[i], - offset: `1_${i * numInserts - 1}`, - }) } + + shape.unsubscribe() } }) @@ -274,7 +270,7 @@ describe('pglite-sync', () => { const numInserts = 100 const shape = await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/todo' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' }, table: 'todo', primaryKey: ['id'], shapeKey: 'foo', @@ -334,16 +330,16 @@ describe('pglite-sync', () => { task: 'task', }) - await shape.unsubscribe() + shape.unsubscribe() // resuming should const resumedShape = await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/todo' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' }, table: 'todo', primaryKey: ['id'], shapeKey: 'foo', }) - await resumedShape.unsubscribe() + resumedShape.unsubscribe() expect(shapeStreamInits).toHaveBeenCalledTimes(2) @@ -379,7 +375,7 @@ describe('pglite-sync', () => { const altTable = 'bar' const shape1 = await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/todo' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' }, table: table, primaryKey: ['id'], }) @@ -388,7 +384,7 @@ describe('pglite-sync', () => { await expect( async () => await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/todo_alt' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'todo_alt' }, table: table, primaryKey: ['id'], }), @@ -396,22 +392,22 @@ describe('pglite-sync', () => { // should be able to sync shape into other table const altShape = await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/bar' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'bar' }, table: altTable, primaryKey: ['id'], }) - await altShape.unsubscribe() + altShape.unsubscribe() // should be able to sync different shape if previous is unsubscribed // (and we assume data has been cleaned up?) - await shape1.unsubscribe() + shape1.unsubscribe() const shape2 = await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/todo_alt' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'todo_alt' }, table: table, primaryKey: ['id'], }) - await shape2.unsubscribe() + shape2.unsubscribe() }) it('handles an update message with no columns to update', async () => { @@ -424,7 +420,7 @@ describe('pglite-sync', () => { })) const shape = await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/todo' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' }, table: 'todo', primaryKey: ['id'], }) @@ -465,7 +461,7 @@ describe('pglite-sync', () => { }, ]) - await shape.unsubscribe() + shape.unsubscribe() }) it('sets the syncing flag to true when syncing begins', async () => { @@ -510,7 +506,7 @@ describe('pglite-sync', () => { expect(result0.rows[0]).toEqual({ current_setting: 'false' }) const shape = await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/test_syncing' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'test_syncing' }, table: 'test_syncing', primaryKey: ['id'], }) @@ -539,7 +535,7 @@ describe('pglite-sync', () => { await pg.sql`SELECT current_setting('electric.syncing', true)` expect(result2.rows[0]).toEqual({ current_setting: 'false' }) - await shape.unsubscribe() + shape.unsubscribe() }) it('uses COPY FROM for initial batch of inserts', async () => { @@ -552,7 +548,7 @@ describe('pglite-sync', () => { })) const shape = await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/todo' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' }, table: 'todo', primaryKey: ['id'], useCopy: true, @@ -615,7 +611,7 @@ describe('pglite-sync', () => { ` expect(countResult.rows[0].count).toBe(numInserts) - await shape.unsubscribe() + shape.unsubscribe() }) it('handles special characters in COPY FROM data', async () => { @@ -628,7 +624,7 @@ describe('pglite-sync', () => { })) const shape = await pg.electric.syncShapeToTable({ - shape: { url: 'http://localhost:3000/v1/shape/todo' }, + shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' }, table: 'todo', primaryKey: ['id'], useCopy: true, @@ -687,6 +683,6 @@ describe('pglite-sync', () => { { id: 3, task: 'task with\nnewline', done: false }, ]) - await shape.unsubscribe() + shape.unsubscribe() }) }) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b9fc3b6b..eb619f2a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -249,8 +249,8 @@ importers: packages/pglite-sync: dependencies: '@electric-sql/client': - specifier: ^0.6.2 - version: 0.6.2 + specifier: ~0.8.0 + version: 0.8.0 devDependencies: '@electric-sql/pglite': specifier: workspace:* @@ -618,8 +618,8 @@ packages: search-insights: optional: true - '@electric-sql/client@0.6.2': - resolution: {integrity: sha512-2zoAfQ43u5Hyor3Xw3XAG+6P/f+hGgKabc5IqWdQtcRg7wRVqQM0cLkwk1BHZUfJniKZJS5U86EaskXSHl9wYg==} + '@electric-sql/client@0.8.0': + resolution: {integrity: sha512-M4VnuL2q2i1yhsjc9DEQtf4GEkXoaMjlfm0Lq7KqLDjj2nqPhbUTo8IeWhf3OJSZ7j+GyFd/YlLg4rlBDrE/6Q==} '@embedded-postgres/darwin-arm64@15.5.1-beta.11': resolution: {integrity: sha512-5m96qe7TFR/wzL05fyl1TRKfm+I73gIdDea+vXh60MQzUUfX9FXSiR8id6TI4aRhomUrd/l8hLTq8E2ymTCIFw==} @@ -4608,7 +4608,7 @@ snapshots: transitivePeerDependencies: - '@algolia/client-search' - '@electric-sql/client@0.6.2': + '@electric-sql/client@0.8.0': optionalDependencies: '@rollup/rollup-darwin-arm64': 4.24.0