Skip to content

Commit

Permalink
feat: Add global stack event registry and block requests before ready (
Browse files Browse the repository at this point in the history
…#2019)

PR by @icehaunter and me - makes the `StackSupervisor` accept a stack
event registry that it uses to dispatch status events about the state of
the stack.

This was preliminary work for multitenancy. It also fixes
#1922: requests are now held while the stack is not ready, and are
either released when a "ready" event is received or timed out with a
503. This avoids crashing the ETS inspector, which was trying to use a
DB connection from an uninitialised pool.


The integration test is currently broken as a result of
#2009
  • Loading branch information
msfstef authored Nov 21, 2024
1 parent 5a7866f commit c5b79a5
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 20 deletions.
5 changes: 5 additions & 0 deletions .changeset/olive-moons-breathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Add global stack events registry for receiving updates on the stack status
7 changes: 3 additions & 4 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ defmodule Electric.Application do
] ++
Electric.StackSupervisor.build_shared_opts(
stack_id: stack_id,
stack_events_registry: Registry.StackEvents,
storage: Application.fetch_env!(:electric, :storage)
)

Expand All @@ -49,12 +50,10 @@ defmodule Electric.Application do
Enum.concat([
[
Electric.Telemetry,
# {Registry,
# name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()},
# {Registry,
# name: Registry.ShapeChanges, keys: :duplicate, partitions: System.schedulers_online()},
{Registry, name: Registry.StackEvents, keys: :duplicate},
{Electric.StackSupervisor,
stack_id: stack_id,
stack_events_registry: Registry.StackEvents,
connection_opts: Application.fetch_env!(:electric, :connection_opts),
persistent_kv: persistent_kv,
replication_opts: [
Expand Down
10 changes: 10 additions & 0 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ defmodule Electric.Connection.Manager do
:pg_timeline_id,
# ID used for process labeling and sibling discovery
:stack_id,
# Registry used for stack events
:stack_events_registry,
:tweaks,
awaiting_active: [],
drop_slot_requested: false
Expand Down Expand Up @@ -178,6 +180,7 @@ defmodule Electric.Connection.Manager do
pg_lock_acquired: false,
backoff: {:backoff.init(1000, 10_000), nil},
stack_id: Keyword.fetch!(opts, :stack_id),
stack_events_registry: Keyword.fetch!(opts, :stack_events_registry),
tweaks: Keyword.fetch!(opts, :tweaks)
}

Expand Down Expand Up @@ -231,6 +234,12 @@ defmodule Electric.Connection.Manager do
lock_name: Keyword.fetch!(state.replication_opts, :slot_name)
) do
{:ok, lock_connection_pid} ->
Electric.StackSupervisor.dispatch_stack_event(
state.stack_events_registry,
state.stack_id,
:waiting_for_connection_lock
)

Process.send_after(self(), :log_lock_connection_status, @lock_status_logging_interval)
{:noreply, %{state | lock_connection_pid: lock_connection_pid}}

Expand Down Expand Up @@ -290,6 +299,7 @@ defmodule Electric.Connection.Manager do
Electric.Connection.Supervisor.start_shapes_supervisor(
stack_id: state.stack_id,
shape_cache_opts: shape_cache_opts,
stack_events_registry: state.stack_events_registry,
tweaks: state.tweaks
)

Expand Down
7 changes: 5 additions & 2 deletions packages/sync-service/lib/electric/connection/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ defmodule Electric.Connection.Supervisor do
)

with {:ok, pid} <- Supervisor.start_child(name(opts), child_spec) do
if notify_pid = get_in(opts, [:tweaks, :notify_pid]),
do: send(notify_pid, {:startup_progress, stack_id, :shape_supervisor_ready})
Electric.StackSupervisor.dispatch_stack_event(
opts[:stack_events_registry],
stack_id,
:ready
)

{:ok, pid}
end
Expand Down
3 changes: 3 additions & 0 deletions packages/sync-service/lib/electric/plug/delete_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ defmodule Electric.Plug.DeleteShapePlug do

alias Electric.Shapes
alias Electric.Plug.ServeShapePlug.Params
import Electric.Plug.Utils, only: [hold_conn_until_stack_ready: 2]

plug :fetch_query_params
plug :put_resp_content_type, "application/json"

plug :hold_conn_until_stack_ready

plug :allow_shape_deletion
plug :validate_query_params

Expand Down
3 changes: 3 additions & 0 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Electric.Plug.ServeShapePlug do
import Plug.Conn, except: [halt: 1]

alias Electric.Plug.Utils
import Electric.Plug.Utils, only: [hold_conn_until_stack_ready: 2]
alias Electric.Shapes
alias Electric.Schema
alias Electric.Replication.LogOffset
Expand Down Expand Up @@ -155,6 +156,8 @@ defmodule Electric.Plug.ServeShapePlug do
# start_telemetry_span needs to always be the first plug after fetching query params.
plug :start_telemetry_span
plug :put_resp_content_type, "application/json"
plug :hold_conn_until_stack_ready

plug :validate_query_params
plug :load_shape_info
plug :put_schema_header
Expand Down
23 changes: 23 additions & 0 deletions packages/sync-service/lib/electric/plug/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,29 @@ defmodule Electric.Plug.Utils do
end
end

@doc """
Plug that holds the request until the stack is ready to serve it.

Reads `:stack_id`, `:stack_events_registry` and the optional
`:stack_ready_timeout` (milliseconds, defaults to `5_000`) from
`conn.assigns.config`.

Returns `conn` untouched when `Electric.Replication.Supervisor` is already
registered for the stack, or as soon as a `:ready` stack event arrives.
If no `:ready` event arrives within the timeout, responds with a 503 and a
JSON body of `{"message": "Stack not ready"}`, and halts the connection.
"""
def hold_conn_until_stack_ready(conn, _opts) do
  stack_id = conn.assigns.config[:stack_id]
  stack_ready_timeout = Access.get(conn.assigns.config, :stack_ready_timeout, 5_000)
  stack_events_registry = conn.assigns.config[:stack_events_registry]

  # Subscribe *before* checking liveness so that a `:ready` event dispatched
  # between the check and the `receive` below still lands in our mailbox.
  ref = make_ref()
  Electric.StackSupervisor.subscribe_to_stack_events(stack_events_registry, stack_id, ref)

  try do
    if Electric.ProcessRegistry.alive?(stack_id, Electric.Replication.Supervisor) do
      conn
    else
      receive do
        {:stack_status, ^ref, :ready} ->
          conn
      after
        stack_ready_timeout ->
          conn
          |> Plug.Conn.send_resp(503, Jason.encode!(%{message: "Stack not ready"}))
          |> Plug.Conn.halt()
      end
    end
  after
    # The calling process may outlive this request (presumably the case for
    # keep-alive connections - confirm for the HTTP server in use). Without
    # cleanup every request would leave another entry in the duplicate
    # registry and unread status messages in the mailbox.
    # The key must mirror the one used by
    # `Electric.StackSupervisor.subscribe_to_stack_events/3`.
    Registry.unregister_match(stack_events_registry, {:stack_status, stack_id}, ref)
    flush_stack_status(ref)
  end
end

# Drops any stack status messages addressed to `ref` that were delivered but
# never consumed (non-`:ready` events, or events that raced with unsubscription).
defp flush_stack_status(ref) do
  receive do
    {:stack_status, ^ref, _event} -> flush_stack_status(ref)
  after
    0 -> :ok
  end
end

defmodule CORSHeaderPlug do
@behaviour Plug
import Plug.Conn
Expand Down
7 changes: 7 additions & 0 deletions packages/sync-service/lib/electric/process_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,11 @@ defmodule Electric.ProcessRegistry do
# Builds the `:via` tuple addressing the process stored under `{key, sub_key}`
# in the registry belonging to `stack_id`. A `nil` stack id is rejected by the guard.
def name(stack_id, key, sub_key \\ nil) when not is_nil(stack_id) do
  registry_key = {key, sub_key}
  {:via, Registry, {registry_name(stack_id), registry_key}}
end

@doc """
Returns `true` when a process is currently registered under `{key, sub_key}`
for the given stack, `false` otherwise.
"""
def alive?(stack_id, key, sub_key \\ nil) do
  registered = stack_id |> name(key, sub_key) |> GenServer.whereis()
  not is_nil(registered)
end
end
19 changes: 17 additions & 2 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ defmodule Electric.StackSupervisor do
name: [type: :any, required: false],
stack_id: [type: :string, required: true],
persistent_kv: [type: :any, required: true],
stack_events_registry: [type: :atom, required: true],
connection_opts: [
type: :keyword_list,
required: true,
Expand Down Expand Up @@ -75,8 +76,7 @@ defmodule Electric.StackSupervisor do
"tweaks to the behaviour of parts of the supervision tree, used mostly for tests",
default: [],
keys: [
registry_partitions: [type: :non_neg_integer, required: false],
notify_pid: [type: :pid, required: false]
registry_partitions: [type: :non_neg_integer, required: false]
]
]
)
Expand All @@ -87,6 +87,18 @@ defmodule Electric.StackSupervisor do
end
end

@doc """
Registers the calling process in `registry` to receive status events for `stack_id`.

`value` is stored alongside the registration and is echoed back as the second
element of every `{:stack_status, value, event}` message sent by
`dispatch_stack_event/3`. Returns whatever `Registry.register/3` returns.
"""
def subscribe_to_stack_events(registry, stack_id, value) do
  topic = {:stack_status, stack_id}
  Registry.register(registry, topic, value)
end

@doc """
Sends `{:stack_status, value, event}` to every process subscribed to the status
events of `stack_id` in `registry`, where `value` is the term each subscriber
supplied when registering.
"""
def dispatch_stack_event(registry, stack_id, event) do
  notify = fn {subscriber, value} -> send(subscriber, {:stack_status, value, event}) end

  Registry.dispatch(registry, {:stack_status, stack_id}, fn subscribers ->
    Enum.each(subscribers, notify)
  end)
end

def build_shared_opts(opts) do
# needs validation
opts = Map.new(opts)
Expand All @@ -113,8 +125,10 @@ defmodule Electric.StackSupervisor do
[
shape_cache: shape_cache,
registry: shape_changes_registry_name,
stack_events_registry: opts[:stack_events_registry],
storage: storage_mod_arg(opts),
inspector: inspector,
stack_id: stack_id,
get_service_status: fn -> Electric.ServiceStatus.check(stack_id) end
]
end
Expand Down Expand Up @@ -174,6 +188,7 @@ defmodule Electric.StackSupervisor do
stack_id: stack_id,
# Coming from the outside, need validation
connection_opts: config.connection_opts,
stack_events_registry: config.stack_events_registry,
replication_opts:
[
transaction_received:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ defmodule Electric.Plug.DeleteShapePlugTest do

import Mox

setup :with_stack_id_from_test
setup :verify_on_exit!
@moduletag :capture_log

Expand All @@ -34,18 +33,15 @@ defmodule Electric.Plug.DeleteShapePlugTest do
# Forwards relation lookups to the stub inspector; the options passed by the
# caller are ignored and `nil` is supplied instead.
def load_relation(tbl, _opts) do
  Support.StubInspector.load_relation(tbl, nil)
end

setup do
start_link_supervised!({Registry, keys: :duplicate, name: @registry})
:ok
end

# Builds a test connection for `method` against the root path, carrying the
# given query string (which must start with "?").
def conn(_ctx, method, "?" <> _ = query_string) do
  path = "/" <> query_string
  Plug.Test.conn(method, path)
end

def call_delete_shape_plug(conn, ctx, allow \\ true) do
config = [
stack_id: ctx.stack_id,
stack_events_registry: Registry.StackEvents,
stack_ready_timeout: 100,
pg_id: @test_pg_id,
shape_cache: {Mock.ShapeCache, []},
storage: {Mock.Storage, []},
Expand All @@ -61,6 +57,19 @@ defmodule Electric.Plug.DeleteShapePlugTest do
end

describe "DeleteShapePlug" do
setup :with_stack_id_from_test

setup ctx do
  start_link_supervised!({Registry, keys: :duplicate, name: @registry})

  # Register the test process under the replication supervisor's name -
  # presumably the same name the plug's readiness check looks up, so the
  # stack appears ready to the tests in this describe block.
  via_name = Electric.Replication.Supervisor.name(ctx)
  {:via, _module, {process_registry, supervisor_key}} = via_name
  {:ok, _owner} = Registry.register(process_registry, supervisor_key, nil)

  :ok
end

test "returns 404 if shape deletion is not allowed", ctx do
conn =
ctx
Expand Down Expand Up @@ -114,4 +123,19 @@ defmodule Electric.Plug.DeleteShapePlugTest do
assert conn.status == 202
end
end

describe "stack not ready" do
  setup :with_stack_id_from_test

  # Unlike the "DeleteShapePlug" describe block, no setup step here registers
  # a process as the replication supervisor.
  test "returns 503", ctx do
    request = conn(ctx, :delete, "?table=public.users")
    response = call_delete_shape_plug(request, ctx)

    assert response.status == 503
    assert Jason.decode!(response.resp_body) == %{"message" => "Stack not ready"}
  end
end
end
47 changes: 45 additions & 2 deletions packages/sync-service/test/electric/plug/serve_shape_plug_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ defmodule Electric.Plug.ServeShapePlugTest do
config = [
stack_id: ctx.stack_id,
pg_id: @test_pg_id,
stack_events_registry: Registry.StackEvents,
stack_ready_timeout: Access.get(ctx, :stack_ready_timeout, 100),
shape_cache: {Mock.ShapeCache, []},
storage: {Mock.Storage, []},
inspector: {__MODULE__, []},
Expand All @@ -77,6 +79,14 @@ defmodule Electric.Plug.ServeShapePlugTest do
describe "serving shape" do
setup :with_stack_id_from_test

setup ctx do
  # Register the test process under the replication supervisor's name -
  # presumably the same name the plug's readiness check looks up, so the
  # stack appears ready to the tests in this describe block.
  via_name = Electric.Replication.Supervisor.name(ctx)
  {:via, _module, {process_registry, supervisor_key}} = via_name
  {:ok, _owner} = Registry.register(process_registry, supervisor_key, nil)

  :ok
end

test "returns 400 for invalid table", ctx do
conn =
ctx
Expand All @@ -96,7 +106,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
conn =
ctx
|> conn(:get, %{"table" => "foo"}, "?offset=invalid")
|> ServeShapePlug.call([])
|> call_serve_shape_plug(ctx)

assert conn.status == 400

Expand All @@ -107,7 +117,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
conn =
ctx
|> conn(:get, %{}, "?offset=-1")
|> ServeShapePlug.call([])
|> call_serve_shape_plug(ctx)

assert conn.status == 400

Expand Down Expand Up @@ -688,4 +698,37 @@ defmodule Electric.Plug.ServeShapePlugTest do
assert Plug.Conn.get_resp_header(conn, "electric-handle") == [test_shape_handle]
end
end

describe "stack not ready" do
  setup :with_stack_id_from_test

  # No setup step in this describe block registers a process as the
  # replication supervisor, so the plug has to wait for a `:ready` event.
  test "returns 503", ctx do
    conn =
      ctx
      |> conn(:get, %{"table" => "public.users"}, "?offset=-1&replica=full")
      |> call_serve_shape_plug(ctx)

    assert conn.status == 503

    assert Jason.decode!(conn.resp_body) == %{"message" => "Stack not ready"}
  end

  @tag stack_ready_timeout: 1000
  test "waits until stack ready and proceeds", ctx do
    conn_task =
      Task.async(fn ->
        ctx
        |> conn(:get, %{"table" => "public.users", "columns" => "id,invalid"}, "?offset=-1")
        |> call_serve_shape_plug(ctx)
      end)

    # Wait until the request has actually subscribed to stack events rather
    # than sleeping for a fixed time: dispatching `:ready` before the
    # subscription exists would be lost and the request would time out.
    # The key mirrors `Electric.StackSupervisor.subscribe_to_stack_events/3`.
    subscribed? =
      Enum.any?(1..100, fn _attempt ->
        case Registry.lookup(Registry.StackEvents, {:stack_status, ctx.stack_id}) do
          [] ->
            Process.sleep(10)
            false

          _subscribers ->
            true
        end
      end)

    assert subscribed?, "request never subscribed to stack events"

    Electric.StackSupervisor.dispatch_stack_event(Registry.StackEvents, ctx.stack_id, :ready)

    conn = Task.await(conn_task)

    # A 400 (rather than a 503) shows the request got past the readiness hold.
    assert conn.status == 400
  end
end
end
Loading

0 comments on commit c5b79a5

Please sign in to comment.