Skip to content

Commit

Permalink
chore (ts-client): validate required Electric headers on response (#1957
Browse files Browse the repository at this point in the history
)

Fixes #1950

Questions:
- Are the headers only required on 2XX responses or also on other
responses?
   They are currently only being checked on 2XX responses.
- Are we checking *all* required headers?
Currently we check `electric-offset`, `electric-handle`, and
`electric-schema`
  • Loading branch information
kevin-dp authored Nov 19, 2024
1 parent 5063314 commit 71d61b5
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 5 deletions.
5 changes: 5 additions & 0 deletions .changeset/shiny-rules-drive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@electric-sql/client": patch
---

Verify that fetch response contains required Electric headers.
12 changes: 10 additions & 2 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ import {
} from './types'
import { MessageParser, Parser } from './parser'
import { isUpToDateMessage } from './helpers'
import { FetchError, FetchBackoffAbortError } from './error'
import {
FetchError,
FetchBackoffAbortError,
MissingHeadersError,
} from './error'
import {
BackoffDefaults,
BackoffOptions,
createFetchWithBackoff,
createFetchWithChunkBuffer,
createFetchWithResponseHeadersCheck,
} from './fetch'
import {
CHUNK_LAST_OFFSET_HEADER,
Expand Down Expand Up @@ -252,7 +257,9 @@ export class ShapeStream<T extends Row<unknown> = Row>
},
})

this.#fetchClient = createFetchWithChunkBuffer(fetchWithBackoffClient)
this.#fetchClient = createFetchWithResponseHeadersCheck(
createFetchWithChunkBuffer(fetchWithBackoffClient)
)

this.start()
}
Expand Down Expand Up @@ -345,6 +352,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#connected = true
} catch (e) {
if (e instanceof FetchBackoffAbortError) break // interrupted
if (e instanceof MissingHeadersError) throw e
if (!(e instanceof FetchError)) throw e // should never happen
if (e.status == 409) {
// Upon receiving a 409, we should start from scratch
Expand Down
11 changes: 11 additions & 0 deletions packages/typescript-client/src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,14 @@ export class FetchBackoffAbortError extends Error {
super(`Fetch with backoff aborted`)
}
}

export class MissingHeadersError extends Error {
constructor(url: string, missingHeaders: Array<string>) {
let msg = `The response for the shape request to ${url} didn't include the following required headers:\n`
missingHeaders.forEach((h) => {
msg += `- ${h}\n`
})
msg += `\nThis is often due to a proxy not setting CORS correctly so that all Electric headers can be read by the client.`
super(msg)
}
}
53 changes: 52 additions & 1 deletion packages/typescript-client/src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import {
SHAPE_HANDLE_HEADER,
SHAPE_HANDLE_QUERY_PARAM,
} from './constants'
import { FetchError, FetchBackoffAbortError } from './error'
import {
FetchError,
FetchBackoffAbortError,
MissingHeadersError,
} from './error'

// Some specific 4xx and 5xx HTTP status codes that we definitely
// want to retry
Expand Down Expand Up @@ -146,6 +150,53 @@ export function createFetchWithChunkBuffer(
return prefetchClient
}

export const requiredElectricResponseHeaders = [
`electric-offset`,
`electric-handle`,
]

export const requiredLiveResponseHeaders = [`electric-cursor`]

export const requiredNonLiveResponseHeaders = [`electric-schema`]

export function createFetchWithResponseHeadersCheck(
fetchClient: typeof fetch
): typeof fetch {
return async (...args: Parameters<typeof fetchClient>) => {
const response = await fetchClient(...args)

if (response.ok) {
// Check that the necessary Electric headers are present on the response
const headers = response.headers
const missingHeaders: Array<string> = []

const addMissingHeaders = (requiredHeaders: Array<string>) =>
requiredHeaders.filter((h) => !headers.has(h))
addMissingHeaders(requiredElectricResponseHeaders)

const input = args[0]
const urlString = input.toString()
const url = new URL(urlString)
if (url.searchParams.has(LIVE_QUERY_PARAM, `true`)) {
addMissingHeaders(requiredLiveResponseHeaders)
}

if (
!url.searchParams.has(LIVE_QUERY_PARAM) ||
url.searchParams.has(LIVE_QUERY_PARAM, `false`)
) {
addMissingHeaders(requiredNonLiveResponseHeaders)
}

if (missingHeaders.length > 0) {
throw new MissingHeadersError(urlString, missingHeaders)
}
}

return response
}
}

class PrefetchQueue {
readonly #fetchClient: typeof fetch
readonly #maxPrefetchedRequests: number
Expand Down
63 changes: 62 additions & 1 deletion packages/typescript-client/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { setTimeout as sleep } from 'node:timers/promises'
import { testWithIssuesTable as it } from './support/test-context'
import { ShapeStream, Shape, FetchError } from '../src'
import { Message, Row, ChangeMessage } from '../src/types'
import { requiredElectricResponseHeaders } from '../src/fetch'
import { MissingHeadersError } from '../src/error'

const BASE_URL = inject(`baseUrl`)

Expand Down Expand Up @@ -300,7 +302,14 @@ describe(`Shape`, () => {
undefined
)
await sleep(50)
return new Response(undefined, { status: 204 })
return new Response(undefined, {
status: 204,
headers: new Headers({
[`electric-offset`]: `0_0`,
[`electric-handle`]: `foo`,
[`electric-schema`]: ``,
}),
})
},
})

