Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Handle replication slot conflicts #1762

Merged
merged 33 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
5453e37
Use advisory lock to handle rolling deploys
msfstef Sep 26, 2024
a6dfec4
Configurable replication stream ID
msfstef Sep 26, 2024
7cf3f38
Quote all publication and replication slot names
msfstef Sep 26, 2024
3b0dba8
Implement utility for resetting the publication
msfstef Sep 26, 2024
a7ef1b5
Fix utils quoting test
msfstef Sep 26, 2024
09db776
Use hash of slot name for lock
msfstef Sep 26, 2024
20285c4
Fix integration test
msfstef Sep 26, 2024
51291ae
Add changeset
msfstef Sep 26, 2024
0acac52
Basic replication status work
msfstef Sep 26, 2024
4e34836
Remove connection manager as required opt
msfstef Sep 30, 2024
fbaa5a6
Use an independent connection lock
msfstef Sep 30, 2024
67dc96d
Add backoff retries
msfstef Sep 30, 2024
267d471
Check pool id as well for status
msfstef Sep 30, 2024
a1836a7
Fix integration test and lock restarting
msfstef Sep 30, 2024
b653044
Rename connection lock atom
msfstef Sep 30, 2024
3f47fbd
Fix health check endpoint to return correct messages
msfstef Sep 30, 2024
4e8e3e6
Add basic unit tests for health check endpoint
msfstef Sep 30, 2024
3bba4e2
Add integration test for rolling deploy
msfstef Sep 30, 2024
b203953
Fix weird formatting
msfstef Sep 30, 2024
a7e1897
Return 503 for stopping state
msfstef Sep 30, 2024
017ed32
Transform start link keyword argument
msfstef Sep 30, 2024
f2d6250
Mock lock connection under Postgres
msfstef Sep 30, 2024
934c35d
Add basic lock connection tests
msfstef Sep 30, 2024
4655e00
Add comments explaining seemingly unnecessary pattern matching
msfstef Sep 30, 2024
49d6620
Update changeset
msfstef Sep 30, 2024
3269786
Add basic integration test for health endoint
msfstef Sep 30, 2024
3ac6845
Add health check plug to integration testing for rolling deploys
msfstef Sep 30, 2024
50e95f0
Assert electric process exits
msfstef Oct 1, 2024
685bc4f
Log periodic messages while lock is not acquired
msfstef Oct 1, 2024
f22c669
Add crash recovery integration test
msfstef Oct 1, 2024
c024581
Replace positional matching on a keyword list with Keyword.pop()
alco Oct 1, 2024
b0d928b
Remove a noop "with" from ServiceStatus.check()
alco Oct 1, 2024
dc8c980
Adress PR comments
msfstef Oct 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/poor-candles-fly.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@core/sync-service": patch
---

- Wait for advisory lock on replication slot to enable rolling deploys.
- Configurable replication slot and publication name using `REPLICATION_STREAM_ID` environment variable.
4 changes: 2 additions & 2 deletions integration-tests/tests/invalidated-replication-slot.lux
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
[my invalidated_slot_error=
"""
[error] GenServer Electric.ConnectionManager terminating
** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot"
** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot_default"

This slot has been invalidated because it exceeded the maximum reserved size.
"""]
Expand All @@ -34,7 +34,7 @@

## Confirm slot invalidation in Postgres.
[shell pg]
?invalidating slot "electric_slot" because its restart_lsn \d+/\d+ exceeds max_slot_wal_keep_size
?invalidating slot "electric_slot_default" because its restart_lsn [\d\w]+/[\d\w]+ exceeds max_slot_wal_keep_size

## Observe the fatal connection error.
[shell electric]
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/tests/postgres-disconnection.lux
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

## Observe the connection error.
[shell electric]
??[warning] Database connection in replication mode failed
??[warning] Database connection in lock_connection mode failed
??[warning] Reconnecting in

## Start the Postgres container back up.
Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ config :electric,
instance_id: instance_id,
telemetry_statsd_host: statsd_host,
db_pool_size: env!("DB_POOL_SIZE", :integer, 50),
replication_stream_id: env!("REPLICATION_STREAM_ID", :string, "default"),
prometheus_port: prometheus_port,
storage: storage,
persistent_kv: persistent_kv
14 changes: 12 additions & 2 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ defmodule Electric.Application do

persistent_kv = apply(kv_module, kv_fun, [kv_params])

publication_name = "electric_publication"
slot_name = "electric_slot"
replication_stream_id = Application.fetch_env!(:electric, :replication_stream_id)
publication_name = "electric_publication_#{replication_stream_id}"
slot_name = "electric_slot_#{replication_stream_id}"

with {:ok, storage_opts} <- storage_module.shared_opts(storage_opts) do
storage = {storage_module, storage_opts}
Expand All @@ -32,6 +33,14 @@ defmodule Electric.Application do
Electric.ConnectionManager.get_pg_version(Electric.ConnectionManager)
end

