From 0f4f916bd1dd225855b42841f4b8debe5b292749 Mon Sep 17 00:00:00 2001 From: rob Date: Wed, 6 Nov 2024 17:36:41 +0000 Subject: [PATCH 01/17] Isolate TenantSupervisor in tests --- packages/sync-service/lib/electric/application.ex | 5 +++++ .../lib/electric/tenant/dynamic_supervisor.ex | 13 ++++++++----- .../sync-service/test/support/component_setup.ex | 10 ++++++++-- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 038a5c5af5..9bda3a8a9e 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -5,6 +5,11 @@ defmodule Electric.Application do @process_registry_name Electric.Registry.Processes def process_registry, do: @process_registry_name + @spec process_name(atom(), atom()) :: {:via, atom(), {atom(), term()}} + def process_name(electric_instance_id, module) when is_atom(module) do + {:via, Registry, {@process_registry_name, {module, electric_instance_id}}} + end + @spec process_name(atom(), String.t(), atom()) :: {:via, atom(), {atom(), term()}} def process_name(electric_instance_id, tenant_id, module) when is_atom(module) do {:via, Registry, {@process_registry_name, {module, electric_instance_id, tenant_id}}} diff --git a/packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex b/packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex index 4fdc8d9cdd..f3026ddbb5 100644 --- a/packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex +++ b/packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex @@ -8,15 +8,13 @@ defmodule Electric.TenantSupervisor do require Logger - @name Electric.DynamicTenantSupervisor - - def start_link(_opts) do - DynamicSupervisor.start_link(__MODULE__, [], name: @name) + def start_link(opts) do + DynamicSupervisor.start_link(__MODULE__, [], name: name(opts)) end def start_tenant(opts) do Logger.debug(fn -> "Starting tenant for #{Access.fetch!(opts, :tenant_id)}" end) - DynamicSupervisor.start_child(@name, {Tenant.Supervisor, opts}) + DynamicSupervisor.start_child(name(opts), {Tenant.Supervisor, opts}) end @doc """ @@ -33,4 +31,9 @@ defmodule Electric.TenantSupervisor do Logger.debug(fn -> "Starting #{__MODULE__}" end) DynamicSupervisor.init(strategy: :one_for_one) end + + defp name(opts) do + electric_instance_id = Access.fetch!(opts, :electric_instance_id) + Electric.Application.process_name(electric_instance_id, __MODULE__) + end end diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 7e2882d6de..da998a9c83 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -14,7 +14,8 @@ defmodule Support.ComponentSetup do end def with_tenant_manager(ctx) do - Electric.TenantSupervisor.start_link([]) + {:ok, _} = + Electric.TenantSupervisor.start_link(electric_instance_id: ctx.electric_instance_id) opts = [ app_config: ctx.app_config, @@ -22,7 +23,7 @@ defmodule Support.ComponentSetup do tenant_tables_name: Access.get(ctx, :tenant_tables_name, nil) ] - Electric.TenantManager.start_link(opts) + {:ok, _} = Electric.TenantManager.start_link(opts) %{tenant_manager: Electric.TenantManager.name(opts)} end @@ -67,7 +68,12 @@ defmodule Support.ComponentSetup do ] :ok = Electric.TenantManager.store_tenant(tenant, tenant_opts) + Electric.TenantSupervisor.start_tenant(ctx) + # {:ok, _} = + # ctx + # |> Map.put(:connection_opts, ctx.db_config) + # |> Electric.TenantSupervisor.start_tenant() %{tenant: tenant} end From 20b91a996952120193fd823e3b8dba154a3c477b Mon Sep 17 00:00:00 2001 From: rob Date: Wed, 6 Nov 2024 17:39:09 +0000 Subject: [PATCH 02/17] Fix but comment out not yet used supervisor --- packages/sync-service/test/support/component_setup.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index da998a9c83..5beec15d3d 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -69,7 +69,6 @@ defmodule Support.ComponentSetup do :ok = Electric.TenantManager.store_tenant(tenant, tenant_opts) - Electric.TenantSupervisor.start_tenant(ctx) # {:ok, _} = # ctx # |> Map.put(:connection_opts, ctx.db_config) From c08d0591acdba8749d67a9997ef592b20883afdd Mon Sep 17 00:00:00 2001 From: rob Date: Wed, 6 Nov 2024 18:26:40 +0000 Subject: [PATCH 03/17] Standardise tenant_table_name --- packages/sync-service/config/runtime.exs | 3 +-- .../sync-service/lib/electric/application.ex | 5 ++--- .../postgres/inspector/ets_inspector.ex | 5 +---- .../lib/electric/tenants_table.ex | 14 +++++++++++++ .../postgres/inspector/ets_inspector_test.exs | 19 +++++++++++++++--- .../test/electric/shapes/shape_test.exs | 20 ++++++++++++++++--- .../test/support/component_setup.ex | 6 +++--- 7 files changed, 54 insertions(+), 18 deletions(-) create mode 100644 packages/sync-service/lib/electric/tenants_table.ex diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 90cabe9766..ef5424ca97 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -209,5 +209,4 @@ config :electric, prometheus_port: prometheus_port, storage: storage, persistent_kv: persistent_kv, - listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false), - tenant_tables_name: :tenant_tables + listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false) diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 9bda3a8a9e..6e04da6fe6 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -82,11 +82,10 @@ defmodule Electric.Application do # from the OTP application env, runs some pre-processing functions and stores the processed # configuration as a single map using `:persistent_term`. defp configure do - tenant_tables_name = Application.fetch_env!(:electric, :tenant_tables_name) - :ets.new(tenant_tables_name, [:public, :named_table, :set, {:read_concurrency, true}]) - electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id) + Electric.TenantsTable.init(electric_instance_id) + {kv_module, kv_fun, kv_params} = Application.fetch_env!(:electric, :persistent_kv) persistent_kv = apply(kv_module, kv_fun, [kv_params]) diff --git a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex index cfa93119bc..e63f40b8d0 100644 --- a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex @@ -30,10 +30,7 @@ defmodule Electric.Postgres.Inspector.EtsInspector do __MODULE__, Map.new(opts) |> Map.put_new(:pg_info_table, @default_pg_info_table) - |> Map.put_new(:pg_relation_table, @default_pg_relation_table) - |> Map.put_new_lazy(:tenant_tables_name, fn -> - Application.fetch_env!(:electric, :tenant_tables_name) - end), + |> Map.put_new(:pg_relation_table, @default_pg_relation_table), name: name(opts) ) diff --git a/packages/sync-service/lib/electric/tenants_table.ex b/packages/sync-service/lib/electric/tenants_table.ex new file mode 100644 index 0000000000..1b312ebadf --- /dev/null +++ b/packages/sync-service/lib/electric/tenants_table.ex @@ -0,0 +1,14 @@ +defmodule Electric.TenantsTable do + def name(electric_instance_id) do + :"tenant_tables_#{electric_instance_id}" + end + + def init(electric_instance_id) do + :ets.new(name(electric_instance_id), [ + :public, + :named_table, + :set, + {:read_concurrency, true} + ]) + end +end diff --git a/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs b/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs index b8e107a5fe..344894bc80 100644 --- a/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs +++ b/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs @@ -2,10 +2,17 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do use Support.TransactionCase, async: true import Support.ComponentSetup import Support.DbStructureSetup + import Support.TestUtils, only: [with_electric_instance_id: 1] alias Electric.Postgres.Inspector.EtsInspector describe "load_relation/2" do - setup [:with_tenant_id, :with_inspector, :with_basic_tables, :with_sql_execute] + setup [ + :with_electric_instance_id, + :with_tenant_id, + :with_inspector, + :with_basic_tables, + :with_sql_execute + ] setup %{inspector: {EtsInspector, opts}} do {:ok, %{opts: opts, table: {"public", "items"}}} @@ -54,7 +61,13 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do end describe "clean/2" do - setup [:with_tenant_id, :with_inspector, :with_basic_tables, :with_sql_execute] + setup [ + :with_electric_instance_id, + :with_tenant_id, + :with_inspector, + :with_basic_tables, + :with_sql_execute + ] setup %{ inspector: {EtsInspector, opts}, @@ -124,7 +137,7 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do end describe "load_column_info/2" do - setup [:with_tenant_id, :with_inspector, :with_basic_tables] + setup [:with_electric_instance_id, :with_tenant_id, :with_inspector, :with_basic_tables] setup %{inspector: {EtsInspector, opts}} do {:ok, %{opts: opts, table: {"public", "items"}}} diff --git a/packages/sync-service/test/electric/shapes/shape_test.exs b/packages/sync-service/test/electric/shapes/shape_test.exs index 284e5e18bc..0175e5e408 100644 --- a/packages/sync-service/test/electric/shapes/shape_test.exs +++ b/packages/sync-service/test/electric/shapes/shape_test.exs @@ -213,8 +213,15 @@ defmodule Electric.Shapes.ShapeTest do import Support.DbSetup import Support.DbStructureSetup import Support.ComponentSetup + import Support.TestUtils, only: [with_electric_instance_id: 1] - setup [:with_shared_db, :with_tenant_id, :with_inspector, :with_sql_execute] + setup [ + :with_electric_instance_id, + :with_shared_db, + :with_tenant_id, + :with_inspector, + :with_sql_execute + ] @tag with_sql: [ "CREATE SCHEMA IF NOT EXISTS test", @@ -366,8 +373,15 @@ defmodule Electric.Shapes.ShapeTest do import Support.DbSetup import Support.DbStructureSetup import Support.ComponentSetup - - setup [:with_shared_db, :with_tenant_id, :with_inspector, :with_sql_execute] + import Support.TestUtils, only: [with_electric_instance_id: 1] + + setup [ + :with_electric_instance_id, + :with_shared_db, + :with_tenant_id, + :with_inspector, + :with_sql_execute + ] @tag with_sql: [ "CREATE SCHEMA IF NOT EXISTS test", diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 5beec15d3d..ef48028add 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -20,7 +20,7 @@ defmodule Support.ComponentSetup do opts = [ app_config: ctx.app_config, electric_instance_id: ctx.electric_instance_id, - tenant_tables_name: Access.get(ctx, :tenant_tables_name, nil) + tenant_tables_name: Electric.TenantsTable.name(ctx.electric_instance_id) ] {:ok, _} = Electric.TenantManager.start_link(opts) @@ -252,8 +252,8 @@ defmodule Support.ComponentSetup do pg_info_table = :"pg_info_table #{full_test_name(ctx)}" pg_relation_table = :"pg_relation_table #{full_test_name(ctx)}" - tenant_tables_name = :"tenant_tables_name #{full_test_name(ctx)}" - :ets.new(tenant_tables_name, [:public, :named_table, :set]) + tenant_tables_name = Electric.TenantsTable.name(ctx.electric_instance_id) + Electric.TenantsTable.init(ctx.electric_instance_id) {:ok, _} = EtsInspector.start_link( From 81a3939fa4ed5f572199e945989b7a17e9308e9a Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 7 Nov 2024 15:49:38 +0000 Subject: [PATCH 04/17] Use supervised tenant in plug test --- .../plug/remove_database_plug_test.exs | 15 +++++----- .../test/support/component_setup.ex | 29 +++++++++++++++---- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/packages/sync-service/test/electric/plug/remove_database_plug_test.exs b/packages/sync-service/test/electric/plug/remove_database_plug_test.exs index 9d29d734d9..bca30e0b05 100644 --- a/packages/sync-service/test/electric/plug/remove_database_plug_test.exs +++ b/packages/sync-service/test/electric/plug/remove_database_plug_test.exs @@ -6,6 +6,7 @@ defmodule Electric.Plug.RemoveDatabasePlugTest do import Support.ComponentSetup import Support.DbSetup + import Support.TestUtils, only: [with_electric_instance_id: 1] alias Support.Mock @@ -48,16 +49,16 @@ defmodule Electric.Plug.RemoveDatabasePlugTest do } end - setup :with_complete_stack + setup :with_electric_instance_id + setup :with_tenant_id + setup :with_registry + setup :with_persistent_kv + setup :with_tenant_tables setup :with_app_config + setup :with_tenant_manager + setup :with_supervised_tenant test "returns 200 when successfully deleting a tenant", ctx do - # The tenant manager will try to shut down the tenant supervisor - # but we did not start a tenant supervisor in this test - # so we create one here - supervisor_name = Electric.Tenant.Supervisor.name(ctx.electric_instance_id, ctx.tenant_id) - Supervisor.start_link([], name: supervisor_name, strategy: :one_for_one) - conn = ctx |> conn("DELETE", ctx.tenant_id) diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index ef48028add..34ece6d529 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -44,6 +44,19 @@ defmodule Support.ComponentSetup do ] end + defp tenant_config_supervised(ctx) do + [ + electric_instance_id: ctx.electric_instance_id, + tenant_id: ctx.tenant_id, + pg_id: Map.get(ctx, :pg_id, "12345"), + registry: ctx.registry, + long_poll_timeout: Access.get(ctx, :long_poll_timeout, 20_000), + max_age: Access.get(ctx, :max_age, 60), + stale_age: Access.get(ctx, :stale_age, 300), + get_service_status: fn -> :active end + ] + end + def store_tenant(tenant, ctx) do :ok = Electric.TenantManager.store_tenant(tenant, @@ -78,14 +91,11 @@ defmodule Support.ComponentSetup do end def with_supervised_tenant(ctx) do - tenant = Access.get(ctx, :tenant_config, tenant_config(ctx)) + tenant = Access.get(ctx, :tenant_config, tenant_config_supervised(ctx)) :ok = Electric.TenantManager.create_tenant(ctx.tenant_id, ctx.db_config, pg_id: tenant[:pg_id], - shape_cache: tenant[:shape_cache], - storage: tenant[:storage], - inspector: tenant[:inspector], registry: tenant[:registry], long_poll_timeout: tenant[:long_poll_timeout], max_age: tenant[:max_age], @@ -104,7 +114,11 @@ defmodule Support.ComponentSetup do [{tenant_supervisor_pid, _}] = Registry.lookup(registry_name, registry_key) - %{tenant: tenant, tenant_supervisor_pid: tenant_supervisor_pid} + %{ + tenant: tenant, + tenant_supervisor_pid: tenant_supervisor_pid + # shape_cache: Electric.ShapeCache.name(ctx.electric_instance_id, ctx.tenant_id) + } end def with_registry(ctx) do @@ -247,6 +261,11 @@ defmodule Support.ComponentSetup do %{replication_client: pid} end + def with_tenant_tables(ctx) do + Electric.TenantsTable.init(ctx.electric_instance_id) + %{tenant_tables_name: Electric.TenantsTable.name(ctx.electric_instance_id)} + end + def with_inspector(ctx) do server = :"inspector #{full_test_name(ctx)}" pg_info_table = :"pg_info_table #{full_test_name(ctx)}" From 10f3675f5763d9c716407c2390af02be6c64cef5 Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 7 Nov 2024 16:57:14 +0000 Subject: [PATCH 05/17] Tidy up component_setup --- .../test/support/component_setup.ex | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 34ece6d529..5793daeb4d 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -44,19 +44,6 @@ defmodule Support.ComponentSetup do ] end - defp tenant_config_supervised(ctx) do - [ - electric_instance_id: ctx.electric_instance_id, - tenant_id: ctx.tenant_id, - pg_id: Map.get(ctx, :pg_id, "12345"), - registry: ctx.registry, - long_poll_timeout: Access.get(ctx, :long_poll_timeout, 20_000), - max_age: Access.get(ctx, :max_age, 60), - stale_age: Access.get(ctx, :stale_age, 300), - get_service_status: fn -> :active end - ] - end - def store_tenant(tenant, ctx) do :ok = Electric.TenantManager.store_tenant(tenant, @@ -82,16 +69,21 @@ defmodule Support.ComponentSetup do :ok = Electric.TenantManager.store_tenant(tenant, tenant_opts) - # {:ok, _} = - # ctx - # |> Map.put(:connection_opts, ctx.db_config) - # |> Electric.TenantSupervisor.start_tenant() - %{tenant: tenant} end def with_supervised_tenant(ctx) do - tenant = Access.get(ctx, :tenant_config, tenant_config_supervised(ctx)) + tenant = + [ + electric_instance_id: ctx.electric_instance_id, + tenant_id: ctx.tenant_id, + pg_id: Map.get(ctx, :pg_id, "12345"), + registry: ctx.registry, + long_poll_timeout: Access.get(ctx, :long_poll_timeout, 20_000), + max_age: Access.get(ctx, :max_age, 60), + stale_age: Access.get(ctx, :stale_age, 300), + get_service_status: fn -> :active end + ] :ok = Electric.TenantManager.create_tenant(ctx.tenant_id, ctx.db_config, @@ -114,11 +106,7 @@ defmodule Support.ComponentSetup do [{tenant_supervisor_pid, _}] = Registry.lookup(registry_name, registry_key) - %{ - tenant: tenant, - tenant_supervisor_pid: tenant_supervisor_pid - # shape_cache: Electric.ShapeCache.name(ctx.electric_instance_id, ctx.tenant_id) - } + %{tenant: tenant, tenant_supervisor_pid: tenant_supervisor_pid} end def with_registry(ctx) do From 4fe895314107a397aa523c40df456b892d802035 Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 7 Nov 2024 17:19:26 +0000 Subject: [PATCH 06/17] Correct name of TenantTables --- packages/sync-service/lib/electric/application.ex | 2 +- .../electric/{tenants_table.ex => tenant_tables.ex} | 2 +- packages/sync-service/test/support/component_setup.ex | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) rename packages/sync-service/lib/electric/{tenants_table.ex => tenant_tables.ex} (88%) diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 6e04da6fe6..1eab8eb02a 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -84,7 +84,7 @@ defmodule Electric.Application do defp configure do electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id) - Electric.TenantsTable.init(electric_instance_id) + Electric.TenantTables.init(electric_instance_id) {kv_module, kv_fun, kv_params} = Application.fetch_env!(:electric, :persistent_kv) persistent_kv = apply(kv_module, kv_fun, [kv_params]) diff --git a/packages/sync-service/lib/electric/tenants_table.ex b/packages/sync-service/lib/electric/tenant_tables.ex similarity index 88% rename from packages/sync-service/lib/electric/tenants_table.ex rename to packages/sync-service/lib/electric/tenant_tables.ex index 1b312ebadf..b2bf1720c8 100644 --- a/packages/sync-service/lib/electric/tenants_table.ex +++ b/packages/sync-service/lib/electric/tenant_tables.ex @@ -1,4 +1,4 @@ -defmodule Electric.TenantsTable do +defmodule Electric.TenantTables do def name(electric_instance_id) do :"tenant_tables_#{electric_instance_id}" end diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 5793daeb4d..1c0237c574 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -20,7 +20,7 @@ defmodule Support.ComponentSetup do opts = [ app_config: ctx.app_config, electric_instance_id: ctx.electric_instance_id, - tenant_tables_name: Electric.TenantsTable.name(ctx.electric_instance_id) + tenant_tables_name: Electric.TenantTables.name(ctx.electric_instance_id) ] {:ok, _} = Electric.TenantManager.start_link(opts) @@ -250,8 +250,8 @@ defmodule Support.ComponentSetup do end def with_tenant_tables(ctx) do - Electric.TenantsTable.init(ctx.electric_instance_id) - %{tenant_tables_name: Electric.TenantsTable.name(ctx.electric_instance_id)} + Electric.TenantTables.init(ctx.electric_instance_id) + %{tenant_tables_name: Electric.TenantTables.name(ctx.electric_instance_id)} end def with_inspector(ctx) do @@ -259,8 +259,8 @@ defmodule Support.ComponentSetup do pg_info_table = :"pg_info_table #{full_test_name(ctx)}" pg_relation_table = :"pg_relation_table #{full_test_name(ctx)}" - tenant_tables_name = Electric.TenantsTable.name(ctx.electric_instance_id) - Electric.TenantsTable.init(ctx.electric_instance_id) + tenant_tables_name = Electric.TenantTables.name(ctx.electric_instance_id) + Electric.TenantTables.init(ctx.electric_instance_id) {:ok, _} = EtsInspector.start_link( From 1717eeed0ca2190d4fa0bd7cbac5281cee12f2d1 Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 7 Nov 2024 17:33:11 +0000 Subject: [PATCH 07/17] Prevent tenant_tables being created twice --- packages/sync-service/test/support/component_setup.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 1c0237c574..2c9ce7c279 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -259,8 +259,8 @@ defmodule Support.ComponentSetup do pg_info_table = :"pg_info_table #{full_test_name(ctx)}" pg_relation_table = :"pg_relation_table #{full_test_name(ctx)}" - tenant_tables_name = Electric.TenantTables.name(ctx.electric_instance_id) - Electric.TenantTables.init(ctx.electric_instance_id) + tenant_tables_name = + Map.get_lazy(ctx, :tenant_tables_name, fn -> with_tenant_tables(ctx).tenant_tables_name end) {:ok, _} = EtsInspector.start_link( From 750e5b0810d2401aa17174ca1a87ced50e1bf6ed Mon Sep 17 00:00:00 2001 From: rob Date: Mon, 11 Nov 2024 12:09:58 +0000 Subject: [PATCH 08/17] Crude implimentation of drop that breaks many tests --- packages/sync-service/lib/electric/tenant_manager.ex | 11 +++++++++++ .../test/electric/plug/remove_database_plug_test.exs | 3 +++ 2 files changed, 14 insertions(+) diff --git a/packages/sync-service/lib/electric/tenant_manager.ex b/packages/sync-service/lib/electric/tenant_manager.ex index 51e197f95c..8c8ddf566b 100644 --- a/packages/sync-service/lib/electric/tenant_manager.ex +++ b/packages/sync-service/lib/electric/tenant_manager.ex @@ -238,6 +238,17 @@ defmodule Electric.TenantManager do case GenServer.call(server, {:delete_tenant, tenant_id, pg_id}) do :ok -> + publication_name = + Keyword.fetch!(opts, :app_config).replication_opts.publication_name + + # TODO: Remove this sleep and wait for the pool to be started instead + Process.sleep(100) + + opts + |> Keyword.fetch!(:electric_instance_id) + |> Electric.Application.process_name(tenant_id, Electric.DbPool) + |> Postgrex.query!("DROP PUBLICATION #{publication_name}", []) + :ok = Electric.TenantSupervisor.stop_tenant(opts ++ [tenant_id: tenant_id]) :ok = Electric.Tenant.Persistence.delete_tenant!(tenant_id, opts) diff --git a/packages/sync-service/test/electric/plug/remove_database_plug_test.exs b/packages/sync-service/test/electric/plug/remove_database_plug_test.exs index bca30e0b05..7bc0b92f67 100644 --- a/packages/sync-service/test/electric/plug/remove_database_plug_test.exs +++ b/packages/sync-service/test/electric/plug/remove_database_plug_test.exs @@ -71,6 +71,9 @@ defmodule Electric.Plug.RemoveDatabasePlugTest do app_config: ctx.app_config, electric_instance_id: ctx.electric_instance_id ) == %{} + + # Ensure the publication has been dropped + assert %{rows: []} = Postgrex.query!(ctx.db_conn, "SELECT pubname FROM pg_publication", []) end test "returns 404 when tenant is not found", ctx do From 061995bf4c63ba68ea43b4324709b236a2766c62 Mon Sep 17 00:00:00 2001 From: rob Date: Mon, 11 Nov 2024 12:28:13 +0000 Subject: [PATCH 09/17] Replace sleep with wait --- .../sync-service/lib/electric/tenant_manager.ex | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/lib/electric/tenant_manager.ex b/packages/sync-service/lib/electric/tenant_manager.ex index 8c8ddf566b..fa316f79e3 100644 --- a/packages/sync-service/lib/electric/tenant_manager.ex +++ b/packages/sync-service/lib/electric/tenant_manager.ex @@ -241,8 +241,7 @@ defmodule Electric.TenantManager do publication_name = Keyword.fetch!(opts, :app_config).replication_opts.publication_name - # TODO: Remove this sleep and wait for the pool to be started instead - Process.sleep(100) + ensure_connection_pool_has_started(tenant_id, opts) opts |> Keyword.fetch!(:electric_instance_id) @@ -261,6 +260,20 @@ defmodule Electric.TenantManager do end end + defp ensure_connection_pool_has_started(tenant_id, opts) do + manager = + Electric.Connection.Manager.name(Keyword.fetch!(opts, :electric_instance_id), tenant_id) + + case Electric.Connection.Manager.get_status(manager) do + :active -> + :ok + + _ -> + Process.sleep(10) + ensure_connection_pool_has_started(tenant_id, opts) + end + end + ## Internal API @impl GenServer From 168262e8d29a60f6e9af251811d6ea8812f6b3e8 Mon Sep 17 00:00:00 2001 From: rob Date: Mon, 11 Nov 2024 15:05:31 +0000 Subject: [PATCH 10/17] Fix tenant manager test --- .../test/electric/tenant_manager_test.exs | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/packages/sync-service/test/electric/tenant_manager_test.exs b/packages/sync-service/test/electric/tenant_manager_test.exs index 942229bfe3..a162295401 100644 --- a/packages/sync-service/test/electric/tenant_manager_test.exs +++ b/packages/sync-service/test/electric/tenant_manager_test.exs @@ -6,6 +6,7 @@ defmodule Electric.TenantManagerTest do import Support.ComponentSetup import Support.DbSetup + import Support.TestUtils, only: [with_electric_instance_id: 1] @moduletag :tmp_dir @@ -227,17 +228,22 @@ defmodule Electric.TenantManagerTest do describe "delete_tenant/2" do setup :with_unique_db - setup do + setup ctx do %{ - publication_name: "electric_test_publication" + publication_name: "electric_test_publication", + connection_opts: Map.fetch!(ctx, :db_config) } end - setup ctx do - ctx - |> Map.put(:connection_opts, Map.fetch!(ctx, :db_config)) - |> with_complete_stack(tenant: &with_supervised_tenant/1) - end + setup :with_electric_instance_id + setup :with_tenant_id + setup :with_registry + setup :with_persistent_kv + setup :with_tenant_tables + setup :with_slot_name_and_stream_id + setup :with_app_config + setup :with_tenant_manager + setup :with_supervised_tenant test "deletes the tenant", %{ electric_instance_id: electric_instance_id, From b4999d422992650f0da98a46c8d94a2108a20a38 Mon Sep 17 00:00:00 2001 From: rob Date: Mon, 11 Nov 2024 15:29:57 +0000 Subject: [PATCH 11/17] Move dropping the slot into the connection manager --- .../lib/electric/connection/manager.ex | 24 +++++++++++++++++++ .../lib/electric/tenant_manager.ex | 23 ++---------------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index b9d1f59632..048667be01 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -112,6 +112,23 @@ defmodule Electric.Connection.Manager do GenServer.call(server, :get_status) end + def await_active(server) do + case get_status(server) do + :active -> + :ok + + _ -> + Process.sleep(10) + await_active(server) + end + end + + def drop_replication_slot(server) do + # Ensure that the connection pool is available + await_active(server) + GenServer.call(server, :drop_replication_slot) + end + def exclusive_connection_lock_acquired(server) do GenServer.cast(server, :exclusive_connection_lock_acquired) end @@ -186,6 +203,13 @@ defmodule Electric.Connection.Manager do {:reply, status, state} end + def handle_call(:drop_replication_slot, _from, %{pool_pid: pool_pid} = state) + when pool_pid != nil do + publication_name = Keyword.fetch!(state.replication_opts, :publication_name) + Postgrex.query!(pool_pid, "DROP PUBLICATION #{publication_name}", []) + {:reply, :ok, state} + end + @impl true def handle_continue(:start_lock_connection, %State{lock_connection_pid: nil} = state) do case Electric.Postgres.LockConnection.start_link( diff --git a/packages/sync-service/lib/electric/tenant_manager.ex b/packages/sync-service/lib/electric/tenant_manager.ex index fa316f79e3..f532e5af3c 100644 --- a/packages/sync-service/lib/electric/tenant_manager.ex +++ b/packages/sync-service/lib/electric/tenant_manager.ex @@ -238,15 +238,10 @@ defmodule Electric.TenantManager do case GenServer.call(server, {:delete_tenant, tenant_id, pg_id}) do :ok -> - publication_name = - Keyword.fetch!(opts, :app_config).replication_opts.publication_name - - ensure_connection_pool_has_started(tenant_id, opts) - opts |> Keyword.fetch!(:electric_instance_id) - |> Electric.Application.process_name(tenant_id, Electric.DbPool) - |> Postgrex.query!("DROP PUBLICATION #{publication_name}", []) + |> Electric.Connection.Manager.name(tenant_id) + |> Electric.Connection.Manager.drop_replication_slot() :ok = Electric.TenantSupervisor.stop_tenant(opts ++ [tenant_id: tenant_id]) :ok = Electric.Tenant.Persistence.delete_tenant!(tenant_id, opts) @@ -260,20 +255,6 @@ defmodule Electric.TenantManager do end end - defp ensure_connection_pool_has_started(tenant_id, opts) do - manager = - Electric.Connection.Manager.name(Keyword.fetch!(opts, :electric_instance_id), tenant_id) - - case Electric.Connection.Manager.get_status(manager) do - :active -> - :ok - - _ -> - Process.sleep(10) - ensure_connection_pool_has_started(tenant_id, opts) - end - end - ## Internal API @impl GenServer From 0f5393388a727c97bdbdb5c83f08f4cb35b32f6b Mon Sep 17 00:00:00 2001 From: rob Date: Mon, 11 Nov 2024 16:19:28 +0000 Subject: [PATCH 12/17] Fix running outside of tests --- packages/sync-service/lib/electric/application.ex | 2 +- .../lib/electric/postgres/inspector/ets_inspector.ex | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 1eab8eb02a..7436d3f3fd 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -54,7 +54,7 @@ defmodule Electric.Application do name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()}, {Registry, name: Registry.ShapeChanges, keys: :duplicate, partitions: System.schedulers_online()}, - Electric.TenantSupervisor, + {Electric.TenantSupervisor, electric_instance_id: config.electric_instance_id}, {Electric.TenantManager, router_opts}, {Bandit, plug: {Electric.Plug.Router, router_opts}, diff --git a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex index e63f40b8d0..0bcde4a6ad 100644 --- a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex @@ -249,7 +249,7 @@ defmodule Electric.Postgres.Inspector.EtsInspector do def fetch_tenant_tables_name(opts) do case Access.fetch(opts, :tenant_tables_name) do - :error -> Application.fetch_env!(:electric, :tenant_tables_name) + :error -> Electric.TenantTables.name(Access.fetch!(opts, :electric_instance_id)) {:ok, tenant_tables_name} -> tenant_tables_name end end From e57e33817cd05f8d8981c84973453018f35f56fd Mon Sep 17 00:00:00 2001 From: rob Date: Mon, 11 Nov 2024 16:51:22 +0000 Subject: [PATCH 13/17] Add changeset --- .changeset/modern-taxis-guess.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/modern-taxis-guess.md diff --git a/.changeset/modern-taxis-guess.md b/.changeset/modern-taxis-guess.md new file mode 100644 index 0000000000..b0d1d04d83 --- /dev/null +++ b/.changeset/modern-taxis-guess.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Drops the replication slot when `DELETE /v1/admin/database/:database_id` is called From 8ecba699af1fa3f4ba8d8752d86719ca42054c63 Mon Sep 17 00:00:00 2001 From: rob Date: Mon, 11 Nov 2024 17:32:16 +0000 Subject: [PATCH 14/17] Remove need for pollling wait --- .../lib/electric/connection/manager.ex | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 048667be01..9d7b436401 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -59,7 +59,8 @@ defmodule Electric.Connection.Manager do :pg_system_identifier, # PostgreSQL timeline ID :pg_timeline_id, - :tenant_id + :tenant_id, + drop_slot_requesters: [] ] end @@ -112,20 +113,7 @@ defmodule Electric.Connection.Manager do GenServer.call(server, :get_status) end - def await_active(server) do - case get_status(server) do - :active -> - :ok - - _ -> - Process.sleep(10) - await_active(server) - end - end - def drop_replication_slot(server) do - # Ensure that the connection pool is available - await_active(server) GenServer.call(server, :drop_replication_slot) end @@ -203,13 +191,20 @@ defmodule Electric.Connection.Manager do {:reply, status, state} end - def handle_call(:drop_replication_slot, _from, %{pool_pid: pool_pid} = state) - when pool_pid != nil do - publication_name = Keyword.fetch!(state.replication_opts, :publication_name) - Postgrex.query!(pool_pid, "DROP PUBLICATION #{publication_name}", []) + def handle_call(:drop_replication_slot, _from, %{pool_pid: pool} = state) when pool != nil do + drop_publication(state) {:reply, :ok, state} end + def handle_call(:drop_replication_slot, from, state) do + {:noreply, %{state | drop_slot_requesters: [from | state.drop_slot_requesters]}} + end + + defp drop_publication(state) do + publication_name = Keyword.fetch!(state.replication_opts, :publication_name) + Postgrex.query!(state.pool_pid, "DROP PUBLICATION #{publication_name}", []) + end + @impl true def handle_continue(:start_lock_connection, %State{lock_connection_pid: nil} = state) do case Electric.Postgres.LockConnection.start_link( @@ -287,13 +282,24 @@ defmodule Electric.Connection.Manager do Process.monitor(log_collector_pid) state = %{state | pool_pid: pool_pid, shape_log_collector_pid: log_collector_pid} - {:noreply, state} + + if state.drop_slot_requesters == [] do + {:noreply, state} + else + {:noreply, state, {:continue, :drop_replication_slot}} + end {:error, reason} -> handle_connection_error(reason, state, "regular") end end + def handle_continue(:drop_replication_slot, state) do + drop_publication(state) + Enum.each(state.drop_slot_requesters, fn requester -> GenServer.reply(requester, :ok) end) + {:noreply, %{state | drop_slot_requesters: []}} + end + @impl true def handle_info({:timeout, tref, step}, %{backoff: {backoff, tref}} = state) do state = %{state | backoff: {backoff, nil}} From 97b95351c327aead7f0130f36d557dd6ef36a585 Mon Sep 17 00:00:00 2001 From: rob Date: Mon, 11 Nov 2024 17:37:26 +0000 Subject: [PATCH 15/17] Refactor if to pattern match --- .../lib/electric/connection/manager.ex | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 9d7b436401..99f5b65159 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -283,20 +283,20 @@ defmodule Electric.Connection.Manager do state = %{state | pool_pid: pool_pid, shape_log_collector_pid: log_collector_pid} - if state.drop_slot_requesters == [] do - {:noreply, state} - else - {:noreply, state, {:continue, :drop_replication_slot}} - end + {:noreply, state, {:continue, :maybe_drop_replication_slot}} {:error, reason} -> handle_connection_error(reason, state, "regular") end end - def handle_continue(:drop_replication_slot, state) do + def handle_continue(:maybe_drop_replication_slot, %{drop_slot_requesters: []} = state) do + {:noreply, state} + end + + def handle_continue(:maybe_drop_replication_slot, %{drop_slot_requesters: requesters} = state) do drop_publication(state) - Enum.each(state.drop_slot_requesters, fn requester -> GenServer.reply(requester, :ok) end) + Enum.each(requesters, fn requester -> GenServer.reply(requester, :ok) end) {:noreply, %{state | drop_slot_requesters: []}} end From a4c209ff2bb76feec322fa98ffb3b3bff259c0ea Mon Sep 17 00:00:00 2001 From: rob Date: Tue, 12 Nov 2024 08:28:15 +0000 Subject: [PATCH 16/17] Rename Electric.TenantTables to Electric.Tenant.Tables --- packages/sync-service/lib/electric/application.ex | 2 +- .../lib/electric/postgres/inspector/ets_inspector.ex | 2 +- .../lib/electric/{tenant_tables.ex => tenant/tables.ex} | 2 +- packages/sync-service/test/support/component_setup.ex | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) rename packages/sync-service/lib/electric/{tenant_tables.ex => tenant/tables.ex} (87%) diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 7436d3f3fd..9022349cb1 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -84,7 +84,7 @@ defmodule Electric.Application do defp configure do electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id) - Electric.TenantTables.init(electric_instance_id) + Electric.Tenant.Tables.init(electric_instance_id) {kv_module, kv_fun, kv_params} = Application.fetch_env!(:electric, :persistent_kv) persistent_kv = apply(kv_module, kv_fun, [kv_params]) diff --git a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex index 0bcde4a6ad..3f644c6325 100644 --- a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex @@ -249,7 +249,7 @@ defmodule Electric.Postgres.Inspector.EtsInspector do def fetch_tenant_tables_name(opts) do case Access.fetch(opts, :tenant_tables_name) do - :error -> Electric.TenantTables.name(Access.fetch!(opts, :electric_instance_id)) + :error -> Electric.Tenant.Tables.name(Access.fetch!(opts, :electric_instance_id)) {:ok, tenant_tables_name} -> tenant_tables_name end end diff --git a/packages/sync-service/lib/electric/tenant_tables.ex b/packages/sync-service/lib/electric/tenant/tables.ex similarity index 87% rename from packages/sync-service/lib/electric/tenant_tables.ex rename to packages/sync-service/lib/electric/tenant/tables.ex index b2bf1720c8..3b61ba287b 100644 --- a/packages/sync-service/lib/electric/tenant_tables.ex +++ b/packages/sync-service/lib/electric/tenant/tables.ex @@ -1,4 +1,4 @@ -defmodule Electric.TenantTables do +defmodule Electric.Tenant.Tables do def name(electric_instance_id) do :"tenant_tables_#{electric_instance_id}" end diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 2c9ce7c279..b421d574ef 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -20,7 +20,7 @@ defmodule Support.ComponentSetup do opts = [ app_config: ctx.app_config, electric_instance_id: ctx.electric_instance_id, - tenant_tables_name: Electric.TenantTables.name(ctx.electric_instance_id) + tenant_tables_name: Electric.Tenant.Tables.name(ctx.electric_instance_id) ] {:ok, _} = Electric.TenantManager.start_link(opts) @@ -250,8 +250,8 @@ defmodule Support.ComponentSetup do end def with_tenant_tables(ctx) do - Electric.TenantTables.init(ctx.electric_instance_id) - %{tenant_tables_name: Electric.TenantTables.name(ctx.electric_instance_id)} + Electric.Tenant.Tables.init(ctx.electric_instance_id) + %{tenant_tables_name: Electric.Tenant.Tables.name(ctx.electric_instance_id)} end def with_inspector(ctx) do From ba8b59277e60476ce3d2a3b5ba7ebd63d3603dc8 Mon Sep 17 00:00:00 2001 From: rob Date: Tue, 12 Nov 2024 09:35:22 +0000 Subject: [PATCH 17/17] Crash calling process not Connection.Manager if drop fails --- .../sync-service/lib/electric/connection/manager.ex | 13 ++++++++----- .../sync-service/lib/electric/tenant_manager.ex | 9 +++++---- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 99f5b65159..7086448d7b 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -192,8 +192,7 @@ defmodule Electric.Connection.Manager do end def handle_call(:drop_replication_slot, _from, %{pool_pid: pool} = state) when pool != nil do - drop_publication(state) - {:reply, :ok, state} + {:reply, drop_publication(state), state} end def handle_call(:drop_replication_slot, from, state) do @@ -202,7 +201,11 @@ defmodule Electric.Connection.Manager do defp drop_publication(state) do publication_name = Keyword.fetch!(state.replication_opts, :publication_name) - Postgrex.query!(state.pool_pid, "DROP PUBLICATION #{publication_name}", []) + + case Postgrex.query(state.pool_pid, "DROP PUBLICATION #{publication_name}", []) do + {:ok, _} -> :ok + error -> error + end end @impl true @@ -295,8 +298,8 @@ defmodule Electric.Connection.Manager do end def handle_continue(:maybe_drop_replication_slot, %{drop_slot_requesters: requesters} = state) do - drop_publication(state) - Enum.each(requesters, fn requester -> GenServer.reply(requester, :ok) end) + result = drop_publication(state) + Enum.each(requesters, fn requester -> GenServer.reply(requester, result) end) {:noreply, %{state | drop_slot_requesters: []}} end diff --git a/packages/sync-service/lib/electric/tenant_manager.ex b/packages/sync-service/lib/electric/tenant_manager.ex index f532e5af3c..d5b1fb6986 100644 --- a/packages/sync-service/lib/electric/tenant_manager.ex +++ b/packages/sync-service/lib/electric/tenant_manager.ex @@ -238,10 +238,11 @@ defmodule Electric.TenantManager do case GenServer.call(server, {:delete_tenant, tenant_id, pg_id}) do :ok -> - opts - |> Keyword.fetch!(:electric_instance_id) - |> Electric.Connection.Manager.name(tenant_id) - |> Electric.Connection.Manager.drop_replication_slot() + :ok = + opts + |> Keyword.fetch!(:electric_instance_id) + |> Electric.Connection.Manager.name(tenant_id) + |> Electric.Connection.Manager.drop_replication_slot() :ok = Electric.TenantSupervisor.stop_tenant(opts ++ [tenant_id: tenant_id]) :ok = Electric.Tenant.Persistence.delete_tenant!(tenant_id, opts)