diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index e9438ee033..343121e957 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -242,12 +242,16 @@ defmodule Electric.Connection.Manager do @impl true def handle_continue(:start_lock_connection, %State{lock_connection_pid: nil} = state) do - case Electric.Postgres.LockConnection.start_link( - connection_opts: state.connection_opts, - connection_manager: self(), - lock_name: Keyword.fetch!(state.replication_opts, :slot_name) - ) do - {:ok, lock_connection_pid} -> + opts = [ + connection_opts: state.connection_opts, + connection_manager: self(), + lock_name: Keyword.fetch!(state.replication_opts, :slot_name) + ] + + case start_lock_connection(opts) do + {:ok, pid, connection_opts} -> + state = %{state | lock_connection_pid: pid, connection_opts: connection_opts} + Electric.StackSupervisor.dispatch_stack_event( state.stack_events_registry, state.stack_id, @@ -255,7 +259,7 @@ defmodule Electric.Connection.Manager do ) Process.send_after(self(), :log_lock_connection_status, @lock_status_logging_interval) - {:noreply, %{state | lock_connection_pid: lock_connection_pid}} + {:noreply, state} {:error, reason} -> handle_connection_error(reason, state, "lock_connection") @@ -435,32 +439,27 @@ defmodule Electric.Connection.Manager do }} end - defp start_replication_client(opts) do - case Electric.Postgres.ReplicationClient.start_link(opts) do + defp start_lock_connection(opts) do + case Electric.Postgres.LockConnection.start_link(opts) do {:ok, pid} -> - {:ok, pid, Keyword.fetch!(opts, :connection_opts)} - - {:error, %Postgrex.Error{message: "ssl not available"}} = error -> - sslmode = get_in(opts, [:connection_opts, :sslmode]) + {:ok, pid, opts[:connection_opts]} - if sslmode == :require do - error - else - if not is_nil(sslmode) do - # Only log a warning when there's an explicit sslmode parameter in the database - # config, meaning the user has requested a certain sslmode. - Logger.warning( - "Failed to connect to the database using SSL. Trying again, using an unencrypted connection." - ) - end - - opts - |> Keyword.update!(:connection_opts, &Keyword.put(&1, :ssl, false)) - |> start_replication_client() + error -> + with {:ok, opts} <- maybe_fallback_to_no_ssl(error, opts) do + start_lock_connection(opts) end + end + end + + defp start_replication_client(opts) do + case Electric.Postgres.ReplicationClient.start_link(opts) do + {:ok, pid} -> + {:ok, pid, opts[:connection_opts]} error -> - error + with {:ok, opts} <- maybe_fallback_to_no_ssl(error, opts) do + start_replication_client(opts) + end end end @@ -478,6 +477,30 @@ defmodule Electric.Connection.Manager do ) end + defp maybe_fallback_to_no_ssl( + {:error, %Postgrex.Error{message: "ssl not available"}} = error, + opts + ) do + sslmode = get_in(opts, [:connection_opts, :sslmode]) + + if sslmode == :require do + error + else + if not is_nil(sslmode) do + # Only log a warning when there's an explicit sslmode parameter in the database + # config, meaning the user has requested a certain sslmode. + Logger.warning( + "Failed to connect to the database using SSL. Trying again, using an unencrypted connection." + ) + end + + opts = Keyword.update!(opts, :connection_opts, &Keyword.put(&1, :ssl, false)) + {:ok, opts} + end + end + + defp maybe_fallback_to_no_ssl(error, _opts), do: error + defp handle_connection_error( {:shutdown, {:failed_to_start_child, Electric.Postgres.ReplicationClient, error}}, state,