fix: Handle replication slot conflicts (#1762)
Addresses #1749

- Makes publication and slot names configurable via a
`REPLICATION_STREAM_ID` env variable, which can ultimately be used to run
multiple Electric deploys against the same database
- Quotes all publication and slot names to address potential issues with
configurable names (the alternative is to force-downcase them when
initialised, to avoid nasty case-sensitivity bugs)
- Waits for a message from `Electric.LockConnection` confirming the lock is
acquired before the `ConnectionManager` initialises the replication
stream and shapes.
- If more than one Electric tries to connect to the same replication
slot (with the same `REPLICATION_STREAM_ID`), it makes a blocking
query to acquire the lock, which resolves once the previous Electric
using that slot releases it - this addresses rolling deploys and
ensures resources are initialised only once the previous Electric has
released them
- Could potentially switch to `pg_try_advisory_lock`, which is non-blocking
and immediately returns whether the lock could be acquired, and implement
retries with backoff on top - but since `pg_advisory_lock` simplifies the
implementation, I decided to start with that and see what people think.
A sketch of both variants follows this list.
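
For reference, a minimal sketch of the two locking strategies, assuming a dedicated long-lived `Postgrex` connection and `hashtext/1` to derive the lock key from the slot name (the actual `Electric.Postgres.LockConnection` internals are not shown in this excerpt):

```elixir
# Blocking variant (what this PR uses): the query only returns once the
# previous holder releases the lock, so no retry loop is needed. Session
# advisory locks are released automatically when the connection closes,
# so `conn` must stay open for the lifetime of the service.
Postgrex.query!(conn, "SELECT pg_advisory_lock(hashtext($1))", ["electric_slot_default"],
  timeout: :infinity
)

# Non-blocking alternative: returns a boolean immediately, so the caller
# would have to implement retries with backoff itself.
%Postgrex.Result{rows: [[acquired?]]} =
  Postgrex.query!(conn, "SELECT pg_try_advisory_lock(hashtext($1))", ["electric_slot_default"])
```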
 

Things that I still need to address:
- Currently the publication gets altered when a shape is created (adding a
table and potentially a row filter), but no cleanup occurs - so the
publication can grow to include everything across restarts and deploys,
even if much of it is no longer used.
- The way I want to address this is to change
`Electric.Postgres.Configuration` to alter the publication based on
_all_ active shapes rather than on each individual one. That way every
call updates the publication as necessary, and resuming/cleanup becomes
a matter of calling it every time a shape is deleted and once upon
starting (with recovered shapes or no shapes). Can be a separate PR -
see the sketch after this list.
- Created #1774 to
address this separately
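
A rough sketch of that reconciliation approach (hypothetical `reconcile_publication!/3` helper - the eventual `Electric.Postgres.Configuration` API may well differ):

```elixir
defmodule Electric.Postgres.ConfigurationSketch do
  @doc """
  Hypothetical helper: sets the publication to exactly the tables required
  by the currently active shapes, instead of only ever adding to it, so
  calling it on every shape deletion and once on startup doubles as cleanup.
  """
  def reconcile_publication!(conn, publication_name, active_tables) do
    tables =
      active_tables
      |> Enum.uniq()
      |> Enum.map_join(", ", fn {schema, table} -> ~s("#{schema}"."#{table}") end)

    # SET TABLE replaces the publication's table list wholesale; a
    # publication cannot be emptied this way, so the no-shapes case would
    # need separate handling.
    if tables != "" do
      Postgrex.query!(conn, ~s(ALTER PUBLICATION "#{publication_name}" SET TABLE #{tables}), [])
    end
  end
end
```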

---------

Co-authored-by: Oleksii Sholik <[email protected]>
msfstef and alco authored Oct 1, 2024
1 parent 7f44565 commit 5f6d202
Showing 20 changed files with 633 additions and 18 deletions.
7 changes: 7 additions & 0 deletions .changeset/poor-candles-fly.md
@@ -0,0 +1,7 @@
---
"@core/sync-service": patch
---

- Wait for advisory lock on replication slot to enable rolling deploys.
- Configurable replication slot and publication name using `REPLICATION_STREAM_ID` environment variable.
- Add `HealthCheckPlug` API endpoint at `v1/health` that returns `waiting`, `starting`, and `active` statuses.
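
The `HealthCheckPlug` module itself does not appear in this excerpt; a minimal sketch consistent with the statuses above and the responses asserted in the rolling-deploy test might look like this (the real module's option handling may differ):

```elixir
defmodule Electric.Plug.HealthCheckPlugSketch do
  # Hypothetical reconstruction of the health check endpoint. It maps the
  # connection manager's status (:waiting | :starting | :active) to the
  # JSON bodies the integration tests assert on, e.g. {"status":"waiting"}.
  import Plug.Conn

  def init(opts), do: opts

  def call(conn, opts) do
    # `get_service_status` is the zero-arity closure built in
    # Electric.Application (see the application.ex diff below).
    status = Keyword.fetch!(opts, :get_service_status).()

    conn
    |> put_resp_content_type("application/json")
    |> send_resp(200, ~s({"status":"#{status}"}))
  end
end
```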
65 changes: 65 additions & 0 deletions integration-tests/tests/crash-recovery.lux
@@ -0,0 +1,65 @@
[doc Verify handling of an Electric crash recovery]

[include macros.luxinc]

[global pg_container_name=crash-recovery__pg]

###

## Start a new Postgres cluster
[invoke setup_pg "" ""]

## Add some data
[invoke start_psql]
[shell psql]
"""!
CREATE TABLE items (
id UUID PRIMARY KEY,
val TEXT
);
"""
??CREATE TABLE

"""!
INSERT INTO
items (id, val)
SELECT
gen_random_uuid(),
'#' || generate_series || ' test val'
FROM
generate_series(1, 10);
"""
??INSERT 0 10

## Start the sync service.
[invoke setup_electric]

[shell electric]
??[info] Starting replication from postgres

# Initialize a shape and collect the offset
[shell client]
# strip ANSI codes from response for easier matching
!curl -v -X GET http://localhost:3000/v1/shape/items?offset=-1
?electric-shape-id: ([\d-]+)
[local shape_id=$1]
?electric-chunk-last-offset: ([\w\d_]+)
[local last_offset=$1]

## Terminate electric
[shell electric]
!System.halt()
??$PS1

## Start the sync service again.
[invoke setup_electric]
[shell electric]
??[info] Starting replication from postgres

# Client should be able to continue same shape
[shell client]
!curl -v -X GET "http://localhost:3000/v1/shape/items?offset=$last_offset&shape_id=$shape_id"
??HTTP/1.1 200 OK

[cleanup]
[invoke teardown]
4 changes: 2 additions & 2 deletions integration-tests/tests/invalidated-replication-slot.lux
@@ -7,7 +7,7 @@
[my invalidated_slot_error=
"""
[error] GenServer Electric.ConnectionManager terminating
** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot"
** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot_default"

This slot has been invalidated because it exceeded the maximum reserved size.
"""]
@@ -34,7 +34,7 @@

## Confirm slot invalidation in Postgres.
[shell pg]
?invalidating slot "electric_slot" because its restart_lsn \d+/\d+ exceeds max_slot_wal_keep_size
?invalidating slot "electric_slot_default" because its restart_lsn [\d\w]+/[\d\w]+ exceeds max_slot_wal_keep_size

## Observe the fatal connection error.
[shell electric]
13 changes: 11 additions & 2 deletions integration-tests/tests/macros.luxinc
@@ -41,6 +41,11 @@
??database system is ready to accept connections
[endmacro]

[macro start_psql]
[shell psql]
!docker exec -u postgres -it $pg_container_name psql
[endmacro]

[macro seed_pg]
[shell psql]
!docker exec -u postgres -it $pg_container_name psql
@@ -68,10 +73,14 @@
[endmacro]

[macro setup_electric]
[shell electric]
[invoke setup_electric_shell "electric" "3000"]
[endmacro]

[macro setup_electric_shell shell_name port]
[shell $shell_name]
-$fail_pattern

!DATABASE_URL=$database_url ../electric_dev.sh
!DATABASE_URL=$database_url PORT=$port ../electric_dev.sh
[endmacro]

[macro teardown]
2 changes: 1 addition & 1 deletion integration-tests/tests/postgres-disconnection.lux
@@ -22,7 +22,7 @@

## Observe the connection error.
[shell electric]
??[warning] Database connection in replication mode failed
??[warning] Database connection in lock_connection mode failed
??[warning] Reconnecting in

## Start the Postgres container back up.
62 changes: 62 additions & 0 deletions integration-tests/tests/rolling-deploy.lux
@@ -0,0 +1,62 @@
[doc Verify handling of an Electric rolling deploy]

[include macros.luxinc]

[global pg_container_name=rolling-deploy__pg]

###

## Start a new Postgres cluster
[invoke setup_pg "" ""]

## Start the first sync service.
[invoke setup_electric_shell "electric_1" "3000"]

[shell electric_1]
??[info] Acquiring lock from postgres with name electric_slot_default
??[info] Lock acquired from postgres with name electric_slot_default
??[info] Starting replication from postgres

# First service should be healthy and active
[shell orchestrator]
!curl -X GET http://localhost:3000/v1/health
??{"status":"active"}

## Start the second sync service.
[invoke setup_electric_shell "electric_2" "3001"]

## Assert that the lock is not acquired and replication does not start
## in the second electric
[shell electric_2]
-Lock acquired from postgres|Starting replication from postgres|$fail_pattern
??[info] Acquiring lock from postgres with name electric_slot_default
[sleep 2]


# Second service should be in waiting state, ready to take over
[shell orchestrator]
!curl -X GET http://localhost:3000/v1/health
??{"status":"active"}
!curl -X GET http://localhost:3001/v1/health
??{"status":"waiting"}

## Terminate first electric
[shell electric_1]
!System.halt()

# Confirm Electric process exit.
??$PS1

## Lock should now be acquired and replication starting
[shell electric_2]
-$fail_pattern
??[info] Lock acquired from postgres with name electric_slot_default
??[info] Starting replication from postgres

# Second service is now healthy and active
[shell orchestrator]
!curl -X GET http://localhost:3001/v1/health
??{"status":"active"}

[cleanup]
[invoke teardown]
2 changes: 2 additions & 0 deletions packages/sync-service/config/runtime.exs
@@ -161,6 +161,8 @@ config :electric,
instance_id: instance_id,
telemetry_statsd_host: statsd_host,
db_pool_size: env!("DB_POOL_SIZE", :integer, 50),
replication_stream_id: env!("REPLICATION_STREAM_ID", :string, "default"),
service_port: env!("PORT", :integer, 3000),
prometheus_port: prometheus_port,
storage: storage,
persistent_kv: persistent_kv
16 changes: 13 additions & 3 deletions packages/sync-service/lib/electric/application.ex
@@ -22,8 +22,9 @@ defmodule Electric.Application do

persistent_kv = apply(kv_module, kv_fun, [kv_params])

publication_name = "electric_publication"
slot_name = "electric_slot"
replication_stream_id = Application.fetch_env!(:electric, :replication_stream_id)
publication_name = "electric_publication_#{replication_stream_id}"
slot_name = "electric_slot_#{replication_stream_id}"

with {:ok, storage_opts} <- storage_module.shared_opts(storage_opts) do
storage = {storage_module, storage_opts}
@@ -32,6 +33,14 @@
Electric.ConnectionManager.get_pg_version(Electric.ConnectionManager)
end

get_service_status = fn ->
Electric.ServiceStatus.check(
get_connection_status: fn ->
Electric.ConnectionManager.get_status(Electric.ConnectionManager)
end
)
end

prepare_tables_fn =
{Electric.Postgres.Configuration, :configure_tables_for_replication!,
[get_pg_version, publication_name]}
@@ -103,12 +112,13 @@
storage: storage,
registry: Registry.ShapeChanges,
shape_cache: {Electric.ShapeCache, []},
get_service_status: get_service_status,
inspector: inspector,
long_poll_timeout: 20_000,
max_age: Application.fetch_env!(:electric, :cache_max_age),
stale_age: Application.fetch_env!(:electric, :cache_stale_age),
allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)},
port: 3000,
port: Application.fetch_env!(:electric, :service_port),
thousand_island_options: http_listener_options()}
]
|> add_prometheus_router(Application.fetch_env!(:electric, :prometheus_port))
77 changes: 73 additions & 4 deletions packages/sync-service/lib/electric/connection_manager.ex
@@ -43,10 +43,14 @@ defmodule Electric.ConnectionManager do
:shape_cache,
# PID of the replication client.
:replication_client_pid,
# PID of the Postgres connection lock.
:lock_connection_pid,
# PID of the database connection pool (a `Postgrex` process).
:pool_pid,
# Backoff term used for reconnection with exponential back-off.
:backoff,
# Flag indicating whether the lock on the replication has been acquired.
:pg_lock_acquired,
# PostgreSQL server version
:pg_version,
:electric_instance_id
@@ -57,6 +61,8 @@

require Logger

@type status :: :waiting | :starting | :active

@type option ::
{:connection_opts, Keyword.t()}
| {:replication_opts, Keyword.t()}
@@ -69,6 +75,8 @@

@name __MODULE__

@lock_status_logging_interval 10_000

@doc """
Returns the version of the PostgreSQL server.
"""
@@ -77,6 +85,14 @@
GenServer.call(server, :get_pg_version)
end

@doc """
Returns the status of the connection manager.
"""
@spec get_status(GenServer.server()) :: status()
def get_status(server) do
GenServer.call(server, :get_status)
end

@spec start_link(options) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: @name)
@@ -112,21 +128,54 @@
timeline_opts: timeline_opts,
log_collector: Keyword.fetch!(opts, :log_collector),
shape_cache: Keyword.fetch!(opts, :shape_cache),
pg_lock_acquired: false,
backoff: {:backoff.init(1000, 10_000), nil},
electric_instance_id: Keyword.fetch!(opts, :electric_instance_id)
}

# We try to start the replication connection first because it requires additional
# privileges compared to regular "pooled" connections, so failure to open a replication
# connection should be reported ASAP.
{:ok, state, {:continue, :start_replication_client}}
# Try to acquire the connection lock on the replication slot
# before starting shape and replication processes, to ensure
# a single active sync service is connected to Postgres per slot.
{:ok, state, {:continue, :start_lock_connection}}
end

@impl true
def handle_call(:get_pg_version, _from, %{pg_version: pg_version} = state) do
{:reply, pg_version, state}
end

def handle_call(:get_status, _from, %{pg_lock_acquired: pg_lock_acquired} = state) do
status =
cond do
not pg_lock_acquired ->
:waiting

is_nil(state.replication_client_pid) || is_nil(state.pool_pid) ||
not Process.alive?(state.pool_pid) ->
:starting

true ->
:active
end

{:reply, status, state}
end

def handle_continue(:start_lock_connection, state) do
case Electric.Postgres.LockConnection.start_link(
connection_opts: state.connection_opts,
connection_manager: self(),
lock_name: Keyword.fetch!(state.replication_opts, :slot_name)
) do
{:ok, lock_connection_pid} ->
Process.send_after(self(), :log_lock_connection_status, @lock_status_logging_interval)
{:noreply, %{state | lock_connection_pid: lock_connection_pid}}

{:error, reason} ->
handle_connection_error(reason, state, "lock_connection")
end
end

@impl true
def handle_continue(:start_replication_client, state) do
case start_replication_client(state) do
@@ -188,6 +237,7 @@

tag =
cond do
pid == state.lock_connection_pid -> :lock_connection
pid == state.replication_client_pid -> :replication_connection
pid == state.pool_pid -> :database_pool
end
@@ -203,6 +253,17 @@
{:noreply, %{state | replication_client_pid: nil}}
end

# Periodically log the status of the lock connection until it is acquired for
# easier debugging and diagnostics.
def handle_info(:log_lock_connection_status, state) do
if not state.pg_lock_acquired do
Logger.warning(fn -> "Waiting for postgres lock to be acquired..." end)
Process.send_after(self(), :log_lock_connection_status, @lock_status_logging_interval)
end

{:noreply, state}
end

@impl true
def handle_cast({:connection_opts, pid, connection_opts}, state) do
Process.monitor(pid)
@@ -220,6 +281,13 @@
end
end

def handle_cast(:lock_connection_acquired, %{pg_lock_acquired: false} = state) do
# As soon as we acquire the connection lock, we try to start the replication connection
# first because it requires additional privileges compared to regular "pooled" connections,
# so failure to open a replication connection should be reported ASAP.
{:noreply, %{state | pg_lock_acquired: true}, {:continue, :start_replication_client}}
end

defp start_replication_client(state) do
Electric.Shapes.Supervisor.start_link(
electric_instance_id: state.electric_instance_id,
@@ -273,6 +341,7 @@

step =
cond do
is_nil(state.lock_connection_pid) -> :start_lock_connection
is_nil(state.replication_client_pid) -> :start_replication_client
is_nil(state.pool_pid) -> :start_connection_pool
end
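
`Electric.Postgres.LockConnection` itself is not included in this excerpt. Based on the calls above (its `start_link/1` options and the `:lock_connection_acquired` cast the manager handles), a plausible sketch - not the actual implementation - is:

```elixir
defmodule Electric.Postgres.LockConnectionSketch do
  # Plausible sketch only: opens a dedicated connection, issues the blocking
  # pg_advisory_lock query, and notifies the ConnectionManager once the
  # lock is held. The session lock is released automatically if this
  # connection dies, letting the next instance take over.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, Map.new(opts))

  @impl true
  def init(state), do: {:ok, state, {:continue, :acquire_lock}}

  @impl true
  def handle_continue(:acquire_lock, state) do
    {:ok, conn} = Postgrex.start_link(state.connection_opts)

    # Blocks until any previous holder (e.g. the old instance in a rolling
    # deploy) releases the lock, hence the infinite timeout.
    Postgrex.query!(conn, "SELECT pg_advisory_lock(hashtext($1))", [state.lock_name],
      timeout: :infinity
    )

    GenServer.cast(state.connection_manager, :lock_connection_acquired)
    {:noreply, Map.put(state, :conn, conn)}
  end
end
```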