Skip to content

Commit

Permalink
feat! (ts client): remove databaseId option from ShapeStream (#2053)
Browse files Browse the repository at this point in the history
This PR fixes #2052.
It removes the `databaseId` option from `ShapeStream` in favor of the
more generic `params` option.
  • Loading branch information
kevin-dp authored Nov 27, 2024
1 parent e815b91 commit 9c50e8f
Show file tree
Hide file tree
Showing 9 changed files with 17 additions and 326 deletions.
5 changes: 5 additions & 0 deletions .changeset/old-ligers-run.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@electric-sql/client": minor
---

[BREAKING]: Remove databaseId option from ShapeStream in favor of params option.
3 changes: 1 addition & 2 deletions packages/react-hooks/test/support/global-setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { makePgClient } from './test-helpers'

const url = process.env.ELECTRIC_URL ?? `http://localhost:3000`
const proxyUrl = process.env.ELECTRIC_PROXY_CACHE_URL ?? `http://localhost:3002`
const databaseId = process.env.ELECTRIC_DATABASE_ID ?? `test_tenant`

// name of proxy cache container to execute commands against,
// see docker-compose.yml that spins it up for details
Expand All @@ -30,7 +29,7 @@ function waitForElectric(url: string): Promise<void> {
)

const tryHealth = async () =>
fetch(`${url}/v1/health?database_id=${databaseId}`)
fetch(`${url}/v1/health`)
.then(async (res): Promise<void> => {
if (!res.ok) return tryHealth()
const { status } = (await res.json()) as { status: string }
Expand Down
17 changes: 1 addition & 16 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@ import {
SHAPE_HANDLE_QUERY_PARAM,
SHAPE_SCHEMA_HEADER,
WHERE_QUERY_PARAM,
DATABASE_ID_QUERY_PARAM,
TABLE_QUERY_PARAM,
REPLICA_PARAM,
} from './constants'

const RESERVED_PARAMS = new Set([
DATABASE_ID_QUERY_PARAM,
COLUMNS_QUERY_PARAM,
LIVE_CACHE_BUSTER_QUERY_PARAM,
SHAPE_HANDLE_QUERY_PARAM,
Expand All @@ -54,7 +52,6 @@ const RESERVED_PARAMS = new Set([
type Replica = `full` | `default`

type ReservedParamKeys =
| typeof DATABASE_ID_QUERY_PARAM
| typeof COLUMNS_QUERY_PARAM
| typeof LIVE_CACHE_BUSTER_QUERY_PARAM
| typeof SHAPE_HANDLE_QUERY_PARAM
Expand Down Expand Up @@ -84,12 +81,6 @@ export interface ShapeStreamOptions<T = never> {
*/
url: string

/**
* Which database to use.
* This is optional unless Electric is used with multiple databases.
*/
databaseId?: string

/**
* The root table for the shape. Passed as a query parameter. Not required if you set the table in your proxy.
*/
Expand Down Expand Up @@ -144,7 +135,7 @@ export interface ShapeStreamOptions<T = never> {
* Additional request parameters to attach to the URL.
* These will be merged with Electric's standard parameters.
* Note: You cannot use Electric's reserved parameter names
* (table, where, columns, offset, handle, live, cursor, database_id, replica).
* (table, where, columns, offset, handle, live, cursor, replica).
*/
params?: ParamsRecord

Expand Down Expand Up @@ -246,7 +237,6 @@ export class ShapeStream<T extends Row<unknown> = Row>
#isUpToDate: boolean = false
#connected: boolean = false
#shapeHandle?: string
#databaseId?: string
#schema?: Schema
#onError?: ShapeStreamErrorHandler
#replica?: Replica
Expand All @@ -257,7 +247,6 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#lastOffset = this.options.offset ?? `-1`
this.#liveCacheBuster = ``
this.#shapeHandle = this.options.handle
this.#databaseId = this.options.databaseId
this.#messageParser = new MessageParser<T>(options.parser)
this.#replica = this.options.replica
this.#onError = this.options.onError
Expand Down Expand Up @@ -347,10 +336,6 @@ export class ShapeStream<T extends Row<unknown> = Row>
)
}

if (this.#databaseId) {
fetchUrl.searchParams.set(DATABASE_ID_QUERY_PARAM, this.#databaseId!)
}

if (
(this.#replica ?? ShapeStream.Replica.DEFAULT) !=
ShapeStream.Replica.DEFAULT
Expand Down
1 change: 0 additions & 1 deletion packages/typescript-client/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ export const SHAPE_HANDLE_HEADER = `electric-handle`
export const CHUNK_LAST_OFFSET_HEADER = `electric-offset`
export const SHAPE_SCHEMA_HEADER = `electric-schema`
export const CHUNK_UP_TO_DATE_HEADER = `electric-up-to-date`
export const DATABASE_ID_QUERY_PARAM = `database_id`
export const COLUMNS_QUERY_PARAM = `columns`
export const LIVE_CACHE_BUSTER_QUERY_PARAM = `cursor`
export const SHAPE_HANDLE_QUERY_PARAM = `handle`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html

exports[`Shape > should throw on a reserved parameter 1`] = `[ReservedParamError: Cannot use reserved Electric parameter names in custom params: database_id]`;
exports[`Shape > should throw on a reserved parameter 1`] = `[ReservedParamError: Cannot use reserved Electric parameter names in custom params: live]`;
2 changes: 1 addition & 1 deletion packages/typescript-client/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ describe(`Shape`, () => {
url: `${BASE_URL}/v1/shape`,
table: `foo`,
params: {
database_id: `foo`,
live: `false`,
},
})
new Shape(shapeStream)
Expand Down
185 changes: 1 addition & 184 deletions packages/typescript-client/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@ import {
IssueRow,
testWithIssuesTable as it,
testWithMultitypeTable as mit,
//testWithMultiTenantIssuesTable as multiTenantIt,
} from './support/test-context'
import * as h from './support/test-helpers'

const BASE_URL = inject(`baseUrl`)
//const OTHER_DATABASE_URL = inject(`otherDatabaseUrl`)
//const databaseId = inject(`databaseId`)
//const otherDatabaseId = inject(`otherDatabaseId`)

it(`sanity check`, async ({ dbClient, issuesTableSql }) => {
const result = await dbClient.query(`SELECT * FROM ${issuesTableSql}`)

Expand Down Expand Up @@ -976,183 +973,3 @@ describe(`HTTP Sync`, () => {
})
})
})

/*
describe.sequential(`Multi tenancy sync`, () => {
it(`should allow new databases to be added`, async () => {
const url = new URL(`${BASE_URL}/v1/admin/database`)
// Add the database
const res = await fetch(url.toString(), {
method: `POST`,
headers: {
Accept: `application/json`,
'Content-Type': `application/json`,
},
body: JSON.stringify({
database_id: otherDatabaseId,
database_url: OTHER_DATABASE_URL,
}),
})
expect(res.status).toBe(200)
const body = await res.json()
expect(body).toBe(otherDatabaseId)
})
it(`should serve original database`, async ({
issuesTableUrl,
aborter,
insertIssues,
}) => {
const id = await insertIssues({ title: `test issue` })
const shapeData = new Map()
const issueStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: issuesTableUrl,
databaseId,
subscribe: false,
signal: aborter.signal,
})
await new Promise<void>((resolve, reject) => {
issueStream.subscribe((messages) => {
messages.forEach((message) => {
if (isChangeMessage(message)) {
shapeData.set(message.key, message.value)
}
if (isUpToDateMessage(message)) {
aborter.abort()
return resolve()
}
})
}, reject)
})
const values = [...shapeData.values()]
expect(values).toHaveLength(1)
expect(values[0]).toMatchObject({
id: id[0],
title: `test issue`,
})
})
multiTenantIt(
`should serve new database`,
async ({ issuesTableUrl, aborter, insertIssuesToOtherDb }) => {
const id = await insertIssuesToOtherDb({ title: `test issue in new db` })
const shapeData = new Map()
const issueStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: issuesTableUrl,
databaseId: otherDatabaseId,
subscribe: false,
signal: aborter.signal,
})
await new Promise<void>((resolve, reject) => {
issueStream.subscribe((messages) => {
messages.forEach((message) => {
if (isChangeMessage(message)) {
shapeData.set(message.key, message.value)
}
if (isUpToDateMessage(message)) {
aborter.abort()
return resolve()
}
})
}, reject)
})
const values = [...shapeData.values()]
expect(values).toHaveLength(1)
expect(values[0]).toMatchObject({
id: id[0],
title: `test issue in new db`,
})
}
)
multiTenantIt(
`should serve both databases in live mode`,
async ({
issuesTableUrl,
aborter,
otherAborter,
insertIssues,
insertIssuesToOtherDb,
}) => {
// Set up streams for both databases
const defaultStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: issuesTableUrl,
databaseId,
subscribe: true,
signal: aborter.signal,
})
const otherStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: issuesTableUrl,
databaseId: otherDatabaseId,
subscribe: true,
signal: otherAborter.signal,
})
const defaultData = new Map()
const otherData = new Map()
// Set up subscriptions
defaultStream.subscribe((messages) => {
messages.forEach((message) => {
if (isChangeMessage(message)) {
defaultData.set(message.key, message.value)
}
})
})
otherStream.subscribe((messages) => {
messages.forEach((message) => {
if (isChangeMessage(message)) {
otherData.set(message.key, message.value)
}
})
})
// Insert data into both databases
const defaultId = await insertIssues({ title: `default db issue` })
const otherId = await insertIssuesToOtherDb({ title: `other db issue` })
// Give time for updates to propagate
await sleep(1000)
// Verify data from default database
expect([...defaultData.values()]).toHaveLength(1)
expect([...defaultData.values()][0]).toMatchObject({
id: defaultId[0],
title: `default db issue`,
})
// Verify data from other database
expect([...otherData.values()]).toHaveLength(1)
expect([...otherData.values()][0]).toMatchObject({
id: otherId[0],
title: `other db issue`,
})
}
)
it(`should allow databases to be deleted`, async () => {
const url = new URL(`${BASE_URL}/v1/admin/database/${otherDatabaseId}`)
// Add the database
const res = await fetch(url.toString(), { method: `DELETE` })
expect(res.status).toBe(200)
const body = await res.json()
expect(body).toBe(otherDatabaseId)
})
})
*/
28 changes: 5 additions & 23 deletions packages/typescript-client/test/support/global-setup.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
import type { GlobalSetupContext } from 'vitest/node'
import { makePgClient } from './test-helpers'
import { Client } from 'pg'

const url = process.env.ELECTRIC_URL ?? `http://localhost:3000`
const proxyUrl = process.env.ELECTRIC_PROXY_CACHE_URL ?? `http://localhost:3002`
const databaseId = process.env.ELECTRIC_DATABASE_ID ?? `test_tenant`
const otherDatabaseId = `other_test_tenant`
const otherDatabaseUrl =
process.env.OTHER_DATABASE_URL ??
`postgresql://postgres:password@localhost:54322/electric?sslmode=disable`

// name of proxy cache container to execute commands against,
// see docker-compose.yml that spins it up for details
Expand All @@ -24,9 +18,6 @@ declare module 'vitest' {
testPgSchema: string
proxyCacheContainerName: string
proxyCachePath: string
databaseId: string
otherDatabaseId: string
otherDatabaseUrl: string
}
}

Expand All @@ -38,7 +29,7 @@ function waitForElectric(url: string): Promise<void> {
)

const tryHealth = async () =>
fetch(`${url}/v1/health?database_id=${databaseId}`)
fetch(`${url}/v1/health`)
.then(async (res): Promise<void> => {
if (!res.ok) return tryHealth()
const { status } = (await res.json()) as { status: string }
Expand All @@ -63,27 +54,18 @@ export default async function ({ provide }: GlobalSetupContext) {
await waitForElectric(url)

const client = makePgClient()
const otherClient = new Client(otherDatabaseUrl)
const clients = [client, otherClient]

for (const c of clients) {
await c.connect()
await c.query(`CREATE SCHEMA IF NOT EXISTS electric_test`)
}
await client.connect()
await client.query(`CREATE SCHEMA IF NOT EXISTS electric_test`)

provide(`baseUrl`, url)
provide(`testPgSchema`, `electric_test`)
provide(`proxyCacheBaseUrl`, proxyUrl)
provide(`proxyCacheContainerName`, proxyCacheContainerName)
provide(`proxyCachePath`, proxyCachePath)
provide(`databaseId`, databaseId)
provide(`otherDatabaseId`, otherDatabaseId)
provide(`otherDatabaseUrl`, otherDatabaseUrl)

return async () => {
for (const c of clients) {
await c.query(`DROP SCHEMA electric_test CASCADE`)
await c.end()
}
await client.query(`DROP SCHEMA electric_test CASCADE`)
await client.end()
}
}
Loading

0 comments on commit 9c50e8f

Please sign in to comment.