Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ts-client): Start stream after subscribe #2188

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/mean-humans-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@electric-sql/client": patch
---

Start streaming only after at least one subscriber is present.
8 changes: 6 additions & 2 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
]
>()

#started = false
#lastOffset: Offset
#liveCacheBuster: string // Seconds since our Electric Epoch 😎
#lastSyncedAt?: number // unix time
Expand Down Expand Up @@ -290,8 +291,6 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#fetchClient = createFetchWithResponseHeadersCheck(
createFetchWithChunkBuffer(fetchWithBackoffClient)
)

this.#start()
}

get shapeHandle() {
Expand All @@ -311,6 +310,9 @@ export class ShapeStream<T extends Row<unknown> = Row>
}

async #start() {
if (this.#started) throw new Error(`Cannot start stream twice`)
this.#started = true

try {
while (
(!this.options.signal?.aborted && !this.#isUpToDate) ||
Expand Down Expand Up @@ -462,6 +464,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
}

// Restart
this.#started = false
this.#start()
}
return
Expand All @@ -481,6 +484,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
const subscriptionId = Math.random()

this.#subscribers.set(subscriptionId, [callback, onError])
if (!this.#started) this.#start()

return () => {
this.#subscribers.delete(subscriptionId)
Expand Down
39 changes: 28 additions & 11 deletions packages/typescript-client/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ describe(`Shape`, () => {
signal: aborter.signal,
})

await sleep(100) // give some time for the initial fetch to complete
// give some time for the initial fetch to complete
await waitForFetch(shapeStream)
expect(shapeStream.isConnected()).true

const shape = new Shape(shapeStream)
Expand Down Expand Up @@ -328,6 +329,8 @@ describe(`Shape`, () => {
},
})

const unsubscribe = shapeStream.subscribe(() => unsubscribe())

await sleep(100) // give some time for the initial fetch to complete
expect(shapeStream.isConnected()).true

Expand All @@ -347,7 +350,7 @@ describe(`Shape`, () => {
issuesTableUrl,
}) => {
const mockErrorHandler = vi.fn()
new ShapeStream({
const shapeStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
params: {
table: issuesTableUrl,
Expand All @@ -360,7 +363,7 @@ describe(`Shape`, () => {
onError: mockErrorHandler,
})

await sleep(10) // give some time for the initial fetch to complete
await waitForFetch(shapeStream)
expect(mockErrorHandler.mock.calls.length).toBe(1)
expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError)
})
Expand All @@ -383,7 +386,7 @@ describe(`Shape`, () => {
}
})

new ShapeStream({
const shapeStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
params: {
table: issuesTableUrl,
Expand All @@ -402,7 +405,7 @@ describe(`Shape`, () => {
onError: mockErrorHandler,
})

await sleep(50) // give some time for the fetches to complete
await waitForFetch(shapeStream)
expect(mockErrorHandler.mock.calls.length).toBe(1)
expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError)
})
Expand All @@ -425,7 +428,7 @@ describe(`Shape`, () => {
}
})

new ShapeStream({
const shapeStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
params: {
table: issuesTableUrl,
Expand All @@ -446,7 +449,7 @@ describe(`Shape`, () => {
onError: mockErrorHandler,
})

await sleep(50) // give some time for the fetches to complete
await waitForFetch(shapeStream)
expect(mockErrorHandler.mock.calls.length).toBe(1)
expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError)
})
Expand Down Expand Up @@ -489,7 +492,7 @@ describe(`Shape`, () => {
onError: mockErrorHandler,
})

await sleep(50) // give some time for the first fetch to complete
await waitForFetch(shapeStream)
expect(mockErrorHandler.mock.calls.length).toBe(1)
expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError)
expect(shapeStream.isConnected()).toBe(false)
Expand Down Expand Up @@ -521,7 +524,9 @@ describe(`Shape`, () => {
},
})

await sleep(10) // give some time for the initial fetch to complete
const unsub = shapeStream.subscribe(() => unsub())
await sleep(10) // give sometime for fetch to fail

expect(shapeStream.isConnected()).false

const expectedErrorMessage = new MissingHeadersError(url, [
Expand Down Expand Up @@ -549,6 +554,7 @@ describe(`Shape`, () => {
},
})

const unsubLive = shapeStreamLive.subscribe(() => unsubLive())
await sleep(10) // give some time for the initial fetch to complete
expect(shapeStreamLive.isConnected()).false

Expand All @@ -573,7 +579,8 @@ describe(`Shape`, () => {
subscribe: false,
})

await sleep(100) // give some time for the fetch to complete
await waitForFetch(shapeStream)
await sleep(50)

// We should no longer be connected because
// the initial fetch finished and we've not subscribed to changes
Expand All @@ -594,7 +601,7 @@ describe(`Shape`, () => {

expect(shapeStream.isLoading()).true

await sleep(200) // give some time for the initial fetch to complete
await waitForFetch(shapeStream)

expect(shapeStream.isLoading()).false
})
Expand Down Expand Up @@ -665,3 +672,13 @@ describe(`Shape`, () => {
}
})
})

function waitForFetch(stream: ShapeStream): Promise<void> {
let unsub = () => {}
return new Promise<void>((resolve) => {
unsub = stream.subscribe(
() => resolve(),
() => resolve()
)
}).finally(() => unsub())
}
44 changes: 42 additions & 2 deletions packages/typescript-client/test/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe(`ShapeStream`, () => {
}

const aborter = new AbortController()
new ShapeStream({
const stream = new ShapeStream({
url: shapeUrl,
params: {
table: `foo`,
Expand All @@ -35,6 +35,7 @@ describe(`ShapeStream`, () => {
'X-Custom-Header': `my-value`,
},
})
const unsub = stream.subscribe(() => unsub())

await new Promise((resolve) =>
eventTarget.addEventListener(`fetch`, resolve, { once: true })
Expand All @@ -60,7 +61,7 @@ describe(`ShapeStream`, () => {
}

const aborter = new AbortController()
new ShapeStream({
const stream = new ShapeStream({
url: shapeUrl,
params: {
table: `foo`,
Expand All @@ -72,6 +73,8 @@ describe(`ShapeStream`, () => {
fetchClient: fetchWrapper,
})

const unsub = stream.subscribe(() => unsub())

await new Promise((resolve) =>
eventTarget.addEventListener(`fetch`, resolve, { once: true })
)
Expand All @@ -80,4 +83,41 @@ describe(`ShapeStream`, () => {
`columns=id&handle=potato&offset=-1&table=foo&where=a%3D1`
)
})

it(`should start requesting only after first subscription`, async () => {
const eventTarget = new EventTarget()
const fetchWrapper = (): Promise<Response> => {
eventTarget.dispatchEvent(new Event(`fetch`))
return Promise.resolve(Response.error())
}

const aborter = new AbortController()
const stream = new ShapeStream({
url: shapeUrl,
params: {
table: `foo`,
where: `a=1`,
columns: [`id`],
},
handle: `potato`,
signal: aborter.signal,
fetchClient: fetchWrapper,
})

// should not fire any fetch requests
await new Promise<void>((resolve, reject) => {
eventTarget.addEventListener(`fetch`, reject, { once: true })
setTimeout(() => resolve(), 100)
})

// should fire fetch immediately after subbing
const startedStreaming = new Promise<void>((resolve, reject) => {
eventTarget.addEventListener(`fetch`, () => resolve(), {
once: true,
})
setTimeout(() => reject(`timed out`), 100)
})
const unsub = stream.subscribe(() => unsub())
await startedStreaming
})
})
Loading