From 077c6090c923f4734bdfc0b32d4b201b1b768c9e Mon Sep 17 00:00:00 2001
From: Oleksii Sholik
Date: Tue, 6 Aug 2024 16:35:19 +0300
Subject: [PATCH] feat: Handle invalidated replication slot which is an unrecoverable failure state (#106)

Here we add handling of the fatal error caused by replication slot
invalidation. Since there's no recovery procedure that Electric can perform
in this case, it simply halts execution. The user should resolve the issue
in their Postgres database before restarting Electric.

Crucially, the only way to resolve this problem is to delete the
replication slot and reconfigure the database cluster. A legitimate
question arises: will Electric be able to recover the state of the
logical replication stream after it restarts and is forced to create a
new replication slot? This question remains unanswered for now. Later
I'll be sharing a document that outlines the problem in an even broader
context.

The other change here is a basic setup to write and run integration
tests using Lux.
---
 .github/workflows/integration_tests.yml       | 53 ++++++++++++++
 .gitignore                                    |  2 +
 integration-tests/Makefile                    |  9 +++
 integration-tests/README.md                   |  7 ++
 integration-tests/electric_dev.sh             |  8 +++
 integration-tests/run.sh                      | 10 +++
 .../tests/invalidated-replication-slot.lux    | 56 +++++++++++++++
 .../replication-slot-invalidation.luxinc      | 71 +++++++++++++++++++
 .../lib/electric/connection_manager.ex        | 57 ++++++++-------
 .../replication_client/connection_setup.ex    |  2 +-
 10 files changed, 249 insertions(+), 26 deletions(-)
 create mode 100644 .github/workflows/integration_tests.yml
 create mode 100644 integration-tests/Makefile
 create mode 100644 integration-tests/README.md
 create mode 100755 integration-tests/electric_dev.sh
 create mode 100755 integration-tests/run.sh
 create mode 100644 integration-tests/tests/invalidated-replication-slot.lux
 create mode 100644 integration-tests/tests/replication-slot-invalidation.luxinc

diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml
new file mode 100644
index 0000000000..915a34c571
--- /dev/null
+++ b/.github/workflows/integration_tests.yml
@@ -0,0 +1,53 @@
+name: Integration Tests
+
+on:
+  push:
+    branches: ['main']
+  pull_request:
+
+permissions:
+  contents: read
+
+jobs:
+  build:
+    name: Build and test
+    runs-on: ubuntu-latest
+    defaults:
+      run:
+        working-directory: integration-tests
+    steps:
+      - uses: actions/checkout@v4
+      - uses: erlef/setup-beam@v1
+        with:
+          version-type: strict
+          version-file: '.tool-versions'
+      - name: Restore dependencies cache
+        uses: actions/cache@v4
+        with:
+          path: packages/sync-service/deps
+          key: ${{ runner.os }}-mix-${{ hashFiles('packages/sync-service/mix.lock') }}
+          restore-keys: ${{ runner.os }}-mix-
+      - name: Restore compiled code
+        uses: actions/cache/restore@v4
+        with:
+          path: |
+            packages/sync-service/_build/*/lib
+            !packages/sync-service/_build/*/lib/electric
+          key: ${{ runner.os }}-build-test-${{ hashFiles('packages/sync-service/mix.lock') }}
+      - name: Install dependencies
+        run: mix deps.get && mix deps.compile
+        working-directory: packages/sync-service
+      - name: Save compiled code
+        uses: actions/cache/save@v4
+        with:
+          path: |
+            packages/sync-service/_build/*/lib
+            !packages/sync-service/_build/*/lib/electric
+          key: ${{ runner.os }}-build-test-${{ hashFiles('packages/sync-service/mix.lock') }}
+      - name: Compile
+        run: mix compile --force --all-warnings --warnings-as-errors
+        working-directory: packages/sync-service
+      - name: Setup lux
+        run: make
+      - name: Run integration tests
+        run: ./run.sh
diff --git a/.gitignore b/.gitignore
index 64b02b09a2..e84dd8e882 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,8 @@
 docs/.vitepress/dist
 docs/.vitepress/cache
 docs/public/openapi.html
+integration-tests/lux/
+integration-tests/lux_logs/
 json_files
 file.jsonl
 node_modules
diff --git a/integration-tests/Makefile b/integration-tests/Makefile
new file mode 100644
index 0000000000..eadf02ed90
--- /dev/null
+++ b/integration-tests/Makefile
@@ -0,0 +1,9 @@
+# Clone and build the lux test runner into ./lux; run.sh expects ./lux/bin/lux to exist.
+
+lux:
+	git clone https://github.com/electric-sql/lux.git && \
+	cd lux && \
+	git checkout otp-27 && \
+	autoconf && \
+	./configure && \
+	$(MAKE)
diff --git a/integration-tests/README.md b/integration-tests/README.md
new file mode 100644
index 0000000000..b2c7c60dfc
--- /dev/null
+++ b/integration-tests/README.md
@@ -0,0 +1,7 @@
+# Integration tests
+
+We're using [lux](https://github.com/electric-sql/lux/tree/otp-27) to run integration tests that require setting up and orchestrating multiple components.
+
+To prepare your dev machine for running these tests, run `make` once.
+
+To execute all tests, run `./run.sh`.
diff --git a/integration-tests/electric_dev.sh b/integration-tests/electric_dev.sh
new file mode 100755
index 0000000000..a7c90143f8
--- /dev/null
+++ b/integration-tests/electric_dev.sh
@@ -0,0 +1,8 @@
+#!/usr/bin/env bash
+
+set -e
+
+SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd)
+
+cd "$SCRIPT_DIR"/../packages/sync-service
+iex -S mix
diff --git a/integration-tests/run.sh b/integration-tests/run.sh
new file mode 100755
index 0000000000..6cc7f94469
--- /dev/null
+++ b/integration-tests/run.sh
@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+
+set -e
+
+SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd)
+
+LUX_BIN="$SCRIPT_DIR/lux/bin/lux"
+LUX="$LUX_BIN --multiplier=${TIMEOUT_MULTIPLIER:-1000}"
+
+$LUX ${@:-"$SCRIPT_DIR"/tests/*.lux}
diff --git a/integration-tests/tests/invalidated-replication-slot.lux b/integration-tests/tests/invalidated-replication-slot.lux
new file mode 100644
index 0000000000..e4ab4ee933
--- /dev/null
+++ b/integration-tests/tests/invalidated-replication-slot.lux
@@ -0,0 +1,56 @@
+[doc Verify handling of invalidated replication slot while Electric is running]
+
+[include replication-slot-invalidation.luxinc]
+
+[global pg_container_name=replication-slot-invalidated-while-running__pg]
+
+[my invalidated_slot_error=
+  """
+  [error] GenServer Electric.ConnectionManager terminating
+  ** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot"
+
+  This slot has been invalidated because it exceeded the maximum reserved size.
+  """]
+
+###
+
+## Start a new Postgres cluster configured for easy replication slot invalidation.
+[invoke setup_pg \
+  "--wal-segsize=1" \
+  "-c max_slot_wal_keep_size=1MB -c max_wal_size=2MB"]
+
+## Start the sync service.
+[invoke setup_electric]
+
+[shell electric]
+  ??[info] Starting replication from postgres
+
+  # Reset the failure pattern because we'll be matching on an error.
+  -
+
+## Seed the database with enough data to exceed max_wal_size and force a checkpoint that
+## will invalidate the replication slot.
+[invoke seed_pg]
+
+## Confirm slot invalidation in Postgres.
+[shell pg]
+  ?invalidating slot "electric_slot" because its restart_lsn \d+/\d+ exceeds max_slot_wal_keep_size
+
+## Observe the fatal connection error.
+[shell electric]
+  ??$invalidated_slot_error
+
+  # Confirm Electric process exit.
+  ??$PS1
+
+## Start the sync service once again to verify that it crashes due to the invalidated slot error.
+[invoke setup_electric]
+
+[shell electric]
+  ??[info] Starting replication from postgres
+  -
+  ??$invalidated_slot_error
+  ??$PS1
+
+[cleanup]
+  [invoke teardown]
diff --git a/integration-tests/tests/replication-slot-invalidation.luxinc b/integration-tests/tests/replication-slot-invalidation.luxinc
new file mode 100644
index 0000000000..2a1a52bcc4
--- /dev/null
+++ b/integration-tests/tests/replication-slot-invalidation.luxinc
@@ -0,0 +1,71 @@
+[global PS1=SH-PROMPT:]
+[global fail_pattern=(?i)error|fatal|no such]
+
+[global pg_container_name=]
+[global database_url=]
+
+[macro setup_pg initdb_args config_opts]
+  [shell pg]
+    -$fail_pattern
+
+    !docker run --rm \
+      --name $pg_container_name \
+      -e POSTGRES_DB=electric \
+      -e POSTGRES_USER=postgres \
+      -e POSTGRES_PASSWORD=password \
+      -e POSTGRES_INITDB_ARGS=${initdb_args} \
+      -p 5432 \
+      postgres:14-alpine \
+        -c wal_level=logical ${config_opts}
+
+    ??database system is ready to accept connections
+
+    # Reset the failure pattern to avoid false failures when Electric tries to create an already
+    # existing publication or replication slot.
+    -
+
+  [shell get_container_port]
+    !docker inspect $pg_container_name --format '{{json .NetworkSettings.Ports}}'
+    ?"HostIp":"0.0.0.0","HostPort":"(\d+)"
+    [local port=$1]
+    [global database_url=postgresql://postgres:password@localhost:$port/postgres?sslmode=disable]
+[endmacro]
+
+[macro seed_pg]
+  [shell psql]
+    !docker exec -u postgres -it $pg_container_name psql
+
+    """!
+    CREATE TABLE items2 (
+      id UUID PRIMARY KEY,
+      val1 TEXT,
+      val2 TEXT
+    );
+    """
+    ??CREATE TABLE
+
+    """!
+    INSERT INTO
+      items2 (id, val1, val2)
+    SELECT
+      gen_random_uuid(),
+      '#' || generate_series || ' test val1 ' || repeat('012345679abcdef', 4096),
+      '#' || generate_series || ' test val2 ' || repeat('012345679abcdef', 4096)
+    FROM
+      generate_series(1, 2048);
+    """
+    ??INSERT 0 2048
+[endmacro]
+
+[macro setup_electric]
+  [shell electric]
+    -$fail_pattern
+
+    !DATABASE_URL=$database_url ../electric_dev.sh
+[endmacro]
+
+[macro teardown]
+  -$fail_pattern
+
+  !docker rm -f -v $pg_container_name
+[endmacro]
diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex
index d7a8d1253b..3e5374a47c 100644
--- a/packages/sync-service/lib/electric/connection_manager.ex
+++ b/packages/sync-service/lib/electric/connection_manager.ex
@@ -127,24 +127,20 @@ defmodule Electric.ConnectionManager do
 
   # When either the replication client or the connection pool shuts down, let the OTP
   # supervisor restart the connection manager to initiate a new connection procedure from a clean
-  # slate.
-  def handle_info({:EXIT, pid, _reason} = message, state) do
-    if known_pid?(pid, state) do
-      reason =
-        if pid == state.replication_client_pid do
-          :replication_connection_closed
-        else
-          :database_connection_closed
-        end
+  # slate. That is, unless the error that caused the shutdown is unrecoverable and requires
+  # manual resolution in Postgres. In that case, we crash the whole server.
+  def handle_info({:EXIT, pid, reason}, state) do
+    halt_if_fatal_error!(reason)
 
-      {:stop, reason, state}
-    else
-      Logger.warning(
-        "#{inspect(__MODULE__)} process received #{inspect(message)} for an unknown PID."
-      )
+    tag =
+      cond do
+        pid == state.replication_client_pid -> :replication_connection
+        pid == state.pool_pid -> :database_pool
+        # Untracked linked process: keep its exit reason rather than raise CondClauseError.
+        true -> :unknown_process
+      end
 
-      {:noreply, state}
-    end
+    {:stop, {tag, reason}, state}
   end
 
   defp start_replication_client(connection_opts, replication_opts) do
@@ -184,6 +180,8 @@ defmodule Electric.ConnectionManager do
   end
 
   defp handle_connection_error(error, state) do
+    halt_if_fatal_error!(error)
+
     message =
       case error do
         %DBConnection.ConnectionError{message: message} ->
@@ -224,6 +222,24 @@ defmodule Electric.ConnectionManager do
     end
   end
 
+  @invalid_slot_detail "This slot has been invalidated because it exceeded the maximum reserved size."
+
+  defp halt_if_fatal_error!(
+         %Postgrex.Error{
+           postgres: %{
+             code: :object_not_in_prerequisite_state,
+             detail: @invalid_slot_detail,
+             pg_code: "55000",
+             routine: "StartLogicalReplication"
+           }
+         } = error
+       ) do
+    System.stop(1)
+    exit(error)
+  end
+
+  defp halt_if_fatal_error!(_), do: nil
+
   defp schedule_reconnection(step, %State{backoff: {backoff, _}} = state) do
     {time, backoff} = :backoff.fail(backoff)
     tref = :erlang.start_timer(time, self(), step)
@@ -231,15 +247,6 @@ defmodule Electric.ConnectionManager do
     %State{state | backoff: {backoff, tref}}
   end
 
-  defp known_pid?(pid, %{replication_client_pid: pid}), do: true
-  defp known_pid?(pid, %{pool_pid: pid}), do: true
-
-  # This is an edge case that's possible when we've starting the reconnection procedure already
-  # but still receive an EXIT message from the remaining one of the two processes.
-  defp known_pid?(_, %{replication_client_pid: nil, pool_pid: nil}), do: true
-
-  defp known_pid?(_, _), do: false
-
   defp update_ssl_opts(connection_opts) do
     ssl_opts =
       case connection_opts[:sslmode] do
diff --git a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex
index 22a7d302fe..3dfdcbc041 100644
--- a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex
+++ b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex
@@ -146,7 +146,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
     query =
       "START_REPLICATION SLOT #{state.slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{state.publication_name}')"
 
-    Logger.info("Started replication from postgres")
+    Logger.info("Starting replication from postgres")
     {:stream, query, [], state}
   end
 