diff --git a/.changeset/shiny-rules-drive.md b/.changeset/shiny-rules-drive.md new file mode 100644 index 0000000000..12f689f882 --- /dev/null +++ b/.changeset/shiny-rules-drive.md @@ -0,0 +1,5 @@ +--- +"@electric-sql/client": patch +--- + +Verify that fetch response contains required Electric headers. diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index d08d42f7bf..fb90ae75cd 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -8,12 +8,17 @@ import { } from './types' import { MessageParser, Parser } from './parser' import { isUpToDateMessage } from './helpers' -import { FetchError, FetchBackoffAbortError } from './error' +import { + FetchError, + FetchBackoffAbortError, + MissingHeadersError, +} from './error' import { BackoffDefaults, BackoffOptions, createFetchWithBackoff, createFetchWithChunkBuffer, + createFetchWithResponseHeadersCheck, } from './fetch' import { CHUNK_LAST_OFFSET_HEADER, @@ -252,7 +257,9 @@ export class ShapeStream = Row> }, }) - this.#fetchClient = createFetchWithChunkBuffer(fetchWithBackoffClient) + this.#fetchClient = createFetchWithResponseHeadersCheck( + createFetchWithChunkBuffer(fetchWithBackoffClient) + ) this.start() } @@ -345,6 +352,7 @@ export class ShapeStream = Row> this.#connected = true } catch (e) { if (e instanceof FetchBackoffAbortError) break // interrupted + if (e instanceof MissingHeadersError) throw e if (!(e instanceof FetchError)) throw e // should never happen if (e.status == 409) { // Upon receiving a 409, we should start from scratch diff --git a/packages/typescript-client/src/error.ts b/packages/typescript-client/src/error.ts index b1cf1d5230..1fdc244027 100644 --- a/packages/typescript-client/src/error.ts +++ b/packages/typescript-client/src/error.ts @@ -48,3 +48,14 @@ export class FetchBackoffAbortError extends Error { super(`Fetch with backoff aborted`) } } + +export class MissingHeadersError extends Error { + constructor(url: string, missingHeaders: Array) { + let msg = `The response for the shape request to ${url} didn't include the following required headers:\n` + missingHeaders.forEach((h) => { + msg += `- ${h}\n` + }) + msg += `\nThis is often due to a proxy not setting CORS correctly so that all Electric headers can be read by the client.` + super(msg) + } +} diff --git a/packages/typescript-client/src/fetch.ts b/packages/typescript-client/src/fetch.ts index bc06752a64..6349d5b7f5 100644 --- a/packages/typescript-client/src/fetch.ts +++ b/packages/typescript-client/src/fetch.ts @@ -6,7 +6,11 @@ import { SHAPE_HANDLE_HEADER, SHAPE_HANDLE_QUERY_PARAM, } from './constants' -import { FetchError, FetchBackoffAbortError } from './error' +import { + FetchError, + FetchBackoffAbortError, + MissingHeadersError, +} from './error' // Some specific 4xx and 5xx HTTP status codes that we definitely // want to retry @@ -146,6 +150,53 @@ export function createFetchWithChunkBuffer( return prefetchClient } +export const requiredElectricResponseHeaders = [ + `electric-offset`, + `electric-handle`, +] + +export const requiredLiveResponseHeaders = [`electric-cursor`] + +export const requiredNonLiveResponseHeaders = [`electric-schema`] + +export function createFetchWithResponseHeadersCheck( + fetchClient: typeof fetch +): typeof fetch { + return async (...args: Parameters) => { + const response = await fetchClient(...args) + + if (response.ok) { + // Check that the necessary Electric headers are present on the response + const headers = response.headers + const missingHeaders: Array = [] + + const addMissingHeaders = (requiredHeaders: Array) => + requiredHeaders.filter((h) => !headers.has(h)) + addMissingHeaders(requiredElectricResponseHeaders) + + const input = args[0] + const urlString = input.toString() + const url = new URL(urlString) + if (url.searchParams.has(LIVE_QUERY_PARAM, `true`)) { + addMissingHeaders(requiredLiveResponseHeaders) + } + + if ( + !url.searchParams.has(LIVE_QUERY_PARAM) || + url.searchParams.has(LIVE_QUERY_PARAM, `false`) + ) { + addMissingHeaders(requiredNonLiveResponseHeaders) + } + + if (missingHeaders.length > 0) { + throw new MissingHeadersError(urlString, missingHeaders) + } + } + + return response + } +} + class PrefetchQueue { readonly #fetchClient: typeof fetch readonly #maxPrefetchedRequests: number diff --git a/packages/typescript-client/test/client.test.ts b/packages/typescript-client/test/client.test.ts index decbcf82b9..f55ae051b9 100644 --- a/packages/typescript-client/test/client.test.ts +++ b/packages/typescript-client/test/client.test.ts @@ -4,6 +4,8 @@ import { setTimeout as sleep } from 'node:timers/promises' import { testWithIssuesTable as it } from './support/test-context' import { ShapeStream, Shape, FetchError } from '../src' import { Message, Row, ChangeMessage } from '../src/types' +import { requiredElectricResponseHeaders } from '../src/fetch' +import { MissingHeadersError } from '../src/error' const BASE_URL = inject(`baseUrl`) @@ -300,7 +302,14 @@ describe(`Shape`, () => { undefined ) await sleep(50) - return new Response(undefined, { status: 204 }) + return new Response(undefined, { + status: 204, + headers: new Headers({ + [`electric-offset`]: `0_0`, + [`electric-handle`]: `foo`, + [`electric-schema`]: ``, + }), + }) }, }) @@ -319,6 +328,58 @@ describe(`Shape`, () => { expect(shapeStream.isConnected()).true }) + it(`should stop fetching and report an error if response is missing required headers`, async ({ + issuesTableUrl, + }) => { + let url: string = `` + const shapeStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape`, + table: issuesTableUrl, + fetchClient: async (input, _init) => { + url = input.toString() + const headers = new Headers() + headers.set(`electric-offset`, `0_0`) + return new Response(undefined, { status: 200, headers }) + }, + }) + + await sleep(10) // give some time for the initial fetch to complete + expect(shapeStream.isConnected()).false + + const missingHeaders = requiredElectricResponseHeaders.filter( + (h) => h !== `electric-offset` + ) + const expectedErrorMessage = new MissingHeadersError(url, missingHeaders) + .message + expect((shapeStream.error as Error).message === expectedErrorMessage) + + // Also check that electric-cursor is a required header for responses to live queries + const shapeStreamLive = new ShapeStream({ + url: `${BASE_URL}/v1/shape?live=true`, + table: issuesTableUrl, + fetchClient: async (input, _init) => { + url = input.toString() + const headers = new Headers() + headers.set(`electric-offset`, `0_0`) + return new Response(undefined, { status: 200, headers }) + }, + }) + + await sleep(10) // give some time for the initial fetch to complete + expect(shapeStreamLive.isConnected()).false + + const missingHeadersLive = requiredElectricResponseHeaders + .concat([`electric-cursor`]) + .filter((h) => h !== `electric-offset`) + const expectedErrorMessageLive = new MissingHeadersError( + url, + missingHeadersLive + ).message + expect( + (shapeStreamLive.error as Error).message === expectedErrorMessageLive + ) + }) + it(`should set isConnected to false after fetch if not subscribed`, async ({ issuesTableUrl, }) => { diff --git a/website/electric-api.yaml b/website/electric-api.yaml index 4eadd274ca..eae90bf18a 100644 --- a/website/electric-api.yaml +++ b/website/electric-api.yaml @@ -229,8 +229,9 @@ paths: electric-schema: schema: type: string - example: '":0},"status":{"type":"text","dimensions":0,"max_length":8}}' + example: '{"id":{"type":"int4","dimensions":0},"title":{"type":"text","dimensions":0},"status":{"type":"text","dimensions":0,"max_length":8}}' description: |- + Only present on responses to non-live requests. A JSON string of an object that maps column names to the corresponding schema object. The schema object contains the type of the column, the number of dimensions, and possibly additional properties. Non-array types have a dimension of `0`, while array types have a dimension of 1 or more.