fix: Handle replication slot conflicts #1762

Merged · 33 commits · Oct 1, 2024

Commits
5453e37
Use advisory lock to handle rolling deploys
msfstef Sep 26, 2024
a6dfec4
Configurable replication stream ID
msfstef Sep 26, 2024
7cf3f38
Quote all publication and replication slot names
msfstef Sep 26, 2024
3b0dba8
Implement utility for resetting the publication
msfstef Sep 26, 2024
a7ef1b5
Fix utils quoting test
msfstef Sep 26, 2024
09db776
Use hash of slot name for lock
msfstef Sep 26, 2024
20285c4
Fix integration test
msfstef Sep 26, 2024
51291ae
Add changeset
msfstef Sep 26, 2024
0acac52
Basic replication status work
msfstef Sep 26, 2024
4e34836
Remove connection manager as required opt
msfstef Sep 30, 2024
fbaa5a6
Use an independent connection lock
msfstef Sep 30, 2024
67dc96d
Add backoff retries
msfstef Sep 30, 2024
267d471
Check pool id as well for status
msfstef Sep 30, 2024
a1836a7
Fix integration test and lock restarting
msfstef Sep 30, 2024
b653044
Rename connection lock atom
msfstef Sep 30, 2024
3f47fbd
Fix health check endpoint to return correct messages
msfstef Sep 30, 2024
4e8e3e6
Add basic unit tests for health check endpoint
msfstef Sep 30, 2024
3bba4e2
Add integration test for rolling deploy
msfstef Sep 30, 2024
b203953
Fix weird formatting
msfstef Sep 30, 2024
a7e1897
Return 503 for stopping state
msfstef Sep 30, 2024
017ed32
Transform start link keyword argument
msfstef Sep 30, 2024
f2d6250
Mock lock connection under Postgres
msfstef Sep 30, 2024
934c35d
Add basic lock connection tests
msfstef Sep 30, 2024
4655e00
Add comments explaining seemingly unnecessary pattern matching
msfstef Sep 30, 2024
49d6620
Update changeset
msfstef Sep 30, 2024
3269786
Add basic integration test for health endoint
msfstef Sep 30, 2024
3ac6845
Add health check plug to integration testing for rolling deploys
msfstef Sep 30, 2024
50e95f0
Assert electric process exits
msfstef Oct 1, 2024
685bc4f
Log periodic messages while lock is not acquired
msfstef Oct 1, 2024
f22c669
Add crash recovery integration test
msfstef Oct 1, 2024
c024581
Replace positional matching on a keyword list with Keyword.pop()
alco Oct 1, 2024
b0d928b
Remove a noop "with" from ServiceStatus.check()
alco Oct 1, 2024
dc8c980
Adress PR comments
msfstef Oct 1, 2024
7 changes: 7 additions & 0 deletions .changeset/poor-candles-fly.md
@@ -0,0 +1,7 @@
---
"@core/sync-service": patch
---

- Wait for an advisory lock on the replication slot to enable rolling deploys.
- Configurable replication slot and publication names via the `REPLICATION_STREAM_ID` environment variable.
- Add `HealthCheckPlug` API endpoint at `v1/health` that returns `waiting`, `starting`, and `active` statuses.
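
As a rough illustration of the locking mechanism described above (and in the commits "Use advisory lock to handle rolling deploys" and "Use hash of slot name for lock"): a session-level Postgres advisory lock keyed on a hash of the slot name blocks any second holder until the first session goes away. The snippet below is a standalone sketch, not the PR's LockConnection code:

# Standalone sketch: take a session-level advisory lock keyed on the slot name.
# A second Electric instance running the same query blocks here until the first
# instance's session ends, which is what makes per-slot rolling deploys safe.
{:ok, conn} =
  Postgrex.start_link(
    hostname: "localhost",
    username: "postgres",
    password: "password",
    database: "electric"
  )

slot_name = "electric_slot_default"

# hashtext/1 turns the slot name into an integer lock key for pg_advisory_lock/1.
Postgrex.query!(conn, "SELECT pg_advisory_lock(hashtext($1))", [slot_name],
  timeout: :infinity
)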
65 changes: 65 additions & 0 deletions integration-tests/tests/crash-recovery.lux
@@ -0,0 +1,65 @@
[doc Verify handling of an Electric crash recovery]

[include macros.luxinc]

[global pg_container_name=crash-recovery__pg]

###

## Start a new Postgres cluster
[invoke setup_pg "" ""]

## Add some data
[invoke start_psql]
[shell psql]
"""!
CREATE TABLE items (
id UUID PRIMARY KEY,
val TEXT
);
"""
??CREATE TABLE

"""!
INSERT INTO
items (id, val)
SELECT
gen_random_uuid(),
'#' || generate_series || ' test val'
FROM
generate_series(1, 10);
"""
??INSERT 0 10

## Start the sync service.
[invoke setup_electric]

[shell electric]
??[info] Starting replication from postgres

# Initialize a shape and collect the offset
[shell client]
# strip ANSI codes from response for easier matching
!curl -v -X GET http://localhost:3000/v1/shape/items?offset=-1
?electric-shape-id: ([\d-]+)
[local shape_id=$1]
?electric-chunk-last-offset: ([\w\d_]+)
[local last_offset=$1]

## Terminate electric
[shell electric]
!System.halt()
??$PS1

## Start the sync service again.
[invoke setup_electric]
[shell electric]
??[info] Starting replication from postgres

# Client should be able to continue same shape
[shell client]
!curl -v -X GET "http://localhost:3000/v1/shape/items?offset=$last_offset&shape_id=$shape_id"
??HTTP/1.1 200 OK

[cleanup]
[invoke teardown]
4 changes: 2 additions & 2 deletions integration-tests/tests/invalidated-replication-slot.lux
@@ -7,7 +7,7 @@
[my invalidated_slot_error=
"""
[error] GenServer Electric.ConnectionManager terminating
** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot"
** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot_default"

This slot has been invalidated because it exceeded the maximum reserved size.
"""]
@@ -34,7 +34,7 @@

## Confirm slot invalidation in Postgres.
[shell pg]
?invalidating slot "electric_slot" because its restart_lsn \d+/\d+ exceeds max_slot_wal_keep_size
?invalidating slot "electric_slot_default" because its restart_lsn [\d\w]+/[\d\w]+ exceeds max_slot_wal_keep_size

## Observe the fatal connection error.
[shell electric]
13 changes: 11 additions & 2 deletions integration-tests/tests/macros.luxinc
@@ -41,6 +41,11 @@
??database system is ready to accept connections
[endmacro]

[macro start_psql]
[shell psql]
!docker exec -u postgres -it $pg_container_name psql
[endmacro]

[macro seed_pg]
[shell psql]
!docker exec -u postgres -it $pg_container_name psql
@@ -68,10 +73,14 @@
[endmacro]

[macro setup_electric]
[shell electric]
[invoke setup_electric_shell "electric" "3000"]
[endmacro]

[macro setup_electric_shell shell_name port]
[shell $shell_name]
-$fail_pattern

!DATABASE_URL=$database_url ../electric_dev.sh
!DATABASE_URL=$database_url PORT=$port ../electric_dev.sh
[endmacro]

[macro teardown]
2 changes: 1 addition & 1 deletion integration-tests/tests/postgres-disconnection.lux
@@ -22,7 +22,7 @@

## Observe the connection error.
[shell electric]
??[warning] Database connection in replication mode failed
??[warning] Database connection in lock_connection mode failed
??[warning] Reconnecting in

## Start the Postgres container back up.
62 changes: 62 additions & 0 deletions integration-tests/tests/rolling-deploy.lux
@@ -0,0 +1,62 @@
[doc Verify handling of an Electric rolling deploy]

[include macros.luxinc]

[global pg_container_name=rolling-deploy__pg]

###

## Start a new Postgres cluster
[invoke setup_pg "" ""]

## Start the first sync service.
[invoke setup_electric_shell "electric_1" "3000"]

[shell electric_1]
??[info] Acquiring lock from postgres with name electric_slot_default
??[info] Lock acquired from postgres with name electric_slot_default
??[info] Starting replication from postgres

# First service should be healthy and active
[shell orchestator]
!curl -X GET http://localhost:3000/v1/health
??{"status":"active"}

## Start the second sync service.
[invoke setup_electric_shell "electric_2" "3001"]

## Assert that the lock is not acquired and replication does not start
## in the second electric
[shell electric_2]
-Lock acquired from postgres|Starting replication from postgres|$fail_pattern
??[info] Acquiring lock from postgres with name electric_slot_default
[sleep 2]


# Second service should be in waiting state, ready to take over
[shell orchestator]
!curl -X GET http://localhost:3000/v1/health
??{"status":"active"}
!curl -X GET http://localhost:3001/v1/health
??{"status":"waiting"}

## Terminate first electric
[shell electric_1]
!System.halt()

# Confirm Electric process exit.
??$PS1

## Lock should now be acquired and replication starting
[shell electric_2]
-$fail_pattern
??[info] Lock acquired from postgres with name electric_slot_default
??[info] Starting replication from postgres

# Second service is now healthy and active
[shell orchestator]
!curl -X GET http://localhost:3001/v1/health
??{"status":"active"}

[cleanup]
[invoke teardown]
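
Outside the Lux test, an orchestrator can drive a rolling deploy off the same endpoint: start the new instance, wait until its `/v1/health` reports `active`, then retire the old one. The helper below is a hypothetical sketch (it is not part of the PR) using only OTP's built-in `:httpc` client:

# Hypothetical helper, not part of this PR: poll GET <base_url>/v1/health until
# the body reports {"status":"active"}, retrying once per second.
defmodule RollingDeployCheck do
  def await_active(base_url, attempts \\ 30) do
    :inets.start()

    Enum.reduce_while(1..attempts, :timeout, fn _attempt, _acc ->
      if active?(base_url) do
        {:halt, :ok}
      else
        Process.sleep(1_000)
        {:cont, :timeout}
      end
    end)
  end

  defp active?(base_url) do
    url = String.to_charlist(base_url <> "/v1/health")

    case :httpc.request(:get, {url, []}, [], []) do
      {:ok, {{_http_version, _code, _reason}, _headers, body}} ->
        to_string(body) =~ ~s("status":"active")

      _error ->
        false
    end
  end
end

Calling `RollingDeployCheck.await_active("http://localhost:3001")` mirrors the curl checks the test performs against the second instance.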
2 changes: 2 additions & 0 deletions packages/sync-service/config/runtime.exs
@@ -161,6 +161,8 @@ config :electric,
instance_id: instance_id,
telemetry_statsd_host: statsd_host,
db_pool_size: env!("DB_POOL_SIZE", :integer, 50),
replication_stream_id: env!("REPLICATION_STREAM_ID", :string, "default"),
service_port: env!("PORT", :integer, 3000),
prometheus_port: prometheus_port,
storage: storage,
persistent_kv: persistent_kv
16 changes: 13 additions & 3 deletions packages/sync-service/lib/electric/application.ex
@@ -22,8 +22,9 @@ defmodule Electric.Application do

persistent_kv = apply(kv_module, kv_fun, [kv_params])

publication_name = "electric_publication"
slot_name = "electric_slot"
replication_stream_id = Application.fetch_env!(:electric, :replication_stream_id)
publication_name = "electric_publication_#{replication_stream_id}"
slot_name = "electric_slot_#{replication_stream_id}"

with {:ok, storage_opts} <- storage_module.shared_opts(storage_opts) do
storage = {storage_module, storage_opts}
@@ -32,6 +33,14 @@
Electric.ConnectionManager.get_pg_version(Electric.ConnectionManager)
end

get_service_status = fn ->
Electric.ServiceStatus.check(
get_connection_status: fn ->
Electric.ConnectionManager.get_status(Electric.ConnectionManager)
end
)
end

prepare_tables_fn =
{Electric.Postgres.Configuration, :configure_tables_for_replication!,
[get_pg_version, publication_name]}
@@ -103,12 +112,13 @@
storage: storage,
registry: Registry.ShapeChanges,
shape_cache: {Electric.ShapeCache, []},
get_service_status: get_service_status,
inspector: inspector,
long_poll_timeout: 20_000,
max_age: Application.fetch_env!(:electric, :cache_max_age),
stale_age: Application.fetch_env!(:electric, :cache_stale_age),
allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)},
port: 3000,
port: Application.fetch_env!(:electric, :service_port),
thousand_island_options: http_listener_options()}
]
|> add_prometheus_router(Application.fetch_env!(:electric, :prometheus_port))
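
The `HealthCheckPlug` module itself is not included in this diff. The sketch below is an assumption of what such a plug could look like, wired to the `get_service_status` function built above; of the HTTP codes, only the 503 for a stopping service is stated in the commit log ("Return 503 for stopping state"), the rest are guesses:

# Hypothetical sketch of a health-check plug, not the PR's actual module.
defmodule Sketch.HealthCheckPlug do
  @behaviour Plug
  import Plug.Conn

  @impl true
  def init(opts), do: opts

  @impl true
  def call(conn, opts) do
    # `get_service_status` is the zero-arity function injected in application.ex,
    # returning :waiting | :starting | :active (and presumably :stopping).
    status = Keyword.fetch!(opts, :get_service_status).()

    # Assumed mapping: 503 while stopping, 200 otherwise.
    http_code = if status == :stopping, do: 503, else: 200

    conn
    |> put_resp_content_type("application/json")
    |> send_resp(http_code, ~s({"status":"#{status}"}))
  end
end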
77 changes: 73 additions & 4 deletions packages/sync-service/lib/electric/connection_manager.ex
@@ -43,10 +43,14 @@
:shape_cache,
# PID of the replication client.
:replication_client_pid,
# PID of the Postgres connection lock.
:lock_connection_pid,
# PID of the database connection pool (a `Postgrex` process).
:pool_pid,
# Backoff term used for reconnection with exponential back-off.
:backoff,
# Flag indicating whether the lock on the replication has been acquired.
:pg_lock_acquired,
# PostgreSQL server version
:pg_version,
:electric_instance_id
@@ -57,6 +61,8 @@

require Logger

@type status :: :waiting | :starting | :active

@type option ::
{:connection_opts, Keyword.t()}
| {:replication_opts, Keyword.t()}
@@ -69,6 +75,8 @@

@name __MODULE__

@lock_status_logging_interval 10_000

@doc """
Returns the version of the PostgreSQL server.
"""
@@ -77,6 +85,14 @@
GenServer.call(server, :get_pg_version)
end

@doc """
Returns the status of the connection manager.
"""
@spec get_status(GenServer.server()) :: status()
def get_status(server) do
GenServer.call(server, :get_status)
end

@spec start_link(options) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: @name)
@@ -112,21 +128,54 @@
timeline_opts: timeline_opts,
log_collector: Keyword.fetch!(opts, :log_collector),
shape_cache: Keyword.fetch!(opts, :shape_cache),
pg_lock_acquired: false,
backoff: {:backoff.init(1000, 10_000), nil},
electric_instance_id: Keyword.fetch!(opts, :electric_instance_id)
}

# We try to start the replication connection first because it requires additional
# priveleges compared to regular "pooled" connections, so failure to open a replication
# connection should be reported ASAP.
{:ok, state, {:continue, :start_replication_client}}
# Try to acquire the connection lock on the replication slot
# before starting shape and replication processes, to ensure
# a single active sync service is connected to Postgres per slot.
{:ok, state, {:continue, :start_lock_connection}}
end

@impl true
def handle_call(:get_pg_version, _from, %{pg_version: pg_version} = state) do
{:reply, pg_version, state}
end

def handle_call(:get_status, _from, %{pg_lock_acquired: pg_lock_acquired} = state) do
status =
cond do
not pg_lock_acquired ->
:waiting

is_nil(state.replication_client_pid) || is_nil(state.pool_pid) ||
not Process.alive?(state.pool_pid) ->
:starting

true ->
:active
end

{:reply, status, state}
end

def handle_continue(:start_lock_connection, state) do
case Electric.Postgres.LockConnection.start_link(
connection_opts: state.connection_opts,
connection_manager: self(),
lock_name: Keyword.fetch!(state.replication_opts, :slot_name)
) do
{:ok, lock_connection_pid} ->
Process.send_after(self(), :log_lock_connection_status, @lock_status_logging_interval)
{:noreply, %{state | lock_connection_pid: lock_connection_pid}}

{:error, reason} ->
handle_connection_error(reason, state, "lock_connection")
end
end

@impl true
def handle_continue(:start_replication_client, state) do
case start_replication_client(state) do
@@ -188,6 +237,7 @@

tag =
cond do
pid == state.lock_connection_pid -> :lock_connection
pid == state.replication_client_pid -> :replication_connection
pid == state.pool_pid -> :database_pool
end
@@ -203,6 +253,17 @@
{:noreply, %{state | replication_client_pid: nil}}
end

# Periodically log the status of the lock connection until it is acquired for
# easier debugging and diagnostics.
def handle_info(:log_lock_connection_status, state) do
if not state.pg_lock_acquired do
Logger.warning(fn -> "Waiting for postgres lock to be acquired..." end)
Process.send_after(self(), :log_lock_connection_status, @lock_status_logging_interval)
end

{:noreply, state}
end

@impl true
def handle_cast({:connection_opts, pid, connection_opts}, state) do
Process.monitor(pid)
@@ -220,6 +281,13 @@
end
end

def handle_cast(:lock_connection_acquired, %{pg_lock_acquired: false} = state) do
# As soon as we acquire the connection lock, we try to start the replication connection
# first because it requires additional privileges compared to regular "pooled" connections,
# so failure to open a replication connection should be reported ASAP.
{:noreply, %{state | pg_lock_acquired: true}, {:continue, :start_replication_client}}
end

defp start_replication_client(state) do
Electric.Shapes.Supervisor.start_link(
electric_instance_id: state.electric_instance_id,
@@ -273,6 +341,7 @@

step =
cond do
is_nil(state.lock_connection_pid) -> :start_lock_connection
is_nil(state.replication_client_pid) -> :start_replication_client
is_nil(state.pool_pid) -> :start_connection_pool
end
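
`Electric.Postgres.LockConnection` is also not shown in this diff, but the ConnectionManager code above implies its contract: open a dedicated connection, block on the advisory lock for the slot name (the same `pg_advisory_lock(hashtext(...))` call sketched after the changeset), then cast `:lock_connection_acquired` back to the manager. The module below is an illustrative assumption, not the PR's implementation:

# Illustrative sketch of a lock-holding process matching the contract implied
# by ConnectionManager: acquire the advisory lock on its own connection, then
# notify the manager so it can continue with :start_replication_client.
defmodule Sketch.LockConnection do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      connection_opts: Keyword.fetch!(opts, :connection_opts),
      connection_manager: Keyword.fetch!(opts, :connection_manager),
      lock_name: Keyword.fetch!(opts, :lock_name)
    }

    {:ok, state, {:continue, :acquire_lock}}
  end

  @impl true
  def handle_continue(:acquire_lock, state) do
    # A dedicated connection, so the lock's lifetime is tied to this process.
    {:ok, conn} = Postgrex.start_link(state.connection_opts)

    # Blocks until no other session holds the lock for this slot name.
    Postgrex.query!(conn, "SELECT pg_advisory_lock(hashtext($1))", [state.lock_name],
      timeout: :infinity
    )

    # ConnectionManager's handle_cast(:lock_connection_acquired, ...) then
    # proceeds to start the replication client.
    GenServer.cast(state.connection_manager, :lock_connection_acquired)

    {:noreply, Map.put(state, :conn, conn)}
  end
end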