From 5f6d20236fc62371cc96e610221582474c9ffa66 Mon Sep 17 00:00:00 2001 From: Stefanos Mousafeiris Date: Tue, 1 Oct 2024 18:52:35 +0300 Subject: [PATCH] fix: Handle replication slot conflicts (#1762) Addresses https://github.com/electric-sql/electric/issues/1749 - Makes publication and slot names configurable via a `REPLICATION_STREAM_ID` env variable, which can ultimately be used for multiple electric deploys - Quotes all publication and slot names to address potential issues with configurable names (alternative is to force downcase them when initialised to avoid nasty case-sensitive bugs) - Waits for a message from `Electric.LockConnection` that the lock is acquired before initialising `ConnectionManager` with the replication stream and shapes. - If more than one Electric tries to connect to the same replication slot (with the same `REPLICATION_STREAM_ID`), it will make a blocking query to acquire the lock that will resolve once the previous Electric using that slot releases it - this addresses rolling deploys, and ensures resources are initialised only once the previous Electric has released them - Could potentially switch to `pg_try_advisory_lock` that is not a blocking query but immediately returns whether the lock could be acquired and implement retries with backoff, but since using `pg_advisory_lock` simplifies the implementation I decided to start with that and see what people think. Things that I still need to address: - Currently the publication gets altered when a shape is created (adds a table and potentially a row filter) but no cleanup occurs - so the publication can potentially grow to include everything between restarts and deploys even if it is not being used. 
- The way I want to address this is to change the `Electric.Postgres.Configuration` to alter the publication based on _all_ active shapes rather than based on each individual one, in that case every call will update the publication as necessary and resuming/cleaning can be a matter of calling this every time a shape is deleted and once upon starting (with recovered shapes or no shapes). Can be a separate PR. - Created https://github.com/electric-sql/electric/issues/1774 to address this separately --------- Co-authored-by: Oleksii Sholik --- .changeset/poor-candles-fly.md | 7 ++ integration-tests/tests/crash-recovery.lux | 65 +++++++++++ .../tests/invalidated-replication-slot.lux | 4 +- integration-tests/tests/macros.luxinc | 13 ++- .../tests/postgres-disconnection.lux | 2 +- integration-tests/tests/rolling-deploy.lux | 62 ++++++++++ packages/sync-service/config/runtime.exs | 2 + .../sync-service/lib/electric/application.ex | 16 ++- .../lib/electric/connection_manager.ex | 77 ++++++++++++- .../lib/electric/plug/health_check_plug.ex | 38 +++++++ .../sync-service/lib/electric/plug/router.ex | 2 + .../lib/electric/postgres/configuration.ex | 30 ++++- .../lib/electric/postgres/lock_connection.ex | 106 ++++++++++++++++++ .../replication_client/connection_setup.ex | 9 +- .../lib/electric/service_status.ex | 22 ++++ packages/sync-service/lib/electric/utils.ex | 13 +++ .../electric/plug/health_check_plug_test.exs | 76 +++++++++++++ .../test/electric/plug/router_test.exs | 23 ++++ .../postgres/lock_connection_test.exs | 83 ++++++++++++++ .../test/support/component_setup.ex | 1 + 20 files changed, 633 insertions(+), 18 deletions(-) create mode 100644 .changeset/poor-candles-fly.md create mode 100644 integration-tests/tests/crash-recovery.lux create mode 100644 integration-tests/tests/rolling-deploy.lux create mode 100644 packages/sync-service/lib/electric/plug/health_check_plug.ex create mode 100644 packages/sync-service/lib/electric/postgres/lock_connection.ex create mode 
100644 packages/sync-service/lib/electric/service_status.ex create mode 100644 packages/sync-service/test/electric/plug/health_check_plug_test.exs create mode 100644 packages/sync-service/test/electric/postgres/lock_connection_test.exs diff --git a/.changeset/poor-candles-fly.md b/.changeset/poor-candles-fly.md new file mode 100644 index 0000000000..3c0c4c6466 --- /dev/null +++ b/.changeset/poor-candles-fly.md @@ -0,0 +1,7 @@ +--- +"@core/sync-service": patch +--- + +- Wait for advisory lock on replication slot to enable rolling deploys. +- Configurable replication slot and publication name using `REPLICATION_STREAM_ID` environment variable. +- Add `HealthCheckPlug` API endpoint at `v1/health` that returns `waiting`, `starting`, and `active` statuses. \ No newline at end of file diff --git a/integration-tests/tests/crash-recovery.lux b/integration-tests/tests/crash-recovery.lux new file mode 100644 index 0000000000..36a223fdfc --- /dev/null +++ b/integration-tests/tests/crash-recovery.lux @@ -0,0 +1,65 @@ +[doc Verify handling of an Electric crash recovery] + +[include macros.luxinc] + +[global pg_container_name=crash-recovery__pg] + +### + +## Start a new Postgres cluster +[invoke setup_pg "" ""] + +## Add some data +[invoke start_psql] +[shell psql] + """! + CREATE TABLE items ( + id UUID PRIMARY KEY, + val TEXT + ); + """ + ??CREATE TABLE + + """! + INSERT INTO + items (id, val) + SELECT + gen_random_uuid(), + '#' || generate_series || ' test val' + FROM + generate_series(1, 10); + """ + ??INSERT 0 10 + +## Start the sync service. 
+[invoke setup_electric] + +[shell electric] + ??[info] Starting replication from postgres + +# Initialize a shape and collect the offset +[shell client] + # strip ANSI codes from response for easier matching + !curl -v -X GET http://localhost:3000/v1/shape/items?offset=-1 + ?electric-shape-id: ([\d-]+) + [local shape_id=$1] + ?electric-chunk-last-offset: ([\w\d_]+) + [local last_offset=$1] + +## Terminate electric +[shell electric] + !System.halt() + ??$PS1 + +## Start the sync service again. +[invoke setup_electric] +[shell electric] + ??[info] Starting replication from postgres + +# Client should be able to continue same shape +[shell client] + !curl -v -X GET "http://localhost:3000/v1/shape/items?offset=$last_offset&shape_id=$shape_id" + ??HTTP/1.1 200 OK + +[cleanup] + [invoke teardown] diff --git a/integration-tests/tests/invalidated-replication-slot.lux b/integration-tests/tests/invalidated-replication-slot.lux index 55d234307e..6091aa4ce4 100644 --- a/integration-tests/tests/invalidated-replication-slot.lux +++ b/integration-tests/tests/invalidated-replication-slot.lux @@ -7,7 +7,7 @@ [my invalidated_slot_error= """ [error] GenServer Electric.ConnectionManager terminating - ** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot" + ** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot_default" This slot has been invalidated because it exceeded the maximum reserved size. """] @@ -34,7 +34,7 @@ ## Confirm slot invalidation in Postgres. [shell pg] - ?invalidating slot "electric_slot" because its restart_lsn \d+/\d+ exceeds max_slot_wal_keep_size + ?invalidating slot "electric_slot_default" because its restart_lsn [\d\w]+/[\d\w]+ exceeds max_slot_wal_keep_size ## Observe the fatal connection error. 
[shell electric] diff --git a/integration-tests/tests/macros.luxinc b/integration-tests/tests/macros.luxinc index fadb0920e1..0fe6f5649c 100644 --- a/integration-tests/tests/macros.luxinc +++ b/integration-tests/tests/macros.luxinc @@ -41,6 +41,11 @@ ??database system is ready to accept connections [endmacro] +[macro start_psql] + [shell psql] + !docker exec -u postgres -it $pg_container_name psql +[endmacro] + [macro seed_pg] [shell psql] !docker exec -u postgres -it $pg_container_name psql @@ -68,10 +73,14 @@ [endmacro] [macro setup_electric] - [shell electric] + [invoke setup_electric_shell "electric" "3000"] +[endmacro] + +[macro setup_electric_shell shell_name port] + [shell $shell_name] -$fail_pattern - !DATABASE_URL=$database_url ../electric_dev.sh + !DATABASE_URL=$database_url PORT=$port ../electric_dev.sh [endmacro] [macro teardown] diff --git a/integration-tests/tests/postgres-disconnection.lux b/integration-tests/tests/postgres-disconnection.lux index 83a9fc5642..289ba4843b 100644 --- a/integration-tests/tests/postgres-disconnection.lux +++ b/integration-tests/tests/postgres-disconnection.lux @@ -22,7 +22,7 @@ ## Observe the connection error. [shell electric] - ??[warning] Database connection in replication mode failed + ??[warning] Database connection in lock_connection mode failed ??[warning] Reconnecting in ## Start the Postgres container back up. diff --git a/integration-tests/tests/rolling-deploy.lux b/integration-tests/tests/rolling-deploy.lux new file mode 100644 index 0000000000..f2d85e0b28 --- /dev/null +++ b/integration-tests/tests/rolling-deploy.lux @@ -0,0 +1,62 @@ +[doc Verify handling of an Electric rolling deploy] + +[include macros.luxinc] + +[global pg_container_name=rolling-deploy__pg] + +### + +## Start a new Postgres cluster +[invoke setup_pg "" ""] + +## Start the first sync service. 
+[invoke setup_electric_shell "electric_1" "3000"] + +[shell electric_1] + ??[info] Acquiring lock from postgres with name electric_slot_default + ??[info] Lock acquired from postgres with name electric_slot_default + ??[info] Starting replication from postgres + +# First service should be healthy and active +[shell orchestator] + !curl -X GET http://localhost:3000/v1/health + ??{"status":"active"} + +## Start the second sync service. +[invoke setup_electric_shell "electric_2" "3001"] + +## Assert that the lock is not acquired and replication does not start +## in the second electric +[shell electric_2] + -Lock acquired from postgres|Starting replication from postgres|$fail_pattern + ??[info] Acquiring lock from postgres with name electric_slot_default + [sleep 2] + + +# Second service should be in waiting state, ready to take over +[shell orchestator] + !curl -X GET http://localhost:3000/v1/health + ??{"status":"active"} + !curl -X GET http://localhost:3001/v1/health + ??{"status":"waiting"} + +## Terminate first electric +[shell electric_1] + !System.halt() + + # Confirm Electric process exit. 
+ ??$PS1 + +## Lock should now be acquired and replication starting +[shell electric_2] + -$fail_pattern + ??[info] Lock acquired from postgres with name electric_slot_default + ??[info] Starting replication from postgres + +# Second service is now healthy and active +[shell orchestator] + !curl -X GET http://localhost:3001/v1/health + ??{"status":"active"} + +[cleanup] + [invoke teardown] diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 8301728dbf..1de129981d 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -161,6 +161,8 @@ config :electric, instance_id: instance_id, telemetry_statsd_host: statsd_host, db_pool_size: env!("DB_POOL_SIZE", :integer, 50), + replication_stream_id: env!("REPLICATION_STREAM_ID", :string, "default"), + service_port: env!("PORT", :integer, 3000), prometheus_port: prometheus_port, storage: storage, persistent_kv: persistent_kv diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index f21c7e1f4e..ee3d03958a 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -22,8 +22,9 @@ defmodule Electric.Application do persistent_kv = apply(kv_module, kv_fun, [kv_params]) - publication_name = "electric_publication" - slot_name = "electric_slot" + replication_stream_id = Application.fetch_env!(:electric, :replication_stream_id) + publication_name = "electric_publication_#{replication_stream_id}" + slot_name = "electric_slot_#{replication_stream_id}" with {:ok, storage_opts} <- storage_module.shared_opts(storage_opts) do storage = {storage_module, storage_opts} @@ -32,6 +33,14 @@ defmodule Electric.Application do Electric.ConnectionManager.get_pg_version(Electric.ConnectionManager) end + get_service_status = fn -> + Electric.ServiceStatus.check( + get_connection_status: fn -> + 
Electric.ConnectionManager.get_status(Electric.ConnectionManager) + end + ) + end + prepare_tables_fn = {Electric.Postgres.Configuration, :configure_tables_for_replication!, [get_pg_version, publication_name]} @@ -103,12 +112,13 @@ defmodule Electric.Application do storage: storage, registry: Registry.ShapeChanges, shape_cache: {Electric.ShapeCache, []}, + get_service_status: get_service_status, inspector: inspector, long_poll_timeout: 20_000, max_age: Application.fetch_env!(:electric, :cache_max_age), stale_age: Application.fetch_env!(:electric, :cache_stale_age), allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)}, - port: 3000, + port: Application.fetch_env!(:electric, :service_port), thousand_island_options: http_listener_options()} ] |> add_prometheus_router(Application.fetch_env!(:electric, :prometheus_port)) diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index ba5f3ad861..c0e625224f 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -43,10 +43,14 @@ defmodule Electric.ConnectionManager do :shape_cache, # PID of the replication client. :replication_client_pid, + # PID of the Postgres connection lock. + :lock_connection_pid, # PID of the database connection pool (a `Postgrex` process). :pool_pid, # Backoff term used for reconnection with exponential back-off. :backoff, + # Flag indicating whether the lock on the replication has been acquired. 
+ :pg_lock_acquired, # PostgreSQL server version :pg_version, :electric_instance_id @@ -57,6 +61,8 @@ defmodule Electric.ConnectionManager do require Logger + @type status :: :waiting | :starting | :active + @type option :: {:connection_opts, Keyword.t()} | {:replication_opts, Keyword.t()} @@ -69,6 +75,8 @@ defmodule Electric.ConnectionManager do @name __MODULE__ + @lock_status_logging_interval 10_000 + @doc """ Returns the version of the PostgreSQL server. """ @@ -77,6 +85,14 @@ defmodule Electric.ConnectionManager do GenServer.call(server, :get_pg_version) end + @doc """ + Returns the status of the connection manager. + """ + @spec get_status(GenServer.server()) :: status() + def get_status(server) do + GenServer.call(server, :get_status) + end + @spec start_link(options) :: GenServer.on_start() def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: @name) @@ -112,14 +128,15 @@ defmodule Electric.ConnectionManager do timeline_opts: timeline_opts, log_collector: Keyword.fetch!(opts, :log_collector), shape_cache: Keyword.fetch!(opts, :shape_cache), + pg_lock_acquired: false, backoff: {:backoff.init(1000, 10_000), nil}, electric_instance_id: Keyword.fetch!(opts, :electric_instance_id) } - # We try to start the replication connection first because it requires additional - # priveleges compared to regular "pooled" connections, so failure to open a replication - # connection should be reported ASAP. - {:ok, state, {:continue, :start_replication_client}} + # Try to acquire the connection lock on the replication slot + # before starting shape and replication processes, to ensure + # a single active sync service is connected to Postgres per slot. 
+ {:ok, state, {:continue, :start_lock_connection}} end @impl true @@ -127,6 +144,38 @@ defmodule Electric.ConnectionManager do {:reply, pg_version, state} end + def handle_call(:get_status, _from, %{pg_lock_acquired: pg_lock_acquired} = state) do + status = + cond do + not pg_lock_acquired -> + :waiting + + is_nil(state.replication_client_pid) || is_nil(state.pool_pid) || + not Process.alive?(state.pool_pid) -> + :starting + + true -> + :active + end + + {:reply, status, state} + end + + def handle_continue(:start_lock_connection, state) do + case Electric.Postgres.LockConnection.start_link( + connection_opts: state.connection_opts, + connection_manager: self(), + lock_name: Keyword.fetch!(state.replication_opts, :slot_name) + ) do + {:ok, lock_connection_pid} -> + Process.send_after(self(), :log_lock_connection_status, @lock_status_logging_interval) + {:noreply, %{state | lock_connection_pid: lock_connection_pid}} + + {:error, reason} -> + handle_connection_error(reason, state, "lock_connection") + end + end + @impl true def handle_continue(:start_replication_client, state) do case start_replication_client(state) do @@ -188,6 +237,7 @@ defmodule Electric.ConnectionManager do tag = cond do + pid == state.lock_connection_pid -> :lock_connection pid == state.replication_client_pid -> :replication_connection pid == state.pool_pid -> :database_pool end @@ -203,6 +253,17 @@ defmodule Electric.ConnectionManager do {:noreply, %{state | replication_client_pid: nil}} end + # Periodically log the status of the lock connection until it is acquired for + # easier debugging and diagnostics. + def handle_info(:log_lock_connection_status, state) do + if not state.pg_lock_acquired do + Logger.warning(fn -> "Waiting for postgres lock to be acquired..." 
end) + Process.send_after(self(), :log_lock_connection_status, @lock_status_logging_interval) + end + + {:noreply, state} + end + @impl true def handle_cast({:connection_opts, pid, connection_opts}, state) do Process.monitor(pid) @@ -220,6 +281,13 @@ defmodule Electric.ConnectionManager do end end + def handle_cast(:lock_connection_acquired, %{pg_lock_acquired: false} = state) do + # As soon as we acquire the connection lock, we try to start the replication connection + # first because it requires additional privileges compared to regular "pooled" connections, + # so failure to open a replication connection should be reported ASAP. + {:noreply, %{state | pg_lock_acquired: true}, {:continue, :start_replication_client}} + end + defp start_replication_client(state) do Electric.Shapes.Supervisor.start_link( electric_instance_id: state.electric_instance_id, @@ -273,6 +341,7 @@ defmodule Electric.ConnectionManager do step = cond do + is_nil(state.lock_connection_pid) -> :start_lock_connection is_nil(state.replication_client_pid) -> :start_replication_client is_nil(state.pool_pid) -> :start_connection_pool end diff --git a/packages/sync-service/lib/electric/plug/health_check_plug.ex b/packages/sync-service/lib/electric/plug/health_check_plug.ex new file mode 100644 index 0000000000..0834b69693 --- /dev/null +++ b/packages/sync-service/lib/electric/plug/health_check_plug.ex @@ -0,0 +1,38 @@ +defmodule Electric.Plug.HealthCheckPlug do + alias Plug.Conn + require Logger + use Plug.Builder + + plug :check_service_status + plug :put_relevant_headers + plug :send_response + + # Match service status to a status code and status message, + # keeping the message name decoupled from the internal representation + # of the status to ensure the API is stable + defp check_service_status(conn, _) do + get_service_status = Access.fetch!(conn.assigns.config, :get_service_status) + + {status_code, status_text} = + case get_service_status.() do + :waiting -> {200, "waiting"} + :starting -> 
{200, "starting"} + :active -> {200, "active"} + :stopping -> {503, "stopping"} + end + + conn |> assign(:status_text, status_text) |> assign(:status_code, status_code) + end + + defp put_relevant_headers(conn, _), + do: + conn + |> put_resp_header("content-type", "application/json") + |> put_resp_header("cache-control", "no-cache, no-store, must-revalidate") + + defp send_response( + %Conn{assigns: %{status_text: status_text, status_code: status_code}} = conn, + _ + ), + do: send_resp(conn, status_code, Jason.encode!(%{status: status_text})) +end diff --git a/packages/sync-service/lib/electric/plug/router.ex b/packages/sync-service/lib/electric/plug/router.ex index 3c53159301..49ea2b10d2 100644 --- a/packages/sync-service/lib/electric/plug/router.ex +++ b/packages/sync-service/lib/electric/plug/router.ex @@ -18,6 +18,8 @@ defmodule Electric.Plug.Router do delete "/v1/shape/:root_table", to: Electric.Plug.DeleteShapePlug match "/v1/shape/:root_table", via: :options, to: Electric.Plug.OptionsShapePlug + get "/v1/health", to: Electric.Plug.HealthCheckPlug + match _ do send_resp(conn, 404, "Not found") end diff --git a/packages/sync-service/lib/electric/postgres/configuration.ex b/packages/sync-service/lib/electric/postgres/configuration.ex index be506c0a8c..56ce721f38 100644 --- a/packages/sync-service/lib/electric/postgres/configuration.ex +++ b/packages/sync-service/lib/electric/postgres/configuration.ex @@ -44,13 +44,15 @@ defmodule Electric.Postgres.Configuration do Postgrex.transaction(pool, fn conn -> set_replica_identity!(conn, relations) - for {relation, _} <- relations, table = Utils.relation_to_sql(relation) do + for {relation, _} <- relations, + table = Utils.relation_to_sql(relation), + publication = Utils.quote_name(publication_name) do Postgrex.query!(conn, "SAVEPOINT before_publication", []) # PG 14 and below do not support filters on tables of publications case Postgrex.query( conn, - "ALTER PUBLICATION #{publication_name} ADD TABLE #{table}", + 
"ALTER PUBLICATION #{publication} ADD TABLE #{table}", [] ) do {:ok, _} -> @@ -121,6 +123,28 @@ defmodule Electric.Postgres.Configuration do |> Map.new() end + @doc """ + Drops all tables from the given publication. + """ + @spec drop_all_publication_tables(Postgrex.conn(), String.t()) :: Postgrex.Result.t() + def drop_all_publication_tables(conn, publication_name) do + Postgrex.query!( + conn, + " + DO $$ + DECLARE + r RECORD; + BEGIN + FOR r IN (SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = '#{publication_name}') + LOOP + EXECUTE 'ALTER PUBLICATION #{Utils.quote_name(publication_name)} DROP TABLE ' || r.schemaname || '.' || r.tablename || ';'; + END LOOP; + END $$; + ", + [] + ) + end + # Joins the existing filter for the table with the where clause for the table. # If one of them is `nil` (i.e. no filter) then the resulting filter is `nil`. @spec extend_where_clause(maybe_filter(), filter()) :: filter() @@ -139,7 +163,7 @@ defmodule Electric.Postgres.Configuration do # Makes an SQL query that alters the given publication whith the given tables and filters. @spec make_alter_publication_query(String.t(), filters()) :: String.t() defp make_alter_publication_query(publication_name, filters) do - base_sql = "ALTER PUBLICATION #{publication_name} SET TABLE " + base_sql = "ALTER PUBLICATION #{Utils.quote_name(publication_name)} SET TABLE " tables = filters diff --git a/packages/sync-service/lib/electric/postgres/lock_connection.ex b/packages/sync-service/lib/electric/postgres/lock_connection.ex new file mode 100644 index 0000000000..70831eb142 --- /dev/null +++ b/packages/sync-service/lib/electric/postgres/lock_connection.ex @@ -0,0 +1,106 @@ +defmodule Electric.Postgres.LockConnection do + @moduledoc """ + A Postgres connection that ensures an advisory lock is held for its entire duration, + useful for ensuring only a single sync service instance can be using a single + replication slot at any given time. 
+ + The connection attempts to grab the lock and waits on it until it acquires it. + When it does, it fires off a :lock_connection_acquired message to the specified + `Electric.ConnectionManager` such that the required setup can proceed now that + the service is sure to be the only one operating on this replication stream. + """ + require Logger + @behaviour Postgrex.SimpleConnection + + @type option :: + {:connection_opts, Keyword.t()} + | {:connection_manager, GenServer.server()} + | {:lock_name, String.t()} + + @type options :: [option] + + defmodule State do + defstruct [ + :connection_manager, + :lock_acquired, + :lock_name, + :backoff + ] + end + + @spec start_link(options()) :: {:ok, pid()} | {:error, Postgrex.Error.t() | term()} + def start_link(opts) do + {connection_opts, init_opts} = Keyword.pop(opts, :connection_opts) + + case Postgrex.SimpleConnection.start_link( + __MODULE__, + init_opts, + connection_opts ++ [timeout: :infinity, auto_reconnect: false] + ) do + {:ok, pid} -> + send(pid, :acquire_lock) + {:ok, pid} + + {:error, error} -> + {:error, error} + end + end + + @impl true + def init(opts) do + {:ok, + %State{ + connection_manager: Keyword.fetch!(opts, :connection_manager), + lock_name: Keyword.fetch!(opts, :lock_name), + lock_acquired: false, + backoff: {:backoff.init(1000, 10_000), nil} + }} + end + + @impl true + def handle_info(:acquire_lock, state) do + if state.lock_acquired do + notify_lock_acquired(state) + {:noreply, state} + else + Logger.info("Acquiring lock from postgres with name #{state.lock_name}") + {:query, lock_query(state), state} + end + end + + def handle_info({:timeout, tref, msg}, %{backoff: {backoff, tref}} = state) do + handle_info(msg, %{state | backoff: {backoff, nil}}) + end + + @impl true + def handle_result([_] = _results, state) do + Logger.info("Lock acquired from postgres with name #{state.lock_name}") + notify_lock_acquired(state) + {:noreply, %{state | lock_acquired: true}} + end + + @impl true + def 
handle_result(%Postgrex.Error{} = error, %State{backoff: {backoff, _}} = state) do + {time, backoff} = :backoff.fail(backoff) + tref = :erlang.start_timer(time, self(), :acquire_lock) + + Logger.error( + "Failed to acquire lock #{state.lock_name} with reason #{inspect(error)} - retrying in #{inspect(time)}ms." + ) + + {:noreply, %{state | lock_acquired: false, backoff: {backoff, tref}}} + end + + defp notify_lock_acquired(%State{connection_manager: connection_manager} = _state) do + GenServer.cast(connection_manager, :lock_connection_acquired) + end + + defp lock_query(%State{lock_name: name} = _state) do + "SELECT pg_advisory_lock(hashtext('#{name}'))" + end + + @impl true + def notify(_channel, _payload, _state) do + :ok + end +end diff --git a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex index 3dfdcbc041..0423ce7b22 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex @@ -7,6 +7,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do connection into the logical streaming mode. This helps keep the main `ReplicationClient` module focused on the handling of logical messages. 
""" + alias Electric.Utils require Logger @@ -46,7 +47,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do defp create_publication_query(state) do # We're creating an "empty" publication because first snapshot creation should add the table - query = "CREATE PUBLICATION #{state.publication_name}" + query = "CREATE PUBLICATION #{Utils.quote_name(state.publication_name)}" {:query, query, state} end @@ -72,7 +73,9 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do ### defp create_slot_query(state) do - query = "CREATE_REPLICATION_SLOT #{state.slot_name} LOGICAL pgoutput NOEXPORT_SNAPSHOT" + query = + "CREATE_REPLICATION_SLOT #{Utils.quote_name(state.slot_name)} LOGICAL pgoutput NOEXPORT_SNAPSHOT" + {:query, query, state} end @@ -144,7 +147,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do # will be executed after this. defp start_replication_slot_query(state) do query = - "START_REPLICATION SLOT #{state.slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{state.publication_name}')" + "START_REPLICATION SLOT #{Utils.quote_name(state.slot_name)} LOGICAL 0/0 (proto_version '1', publication_names '#{state.publication_name}')" Logger.info("Starting replication from postgres") diff --git a/packages/sync-service/lib/electric/service_status.ex b/packages/sync-service/lib/electric/service_status.ex new file mode 100644 index 0000000000..8c141555c1 --- /dev/null +++ b/packages/sync-service/lib/electric/service_status.ex @@ -0,0 +1,22 @@ +defmodule Electric.ServiceStatus do + @type status() :: :waiting | :starting | :active | :stopping + + @type option :: + {:get_connection_status, (-> Electric.ConnectionManager.status())} + + @type options :: [option] + + @spec check(options()) :: status() + def check(opts) do + get_connection_status_fun = Keyword.fetch!(opts, :get_connection_status) + + # Match the connection status ot a service status - currently + # they are one and the same but keeping this decoupled for 
future + # additions to conditions that determine service status + case get_connection_status_fun.() do + :waiting -> :waiting + :starting -> :starting + :active -> :active + end + end +end diff --git a/packages/sync-service/lib/electric/utils.ex b/packages/sync-service/lib/electric/utils.ex index 7f3a5c02bb..8b7092bed5 100644 --- a/packages/sync-service/lib/electric/utils.ex +++ b/packages/sync-service/lib/electric/utils.ex @@ -199,6 +199,19 @@ defmodule Electric.Utils do def escape_quotes(text), do: :binary.replace(text, ~S|"|, ~S|""|, [:global]) + @doc """ + Quote a string for use in SQL queries. + + ## Examples + iex> quote_name("foo") + ~S|"foo"| + + iex> quote_name(~S|fo"o|) + ~S|"fo""o"| + """ + @spec quote_name(String.t()) :: String.t() + def quote_name(str), do: ~s|"#{escape_quotes(str)}"| + @doc """ Parses quoted names. diff --git a/packages/sync-service/test/electric/plug/health_check_plug_test.exs b/packages/sync-service/test/electric/plug/health_check_plug_test.exs new file mode 100644 index 0000000000..d30c257c87 --- /dev/null +++ b/packages/sync-service/test/electric/plug/health_check_plug_test.exs @@ -0,0 +1,76 @@ +defmodule Electric.Plug.HealthCheckPlugTest do + use ExUnit.Case, async: true + import Plug.Conn + alias Plug.Conn + + alias Electric.Plug.HealthCheckPlug + + @moduletag :capture_log + + @registry Registry.HealthCheckPlugTest + + setup do + start_link_supervised!({Registry, keys: :duplicate, name: @registry}) + :ok + end + + def conn(%{connection_status: connection_status} = _config) do + # Pass mock dependencies to the plug + config = %{ + get_service_status: fn -> connection_status end + } + + Plug.Test.conn("GET", "/") + |> assign(:config, config) + end + + describe "HealthCheckPlug" do + test "has appropriate content and cache headers" do + conn = + conn(%{connection_status: :waiting}) + |> HealthCheckPlug.call([]) + + assert Conn.get_resp_header(conn, "content-type") == ["application/json"] + + assert Conn.get_resp_header(conn, 
"cache-control") == [ + "no-cache, no-store, must-revalidate" + ] + end + + test "returns 200 when in waiting mode" do + conn = + conn(%{connection_status: :waiting}) + |> HealthCheckPlug.call([]) + + assert conn.status == 200 + assert Jason.decode!(conn.resp_body) == %{"status" => "waiting"} + end + + test "returns 200 when in starting mode" do + conn = + conn(%{connection_status: :starting}) + |> HealthCheckPlug.call([]) + + assert conn.status == 200 + assert Jason.decode!(conn.resp_body) == %{"status" => "starting"} + end + + test "returns 200 when in active mode" do + conn = + conn(%{connection_status: :active}) + |> HealthCheckPlug.call([]) + + assert conn.status == 200 + assert Jason.decode!(conn.resp_body) == %{"status" => "active"} + end + + test "returns 503 when stopping" do + conn = + conn(%{connection_status: :stopping}) + |> HealthCheckPlug.call([]) + + assert conn.status == 503 + assert Jason.decode!(conn.resp_body) == %{"status" => "stopping"} + end + end +end diff --git a/packages/sync-service/test/electric/plug/router_test.exs b/packages/sync-service/test/electric/plug/router_test.exs index 8c9348b63c..41049ae63c 100644 --- a/packages/sync-service/test/electric/plug/router_test.exs +++ b/packages/sync-service/test/electric/plug/router_test.exs @@ -33,6 +33,29 @@ defmodule Electric.Plug.RouterTest do end end + describe "/v1/health" do + setup [:with_unique_db] + + setup do + %{publication_name: "electric_test_publication", slot_name: "electric_test_slot"} + end + + setup :with_complete_stack + + setup(ctx, + do: %{opts: Router.init(build_router_opts(ctx, get_service_status: fn -> :active end))} + ) + + test "GET returns health status of service", %{opts: opts} do + conn = + conn("GET", "/v1/health") + |> Router.call(opts) + + assert %{status: 200} = conn + assert Jason.decode!(conn.resp_body) == %{"status" => "active"} + end + end + describe "/v1/shapes" do setup [:with_unique_db, :with_basic_tables, :with_sql_execute] diff --git 
a/packages/sync-service/test/electric/postgres/lock_connection_test.exs b/packages/sync-service/test/electric/postgres/lock_connection_test.exs new file mode 100644 index 0000000000..81677bb14a --- /dev/null +++ b/packages/sync-service/test/electric/postgres/lock_connection_test.exs @@ -0,0 +1,83 @@ +defmodule Electric.Postgres.LockConnectionTest do + use ExUnit.Case, async: true + import ExUnit.CaptureLog + import Support.DbSetup, except: [with_publication: 1] + + alias Electric.Postgres.LockConnection + + @moduletag :capture_log + @lock_name "test_electric_slot" + + describe "LockConnection init" do + setup [:with_unique_db] + + test "should acquire an advisory lock on startup", %{db_config: config, db_conn: conn} do + log = + capture_log(fn -> + assert {:ok, _pid} = + LockConnection.start_link( + connection_opts: config, + connection_manager: self(), + lock_name: @lock_name + ) + + assert_lock_acquired() + end) + + # should have logged lock acquisition process + assert log =~ "Acquiring lock from postgres with name #{@lock_name}" + assert log =~ "Lock acquired from postgres with name #{@lock_name}" + + # should have acquired an advisory lock on PG + assert %Postgrex.Result{rows: [[false]]} = + Postgrex.query!( + conn, + "SELECT pg_try_advisory_lock(hashtext('#{@lock_name}'))", + [] + ) + end + + test "should wait if lock is already acquired", %{db_config: config} do + # grab lock with one connection + {pid1, _} = + with_log(fn -> + assert {:ok, pid} = + LockConnection.start_link( + connection_opts: config, + connection_manager: self(), + lock_name: @lock_name + ) + + assert_lock_acquired() + pid + end) + + # try to grab the same with another + _ = + capture_log(fn -> + assert {:ok, pid} = + LockConnection.start_link( + connection_opts: config, + connection_manager: self(), + lock_name: @lock_name + ) + + # should fail to grab it + refute_lock_acquired() + + # should immediately grab it once previous lock is released + GenServer.stop(pid1) + 
assert_lock_acquired() + pid + end) + end + end + + defp assert_lock_acquired do + assert_receive {_, :lock_connection_acquired} + end + + defp refute_lock_acquired do + refute_receive {_, :lock_connection_acquired}, 1000 + end +end diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 3f482d14f3..e1f9b6cb62 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -162,6 +162,7 @@ defmodule Support.ComponentSetup do long_poll_timeout: Access.get(overrides, :long_poll_timeout, 5_000), max_age: Access.get(overrides, :max_age, 60), stale_age: Access.get(overrides, :stale_age, 300), + get_service_status: Access.get(overrides, :get_service_status, fn -> :active end), chunk_bytes_threshold: Access.get(overrides, :chunk_bytes_threshold, ctx.chunk_bytes_threshold), allow_shape_deletion: Access.get(overrides, :allow_shape_deletion, true)