Skip to content

Commit

Permalink
feat! (ts client): Remove subscribeOnceToUpToDate from shape stream (#…
Browse files Browse the repository at this point in the history
…2017)

Fixes #1996.

We remove `subscribeOnceToUpToDate` in favor of subscribing to the
stream and "waiting" for the first `up-to-date` control message. **This
is a breaking change.**
  • Loading branch information
kevin-dp authored Nov 21, 2024
1 parent 1faa79b commit 12fd091
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 56 deletions.
5 changes: 5 additions & 0 deletions .changeset/grumpy-cougars-cheat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@electric-sql/client": minor
---

[BREAKING] Remove subscribeOnceToUpToDate method from ShapeStream. Instead, you should subscribe to the stream and check for the up-to-date control message.
43 changes: 0 additions & 43 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,7 @@ export interface ShapeStreamInterface<T extends Row<unknown> = Row> {
callback: (messages: Message<T>[]) => MaybePromise<void>,
onError?: (error: FetchError | Error) => void
): void
unsubscribeAllUpToDateSubscribers(): void
unsubscribeAll(): void
subscribeOnceToUpToDate(
callback: () => MaybePromise<void>,
error: (err: FetchError | Error) => void
): () => void

isLoading(): boolean
lastSyncedAt(): number | undefined
Expand Down Expand Up @@ -222,10 +217,6 @@ export class ShapeStream<T extends Row<unknown> = Row>
((error: Error) => void) | undefined,
]
>()
readonly #upToDateSubscribers = new Map<
number,
[() => void, (error: FetchError | Error) => void]
>()

#lastOffset: Offset
#liveCacheBuster: string // Seconds since our Electric Epoch 😎
Expand Down Expand Up @@ -366,7 +357,6 @@ export class ShapeStream<T extends Row<unknown> = Row>
continue
} else if (e.status >= 400 && e.status < 500) {
// Notify subscribers
this.#sendErrorToUpToDateSubscribers(e)
this.#sendErrorToSubscribers(e)

// 400 errors are not actionable without additional user input,
Expand Down Expand Up @@ -408,17 +398,13 @@ export class ShapeStream<T extends Row<unknown> = Row>

// Update isUpToDate
if (batch.length > 0) {
const prevUpToDate = this.#isUpToDate
const lastMessage = batch[batch.length - 1]
if (isUpToDateMessage(lastMessage)) {
this.#lastSyncedAt = Date.now()
this.#isUpToDate = true
}

await this.#publish(batch)
if (!prevUpToDate && this.#isUpToDate) {
this.#notifyUpToDateSubscribers()
}
}
}
} catch (err) {
Expand All @@ -445,23 +431,6 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#subscribers.clear()
}

subscribeOnceToUpToDate(
callback: () => MaybePromise<void>,
error: (err: FetchError | Error) => void
) {
const subscriptionId = Math.random()

this.#upToDateSubscribers.set(subscriptionId, [callback, error])

return () => {
this.#upToDateSubscribers.delete(subscriptionId)
}
}

unsubscribeAllUpToDateSubscribers(): void {
this.#upToDateSubscribers.clear()
}

/** Unix time at which we last synced. Undefined when `isLoading` is true. */
lastSyncedAt(): number | undefined {
return this.#lastSyncedAt
Expand Down Expand Up @@ -503,18 +472,6 @@ export class ShapeStream<T extends Row<unknown> = Row>
})
}

#notifyUpToDateSubscribers() {
this.#upToDateSubscribers.forEach(([callback]) => {
callback()
})
}

#sendErrorToUpToDateSubscribers(error: FetchError | Error) {
this.#upToDateSubscribers.forEach(([_, errorCallback]) =>
errorCallback(error)
)
}

/**
* Resets the state of the stream, optionally with a provided
* shape handle
Expand Down
9 changes: 0 additions & 9 deletions packages/typescript-client/src/shape.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,6 @@ export class Shape<T extends Row<unknown> = Row> {
this.#process.bind(this),
this.#handleError.bind(this)
)
const unsubscribe = this.#stream.subscribeOnceToUpToDate(
() => {
unsubscribe()
},
(e) => {
this.#handleError(e)
throw e
}
)
}

get isUpToDate(): boolean {
Expand Down
4 changes: 0 additions & 4 deletions packages/typescript-client/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -848,12 +848,8 @@ describe(`HTTP Sync`, () => {
const errorSubscriberPromise = new Promise((_, reject) =>
invalidIssueStream.subscribe(() => {}, reject)
)
const errorUpToDateSubscriberPromise = new Promise((_, reject) =>
invalidIssueStream.subscribeOnceToUpToDate(() => {}, reject)
)

await expect(errorSubscriberPromise).rejects.toThrow(FetchError)
await expect(errorUpToDateSubscriberPromise).rejects.toThrow(FetchError)
expect(invalidIssueStream.error).instanceOf(FetchError)
expect((invalidIssueStream.error! as FetchError).status).toBe(400)
expect(invalidIssueStream.isConnected()).false
Expand Down

0 comments on commit 12fd091

Please sign in to comment.