Skip to content

Commit

Permalink
Shut down the connection supervisor in case of an unrecoverable replication error
Browse files Browse the repository at this point in the history

The StackSupervisor will shut itself down as a result because the
connection supervisor is marked as a significant child.

In a multi-tenant setup, this effectively shuts down a single tenant
while the other tenants keep running.
  • Loading branch information
alco committed Dec 20, 2024
1 parent 88c77cd commit d96cc2d
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 33 deletions.
35 changes: 23 additions & 12 deletions integration-tests/tests/invalidated-replication-slot.lux
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

[my invalidated_slot_error=
"""
[error] :gen_statem {:"Elixir.Electric.ProcessRegistry:single_stack", {Electric.Postgres.ReplicationClient, nil}} terminating
** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot_integration"

This slot has been invalidated because it exceeded the maximum reserved size.
"""]

[my stack_id="single_stack"]

###

## Start a new Postgres cluster configured for easy replication slot invalidation.
Expand All @@ -25,8 +26,11 @@
[shell electric]
??[info] Starting replication from postgres

# Reset the failure pattern because we'll be matching on an error.
-
# Verify that the stack supervisor is registered using regular process registration. If we
# change this at any point, the line below will catch it and we'll be able to correct the
# check further down that verifies that the stack supervisor is no longer running.
!IO.puts("Stack supervisor pid: #{inspect Process.whereis(Electric.StackSupervisor)}")
??Stack supervisor pid: #PID<

## Seed the database with enough data to exceed max_wal_size and force a checkpoint that
## will invalidate the replication slot.
Expand All @@ -36,21 +40,28 @@
[shell pg]
?invalidating slot "electric_slot_integration" because its restart_lsn [\d\w]+/[\d\w]+ exceeds max_slot_wal_keep_size

[macro verify_connection_and_stack_supervisors_shutdown stack_id invalidated_slot_error]
??$invalidated_slot_error
??[error] Stopping connection supervisor with stack_id=$stack_id due to an unrecoverable error

!IO.puts("Stack supervisor pid: #{inspect Process.whereis(Electric.StackSupervisor)}")
??Stack supervisor pid: nil
[endmacro]

## Observe the fatal connection error.
[shell electric]
??$invalidated_slot_error
# Reset the failure pattern because we'll be matching on an error.
-

# Confirm Electric process exit.
??$PS1
[invoke verify_connection_and_stack_supervisors_shutdown $stack_id $invalidated_slot_error]

## Start the sync service once again to verify that it crashes due to the invalidated slot error.
[invoke setup_electric]
# Restart the OTP application to verify that the supervisors shut down again due to the invalidated slot.
!:ok = Application.stop(:electric)
!:ok = Application.start(:electric)

[shell electric]
??[info] Starting replication from postgres
-
??$invalidated_slot_error
??$PS1

[invoke verify_connection_and_stack_supervisors_shutdown $stack_id $invalidated_slot_error]

