Skip to content

Commit

Permalink
Drop replication slot on database delete (#1971)
Browse files Browse the repository at this point in the history
Following on from #1965 this drops the replication slot as well as the
publication.
  • Loading branch information
robacourt authored Nov 19, 2024
1 parent 9d5933a commit 2933f27
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ defmodule CloudElectric.TenantManager do
:ets.delete(tenants, tenant_id)
state = %{state | dbs: MapSet.delete(dbs, pg_id)}

drop_replication_slot(tenant_id)
drop_replication_slot_on_stop(tenant_id)

# TODO: This leaves orphaned shapes with data on disk
:ok = DynamicTenantSupervisor.stop_tenant(tenant_id)
Expand All @@ -137,10 +137,10 @@ defmodule CloudElectric.TenantManager do
end
end

defp drop_replication_slot(tenant_id) do
defp drop_replication_slot_on_stop(tenant_id) do
tenant_id
|> Electric.Connection.Manager.name()
|> Electric.Connection.Manager.drop_replication_slot()
|> Electric.Connection.Manager.drop_replication_slot_on_stop()
end

defp do_create_tenant(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ defmodule CloudElectric.Plugs.DeleteDatabasePlugTest do
assert conn.status == 200
assert Jason.decode!(conn.resp_body) == ctx.tenant_id

# Ensure the replication slot has been dropped
assert %{rows: []} =
Postgrex.query!(
ctx.db_conn,
"SELECT slot_name FROM pg_replication_slots where slot_name=$1",
[ctx.slot_name]
)

# Ensure the publication has been dropped
assert %{rows: []} = Postgrex.query!(db_conn, "SELECT pubname FROM pg_publication", [])
end
Expand Down
77 changes: 52 additions & 25 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ defmodule Electric.Connection.Manager do
# ID used for process labeling and sibling discovery
:stack_id,
:tweaks,
drop_slot_requesters: []
awaiting_active: [],
drop_slot_requested: false
]
end

Expand Down Expand Up @@ -119,8 +120,19 @@ defmodule Electric.Connection.Manager do
GenServer.call(server, :get_status)
end

def drop_replication_slot(server) do
GenServer.call(server, :drop_replication_slot)
@doc """
Blocks the caller until the connection manager's status is `:active`.

If the status is already active, this returns immediately. Use it when the
connection pool needs to be running before the caller proceeds.

The wait is a plain `GenServer.call/2`, so the default call timeout applies.
"""
@spec await_active(GenServer.server()) :: :ok
def await_active(server), do: GenServer.call(server, :await_active)

@doc """
Asks the connection manager to drop its replication slot later on.

The call first waits for the manager to become active (see `await_active/1`)
and then sends the `:drop_replication_slot_on_stop` request. Both steps are
synchronous `GenServer.call/2`s, so the default call timeout applies to each.
"""
@spec drop_replication_slot_on_stop(GenServer.server()) :: :ok
def drop_replication_slot_on_stop(server) do
  # The request is only sent once the manager reports itself active.
  await_active(server)

  request = :drop_replication_slot_on_stop
  GenServer.call(server, request)
end

def exclusive_connection_lock_acquired(server) do
Expand Down Expand Up @@ -197,21 +209,16 @@ defmodule Electric.Connection.Manager do
{:reply, status, state}
end

def handle_call(:drop_replication_slot, _from, %{pool_pid: pool} = state) when pool != nil do
{:reply, drop_publication(state), state}
# No pool yet: park the caller instead of replying. The stored `from` is
# answered later, once the manager has a pool.
def handle_call(:await_active, from, %{pool_pid: nil} = state) do
  waiters = [from | state.awaiting_active]
  {:noreply, %{state | awaiting_active: waiters}}
end

def handle_call(:drop_replication_slot, from, state) do
{:noreply, %{state | drop_slot_requesters: [from | state.drop_slot_requesters]}}
# Fallback clause for `:await_active`: reply `:ok` straight away, state untouched.
def handle_call(:await_active, _from, state), do: {:reply, :ok, state}

defp drop_publication(state) do
publication_name = Keyword.fetch!(state.replication_opts, :publication_name)

case Postgrex.query(state.pool_pid, "DROP PUBLICATION #{publication_name}", []) do
{:ok, _} -> :ok
error -> error
end
# Records that the slot should be dropped later; nothing is dropped here.
# The flag is read elsewhere in this module to decide whether to call `drop_slot/1`.
def handle_call(:drop_replication_slot_on_stop, _from, state) do
  flagged_state = %{state | drop_slot_requested: true}
  {:reply, :ok, flagged_state}
end

@impl true
Expand Down Expand Up @@ -294,23 +301,17 @@ defmodule Electric.Connection.Manager do

state = %{state | pool_pid: pool_pid, shape_log_collector_pid: log_collector_pid}

{:noreply, state, {:continue, :maybe_drop_replication_slot}}
for awaiting <- state.awaiting_active do
GenServer.reply(awaiting, :ok)
end

{:noreply, %{state | awaiting_active: []}}

{:error, reason} ->
handle_connection_error(reason, state, "regular")
end
end

def handle_continue(:maybe_drop_replication_slot, %{drop_slot_requesters: []} = state) do
{:noreply, state}
end

def handle_continue(:maybe_drop_replication_slot, %{drop_slot_requesters: requesters} = state) do
result = drop_publication(state)
Enum.each(requesters, fn requester -> GenServer.reply(requester, result) end)
{:noreply, %{state | drop_slot_requesters: []}}
end

@impl true
def handle_info({:timeout, tref, step}, %{backoff: {backoff, tref}} = state) do
state = %{state | backoff: {backoff, nil}}
Expand Down Expand Up @@ -367,6 +368,10 @@ defmodule Electric.Connection.Manager do
state
end

if state.drop_slot_requested do
drop_slot(state)
end

{:noreply, %{state | shape_log_collector_pid: nil}}
end

Expand Down Expand Up @@ -595,4 +600,26 @@ defmodule Electric.Connection.Manager do

log_collector_pid
end

# Drops the replication slot (skipped when the slot is temporary) and then the
# publication, using the names configured in `state.replication_opts`.
# Query failures are logged by `execute_and_log_errors/2` rather than raised.
#
# NOTE(review): both names are interpolated directly into the SQL text —
# presumably they come from trusted configuration; confirm.
defp drop_slot(%{pool_pid: pool} = state) do
  opts = state.replication_opts

  publication_name = Keyword.fetch!(opts, :publication_name)
  slot_name = Keyword.fetch!(opts, :slot_name)
  slot_temporary? = Keyword.fetch!(opts, :slot_temporary?)

  if slot_temporary? do
    :ok
  else
    execute_and_log_errors(pool, "SELECT pg_drop_replication_slot('#{slot_name}');")
  end

  execute_and_log_errors(pool, "DROP PUBLICATION #{publication_name}")
end

# Runs `query` (with no bind parameters) against `pool`. On `{:error, _}` the
# failure is logged together with the query text; nothing is raised for it.
defp execute_and_log_errors(pool, query) do
  pool
  |> Postgrex.query(query, [])
  |> case do
    {:error, reason} ->
      Logger.error("Failed to execute query: #{query}\nError: #{inspect(reason)}")

    {:ok, _result} ->
      :ok
  end
end
end

0 comments on commit 2933f27

Please sign in to comment.