Skip to content

Commit

Permalink
Drop replication slot on tenant delete (#1965)
Browse files Browse the repository at this point in the history
Fixes #1924
  • Loading branch information
robacourt authored Nov 12, 2024
1 parent 10bd85f commit ae18f4a
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 48 deletions.
5 changes: 5 additions & 0 deletions .changeset/modern-taxis-guess.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Drops the replication slot when `DELETE /v1/admin/database/:database_id` is called
3 changes: 1 addition & 2 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -209,5 +209,4 @@ config :electric,
prometheus_port: prometheus_port,
storage: storage,
persistent_kv: persistent_kv,
listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false),
tenant_tables_name: :tenant_tables
listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false)
12 changes: 8 additions & 4 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ defmodule Electric.Application do
@process_registry_name Electric.Registry.Processes
def process_registry, do: @process_registry_name

@spec process_name(atom(), atom()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, module) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id}}}
end

@spec process_name(atom(), String.t(), atom()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, tenant_id, module) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, tenant_id}}}
Expand Down Expand Up @@ -49,7 +54,7 @@ defmodule Electric.Application do
name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()},
{Registry,
name: Registry.ShapeChanges, keys: :duplicate, partitions: System.schedulers_online()},
Electric.TenantSupervisor,
{Electric.TenantSupervisor, electric_instance_id: config.electric_instance_id},
{Electric.TenantManager, router_opts},
{Bandit,
plug: {Electric.Plug.Router, router_opts},
Expand Down Expand Up @@ -77,11 +82,10 @@ defmodule Electric.Application do
# from the OTP application env, runs some pre-processing functions and stores the processed
# configuration as a single map using `:persistent_term`.
defp configure do
tenant_tables_name = Application.fetch_env!(:electric, :tenant_tables_name)
:ets.new(tenant_tables_name, [:public, :named_table, :set, {:read_concurrency, true}])

electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id)

Electric.Tenant.Tables.init(electric_instance_id)

{kv_module, kv_fun, kv_params} = Application.fetch_env!(:electric, :persistent_kv)
persistent_kv = apply(kv_module, kv_fun, [kv_params])

Expand Down
37 changes: 35 additions & 2 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ defmodule Electric.Connection.Manager do
:pg_system_identifier,
# PostgreSQL timeline ID
:pg_timeline_id,
:tenant_id
:tenant_id,
drop_slot_requesters: []
]
end

Expand Down Expand Up @@ -112,6 +113,10 @@ defmodule Electric.Connection.Manager do
GenServer.call(server, :get_status)
end

def drop_replication_slot(server) do
GenServer.call(server, :drop_replication_slot)
end

def exclusive_connection_lock_acquired(server) do
GenServer.cast(server, :exclusive_connection_lock_acquired)
end
Expand Down Expand Up @@ -186,6 +191,23 @@ defmodule Electric.Connection.Manager do
{:reply, status, state}
end

def handle_call(:drop_replication_slot, _from, %{pool_pid: pool} = state) when pool != nil do
{:reply, drop_publication(state), state}
end

def handle_call(:drop_replication_slot, from, state) do
{:noreply, %{state | drop_slot_requesters: [from | state.drop_slot_requesters]}}
end

defp drop_publication(state) do
publication_name = Keyword.fetch!(state.replication_opts, :publication_name)

case Postgrex.query(state.pool_pid, "DROP PUBLICATION #{publication_name}", []) do
{:ok, _} -> :ok
error -> error
end
end