[cleanup]
[invoke teardown]
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ defmodule Electric.Application do
],
pool_opts: [pool_size: Electric.Config.get_env(:db_pool_size)],
storage: storage,
chunk_bytes_threshold: Electric.Config.get_env(:chunk_bytes_threshold)
chunk_bytes_threshold: Electric.Config.get_env(:chunk_bytes_threshold),
name: Electric.StackSupervisor
},
{Electric.Telemetry, stack_id: stack_id, storage: storage},
{Bandit,
Expand Down
44 changes: 25 additions & 19 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -365,13 +365,13 @@ defmodule Electric.Connection.Manager do
# connection and the DB pool. If any of the latter two shut down, Connection.Manager will
# itself terminate to be restarted by its supervisor in a clean state.
def handle_info({:EXIT, pid, reason}, %State{replication_client_pid: pid} = state) do
halt_if_fatal_error!(reason)
with false <- stop_if_fatal_error(reason, state) do
Logger.debug(
"Handling the exit of the replication client #{inspect(pid)} with reason #{inspect(reason)}"
)

Logger.debug(
"Handling the exit of the replication client #{inspect(pid)} with reason #{inspect(reason)}"
)

{:noreply, %{state | replication_client_pid: nil}, {:continue, :start_replication_client}}
{:noreply, %{state | replication_client_pid: nil}, {:continue, :start_replication_client}}
end
end

# The most likely reason for the lock connection or the DB pool to exit is the database
Expand Down Expand Up @@ -524,8 +524,13 @@ defmodule Electric.Connection.Manager do
end

defp handle_connection_error(error, state, mode) do
halt_if_fatal_error!(error)
with false <- stop_if_fatal_error(error, state) do
state = schedule_reconnection_after_error(error, state, mode)
{:noreply, state}
end
end

defp schedule_reconnection_after_error(error, state, mode) do
message =
case error do
%DBConnection.ConnectionError{message: message} ->
Expand Down Expand Up @@ -553,8 +558,7 @@ defmodule Electric.Connection.Manager do
is_nil(state.pool_pid) -> :start_connection_pool
end

state = schedule_reconnection(step, state)
{:noreply, state}
schedule_reconnection(step, state)
end

defp pg_error_extra_info(pg_error) do
Expand All @@ -573,23 +577,25 @@ defmodule Electric.Connection.Manager do
end
end

@invalid_slot_detail "This slot has been invalidated because it exceeded the maximum reserved size."

defp halt_if_fatal_error!(
defp stop_if_fatal_error(
%Postgrex.Error{
postgres: %{
code: :object_not_in_prerequisite_state,
detail: @invalid_slot_detail,
pg_code: "55000",
routine: "StartLogicalReplication"
detail: "This slot has been invalidated" <> _,
pg_code: "55000"
}
} = error
} = error,
state
) do
System.stop(1)
exit(error)
# Perform supervisor shutdown in a task to avoid a circular dependency where the manager
# process is waiting for the supervisor to shut down its children, one of which is the
# manager process itself.
Task.start(Electric.Connection.Supervisor, :shutdown, [state.stack_id, error])

{:noreply, state}
end

defp halt_if_fatal_error!(_), do: nil
defp stop_if_fatal_error(_, _), do: false

defp schedule_reconnection(step, %State{backoff: {backoff, _}} = state) do
{time, backoff} = :backoff.fail(backoff)
Expand Down
18 changes: 17 additions & 1 deletion packages/sync-service/lib/electric/connection/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@ defmodule Electric.Connection.Supervisor do
has successfully initialized a database connection pool.
"""

use Supervisor
# This supervisor is meant to be a child of Electric.StackSupervisor.
#
# The `restart: :transient, significant: true` combo allows for shutting the supervisor down
# and signalling the parent supervisor to shut itself down as well if that one has
# `:auto_shutdown` set to `:any_significant` or `:all_significant`.
use Supervisor, restart: :transient, significant: true

require Logger

def name(opts) do
Electric.ProcessRegistry.name(opts[:stack_id], __MODULE__)
Expand All @@ -28,6 +35,15 @@ defmodule Electric.Connection.Supervisor do
Supervisor.start_link(__MODULE__, opts, name: name(opts))
end

# Logs the fatal error and stops this stack's connection supervisor.
#
# Because the supervisor is a significant, transient child of the stack
# supervisor, stopping it here cascades into a shutdown of the whole stack
# (see the `use Supervisor` options on this module).
def shutdown(stack_id, reason) do
  message =
    "Stopping connection supervisor with stack_id=#{inspect(stack_id)} " <>
      "due to an unrecoverable error: #{inspect(reason)}"

  Logger.error(message)

  Supervisor.stop(name(stack_id: stack_id), {:shutdown, reason}, 1_000)
end

def init(opts) do
Process.set_label({:connection_supervisor, opts[:stack_id]})
Logger.metadata(stack_id: opts[:stack_id])
Expand Down
7 changes: 7 additions & 0 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ defmodule Electric.StackSupervisor do
2. `Electric.Replication.ShapeLogCollector` collects transactions from the replication connection, fanning them out to `Electric.Shapes.Consumer` (4.1.1.2)
3. `Electric.ShapeCache` coordinates shape creation and handle allocation, shape metadata
"""

# Setting `restart: :transient` is required so that the `:auto_shutdown` option can be passed to `Supervisor.init/2` below.
use Supervisor, restart: :transient

@opts_schema NimbleOptions.new!(
Expand Down Expand Up @@ -119,6 +121,11 @@ defmodule Electric.StackSupervisor do
Registry.register(registry, {:stack_status, stack_id}, value)
end

# No-op clause: when no registry is running there is nowhere to dispatch
# the event, so simply return :ok.
def dispatch_stack_event(nil, _stack_id, _event), do: :ok

def dispatch_stack_event(registry, stack_id, event) do
Registry.dispatch(registry, {:stack_status, stack_id}, fn entries ->
for {pid, ref} <- entries do
Expand Down

0 comments on commit d96cc2d

Please sign in to comment.