diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 2efa915a51..03c7f24ca6 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -19,7 +19,8 @@ if Config.config_env() == :test do port: 54321, username: "postgres", password: "password", - database: "postgres" + database: "postgres", + sslmode: :disable ] else {:ok, database_url_config} = diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index 3f1aa2ef14..d7a8d1253b 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -148,7 +148,11 @@ defmodule Electric.ConnectionManager do end defp start_replication_client(connection_opts, replication_opts) do - case do_start_replication_client(connection_opts, replication_opts) do + # Disable the reconnection logic in Postgex.ReplicationConnection to force it to exit with + # the connection error. + connection_opts = [auto_reconnect: false] ++ connection_opts + + case Electric.Postgres.ReplicationClient.start_link(connection_opts, replication_opts) do {:ok, pid} -> {:ok, pid, connection_opts} @@ -173,38 +177,6 @@ defmodule Electric.ConnectionManager do end end - defp do_start_replication_client(connection_opts, replication_opts) do - # Disable the reconnection logic in Postgex.ReplicationConnection to force it to exit with - # the connection error. - connection_opts = [auto_reconnect: false] ++ connection_opts - - case Electric.Postgres.ReplicationClient.start_link(connection_opts, replication_opts) do - {:ok, pid} -> - {:ok, pid} - - # There is a bug in Postgrex: it returns a tuple `{:stop, , }` from an - # `init()` callback where `gen_statem` expects just `{:stop, }`. This is the origin - # of the `:bad_return_from_init` error that wraps the root-cause error in the following example: - # - # 16:28:07.982 [error] :gen_statem #PID<0.282.0> terminating - # ** (stop) {:bad_return_from_init, {:stop, %Postgrex.Error{message: "ssl not available", postgres: nil, connection_id: nil, query: nil}, %Postgrex.ReplicationConnection{}}} - # (stdlib 6.0) gen_statem.erl:2748: :gen_statem.init_result/8 - # (stdlib 6.0) proc_lib.erl:329: :proc_lib.init_p_do_apply/3 - # Queue: [] - # Postponed: [] - # State: {:undefined, :undefined} - # Callback mode: :state_functions, state_enter: false - # - # You can reproduce the above failure by adding `?sslmode=prefer` or `?sslmode=require` - # to the `DATABASE_URL` configuration. - {:error, {:bad_return_from_init, {:stop, reason, _state}}} -> - {:error, reason} - - other -> - other - end - end - defp start_connection_pool(connection_opts, pool_opts) do # Disable the reconnection logic in DBConnection to force it to exit with the connection # error. @@ -220,8 +192,8 @@ defmodule Electric.ConnectionManager do %Postgrex.Error{message: message} when not is_nil(message) -> message - %Postgrex.Error{postgres: %{message: message, pg_code: code, routine: routine}} -> - message <> " (PG code: #{code}, PG routine: #{routine})" + %Postgrex.Error{postgres: %{message: message} = pg_error} -> + message <> pg_error_extra_info(pg_error) end Logger.warning("Database connection failed: #{message}") @@ -236,6 +208,22 @@ defmodule Electric.ConnectionManager do {:noreply, state} end + defp pg_error_extra_info(pg_error) do + extra_info_items = + [ + {"PG code:", Map.get(pg_error, :pg_code)}, + {"PG routine:", Map.get(pg_error, :routine)} + ] + |> Enum.reject(fn {_, val} -> is_nil(val) end) + |> Enum.map(fn {label, val} -> "#{label} #{val}" end) + + if extra_info_items != [] do + " (" <> Enum.join(extra_info_items, ", ") <> ")" + else + "" + end + end + defp schedule_reconnection(step, %State{backoff: {backoff, _}} = state) do {time, backoff} = :backoff.fail(backoff) tref = :erlang.start_timer(time, self(), step) diff --git a/packages/sync-service/mix.exs b/packages/sync-service/mix.exs index 60e1cb760b..0aab95100e 100644 --- a/packages/sync-service/mix.exs +++ b/packages/sync-service/mix.exs @@ -63,7 +63,7 @@ defmodule Electric.MixProject do {:nimble_options, "~> 1.1"}, {:pg_query_ex, github: "electric-sql/pg_query_ex"}, {:plug, "~> 1.16"}, - {:postgrex, "~> 0.18"}, + {:postgrex, "~> 0.19"}, {:telemetry_metrics_statsd, "~> 0.7"}, {:telemetry_poller, "~> 1.1"}, {:tz, "~> 0.27"} diff --git a/packages/sync-service/mix.lock b/packages/sync-service/mix.lock index 00224356ec..b9eb363d37 100644 --- a/packages/sync-service/mix.lock +++ b/packages/sync-service/mix.lock @@ -18,7 +18,7 @@ "pg_query_ex": {:git, "https://github.com/electric-sql/pg_query_ex.git", "fee6dc748deb80e3fc3c6f77d5fa77faef91336d", []}, "plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"}, "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, - "postgrex": {:hex, :postgrex, "0.18.0", "f34664101eaca11ff24481ed4c378492fed2ff416cd9b06c399e90f321867d7e", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "a042989ba1bc1cca7383ebb9e461398e3f89f868c92ce6671feb7ef132a252d1"}, + "postgrex": {:hex, :postgrex, "0.19.0", "f7d50e50cb42e0a185f5b9a6095125a9ab7e4abccfbe2ab820ab9aa92b71dbab", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "dba2d2a0a8637defbf2307e8629cb2526388ba7348f67d04ec77a5d6a72ecfae"}, "protox": {:hex, :protox, "1.7.3", "dff5488a648850c95cbd1cca5430be7ccedc99e4102aa934dbf60abfa30e64c1", [:mix], [{:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "b936c0654b68b306c4be853db23bb5623e2be89e11238908f2ff6da10fc0275f"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.2", "2caabe9344ec17eafe5403304771c3539f3b6e2f7fb6a6f602558c825d0d0bfb", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9b43db0dc33863930b9ef9d27137e78974756f5f198cae18409970ed6fa5b561"}, diff --git a/packages/sync-service/test/electric/postgres/replication_client_test.exs b/packages/sync-service/test/electric/postgres/replication_client_test.exs index 16ba374deb..1bfeff2d5f 100644 --- a/packages/sync-service/test/electric/postgres/replication_client_test.exs +++ b/packages/sync-service/test/electric/postgres/replication_client_test.exs @@ -392,8 +392,10 @@ defmodule Electric.Postgres.ReplicationClientTest do do: lsn_str |> Lsn.from_string() |> Lsn.to_integer() defp fetch_slot_info(conn) do - {:ok, result} = Postgrex.query(conn, "SELECT * FROM pg_replication_slots", []) - assert %Postgrex.Result{columns: cols, rows: [row], num_rows: 1} = result + %Postgrex.Result{columns: cols, rows: rows} = + Postgrex.query!(conn, "SELECT * FROM pg_replication_slots", []) + + [row] = Enum.filter(rows, fn [slot_name | _] -> slot_name == @slot_name end) Enum.zip(cols, row) |> Map.new() end