From 5453e3735ae8e25d78316fdbb3848974fcee5d0c Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 26 Sep 2024 15:26:13 +0300 Subject: [PATCH 01/33] Use advisory lock to handle rolling deploys --- .../electric/postgres/replication_client.ex | 1 + .../replication_client/connection_setup.ex | 29 ++++++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index 2ad8ca6c1e..b869b9a443 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -18,6 +18,7 @@ defmodule Electric.Postgres.ReplicationClient do | :connected | :create_publication | :create_slot + | :waiting_for_lock | :set_display_setting | :ready_to_stream | :streaming diff --git a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex index 3dfdcbc041..7160285a56 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex @@ -114,6 +114,28 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do ### + # Start a long query that will block until the lock becomes available. + # NOTE: alternatively use pg_try_advisory_lock and retry with exponential backoff + defp waiting_for_lock_query(state) do + query = + "SELECT pg_advisory_lock(datoid::bigint) FROM pg_replication_slots WHERE slot_name = '#{state.slot_name}'" + + {:query, query, state} + end + + # Sucessfully acquired the lock for the replication slot. + defp waiting_for_lock_result([%Postgrex.Result{} = _result], state) do + Logger.debug("Acquired advisory lock on replication slot") + state + end + + defp waiting_for_lock_result(%Postgrex.Error{} = error, _state) do + # Unexpected error, fail loudly. + raise error + end + + ### + defp set_display_setting_query(%{display_settings: [query | rest]} = state) do {:query, query, %{state | display_settings: rest}} end @@ -162,7 +184,8 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do defp next_step(%{step: :connected, try_creating_publication?: true}), do: :create_publication defp next_step(%{step: :connected}), do: :create_slot defp next_step(%{step: :create_publication}), do: :create_slot - defp next_step(%{step: :create_slot}), do: :set_display_setting + defp next_step(%{step: :create_slot}), do: :waiting_for_lock + defp next_step(%{step: :waiting_for_lock}), do: :set_display_setting defp next_step(%{step: :set_display_setting, display_settings: queries}) when queries != [], do: :set_display_setting @@ -179,6 +202,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do defp query_for_step(:create_publication, state), do: create_publication_query(state) defp query_for_step(:create_slot, state), do: create_slot_query(state) + defp query_for_step(:waiting_for_lock, state), do: waiting_for_lock_query(state) defp query_for_step(:set_display_setting, state), do: set_display_setting_query(state) defp query_for_step(:ready_to_stream, state), do: ready_to_stream(state) defp query_for_step(:streaming, state), do: start_replication_slot_query(state) @@ -195,6 +219,9 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do defp dispatch_query_result(:create_slot, result, state), do: create_slot_result(result, state) + defp dispatch_query_result(:waiting_for_lock, result, state), + do: waiting_for_lock_result(result, state) + defp dispatch_query_result(:set_display_setting, result, state), do: set_display_setting_result(result, state) end From a6dfec447047edff85406b1305958e91e0c71d3b Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 26 Sep 2024 15:29:07 +0300 Subject: [PATCH 02/33] Configurable replication stream ID --- packages/sync-service/config/runtime.exs | 1 + packages/sync-service/lib/electric/application.ex | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 8301728dbf..771ab6d1f8 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -161,6 +161,7 @@ config :electric, instance_id: instance_id, telemetry_statsd_host: statsd_host, db_pool_size: env!("DB_POOL_SIZE", :integer, 50), + replication_stream_id: env!("REPLICATION_STREAM_ID", :string, "default"), prometheus_port: prometheus_port, storage: storage, persistent_kv: persistent_kv diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index f21c7e1f4e..711e3a9817 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -22,8 +22,9 @@ defmodule Electric.Application do persistent_kv = apply(kv_module, kv_fun, [kv_params]) - publication_name = "electric_publication" - slot_name = "electric_slot" + replication_stream_id = Application.fetch_env!(:electric, :replication_stream_id) + publication_name = "electric_publication_#{replication_stream_id}" + slot_name = "electric_slot_#{replication_stream_id}" with {:ok, storage_opts} <- storage_module.shared_opts(storage_opts) do storage = {storage_module, storage_opts} From 7cf3f38deefa914db134f89c7f6b768a3076aa46 Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 26 Sep 2024 16:15:29 +0300 Subject: [PATCH 03/33] Quote all publication and replication slot names --- .../lib/electric/postgres/configuration.ex | 8 +++++--- .../replication_client/connection_setup.ex | 15 ++++++++++----- packages/sync-service/lib/electric/utils.ex | 16 ++++++++++++++++ 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/packages/sync-service/lib/electric/postgres/configuration.ex b/packages/sync-service/lib/electric/postgres/configuration.ex index be506c0a8c..970106882f 100644 --- a/packages/sync-service/lib/electric/postgres/configuration.ex +++ b/packages/sync-service/lib/electric/postgres/configuration.ex @@ -44,13 +44,15 @@ defmodule Electric.Postgres.Configuration do Postgrex.transaction(pool, fn conn -> set_replica_identity!(conn, relations) - for {relation, _} <- relations, table = Utils.relation_to_sql(relation) do + for {relation, _} <- relations, + table = Utils.relation_to_sql(relation), + publication = Utils.quote_name(publication_name) do Postgrex.query!(conn, "SAVEPOINT before_publication", []) # PG 14 and below do not support filters on tables of publications case Postgrex.query( conn, - "ALTER PUBLICATION #{publication_name} ADD TABLE #{table}", + "ALTER PUBLICATION #{publication} ADD TABLE #{table}", [] ) do {:ok, _} -> @@ -139,7 +141,7 @@ defmodule Electric.Postgres.Configuration do # Makes an SQL query that alters the given publication whith the given tables and filters. @spec make_alter_publication_query(String.t(), filters()) :: String.t() defp make_alter_publication_query(publication_name, filters) do - base_sql = "ALTER PUBLICATION #{publication_name} SET TABLE " + base_sql = "ALTER PUBLICATION #{Utils.quote_name(publication_name)} SET TABLE " tables = filters diff --git a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex index 7160285a56..264cbfc89c 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex @@ -7,6 +7,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do connection into the logical streaming mode. This helps keep the main `ReplicationClient` module focused on the handling of logical messages. """ + alias Electric.Utils require Logger @@ -46,7 +47,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do defp create_publication_query(state) do # We're creating an "empty" publication because first snapshot creation should add the table - query = "CREATE PUBLICATION #{state.publication_name}" + query = "CREATE PUBLICATION #{Utils.quote_name(state.publication_name)}" {:query, query, state} end @@ -72,7 +73,9 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do ### defp create_slot_query(state) do - query = "CREATE_REPLICATION_SLOT #{state.slot_name} LOGICAL pgoutput NOEXPORT_SNAPSHOT" + query = + "CREATE_REPLICATION_SLOT #{Utils.quote_name(state.slot_name)} LOGICAL pgoutput NOEXPORT_SNAPSHOT" + {:query, query, state} end @@ -114,11 +117,13 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do ### - # Start a long query that will block until the lock becomes available. + # Start a long query that will block until the lock becomes available, based + # on the OID of the replication slot - such that if the slot is dropped and recreated + # this lock will no longer sto # NOTE: alternatively use pg_try_advisory_lock and retry with exponential backoff defp waiting_for_lock_query(state) do query = - "SELECT pg_advisory_lock(datoid::bigint) FROM pg_replication_slots WHERE slot_name = '#{state.slot_name}'" + "SELECT pg_advisory_lock(datoid::bigint) FROM pg_replication_slots WHERE slot_name = '#{Utils.quote_name(state.slot_name)}'" {:query, query, state} end @@ -166,7 +171,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do # will be executed after this. defp start_replication_slot_query(state) do query = - "START_REPLICATION SLOT #{state.slot_name} LOGICAL 0/0 (proto_version '1', publication_names '#{state.publication_name}')" + "START_REPLICATION SLOT #{Utils.quote_name(state.slot_name)} LOGICAL 0/0 (proto_version '1', publication_names '#{state.publication_name}')" Logger.info("Starting replication from postgres") diff --git a/packages/sync-service/lib/electric/utils.ex b/packages/sync-service/lib/electric/utils.ex index 7f3a5c02bb..fc64ed8e0a 100644 --- a/packages/sync-service/lib/electric/utils.ex +++ b/packages/sync-service/lib/electric/utils.ex @@ -199,6 +199,22 @@ defmodule Electric.Utils do def escape_quotes(text), do: :binary.replace(text, ~S|"|, ~S|""|, [:global]) + @doc """ + Quote a string for use in SQL queries. + + ## Examples + iex> quote_name("foo") + ~S|"foo"| + + iex> quote_name(~S|"foo"|) + ~S|"foo"| + + iex> quote_name(~S|"fo""o"|) + ~S|"fo""o"| + """ + @spec quote_name(String.t()) :: String.t() + def quote_name(str), do: ~s|"#{escape_quotes(str)}"| + @doc """ Parses quoted names. From 3b0dba85f53dd170de02e254bd4a5c112e1e0188 Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 26 Sep 2024 16:15:44 +0300 Subject: [PATCH 04/33] Implement utility for resetting the publication --- .../lib/electric/postgres/configuration.ex | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/packages/sync-service/lib/electric/postgres/configuration.ex b/packages/sync-service/lib/electric/postgres/configuration.ex index 970106882f..56ce721f38 100644 --- a/packages/sync-service/lib/electric/postgres/configuration.ex +++ b/packages/sync-service/lib/electric/postgres/configuration.ex @@ -123,6 +123,28 @@ defmodule Electric.Postgres.Configuration do |> Map.new() end + @doc """ + Drops all tables from the given publication. + """ + @spec drop_all_publication_tables(Postgrex.conn(), String.t()) :: Postgrex.Result.t() + def drop_all_publication_tables(conn, publication_name) do + Postgrex.query!( + conn, + " + DO $$ + DECLARE + r RECORD; + BEGIN + FOR r IN (SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = '#{publication_name}') + LOOP + EXECUTE 'ALTER PUBLICATION #{Utils.quote_name(publication_name)} DROP TABLE ' || r.schemaname || '.' || r.tablename || ';'; + END LOOP; + END $$; + ", + [] + ) + end + # Joins the existing filter for the table with the where clause for the table. # If one of them is `nil` (i.e. no filter) then the resulting filter is `nil`. @spec extend_where_clause(maybe_filter(), filter()) :: filter() From a7ef1b58bebf75a0c82a7b5786d6c29c15b0982e Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 26 Sep 2024 16:52:07 +0300 Subject: [PATCH 05/33] Fix utils quoting test --- packages/sync-service/lib/electric/utils.ex | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/sync-service/lib/electric/utils.ex b/packages/sync-service/lib/electric/utils.ex index fc64ed8e0a..8b7092bed5 100644 --- a/packages/sync-service/lib/electric/utils.ex +++ b/packages/sync-service/lib/electric/utils.ex @@ -206,10 +206,7 @@ defmodule Electric.Utils do iex> quote_name("foo") ~S|"foo"| - iex> quote_name(~S|"foo"|) - ~S|"foo"| - - iex> quote_name(~S|"fo""o"|) + iex> quote_name(~S|fo"o|) ~S|"fo""o"| """ @spec quote_name(String.t()) :: String.t() From 09db776efe8af52e78b4a7f40e44e9abe35a461d Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 26 Sep 2024 17:05:11 +0300 Subject: [PATCH 06/33] Use hash of slot name for lock --- .../postgres/replication_client/connection_setup.ex | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex index 264cbfc89c..b59479137a 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex @@ -118,13 +118,10 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do ### # Start a long query that will block until the lock becomes available, based - # on the OID of the replication slot - such that if the slot is dropped and recreated - # this lock will no longer sto + # on a hash of the slot name to ensure lock is held even if slot is recreated. # NOTE: alternatively use pg_try_advisory_lock and retry with exponential backoff defp waiting_for_lock_query(state) do - query = - "SELECT pg_advisory_lock(datoid::bigint) FROM pg_replication_slots WHERE slot_name = '#{Utils.quote_name(state.slot_name)}'" - + query = "SELECT pg_advisory_lock(hashtext('#{state.slot_name}'))" {:query, query, state} end From 20285c4300cacc4e94a0501ec8c7057385c23283 Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 26 Sep 2024 17:19:45 +0300 Subject: [PATCH 07/33] Fix integration test --- integration-tests/tests/invalidated-replication-slot.lux | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-tests/tests/invalidated-replication-slot.lux b/integration-tests/tests/invalidated-replication-slot.lux index 55d234307e..6091aa4ce4 100644 --- a/integration-tests/tests/invalidated-replication-slot.lux +++ b/integration-tests/tests/invalidated-replication-slot.lux @@ -7,7 +7,7 @@ [my invalidated_slot_error= """ [error] GenServer Electric.ConnectionManager terminating - ** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot" + ** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot_default" This slot has been invalidated because it exceeded the maximum reserved size. """] @@ -34,7 +34,7 @@ ## Confirm slot invalidation in Postgres. [shell pg] - ?invalidating slot "electric_slot" because its restart_lsn \d+/\d+ exceeds max_slot_wal_keep_size + ?invalidating slot "electric_slot_default" because its restart_lsn [\d\w]+/[\d\w]+ exceeds max_slot_wal_keep_size ## Observe the fatal connection error. [shell electric] From 51291ae6e013cc6fa33e17b66019960e88d1fd0d Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 26 Sep 2024 17:22:05 +0300 Subject: [PATCH 08/33] Add changeset --- .changeset/poor-candles-fly.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/poor-candles-fly.md diff --git a/.changeset/poor-candles-fly.md b/.changeset/poor-candles-fly.md new file mode 100644 index 0000000000..7f9119ffcd --- /dev/null +++ b/.changeset/poor-candles-fly.md @@ -0,0 +1,6 @@ +--- +"@core/sync-service": patch +--- + +- Wait for advisory lock on replication slot to enable rolling deploys. +- Configurable replication slot and publication name using `REPLICATION_STREAM_ID` environment variable. From 0acac52b84772cdcc7267fb99616419e4ea31903 Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 26 Sep 2024 20:49:34 +0300 Subject: [PATCH 09/33] Basic replication status work --- .../sync-service/lib/electric/application.ex | 9 ++++ .../lib/electric/connection_manager.ex | 21 +++++++++ .../lib/electric/plug/health_check_plug.ex | 25 +++++++++++ .../sync-service/lib/electric/plug/router.ex | 2 + .../electric/postgres/replication_client.ex | 44 ++++++++++++++++++- .../lib/electric/service_status.ex | 17 +++++++ 6 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 packages/sync-service/lib/electric/plug/health_check_plug.ex create mode 100644 packages/sync-service/lib/electric/service_status.ex diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 711e3a9817..cae027daee 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -33,6 +33,14 @@ defmodule Electric.Application do Electric.ConnectionManager.get_pg_version(Electric.ConnectionManager) end + get_service_status = fn -> + Electric.ServiceStatus.check(%{ + get_replication_status: fn -> + Electric.ConnectionManager.get_replication_status(Electric.ConnectionManager) + end + }) + end + prepare_tables_fn = {Electric.Postgres.Configuration, :configure_tables_for_replication!, [get_pg_version, publication_name]} @@ -104,6 +112,7 @@ defmodule Electric.Application do storage: storage, registry: Registry.ShapeChanges, shape_cache: {Electric.ShapeCache, []}, + get_service_status: get_service_status, inspector: inspector, long_poll_timeout: 20_000, max_age: Application.fetch_env!(:electric, :cache_max_age), diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index ba5f3ad861..d2b86f7915 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -77,6 +77,14 @@ defmodule Electric.ConnectionManager do GenServer.call(server, :get_pg_version) end + @doc """ + Returns the version of the PostgreSQL server. + """ + @spec get_replication_status(GenServer.server()) :: Electric.Postgres.ReplicationClient.status() + def get_replication_status(server) do + GenServer.call(server, :get_replication_status) + end + @spec start_link(options) :: GenServer.on_start() def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: @name) @@ -127,6 +135,19 @@ defmodule Electric.ConnectionManager do {:reply, pg_version, state} end + def handle_call( + :get_replication_status, + _from, + %{replication_client_pid: replication_client_pid} = state + ) do + status = + if is_nil(replication_client_pid), + do: :starting, + else: Electric.Postgres.ReplicationClient.get_status(replication_client_pid) + + {:reply, status, state} + end + @impl true def handle_continue(:start_replication_client, state) do case start_replication_client(state) do diff --git a/packages/sync-service/lib/electric/plug/health_check_plug.ex b/packages/sync-service/lib/electric/plug/health_check_plug.ex new file mode 100644 index 0000000000..c452f1e2af --- /dev/null +++ b/packages/sync-service/lib/electric/plug/health_check_plug.ex @@ -0,0 +1,25 @@ +defmodule Electric.Plug.HealthCheckPlug do + alias Plug.Conn + require Logger + use Plug.Builder + + plug :check_service_status + plug :send_response + + defp check_service_status(conn, _) do + get_service_status = Access.fetch!(conn.assigns.config, :get_service_status) + + status_text = + case get_service_status.() do + :starting -> "starting" + :ready -> "ready" + :active -> "active" + :stopping -> "stopping" + end + + conn |> assign(:status_text, status_text) + end + + defp send_response(%Conn{assigns: %{status_text: status_text}} = conn, _), + do: send_resp(conn, 200, Jason.encode!(%{status: status_text})) +end diff --git a/packages/sync-service/lib/electric/plug/router.ex b/packages/sync-service/lib/electric/plug/router.ex index 3c53159301..49ea2b10d2 100644 --- a/packages/sync-service/lib/electric/plug/router.ex +++ b/packages/sync-service/lib/electric/plug/router.ex @@ -18,6 +18,8 @@ defmodule Electric.Plug.Router do delete "/v1/shape/:root_table", to: Electric.Plug.DeleteShapePlug match "/v1/shape/:root_table", via: :options, to: Electric.Plug.OptionsShapePlug + get "/v1/health", to: Electric.Plug.HealthCheckPlug + match _ do send_resp(conn, 404, "Not found") end diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index b869b9a443..bae7edef66 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -13,6 +13,8 @@ defmodule Electric.Postgres.ReplicationClient do require Logger + @type status :: :starting | :waiting | :active + @type step :: :disconnected | :connected @@ -35,6 +37,7 @@ defmodule Electric.Postgres.ReplicationClient do :display_settings, origin: "postgres", txn_collector: %Collector{}, + connection_manager: nil, step: :disconnected, # Cache the end_lsn of the last processed Commit message to report it back to Postgres # on demand via standby status update messages - @@ -56,6 +59,7 @@ defmodule Electric.Postgres.ReplicationClient do slot_name: String.t(), origin: String.t(), txn_collector: Collector.t(), + connection_manager: GenServer.server(), step: Electric.Postgres.ReplicationClient.step(), display_settings: [String.t()], applied_wal: non_neg_integer @@ -67,7 +71,8 @@ defmodule Electric.Postgres.ReplicationClient do publication_name: [required: true, type: :string], try_creating_publication?: [required: true, type: :boolean], start_streaming?: [type: :boolean, default: true], - slot_name: [required: true, type: :string] + slot_name: [required: true, type: :string], + connection_manager: [required: true, type: :pid] ) @spec new(Access.t()) :: t() @@ -102,6 +107,7 @@ defmodule Electric.Postgres.ReplicationClient do # the connection error. Without this, we may observe undesirable restarts in tests between # one test process exiting and the next one starting. connect_opts = [auto_reconnect: false] ++ connection_opts + replication_opts = [connection_manager: connection_manager] ++ replication_opts case Postgrex.ReplicationConnection.start_link(__MODULE__, replication_opts, connect_opts) do {:ok, pid} -> @@ -135,6 +141,22 @@ defmodule Electric.Postgres.ReplicationClient do send(client, :start_streaming) end + @doc """ + Returns the current state of the replication client. + """ + @spec get_status(pid()) :: status() + def get_status(client) do + send(client, {:get_status, self()}) + + receive do + {:replication_status, status} -> + status + after + # Timeout in case no response is received + 5000 -> :starting + end + end + # The `Postgrex.ReplicationConnection` behaviour does not adhere to gen server conventions and # establishes its own. Unless the `sync_connect: false` option is passed to `start_link()`, the # connection process will try opening a replication connection to Postgres before returning @@ -185,6 +207,12 @@ defmodule Electric.Postgres.ReplicationClient do {:noreply, state} end + @impl true + def handle_info({:get_status, pid}, %State{step: step} = state) do + send(pid, {:replication_status, get_replication_status(step)}) + {:noreply, state} + end + @impl true @spec handle_data(binary(), State.t()) :: {:noreply, State.t()} | {:noreply, list(binary()), State.t()} @@ -279,6 +307,20 @@ defmodule Electric.Postgres.ReplicationClient do end end + @spec get_replication_status(step()) :: status() + defp get_replication_status(step) do + case step do + _ when step in [:disconnected, :connected, :create_publication, :create_slot] -> + :starting + + :waiting_for_lock -> + :waiting + + _ when step in [:set_display_setting, :ready_to_stream, :streaming] -> + :active + end + end + defp decode_message(data) do OpenTelemetry.with_span( "replication_client.decode_message", diff --git a/packages/sync-service/lib/electric/service_status.ex b/packages/sync-service/lib/electric/service_status.ex new file mode 100644 index 0000000000..6602249460 --- /dev/null +++ b/packages/sync-service/lib/electric/service_status.ex @@ -0,0 +1,17 @@ +defmodule Electric.ServiceStatus do + @type status() :: :starting | :ready | :active | :stopping + + @type opts() :: + Keyword.t(get_replication_status: (-> Electric.Postgres.ReplicationClient.status())) + + @spec check(opts()) :: status() + def check(opts) do + with replication_status <- opts.get_replication_status.() do + case replication_status do + :starting -> :starting + :waiting -> :ready + :active -> :active + end + end + end +end From 4e34836120c73073455c92114da73c0fae05efaa Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 10:33:17 +0300 Subject: [PATCH 10/33] Remove connection manager as required opt --- .../lib/electric/postgres/replication_client.ex | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index bae7edef66..b84c326be9 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -37,7 +37,6 @@ defmodule Electric.Postgres.ReplicationClient do :display_settings, origin: "postgres", txn_collector: %Collector{}, - connection_manager: nil, step: :disconnected, # Cache the end_lsn of the last processed Commit message to report it back to Postgres # on demand via standby status update messages - @@ -59,7 +58,6 @@ defmodule Electric.Postgres.ReplicationClient do slot_name: String.t(), origin: String.t(), txn_collector: Collector.t(), - connection_manager: GenServer.server(), step: Electric.Postgres.ReplicationClient.step(), display_settings: [String.t()], applied_wal: non_neg_integer @@ -71,8 +69,7 @@ defmodule Electric.Postgres.ReplicationClient do publication_name: [required: true, type: :string], try_creating_publication?: [required: true, type: :boolean], start_streaming?: [type: :boolean, default: true], - slot_name: [required: true, type: :string], - connection_manager: [required: true, type: :pid] + slot_name: [required: true, type: :string] ) @spec new(Access.t()) :: t() @@ -107,7 +104,6 @@ defmodule Electric.Postgres.ReplicationClient do # the connection error. Without this, we may observe undesirable restarts in tests between # one test process exiting and the next one starting. connect_opts = [auto_reconnect: false] ++ connection_opts - replication_opts = [connection_manager: connection_manager] ++ replication_opts case Postgrex.ReplicationConnection.start_link(__MODULE__, replication_opts, connect_opts) do {:ok, pid} -> From fbaa5a61ea525922b2066defdba5641e4b1410cf Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 16:09:49 +0300 Subject: [PATCH 11/33] Use an independent connection lock --- .../sync-service/lib/electric/application.ex | 8 +- .../lib/electric/connection_manager.ex | 47 ++++++++--- .../lib/electric/lock_connection.ex | 82 +++++++++++++++++++ .../electric/postgres/replication_client.ex | 39 --------- .../replication_client/connection_setup.ex | 28 +------ .../lib/electric/service_status.ex | 14 ++-- 6 files changed, 129 insertions(+), 89 deletions(-) create mode 100644 packages/sync-service/lib/electric/lock_connection.ex diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index cae027daee..f9a767d964 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -34,11 +34,11 @@ defmodule Electric.Application do end get_service_status = fn -> - Electric.ServiceStatus.check(%{ - get_replication_status: fn -> - Electric.ConnectionManager.get_replication_status(Electric.ConnectionManager) + Electric.ServiceStatus.check( + get_connection_status: fn -> + Electric.ConnectionManager.get_status(Electric.ConnectionManager) end - }) + ) end prepare_tables_fn = diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index d2b86f7915..e40b12fe42 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -47,6 +47,8 @@ defmodule Electric.ConnectionManager do :pool_pid, # Backoff term used for reconnection with exponential back-off. :backoff, + # Flag indicating whether the lock on the replication has been acquired. + :pg_lock_acquired, # PostgreSQL server version :pg_version, :electric_instance_id @@ -57,6 +59,8 @@ defmodule Electric.ConnectionManager do require Logger + @type status :: :waiting | :starting | :active + @type option :: {:connection_opts, Keyword.t()} | {:replication_opts, Keyword.t()} @@ -78,11 +82,11 @@ defmodule Electric.ConnectionManager do end @doc """ - Returns the version of the PostgreSQL server. + Returns the status of the connection manager. """ - @spec get_replication_status(GenServer.server()) :: Electric.Postgres.ReplicationClient.status() - def get_replication_status(server) do - GenServer.call(server, :get_replication_status) + @spec get_status(GenServer.server()) :: status() + def get_status(server) do + GenServer.call(server, :get_status) end @spec start_link(options) :: GenServer.on_start() @@ -120,14 +124,21 @@ defmodule Electric.ConnectionManager do timeline_opts: timeline_opts, log_collector: Keyword.fetch!(opts, :log_collector), shape_cache: Keyword.fetch!(opts, :shape_cache), + pg_lock_acquired: false, backoff: {:backoff.init(1000, 10_000), nil}, electric_instance_id: Keyword.fetch!(opts, :electric_instance_id) } - # We try to start the replication connection first because it requires additional - # priveleges compared to regular "pooled" connections, so failure to open a replication - # connection should be reported ASAP. - {:ok, state, {:continue, :start_replication_client}} + # Try to acquire the connection lock on the replication slot + # before starting shape and replication processes, to ensure + # a single active sync service is connected to Postgres per slot. + Electric.LockConnection.start_link( + connection_opts, + self(), + Keyword.fetch!(replication_opts, :slot_name) + ) + + {:ok, state} end @impl true @@ -136,14 +147,17 @@ defmodule Electric.ConnectionManager do end def handle_call( - :get_replication_status, + :get_status, _from, - %{replication_client_pid: replication_client_pid} = state + %{replication_client_pid: replication_client_pid, pg_lock_acquired: pg_lock_acquired} = + state ) do status = - if is_nil(replication_client_pid), - do: :starting, - else: Electric.Postgres.ReplicationClient.get_status(replication_client_pid) + cond do + not pg_lock_acquired -> :waiting + is_nil(replication_client_pid) -> :starting + true -> :active + end {:reply, status, state} end @@ -241,6 +255,13 @@ defmodule Electric.ConnectionManager do end end + def handle_cast(:connection_lock_acquired, state) do + # As soon as we acquire the connection lock, we try to start the replication connection + # first because it requires additional privileges compared to regular "pooled" connections, + # so failure to open a replication connection should be reported ASAP. + {:noreply, %{state | pg_lock_acquired: true}, {:continue, :start_replication_client}} + end + defp start_replication_client(state) do Electric.Shapes.Supervisor.start_link( electric_instance_id: state.electric_instance_id, diff --git a/packages/sync-service/lib/electric/lock_connection.ex b/packages/sync-service/lib/electric/lock_connection.ex new file mode 100644 index 0000000000..58706bcae0 --- /dev/null +++ b/packages/sync-service/lib/electric/lock_connection.ex @@ -0,0 +1,82 @@ +defmodule Electric.LockConnection do + @moduledoc """ + A Postgres connection that ensures an advisory lock is held for its entire duration, + useful for ensuring only a single sync service instance can be using a single + replication slot at any given time. + + The connection attempts to grab the lock and waits on it until it acquires it. + When it does, it fires off a :connection_lock_acquired message to the specified + `Electric.ConnectionManager` such that the required setup can acquired now that + the service is sure to be the only one operating on this replication stream. + """ + require Logger + @behaviour Postgrex.SimpleConnection + + defmodule State do + defstruct [ + :connection_manager, + :lock_acquired, + :lock_name + ] + end + + @spec start_link(Keyword.t(), GenServer.server(), String.t()) :: {:ok, pid()} | {:error, any()} + def start_link(connection_opts, connection_manager, lock_name) do + case Postgrex.SimpleConnection.start_link( + __MODULE__, + [connection_manager: connection_manager, lock_name: lock_name], + connection_opts ++ [timeout: :infinity] + ) do + {:ok, pid} -> + send(pid, :acquire_lock) + {:ok, pid} + + {:error, error} -> + raise error + end + end + + @impl true + def init(opts) do + {:ok, + %State{ + connection_manager: Keyword.fetch!(opts, :connection_manager), + lock_name: Keyword.fetch!(opts, :lock_name), + lock_acquired: false + }} + end + + @impl true + def handle_info(:acquire_lock, state) do + if(state.lock_acquired) do + notify_lock_acquired(state) + {:noreply, state} + else + {:query, lock_query(state), state} + end + end + + @impl true + def handle_result(results, state) when is_list(results) do + notify_lock_acquired(state) + {:noreply, state} + end + + @impl true + def handle_result(%Postgrex.Error{} = _error, state) do + {:query, lock_query(state), %{state | lock_acquired: false}} + end + + defp notify_lock_acquired(%State{connection_manager: connection_manager} = _state) do + GenServer.cast(connection_manager, :connection_lock_acquired) + end + + defp lock_query(%State{lock_name: name} = _state) do + "SELECT pg_advisory_lock(hashtext('#{name}'))" + end + + @impl true + def notify(_channel, _payload, _state) do + :ok + end +end diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index b84c326be9..2ad8ca6c1e 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -13,14 +13,11 @@ defmodule Electric.Postgres.ReplicationClient do require Logger - @type status :: :starting | :waiting | :active - @type step :: :disconnected | :connected | :create_publication | :create_slot - | :waiting_for_lock | :set_display_setting | :ready_to_stream | :streaming @@ -137,22 +134,6 @@ defmodule Electric.Postgres.ReplicationClient do send(client, :start_streaming) end - @doc """ - Returns the current state of the replication client. - """ - @spec get_status(pid()) :: status() - def get_status(client) do - send(client, {:get_status, self()}) - - receive do - {:replication_status, status} -> - status - after - # Timeout in case no response is received - 5000 -> :starting - end - end - # The `Postgrex.ReplicationConnection` behaviour does not adhere to gen server conventions and # establishes its own. Unless the `sync_connect: false` option is passed to `start_link()`, the # connection process will try opening a replication connection to Postgres before returning @@ -203,12 +184,6 @@ defmodule Electric.Postgres.ReplicationClient do {:noreply, state} end - @impl true - def handle_info({:get_status, pid}, %State{step: step} = state) do - send(pid, {:replication_status, get_replication_status(step)}) - {:noreply, state} - end - @impl true @spec handle_data(binary(), State.t()) :: {:noreply, State.t()} | {:noreply, list(binary()), State.t()} @@ -303,20 +278,6 @@ defmodule Electric.Postgres.ReplicationClient do end end - @spec get_replication_status(step()) :: status() - defp get_replication_status(step) do - case step do - _ when step in [:disconnected, :connected, :create_publication, :create_slot] -> - :starting - - :waiting_for_lock -> - :waiting - - _ when step in [:set_display_setting, :ready_to_stream, :streaming] -> - :active - end - end - defp decode_message(data) do OpenTelemetry.with_span( "replication_client.decode_message", diff --git a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex index b59479137a..0423ce7b22 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex @@ -117,27 +117,6 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do ### - # Start a long query that will block until the lock becomes available, based - # on a hash of the slot name to ensure lock is held even if slot is recreated. - # NOTE: alternatively use pg_try_advisory_lock and retry with exponential backoff - defp waiting_for_lock_query(state) do - query = "SELECT pg_advisory_lock(hashtext('#{state.slot_name}'))" - {:query, query, state} - end - - # Sucessfully acquired the lock for the replication slot. - defp waiting_for_lock_result([%Postgrex.Result{} = _result], state) do - Logger.debug("Acquired advisory lock on replication slot") - state - end - - defp waiting_for_lock_result(%Postgrex.Error{} = error, _state) do - # Unexpected error, fail loudly. - raise error - end - - ### - defp set_display_setting_query(%{display_settings: [query | rest]} = state) do {:query, query, %{state | display_settings: rest}} end @@ -186,8 +165,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do defp next_step(%{step: :connected, try_creating_publication?: true}), do: :create_publication defp next_step(%{step: :connected}), do: :create_slot defp next_step(%{step: :create_publication}), do: :create_slot - defp next_step(%{step: :create_slot}), do: :waiting_for_lock - defp next_step(%{step: :waiting_for_lock}), do: :set_display_setting + defp next_step(%{step: :create_slot}), do: :set_display_setting defp next_step(%{step: :set_display_setting, display_settings: queries}) when queries != [], do: :set_display_setting @@ -204,7 +182,6 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do defp query_for_step(:create_publication, state), do: create_publication_query(state) defp query_for_step(:create_slot, state), do: create_slot_query(state) - defp query_for_step(:waiting_for_lock, state), do: waiting_for_lock_query(state) defp query_for_step(:set_display_setting, state), do: set_display_setting_query(state) defp query_for_step(:ready_to_stream, state), do: ready_to_stream(state) defp query_for_step(:streaming, state), do: start_replication_slot_query(state) @@ -221,9 +198,6 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do defp dispatch_query_result(:create_slot, result, state), do: create_slot_result(result, state) - defp dispatch_query_result(:waiting_for_lock, result, state), - do: waiting_for_lock_result(result, state) - defp dispatch_query_result(:set_display_setting, result, state), do: set_display_setting_result(result, state) end diff --git a/packages/sync-service/lib/electric/service_status.ex b/packages/sync-service/lib/electric/service_status.ex index 6602249460..5218dfa7f8 100644 --- a/packages/sync-service/lib/electric/service_status.ex +++ b/packages/sync-service/lib/electric/service_status.ex @@ -1,15 +1,17 @@ defmodule Electric.ServiceStatus do @type status() :: :starting | :ready | :active | :stopping - @type opts() :: - Keyword.t(get_replication_status: (-> Electric.Postgres.ReplicationClient.status())) + @type option :: + {:get_connection_status, (-> Electric.ConnectionManager.status())} - @spec check(opts()) :: status() + @type options :: [option] + + @spec check(options()) :: status() def check(opts) do - with replication_status <- opts.get_replication_status.() do - case replication_status do + with connection_status <- opts.get_connection_status.() do + case connection_status do + :waiting -> :waiting :starting -> :starting - :waiting -> :ready :active -> :active end end From 67dc96db0e82ab04fece24c564928834faa52e9d Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 16:19:19 +0300 Subject: [PATCH 12/33] Add backoff retries --- .../lib/electric/lock_connection.ex | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/packages/sync-service/lib/electric/lock_connection.ex b/packages/sync-service/lib/electric/lock_connection.ex index 58706bcae0..e438e529e6 100644 --- a/packages/sync-service/lib/electric/lock_connection.ex +++ b/packages/sync-service/lib/electric/lock_connection.ex @@ -16,7 +16,8 @@ defmodule Electric.LockConnection do defstruct [ :connection_manager, :lock_acquired, - :lock_name + :lock_name, + :backoff ] end @@ -42,7 +43,8 @@ defmodule Electric.LockConnection do %State{ connection_manager: Keyword.fetch!(opts, :connection_manager), lock_name: Keyword.fetch!(opts, :lock_name), - lock_acquired: false + lock_acquired: false, + backoff: {:backoff.init(1000, 10_000), nil} }} end @@ -56,6 +58,10 @@ defmodule Electric.LockConnection do end end + def handle_info({:timeout, tref, msg}, %{backoff: {backoff, tref}} = state) do + handle_info(msg, %{state | backoff: {backoff, nil}}) + end + @impl true def handle_result(results, state) when is_list(results) do notify_lock_acquired(state) @@ -63,8 +69,15 @@ defmodule Electric.LockConnection do end @impl true - def handle_result(%Postgrex.Error{} = _error, state) do - {:query, lock_query(state), %{state | lock_acquired: false}} + def handle_result(%Postgrex.Error{} = error, %State{backoff: {backoff, _}} = state) do + {time, backoff} = :backoff.fail(backoff) + tref = :erlang.start_timer(time, self(), :acquire_lock) + + Logger.error( + "Failed to acquire lock #{state.lock_name} with reason #{inspect(error)} - retrying in #{inspect(time)}ms." + ) + + {:noreply, %{state | lock_acquired: false, backoff: {backoff, tref}}} end defp notify_lock_acquired(%State{connection_manager: connection_manager} = _state) do From 267d471ec61c58588541777b642d2993202e5ae8 Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 16:25:51 +0300 Subject: [PATCH 13/33] Check pool id as well for status --- .../lib/electric/connection_manager.ex | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index e40b12fe42..7596773763 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -149,14 +149,23 @@ defmodule Electric.ConnectionManager do def handle_call( :get_status, _from, - %{replication_client_pid: replication_client_pid, pg_lock_acquired: pg_lock_acquired} = + %{ + replication_client_pid: replication_client_pid, + pool_id: pool_id, + pg_lock_acquired: pg_lock_acquired + } = state ) do status = cond do - not pg_lock_acquired -> :waiting - is_nil(replication_client_pid) -> :starting - true -> :active + not pg_lock_acquired -> + :waiting + + is_nil(replication_client_pid) || is_nil(pool_id) || not Process.alive?(pool_id) -> + :starting + + true -> + :active end {:reply, status, state} From a1836a7303ee4e3a1f1fdd23f7981c7827b34366 Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 17:05:00 +0300 Subject: [PATCH 14/33] Fix integration test and lock restarting --- .../tests/postgres-disconnection.lux | 2 +- .../lib/electric/connection_manager.ex | 40 ++++++++++--------- .../lib/electric/lock_connection.ex | 7 ++-- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/integration-tests/tests/postgres-disconnection.lux b/integration-tests/tests/postgres-disconnection.lux index 83a9fc5642..289ba4843b 100644 --- a/integration-tests/tests/postgres-disconnection.lux +++ b/integration-tests/tests/postgres-disconnection.lux @@ -22,7 +22,7 @@ ## Observe the connection error. [shell electric] - ??[warning] Database connection in replication mode failed + ??[warning] Database connection in lock_connection mode failed ??[warning] Reconnecting in ## Start the Postgres container back up. diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index 7596773763..a9dc909175 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -43,6 +43,8 @@ defmodule Electric.ConnectionManager do :shape_cache, # PID of the replication client. :replication_client_pid, + # PID of the Postgres connection lock. + :lock_connection_pid, # PID of the database connection pool (a `Postgrex` process). :pool_pid, # Backoff term used for reconnection with exponential back-off. @@ -132,13 +134,7 @@ defmodule Electric.ConnectionManager do # Try to acquire the connection lock on the replication slot # before starting shape and replication processes, to ensure # a single active sync service is connected to Postgres per slot. - Electric.LockConnection.start_link( - connection_opts, - self(), - Keyword.fetch!(replication_opts, :slot_name) - ) - - {:ok, state} + {:ok, state, {:continue, :start_lock_connection}} end @impl true @@ -146,22 +142,14 @@ defmodule Electric.ConnectionManager do {:reply, pg_version, state} end - def handle_call( - :get_status, - _from, - %{ - replication_client_pid: replication_client_pid, - pool_id: pool_id, - pg_lock_acquired: pg_lock_acquired - } = - state - ) do + def handle_call(:get_status, _from, %{pg_lock_acquired: pg_lock_acquired} = state) do status = cond do not pg_lock_acquired -> :waiting - is_nil(replication_client_pid) || is_nil(pool_id) || not Process.alive?(pool_id) -> + is_nil(state.replication_client_pid) || is_nil(state.pool_id) || + not Process.alive?(state.pool_id) -> :starting true -> @@ -171,6 +159,20 @@ defmodule Electric.ConnectionManager do {:reply, status, state} end + def handle_continue(:start_lock_connection, state) do + case Electric.LockConnection.start_link( + state.connection_opts, + self(), + Keyword.fetch!(state.replication_opts, :slot_name) + ) do + {:ok, lock_connection_pid} -> + {:noreply, %{state | lock_connection_pid: lock_connection_pid}} + + {:error, reason} -> + handle_connection_error(reason, state, "lock_connection") + end + end + @impl true def handle_continue(:start_replication_client, state) do case start_replication_client(state) do @@ -232,6 +234,7 @@ defmodule Electric.ConnectionManager do tag = cond do + pid == state.lock_connection_pid -> :lock_connection pid == state.replication_client_pid -> :replication_connection pid == state.pool_pid -> :database_pool end @@ -324,6 +327,7 @@ defmodule Electric.ConnectionManager do step = cond do + is_nil(state.lock_connection_pid) -> :start_lock_connection is_nil(state.replication_client_pid) -> :start_replication_client is_nil(state.pool_pid) -> :start_connection_pool end diff --git a/packages/sync-service/lib/electric/lock_connection.ex b/packages/sync-service/lib/electric/lock_connection.ex index e438e529e6..f1ec57f064 100644 --- a/packages/sync-service/lib/electric/lock_connection.ex +++ b/packages/sync-service/lib/electric/lock_connection.ex @@ -21,19 +21,20 @@ defmodule Electric.LockConnection do ] end - @spec start_link(Keyword.t(), GenServer.server(), String.t()) :: {:ok, pid()} | {:error, any()} + @spec start_link(Keyword.t(), GenServer.server(), String.t()) :: + {:ok, pid()} | {:error, Postgrex.Error.t() | term()} def start_link(connection_opts, connection_manager, lock_name) do case Postgrex.SimpleConnection.start_link( __MODULE__, [connection_manager: connection_manager, lock_name: lock_name], - connection_opts ++ [timeout: :infinity] + connection_opts ++ [timeout: :infinity, auto_reconnect: false] ) do {:ok, pid} -> send(pid, :acquire_lock) {:ok, pid} {:error, error} -> - raise error + {:error, error} end end From b65304449160260db5b097c8d1e9bf6cb24c74da Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 17:05:30 +0300 Subject: [PATCH 15/33] Rename connection lock atom --- packages/sync-service/lib/electric/connection_manager.ex | 2 +- packages/sync-service/lib/electric/lock_connection.ex | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index a9dc909175..a88ea15cd0 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -267,7 +267,7 @@ defmodule Electric.ConnectionManager do end end - def handle_cast(:connection_lock_acquired, state) do + def handle_cast(:lock_connection_acquired, state) do # As soon as we acquire the connection lock, we try to start the replication connection # first because it requires additional privileges compared to regular "pooled" connections, # so failure to open a replication connection should be reported ASAP. diff --git a/packages/sync-service/lib/electric/lock_connection.ex b/packages/sync-service/lib/electric/lock_connection.ex index f1ec57f064..009e96cc53 100644 --- a/packages/sync-service/lib/electric/lock_connection.ex +++ b/packages/sync-service/lib/electric/lock_connection.ex @@ -5,7 +5,7 @@ defmodule Electric.LockConnection do replication slot at any given time. The connection attempts to grab the lock and waits on it until it acquires it. - When it does, it fires off a :connection_lock_acquired message to the specified + When it does, it fires off a :lock_connection_acquired message to the specified `Electric.ConnectionManager` such that the required setup can acquired now that the service is sure to be the only one operating on this replication stream. """ @@ -82,7 +82,7 @@ defmodule Electric.LockConnection do end defp notify_lock_acquired(%State{connection_manager: connection_manager} = _state) do - GenServer.cast(connection_manager, :connection_lock_acquired) + GenServer.cast(connection_manager, :lock_connection_acquired) end defp lock_query(%State{lock_name: name} = _state) do From 3f47fbdb0eb499e6ade2bda31c8704551cd44b28 Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 17:28:10 +0300 Subject: [PATCH 16/33] Fix health check endpoint to return correct messages --- .../lib/electric/plug/health_check_plug.ex | 26 +++++++++++++------ .../lib/electric/service_status.ex | 2 +- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/packages/sync-service/lib/electric/plug/health_check_plug.ex b/packages/sync-service/lib/electric/plug/health_check_plug.ex index c452f1e2af..66e6aaf8b5 100644 --- a/packages/sync-service/lib/electric/plug/health_check_plug.ex +++ b/packages/sync-service/lib/electric/plug/health_check_plug.ex @@ -4,22 +4,32 @@ defmodule Electric.Plug.HealthCheckPlug do use Plug.Builder plug :check_service_status + plug :put_relevant_headers plug :send_response defp check_service_status(conn, _) do get_service_status = Access.fetch!(conn.assigns.config, :get_service_status) - status_text = + {status_code, status_text} = case get_service_status.() do - :starting -> "starting" - :ready -> "ready" - :active -> "active" - :stopping -> "stopping" + :waiting -> {200, "waiting"} + :starting -> {200, "starting"} + :active -> {200, "active"} + :stopping -> {500, "stopping"} end - conn |> assign(:status_text, status_text) + conn |> assign(:status_text, status_text) |> assign(:status_code, status_code) end - defp send_response(%Conn{assigns: %{status_text: status_text}} = conn, _), - do: send_resp(conn, 200, Jason.encode!(%{status: status_text})) + defp put_relevant_headers(conn, _), + do: + conn + |> put_resp_header("content-type", "application/json") + |> put_resp_header("cache-control", "no-cache, no-store, must-revalidate") + + defp send_response( + %Conn{assigns: %{status_text: status_text, status_code: status_code}} = conn, + _ + ), + do: send_resp(conn, status_code, Jason.encode!(%{status: status_text})) end diff --git a/packages/sync-service/lib/electric/service_status.ex b/packages/sync-service/lib/electric/service_status.ex index 5218dfa7f8..e5b7626b28 100644 --- a/packages/sync-service/lib/electric/service_status.ex +++ b/packages/sync-service/lib/electric/service_status.ex @@ -1,5 +1,5 @@ defmodule Electric.ServiceStatus do - @type status() :: :starting | :ready | :active | :stopping + @type status() :: :waiting | :starting | :active | :stopping @type option :: {:get_connection_status, (-> Electric.ConnectionManager.status())} From 4e8e3e6788f0f56f3df7cf9808b0e9245a0e3b42 Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 17:36:31 +0300 Subject: [PATCH 17/33] Add basic unit tests for health check endpoint --- .../electric/plug/health_check_plug_test.exs | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 packages/sync-service/test/electric/plug/health_check_plug_test.exs diff --git a/packages/sync-service/test/electric/plug/health_check_plug_test.exs b/packages/sync-service/test/electric/plug/health_check_plug_test.exs new file mode 100644 index 0000000000..f3e0e2b39c --- /dev/null +++ b/packages/sync-service/test/electric/plug/health_check_plug_test.exs @@ -0,0 +1,76 @@ +defmodule Electric.Plug.HealthCheckPlugTest do + use ExUnit.Case, async: true + import Plug.Conn + alias Plug.Conn + + alias Electric.Plug.HealthCheckPlug + + @moduletag :capture_log + + @registry Registry.HealthCheckPlugTest + + setup do + start_link_supervised!({Registry, keys: :duplicate, name: @registry}) + :ok + end + + def conn(%{connection_status: connection_status} = _config) do + # Pass mock dependencies to the plug + config = %{ + get_service_status: fn -> connection_status end + } + + Plug.Test.conn("GET", "/") + |> assign(:config, config) + end + + describe "HealthCheckPlug" do + test "has appropriate content and cache headers" do + conn = + conn(%{connection_status: :waiting}) + |> HealthCheckPlug.call([]) + + assert Conn.get_resp_header(conn, "content-type") == ["application/json"] + + assert Conn.get_resp_header(conn, "cache-control") == [ + "no-cache, no-store, must-revalidate" + ] + end + + test "returns 200 when in waiting mode" do + conn = + conn(%{connection_status: :waiting}) + |> HealthCheckPlug.call([]) + + assert conn.status == 200 + assert Jason.decode!(conn.resp_body) == %{"status" => "waiting"} + end + + test "returns 200 when in starting mode" do + conn = + conn(%{connection_status: :starting}) + |> HealthCheckPlug.call([]) + + assert conn.status == 200 + assert Jason.decode!(conn.resp_body) == %{"status" => "starting"} + end + + test "returns 200 when in active mode" do + conn = + conn(%{connection_status: :active}) + |> HealthCheckPlug.call([]) + + assert conn.status == 200 + assert Jason.decode!(conn.resp_body) == %{"status" => "active"} + end + + test "returns 500 when stopping" do + conn = + conn(%{connection_status: :stopping}) + |> HealthCheckPlug.call([]) + + assert conn.status == 500 + assert Jason.decode!(conn.resp_body) == %{"status" => "stopping"} + end + end +end From 3bba4e224bc911a6105d75f5d04e3fbe883fc47b Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 18:37:59 +0300 Subject: [PATCH 18/33] Add integration test for rolling deploy --- integration-tests/tests/macros.luxinc | 8 +++- integration-tests/tests/rolling-deploy.lux | 42 +++++++++++++++++++ packages/sync-service/config/runtime.exs | 1 + .../sync-service/lib/electric/application.ex | 2 +- .../lib/electric/lock_connection.ex | 6 ++- 5 files changed, 54 insertions(+), 5 deletions(-) create mode 100644 integration-tests/tests/rolling-deploy.lux diff --git a/integration-tests/tests/macros.luxinc b/integration-tests/tests/macros.luxinc index fadb0920e1..42fa8fcef4 100644 --- a/integration-tests/tests/macros.luxinc +++ b/integration-tests/tests/macros.luxinc @@ -68,10 +68,14 @@ [endmacro] [macro setup_electric] - [shell electric] + [invoke setup_electric_shell "electric" "3000"] +[endmacro] + +[macro setup_electric_shell shell_name port] + [shell $shell_name] -$fail_pattern - !DATABASE_URL=$database_url ../electric_dev.sh + !DATABASE_URL=$database_url PORT=$port ../electric_dev.sh [endmacro] [macro teardown] diff --git a/integration-tests/tests/rolling-deploy.lux b/integration-tests/tests/rolling-deploy.lux new file mode 100644 index 0000000000..e6fea9a3ec --- /dev/null +++ b/integration-tests/tests/rolling-deploy.lux @@ -0,0 +1,42 @@ +[doc Verify handling of an Electric rolling deploy] + +[include macros.luxinc] + +[global pg_container_name=rolling-deploy__pg] + +### + +## Start a new Postgres cluster +[invoke setup_pg "" ""] + +## Start the first sync service. +[invoke setup_electric_shell "electric_1" "3000"] + +[shell electric_1] + ??[info] Acquiring lock from postgres with name electric_slot_default + ??[info] Lock acquired from postgres with name electric_slot_default + ??[info] Starting replication from postgres + + +## Start the second sync service. +[invoke setup_electric_shell "electric_2" "3001"] + +## Assert that the lock is not acquired and replication does not start +## in the second electric +[shell electric_2] + -Lock acquired from postgres|Starting replication from postgres|$fail_pattern + ??[info] Acquiring lock from postgres with name electric_slot_default + [sleep 2] + +## Terminate first electric +[shell electric_1] + !System.halt() + +## Lock should now be acquired and replication starting +[shell electric_2] + -$fail_pattern + ??[info] Lock acquired from postgres with name electric_slot_default + ??[info] Starting replication from postgres + +[cleanup] + [invoke teardown] diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 771ab6d1f8..1de129981d 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -162,6 +162,7 @@ config :electric, telemetry_statsd_host: statsd_host, db_pool_size: env!("DB_POOL_SIZE", :integer, 50), replication_stream_id: env!("REPLICATION_STREAM_ID", :string, "default"), + service_port: env!("PORT", :integer, 3000), prometheus_port: prometheus_port, storage: storage, persistent_kv: persistent_kv diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index f9a767d964..ee3d03958a 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -118,7 +118,7 @@ defmodule Electric.Application do max_age: Application.fetch_env!(:electric, :cache_max_age), stale_age: Application.fetch_env!(:electric, :cache_stale_age), allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)}, - port: 3000, + port: Application.fetch_env!(:electric, :service_port), thousand_island_options: http_listener_options()} ] |> add_prometheus_router(Application.fetch_env!(:electric, :prometheus_port)) diff --git a/packages/sync-service/lib/electric/lock_connection.ex b/packages/sync-service/lib/electric/lock_connection.ex index 009e96cc53..72c4c86445 100644 --- a/packages/sync-service/lib/electric/lock_connection.ex +++ b/packages/sync-service/lib/electric/lock_connection.ex @@ -55,6 +55,7 @@ defmodule Electric.LockConnection do notify_lock_acquired(state) {:noreply, state} else + Logger.info("Acquiring lock from postgres with name #{state.lock_name}") {:query, lock_query(state), state} end end @@ -64,9 +65,10 @@ defmodule Electric.LockConnection do end @impl true - def handle_result(results, state) when is_list(results) do + def handle_result([%Postgrex.Result{}] = _results, state) do + Logger.info("Lock acquired from postgres with name #{state.lock_name}") notify_lock_acquired(state) - {:noreply, state} + {:noreply, %{state | lock_acquired: true}} end @impl true From b2039537673c8bead45218c655226be8b1de881d Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 18:38:32 +0300 Subject: [PATCH 19/33] Fix weird formatting --- packages/sync-service/lib/electric/lock_connection.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/lock_connection.ex b/packages/sync-service/lib/electric/lock_connection.ex index 72c4c86445..5415fb6bb7 100644 --- a/packages/sync-service/lib/electric/lock_connection.ex +++ b/packages/sync-service/lib/electric/lock_connection.ex @@ -51,7 +51,7 @@ defmodule Electric.LockConnection do @impl true def handle_info(:acquire_lock, state) do - if(state.lock_acquired) do + if state.lock_acquired do notify_lock_acquired(state) {:noreply, state} else From a7e189773a2abe0e69fb7708ec2a0f00d09e6aa8 Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 18:39:13 +0300 Subject: [PATCH 20/33] Return 503 for stopping state --- packages/sync-service/lib/electric/plug/health_check_plug.ex | 2 +- .../test/electric/plug/health_check_plug_test.exs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/lib/electric/plug/health_check_plug.ex b/packages/sync-service/lib/electric/plug/health_check_plug.ex index 66e6aaf8b5..b5fce45b9a 100644 --- a/packages/sync-service/lib/electric/plug/health_check_plug.ex +++ b/packages/sync-service/lib/electric/plug/health_check_plug.ex @@ -15,7 +15,7 @@ defmodule Electric.Plug.HealthCheckPlug do :waiting -> {200, "waiting"} :starting -> {200, "starting"} :active -> {200, "active"} - :stopping -> {500, "stopping"} + :stopping -> {503, "stopping"} end conn |> assign(:status_text, status_text) |> assign(:status_code, status_code) diff --git a/packages/sync-service/test/electric/plug/health_check_plug_test.exs b/packages/sync-service/test/electric/plug/health_check_plug_test.exs index f3e0e2b39c..d30c257c87 100644 --- a/packages/sync-service/test/electric/plug/health_check_plug_test.exs +++ b/packages/sync-service/test/electric/plug/health_check_plug_test.exs @@ -64,12 +64,12 @@ defmodule Electric.Plug.HealthCheckPlugTest do assert Jason.decode!(conn.resp_body) == %{"status" => "active"} end - test "returns 500 when stopping" do + test "returns 503 when stopping" do conn = conn(%{connection_status: :stopping}) |> HealthCheckPlug.call([]) - assert conn.status == 500 + assert conn.status == 503 assert Jason.decode!(conn.resp_body) == %{"status" => "stopping"} end end From 017ed32e44f88bdbac32dd87c66aeb080e3ddf60 Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 18:42:35 +0300 Subject: [PATCH 21/33] Transform start link keyword argument --- .../lib/electric/connection_manager.ex | 6 +++--- .../sync-service/lib/electric/lock_connection.ex | 16 +++++++++++++--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index a88ea15cd0..258ce9d1a6 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -161,9 +161,9 @@ defmodule Electric.ConnectionManager do def handle_continue(:start_lock_connection, state) do case Electric.LockConnection.start_link( - state.connection_opts, - self(), - Keyword.fetch!(state.replication_opts, :slot_name) + connection_opts: state.connection_opts, + connection_manager: self(), + lock_name: Keyword.fetch!(state.replication_opts, :slot_name) ) do {:ok, lock_connection_pid} -> {:noreply, %{state | lock_connection_pid: lock_connection_pid}} diff --git a/packages/sync-service/lib/electric/lock_connection.ex b/packages/sync-service/lib/electric/lock_connection.ex index 5415fb6bb7..2bb17f4b07 100644 --- a/packages/sync-service/lib/electric/lock_connection.ex +++ b/packages/sync-service/lib/electric/lock_connection.ex @@ -12,6 +12,13 @@ defmodule Electric.LockConnection do require Logger @behaviour Postgrex.SimpleConnection + @type option :: + {:connection_opts, Keyword.t()} + | {:connection_manager, GenServer.server()} + | {:lock_name, String.t()} + + @type options :: [option] + defmodule State do defstruct [ :connection_manager, @@ -21,9 +28,12 @@ defmodule Electric.LockConnection do ] end - @spec start_link(Keyword.t(), GenServer.server(), String.t()) :: - {:ok, pid()} | {:error, Postgrex.Error.t() | term()} - def start_link(connection_opts, connection_manager, lock_name) do + @spec start_link(options()) :: {:ok, pid()} | {:error, Postgrex.Error.t() | term()} + def start_link( + connection_opts: connection_opts, + connection_manager: connection_manager, + lock_name: lock_name + ) do case Postgrex.SimpleConnection.start_link( __MODULE__, [connection_manager: connection_manager, lock_name: lock_name], From f2d6250803f12ee2c4dc5fd90b782a378bd5da2d Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 18:43:25 +0300 Subject: [PATCH 22/33] Mock lock connection under Postgres --- packages/sync-service/lib/electric/connection_manager.ex | 2 +- .../sync-service/lib/electric/{ => postgres}/lock_connection.ex | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename packages/sync-service/lib/electric/{ => postgres}/lock_connection.ex (98%) diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index 258ce9d1a6..62d9eae01e 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -160,7 +160,7 @@ defmodule Electric.ConnectionManager do end def handle_continue(:start_lock_connection, state) do - case Electric.LockConnection.start_link( + case Electric.Postgres.LockConnection.start_link( connection_opts: state.connection_opts, connection_manager: self(), lock_name: Keyword.fetch!(state.replication_opts, :slot_name) diff --git a/packages/sync-service/lib/electric/lock_connection.ex b/packages/sync-service/lib/electric/postgres/lock_connection.ex similarity index 98% rename from packages/sync-service/lib/electric/lock_connection.ex rename to packages/sync-service/lib/electric/postgres/lock_connection.ex index 2bb17f4b07..8a19fcec51 100644 --- a/packages/sync-service/lib/electric/lock_connection.ex +++ b/packages/sync-service/lib/electric/postgres/lock_connection.ex @@ -1,4 +1,4 @@ -defmodule Electric.LockConnection do +defmodule Electric.Postgres.LockConnection do @moduledoc """ A Postgres connection that ensures an advisory lock is held for its entire duration, useful for ensuring only a single sync service instance can be using a single From 934c35df4c48062d65f691250d8ec115c968a6c9 Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 19:05:23 +0300 Subject: [PATCH 23/33] Add basic lock connection tests --- .../lib/electric/postgres/lock_connection.ex | 2 +- .../postgres/lock_connection_test.exs | 83 +++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 packages/sync-service/test/electric/postgres/lock_connection_test.exs diff --git a/packages/sync-service/lib/electric/postgres/lock_connection.ex b/packages/sync-service/lib/electric/postgres/lock_connection.ex index 8a19fcec51..60941a6fab 100644 --- a/packages/sync-service/lib/electric/postgres/lock_connection.ex +++ b/packages/sync-service/lib/electric/postgres/lock_connection.ex @@ -75,7 +75,7 @@ defmodule Electric.Postgres.LockConnection do end @impl true - def handle_result([%Postgrex.Result{}] = _results, state) do + def handle_result([_] = _results, state) do Logger.info("Lock acquired from postgres with name #{state.lock_name}") notify_lock_acquired(state) {:noreply, %{state | lock_acquired: true}} diff --git a/packages/sync-service/test/electric/postgres/lock_connection_test.exs b/packages/sync-service/test/electric/postgres/lock_connection_test.exs new file mode 100644 index 0000000000..81677bb14a --- /dev/null +++ b/packages/sync-service/test/electric/postgres/lock_connection_test.exs @@ -0,0 +1,83 @@ +defmodule Electric.Postgres.LockConnectionTest do + use ExUnit.Case, async: true + import ExUnit.CaptureLog + import Support.DbSetup, except: [with_publication: 1] + + alias Electric.Postgres.LockConnection + + @moduletag :capture_log + @lock_name "test_electric_slot" + + describe "LockConnection init" do + setup [:with_unique_db] + + test "should acquire an advisory lock on startup", %{db_config: config, db_conn: conn} do + log = + capture_log(fn -> + assert {:ok, _pid} = + LockConnection.start_link( + connection_opts: config, + connection_manager: self(), + lock_name: @lock_name + ) + + assert_lock_acquired() + end) + + # should have logged lock acquisition process + assert log =~ "Acquiring lock from postgres with name #{@lock_name}" + assert log =~ "Lock acquired from postgres with name #{@lock_name}" + + # should have acquired an advisory lock on PG + assert %Postgrex.Result{rows: [[false]]} = + Postgrex.query!( + conn, + "SELECT pg_try_advisory_lock(hashtext('#{@lock_name}'))", + [] + ) + end + + test "should wait if lock is already acquired", %{db_config: config} do + # grab lock with one connection + {pid1, _} = + with_log(fn -> + assert {:ok, pid} = + LockConnection.start_link( + connection_opts: config, + connection_manager: self(), + lock_name: @lock_name + ) + + assert_lock_acquired() + pid + end) + + # try to grab the same with another + _ = + capture_log(fn -> + assert {:ok, pid} = + LockConnection.start_link( + connection_opts: config, + connection_manager: self(), + lock_name: @lock_name + ) + + # should fail to grab it + refute_lock_acquired() + + # should immediately grab it once previous lock is released + GenServer.stop(pid1) + assert_lock_acquired() + pid + end) + end + end + + defp assert_lock_acquired do + assert_receive {_, :lock_connection_acquired} + end + + defp refute_lock_acquired do + refute_receive {_, :lock_connection_acquired}, 1000 + end +end From 4655e002c9bb9deeab99ecbdab832d535e61f75e Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 19:08:15 +0300 Subject: [PATCH 24/33] Add comments explaining seemingly unnecessary pattern matching --- packages/sync-service/lib/electric/plug/health_check_plug.ex | 3 +++ packages/sync-service/lib/electric/service_status.ex | 3 +++ 2 files changed, 6 insertions(+) diff --git a/packages/sync-service/lib/electric/plug/health_check_plug.ex b/packages/sync-service/lib/electric/plug/health_check_plug.ex index b5fce45b9a..0834b69693 100644 --- a/packages/sync-service/lib/electric/plug/health_check_plug.ex +++ b/packages/sync-service/lib/electric/plug/health_check_plug.ex @@ -7,6 +7,9 @@ defmodule Electric.Plug.HealthCheckPlug do plug :put_relevant_headers plug :send_response + # Match service status to a status code and status message, + # keeping the message name decoupled from the internal representation + # of the status to ensure the API is stable defp check_service_status(conn, _) do get_service_status = Access.fetch!(conn.assigns.config, :get_service_status) diff --git a/packages/sync-service/lib/electric/service_status.ex b/packages/sync-service/lib/electric/service_status.ex index e5b7626b28..e85c0451ea 100644 --- a/packages/sync-service/lib/electric/service_status.ex +++ b/packages/sync-service/lib/electric/service_status.ex @@ -9,6 +9,9 @@ defmodule Electric.ServiceStatus do @spec check(options()) :: status() def check(opts) do with connection_status <- opts.get_connection_status.() do + # Match the connection status ot a service status - currently + # they are one and the same but keeping this decoupled for future + # additions to conditions that determine service status case connection_status do :waiting -> :waiting :starting -> :starting From 49d6620c6b440dba9dd6b1b8c4933784a57989df Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 19:09:15 +0300 Subject: [PATCH 25/33] Update changeset --- .changeset/poor-candles-fly.md | 1 + 1 file changed, 1 insertion(+) diff --git a/.changeset/poor-candles-fly.md b/.changeset/poor-candles-fly.md index 7f9119ffcd..2b42a58690 100644 --- a/.changeset/poor-candles-fly.md +++ b/.changeset/poor-candles-fly.md @@ -4,3 +4,4 @@ - Wait for advisory lock on replication slot to enable rolling deploys. - Configurable replication slot and publication name using `REPLICATION_STREAM_ID` environment variable. +- Add `HealthCheckPlug` API endopint at `v1/health` that returns `waiting`, `starting`, `active`, and `stopping` statuses. \ No newline at end of file From 3269786c3fae4bac4565d78be518d432bb7e7c86 Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 19:16:38 +0300 Subject: [PATCH 26/33] Add basic integration test for health endoint --- .../test/electric/plug/router_test.exs | 23 +++++++++++++++++++ .../test/support/component_setup.ex | 1 + 2 files changed, 24 insertions(+) diff --git a/packages/sync-service/test/electric/plug/router_test.exs b/packages/sync-service/test/electric/plug/router_test.exs index 8c9348b63c..41049ae63c 100644 --- a/packages/sync-service/test/electric/plug/router_test.exs +++ b/packages/sync-service/test/electric/plug/router_test.exs @@ -33,6 +33,29 @@ defmodule Electric.Plug.RouterTest do end end + describe "/v1/health" do + setup [:with_unique_db] + + setup do + %{publication_name: "electric_test_publication", slot_name: "electric_test_slot"} + end + + setup :with_complete_stack + + setup(ctx, + do: %{opts: Router.init(build_router_opts(ctx, get_service_status: fn -> :active end))} + ) + + test "GET returns health status of service", %{opts: opts} do + conn = + conn("GET", "/v1/health") + |> Router.call(opts) + + assert %{status: 200} = conn + assert Jason.decode!(conn.resp_body) == %{"status" => "active"} + end + end + describe "/v1/shapes" do setup [:with_unique_db, :with_basic_tables, :with_sql_execute] diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 3f482d14f3..e1f9b6cb62 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -162,6 +162,7 @@ defmodule Support.ComponentSetup do long_poll_timeout: Access.get(overrides, :long_poll_timeout, 5_000), max_age: Access.get(overrides, :max_age, 60), stale_age: Access.get(overrides, :stale_age, 300), + get_service_status: Access.get(overrides, :get_service_status, fn -> :active end), chunk_bytes_threshold: Access.get(overrides, :chunk_bytes_threshold, ctx.chunk_bytes_threshold), allow_shape_deletion: Access.get(overrides, :allow_shape_deletion, true) From 3ac68459c9fcbd5f1ddccf00cfd2aee634262e53 Mon Sep 17 00:00:00 2001 From: msfstef Date: Mon, 30 Sep 2024 19:30:16 +0300 Subject: [PATCH 27/33] Add health check plug to integration testing for rolling deploys --- integration-tests/tests/rolling-deploy.lux | 17 +++++++++++++++++ .../lib/electric/connection_manager.ex | 4 ++-- .../sync-service/lib/electric/service_status.ex | 4 ++-- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/integration-tests/tests/rolling-deploy.lux b/integration-tests/tests/rolling-deploy.lux index e6fea9a3ec..2c7ebe5d2d 100644 --- a/integration-tests/tests/rolling-deploy.lux +++ b/integration-tests/tests/rolling-deploy.lux @@ -17,6 +17,10 @@ ??[info] Lock acquired from postgres with name electric_slot_default ??[info] Starting replication from postgres +# First service should be health and active +[shell orchestator] + !curl -X GET http://localhost:3000/v1/health + ??{"status":"active"} ## Start the second sync service. [invoke setup_electric_shell "electric_2" "3001"] @@ -28,6 +32,14 @@ ??[info] Acquiring lock from postgres with name electric_slot_default [sleep 2] + +# Second service should be in waiting state, ready to take over +[shell orchestator] + !curl -X GET http://localhost:3000/v1/health + ??{"status":"active"} + !curl -X GET http://localhost:3001/v1/health + ??{"status":"waiting"} + ## Terminate first electric [shell electric_1] !System.halt() @@ -38,5 +50,10 @@ ??[info] Lock acquired from postgres with name electric_slot_default ??[info] Starting replication from postgres +# Second service is now healthy and active +[shell orchestator] + !curl -X GET http://localhost:3001/v1/health + ??{"status":"active"} + [cleanup] [invoke teardown] diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index 62d9eae01e..a94a3f5f2c 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -148,8 +148,8 @@ defmodule Electric.ConnectionManager do not pg_lock_acquired -> :waiting - is_nil(state.replication_client_pid) || is_nil(state.pool_id) || - not Process.alive?(state.pool_id) -> + is_nil(state.replication_client_pid) || is_nil(state.pool_pid) || + not Process.alive?(state.pool_pid) -> :starting true -> diff --git a/packages/sync-service/lib/electric/service_status.ex b/packages/sync-service/lib/electric/service_status.ex index e85c0451ea..15350b710d 100644 --- a/packages/sync-service/lib/electric/service_status.ex +++ b/packages/sync-service/lib/electric/service_status.ex @@ -7,8 +7,8 @@ defmodule Electric.ServiceStatus do @type options :: [option] @spec check(options()) :: status() - def check(opts) do - with connection_status <- opts.get_connection_status.() do + def check(get_connection_status: get_connection_status) do + with connection_status <- get_connection_status.() do # Match the connection status ot a service status - currently # they are one and the same but keeping this decoupled for future # additions to conditions that determine service status From 50e95f04f09517aa8b37bc7c78e97a8ada52bb1c Mon Sep 17 00:00:00 2001 From: msfstef Date: Tue, 1 Oct 2024 11:45:56 +0300 Subject: [PATCH 28/33] Assert electric process exits --- integration-tests/tests/rolling-deploy.lux | 3 +++ 1 file changed, 3 insertions(+) diff --git a/integration-tests/tests/rolling-deploy.lux b/integration-tests/tests/rolling-deploy.lux index 2c7ebe5d2d..f2d85e0b28 100644 --- a/integration-tests/tests/rolling-deploy.lux +++ b/integration-tests/tests/rolling-deploy.lux @@ -43,6 +43,9 @@ ## Terminate first electric [shell electric_1] !System.halt() + + # Confirm Electric process exit. + ??$PS1 ## Lock should now be acquired and replication starting [shell electric_2] From 685bc4f3ec4e2dbdd9b6a1e91629f6d480b5ea57 Mon Sep 17 00:00:00 2001 From: msfstef Date: Tue, 1 Oct 2024 12:01:48 +0300 Subject: [PATCH 29/33] Log periodic messages while lock is not acquired --- .../lib/electric/connection_manager.ex | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index a94a3f5f2c..c0e625224f 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -75,6 +75,8 @@ defmodule Electric.ConnectionManager do @name __MODULE__ + @lock_status_logging_interval 10_000 + @doc """ Returns the version of the PostgreSQL server. """ @@ -166,6 +168,7 @@ defmodule Electric.ConnectionManager do lock_name: Keyword.fetch!(state.replication_opts, :slot_name) ) do {:ok, lock_connection_pid} -> + Process.send_after(self(), :log_lock_connection_status, @lock_status_logging_interval) {:noreply, %{state | lock_connection_pid: lock_connection_pid}} {:error, reason} -> @@ -250,6 +253,17 @@ defmodule Electric.ConnectionManager do {:noreply, %{state | replication_client_pid: nil}} end + # Periodically log the status of the lock connection until it is acquired for + # easier debugging and diagnostics. + def handle_info(:log_lock_connection_status, state) do + if not state.pg_lock_acquired do + Logger.warning(fn -> "Waiting for postgres lock to be acquired..." end) + Process.send_after(self(), :log_lock_connection_status, @lock_status_logging_interval) + end + + {:noreply, state} + end + @impl true def handle_cast({:connection_opts, pid, connection_opts}, state) do Process.monitor(pid) @@ -267,7 +281,7 @@ defmodule Electric.ConnectionManager do end end - def handle_cast(:lock_connection_acquired, state) do + def handle_cast(:lock_connection_acquired, %{pg_lock_acquired: false} = state) do # As soon as we acquire the connection lock, we try to start the replication connection # first because it requires additional privileges compared to regular "pooled" connections, # so failure to open a replication connection should be reported ASAP. From f22c669b26c083f35c0543057d735657b654090f Mon Sep 17 00:00:00 2001 From: msfstef Date: Tue, 1 Oct 2024 12:34:39 +0300 Subject: [PATCH 30/33] Add crash recovery integration test --- integration-tests/tests/crash-recovery.lux | 67 ++++++++++++++++++++++ integration-tests/tests/macros.luxinc | 5 ++ 2 files changed, 72 insertions(+) create mode 100644 integration-tests/tests/crash-recovery.lux diff --git a/integration-tests/tests/crash-recovery.lux b/integration-tests/tests/crash-recovery.lux new file mode 100644 index 0000000000..8e5eb6cafa --- /dev/null +++ b/integration-tests/tests/crash-recovery.lux @@ -0,0 +1,67 @@ +[doc Verify handling of an Electric crash recovery] + +[include macros.luxinc] + +[global pg_container_name=crash-recovery__pg] + +### + +## Start a new Postgres cluster +[invoke setup_pg "" ""] + +## Add some data +[invoke start_psql] +[shell psql] + """! + CREATE TABLE items ( + id UUID PRIMARY KEY, + val TEXT + ); + """ + ??CREATE TABLE + + """! + INSERT INTO + items (id, val) + SELECT + gen_random_uuid(), + '#' || generate_series || ' test val' + FROM + generate_series(1, 10); + """ + ??INSERT 0 10 + +## Start the sync service. +[invoke setup_electric] + +[shell electric] + ??[info] Starting replication from postgres + +# Initialize a shape and collect the offset +[shell client] + # strip ANSI codes from response for easier matching + !curl -I http://localhost:3000/v1/shape/items?offset=-1 | sed 's/\x1b\[[0-9;]*m//g' + ?electric-shape-id: ([\d-]+) + [local shape_id=$1] + ?electric-chunk-last-offset: ([\w\d_]+) + [local last_offset=$1] + +## Terminate electric after giving it some time to create the shape +[shell electric] + ?Txn received + [sleep 2] + !System.halt() + ??$PS1 + +## Start the sync service again. +[invoke setup_electric] +[shell electric] + ??[info] Starting replication from postgres + +# Client should be able to continue same shape +[shell client] + !curl -I "http://localhost:3000/v1/shape/items?offset=$last_offset&shape_id=$shape_id" + ??HTTP/1.1 200 OK + +[cleanup] + [invoke teardown] diff --git a/integration-tests/tests/macros.luxinc b/integration-tests/tests/macros.luxinc index 42fa8fcef4..0fe6f5649c 100644 --- a/integration-tests/tests/macros.luxinc +++ b/integration-tests/tests/macros.luxinc @@ -41,6 +41,11 @@ ??database system is ready to accept connections [endmacro] +[macro start_psql] + [shell psql] + !docker exec -u postgres -it $pg_container_name psql +[endmacro] + [macro seed_pg] [shell psql] !docker exec -u postgres -it $pg_container_name psql From c02458182b0719defa709619ad755fe5a5a21f75 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 1 Oct 2024 17:36:01 +0300 Subject: [PATCH 31/33] Replace positional matching on a keyword list with Keyword.pop() Matching on keywords in Elixir is positional, so if the order of keys at the call site changes at any point, it would break this start_link() implementation. --- .../lib/electric/postgres/lock_connection.ex | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/sync-service/lib/electric/postgres/lock_connection.ex b/packages/sync-service/lib/electric/postgres/lock_connection.ex index 60941a6fab..70831eb142 100644 --- a/packages/sync-service/lib/electric/postgres/lock_connection.ex +++ b/packages/sync-service/lib/electric/postgres/lock_connection.ex @@ -29,14 +29,12 @@ defmodule Electric.Postgres.LockConnection do end @spec start_link(options()) :: {:ok, pid()} | {:error, Postgrex.Error.t() | term()} - def start_link( - connection_opts: connection_opts, - connection_manager: connection_manager, - lock_name: lock_name - ) do + def start_link(opts) do + {connection_opts, init_opts} = Keyword.pop(opts, :connection_opts) + case Postgrex.SimpleConnection.start_link( __MODULE__, - [connection_manager: connection_manager, lock_name: lock_name], + init_opts, connection_opts ++ [timeout: :infinity, auto_reconnect: false] ) do {:ok, pid} -> From b0d928b58d2e0410e52d6664857be97fca4e66af Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 1 Oct 2024 17:37:00 +0300 Subject: [PATCH 32/33] Remove a noop "with" from ServiceStatus.check() The "with" expression is doing nothing here. Better add control flow when it's needed rather than "just in case". --- .../lib/electric/service_status.ex | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/sync-service/lib/electric/service_status.ex b/packages/sync-service/lib/electric/service_status.ex index 15350b710d..8c141555c1 100644 --- a/packages/sync-service/lib/electric/service_status.ex +++ b/packages/sync-service/lib/electric/service_status.ex @@ -7,16 +7,16 @@ defmodule Electric.ServiceStatus do @type options :: [option] @spec check(options()) :: status() - def check(get_connection_status: get_connection_status) do - with connection_status <- get_connection_status.() do - # Match the connection status ot a service status - currently - # they are one and the same but keeping this decoupled for future - # additions to conditions that determine service status - case connection_status do - :waiting -> :waiting - :starting -> :starting - :active -> :active - end + def check(opts) do + get_connection_status_fun = Keyword.fetch!(opts, :get_connection_status) + + # Match the connection status ot a service status - currently + # they are one and the same but keeping this decoupled for future + # additions to conditions that determine service status + case get_connection_status_fun.() do + :waiting -> :waiting + :starting -> :starting + :active -> :active end end end From dc8c980833e41cc578f5831618a467ac26a9772e Mon Sep 17 00:00:00 2001 From: msfstef Date: Tue, 1 Oct 2024 18:03:48 +0300 Subject: [PATCH 33/33] Adress PR comments --- .changeset/poor-candles-fly.md | 2 +- integration-tests/tests/crash-recovery.lux | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/.changeset/poor-candles-fly.md b/.changeset/poor-candles-fly.md index 2b42a58690..3c0c4c6466 100644 --- a/.changeset/poor-candles-fly.md +++ b/.changeset/poor-candles-fly.md @@ -4,4 +4,4 @@ - Wait for advisory lock on replication slot to enable rolling deploys. - Configurable replication slot and publication name using `REPLICATION_STREAM_ID` environment variable. -- Add `HealthCheckPlug` API endopint at `v1/health` that returns `waiting`, `starting`, `active`, and `stopping` statuses. \ No newline at end of file +- Add `HealthCheckPlug` API endopint at `v1/health` that returns `waiting`, `starting`,and `active` statuses. \ No newline at end of file diff --git a/integration-tests/tests/crash-recovery.lux b/integration-tests/tests/crash-recovery.lux index 8e5eb6cafa..36a223fdfc 100644 --- a/integration-tests/tests/crash-recovery.lux +++ b/integration-tests/tests/crash-recovery.lux @@ -40,16 +40,14 @@ # Initialize a shape and collect the offset [shell client] # strip ANSI codes from response for easier matching - !curl -I http://localhost:3000/v1/shape/items?offset=-1 | sed 's/\x1b\[[0-9;]*m//g' + !curl -v -X GET http://localhost:3000/v1/shape/items?offset=-1 ?electric-shape-id: ([\d-]+) [local shape_id=$1] ?electric-chunk-last-offset: ([\w\d_]+) [local last_offset=$1] -## Terminate electric after giving it some time to create the shape +## Terminate electric [shell electric] - ?Txn received - [sleep 2] !System.halt() ??$PS1 @@ -60,7 +58,7 @@ # Client should be able to continue same shape [shell client] - !curl -I "http://localhost:3000/v1/shape/items?offset=$last_offset&shape_id=$shape_id" + !curl -v -X GET "http://localhost:3000/v1/shape/items?offset=$last_offset&shape_id=$shape_id" ??HTTP/1.1 200 OK [cleanup]