Skip to content

Commit

Permalink
Move the querying for PG timeline to ReplicationClient's connection s…
Browse files Browse the repository at this point in the history
…etup phase

We already have a state machine defined for the replication connection
and there's no need to check out the whole connection from the DB pool
(twice) just to look up some static PG info.
  • Loading branch information
alco committed Oct 16, 2024
1 parent 00d9134 commit c14c6fe
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 47 deletions.
59 changes: 29 additions & 30 deletions packages/sync-service/lib/electric/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,34 @@ defmodule Electric.ConnectionManager do

defmodule State do
defstruct [
# Database connection opts to be passed to Postgrex modules.
# Database connection opts to be passed to Postgrex modules
:connection_opts,
# Replication options specific to `Electric.Postgres.ReplicationClient`.
# Replication options specific to `Electric.Postgres.ReplicationClient`
:replication_opts,
# Database connection pool options.
# Database connection pool options
:pool_opts,
# Options specific to `Electric.Timeline`.
# Options specific to `Electric.Timeline`
:timeline_opts,
# PID of the replication client.
# PID of the replication client
:replication_client_pid,
# PID of the Postgres connection lock.
# PID of the Postgres connection lock
:lock_connection_pid,
# PID of the database connection pool (a `Postgrex` process).
# PID of the database connection pool
:pool_pid,
# PID of the shape log collector
:shape_log_collector_pid,
# Backoff term used for reconnection with exponential back-off.
# Backoff term used for reconnection with exponential back-off
:backoff,
# Flag indicating whether the lock on the replication has been acquired.
# Flag indicating whether the lock on the replication has been acquired
:pg_lock_acquired,
# PostgreSQL server version
:pg_version,
:electric_instance_id
# Electric instance ID is used for connection process labeling
:electric_instance_id,
# PostgreSQL system identifier
:pg_system_identifier,
# PostgreSQL timeline ID
:pg_timeline_id
]
end

Expand Down Expand Up @@ -98,8 +103,8 @@ defmodule Electric.ConnectionManager do
GenServer.cast(server, :exclusive_connection_lock_acquired)
end

def pg_version_looked_up(server, pg_version) do
GenServer.cast(server, {:pg_version_looked_up, pg_version})
def pg_info_looked_up(server, pg_info) do
GenServer.cast(server, {:pg_info_looked_up, pg_info})
end

@impl true
Expand Down Expand Up @@ -219,7 +224,10 @@ defmodule Electric.ConnectionManager do

# Now that we have a ShapeCache process running under Shapes.Supervisor, we can run the
# timeline check.
Electric.Timeline.check(get_pg_timeline(pool_pid), state.timeline_opts)
Electric.Timeline.check(
{state.pg_system_identifier, state.pg_timeline_id},
state.timeline_opts
)

# Everything is ready to start accepting and processing logical messages from Postgres.
Electric.Postgres.ReplicationClient.start_streaming(state.replication_client_pid)
Expand Down Expand Up @@ -306,8 +314,14 @@ defmodule Electric.ConnectionManager do
{:noreply, %{state | pg_lock_acquired: true}, {:continue, :start_replication_client}}
end

def handle_cast({:pg_version_looked_up, pg_version}, state) do
{:noreply, %{state | pg_version: pg_version}}
def handle_cast({:pg_info_looked_up, {server_version, system_identifier, timeline_id}}, state) do
{:noreply,
%{
state
| pg_version: server_version,
pg_system_identifier: system_identifier,
pg_timeline_id: timeline_id
}}
end

defp start_replication_client(electric_instance_id, connection_opts, replication_opts) do
Expand Down Expand Up @@ -498,21 +512,6 @@ defmodule Electric.ConnectionManager do
Keyword.put(connection_opts, :socket_options, tcp_opts)
end

defp get_pg_timeline(conn) do
%Postgrex.Result{rows: [[system_identifier, timeline_id]]} =
Postgrex.query!(
conn,
"""
SELECT
(pg_control_system()).system_identifier,
(pg_control_checkpoint()).timeline_id
""",
[]
)

