From 7de9f1d529f622b91b809290ee867cb5dec9badd Mon Sep 17 00:00:00 2001 From: Stefanos Mousafeiris Date: Wed, 23 Oct 2024 13:59:02 +0300 Subject: [PATCH] fix: Handle 400 errors as unrecoverable (#1880) Fixes https://github.com/electric-sql/electric/issues/1876 The issue was that we were sending `must-refetch` messages with some of our 400s, although they were unrecoverable and re-fetching would trigger an infinite loop. I've made the server send 400 errors for the cases where the `shape_id` and `shape_definition` do not match, with an explanatory message, and I've made the client terminate when receiving such an error. As a temporary measure, the error is not thrown into the ether but stored in the `ShapeStream` (accessible via `stream.error`). We have had some discussions with @balegas about making errors more "handleable" by changing our approach to the `ShapeStream` instantiation. I think this warrants opening a new issue and corresponding PR to discuss more thoroughly. --- .changeset/pretty-scissors-search.md | 6 +++ .../lib/electric/plug/serve_shape_plug.ex | 11 ++++-- .../test/electric/plug/router_test.exs | 7 +++- .../electric/plug/serve_shape_plug_test.exs | 7 +++- packages/typescript-client/src/client.ts | 18 +++++---- .../test/integration.test.ts | 38 ++++++++++++++++++- 6 files changed, 72 insertions(+), 15 deletions(-) create mode 100644 .changeset/pretty-scissors-search.md diff --git a/.changeset/pretty-scissors-search.md b/.changeset/pretty-scissors-search.md new file mode 100644 index 0000000000..9ce014047f --- /dev/null +++ b/.changeset/pretty-scissors-search.md @@ -0,0 +1,6 @@ +--- +"@electric-sql/client": patch +"@core/sync-service": patch +--- + +Handle 400 errors as unrecoverable rather than `must-refetch` cases diff --git a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex index 48fab9bdf6..142368b8db 100644 --- a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex +++ b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex @@ -22,6 +22,11 @@ defmodule Electric.Plug.ServeShapePlug do # Control messages @up_to_date [Jason.encode!(%{headers: %{control: "up-to-date"}})] @must_refetch Jason.encode!([%{headers: %{control: "must-refetch"}}]) + @shape_definition_mismatch Jason.encode!(%{ + message: + "The specified shape definition and ID do not match. " <> + "Please ensure the shape definition is correct or omit the shape ID from the request to obtain a new one." + }) defmodule TimeUtils do @oct9th2024 DateTime.from_naive!(~N[2024-10-09 00:00:00], "Etc/UTC") @@ -236,7 +241,7 @@ defmodule Electric.Plug.ServeShapePlug do # thus the shape ID does not match the shape definition # and we return a 400 bad request status code conn - |> send_resp(400, @must_refetch) + |> send_resp(400, @shape_definition_mismatch) |> halt() else # The shape ID does not exist or no longer exists @@ -269,10 +274,8 @@ defmodule Electric.Plug.ServeShapePlug do if Shapes.has_shape?(config, shape_id) do # The shape with the provided ID exists but does not match the shape definition # otherwise we would have found it and it would have matched the previous function clause - IO.puts("400 - SHAPE ID NOT FOUND") - conn - |> send_resp(400, @must_refetch) + |> send_resp(400, @shape_definition_mismatch) |> halt() else # The requested shape_id is not found, returns 409 along with a location redirect for clients to diff --git a/packages/sync-service/test/electric/plug/router_test.exs b/packages/sync-service/test/electric/plug/router_test.exs index 5ad4908b7d..a4e1963ab1 100644 --- a/packages/sync-service/test/electric/plug/router_test.exs +++ b/packages/sync-service/test/electric/plug/router_test.exs @@ -771,7 +771,12 @@ defmodule Electric.Plug.RouterTest do |> Router.call(opts) assert %{status: 400} = conn - assert conn.resp_body == Jason.encode!([%{headers: %{control: "must-refetch"}}]) + + assert conn.resp_body == + Jason.encode!(%{ + message: + "The specified shape definition and ID do not match. Please ensure the shape definition is correct or omit the shape ID from the request to obtain a new one." + }) end test "GET receives 409 to a newly created shape when shape ID is not found and no shape matches the shape definition", diff --git a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs index 0341ba7c26..b42fcbcb36 100644 --- a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs +++ b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs @@ -603,7 +603,12 @@ defmodule Electric.Plug.ServeShapePlugTest do |> ServeShapePlug.call([]) assert conn.status == 400 - assert Jason.decode!(conn.resp_body) == [%{"headers" => %{"control" => "must-refetch"}}] + + assert Jason.decode!(conn.resp_body) == %{ + "message" => + "The specified shape definition and ID do not match." <> + " Please ensure the shape definition is correct or omit the shape ID from the request to obtain a new one." + } end test "sends 400 when omitting primary key columns in selection" do diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 1434d07346..1aa3f3cc13 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -159,6 +159,7 @@ export class ShapeStream = Row> #connected: boolean = false #shapeId?: string #schema?: Schema + #error?: unknown constructor(options: ShapeStreamOptions>) { validateOptions(options) @@ -193,6 +194,10 @@ export class ShapeStream = Row> return this.#isUpToDate } + get error() { + return this.#error + } + async start() { this.#isUpToDate = false @@ -232,13 +237,7 @@ export class ShapeStream = Row> } catch (e) { if (e instanceof FetchBackoffAbortError) break // interrupted if (!(e instanceof FetchError)) throw e // should never happen - if (e.status == 400) { - // The request is invalid, most likely because the shape has been deleted. - // We should start from scratch, this will force the shape to be recreated. - this.#reset() - await this.#publish(e.json as Message[]) - continue - } else if (e.status == 409) { + if (e.status == 409) { // Upon receiving a 409, we should start from scratch // with the newly provided shape ID const newShapeId = e.headers[SHAPE_ID_HEADER] @@ -250,7 +249,8 @@ export class ShapeStream = Row> this.#sendErrorToUpToDateSubscribers(e) this.#sendErrorToSubscribers(e) - // 400 errors are not actionable without additional user input, so we're throwing them. + // 400 errors are not actionable without additional user input, + // so we exit the loop throw e } } @@ -301,6 +301,8 @@ export class ShapeStream = Row> } } } + } catch (err) { + this.#error = err } finally { this.#connected = false } diff --git a/packages/typescript-client/test/integration.test.ts b/packages/typescript-client/test/integration.test.ts index 441097d9b1..2198f5a2be 100644 --- a/packages/typescript-client/test/integration.test.ts +++ b/packages/typescript-client/test/integration.test.ts @@ -2,7 +2,7 @@ import { parse } from 'cache-control-parser' import { setTimeout as sleep } from 'node:timers/promises' import { v4 as uuidv4 } from 'uuid' import { assert, describe, expect, inject, vi } from 'vitest' -import { Shape, ShapeStream } from '../src' +import { FetchError, Shape, ShapeStream } from '../src' import { Message, Offset } from '../src/types' import { isChangeMessage, isUpToDateMessage } from '../src/helpers' import { @@ -802,6 +802,42 @@ describe(`HTTP Sync`, () => { } }) + it(`should handle invalid requests by terminating stream`, async ({ + expect, + issuesTableUrl, + aborter, + }) => { + const issueStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape/${issuesTableUrl}`, + subscribe: true, + signal: aborter.signal, + }) + + await h.forEachMessage(issueStream, aborter, (res, msg) => { + if (isUpToDateMessage(msg)) res() + }) + + const invalidIssueStream = new ShapeStream({ + url: `${BASE_URL}/v1/shape/${issuesTableUrl}`, + subscribe: true, + shapeId: issueStream.shapeId, + where: `1=1`, + }) + + const errorSubscriberPromise = new Promise((_, reject) => + invalidIssueStream.subscribe(() => {}, reject) + ) + const errorUpToDateSubscriberPromise = new Promise((_, reject) => + invalidIssueStream.subscribeOnceToUpToDate(() => {}, reject) + ) + + await expect(errorSubscriberPromise).rejects.toThrow(FetchError) + await expect(errorUpToDateSubscriberPromise).rejects.toThrow(FetchError) + expect(invalidIssueStream.error).instanceOf(FetchError) + expect((invalidIssueStream.error! as FetchError).status).toBe(400) + expect(invalidIssueStream.isConnected()).false + }) + it(`should detect shape deprecation and restart syncing`, async ({ expect, insertIssues,