diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index c95b0f6d42..65a93f2011 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -28,29 +28,34 @@ defmodule Electric.ConnectionManager do defmodule State do defstruct [ - # Database connection opts to be passed to Postgrex modules. + # Database connection opts to be passed to Postgrex modules :connection_opts, - # Replication options specific to `Electric.Postgres.ReplicationClient`. + # Replication options specific to `Electric.Postgres.ReplicationClient` :replication_opts, - # Database connection pool options. + # Database connection pool options :pool_opts, - # Options specific to `Electric.Timeline`. + # Options specific to `Electric.Timeline` :timeline_opts, - # PID of the replication client. + # PID of the replication client :replication_client_pid, - # PID of the Postgres connection lock. + # PID of the Postgres connection lock :lock_connection_pid, - # PID of the database connection pool (a `Postgrex` process). + # PID of the database connection pool :pool_pid, # PID of the shape log collector :shape_log_collector_pid, - # Backoff term used for reconnection with exponential back-off. + # Backoff term used for reconnection with exponential back-off :backoff, - # Flag indicating whether the lock on the replication has been acquired. + # Flag indicating whether the lock on the replication has been acquired :pg_lock_acquired, # PostgreSQL server version :pg_version, - :electric_instance_id + # Electric instance ID is used for connection process labeling + :electric_instance_id, + # PostgreSQL system identifier + :pg_system_identifier, + # PostgreSQL timeline ID + :pg_timeline_id ] end @@ -98,8 +103,8 @@ defmodule Electric.ConnectionManager do GenServer.cast(server, :exclusive_connection_lock_acquired) end - def pg_version_looked_up(server, pg_version) do - GenServer.cast(server, {:pg_version_looked_up, pg_version}) + def pg_info_looked_up(server, pg_info) do + GenServer.cast(server, {:pg_info_looked_up, pg_info}) end @impl true @@ -219,7 +224,10 @@ defmodule Electric.ConnectionManager do # Now that we have a ShapeCache process running under Shapes.Supervisor, we can run the # timeline check. - Electric.Timeline.check(get_pg_timeline(pool_pid), state.timeline_opts) + Electric.Timeline.check( + {state.pg_system_identifier, state.pg_timeline_id}, + state.timeline_opts + ) # Everything is ready to start accepting and processing logical messages from Postgres. Electric.Postgres.ReplicationClient.start_streaming(state.replication_client_pid) @@ -306,8 +314,14 @@ defmodule Electric.ConnectionManager do {:noreply, %{state | pg_lock_acquired: true}, {:continue, :start_replication_client}} end - def handle_cast({:pg_version_looked_up, pg_version}, state) do - {:noreply, %{state | pg_version: pg_version}} + def handle_cast({:pg_info_looked_up, {server_version, system_identifier, timeline_id}}, state) do + {:noreply, + %{ + state + | pg_version: server_version, + pg_system_identifier: system_identifier, + pg_timeline_id: timeline_id + }} end defp start_replication_client(electric_instance_id, connection_opts, replication_opts) do @@ -498,21 +512,6 @@ defmodule Electric.ConnectionManager do Keyword.put(connection_opts, :socket_options, tcp_opts) end - defp get_pg_timeline(conn) do - %Postgrex.Result{rows: [[system_identifier, timeline_id]]} = - Postgrex.query!( - conn, - """ - SELECT - (pg_control_system()).system_identifier, - (pg_control_checkpoint()).timeline_id - """, - [] - ) - - {system_identifier, timeline_id} - end - defp lookup_log_collector_pid(shapes_supervisor) do {Electric.Replication.ShapeLogCollector, log_collector_pid, :worker, _modules} = shapes_supervisor diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index 322daa86b3..f2dc0df41f 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -16,7 +16,7 @@ defmodule Electric.Postgres.ReplicationClient do @type step :: :disconnected | :connected - | :query_pg_version + | :query_pg_info | :create_publication | :create_slot | :set_display_setting diff --git a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex index f3a5a3b78b..a3611df296 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex @@ -45,17 +45,29 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do ### - defp pg_version_query(state) do - query = "SELECT current_setting('server_version_num') AS server_version_num" + defp pg_info_query(state) do + query = """ + SELECT + current_setting('server_version_num') server_version_num, + (pg_control_system()).system_identifier, + (pg_control_checkpoint()).timeline_id + """ + {:query, query, state} end - defp pg_version_result([%Postgrex.Result{rows: [[version_str]]}], state) do - Logger.info("Postgres server version reported as #{version_str}") + defp pg_info_result([%Postgrex.Result{} = result], state) do + %{rows: [[version_str, system_identifier, timeline_id]]} = result + + Logger.info( + "Postgres server version = #{version_str}, " <> + "system identifier = #{system_identifier}, " <> + "timeline_id = #{timeline_id}" + ) - Electric.ConnectionManager.pg_version_looked_up( + Electric.ConnectionManager.pg_info_looked_up( state.connection_manager, - String.to_integer(version_str) + {String.to_integer(version_str), system_identifier, timeline_id} ) state @@ -178,20 +190,29 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do # This is how we order the queries to be executed prior to switching into the logical streaming mode. @spec next_step(state) :: step - defp next_step(%{step: :connected}), do: :query_pg_version + defp next_step(%{step: :connected}), + do: :query_pg_info - defp next_step(%{step: :query_pg_version, try_creating_publication?: true}), + defp next_step(%{step: :query_pg_info, try_creating_publication?: true}), do: :create_publication - defp next_step(%{step: :query_pg_version}), do: :create_slot - defp next_step(%{step: :create_publication}), do: :create_slot - defp next_step(%{step: :create_slot}), do: :set_display_setting + defp next_step(%{step: :query_pg_info}), + do: :create_slot + + defp next_step(%{step: :create_publication}), + do: :create_slot + + defp next_step(%{step: :create_slot}), + do: :set_display_setting defp next_step(%{step: :set_display_setting, display_settings: queries}) when queries != [], do: :set_display_setting - defp next_step(%{step: :set_display_setting, start_streaming?: true}), do: :streaming - defp next_step(%{step: :set_display_setting}), do: :ready_to_stream + defp next_step(%{step: :set_display_setting, start_streaming?: true}), + do: :streaming + + defp next_step(%{step: :set_display_setting}), + do: :ready_to_stream ### @@ -200,7 +221,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do # this module. @spec query_for_step(step, state) :: callback_return - defp query_for_step(:query_pg_version, state), do: pg_version_query(state) + defp query_for_step(:query_pg_info, state), do: pg_info_query(state) defp query_for_step(:create_publication, state), do: create_publication_query(state) defp query_for_step(:create_slot, state), do: create_slot_query(state) defp query_for_step(:set_display_setting, state), do: set_display_setting_query(state) @@ -213,8 +234,8 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do # that query's step. This is again done to facilitate grouping functions for the same step. @spec dispatch_query_result(step, query_result, state) :: state | no_return - defp dispatch_query_result(:query_pg_version, result, state), - do: pg_version_result(result, state) + defp dispatch_query_result(:query_pg_info, result, state), + do: pg_info_result(result, state) defp dispatch_query_result(:create_publication, result, state), do: create_publication_result(result, state)