Skip to content

Commit

Permalink
fix: Upgrade Postgrex (#221)
Browse files Browse the repository at this point in the history
The newer version includes fixed return value from gen_statem's init()
callback. This solves the log noise issue that was occurring whenever a
DB connection failed.
  • Loading branch information
alco authored Aug 5, 2024
1 parent 156bef9 commit 5f3a7e3
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 40 deletions.
3 changes: 2 additions & 1 deletion packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ if Config.config_env() == :test do
port: 54321,
username: "postgres",
password: "password",
database: "postgres"
database: "postgres",
sslmode: :disable
]
else
{:ok, database_url_config} =
Expand Down
58 changes: 23 additions & 35 deletions packages/sync-service/lib/electric/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,11 @@ defmodule Electric.ConnectionManager do
end

defp start_replication_client(connection_opts, replication_opts) do
case do_start_replication_client(connection_opts, replication_opts) do
# Disable the reconnection logic in Postgex.ReplicationConnection to force it to exit with
# the connection error.
connection_opts = [auto_reconnect: false] ++ connection_opts

case Electric.Postgres.ReplicationClient.start_link(connection_opts, replication_opts) do
{:ok, pid} ->
{:ok, pid, connection_opts}

Expand All @@ -173,38 +177,6 @@ defmodule Electric.ConnectionManager do
end
end

defp do_start_replication_client(connection_opts, replication_opts) do
# Disable the reconnection logic in Postgex.ReplicationConnection to force it to exit with
# the connection error.
connection_opts = [auto_reconnect: false] ++ connection_opts

case Electric.Postgres.ReplicationClient.start_link(connection_opts, replication_opts) do
{:ok, pid} ->
{:ok, pid}

# There is a bug in Postgrex: it returns a tuple `{:stop, <reason>, <state>}` from an
# `init()` callback where `gen_statem` expects just `{:stop, <reason>}`. This is the origin
# of the `:bad_return_from_init` error that wraps the root-cause error in the following example:
#
# 16:28:07.982 [error] :gen_statem #PID<0.282.0> terminating
# ** (stop) {:bad_return_from_init, {:stop, %Postgrex.Error{message: "ssl not available", postgres: nil, connection_id: nil, query: nil}, %Postgrex.ReplicationConnection{}}}
# (stdlib 6.0) gen_statem.erl:2748: :gen_statem.init_result/8
# (stdlib 6.0) proc_lib.erl:329: :proc_lib.init_p_do_apply/3
# Queue: []
# Postponed: []
# State: {:undefined, :undefined}
# Callback mode: :state_functions, state_enter: false
#
# You can reproduce the above failure by adding `?sslmode=prefer` or `?sslmode=require`
# to the `DATABASE_URL` configuration.
{:error, {:bad_return_from_init, {:stop, reason, _state}}} ->
{:error, reason}

other ->
other
end
end

defp start_connection_pool(connection_opts, pool_opts) do
# Disable the reconnection logic in DBConnection to force it to exit with the connection
# error.
Expand All @@ -220,8 +192,8 @@ defmodule Electric.ConnectionManager do
%Postgrex.Error{message: message} when not is_nil(message) ->
message

%Postgrex.Error{postgres: %{message: message, pg_code: code, routine: routine}} ->
message <> " (PG code: #{code}, PG routine: #{routine})"
%Postgrex.Error{postgres: %{message: message} = pg_error} ->
message <> pg_error_extra_info(pg_error)
end

Logger.warning("Database connection failed: #{message}")
Expand All @@ -236,6 +208,22 @@ defmodule Electric.ConnectionManager do
{:noreply, state}
end

defp pg_error_extra_info(pg_error) do
extra_info_items =
[
{"PG code:", Map.get(pg_error, :pg_code)},
{"PG routine:", Map.get(pg_error, :routine)}
]
|> Enum.reject(fn {_, val} -> is_nil(val) end)
|> Enum.map(fn {label, val} -> "#{label} #{val}" end)

if extra_info_items != [] do
" (" <> Enum.join(extra_info_items, ", ") <> ")"
else
""
end
end

defp schedule_reconnection(step, %State{backoff: {backoff, _}} = state) do
{time, backoff} = :backoff.fail(backoff)
tref = :erlang.start_timer(time, self(), step)
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ defmodule Electric.MixProject do
{:nimble_options, "~> 1.1"},
{:pg_query_ex, github: "electric-sql/pg_query_ex"},
{:plug, "~> 1.16"},
{:postgrex, "~> 0.18"},
{:postgrex, "~> 0.19"},
{:telemetry_metrics_statsd, "~> 0.7"},
{:telemetry_poller, "~> 1.1"},
{:tz, "~> 0.27"}
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"pg_query_ex": {:git, "https://github.com/electric-sql/pg_query_ex.git", "fee6dc748deb80e3fc3c6f77d5fa77faef91336d", []},
"plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"},
"plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"},
"postgrex": {:hex, :postgrex, "0.18.0", "f34664101eaca11ff24481ed4c378492fed2ff416cd9b06c399e90f321867d7e", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "a042989ba1bc1cca7383ebb9e461398e3f89f868c92ce6671feb7ef132a252d1"},
"postgrex": {:hex, :postgrex, "0.19.0", "f7d50e50cb42e0a185f5b9a6095125a9ab7e4abccfbe2ab820ab9aa92b71dbab", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "dba2d2a0a8637defbf2307e8629cb2526388ba7348f67d04ec77a5d6a72ecfae"},
"protox": {:hex, :protox, "1.7.3", "dff5488a648850c95cbd1cca5430be7ccedc99e4102aa934dbf60abfa30e64c1", [:mix], [{:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "b936c0654b68b306c4be853db23bb5623e2be89e11238908f2ff6da10fc0275f"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"telemetry_metrics": {:hex, :telemetry_metrics, "0.6.2", "2caabe9344ec17eafe5403304771c3539f3b6e2f7fb6a6f602558c825d0d0bfb", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9b43db0dc33863930b9ef9d27137e78974756f5f198cae18409970ed6fa5b561"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,10 @@ defmodule Electric.Postgres.ReplicationClientTest do
do: lsn_str |> Lsn.from_string() |> Lsn.to_integer()

defp fetch_slot_info(conn) do
{:ok, result} = Postgrex.query(conn, "SELECT * FROM pg_replication_slots", [])
assert %Postgrex.Result{columns: cols, rows: [row], num_rows: 1} = result
%Postgrex.Result{columns: cols, rows: rows} =
Postgrex.query!(conn, "SELECT * FROM pg_replication_slots", [])

[row] = Enum.filter(rows, fn [slot_name | _] -> slot_name == @slot_name end)

Enum.zip(cols, row) |> Map.new()
end
Expand Down

0 comments on commit 5f3a7e3

Please sign in to comment.