Expand All @@ -319,6 +328,58 @@ describe(`Shape`, () => {
expect(shapeStream.isConnected()).true
})

it(`should stop fetching and report an error if response is missing required headers`, async ({
issuesTableUrl,
}) => {
let url: string = ``
const shapeStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
table: issuesTableUrl,
fetchClient: async (input, _init) => {
url = input.toString()
const headers = new Headers()
headers.set(`electric-offset`, `0_0`)
return new Response(undefined, { status: 200, headers })
},
})

await sleep(10) // give some time for the initial fetch to complete
expect(shapeStream.isConnected()).false

const missingHeaders = requiredElectricResponseHeaders.filter(
(h) => h !== `electric-offset`
)
const expectedErrorMessage = new MissingHeadersError(url, missingHeaders)
.message
expect((shapeStream.error as Error).message === expectedErrorMessage)

// Also check that electric-cursor is a required header for responses to live queries
const shapeStreamLive = new ShapeStream({
url: `${BASE_URL}/v1/shape?live=true`,
table: issuesTableUrl,
fetchClient: async (input, _init) => {
url = input.toString()
const headers = new Headers()
headers.set(`electric-offset`, `0_0`)
return new Response(undefined, { status: 200, headers })
},
})

await sleep(10) // give some time for the initial fetch to complete
expect(shapeStreamLive.isConnected()).false

const missingHeadersLive = requiredElectricResponseHeaders
.concat([`electric-cursor`])
.filter((h) => h !== `electric-offset`)
const expectedErrorMessageLive = new MissingHeadersError(
url,
missingHeadersLive
).message
expect(
(shapeStreamLive.error as Error).message === expectedErrorMessageLive
)
})

it(`should set isConnected to false after fetch if not subscribed`, async ({
issuesTableUrl,
}) => {
Expand Down
3 changes: 2 additions & 1 deletion website/electric-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,9 @@ paths:
electric-schema:
schema:
type: string
example: '":0},"status":{"type":"text","dimensions":0,"max_length":8}}'
example: '{"id":{"type":"int4","dimensions":0},"title":{"type":"text","dimensions":0},"status":{"type":"text","dimensions":0,"max_length":8}}'
description: |-
Only present on responses to non-live requests.
A JSON string of an object that maps column names to the corresponding schema object.
The schema object contains the type of the column, the number of dimensions, and possibly additional properties.
Non-array types have a dimension of `0`, while array types have a dimension of 1 or more.
Expand Down

0 comments on commit 71d61b5

Please sign in to comment.