Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sync-service): Query Postgres server version at the start of connection setup #1784

Merged
merged 3 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/dry-buses-fail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Query Postgres server version as early as possible so that it is available throughout the whole connection initialization process.
22 changes: 7 additions & 15 deletions packages/sync-service/lib/electric/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ defmodule Electric.ConnectionManager do

@impl true
def handle_call(:get_pg_version, _from, %{pg_version: version} = state) do
  # Being asked for the version before it has been stored in the state is treated as fatal:
  # a nil value fails the match below and crashes the process.
  false = is_nil(version)
  {:reply, version, state}
end

Expand Down Expand Up @@ -208,13 +210,11 @@ defmodule Electric.ConnectionManager do
{:ok, pid} ->
Electric.Timeline.check({get_pg_id(pid), get_pg_timeline(pid)}, state.timeline_opts)

pg_version = query_pg_version(pid)

# Now we have everything ready to start accepting and processing logical messages from
# Postgres.
Electric.Postgres.ReplicationClient.start_streaming(state.replication_client_pid)

state = %{state | pool_pid: pid, pg_version: pg_version}
state = %{state | pool_pid: pid}
{:noreply, state}

{:error, reason} ->
Expand Down Expand Up @@ -281,6 +281,10 @@ defmodule Electric.ConnectionManager do
end
end

def handle_cast({:pg_version, version}, state) do
  # Map-update syntax requires the :pg_version key to already be present in the state.
  next_state = %{state | pg_version: version}
  {:noreply, next_state}
end

def handle_cast(:lock_connection_acquired, %{pg_lock_acquired: false} = state) do
# As soon as we acquire the connection lock, we try to start the replication connection
# first because it requires additional privileges compared to regular "pooled" connections,
Expand Down Expand Up @@ -470,16 +474,4 @@ defmodule Electric.ConnectionManager do
%Postgrex.Result{rows: [[timeline_id]]} -> timeline_id
end
end

# Runs a single query on `conn` asking Postgres for its `server_version_num` setting cast to
# an integer, and returns the only value of the only result row.
def query_pg_version(conn) do
  result = Postgrex.query!(conn, "SELECT current_setting('server_version_num')::integer", [])
  [[version_num]] = Map.fetch!(result, :rows)
  version_num
end
end
62 changes: 42 additions & 20 deletions packages/sync-service/lib/electric/postgres/lock_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ defmodule Electric.Postgres.LockConnection do

