Skip to content

Commit

Permalink
feat: rows and rowsSync API on Shape (#1897)
Browse files Browse the repository at this point in the history
Fixes #1888

I've added both a `rows` and `rowsSync` API to match what we already
have for `value` and `valueSync`.

The react hooks already expose a `data` field that _is_ the rows (it was
doing the conversion there), so I don't think we need to change that
API.

That being said, currently the `subscribe` API also returns the `Map`
form of the data - if we want that to return rows and phase out the
`Map` format entirely, it will be a significant, breaking change.
  • Loading branch information
msfstef authored Oct 29, 2024
1 parent aebda2f commit a503c21
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 91 deletions.
8 changes: 8 additions & 0 deletions .changeset/friendly-toes-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@electric-sql/client": minor
"@electric-sql/react": patch
---

- Implement `rows` and `currentRows` getters on `Shape` interface for easier data access.
- [BREAKING] Rename `valueSync` getter on `Shape` to `currentValue` for clarity and consistency.
- [BREAKING] Change `subscribe` API on `Shape` to accept callbacks with signature `({ rows: T[], value: Map<string, T> }) => void`
4 changes: 2 additions & 2 deletions packages/react-hooks/src/react-hooks.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export async function preloadShape<T extends Row<unknown> = Row>(
): Promise<Shape<T>> {
const shapeStream = getShapeStream<T>(options)
const shape = getShape<T>(shapeStream)
await shape.value
await shape.rows
return shape
}

Expand Down Expand Up @@ -96,7 +96,7 @@ function parseShapeData<T extends Row<unknown>>(
shape: Shape<T>
): UseShapeResult<T> {
return {
data: [...shape.valueSync.values()],
data: shape.currentRows,
isLoading: shape.isLoading(),
lastSyncedAt: shape.lastSyncedAt(),
isError: shape.error !== false,
Expand Down
6 changes: 3 additions & 3 deletions packages/typescript-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ const stream = new ShapeStream({
const shape = new Shape(stream)

// Returns promise that resolves with the latest shape data once it's fully loaded
await shape.value
await shape.rows

// passes subscribers shape data when the shape updates
shape.subscribe(shapeData => {
// shapeData is a Map of the latest value of each row in a shape.
shape.subscribe(({ rows }) => {
// rows is an array of the latest value of each row in a shape.
}
```
Expand Down
37 changes: 23 additions & 14 deletions packages/typescript-client/src/shape.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import { FetchError } from './error'
import { ShapeStreamInterface } from './client'

export type ShapeData<T extends Row<unknown> = Row> = Map<string, T>
export type ShapeChangedCallback<T extends Row<unknown> = Row> = (
export type ShapeChangedCallback<T extends Row<unknown> = Row> = (data: {
value: ShapeData<T>
) => void
rows: T[]
}) => void

/**
* A Shape is an object that subscribes to a shape log,
* keeps a materialised shape `.value` in memory and
* keeps a materialised shape `.rows` in memory and
* notifies subscribers when the value has changed.
*
* It can be used without a framework and as a primitive
Expand All @@ -24,19 +25,19 @@ export type ShapeChangedCallback<T extends Row<unknown> = Row> = (
* const shape = new Shape(shapeStream)
* ```
*
* `value` returns a promise that resolves the Shape data once the Shape has been
* `rows` returns a promise that resolves the Shape data once the Shape has been
* fully loaded (and when resuming from being offline):
*
* const value = await shape.value
* const rows = await shape.rows
*
* `valueSync` returns the current data synchronously:
* `currentRows` returns the current data synchronously:
*
* const value = shape.valueSync
* const rows = shape.currentRows
*
* Subscribe to updates. Called whenever the shape updates in Postgres.
*
* shape.subscribe(shapeData => {
* console.log(shapeData)
* shape.subscribe(({ rows }) => {
* console.log(rows)
* })
*/
export class Shape<T extends Row<unknown> = Row> {
Expand Down Expand Up @@ -69,21 +70,29 @@ export class Shape<T extends Row<unknown> = Row> {
return this.#stream.isUpToDate
}

get rows(): Promise<T[]> {
return this.value.then((v) => Array.from(v.values()))
}

get currentRows(): T[] {
return Array.from(this.currentValue.values())
}

get value(): Promise<ShapeData<T>> {
return new Promise((resolve, reject) => {
if (this.#stream.isUpToDate) {
resolve(this.valueSync)
resolve(this.currentValue)
} else {
const unsubscribe = this.subscribe((shapeData) => {
const unsubscribe = this.subscribe(({ value }) => {
unsubscribe()
if (this.#error) reject(this.#error)
resolve(shapeData)
resolve(value)
})
}
})
}

get valueSync() {
get currentValue() {
return this.#data
}

Expand Down Expand Up @@ -192,7 +201,7 @@ export class Shape<T extends Row<unknown> = Row> {

#notify(): void {
this.#subscribers.forEach((callback) => {
callback(this.valueSync)
callback({ value: this.currentValue, rows: this.currentRows })
})
}
}
10 changes: 6 additions & 4 deletions packages/typescript-client/test/client.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ describe(`client`, () => {

expectTypeOf(shape).toEqualTypeOf<Shape<Row>>()

shape.subscribe((data) => {
expectTypeOf(data).toEqualTypeOf<ShapeData<Row>>()
shape.subscribe(({ value, rows }) => {
expectTypeOf(value).toEqualTypeOf<ShapeData<Row>>()
expectTypeOf(rows).toEqualTypeOf<Row[]>()
})

const data = await shape.value
Expand All @@ -76,8 +77,9 @@ describe(`client`, () => {
const shape = new Shape(shapeStream)
expectTypeOf(shape).toEqualTypeOf<Shape<CustomRow>>()

shape.subscribe((data) => {
expectTypeOf(data).toEqualTypeOf<ShapeData<CustomRow>>()
shape.subscribe(({ value, rows }) => {
expectTypeOf(value).toEqualTypeOf<ShapeData<CustomRow>>()
expectTypeOf(rows).toEqualTypeOf<CustomRow[]>()
})

const data = await shape.value
Expand Down
114 changes: 56 additions & 58 deletions packages/typescript-client/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@ describe(`Shape`, () => {
url: `${BASE_URL}/v1/shape/${issuesTableUrl}`,
})
const shape = new Shape(shapeStream)
const map = await shape.value

expect(map).toEqual(new Map())
expect(await shape.value).toEqual(new Map())
expect(await shape.rows).toEqual([])
expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(start)
expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now())
expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start)
})

it(`should notify with the initial value`, async ({
issuesTableUrl,
issuesTableKey,
insertIssues,
aborter,
}) => {
Expand All @@ -36,18 +35,11 @@ describe(`Shape`, () => {
})
const shape = new Shape(shapeStream)

const map = await new Promise((resolve) => {
shape.subscribe(resolve)
})

const expectedValue = new Map()
expectedValue.set(`${issuesTableKey}/"${id}"`, {
id: id,
title: `test title`,
priority: 10,
const rows = await new Promise((resolve) => {
shape.subscribe(({ rows }) => resolve(rows))
})

expect(map).toEqual(expectedValue)
expect(rows).toEqual([{ id: id, title: `test title`, priority: 10 }])
expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(start)
expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now())
expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start)
Expand All @@ -58,26 +50,27 @@ describe(`Shape`, () => {
insertIssues,
deleteIssue,
updateIssue,
issuesTableKey,
aborter,
}) => {
const [id] = await insertIssues({ title: `test title` })

const expectedValue1 = [
{
id: id,
title: `test title`,
priority: 10,
},
]

const start = Date.now()
const shapeStream = new ShapeStream({
url: `${BASE_URL}/v1/shape/${issuesTableUrl}`,
signal: aborter.signal,
})
const shape = new Shape(shapeStream)
const map = await shape.value
const rows = await shape.rows

const expectedValue = new Map()
expectedValue.set(`${issuesTableKey}/"${id}"`, {
id: id,
title: `test title`,
priority: 10,
})
expect(map).toEqual(expectedValue)
expect(rows).toEqual(expectedValue1)
expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(start)
expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now())
expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start)
Expand All @@ -98,12 +91,16 @@ describe(`Shape`, () => {
await sleep(200) // some time for electric to catch up
await hasNotified

expectedValue.set(`${issuesTableKey}/"${id2}"`, {
id: id2,
title: `new title`,
priority: 10,
})
expect(shape.valueSync).toEqual(expectedValue)
const expectedValue2 = [
...expectedValue1,
{
id: id2,
title: `new title`,
priority: 10,
},
]

expect(shape.currentRows).toEqual(expectedValue2)
expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(intermediate)
expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now())
expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - intermediate)
Expand All @@ -113,7 +110,6 @@ describe(`Shape`, () => {

it(`should resync from scratch on a shape rotation`, async ({
issuesTableUrl,
issuesTableKey,
insertIssues,
deleteIssue,
clearIssuesShape,
Expand All @@ -123,19 +119,21 @@ describe(`Shape`, () => {
const id2 = uuidv4()
await insertIssues({ id: id1, title: `foo1` })

const expectedValue1 = new Map()
expectedValue1.set(`${issuesTableKey}/"${id1}"`, {
id: id1,
title: `foo1`,
priority: 10,
})
const expectedValue1 = [
{
id: id1,
title: `foo1`,
priority: 10,
},
]

const expectedValue2 = new Map()
expectedValue2.set(`${issuesTableKey}/"${id2}"`, {
id: id2,
title: `foo2`,
priority: 10,
})
const expectedValue2 = [
{
id: id2,
title: `foo2`,
priority: 10,
},
]

let requestsMade = 0
const start = Date.now()
Expand Down Expand Up @@ -167,14 +165,14 @@ describe(`Shape`, () => {
let dataUpdateCount = 0
await new Promise<void>((resolve, reject) => {
setTimeout(() => reject(`Timed out waiting for data changes`), 1000)
shape.subscribe((shapeData) => {
shape.subscribe(({ rows }) => {
dataUpdateCount++
if (dataUpdateCount === 1) {
expect(shapeData).toEqual(expectedValue1)
expect(rows).toEqual(expectedValue1)
expect(shape.lastSynced()).toBeLessThanOrEqual(Date.now() - start)
return
} else if (dataUpdateCount === 2) {
expect(shapeData).toEqual(expectedValue2)
expect(rows).toEqual(expectedValue2)
expect(shape.lastSynced()).toBeLessThanOrEqual(
Date.now() - rotationTime
)
Expand All @@ -188,7 +186,6 @@ describe(`Shape`, () => {
it(`should notify subscribers when the value changes`, async ({
issuesTableUrl,
insertIssues,
issuesTableKey,
aborter,
}) => {
const [id] = await insertIssues({ title: `test title` })
Expand All @@ -201,23 +198,24 @@ describe(`Shape`, () => {
const shape = new Shape(shapeStream)

const hasNotified = new Promise((resolve) => {
shape.subscribe(resolve)
shape.subscribe(({ rows }) => resolve(rows))
})

const [id2] = await insertIssues({ title: `other title` })

const value = await hasNotified
const expectedValue = new Map()
expectedValue.set(`${issuesTableKey}/"${id}"`, {
id: id,
title: `test title`,
priority: 10,
})
expectedValue.set(`${issuesTableKey}/"${id2}"`, {
id: id2,
title: `other title`,
priority: 10,
})
const expectedValue = [
{
id: id,
title: `test title`,
priority: 10,
},
{
id: id2,
title: `other title`,
priority: 10,
},
]
expect(value).toEqual(expectedValue)
expect(shape.lastSyncedAt()).toBeGreaterThanOrEqual(start)
expect(shape.lastSyncedAt()).toBeLessThanOrEqual(Date.now())
Expand Down Expand Up @@ -252,7 +250,7 @@ describe(`Shape`, () => {
expect(shapeStream.isConnected()).true

const shape = new Shape(shapeStream)
await shape.value
await shape.rows

expect(shapeStream.isConnected()).true

Expand Down
8 changes: 4 additions & 4 deletions packages/typescript-client/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ describe(`HTTP Sync`, () => {
signal: aborter.signal,
})
const client = new Shape(issueStream)
const data = await client.value
const rows = await client.rows

expect([...data.values()]).toMatchObject([
expect(rows).toMatchObject([
{
txt: `test`,
i2: 1,
Expand Down Expand Up @@ -300,9 +300,9 @@ describe(`HTTP Sync`, () => {
const body = (await res.json()) as Message[]
expect(body.length).greaterThan(1)
})
const updatedData = client.valueSync
const updatedData = await client.rows

expect([...updatedData.values()]).toMatchObject([
expect(updatedData).toMatchObject([
{
txt: `changed`,
i2: 1,
Expand Down
Loading

0 comments on commit a503c21

Please sign in to comment.