{system_identifier, timeline_id}
end

defp lookup_log_collector_pid(shapes_supervisor) do
{Electric.Replication.ShapeLogCollector, log_collector_pid, :worker, _modules} =
shapes_supervisor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Electric.Postgres.ReplicationClient do
@type step ::
:disconnected
| :connected
| :query_pg_version
| :query_pg_info
| :create_publication
| :create_slot
| :set_display_setting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,29 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do

###

defp pg_version_query(state) do
query = "SELECT current_setting('server_version_num') AS server_version_num"
defp pg_info_query(state) do
query = """
SELECT
current_setting('server_version_num') server_version_num,
(pg_control_system()).system_identifier,
(pg_control_checkpoint()).timeline_id
"""

{:query, query, state}
end

defp pg_version_result([%Postgrex.Result{rows: [[version_str]]}], state) do
Logger.info("Postgres server version reported as #{version_str}")
defp pg_info_result([%Postgrex.Result{} = result], state) do
%{rows: [[version_str, system_identifier, timeline_id]]} = result

Logger.info(
"Postgres server version = #{version_str}, " <>
"system identifier = #{system_identifier}, " <>
"timeline_id = #{timeline_id}"
)

Electric.ConnectionManager.pg_version_looked_up(
Electric.ConnectionManager.pg_info_looked_up(
state.connection_manager,
String.to_integer(version_str)
{String.to_integer(version_str), system_identifier, timeline_id}
)

state
Expand Down Expand Up @@ -178,20 +190,29 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
# This is how we order the queries to be executed prior to switching into the logical streaming mode.
@spec next_step(state) :: step

defp next_step(%{step: :connected}), do: :query_pg_version
defp next_step(%{step: :connected}),
do: :query_pg_info

defp next_step(%{step: :query_pg_version, try_creating_publication?: true}),
defp next_step(%{step: :query_pg_info, try_creating_publication?: true}),
do: :create_publication

defp next_step(%{step: :query_pg_version}), do: :create_slot
defp next_step(%{step: :create_publication}), do: :create_slot
defp next_step(%{step: :create_slot}), do: :set_display_setting
defp next_step(%{step: :query_pg_info}),
do: :create_slot

defp next_step(%{step: :create_publication}),
do: :create_slot

defp next_step(%{step: :create_slot}),
do: :set_display_setting

defp next_step(%{step: :set_display_setting, display_settings: queries}) when queries != [],
do: :set_display_setting

defp next_step(%{step: :set_display_setting, start_streaming?: true}), do: :streaming
defp next_step(%{step: :set_display_setting}), do: :ready_to_stream
defp next_step(%{step: :set_display_setting, start_streaming?: true}),
do: :streaming

defp next_step(%{step: :set_display_setting}),
do: :ready_to_stream

###

Expand All @@ -200,7 +221,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
# this module.
@spec query_for_step(step, state) :: callback_return

defp query_for_step(:query_pg_version, state), do: pg_version_query(state)
defp query_for_step(:query_pg_info, state), do: pg_info_query(state)
defp query_for_step(:create_publication, state), do: create_publication_query(state)
defp query_for_step(:create_slot, state), do: create_slot_query(state)
defp query_for_step(:set_display_setting, state), do: set_display_setting_query(state)
Expand All @@ -213,8 +234,8 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
# that query's step. This is again done to facilitate grouping functions for the same step.
@spec dispatch_query_result(step, query_result, state) :: state | no_return

defp dispatch_query_result(:query_pg_version, result, state),
do: pg_version_result(result, state)
defp dispatch_query_result(:query_pg_info, result, state),
do: pg_info_result(result, state)

defp dispatch_query_result(:create_publication, result, state),
do: create_publication_result(result, state)
Expand Down

0 comments on commit c14c6fe

Please sign in to comment.