Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop replication slot on tenant delete #1965

Merged
merged 17 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/modern-taxis-guess.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Drops the replication slot when `DELETE /v1/admin/database/:database_id` is called
3 changes: 1 addition & 2 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -209,5 +209,4 @@ config :electric,
prometheus_port: prometheus_port,
storage: storage,
persistent_kv: persistent_kv,
listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false),
tenant_tables_name: :tenant_tables
listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false)
12 changes: 8 additions & 4 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ defmodule Electric.Application do
@process_registry_name Electric.Registry.Processes
def process_registry, do: @process_registry_name

@spec process_name(atom(), atom()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, module) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id}}}
end

@spec process_name(atom(), String.t(), atom()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, tenant_id, module) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, tenant_id}}}
Expand Down Expand Up @@ -49,7 +54,7 @@ defmodule Electric.Application do
name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()},
{Registry,
name: Registry.ShapeChanges, keys: :duplicate, partitions: System.schedulers_online()},
Electric.TenantSupervisor,
{Electric.TenantSupervisor, electric_instance_id: config.electric_instance_id},
{Electric.TenantManager, router_opts},
{Bandit,
plug: {Electric.Plug.Router, router_opts},
Expand Down Expand Up @@ -77,11 +82,10 @@ defmodule Electric.Application do
# from the OTP application env, runs some pre-processing functions and stores the processed
# configuration as a single map using `:persistent_term`.
defp configure do
tenant_tables_name = Application.fetch_env!(:electric, :tenant_tables_name)
:ets.new(tenant_tables_name, [:public, :named_table, :set, {:read_concurrency, true}])

electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id)

Electric.TenantTables.init(electric_instance_id)

{kv_module, kv_fun, kv_params} = Application.fetch_env!(:electric, :persistent_kv)
persistent_kv = apply(kv_module, kv_fun, [kv_params])

Expand Down
34 changes: 32 additions & 2 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ defmodule Electric.Connection.Manager do
:pg_system_identifier,
# PostgreSQL timeline ID
:pg_timeline_id,
:tenant_id
:tenant_id,
drop_slot_requesters: []
]
end

Expand Down Expand Up @@ -112,6 +113,10 @@ defmodule Electric.Connection.Manager do
GenServer.call(server, :get_status)
end

def drop_replication_slot(server) do
GenServer.call(server, :drop_replication_slot)
end

def exclusive_connection_lock_acquired(server) do
GenServer.cast(server, :exclusive_connection_lock_acquired)
end
Expand Down Expand Up @@ -186,6 +191,20 @@ defmodule Electric.Connection.Manager do
{:reply, status, state}
end

def handle_call(:drop_replication_slot, _from, %{pool_pid: pool} = state) when pool != nil do
drop_publication(state)
{:reply, :ok, state}
end

def handle_call(:drop_replication_slot, from, state) do
{:noreply, %{state | drop_slot_requesters: [from | state.drop_slot_requesters]}}
end

defp drop_publication(state) do
publication_name = Keyword.fetch!(state.replication_opts, :publication_name)
Postgrex.query!(state.pool_pid, "DROP PUBLICATION #{publication_name}", [])
Copy link
Contributor Author

@robacourt robacourt Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This runs the query directly on the pool rather than using the Postgrex.ReplicationConnection :query return term like we do for creating the publication. This is because there isn't a way to switch from stream mode back into query mode without waiting for something to arrive in the stream. WAL Keep-alives will eventually come but this is rather slow. Running the query directly seemed a better option.

end

