From 12fd091c4fde2972be0d691953dc53586bfc6d5a Mon Sep 17 00:00:00 2001 From: Kevin Date: Thu, 21 Nov 2024 11:36:25 +0100 Subject: [PATCH] feat! (ts client): Remove subscribeOnceToUpToDate from shape stream (#2017) Fixes https://github.com/electric-sql/electric/issues/1996. We remove `subscribeOnceToUpToDate` in favor of subscribing to the stream and "waiting" for the first `up-to-date` control message. **This is a breaking change.** --- .changeset/grumpy-cougars-cheat.md | 5 +++ packages/typescript-client/src/client.ts | 43 ------------------- packages/typescript-client/src/shape.ts | 9 ---- .../test/integration.test.ts | 4 -- 4 files changed, 5 insertions(+), 56 deletions(-) create mode 100644 .changeset/grumpy-cougars-cheat.md diff --git a/.changeset/grumpy-cougars-cheat.md b/.changeset/grumpy-cougars-cheat.md new file mode 100644 index 0000000000..0b6d2ed978 --- /dev/null +++ b/.changeset/grumpy-cougars-cheat.md @@ -0,0 +1,5 @@ +--- +"@electric-sql/client": minor +--- + +[BREAKING] Remove subscribeOnceToUpToDate method from ShapeStream. Instead, you should subscribe to the stream and check for the up-to-date control message. diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index d8c0228c09..81c31ec3db 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -154,12 +154,7 @@ export interface ShapeStreamInterface = Row> { callback: (messages: Message[]) => MaybePromise, onError?: (error: FetchError | Error) => void ): void - unsubscribeAllUpToDateSubscribers(): void unsubscribeAll(): void - subscribeOnceToUpToDate( - callback: () => MaybePromise, - error: (err: FetchError | Error) => void - ): () => void isLoading(): boolean lastSyncedAt(): number | undefined @@ -222,10 +217,6 @@ export class ShapeStream = Row> ((error: Error) => void) | undefined, ] >() - readonly #upToDateSubscribers = new Map< - number, - [() => void, (error: FetchError | Error) => void] - >() #lastOffset: Offset #liveCacheBuster: string // Seconds since our Electric Epoch 😎 @@ -366,7 +357,6 @@ export class ShapeStream = Row> continue } else if (e.status >= 400 && e.status < 500) { // Notify subscribers - this.#sendErrorToUpToDateSubscribers(e) this.#sendErrorToSubscribers(e) // 400 errors are not actionable without additional user input, @@ -408,7 +398,6 @@ export class ShapeStream = Row> // Update isUpToDate if (batch.length > 0) { - const prevUpToDate = this.#isUpToDate const lastMessage = batch[batch.length - 1] if (isUpToDateMessage(lastMessage)) { this.#lastSyncedAt = Date.now() @@ -416,9 +405,6 @@ export class ShapeStream = Row> } await this.#publish(batch) - if (!prevUpToDate && this.#isUpToDate) { - this.#notifyUpToDateSubscribers() - } } } } catch (err) { @@ -445,23 +431,6 @@ export class ShapeStream = Row> this.#subscribers.clear() } - subscribeOnceToUpToDate( - callback: () => MaybePromise, - error: (err: FetchError | Error) => void - ) { - const subscriptionId = Math.random() - - this.#upToDateSubscribers.set(subscriptionId, [callback, error]) - - return () => { - this.#upToDateSubscribers.delete(subscriptionId) - } - } - - unsubscribeAllUpToDateSubscribers(): void { - this.#upToDateSubscribers.clear() - } - /** Unix time at which we last synced. Undefined when `isLoading` is true. */ lastSyncedAt(): number | undefined { return this.#lastSyncedAt @@ -503,18 +472,6 @@ export class ShapeStream = Row> }) } - #notifyUpToDateSubscribers() { - this.#upToDateSubscribers.forEach(([callback]) => { - callback() - }) - } - - #sendErrorToUpToDateSubscribers(error: FetchError | Error) { - this.#upToDateSubscribers.forEach(([_, errorCallback]) => - errorCallback(error) - ) - } - /** * Resets the state of the stream, optionally with a provided * shape handle diff --git a/packages/typescript-client/src/shape.ts b/packages/typescript-client/src/shape.ts index 225837dd74..43fc480791 100644 --- a/packages/typescript-client/src/shape.ts +++ b/packages/typescript-client/src/shape.ts @@ -55,15 +55,6 @@ export class Shape = Row> { this.#process.bind(this), this.#handleError.bind(this) ) - const unsubscribe = this.#stream.subscribeOnceToUpToDate( - () => { - unsubscribe() - }, - (e) => { - this.#handleError(e) - throw e - } - ) } get isUpToDate(): boolean { diff --git a/packages/typescript-client/test/integration.test.ts b/packages/typescript-client/test/integration.test.ts index af2e79bb7e..b77ab3bd61 100644 --- a/packages/typescript-client/test/integration.test.ts +++ b/packages/typescript-client/test/integration.test.ts @@ -848,12 +848,8 @@ describe(`HTTP Sync`, () => { const errorSubscriberPromise = new Promise((_, reject) => invalidIssueStream.subscribe(() => {}, reject) ) - const errorUpToDateSubscriberPromise = new Promise((_, reject) => - invalidIssueStream.subscribeOnceToUpToDate(() => {}, reject) - ) await expect(errorSubscriberPromise).rejects.toThrow(FetchError) - await expect(errorUpToDateSubscriberPromise).rejects.toThrow(FetchError) expect(invalidIssueStream.error).instanceOf(FetchError) expect((invalidIssueStream.error! as FetchError).status).toBe(400) expect(invalidIssueStream.isConnected()).false