Skip to content

Commit

Permalink
feat: Handle invalidated replication slot which is an unrecoverable f…
Browse files Browse the repository at this point in the history
…ailure state (#106)

Here we add handling of the fatal error caused by replication slot
invalidation. Since there's no recovery procedure that Electric can do
in this case, it simply halts execution. The user should resolve the
issue in their Postgres database before restarting Electric.

Crucially, the only way to resolve this problem is to delete the
replication slot and reconfigure the database cluster. A legitimate
question arises: will Electric be able to recover the state of the
logical replication stream after it restarts and is forced to create a
new replication slot? This question remains unanswered for now. Later
I'll be sharing a document that outlines the problem in an even broader
context.

The other change here is a basic setup to write and run integration
tests using Lux.
  • Loading branch information
alco authored Aug 6, 2024
1 parent b9d42cd commit 077c609
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 26 deletions.
53 changes: 53 additions & 0 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
name: Integration Tests

on:
push:
branches: ['main']
pull_request:

permissions:
contents: read

jobs:
build:
name: Build and test
runs-on: ubuntu-latest
defaults:
run:
working-directory: integration-tests
steps:
- uses: actions/checkout@v4
- uses: erlef/setup-beam@v1
with:
version-type: strict
version-file: '.tool-versions'
- name: Restore dependencies cache
uses: actions/cache@v3
with:
path: packages/sync-service/deps
key: ${{ runner.os }}-mix-${{ hashFiles('packages/sync-service/mix.lock') }}
restore-keys: ${{ runner.os }}-mix-
- name: Restore compiled code
uses: actions/cache/restore@v4
with:
path: |
packages/sync-service/_build/*/lib
!packages/sync-service/_build/*/lib/electric
key: ${{ runner.os }}-build-test-${{ hashFiles('packages/sync-service/mix.lock') }}
- name: Install dependencies
run: mix deps.get && mix deps.compile
working-directory: packages/sync-service
- name: Save compiled code
uses: actions/cache/save@v4
with:
path: |
packages/sync-service/_build/*/lib
!packages/sync-service/_build/*/lib/electric
key: ${{ runner.os }}-build-test-${{ hashFiles('packages/sync-service/mix.lock') }}
- name: Compile
run: mix compile --force --all-warnings --warnings-as-errors
working-directory: packages/sync-service
- name: Setup lux
run: make
- name: Run integration tests
run: ./run.sh
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
docs/.vitepress/dist
docs/.vitepress/cache
docs/public/openapi.html
integration-tests/lux/
integration-tests/lux_logs/
json_files
file.jsonl
node_modules
Expand Down
9 changes: 9 additions & 0 deletions integration-tests/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
LUX_BIN="$SCRIPT_DIR/lux/bin/lux"

lux:
git clone https://github.com/electric-sql/lux.git && \
cd lux && \
git checkout otp-27 && \
autoconf && \
./configure && \
make
7 changes: 7 additions & 0 deletions integration-tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Integration tests

We're using [lux](https://github.com/electric-sql/lux/tree/otp-27) to run integration tests that require setting up and orchestrating multiple components.

To prepare your dev machine for running these tests, run `make` once.

To execute all tests, run `./run.sh`.
8 changes: 8 additions & 0 deletions integration-tests/electric_dev.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env bash

set -e

SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd)

cd "$SCRIPT_DIR"/../packages/sync-service
iex -S mix
10 changes: 10 additions & 0 deletions integration-tests/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/usr/bin/env bash

set -e

SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd)

LUX_BIN="$SCRIPT_DIR/lux/bin/lux"
LUX="$LUX_BIN --multiplier=${TIMEOUT_MULTIPLIER:-1000}"

