Skip to content

Commit

Permalink
fix(sync-service): Query Postgres server version at the start of conn…
Browse files Browse the repository at this point in the history
…ection setup (#1784)

We were querying it after starting the replication client before, which
was too late, because by that time the shape recovery process might have
already reached
`Electric.Postgres.Configuration.configure_tables_for_replication!()`
where the version is used to branch the control flow. This was causing
flake in integration tests due to the version not always being there and
the unsupported query being issue against Postgres version 14.

In short, this PR resolves the flakiness in integration tests, they
should be consistently green from now on.

The choice of augmenting the `LockConnection` module with version
querying and a crude state machine might seem dubious but the end result
is simpler this way, compared to adding version querying to the
replication client, for example.
  • Loading branch information
alco authored Oct 3, 2024
1 parent d23a79f commit 5acf022
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .changeset/dry-buses-fail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Query Postgres server version as early as possible so that it is available throughout the whole connection initialization process.
22 changes: 7 additions & 15 deletions packages/sync-service/lib/electric/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ defmodule Electric.ConnectionManager do

@impl true
def handle_call(:get_pg_version, _from, %{pg_version: pg_version} = state) do
# If we haven't queried the PG version by the time it is requested, that's a fatal error.
false = is_nil(pg_version)
{:reply, pg_version, state}
end

Expand Down Expand Up @@ -208,13 +210,11 @@ defmodule Electric.ConnectionManager do
{:ok, pid} ->
Electric.Timeline.check({get_pg_id(pid), get_pg_timeline(pid)}, state.timeline_opts)

pg_version = query_pg_version(pid)

# Now we have everything ready to start accepting and processing logical messages from
# Postgres.
Electric.Postgres.ReplicationClient.start_streaming(state.replication_client_pid)

state = %{state | pool_pid: pid, pg_version: pg_version}
state = %{state | pool_pid: pid}
{:noreply, state}

{:error, reason} ->
Expand Down Expand Up @@ -281,6 +281,10 @@ defmodule Electric.ConnectionManager do
end
end

def handle_cast({:pg_version, pg_version}, state) do
{:noreply, %{state | pg_version: pg_version}}
end

def handle_cast(:lock_connection_acquired, %{pg_lock_acquired: false} = state) do
# As soon as we acquire the connection lock, we try to start the replication connection
# first because it requires additional privileges compared to regular "pooled" connections,
Expand Down Expand Up @@ -470,16 +474,4 @@ defmodule Electric.ConnectionManager do
%Postgrex.Result{rows: [[timeline_id]]} -> timeline_id
end
end

def query_pg_version(conn) do
[[setting]] =
Postgrex.query!(
conn,
"SELECT current_setting('server_version_num')::integer",
[]
)
|> Map.fetch!(:rows)

setting
end
end
62 changes: 42 additions & 20 deletions packages/sync-service/lib/electric/postgres/lock_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ defmodule Electric.Postgres.LockConnection do

defmodule State do
defstruct [
:step,
:connection_manager,
:lock_acquired,
:lock_name,
Expand All @@ -32,24 +33,20 @@ defmodule Electric.Postgres.LockConnection do
def start_link(opts) do
{connection_opts, init_opts} = Keyword.pop(opts, :connection_opts)

case Postgrex.SimpleConnection.start_link(
__MODULE__,
init_opts,
connection_opts ++ [timeout: :infinity, auto_reconnect: false]
) do
{:ok, pid} ->
send(pid, :acquire_lock)
{:ok, pid}

{:error, error} ->
{:error, error}
end
Postgrex.SimpleConnection.start_link(
__MODULE__,
init_opts,
connection_opts ++ [timeout: :infinity, auto_reconnect: false]
)
end

@impl true
def init(opts) do
send(self(), :query_pg_version)

{:ok,
%State{
step: :query_pg_version,
connection_manager: Keyword.fetch!(opts, :connection_manager),
lock_name: Keyword.fetch!(opts, :lock_name),
lock_acquired: false,
Expand All @@ -58,6 +55,10 @@ defmodule Electric.Postgres.LockConnection do
end

@impl true
def handle_info(:query_pg_version, state) do
{:query, pg_version_query(), state}
end

def handle_info(:acquire_lock, state) do
if state.lock_acquired do
notify_lock_acquired(state)
Expand All @@ -73,28 +74,49 @@ defmodule Electric.Postgres.LockConnection do
end

@impl true
def handle_result([_] = _results, state) do
def handle_result(
[%Postgrex.Result{columns: ["server_version_num"], rows: [[version_str]]}],
state
) do
Logger.info("Postgres server version reported as #{version_str}")
notify_pg_version(String.to_integer(version_str), state)
send(self(), :acquire_lock)
{:noreply, %{state | step: :acquire_lock}}
end

def handle_result([%Postgrex.Result{columns: ["pg_advisory_lock"]}], state) do
Logger.info("Lock acquired from postgres with name #{state.lock_name}")
notify_lock_acquired(state)
{:noreply, %{state | lock_acquired: true}}
{:noreply, %{state | lock_acquired: true, step: :ready}}
end

@impl true
def handle_result(%Postgrex.Error{} = error, %State{backoff: {backoff, _}} = state) do
def handle_result(%Postgrex.Error{} = error, %State{step: step, backoff: {backoff, _}} = state) do
error_str =
case step do
:query_pg_version -> "Failed to get Postgres server version"
:acquire_lock -> "Failed to acquire lock #{state.lock_name}"
end

{time, backoff} = :backoff.fail(backoff)
tref = :erlang.start_timer(time, self(), :acquire_lock)
tref = :erlang.start_timer(time, self(), step)

Logger.error(
"Failed to acquire lock #{state.lock_name} with reason #{inspect(error)} - retrying in #{inspect(time)}ms."
)
Logger.error(error_str <> " with reason #{inspect(error)} - retrying in #{inspect(time)}ms.")

{:noreply, %{state | lock_acquired: false, backoff: {backoff, tref}}}
end

defp notify_pg_version(pg_version, %State{connection_manager: connection_manager}) do
GenServer.cast(connection_manager, {:pg_version, pg_version})
end

defp notify_lock_acquired(%State{connection_manager: connection_manager} = _state) do
GenServer.cast(connection_manager, :lock_connection_acquired)
end

defp pg_version_query do
"SELECT current_setting('server_version_num') AS server_version_num"
end

defp lock_query(%State{lock_name: name} = _state) do
"SELECT pg_advisory_lock(hashtext('#{name}'))"
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ defmodule Electric.Postgres.ConfigurationTest do
end

defp list_tables_in_publication(conn, publication) do
pg_version = Electric.ConnectionManager.query_pg_version(conn)
%{rows: [[pg_version]]} =
Postgrex.query!(conn, "SELECT current_setting('server_version_num')::integer", [])

list_tables_in_pub(conn, publication, pg_version)
end

Expand Down
4 changes: 3 additions & 1 deletion packages/sync-service/test/support/db_setup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ defmodule Support.DbSetup do
end

def with_pg_version(ctx) do
pg_version = Electric.ConnectionManager.query_pg_version(ctx.db_conn)
%{rows: [[pg_version]]} =
Postgrex.query!(ctx.db_conn, "SELECT current_setting('server_version_num')::integer", [])

{:ok, %{get_pg_version: fn -> pg_version end}}
end

Expand Down

0 comments on commit 5acf022

Please sign in to comment.