diff --git a/.changeset/modern-taxis-guess.md b/.changeset/modern-taxis-guess.md new file mode 100644 index 0000000000..b0d1d04d83 --- /dev/null +++ b/.changeset/modern-taxis-guess.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Drops the replication slot when `DELETE /v1/admin/database/:database_id` is called diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 90cabe9766..ef5424ca97 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -209,5 +209,4 @@ config :electric, prometheus_port: prometheus_port, storage: storage, persistent_kv: persistent_kv, - listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false), - tenant_tables_name: :tenant_tables + listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false) diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 038a5c5af5..9022349cb1 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -5,6 +5,11 @@ defmodule Electric.Application do @process_registry_name Electric.Registry.Processes def process_registry, do: @process_registry_name + @spec process_name(atom(), atom()) :: {:via, atom(), {atom(), term()}} + def process_name(electric_instance_id, module) when is_atom(module) do + {:via, Registry, {@process_registry_name, {module, electric_instance_id}}} + end + @spec process_name(atom(), String.t(), atom()) :: {:via, atom(), {atom(), term()}} def process_name(electric_instance_id, tenant_id, module) when is_atom(module) do {:via, Registry, {@process_registry_name, {module, electric_instance_id, tenant_id}}} @@ -49,7 +54,7 @@ defmodule Electric.Application do name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()}, {Registry, name: Registry.ShapeChanges, keys: :duplicate, partitions: System.schedulers_online()}, - Electric.TenantSupervisor, + {Electric.TenantSupervisor, electric_instance_id: config.electric_instance_id}, {Electric.TenantManager, router_opts}, {Bandit, plug: {Electric.Plug.Router, router_opts}, @@ -77,11 +82,10 @@ defmodule Electric.Application do # from the OTP application env, runs some pre-processing functions and stores the processed # configuration as a single map using `:persistent_term`. defp configure do - tenant_tables_name = Application.fetch_env!(:electric, :tenant_tables_name) - :ets.new(tenant_tables_name, [:public, :named_table, :set, {:read_concurrency, true}]) - electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id) + Electric.Tenant.Tables.init(electric_instance_id) + {kv_module, kv_fun, kv_params} = Application.fetch_env!(:electric, :persistent_kv) persistent_kv = apply(kv_module, kv_fun, [kv_params]) diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index b9d1f59632..7086448d7b 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -59,7 +59,8 @@ defmodule Electric.Connection.Manager do :pg_system_identifier, # PostgreSQL timeline ID :pg_timeline_id, - :tenant_id + :tenant_id, + drop_slot_requesters: [] ] end @@ -112,6 +113,10 @@ defmodule Electric.Connection.Manager do GenServer.call(server, :get_status) end + def drop_replication_slot(server) do + GenServer.call(server, :drop_replication_slot) + end + def exclusive_connection_lock_acquired(server) do GenServer.cast(server, :exclusive_connection_lock_acquired) end @@ -186,6 +191,23 @@ defmodule Electric.Connection.Manager do {:reply, status, state} end + def handle_call(:drop_replication_slot, _from, %{pool_pid: pool} = state) when pool != nil do + {:reply, drop_publication(state), state} + end + + def handle_call(:drop_replication_slot, from, state) do + {:noreply, %{state | drop_slot_requesters: [from | state.drop_slot_requesters]}} + end + + defp drop_publication(state) do + publication_name = Keyword.fetch!(state.replication_opts, :publication_name) + + case Postgrex.query(state.pool_pid, "DROP PUBLICATION #{publication_name}", []) do + {:ok, _} -> :ok + error -> error + end + end + @impl true def handle_continue(:start_lock_connection, %State{lock_connection_pid: nil} = state) do case Electric.Postgres.LockConnection.start_link( @@ -263,13 +285,24 @@ defmodule Electric.Connection.Manager do Process.monitor(log_collector_pid) state = %{state | pool_pid: pool_pid, shape_log_collector_pid: log_collector_pid} - {:noreply, state} + + {:noreply, state, {:continue, :maybe_drop_replication_slot}} {:error, reason} -> handle_connection_error(reason, state, "regular") end end + def handle_continue(:maybe_drop_replication_slot, %{drop_slot_requesters: []} = state) do + {:noreply, state} + end + + def handle_continue(:maybe_drop_replication_slot, %{drop_slot_requesters: requesters} = state) do + result = drop_publication(state) + Enum.each(requesters, fn requester -> GenServer.reply(requester, result) end) + {:noreply, %{state | drop_slot_requesters: []}} + end + @impl true def handle_info({:timeout, tref, step}, %{backoff: {backoff, tref}} = state) do state = %{state | backoff: {backoff, nil}} diff --git a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex index cfa93119bc..3f644c6325 100644 --- a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex @@ -30,10 +30,7 @@ defmodule Electric.Postgres.Inspector.EtsInspector do __MODULE__, Map.new(opts) |> Map.put_new(:pg_info_table, @default_pg_info_table) - |> Map.put_new(:pg_relation_table, @default_pg_relation_table) - |> Map.put_new_lazy(:tenant_tables_name, fn -> - Application.fetch_env!(:electric, :tenant_tables_name) - end), + |> Map.put_new(:pg_relation_table, @default_pg_relation_table), name: name(opts) ) @@ -252,7 +249,7 @@ defmodule Electric.Postgres.Inspector.EtsInspector do def fetch_tenant_tables_name(opts) do case Access.fetch(opts, :tenant_tables_name) do - :error -> Application.fetch_env!(:electric, :tenant_tables_name) + :error -> Electric.Tenant.Tables.name(Access.fetch!(opts, :electric_instance_id)) {:ok, tenant_tables_name} -> tenant_tables_name end end diff --git a/packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex b/packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex index 4fdc8d9cdd..f3026ddbb5 100644 --- a/packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex +++ b/packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex @@ -8,15 +8,13 @@ defmodule Electric.TenantSupervisor do require Logger - @name Electric.DynamicTenantSupervisor - - def start_link(_opts) do - DynamicSupervisor.start_link(__MODULE__, [], name: @name) + def start_link(opts) do + DynamicSupervisor.start_link(__MODULE__, [], name: name(opts)) end def start_tenant(opts) do Logger.debug(fn -> "Starting tenant for #{Access.fetch!(opts, :tenant_id)}" end) - DynamicSupervisor.start_child(@name, {Tenant.Supervisor, opts}) + DynamicSupervisor.start_child(name(opts), {Tenant.Supervisor, opts}) end @doc """ @@ -33,4 +31,9 @@ defmodule Electric.TenantSupervisor do Logger.debug(fn -> "Starting #{__MODULE__}" end) DynamicSupervisor.init(strategy: :one_for_one) end + + defp name(opts) do + electric_instance_id = Access.fetch!(opts, :electric_instance_id) + Electric.Application.process_name(electric_instance_id, __MODULE__) + end end diff --git a/packages/sync-service/lib/electric/tenant/tables.ex b/packages/sync-service/lib/electric/tenant/tables.ex new file mode 100644 index 0000000000..3b61ba287b --- /dev/null +++ b/packages/sync-service/lib/electric/tenant/tables.ex @@ -0,0 +1,14 @@ +defmodule Electric.Tenant.Tables do + def name(electric_instance_id) do + :"tenant_tables_#{electric_instance_id}" + end + + def init(electric_instance_id) do + :ets.new(name(electric_instance_id), [ + :public, + :named_table, + :set, + {:read_concurrency, true} + ]) + end +end diff --git a/packages/sync-service/lib/electric/tenant_manager.ex b/packages/sync-service/lib/electric/tenant_manager.ex index 51e197f95c..d5b1fb6986 100644 --- a/packages/sync-service/lib/electric/tenant_manager.ex +++ b/packages/sync-service/lib/electric/tenant_manager.ex @@ -238,6 +238,12 @@ defmodule Electric.TenantManager do case GenServer.call(server, {:delete_tenant, tenant_id, pg_id}) do :ok -> + :ok = + opts + |> Keyword.fetch!(:electric_instance_id) + |> Electric.Connection.Manager.name(tenant_id) + |> Electric.Connection.Manager.drop_replication_slot() + :ok = Electric.TenantSupervisor.stop_tenant(opts ++ [tenant_id: tenant_id]) :ok = Electric.Tenant.Persistence.delete_tenant!(tenant_id, opts) diff --git a/packages/sync-service/test/electric/plug/remove_database_plug_test.exs b/packages/sync-service/test/electric/plug/remove_database_plug_test.exs index 9d29d734d9..7bc0b92f67 100644 --- a/packages/sync-service/test/electric/plug/remove_database_plug_test.exs +++ b/packages/sync-service/test/electric/plug/remove_database_plug_test.exs @@ -6,6 +6,7 @@ defmodule Electric.Plug.RemoveDatabasePlugTest do import Support.ComponentSetup import Support.DbSetup + import Support.TestUtils, only: [with_electric_instance_id: 1] alias Support.Mock @@ -48,16 +49,16 @@ defmodule Electric.Plug.RemoveDatabasePlugTest do } end - setup :with_complete_stack + setup :with_electric_instance_id + setup :with_tenant_id + setup :with_registry + setup :with_persistent_kv + setup :with_tenant_tables setup :with_app_config + setup :with_tenant_manager + setup :with_supervised_tenant test "returns 200 when successfully deleting a tenant", ctx do - # The tenant manager will try to shut down the tenant supervisor - # but we did not start a tenant supervisor in this test - # so we create one here - supervisor_name = Electric.Tenant.Supervisor.name(ctx.electric_instance_id, ctx.tenant_id) - Supervisor.start_link([], name: supervisor_name, strategy: :one_for_one) - conn = ctx |> conn("DELETE", ctx.tenant_id) @@ -70,6 +71,9 @@ defmodule Electric.Plug.RemoveDatabasePlugTest do app_config: ctx.app_config, electric_instance_id: ctx.electric_instance_id ) == %{} + + # Ensure the publication has been dropped + assert %{rows: []} = Postgrex.query!(ctx.db_conn, "SELECT pubname FROM pg_publication", []) end test "returns 404 when tenant is not found", ctx do diff --git a/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs b/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs index b8e107a5fe..344894bc80 100644 --- a/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs +++ b/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs @@ -2,10 +2,17 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do use Support.TransactionCase, async: true import Support.ComponentSetup import Support.DbStructureSetup + import Support.TestUtils, only: [with_electric_instance_id: 1] alias Electric.Postgres.Inspector.EtsInspector describe "load_relation/2" do - setup [:with_tenant_id, :with_inspector, :with_basic_tables, :with_sql_execute] + setup [ + :with_electric_instance_id, + :with_tenant_id, + :with_inspector, + :with_basic_tables, + :with_sql_execute + ] setup %{inspector: {EtsInspector, opts}} do {:ok, %{opts: opts, table: {"public", "items"}}} @@ -54,7 +61,13 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do end describe "clean/2" do - setup [:with_tenant_id, :with_inspector, :with_basic_tables, :with_sql_execute] + setup [ + :with_electric_instance_id, + :with_tenant_id, + :with_inspector, + :with_basic_tables, + :with_sql_execute + ] setup %{ inspector: {EtsInspector, opts}, @@ -124,7 +137,7 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do end describe "load_column_info/2" do - setup [:with_tenant_id, :with_inspector, :with_basic_tables] + setup [:with_electric_instance_id, :with_tenant_id, :with_inspector, :with_basic_tables] setup %{inspector: {EtsInspector, opts}} do {:ok, %{opts: opts, table: {"public", "items"}}} diff --git a/packages/sync-service/test/electric/shapes/shape_test.exs b/packages/sync-service/test/electric/shapes/shape_test.exs index 284e5e18bc..0175e5e408 100644 --- a/packages/sync-service/test/electric/shapes/shape_test.exs +++ b/packages/sync-service/test/electric/shapes/shape_test.exs @@ -213,8 +213,15 @@ defmodule Electric.Shapes.ShapeTest do import Support.DbSetup import Support.DbStructureSetup import Support.ComponentSetup + import Support.TestUtils, only: [with_electric_instance_id: 1] - setup [:with_shared_db, :with_tenant_id, :with_inspector, :with_sql_execute] + setup [ + :with_electric_instance_id, + :with_shared_db, + :with_tenant_id, + :with_inspector, + :with_sql_execute + ] @tag with_sql: [ "CREATE SCHEMA IF NOT EXISTS test", @@ -366,8 +373,15 @@ defmodule Electric.Shapes.ShapeTest do import Support.DbSetup import Support.DbStructureSetup import Support.ComponentSetup - - setup [:with_shared_db, :with_tenant_id, :with_inspector, :with_sql_execute] + import Support.TestUtils, only: [with_electric_instance_id: 1] + + setup [ + :with_electric_instance_id, + :with_shared_db, + :with_tenant_id, + :with_inspector, + :with_sql_execute + ] @tag with_sql: [ "CREATE SCHEMA IF NOT EXISTS test", diff --git a/packages/sync-service/test/electric/tenant_manager_test.exs b/packages/sync-service/test/electric/tenant_manager_test.exs index 942229bfe3..a162295401 100644 --- a/packages/sync-service/test/electric/tenant_manager_test.exs +++ b/packages/sync-service/test/electric/tenant_manager_test.exs @@ -6,6 +6,7 @@ defmodule Electric.TenantManagerTest do import Support.ComponentSetup import Support.DbSetup + import Support.TestUtils, only: [with_electric_instance_id: 1] @moduletag :tmp_dir @@ -227,17 +228,22 @@ defmodule Electric.TenantManagerTest do describe "delete_tenant/2" do setup :with_unique_db - setup do + setup ctx do %{ - publication_name: "electric_test_publication" + publication_name: "electric_test_publication", + connection_opts: Map.fetch!(ctx, :db_config) } end - setup ctx do - ctx - |> Map.put(:connection_opts, Map.fetch!(ctx, :db_config)) - |> with_complete_stack(tenant: &with_supervised_tenant/1) - end + setup :with_electric_instance_id + setup :with_tenant_id + setup :with_registry + setup :with_persistent_kv + setup :with_tenant_tables + setup :with_slot_name_and_stream_id + setup :with_app_config + setup :with_tenant_manager + setup :with_supervised_tenant test "deletes the tenant", %{ electric_instance_id: electric_instance_id, diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 7e2882d6de..b421d574ef 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -14,15 +14,16 @@ defmodule Support.ComponentSetup do end def with_tenant_manager(ctx) do - Electric.TenantSupervisor.start_link([]) + {:ok, _} = + Electric.TenantSupervisor.start_link(electric_instance_id: ctx.electric_instance_id) opts = [ app_config: ctx.app_config, electric_instance_id: ctx.electric_instance_id, - tenant_tables_name: Access.get(ctx, :tenant_tables_name, nil) + tenant_tables_name: Electric.Tenant.Tables.name(ctx.electric_instance_id) ] - Electric.TenantManager.start_link(opts) + {:ok, _} = Electric.TenantManager.start_link(opts) %{tenant_manager: Electric.TenantManager.name(opts)} end @@ -67,20 +68,26 @@ defmodule Support.ComponentSetup do ] :ok = Electric.TenantManager.store_tenant(tenant, tenant_opts) - Electric.TenantSupervisor.start_tenant(ctx) %{tenant: tenant} end def with_supervised_tenant(ctx) do - tenant = Access.get(ctx, :tenant_config, tenant_config(ctx)) + tenant = + [ + electric_instance_id: ctx.electric_instance_id, + tenant_id: ctx.tenant_id, + pg_id: Map.get(ctx, :pg_id, "12345"), + registry: ctx.registry, + long_poll_timeout: Access.get(ctx, :long_poll_timeout, 20_000), + max_age: Access.get(ctx, :max_age, 60), + stale_age: Access.get(ctx, :stale_age, 300), + get_service_status: fn -> :active end + ] :ok = Electric.TenantManager.create_tenant(ctx.tenant_id, ctx.db_config, pg_id: tenant[:pg_id], - shape_cache: tenant[:shape_cache], - storage: tenant[:storage], - inspector: tenant[:inspector], registry: tenant[:registry], long_poll_timeout: tenant[:long_poll_timeout], max_age: tenant[:max_age], @@ -242,13 +249,18 @@ defmodule Support.ComponentSetup do %{replication_client: pid} end + def with_tenant_tables(ctx) do + Electric.Tenant.Tables.init(ctx.electric_instance_id) + %{tenant_tables_name: Electric.Tenant.Tables.name(ctx.electric_instance_id)} + end + def with_inspector(ctx) do server = :"inspector #{full_test_name(ctx)}" pg_info_table = :"pg_info_table #{full_test_name(ctx)}" pg_relation_table = :"pg_relation_table #{full_test_name(ctx)}" - tenant_tables_name = :"tenant_tables_name #{full_test_name(ctx)}" - :ets.new(tenant_tables_name, [:public, :named_table, :set]) + tenant_tables_name = + Map.get_lazy(ctx, :tenant_tables_name, fn -> with_tenant_tables(ctx).tenant_tables_name end) {:ok, _} = EtsInspector.start_link(