get_service_status = fn ->
Electric.ServiceStatus.check(
get_connection_status: fn ->
Electric.ConnectionManager.get_status(Electric.ConnectionManager)
end
)
end

prepare_tables_fn =
{Electric.Postgres.Configuration, :configure_tables_for_replication!,
[get_pg_version, publication_name]}
Expand Down Expand Up @@ -103,6 +112,7 @@ defmodule Electric.Application do
storage: storage,
registry: Registry.ShapeChanges,
shape_cache: {Electric.ShapeCache, []},
get_service_status: get_service_status,
inspector: inspector,
long_poll_timeout: 20_000,
max_age: Application.fetch_env!(:electric, :cache_max_age),
Expand Down
63 changes: 59 additions & 4 deletions packages/sync-service/lib/electric/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ defmodule Electric.ConnectionManager do
:shape_cache,
# PID of the replication client.
:replication_client_pid,
# PID of the Postgres connection lock.
:lock_connection_pid,
# PID of the database connection pool (a `Postgrex` process).
:pool_pid,
# Backoff term used for reconnection with exponential back-off.
:backoff,
# Flag indicating whether the lock on the replication has been acquired.
:pg_lock_acquired,
# PostgreSQL server version
:pg_version,
:electric_instance_id
Expand All @@ -57,6 +61,8 @@ defmodule Electric.ConnectionManager do

require Logger

@type status :: :waiting | :starting | :active

@type option ::
{:connection_opts, Keyword.t()}
| {:replication_opts, Keyword.t()}
Expand All @@ -77,6 +83,14 @@ defmodule Electric.ConnectionManager do
GenServer.call(server, :get_pg_version)
end

@doc """
Returns the status of the connection manager.
"""
@spec get_status(GenServer.server()) :: status()
def get_status(server) do
GenServer.call(server, :get_status)
end

@spec start_link(options) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: @name)
Expand Down Expand Up @@ -112,21 +126,53 @@ defmodule Electric.ConnectionManager do
timeline_opts: timeline_opts,
log_collector: Keyword.fetch!(opts, :log_collector),
shape_cache: Keyword.fetch!(opts, :shape_cache),
pg_lock_acquired: false,
backoff: {:backoff.init(1000, 10_000), nil},
electric_instance_id: Keyword.fetch!(opts, :electric_instance_id)
}

# We try to start the replication connection first because it requires additional
# priveleges compared to regular "pooled" connections, so failure to open a replication
# connection should be reported ASAP.
{:ok, state, {:continue, :start_replication_client}}
# Try to acquire the connection lock on the replication slot
# before starting shape and replication processes, to ensure
# a single active sync service is connected to Postgres per slot.
{:ok, state, {:continue, :start_lock_connection}}
end

@impl true
def handle_call(:get_pg_version, _from, %{pg_version: pg_version} = state) do
{:reply, pg_version, state}
end

def handle_call(:get_status, _from, %{pg_lock_acquired: pg_lock_acquired} = state) do
status =
cond do
not pg_lock_acquired ->
:waiting

is_nil(state.replication_client_pid) || is_nil(state.pool_id) ||
not Process.alive?(state.pool_id) ->
:starting

true ->
:active
end

{:reply, status, state}
end

def handle_continue(:start_lock_connection, state) do
case Electric.LockConnection.start_link(
state.connection_opts,
self(),
Keyword.fetch!(state.replication_opts, :slot_name)
) do
{:ok, lock_connection_pid} ->
{:noreply, %{state | lock_connection_pid: lock_connection_pid}}

{:error, reason} ->
handle_connection_error(reason, state, "lock_connection")
end
end

@impl true
def handle_continue(:start_replication_client, state) do
case start_replication_client(state) do
Expand Down Expand Up @@ -188,6 +234,7 @@ defmodule Electric.ConnectionManager do

tag =
cond do
pid == state.lock_connection_pid -> :lock_connection
pid == state.replication_client_pid -> :replication_connection
pid == state.pool_pid -> :database_pool
end
Expand Down Expand Up @@ -220,6 +267,13 @@ defmodule Electric.ConnectionManager do
end
end

def handle_cast(:lock_connection_acquired, state) do
# As soon as we acquire the connection lock, we try to start the replication connection
# first because it requires additional privileges compared to regular "pooled" connections,
# so failure to open a replication connection should be reported ASAP.
{:noreply, %{state | pg_lock_acquired: true}, {:continue, :start_replication_client}}
end