@impl true
def handle_continue(:start_lock_connection, %State{lock_connection_pid: nil} = state) do
case Electric.Postgres.LockConnection.start_link(
Expand Down Expand Up @@ -263,13 +282,24 @@ defmodule Electric.Connection.Manager do
Process.monitor(log_collector_pid)

state = %{state | pool_pid: pool_pid, shape_log_collector_pid: log_collector_pid}
{:noreply, state}

{:noreply, state, {:continue, :maybe_drop_replication_slot}}

{:error, reason} ->
handle_connection_error(reason, state, "regular")
end
end

def handle_continue(:maybe_drop_replication_slot, %{drop_slot_requesters: []} = state) do
{:noreply, state}
end

def handle_continue(:maybe_drop_replication_slot, %{drop_slot_requesters: requesters} = state) do
drop_publication(state)
Enum.each(requesters, fn requester -> GenServer.reply(requester, :ok) end)
{:noreply, %{state | drop_slot_requesters: []}}
end

@impl true
def handle_info({:timeout, tref, step}, %{backoff: {backoff, tref}} = state) do
state = %{state | backoff: {backoff, nil}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
__MODULE__,
Map.new(opts)
|> Map.put_new(:pg_info_table, @default_pg_info_table)
|> Map.put_new(:pg_relation_table, @default_pg_relation_table)
|> Map.put_new_lazy(:tenant_tables_name, fn ->
Application.fetch_env!(:electric, :tenant_tables_name)
end),
|> Map.put_new(:pg_relation_table, @default_pg_relation_table),
name: name(opts)
)

Expand Down Expand Up @@ -252,7 +249,7 @@ defmodule Electric.Postgres.Inspector.EtsInspector do

def fetch_tenant_tables_name(opts) do
case Access.fetch(opts, :tenant_tables_name) do
:error -> Application.fetch_env!(:electric, :tenant_tables_name)
:error -> Electric.TenantTables.name(Access.fetch!(opts, :electric_instance_id))
{:ok, tenant_tables_name} -> tenant_tables_name
end
end
Expand Down
13 changes: 8 additions & 5 deletions packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ defmodule Electric.TenantSupervisor do

require Logger

@name Electric.DynamicTenantSupervisor

def start_link(_opts) do
DynamicSupervisor.start_link(__MODULE__, [], name: @name)
def start_link(opts) do
DynamicSupervisor.start_link(__MODULE__, [], name: name(opts))
end

def start_tenant(opts) do
Logger.debug(fn -> "Starting tenant for #{Access.fetch!(opts, :tenant_id)}" end)
DynamicSupervisor.start_child(@name, {Tenant.Supervisor, opts})
DynamicSupervisor.start_child(name(opts), {Tenant.Supervisor, opts})
end

@doc """
Expand All @@ -33,4 +31,9 @@ defmodule Electric.TenantSupervisor do
Logger.debug(fn -> "Starting #{__MODULE__}" end)
DynamicSupervisor.init(strategy: :one_for_one)
end

defp name(opts) do
electric_instance_id = Access.fetch!(opts, :electric_instance_id)
Electric.Application.process_name(electric_instance_id, __MODULE__)
end
end
5 changes: 5 additions & 0 deletions packages/sync-service/lib/electric/tenant_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ defmodule Electric.TenantManager do

case GenServer.call(server, {:delete_tenant, tenant_id, pg_id}) do
:ok ->
opts
|> Keyword.fetch!(:electric_instance_id)
|> Electric.Connection.Manager.name(tenant_id)
|> Electric.Connection.Manager.drop_replication_slot()

:ok = Electric.TenantSupervisor.stop_tenant(opts ++ [tenant_id: tenant_id])
:ok = Electric.Tenant.Persistence.delete_tenant!(tenant_id, opts)

Expand Down
14 changes: 14 additions & 0 deletions packages/sync-service/lib/electric/tenant_tables.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule Electric.TenantTables do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this module to Tenant.Tables one level down? Top level is already quite crowded

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

def name(electric_instance_id) do
:"tenant_tables_#{electric_instance_id}"
end

def init(electric_instance_id) do
:ets.new(name(electric_instance_id), [
:public,
:named_table,
:set,
{:read_concurrency, true}
])
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Electric.Plug.RemoveDatabasePlugTest do

import Support.ComponentSetup
import Support.DbSetup
import Support.TestUtils, only: [with_electric_instance_id: 1]

alias Support.Mock

Expand Down Expand Up @@ -48,16 +49,16 @@ defmodule Electric.Plug.RemoveDatabasePlugTest do
}
end

setup :with_complete_stack
setup :with_electric_instance_id
setup :with_tenant_id
setup :with_registry
setup :with_persistent_kv
setup :with_tenant_tables
setup :with_app_config
setup :with_tenant_manager
setup :with_supervised_tenant

test "returns 200 when successfully deleting a tenant", ctx do
# The tenant manager will try to shut down the tenant supervisor
# but we did not start a tenant supervisor in this test
# so we create one here
supervisor_name = Electric.Tenant.Supervisor.name(ctx.electric_instance_id, ctx.tenant_id)
Supervisor.start_link([], name: supervisor_name, strategy: :one_for_one)

conn =
ctx
|> conn("DELETE", ctx.tenant_id)
Expand All @@ -70,6 +71,9 @@ defmodule Electric.Plug.RemoveDatabasePlugTest do
app_config: ctx.app_config,
electric_instance_id: ctx.electric_instance_id
) == %{}

# Ensure the publication has been dropped
assert %{rows: []} = Postgrex.query!(ctx.db_conn, "SELECT pubname FROM pg_publication", [])
end

test "returns 404 when tenant is not found", ctx do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do
use Support.TransactionCase, async: true
import Support.ComponentSetup
import Support.DbStructureSetup
import Support.TestUtils, only: [with_electric_instance_id: 1]
alias Electric.Postgres.Inspector.EtsInspector

describe "load_relation/2" do
setup [:with_tenant_id, :with_inspector, :with_basic_tables, :with_sql_execute]
setup [
:with_electric_instance_id,
:with_tenant_id,
:with_inspector,
:with_basic_tables,
:with_sql_execute
]

setup %{inspector: {EtsInspector, opts}} do
{:ok, %{opts: opts, table: {"public", "items"}}}
Expand Down Expand Up @@ -54,7 +61,13 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do
end

describe "clean/2" do
setup [:with_tenant_id, :with_inspector, :with_basic_tables, :with_sql_execute]
setup [
:with_electric_instance_id,
:with_tenant_id,
:with_inspector,
:with_basic_tables,
:with_sql_execute
]

setup %{
inspector: {EtsInspector, opts},
Expand Down Expand Up @@ -124,7 +137,7 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do
end

describe "load_column_info/2" do
setup [:with_tenant_id, :with_inspector, :with_basic_tables]
setup [:with_electric_instance_id, :with_tenant_id, :with_inspector, :with_basic_tables]

setup %{inspector: {EtsInspector, opts}} do
{:ok, %{opts: opts, table: {"public", "items"}}}
Expand Down
20 changes: 17 additions & 3 deletions packages/sync-service/test/electric/shapes/shape_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,15 @@ defmodule Electric.Shapes.ShapeTest do
import Support.DbSetup
import Support.DbStructureSetup
import Support.ComponentSetup
import Support.TestUtils, only: [with_electric_instance_id: 1]

setup [:with_shared_db, :with_tenant_id, :with_inspector, :with_sql_execute]
setup [
:with_electric_instance_id,
:with_shared_db,
:with_tenant_id,
:with_inspector,
:with_sql_execute
]

@tag with_sql: [
"CREATE SCHEMA IF NOT EXISTS test",
Expand Down Expand Up @@ -366,8 +373,15 @@ defmodule Electric.Shapes.ShapeTest do
import Support.DbSetup
import Support.DbStructureSetup
import Support.ComponentSetup

setup [:with_shared_db, :with_tenant_id, :with_inspector, :with_sql_execute]
import Support.TestUtils, only: [with_electric_instance_id: 1]

setup [
:with_electric_instance_id,
:with_shared_db,
:with_tenant_id,
:with_inspector,
:with_sql_execute
]

@tag with_sql: [
"CREATE SCHEMA IF NOT EXISTS test",
Expand Down
20 changes: 13 additions & 7 deletions packages/sync-service/test/electric/tenant_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Electric.TenantManagerTest do

import Support.ComponentSetup
import Support.DbSetup
import Support.TestUtils, only: [with_electric_instance_id: 1]

@moduletag :tmp_dir

Expand Down Expand Up @@ -227,17 +228,22 @@ defmodule Electric.TenantManagerTest do
describe "delete_tenant/2" do
setup :with_unique_db

setup do
setup ctx do
%{
publication_name: "electric_test_publication"
publication_name: "electric_test_publication",
connection_opts: Map.fetch!(ctx, :db_config)
}
end

setup ctx do
ctx
|> Map.put(:connection_opts, Map.fetch!(ctx, :db_config))
|> with_complete_stack(tenant: &with_supervised_tenant/1)
end
setup :with_electric_instance_id
setup :with_tenant_id
setup :with_registry
setup :with_persistent_kv
setup :with_tenant_tables
setup :with_slot_name_and_stream_id
setup :with_app_config
setup :with_tenant_manager
setup :with_supervised_tenant

test "deletes the tenant", %{
electric_instance_id: electric_instance_id,
Expand Down
Loading