Skip to content

Commit

Permalink
fix: Handle 400 errors as unrecoverable (#1880)
Browse files Browse the repository at this point in the history
Fixes #1876

The issue was that we were sending `must-refetch` messages with some of
our 400s, although they were unrecoverable and re-fetching would trigger
an infinite loop.

I've made the server send 400 errors for the cases where the `shape_id`
and `shape_definition` do not match, with an explanatory message, and
I've made the client terminate when receiving such an error.

As a temporary measure, the error is not thrown into the ether but
stored in the `ShapeStream` (accessible via `stream.error`).

We have had some discussions with @balegas about making errors more
"handleable" by changing our approach to the `ShapeStream`
instantiation. I think this warrants opening a new issue and
corresponding PR to discuss more thoroughly.
  • Loading branch information
msfstef authored Oct 23, 2024
1 parent fedf08a commit b3926a4
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 15 deletions.
6 changes: 6 additions & 0 deletions .changeset/pretty-scissors-search.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@electric-sql/client": patch
"@core/sync-service": patch
---

Handle 400 errors as unrecoverable rather than `must-refetch` cases
11 changes: 7 additions & 4 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ defmodule Electric.Plug.ServeShapePlug do
# Control messages
@up_to_date [Jason.encode!(%{headers: %{control: "up-to-date"}})]
@must_refetch Jason.encode!([%{headers: %{control: "must-refetch"}}])
@shape_definition_mismatch Jason.encode!(%{
message:
"The specified shape definition and ID do not match. " <>
"Please ensure the shape definition is correct or omit the shape ID from the request to obtain a new one."
})

defmodule TimeUtils do
@oct9th2024 DateTime.from_naive!(~N[2024-10-09 00:00:00], "Etc/UTC")
Expand Down Expand Up @@ -236,7 +241,7 @@ defmodule Electric.Plug.ServeShapePlug do
# thus the shape ID does not match the shape definition
# and we return a 400 bad request status code
conn
|> send_resp(400, @must_refetch)
|> send_resp(400, @shape_definition_mismatch)
|> halt()
else
# The shape ID does not exist or no longer exists
Expand Down Expand Up @@ -269,10 +274,8 @@ defmodule Electric.Plug.ServeShapePlug do
if Shapes.has_shape?(config, shape_id) do
# The shape with the provided ID exists but does not match the shape definition
# otherwise we would have found it and it would have matched the previous function clause
IO.puts("400 - SHAPE ID NOT FOUND")

conn
|> send_resp(400, @must_refetch)
|> send_resp(400, @shape_definition_mismatch)
|> halt()
else
# The requested shape_id is not found, returns 409 along with a location redirect for clients to
Expand Down
7 changes: 6 additions & 1 deletion packages/sync-service/test/electric/plug/router_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,12 @@ defmodule Electric.Plug.RouterTest do
|> Router.call(opts)

assert %{status: 400} = conn
assert conn.resp_body == Jason.encode!([%{headers: %{control: "must-refetch"}}])

assert conn.resp_body ==
Jason.encode!(%{
message:
"The specified shape definition and ID do not match. Please ensure the shape definition is correct or omit the shape ID from the request to obtain a new one."
})
end

test "GET receives 409 to a newly created shape when shape ID is not found and no shape matches the shape definition",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,12 @@ defmodule Electric.Plug.ServeShapePlugTest do
|> ServeShapePlug.call([])

assert conn.status == 400
assert Jason.decode!(conn.resp_body) == [%{"headers" => %{"control" => "must-refetch"}}]

assert Jason.decode!(conn.resp_body) == %{
"message" =>
"The specified shape definition and ID do not match." <>
" Please ensure the shape definition is correct or omit the shape ID from the request to obtain a new one."
}
end

test "sends 400 when omitting primary key columns in selection" do
Expand Down
18 changes: 10 additions & 8 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
#connected: boolean = false
#shapeId?: string
#schema?: Schema
#error?: unknown

constructor(options: ShapeStreamOptions<GetExtensions<T>>) {
validateOptions(options)
Expand Down Expand Up @@ -193,6 +194,10 @@ export class ShapeStream<T extends Row<unknown> = Row>
return this.#isUpToDate
}

get error() {
return this.#error
}

async start() {
this.#isUpToDate = false

Expand Down Expand Up @@ -232,13 +237,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
} catch (e) {
if (e instanceof FetchBackoffAbortError) break // interrupted
if (!(e instanceof FetchError)) throw e // should never happen
if (e.status == 400) {
// The request is invalid, most likely because the shape has been deleted.
// We should start from scratch, this will force the shape to be recreated.
this.#reset()
await this.#publish(e.json as Message<T>[])
continue
} else if (e.status == 409) {
if (e.status == 409) {
// Upon receiving a 409, we should start from scratch
// with the newly provided shape ID
const newShapeId = e.headers[SHAPE_ID_HEADER]
Expand All @@ -250,7 +249,8 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#sendErrorToUpToDateSubscribers(e)
this.#sendErrorToSubscribers(e)

// 400 errors are not actionable without additional user input, so we're throwing them.
// 400 errors are not actionable without additional user input,
// so we exit the loop
throw e
}
}
Expand Down Expand Up @@ -301,6 +301,8 @@ export class ShapeStream<T extends Row<unknown> = Row>
}
}
}
} catch (err) {
this.#error = err
} finally {
this.#connected = false
}
Expand Down
38 changes: 37 additions & 1 deletion packages/typescript-client/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { parse } from 'cache-control-parser'
import { setTimeout as sleep } from 'node:timers/promises'
import { v4 as uuidv4 } from 'uuid'
import { assert, describe, expect, inject, vi } from 'vitest'
import { Shape, ShapeStream } from '../src'
import { FetchError, Shape, ShapeStream } from '../src'
import { Message, Offset } from '../src/types'
import { isChangeMessage, isUpToDateMessage } from '../src/helpers'
import {
Expand Down Expand Up @@ -802,6 +802,42 @@ describe(`HTTP Sync`, () => {
}
})

it(`should handle invalid requests by terminating stream`, async ({
expect,
issuesTableUrl,
aborter,
}) => {
const issueStream = new ShapeStream<IssueRow>({
url: `${BASE_URL}/v1/shape/${issuesTableUrl}`,
subscribe: true,
signal: aborter.signal,
})

await h.forEachMessage(issueStream, aborter, (res, msg) => {
if (isUpToDateMessage(msg)) res()
})

const invalidIssueStream = new ShapeStream<IssueRow>({
url: `${BASE_URL}/v1/shape/${issuesTableUrl}`,
subscribe: true,
shapeId: issueStream.shapeId,
where: `1=1`,
})

const errorSubscriberPromise = new Promise((_, reject) =>
invalidIssueStream.subscribe(() => {}, reject)
)
const errorUpToDateSubscriberPromise = new Promise((_, reject) =>
invalidIssueStream.subscribeOnceToUpToDate(() => {}, reject)
)

await expect(errorSubscriberPromise).rejects.toThrow(FetchError)
await expect(errorUpToDateSubscriberPromise).rejects.toThrow(FetchError)
expect(invalidIssueStream.error).instanceOf(FetchError)
expect((invalidIssueStream.error! as FetchError).status).toBe(400)
expect(invalidIssueStream.isConnected()).false
})

it(`should detect shape deprecation and restart syncing`, async ({
expect,
insertIssues,
Expand Down

0 comments on commit b3926a4

Please sign in to comment.