diff --git a/.changeset/mean-humans-clean.md b/.changeset/mean-humans-clean.md new file mode 100644 index 0000000000..cc0d97d904 --- /dev/null +++ b/.changeset/mean-humans-clean.md @@ -0,0 +1,5 @@ +--- +"@electric-sql/client": patch +--- + +Start streaming only after at least one subscriber is present. diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 1ee875535b..90f8eed36c 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -257,6 +257,7 @@ export class ShapeStream = Row> ] >() + #started = false #lastOffset: Offset #liveCacheBuster: string // Seconds since our Electric Epoch 😎 #lastSyncedAt?: number // unix time @@ -290,8 +291,6 @@ export class ShapeStream = Row> this.#fetchClient = createFetchWithResponseHeadersCheck( createFetchWithChunkBuffer(fetchWithBackoffClient) ) - - this.#start() } get shapeHandle() { @@ -311,6 +310,9 @@ export class ShapeStream = Row> } async #start() { + if (this.#started) throw new Error(`Cannot start stream twice`) + this.#started = true + try { while ( (!this.options.signal?.aborted && !this.#isUpToDate) || @@ -462,6 +464,7 @@ export class ShapeStream = Row> } // Restart + this.#started = false this.#start() } return @@ -481,6 +484,7 @@ export class ShapeStream = Row> const subscriptionId = Math.random() this.#subscribers.set(subscriptionId, [callback, onError]) + if (!this.#started) this.#start() return () => { this.#subscribers.delete(subscriptionId) diff --git a/packages/typescript-client/test/client.test.ts b/packages/typescript-client/test/client.test.ts index d227fb1669..1f85d35027 100644 --- a/packages/typescript-client/test/client.test.ts +++ b/packages/typescript-client/test/client.test.ts @@ -282,7 +282,8 @@ describe(`Shape`, () => { signal: aborter.signal, }) - await sleep(100) // give some time for the initial fetch to complete + // give some time for the initial fetch to complete + await waitForFetch(shapeStream) expect(shapeStream.isConnected()).true const shape = new Shape(shapeStream) @@ -328,6 +329,8 @@ describe(`Shape`, () => { }, }) + const unsubscribe = shapeStream.subscribe(() => unsubscribe()) + await sleep(100) // give some time for the initial fetch to complete expect(shapeStream.isConnected()).true @@ -347,7 +350,7 @@ describe(`Shape`, () => { issuesTableUrl, }) => { const mockErrorHandler = vi.fn() - new ShapeStream({ + const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, params: { table: issuesTableUrl, @@ -360,7 +363,7 @@ describe(`Shape`, () => { onError: mockErrorHandler, }) - await sleep(10) // give some time for the initial fetch to complete + await waitForFetch(shapeStream) expect(mockErrorHandler.mock.calls.length).toBe(1) expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError) }) @@ -383,7 +386,7 @@ describe(`Shape`, () => { } }) - new ShapeStream({ + const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, params: { table: issuesTableUrl, @@ -402,7 +405,7 @@ describe(`Shape`, () => { onError: mockErrorHandler, }) - await sleep(50) // give some time for the fetches to complete + await waitForFetch(shapeStream) expect(mockErrorHandler.mock.calls.length).toBe(1) expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError) }) @@ -425,7 +428,7 @@ describe(`Shape`, () => { } }) - new ShapeStream({ + const shapeStream = new ShapeStream({ url: `${BASE_URL}/v1/shape`, params: { table: issuesTableUrl, @@ -446,7 +449,7 @@ describe(`Shape`, () => { onError: mockErrorHandler, }) - await sleep(50) // give some time for the fetches to complete + await waitForFetch(shapeStream) expect(mockErrorHandler.mock.calls.length).toBe(1) expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError) }) @@ -489,7 +492,7 @@ describe(`Shape`, () => { onError: mockErrorHandler, }) - await sleep(50) // give some time for the first fetch to complete + await waitForFetch(shapeStream) expect(mockErrorHandler.mock.calls.length).toBe(1) expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError) expect(shapeStream.isConnected()).toBe(false) @@ -521,7 +524,9 @@ describe(`Shape`, () => { }, }) - await sleep(10) // give some time for the initial fetch to complete + const unsub = shapeStream.subscribe(() => unsub()) + await sleep(10) // give sometime for fetch to fail + expect(shapeStream.isConnected()).false const expectedErrorMessage = new MissingHeadersError(url, [ @@ -549,6 +554,7 @@ describe(`Shape`, () => { }, }) + const unsubLive = shapeStreamLive.subscribe(() => unsubLive()) await sleep(10) // give some time for the initial fetch to complete expect(shapeStreamLive.isConnected()).false @@ -573,7 +579,8 @@ describe(`Shape`, () => { subscribe: false, }) - await sleep(100) // give some time for the fetch to complete + await waitForFetch(shapeStream) + await sleep(50) // We should no longer be connected because // the initial fetch finished and we've not subscribed to changes @@ -594,7 +601,7 @@ describe(`Shape`, () => { expect(shapeStream.isLoading()).true - await sleep(200) // give some time for the initial fetch to complete + await waitForFetch(shapeStream) expect(shapeStream.isLoading()).false }) @@ -665,3 +672,13 @@ describe(`Shape`, () => { } }) }) + +function waitForFetch(stream: ShapeStream): Promise { + let unsub = () => {} + return new Promise((resolve) => { + unsub = stream.subscribe( + () => resolve(), + () => resolve() + ) + }).finally(() => unsub()) +} diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts index 07f8d64bcc..16da17bf98 100644 --- a/packages/typescript-client/test/stream.test.ts +++ b/packages/typescript-client/test/stream.test.ts @@ -23,7 +23,7 @@ describe(`ShapeStream`, () => { } const aborter = new AbortController() - new ShapeStream({ + const stream = new ShapeStream({ url: shapeUrl, params: { table: `foo`, @@ -35,6 +35,7 @@ describe(`ShapeStream`, () => { 'X-Custom-Header': `my-value`, }, }) + const unsub = stream.subscribe(() => unsub()) await new Promise((resolve) => eventTarget.addEventListener(`fetch`, resolve, { once: true }) @@ -60,7 +61,7 @@ describe(`ShapeStream`, () => { } const aborter = new AbortController() - new ShapeStream({ + const stream = new ShapeStream({ url: shapeUrl, params: { table: `foo`, @@ -72,6 +73,8 @@ describe(`ShapeStream`, () => { fetchClient: fetchWrapper, }) + const unsub = stream.subscribe(() => unsub()) + await new Promise((resolve) => eventTarget.addEventListener(`fetch`, resolve, { once: true }) ) @@ -80,4 +83,41 @@ describe(`ShapeStream`, () => { `columns=id&handle=potato&offset=-1&table=foo&where=a%3D1` ) }) + + it(`should start requesting only after first subscription`, async () => { + const eventTarget = new EventTarget() + const fetchWrapper = (): Promise => { + eventTarget.dispatchEvent(new Event(`fetch`)) + return Promise.resolve(Response.error()) + } + + const aborter = new AbortController() + const stream = new ShapeStream({ + url: shapeUrl, + params: { + table: `foo`, + where: `a=1`, + columns: [`id`], + }, + handle: `potato`, + signal: aborter.signal, + fetchClient: fetchWrapper, + }) + + // should not fire any fetch requests + await new Promise((resolve, reject) => { + eventTarget.addEventListener(`fetch`, reject, { once: true }) + setTimeout(() => resolve(), 100) + }) + + // should fire fetch immediately after subbing + const startedStreaming = new Promise((resolve, reject) => { + eventTarget.addEventListener(`fetch`, () => resolve(), { + once: true, + }) + setTimeout(() => reject(`timed out`), 100) + }) + const unsub = stream.subscribe(() => unsub()) + await startedStreaming + }) })