$LUX ${@:-tests/*.lux}
56 changes: 56 additions & 0 deletions integration-tests/tests/invalidated-replication-slot.lux
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
[doc Verify handling of invalidated replication slot while Electric is running]

[include replication-slot-invalidation.luxinc]

[global pg_container_name=replication-slot-invalidated-while-running__pg]

[my invalidated_slot_error=
"""
[error] GenServer Electric.ConnectionManager terminating
** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot"

This slot has been invalidated because it exceeded the maximum reserved size.
"""]

###

## Start a new Postgres cluster configured for easy replication slot invalidation.
[invoke setup_pg \
"--wal-segsize=1" \
"-c max_slot_wal_keep_size=1MB -c max_wal_size=2MB"]

## Start the sync service.
[invoke setup_electric]

[shell electric]
??[info] Starting replication from postgres

# Reset the failure pattern because we'll be matching on an error.
-

## Seed the database with enough data to exceed max_wal_size and force a checkpoint that
## will invalidate the replication slot.
[invoke seed_pg]

## Confirm slot invalidation in Postgres.
[shell pg]
?invalidating slot "electric_slot" because its restart_lsn \d+/\d+ exceeds max_slot_wal_keep_size

## Observe the fatal connection error.
[shell electric]
??$invalidated_slot_error

# Confirm Electric process exit.
??$PS1

## Start the sync service once again to verify that it crashes due to the invalidated slot error.
[invoke setup_electric]

[shell electric]
??[info] Starting replication from postgres
-
??$invalidated_slot_error
??$PS1

[cleanup]
[invoke teardown]
71 changes: 71 additions & 0 deletions integration-tests/tests/replication-slot-invalidation.luxinc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
[global PS1=SH-PROMPT:]
[global fail_pattern=(?i)error|fatal|no such]

[global pg_container_name=]
[global database_url=]

[macro setup_pg initdb_args config_opts]
[shell pg]
-$fail_pattern

!docker run --rm \
--name $pg_container_name \
-e POSTGRES_DB=electric \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=password \
-e POSTGRES_INITDB_ARGS=${initdb_args} \
-p 5432 \
postgres:14-alpine \
-c wal_level=logical ${config_opts}

??database system is ready to accept connections

# Reset the failure pattern to avoid false failures when Electric tries to create an already
# existing publication or replication slot.
-

[shell get_container_port]
!docker inspect $pg_container_name --format '{{json .NetworkSettings.Ports}}'
?"HostIp":"0.0.0.0","HostPort":"(\d+)"
[local port=$1]
[global database_url=postgresql://postgres:password@localhost:$port/postgres?sslmode=disable]
[endmacro]

[macro seed_pg]
[shell psql]
!docker exec -u postgres -it $pg_container_name psql

"""!
CREATE TABLE items2 (
id UUID PRIMARY KEY,
val1 TEXT,
val2 TEXT
);
"""
??CREATE TABLE

"""!
INSERT INTO
items2 (id, val1, val2)
SELECT
gen_random_uuid(),
'#' || generate_series || ' test val1 ' || repeat('012345679abcdef', 4096),
'#' || generate_series || ' test val2 ' || repeat('012345679abcdef', 4096)
FROM
generate_series(1, 2048);
"""
??INSERT 0 2048
[endmacro]

[macro setup_electric]
[shell electric]
-$fail_pattern

!DATABASE_URL=$database_url ../electric_dev.sh
[endmacro]

[macro teardown]
-$fail_pattern

!docker rm -f -v $pg_container_name
[endmacro]
55 changes: 30 additions & 25 deletions packages/sync-service/lib/electric/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -127,24 +127,18 @@ defmodule Electric.ConnectionManager do

# When either the replication client or the connection pool shuts down, let the OTP
# supervisor restart the connection manager to initiate a new connection procedure from a clean
# slate.
def handle_info({:EXIT, pid, _reason} = message, state) do
if known_pid?(pid, state) do
reason =
if pid == state.replication_client_pid do
:replication_connection_closed
else
:database_connection_closed
end
# slate. That is, unless the error that caused the shutdown is unrecoverable and requires
# manual resolution in Postgres. In that case, we crash the whole server.
def handle_info({:EXIT, pid, reason}, state) do
halt_if_fatal_error!(reason)

{:stop, reason, state}
else
Logger.warning(
"#{inspect(__MODULE__)} process received #{inspect(message)} for an unknown PID."
)
tag =
cond do
pid == state.replication_client_pid -> :replication_connection
pid == state.pool_pid -> :database_pool
end

{:noreply, state}
end
{:stop, {tag, reason}, state}
end

defp start_replication_client(connection_opts, replication_opts) do
Expand Down Expand Up @@ -184,6 +178,8 @@ defmodule Electric.ConnectionManager do
end

defp handle_connection_error(error, state) do
halt_if_fatal_error!(error)

message =
case error do
%DBConnection.ConnectionError{message: message} ->
Expand Down Expand Up @@ -224,22 +220,31 @@ defmodule Electric.ConnectionManager do
end
end

@invalid_slot_detail "This slot has been invalidated because it exceeded the maximum reserved size."

defp halt_if_fatal_error!(
%Postgrex.Error{
postgres: %{
code: :object_not_in_prerequisite_state,
detail: @invalid_slot_detail,
pg_code: "55000",
routine: "StartLogicalReplication"
}
} = error
) do
System.stop(1)
exit(error)
end

defp halt_if_fatal_error!(_), do: nil

defp schedule_reconnection(step, %State{backoff: {backoff, _}} = state) do
{time, backoff} = :backoff.fail(backoff)
tref = :erlang.start_timer(time, self(), step)
Logger.warning("Reconnecting in #{inspect(time)}ms")
%State{state | backoff: {backoff, tref}}
end

defp known_pid?(pid, %{replication_client_pid: pid}), do: true
defp known_pid?(pid, %{pool_pid: pid}), do: true

# This is an edge case that's possible when we've starting the reconnection procedure already
# but still receive an EXIT message from the remaining one of the two processes.
defp known_pid?(_, %{replication_client_pid: nil, pool_pid: nil}), do: true

defp known_pid?(_, _), do: false

defp update_ssl_opts(connection_opts) do
ssl_opts =
case connection_opts[:sslmode] do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
query =
"START_REPLICATION SLOT #{state.slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{state.publication_name}')"

Logger.info("Started replication from postgres")
Logger.info("Starting replication from postgres")

{:stream, query, [], state}
end
Expand Down

0 comments on commit 077c609

Please sign in to comment.