defp start_replication_client(state) do
Electric.Shapes.Supervisor.start_link(
electric_instance_id: state.electric_instance_id,
Expand Down Expand Up @@ -273,6 +327,7 @@ defmodule Electric.ConnectionManager do

step =
cond do
is_nil(state.lock_connection_pid) -> :start_lock_connection
is_nil(state.replication_client_pid) -> :start_replication_client
is_nil(state.pool_pid) -> :start_connection_pool
end
Expand Down
96 changes: 96 additions & 0 deletions packages/sync-service/lib/electric/lock_connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
defmodule Electric.LockConnection do
@moduledoc """
A Postgres connection that ensures an advisory lock is held for its entire duration,
useful for ensuring only a single sync service instance can be using a single
replication slot at any given time.

The connection attempts to grab the lock and waits on it until it acquires it.
When it does, it fires off a :lock_connection_acquired message to the specified
`Electric.ConnectionManager` such that the required setup can acquired now that
the service is sure to be the only one operating on this replication stream.
"""
require Logger
@behaviour Postgrex.SimpleConnection

defmodule State do
defstruct [
:connection_manager,
:lock_acquired,
:lock_name,
:backoff
]
end

@spec start_link(Keyword.t(), GenServer.server(), String.t()) ::
{:ok, pid()} | {:error, Postgrex.Error.t() | term()}
def start_link(connection_opts, connection_manager, lock_name) do
msfstef marked this conversation as resolved.
Show resolved Hide resolved
case Postgrex.SimpleConnection.start_link(
__MODULE__,
[connection_manager: connection_manager, lock_name: lock_name],
connection_opts ++ [timeout: :infinity, auto_reconnect: false]
) do
{:ok, pid} ->
send(pid, :acquire_lock)
{:ok, pid}

{:error, error} ->
{:error, error}
end
end

@impl true
def init(opts) do
{:ok,
%State{
connection_manager: Keyword.fetch!(opts, :connection_manager),
lock_name: Keyword.fetch!(opts, :lock_name),
lock_acquired: false,
backoff: {:backoff.init(1000, 10_000), nil}
}}
end

@impl true
def handle_info(:acquire_lock, state) do
if(state.lock_acquired) do
msfstef marked this conversation as resolved.
Show resolved Hide resolved
notify_lock_acquired(state)
{:noreply, state}
else
{:query, lock_query(state), state}
end
end

def handle_info({:timeout, tref, msg}, %{backoff: {backoff, tref}} = state) do
handle_info(msg, %{state | backoff: {backoff, nil}})
end

@impl true
def handle_result(results, state) when is_list(results) do
msfstef marked this conversation as resolved.
Show resolved Hide resolved
notify_lock_acquired(state)
{:noreply, state}
end

@impl true
def handle_result(%Postgrex.Error{} = error, %State{backoff: {backoff, _}} = state) do
{time, backoff} = :backoff.fail(backoff)
tref = :erlang.start_timer(time, self(), :acquire_lock)

Logger.error(
"Failed to acquire lock #{state.lock_name} with reason #{inspect(error)} - retrying in #{inspect(time)}ms."
)

{:noreply, %{state | lock_acquired: false, backoff: {backoff, tref}}}
end

defp notify_lock_acquired(%State{connection_manager: connection_manager} = _state) do
GenServer.cast(connection_manager, :lock_connection_acquired)
end

defp lock_query(%State{lock_name: name} = _state) do
"SELECT pg_advisory_lock(hashtext('#{name}'))"
end

@impl true
def notify(_channel, _payload, _state) do
:ok
end
end
35 changes: 35 additions & 0 deletions packages/sync-service/lib/electric/plug/health_check_plug.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
defmodule Electric.Plug.HealthCheckPlug do
alias Plug.Conn
require Logger
use Plug.Builder

plug :check_service_status
plug :put_relevant_headers
plug :send_response

defp check_service_status(conn, _) do
get_service_status = Access.fetch!(conn.assigns.config, :get_service_status)

{status_code, status_text} =
case get_service_status.() do
:waiting -> {200, "waiting"}
:starting -> {200, "starting"}
:active -> {200, "active"}
:stopping -> {500, "stopping"}
msfstef marked this conversation as resolved.
Show resolved Hide resolved
end

conn |> assign(:status_text, status_text) |> assign(:status_code, status_code)
end

defp put_relevant_headers(conn, _),
do:
conn
|> put_resp_header("content-type", "application/json")
|> put_resp_header("cache-control", "no-cache, no-store, must-revalidate")

defp send_response(
%Conn{assigns: %{status_text: status_text, status_code: status_code}} = conn,
_
),
do: send_resp(conn, status_code, Jason.encode!(%{status: status_text}))
end
2 changes: 2 additions & 0 deletions packages/sync-service/lib/electric/plug/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ defmodule Electric.Plug.Router do
delete "/v1/shape/:root_table", to: Electric.Plug.DeleteShapePlug
match "/v1/shape/:root_table", via: :options, to: Electric.Plug.OptionsShapePlug

get "/v1/health", to: Electric.Plug.HealthCheckPlug

match _ do
send_resp(conn, 404, "Not found")
end
Expand Down
Loading
Loading