@impl true
def handle_continue(:start_lock_connection, %State{lock_connection_pid: nil} = state) do
case Electric.Postgres.LockConnection.start_link(
Expand Down Expand Up @@ -263,13 +285,24 @@ defmodule Electric.Connection.Manager do
Process.monitor(log_collector_pid)

state = %{state | pool_pid: pool_pid, shape_log_collector_pid: log_collector_pid}
{:noreply, state}

{:noreply, state, {:continue, :maybe_drop_replication_slot}}

{:error, reason} ->
handle_connection_error(reason, state, "regular")
end
end

def handle_continue(:maybe_drop_replication_slot, %{drop_slot_requesters: []} = state) do
{:noreply, state}
end

def handle_continue(:maybe_drop_replication_slot, %{drop_slot_requesters: requesters} = state) do
result = drop_publication(state)
Enum.each(requesters, fn requester -> GenServer.reply(requester, result) end)
{:noreply, %{state | drop_slot_requesters: []}}
end

@impl true
def handle_info({:timeout, tref, step}, %{backoff: {backoff, tref}} = state) do
state = %{state | backoff: {backoff, nil}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
__MODULE__,
Map.new(opts)
|> Map.put_new(:pg_info_table, @default_pg_info_table)
|> Map.put_new(:pg_relation_table, @default_pg_relation_table)
|> Map.put_new_lazy(:tenant_tables_name, fn ->
Application.fetch_env!(:electric, :tenant_tables_name)
end),
|> Map.put_new(:pg_relation_table, @default_pg_relation_table),
name: name(opts)
)

Expand Down Expand Up @@ -252,7 +249,7 @@ defmodule Electric.Postgres.Inspector.EtsInspector do

def fetch_tenant_tables_name(opts) do
case Access.fetch(opts, :tenant_tables_name) do
:error -> Application.fetch_env!(:electric, :tenant_tables_name)
:error -> Electric.Tenant.Tables.name(Access.fetch!(opts, :electric_instance_id))
{:ok, tenant_tables_name} -> tenant_tables_name
end
end
Expand Down
13 changes: 8 additions & 5 deletions packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ defmodule Electric.TenantSupervisor do

require Logger

@name Electric.DynamicTenantSupervisor

def start_link(_opts) do
DynamicSupervisor.start_link(__MODULE__, [], name: @name)
def start_link(opts) do
DynamicSupervisor.start_link(__MODULE__, [], name: name(opts))
end

def start_tenant(opts) do
Logger.debug(fn -> "Starting tenant for #{Access.fetch!(opts, :tenant_id)}" end)
DynamicSupervisor.start_child(@name, {Tenant.Supervisor, opts})
DynamicSupervisor.start_child(name(opts), {Tenant.Supervisor, opts})
end

@doc """
Expand All @@ -33,4 +31,9 @@ defmodule Electric.TenantSupervisor do
Logger.debug(fn -> "Starting #{__MODULE__}" end)
DynamicSupervisor.init(strategy: :one_for_one)
end

defp name(opts) do
electric_instance_id = Access.fetch!(opts, :electric_instance_id)
Electric.Application.process_name(electric_instance_id, __MODULE__)
end
end
14 changes: 14 additions & 0 deletions packages/sync-service/lib/electric/tenant/tables.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule Electric.Tenant.Tables do
def name(electric_instance_id) do
:"tenant_tables_#{electric_instance_id}"
end

def init(electric_instance_id) do
:ets.new(name(electric_instance_id), [
:public,
:named_table,
:set,
{:read_concurrency, true}
])
end
end
6 changes: 6 additions & 0 deletions packages/sync-service/lib/electric/tenant_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,12 @@ defmodule Electric.TenantManager do

case GenServer.call(server, {:delete_tenant, tenant_id, pg_id}) do
:ok ->
:ok =
opts
|> Keyword.fetch!(:electric_instance_id)
|> Electric.Connection.Manager.name(tenant_id)
|> Electric.Connection.Manager.drop_replication_slot()

:ok = Electric.TenantSupervisor.stop_tenant(opts ++ [tenant_id: tenant_id])
:ok = Electric.Tenant.Persistence.delete_tenant!(tenant_id, opts)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Electric.Plug.RemoveDatabasePlugTest do

import Support.ComponentSetup
import Support.DbSetup
import Support.TestUtils, only: [with_electric_instance_id: 1]

alias Support.Mock

Expand Down Expand Up @@ -48,16 +49,16 @@ defmodule Electric.Plug.RemoveDatabasePlugTest do
}
end

setup :with_complete_stack
setup :with_electric_instance_id
setup :with_tenant_id
setup :with_registry
setup :with_persistent_kv
setup :with_tenant_tables
setup :with_app_config
setup :with_tenant_manager
setup :with_supervised_tenant

test "returns 200 when successfully deleting a tenant", ctx do
# The tenant manager will try to shut down the tenant supervisor
# but we did not start a tenant supervisor in this test
# so we create one here
supervisor_name = Electric.Tenant.Supervisor.name(ctx.electric_instance_id, ctx.tenant_id)
Supervisor.start_link([], name: supervisor_name, strategy: :one_for_one)

conn =
ctx
|> conn("DELETE", ctx.tenant_id)
Expand All @@ -70,6 +71,9 @@ defmodule Electric.Plug.RemoveDatabasePlugTest do
app_config: ctx.app_config,
electric_instance_id: ctx.electric_instance_id
) == %{}

# Ensure the publication has been dropped
assert %{rows: []} = Postgrex.query!(ctx.db_conn, "SELECT pubname FROM pg_publication", [])
end

test "returns 404 when tenant is not found", ctx do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do
use Support.TransactionCase, async: true
import Support.ComponentSetup
import Support.DbStructureSetup
import Support.TestUtils, only: [with_electric_instance_id: 1]
alias Electric.Postgres.Inspector.EtsInspector

describe "load_relation/2" do
setup [:with_tenant_id, :with_inspector, :with_basic_tables, :with_sql_execute]
setup [
:with_electric_instance_id,
:with_tenant_id,
:with_inspector,
:with_basic_tables,
:with_sql_execute
]

setup %{inspector: {EtsInspector, opts}} do
{:ok, %{opts: opts, table: {"public", "items"}}}
Expand Down Expand Up @@ -54,7 +61,13 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do
end

describe "clean/2" do
setup [:with_tenant_id, :with_inspector, :with_basic_tables, :with_sql_execute]
setup [
:with_electric_instance_id,
:with_tenant_id,
:with_inspector,
:with_basic_tables,
:with_sql_execute
]

setup %{
inspector: {EtsInspector, opts},
Expand Down Expand Up @@ -124,7 +137,7 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do
end

describe "load_column_info/2" do
setup [:with_tenant_id, :with_inspector, :with_basic_tables]
setup [:with_electric_instance_id, :with_tenant_id, :with_inspector, :with_basic_tables]

setup %{inspector: {EtsInspector, opts}} do
{:ok, %{opts: opts, table: {"public", "items"}}}
Expand Down
20 changes: 17 additions & 3 deletions packages/sync-service/test/electric/shapes/shape_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,15 @@ defmodule Electric.Shapes.ShapeTest do
import Support.DbSetup
import Support.DbStructureSetup
import Support.ComponentSetup
import Support.TestUtils, only: [with_electric_instance_id: 1]

setup [:with_shared_db, :with_tenant_id, :with_inspector, :with_sql_execute]
setup [
:with_electric_instance_id,
:with_shared_db,
:with_tenant_id,
:with_inspector,
:with_sql_execute
]

@tag with_sql: [
"CREATE SCHEMA IF NOT EXISTS test",
Expand Down Expand Up @@ -366,8 +373,15 @@ defmodule Electric.Shapes.ShapeTest do
import Support.DbSetup
import Support.DbStructureSetup
import Support.ComponentSetup

setup [:with_shared_db, :with_tenant_id, :with_inspector, :with_sql_execute]
import Support.TestUtils, only: [with_electric_instance_id: 1]

setup [
:with_electric_instance_id,
:with_shared_db,
:with_tenant_id,
:with_inspector,
:with_sql_execute
]

@tag with_sql: [
"CREATE SCHEMA IF NOT EXISTS test",
Expand Down
20 changes: 13 additions & 7 deletions packages/sync-service/test/electric/tenant_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Electric.TenantManagerTest do

import Support.ComponentSetup
import Support.DbSetup
import Support.TestUtils, only: [with_electric_instance_id: 1]

@moduletag :tmp_dir

Expand Down Expand Up @@ -227,17 +228,22 @@ defmodule Electric.TenantManagerTest do
describe "delete_tenant/2" do
setup :with_unique_db

setup do
setup ctx do
%{
publication_name: "electric_test_publication"
publication_name: "electric_test_publication",
connection_opts: Map.fetch!(ctx, :db_config)
}
end

setup ctx do
ctx
|> Map.put(:connection_opts, Map.fetch!(ctx, :db_config))
|> with_complete_stack(tenant: &with_supervised_tenant/1)
end
setup :with_electric_instance_id
setup :with_tenant_id
setup :with_registry
setup :with_persistent_kv
setup :with_tenant_tables
setup :with_slot_name_and_stream_id
setup :with_app_config
setup :with_tenant_manager
setup :with_supervised_tenant

test "deletes the tenant", %{
electric_instance_id: electric_instance_id,
Expand Down
Loading

0 comments on commit ae18f4a

Please sign in to comment.