defmodule State do
defstruct [
:step,
:connection_manager,
:lock_acquired,
:lock_name,
Expand All @@ -32,24 +33,20 @@ defmodule Electric.Postgres.LockConnection do
# Starts the lock connection process. `:connection_opts` is popped from `opts`; the remaining
# options are handed to the connection as its init argument, and the connection options are
# extended with an infinite timeout and auto-reconnect disabled.
# NOTE(review): this excerpt appears to come from a rendered diff that lost its +/- markers.
# The `case` wrapper (which sends :acquire_lock on success) looks like the pre-change body and
# the bare `start_link` call after it like its replacement — confirm against the repository.
def start_link(opts) do
{connection_opts, init_opts} = Keyword.pop(opts, :connection_opts)

case Postgrex.SimpleConnection.start_link(
__MODULE__,
init_opts,
connection_opts ++ [timeout: :infinity, auto_reconnect: false]
) do
{:ok, pid} ->
send(pid, :acquire_lock)
{:ok, pid}

{:error, error} ->
{:error, error}
end
Postgrex.SimpleConnection.start_link(
__MODULE__,
init_opts,
connection_opts ++ [timeout: :infinity, auto_reconnect: false]
)
end

@impl true
def init(opts) do
send(self(), :query_pg_version)

{:ok,
%State{
step: :query_pg_version,
connection_manager: Keyword.fetch!(opts, :connection_manager),
lock_name: Keyword.fetch!(opts, :lock_name),
lock_acquired: false,
Expand All @@ -58,6 +55,10 @@ defmodule Electric.Postgres.LockConnection do
end

@impl true
def handle_info(:query_pg_version, state) do
  # Returning a :query tuple asks the connection to run the given SQL statement.
  sql = pg_version_query()
  {:query, sql, state}
end

def handle_info(:acquire_lock, state) do
if state.lock_acquired do
notify_lock_acquired(state)
Expand All @@ -73,28 +74,49 @@ defmodule Electric.Postgres.LockConnection do
end

@impl true
def handle_result([_] = _results, state) do
def handle_result(
      [%Postgrex.Result{columns: ["server_version_num"], rows: [[raw_version]]}],
      state
    ) do
  Logger.info("Postgres server version reported as #{raw_version}")

  # The version arrives as a string; it is forwarded to the connection manager as an integer.
  raw_version
  |> String.to_integer()
  |> notify_pg_version(state)

  # With the version known, queue the next step for this process.
  send(self(), :acquire_lock)
  {:noreply, %{state | step: :acquire_lock}}
end

# Handles the result of the advisory lock query: logs that the lock was acquired and notifies
# the connection manager.
def handle_result([%Postgrex.Result{columns: ["pg_advisory_lock"]}], state) do
Logger.info("Lock acquired from postgres with name #{state.lock_name}")
notify_lock_acquired(state)
# NOTE(review): this excerpt appears to come from a rendered diff that lost its +/- markers;
# the next line looks like the pre-change return value and the one after it like its
# replacement (which also sets step: :ready) — confirm against the repository.
{:noreply, %{state | lock_acquired: true}}
{:noreply, %{state | lock_acquired: true, step: :ready}}
end

@impl true
# Handles a failed query: picks an error message based on the current step, asks the backoff
# state for the next delay, starts a timer carrying the retry message, logs the failure, and
# stores the new backoff state and timer reference with lock_acquired reset to false.
# NOTE(review): this excerpt appears to come from a rendered diff that lost its +/- markers, so
# pre-change and post-change lines seem to sit side by side (two function heads, two
# `start_timer` calls, two `Logger.error` calls) — confirm against the repository.
def handle_result(%Postgrex.Error{} = error, %State{backoff: {backoff, _}} = state) do
def handle_result(%Postgrex.Error{} = error, %State{step: step, backoff: {backoff, _}} = state) do
error_str =
# NOTE(review): only :query_pg_version and :acquire_lock are matched; any other step value
# would raise CaseClauseError — presumably no query can fail in other steps; verify.
case step do
:query_pg_version -> "Failed to get Postgres server version"
:acquire_lock -> "Failed to acquire lock #{state.lock_name}"
end

{time, backoff} = :backoff.fail(backoff)
# The timer message is the step to retry; the handler for the timeout message is not visible
# in this excerpt.
tref = :erlang.start_timer(time, self(), :acquire_lock)
tref = :erlang.start_timer(time, self(), step)

Logger.error(
"Failed to acquire lock #{state.lock_name} with reason #{inspect(error)} - retrying in #{inspect(time)}ms."
)
Logger.error(error_str <> " with reason #{inspect(error)} - retrying in #{inspect(time)}ms.")

{:noreply, %{state | lock_acquired: false, backoff: {backoff, tref}}}
end

# Sends the server version to the connection manager as an asynchronous cast.
defp notify_pg_version(version, %State{connection_manager: manager}) do
  GenServer.cast(manager, {:pg_version, version})
end

# Tells the connection manager, via an asynchronous cast, that the lock has been acquired.
defp notify_lock_acquired(%State{} = state) do
  GenServer.cast(state.connection_manager, :lock_connection_acquired)
end

# SQL that reads the server's `server_version_num` setting under a column of the same name.
defp pg_version_query,
  do: "SELECT current_setting('server_version_num') AS server_version_num"

# Builds the SQL statement that takes the advisory lock keyed by the hash of the lock name.
#
# The name is embedded directly in a SQL string literal, so any single quote in it is doubled
# to keep the literal well-formed. Names without quotes produce exactly the same statement as
# before. `to_string/1` keeps non-binary names (anything string interpolation accepted)
# working.
defp lock_query(%State{lock_name: name} = _state) do
  escaped_name = name |> to_string() |> String.replace("'", "''")
  "SELECT pg_advisory_lock(hashtext('#{escaped_name}'))"
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ defmodule Electric.Postgres.ConfigurationTest do
end

# Lists the tables in `publication`, first obtaining the server's numeric version because
# `list_tables_in_pub/3` takes it as its third argument.
# NOTE(review): this excerpt appears to come from a rendered diff that lost its +/- markers;
# the first `pg_version` binding looks like the pre-change line and the `Postgrex.query!`
# match after it like its replacement — confirm against the repository.
defp list_tables_in_publication(conn, publication) do
pg_version = Electric.ConnectionManager.query_pg_version(conn)
%{rows: [[pg_version]]} =
Postgrex.query!(conn, "SELECT current_setting('server_version_num')::integer", [])

list_tables_in_pub(conn, publication, pg_version)
end

Expand Down
4 changes: 3 additions & 1 deletion packages/sync-service/test/support/db_setup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ defmodule Support.DbSetup do
end

# Queries the server's numeric version once over `ctx.db_conn` and returns
# `{:ok, %{get_pg_version: fun}}`, where `fun` is a zero-arity function returning that value.
# NOTE(review): this excerpt appears to come from a rendered diff that lost its +/- markers;
# the first `pg_version` binding looks like the pre-change line and the `Postgrex.query!`
# match after it like its replacement — confirm against the repository.
def with_pg_version(ctx) do
pg_version = Electric.ConnectionManager.query_pg_version(ctx.db_conn)
%{rows: [[pg_version]]} =
Postgrex.query!(ctx.db_conn, "SELECT current_setting('server_version_num')::integer", [])

{:ok, %{get_pg_version: fn -> pg_version end}}
end

Expand Down
Loading