Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat! (ts client): remove databaseId option from ShapeStream #2053

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/old-ligers-run.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@electric-sql/client": minor
---

[BREAKING]: Remove databaseId option from ShapeStream in favor of params option.
3 changes: 1 addition & 2 deletions packages/react-hooks/test/support/global-setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { makePgClient } from './test-helpers'

const url = process.env.ELECTRIC_URL ?? `http://localhost:3000`
const proxyUrl = process.env.ELECTRIC_PROXY_CACHE_URL ?? `http://localhost:3002`
const databaseId = process.env.ELECTRIC_DATABASE_ID ?? `test_tenant`

// name of proxy cache container to execute commands against,
// see docker-compose.yml that spins it up for details
Expand All @@ -30,7 +29,7 @@ function waitForElectric(url: string): Promise<void> {
)

const tryHealth = async () =>
fetch(`${url}/v1/health?database_id=${databaseId}`)
fetch(`${url}/v1/health`)
.then(async (res): Promise<void> => {
if (!res.ok) return tryHealth()
const { status } = (await res.json()) as { status: string }
Expand Down
17 changes: 1 addition & 16 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@ import {
SHAPE_HANDLE_QUERY_PARAM,
SHAPE_SCHEMA_HEADER,
WHERE_QUERY_PARAM,
DATABASE_ID_QUERY_PARAM,
TABLE_QUERY_PARAM,
REPLICA_PARAM,
} from './constants'

const RESERVED_PARAMS = new Set([
DATABASE_ID_QUERY_PARAM,
COLUMNS_QUERY_PARAM,
LIVE_CACHE_BUSTER_QUERY_PARAM,
SHAPE_HANDLE_QUERY_PARAM,
Expand All @@ -54,7 +52,6 @@ const RESERVED_PARAMS = new Set([
type Replica = `full` | `default`

type ReservedParamKeys =
| typeof DATABASE_ID_QUERY_PARAM
| typeof COLUMNS_QUERY_PARAM
| typeof LIVE_CACHE_BUSTER_QUERY_PARAM
| typeof SHAPE_HANDLE_QUERY_PARAM
Expand Down Expand Up @@ -84,12 +81,6 @@ export interface ShapeStreamOptions<T = never> {
*/
url: string

/**
* Which database to use.
* This is optional unless Electric is used with multiple databases.
*/
databaseId?: string

/**
* The root table for the shape. Passed as a query parameter. Not required if you set the table in your proxy.
*/
Expand Down Expand Up @@ -144,7 +135,7 @@ export interface ShapeStreamOptions<T = never> {
* Additional request parameters to attach to the URL.
* These will be merged with Electric's standard parameters.
* Note: You cannot use Electric's reserved parameter names
* (table, where, columns, offset, handle, live, cursor, database_id, replica).
* (table, where, columns, offset, handle, live, cursor, replica).
*/
params?: ParamsRecord

Expand Down Expand Up @@ -246,7 +237,6 @@ export class ShapeStream<T extends Row<unknown> = Row>
#isUpToDate: boolean = false
#connected: boolean = false
#shapeHandle?: string
#databaseId?: string
#schema?: Schema
#onError?: ShapeStreamErrorHandler
#replica?: Replica
Expand All @@ -257,7 +247,6 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#lastOffset = this.options.offset ?? `-1`
this.#liveCacheBuster = ``
this.#shapeHandle = this.options.handle
this.#databaseId = this.options.databaseId
this.#messageParser = new MessageParser<T>(options.parser)
this.#replica = this.options.replica
this.#onError = this.options.onError
Expand Down Expand Up @@ -347,10 +336,6 @@ export class ShapeStream<T extends Row<unknown> = Row>
)
}

if (this.#databaseId) {
fetchUrl.searchParams.set(DATABASE_ID_QUERY_PARAM, this.#databaseId!)
}

if (
(this.#replica ?? ShapeStream.Replica.DEFAULT) !=
ShapeStream.Replica.DEFAULT
Expand Down
1 change: 0 additions & 1 deletion packages/typescript-client/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ export const SHAPE_HANDLE_HEADER = `electric-handle`
export const CHUNK_LAST_OFFSET_HEADER = `electric-offset`
export const SHAPE_SCHEMA_HEADER = `electric-schema`
export const CHUNK_UP_TO_DATE_HEADER = `electric-up-to-date`
export const DATABASE_ID_QUERY_PARAM = `database_id`
export const COLUMNS_QUERY_PARAM = `columns`
export const LIVE_CACHE_BUSTER_QUERY_PARAM = `cursor`
export const SHAPE_HANDLE_QUERY_PARAM = `handle`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html

exports[`Shape > should throw on a reserved parameter 1`] = `[ReservedParamError: Cannot use reserved Electric parameter names in custom params: database_id]`;
exports[`Shape > should throw on a reserved parameter 1`] = `[ReservedParamError: Cannot use reserved Electric parameter names in custom params: live]`;
2 changes: 1 addition & 1 deletion packages/typescript-client/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ describe(`Shape`, () => {
url: `${BASE_URL}/v1/shape`,
table: `foo`,
params: {
database_id: `foo`,
live: `false`,
},
})
new Shape(shapeStream)
Expand Down
185 changes: 1 addition & 184 deletions packages/typescript-client/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@ import {
IssueRow,
testWithIssuesTable as it,
testWithMultitypeTable as mit,
//testWithMultiTenantIssuesTable as multiTenantIt,
} from './support/test-context'
import * as h from './support/test-helpers'

const BASE_URL = inject(`baseUrl`)
//const OTHER_DATABASE_URL = inject(`otherDatabaseUrl`)
//const databaseId = inject(`databaseId`)
//const otherDatabaseId = inject(`otherDatabaseId`)

it(`sanity check`, async ({ dbClient, issuesTableSql }) => {
const result = await dbClient.query(`SELECT * FROM ${issuesTableSql}`)

Expand Down Expand Up @@ -976,183 +973,3 @@ describe(`HTTP Sync`, () => {
})
})
})

/*
describe.sequential(`Multi tenancy sync`, () => {
it(`should allow new databases to be added`, async () => {
const url = new URL(`${BASE_URL}/v1/admin/database`)

// Add the database
const res = await fetch(url.toString(), {
method: `POST`,
headers: {
Accept: `application/json`,
'Content-Type': `application/json`,
},
body: JSON.stringify({
database_id: otherDatabaseId,
database_url: OTHER_DATABASE_URL,
}),
})

expect(res.status).toBe(200)
const body = await res.json()
expect(body).toBe(otherDatabaseId)
})

it(`should serve original database`, async ({
issuesTableUrl,
aborter,
insertIssues,
}) => {
const id = await insertIssues({ title: `test issue` })

const shapeData = new Map()
const issueStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: issuesTableUrl,
databaseId,
subscribe: false,
signal: aborter.signal,
})

await new Promise<void>((resolve, reject) => {
issueStream.subscribe((messages) => {
messages.forEach((message) => {
if (isChangeMessage(message)) {
shapeData.set(message.key, message.value)
}
if (isUpToDateMessage(message)) {
aborter.abort()
return resolve()
}
})
}, reject)
})

const values = [...shapeData.values()]
expect(values).toHaveLength(1)
expect(values[0]).toMatchObject({
id: id[0],
title: `test issue`,
})
})

multiTenantIt(
`should serve new database`,
async ({ issuesTableUrl, aborter, insertIssuesToOtherDb }) => {
const id = await insertIssuesToOtherDb({ title: `test issue in new db` })

const shapeData = new Map()
const issueStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: issuesTableUrl,
databaseId: otherDatabaseId,
subscribe: false,
signal: aborter.signal,
})

await new Promise<void>((resolve, reject) => {
issueStream.subscribe((messages) => {
messages.forEach((message) => {
if (isChangeMessage(message)) {
shapeData.set(message.key, message.value)
}
if (isUpToDateMessage(message)) {
aborter.abort()
return resolve()
}
})
}, reject)
})

const values = [...shapeData.values()]
expect(values).toHaveLength(1)
expect(values[0]).toMatchObject({
id: id[0],
title: `test issue in new db`,
})
}
)

multiTenantIt(
`should serve both databases in live mode`,
async ({
issuesTableUrl,
aborter,
otherAborter,
insertIssues,
insertIssuesToOtherDb,
}) => {
// Set up streams for both databases
const defaultStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: issuesTableUrl,
databaseId,
subscribe: true,
signal: aborter.signal,
})

const otherStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: issuesTableUrl,
databaseId: otherDatabaseId,
subscribe: true,
signal: otherAborter.signal,
})

const defaultData = new Map()
const otherData = new Map()

// Set up subscriptions
defaultStream.subscribe((messages) => {
messages.forEach((message) => {
if (isChangeMessage(message)) {
defaultData.set(message.key, message.value)
}
})
})

otherStream.subscribe((messages) => {
messages.forEach((message) => {
if (isChangeMessage(message)) {
otherData.set(message.key, message.value)
}
})
})

// Insert data into both databases
const defaultId = await insertIssues({ title: `default db issue` })
const otherId = await insertIssuesToOtherDb({ title: `other db issue` })

// Give time for updates to propagate
await sleep(1000)

// Verify data from default database
expect([...defaultData.values()]).toHaveLength(1)
expect([...defaultData.values()][0]).toMatchObject({
id: defaultId[0],
title: `default db issue`,
})

// Verify data from other database
expect([...otherData.values()]).toHaveLength(1)
expect([...otherData.values()][0]).toMatchObject({
id: otherId[0],
title: `other db issue`,
})
}
)

it(`should allow databases to be deleted`, async () => {
const url = new URL(`${BASE_URL}/v1/admin/database/${otherDatabaseId}`)

// Add the database
const res = await fetch(url.toString(), { method: `DELETE` })

expect(res.status).toBe(200)
const body = await res.json()
expect(body).toBe(otherDatabaseId)
})
})
*/
28 changes: 5 additions & 23 deletions packages/typescript-client/test/support/global-setup.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
import type { GlobalSetupContext } from 'vitest/node'
import { makePgClient } from './test-helpers'
import { Client } from 'pg'

const url = process.env.ELECTRIC_URL ?? `http://localhost:3000`
const proxyUrl = process.env.ELECTRIC_PROXY_CACHE_URL ?? `http://localhost:3002`
const databaseId = process.env.ELECTRIC_DATABASE_ID ?? `test_tenant`
const otherDatabaseId = `other_test_tenant`
const otherDatabaseUrl =
process.env.OTHER_DATABASE_URL ??
`postgresql://postgres:password@localhost:54322/electric?sslmode=disable`

// name of proxy cache container to execute commands against,
// see docker-compose.yml that spins it up for details
Expand All @@ -24,9 +18,6 @@ declare module 'vitest' {
testPgSchema: string
proxyCacheContainerName: string
proxyCachePath: string
databaseId: string
otherDatabaseId: string
otherDatabaseUrl: string
}
}

Expand All @@ -38,7 +29,7 @@ function waitForElectric(url: string): Promise<void> {
)

const tryHealth = async () =>
fetch(`${url}/v1/health?database_id=${databaseId}`)
fetch(`${url}/v1/health`)
.then(async (res): Promise<void> => {
if (!res.ok) return tryHealth()
const { status } = (await res.json()) as { status: string }
Expand All @@ -63,27 +54,18 @@ export default async function ({ provide }: GlobalSetupContext) {
await waitForElectric(url)

const client = makePgClient()
const otherClient = new Client(otherDatabaseUrl)
const clients = [client, otherClient]

for (const c of clients) {
await c.connect()
await c.query(`CREATE SCHEMA IF NOT EXISTS electric_test`)
}
await client.connect()
await client.query(`CREATE SCHEMA IF NOT EXISTS electric_test`)

provide(`baseUrl`, url)
provide(`testPgSchema`, `electric_test`)
provide(`proxyCacheBaseUrl`, proxyUrl)
provide(`proxyCacheContainerName`, proxyCacheContainerName)
provide(`proxyCachePath`, proxyCachePath)
provide(`databaseId`, databaseId)
provide(`otherDatabaseId`, otherDatabaseId)
provide(`otherDatabaseUrl`, otherDatabaseUrl)

return async () => {
for (const c of clients) {
await c.query(`DROP SCHEMA electric_test CASCADE`)
await c.end()
}
await client.query(`DROP SCHEMA electric_test CASCADE`)
await client.end()
}
}
Loading
Loading