Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add global stack event registry and block requests before ready #2019

Merged
merged 13 commits into from
Nov 21, 2024
5 changes: 5 additions & 0 deletions .changeset/olive-moons-breathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Add global stack events registry for receiving updates on the stack status
7 changes: 3 additions & 4 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ defmodule Electric.Application do
] ++
Electric.StackSupervisor.build_shared_opts(
stack_id: stack_id,
stack_events_registry: Registry.StackEvents,
storage: Application.fetch_env!(:electric, :storage)
)

Expand All @@ -49,12 +50,10 @@ defmodule Electric.Application do
Enum.concat([
[
Electric.Telemetry,
# {Registry,
# name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()},
# {Registry,
# name: Registry.ShapeChanges, keys: :duplicate, partitions: System.schedulers_online()},
{Registry, name: Registry.StackEvents, keys: :duplicate},
{Electric.StackSupervisor,
stack_id: stack_id,
stack_events_registry: Registry.StackEvents,
connection_opts: Application.fetch_env!(:electric, :connection_opts),
persistent_kv: persistent_kv,
replication_opts: [
Expand Down
10 changes: 10 additions & 0 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ defmodule Electric.Connection.Manager do
:pg_timeline_id,
# ID used for process labeling and sibling discovery
:stack_id,
# Registry used for stack events
:stack_events_registry,
:tweaks,
awaiting_active: [],
drop_slot_requested: false
Expand Down Expand Up @@ -178,6 +180,7 @@ defmodule Electric.Connection.Manager do
pg_lock_acquired: false,
backoff: {:backoff.init(1000, 10_000), nil},
stack_id: Keyword.fetch!(opts, :stack_id),
stack_events_registry: Keyword.fetch!(opts, :stack_events_registry),
tweaks: Keyword.fetch!(opts, :tweaks)
}

Expand Down Expand Up @@ -231,6 +234,12 @@ defmodule Electric.Connection.Manager do
lock_name: Keyword.fetch!(state.replication_opts, :slot_name)
) do
{:ok, lock_connection_pid} ->
Electric.StackSupervisor.dispatch_stack_event(
state.stack_events_registry,
state.stack_id,
:waiting_for_connection_lock
)

Process.send_after(self(), :log_lock_connection_status, @lock_status_logging_interval)
{:noreply, %{state | lock_connection_pid: lock_connection_pid}}

Expand Down Expand Up @@ -290,6 +299,7 @@ defmodule Electric.Connection.Manager do
Electric.Connection.Supervisor.start_shapes_supervisor(
stack_id: state.stack_id,
shape_cache_opts: shape_cache_opts,
stack_events_registry: state.stack_events_registry,
tweaks: state.tweaks
)

Expand Down
7 changes: 5 additions & 2 deletions packages/sync-service/lib/electric/connection/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ defmodule Electric.Connection.Supervisor do
)

with {:ok, pid} <- Supervisor.start_child(name(opts), child_spec) do
if notify_pid = get_in(opts, [:tweaks, :notify_pid]),
do: send(notify_pid, {:startup_progress, stack_id, :shape_supervisor_ready})
Electric.StackSupervisor.dispatch_stack_event(
opts[:stack_events_registry],
stack_id,
:ready
)

{:ok, pid}
end
Expand Down
3 changes: 3 additions & 0 deletions packages/sync-service/lib/electric/plug/delete_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ defmodule Electric.Plug.DeleteShapePlug do

alias Electric.Shapes
alias Electric.Plug.ServeShapePlug.Params
import Electric.Plug.Utils, only: [hold_conn_until_stack_ready: 2]

plug :fetch_query_params
plug :put_resp_content_type, "application/json"

plug :hold_conn_until_stack_ready

plug :allow_shape_deletion
plug :validate_query_params

Expand Down
3 changes: 3 additions & 0 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Electric.Plug.ServeShapePlug do
import Plug.Conn, except: [halt: 1]

alias Electric.Plug.Utils
import Electric.Plug.Utils, only: [hold_conn_until_stack_ready: 2]
alias Electric.Shapes
alias Electric.Schema
alias Electric.Replication.LogOffset
Expand Down Expand Up @@ -155,6 +156,8 @@ defmodule Electric.Plug.ServeShapePlug do
# start_telemetry_span needs to always be the first plug after fetching query params.
plug :start_telemetry_span
plug :put_resp_content_type, "application/json"
plug :hold_conn_until_stack_ready

plug :validate_query_params
plug :load_shape_info
plug :put_schema_header
Expand Down
23 changes: 23 additions & 0 deletions packages/sync-service/lib/electric/plug/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,29 @@ defmodule Electric.Plug.Utils do
end
end

@doc """
Plug function that holds the request until the stack is ready.

Reads `:stack_id`, `:stack_events_registry` and the optional
`:stack_ready_timeout` (milliseconds, defaults to `5_000`) from
`conn.assigns.config`.

Returns `conn` unchanged when `Electric.Replication.Supervisor` is already
registered for the stack, or as soon as a `:ready` stack event arrives.
If neither happens within the timeout, responds with a 503 and the JSON body
`{"message": "Stack not ready"}` and halts the connection.

The stack-event subscription made for this request is always removed before
returning, and any stack-event messages addressed to it are drained, so a
process that serves several requests does not accumulate registry entries or
unread messages.
"""
def hold_conn_until_stack_ready(conn, _opts) do
  config = conn.assigns.config
  stack_id = config[:stack_id]
  stack_ready_timeout = Access.get(config, :stack_ready_timeout, 5_000)
  stack_events_registry = config[:stack_events_registry]

  # Subscribe *before* checking liveness so that a `:ready` event dispatched
  # between the check and the `receive` below cannot be missed.
  ref = make_ref()
  Electric.StackSupervisor.subscribe_to_stack_events(stack_events_registry, stack_id, ref)

  try do
    if Electric.ProcessRegistry.alive?(stack_id, Electric.Replication.Supervisor) do
      conn
    else
      receive do
        {:stack_status, ^ref, :ready} ->
          conn
      after
        stack_ready_timeout ->
          conn
          |> Plug.Conn.send_resp(503, Jason.encode!(%{message: "Stack not ready"}))
          |> Plug.Conn.halt()
      end
    end
  after
    # The registry uses duplicate keys, so every request would otherwise leave
    # one more entry behind for as long as this process lives. Remove only the
    # entry created above (matched by its unique ref). The key mirrors the one
    # used by `Electric.StackSupervisor.subscribe_to_stack_events/3`.
    Registry.unregister_match(stack_events_registry, {:stack_status, stack_id}, ref)
    flush_stack_status(ref)
  end
end

# Drops every stack-status message tagged with `ref` that is already in the
# mailbox (e.g. non-`:ready` events received while waiting).
defp flush_stack_status(ref) do
  receive do
    {:stack_status, ^ref, _event} -> flush_stack_status(ref)
  after
    0 -> :ok
  end
end

defmodule CORSHeaderPlug do
@behaviour Plug
import Plug.Conn
Expand Down
7 changes: 7 additions & 0 deletions packages/sync-service/lib/electric/process_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,11 @@ defmodule Electric.ProcessRegistry do
# Builds the `:via` tuple that addresses the process registered under
# `{key, sub_key}` in the registry returned by `registry_name(stack_id)`.
# `stack_id` must not be `nil`.
def name(stack_id, key, sub_key \\ nil) when not is_nil(stack_id) do
  registry = registry_name(stack_id)
  {:via, Registry, {registry, {key, sub_key}}}
end

@doc """
Returns `true` when `GenServer.whereis/1` resolves the name built by
`name/3` for `stack_id`, `key` and `sub_key`, and `false` when it
resolves to `nil`.
"""
def alive?(stack_id, key, sub_key \\ nil) do
  registered =
    stack_id
    |> name(key, sub_key)
    |> GenServer.whereis()

  not is_nil(registered)
end
end
19 changes: 17 additions & 2 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ defmodule Electric.StackSupervisor do
name: [type: :any, required: false],
stack_id: [type: :string, required: true],
persistent_kv: [type: :any, required: true],
stack_events_registry: [type: :atom, required: true],
connection_opts: [
type: :keyword_list,
required: true,
Expand Down Expand Up @@ -75,8 +76,7 @@ defmodule Electric.StackSupervisor do
"tweaks to the behaviour of parts of the supervision tree, used mostly for tests",
default: [],
keys: [
registry_partitions: [type: :non_neg_integer, required: false],
notify_pid: [type: :pid, required: false]
registry_partitions: [type: :non_neg_integer, required: false]
]
]
)
Expand All @@ -87,6 +87,18 @@ defmodule Electric.StackSupervisor do
end
end

@doc """
Registers the calling process in `registry` under the stack-status key for
`stack_id`, storing `value` alongside the registration.

Events sent through `dispatch_stack_event/3` reach the caller as
`{:stack_status, value, event}` messages. Returns the result of
`Registry.register/3`.
"""
def subscribe_to_stack_events(registry, stack_id, value) do
  topic = {:stack_status, stack_id}
  Registry.register(registry, topic, value)
end

@doc """
Sends `{:stack_status, value, event}` to every process registered in
`registry` under the stack-status key for `stack_id`, where `value` is the
term each subscriber stored when registering.

Returns the result of `Registry.dispatch/3`.
"""
def dispatch_stack_event(registry, stack_id, event) do
  notify_all = fn subscribers ->
    Enum.each(subscribers, fn {pid, ref} ->
      send(pid, {:stack_status, ref, event})
    end)
  end

  Registry.dispatch(registry, {:stack_status, stack_id}, notify_all)
end

def build_shared_opts(opts) do
# needs validation
opts = Map.new(opts)
Expand All @@ -113,8 +125,10 @@ defmodule Electric.StackSupervisor do
[
shape_cache: shape_cache,
registry: shape_changes_registry_name,
stack_events_registry: opts[:stack_events_registry],
storage: storage_mod_arg(opts),
inspector: inspector,
stack_id: stack_id,
get_service_status: fn -> Electric.ServiceStatus.check(stack_id) end
]
end
Expand Down Expand Up @@ -174,6 +188,7 @@ defmodule Electric.StackSupervisor do
stack_id: stack_id,
# Coming from the outside, need validation
connection_opts: config.connection_opts,
stack_events_registry: config.stack_events_registry,
replication_opts:
[
transaction_received:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ defmodule Electric.Plug.DeleteShapePlugTest do

import Mox

setup :with_stack_id_from_test
setup :verify_on_exit!
@moduletag :capture_log

Expand All @@ -34,18 +33,15 @@ defmodule Electric.Plug.DeleteShapePlugTest do
# Delegates to the stub inspector; the second argument is ignored and `nil`
# is passed on in its place.
def load_relation(tbl, _opts) do
  Support.StubInspector.load_relation(tbl, nil)
end

setup do
start_link_supervised!({Registry, keys: :duplicate, name: @registry})
:ok
end

# Builds a test connection for `method` against "/" followed by the given
# query string, which must start with "?".
def conn(_ctx, method, "?" <> _ = query_string),
  do: Plug.Test.conn(method, "/" <> query_string)

def call_delete_shape_plug(conn, ctx, allow \\ true) do
config = [
stack_id: ctx.stack_id,
stack_events_registry: Registry.StackEvents,
stack_ready_timeout: 100,
pg_id: @test_pg_id,
shape_cache: {Mock.ShapeCache, []},
storage: {Mock.Storage, []},
Expand All @@ -61,6 +57,19 @@ defmodule Electric.Plug.DeleteShapePlugTest do
end

describe "DeleteShapePlug" do
setup :with_stack_id_from_test

setup ctx do
  start_link_supervised!({Registry, keys: :duplicate, name: @registry})

  # Register the test process under the name of the replication supervisor.
  # Presumably this is what makes the plug's stack-readiness check succeed
  # without starting the real supervisor.
  {:via, _module, {process_registry, supervisor_key}} =
    Electric.Replication.Supervisor.name(ctx)

  {:ok, _owner} = Registry.register(process_registry, supervisor_key, nil)

  :ok
end

test "returns 404 if shape deletion is not allowed", ctx do
conn =
ctx
Expand Down Expand Up @@ -114,4 +123,19 @@ defmodule Electric.Plug.DeleteShapePlugTest do
assert conn.status == 202
end
end

describe "stack not ready" do
  setup :with_stack_id_from_test

  # Nothing is registered as the replication supervisor in this group and no
  # `:ready` event is dispatched, so the request is expected to be rejected.
  test "returns 503", ctx do
    request = conn(ctx, :delete, "?table=public.users")
    response = call_delete_shape_plug(request, ctx)

    assert response.status == 503
    assert Jason.decode!(response.resp_body) == %{"message" => "Stack not ready"}
  end
end
end
47 changes: 45 additions & 2 deletions packages/sync-service/test/electric/plug/serve_shape_plug_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ defmodule Electric.Plug.ServeShapePlugTest do
config = [
stack_id: ctx.stack_id,
pg_id: @test_pg_id,
stack_events_registry: Registry.StackEvents,
stack_ready_timeout: Access.get(ctx, :stack_ready_timeout, 100),
shape_cache: {Mock.ShapeCache, []},
storage: {Mock.Storage, []},
inspector: {__MODULE__, []},
Expand All @@ -77,6 +79,14 @@ defmodule Electric.Plug.ServeShapePlugTest do
describe "serving shape" do
setup :with_stack_id_from_test

setup ctx do
  # Register the test process under the name of the replication supervisor.
  # Presumably this is what makes the plug's stack-readiness check succeed
  # without starting the real supervisor.
  {:via, _module, {process_registry, supervisor_key}} =
    Electric.Replication.Supervisor.name(ctx)

  {:ok, _owner} = Registry.register(process_registry, supervisor_key, nil)

  :ok
end

test "returns 400 for invalid table", ctx do
conn =
ctx
Expand All @@ -96,7 +106,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
conn =
ctx
|> conn(:get, %{"table" => "foo"}, "?offset=invalid")
|> ServeShapePlug.call([])
|> call_serve_shape_plug(ctx)

assert conn.status == 400

Expand All @@ -107,7 +117,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
conn =
ctx
|> conn(:get, %{}, "?offset=-1")
|> ServeShapePlug.call([])
|> call_serve_shape_plug(ctx)

assert conn.status == 400

Expand Down Expand Up @@ -688,4 +698,37 @@ defmodule Electric.Plug.ServeShapePlugTest do
assert Plug.Conn.get_resp_header(conn, "electric-handle") == [test_shape_handle]
end
end

describe "stack not ready" do
setup :with_stack_id_from_test

# Nothing is registered as the replication supervisor in this group and no
# `:ready` event is dispatched, so the plug is expected to give up after
# `stack_ready_timeout` and answer with 503.
test "returns 503", ctx do
  conn =
    ctx
    |> conn(:get, %{"table" => "public.users"}, "?offset=-1&replica=full")
    |> call_serve_shape_plug(ctx)

  assert conn.status == 503

  assert Jason.decode!(conn.resp_body) == %{"message" => "Stack not ready"}
end

# Raise the readiness timeout (the test config otherwise defaults it to 100 ms)
# so the request is still waiting when `:ready` is dispatched below.
@tag stack_ready_timeout: 1000
test "waits until stack ready and proceeds", ctx do
# Run the request in a separate process: the plug blocks that process until
# it receives the readiness event or times out.
conn_task =
Task.async(fn ->
ctx
|> conn(:get, %{"table" => "public.users", "columns" => "id,invalid"}, "?offset=-1")
|> call_serve_shape_plug(ctx)
end)

# Give the task time to subscribe to stack events before dispatching.
# NOTE(review): this is timing-based; if the task has not subscribed within
# 50 ms the event is missed and the request would end in a 503 instead.
Process.sleep(50)

Electric.StackSupervisor.dispatch_stack_event(Registry.StackEvents, ctx.stack_id, :ready)

conn = Task.await(conn_task)

# A 400 rather than a 503 shows the request got past the readiness gate;
# presumably it is rejected by query validation because of the "invalid" column.
assert conn.status == 400
end
end
end
Loading
Loading