Skip to content

Commit

Permalink
fix: truncates no longer crash replication (#2156)
Browse files Browse the repository at this point in the history
We had an issue where a truncate caused the `Consumer` process to
`GenServer.call` `ShapeCache`, which in turn `GenServer.call`-ed the
`Consumer` and then both crashed with a timeout and weren't cleaned up
correctly. This addresses that and makes the shape process after the
Truncate clean up and stop gracefully, test included.
  • Loading branch information
icehaunter authored Dec 12, 2024
1 parent 01c63ae commit 218b7d4
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 81 deletions.
5 changes: 5 additions & 0 deletions .changeset/gorgeous-bottles-mate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

fix: truncates no longer cause a stop to an incoming replication stream
18 changes: 0 additions & 18 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ defmodule Electric.ShapeCacheBehaviour do
{shape_handle(), current_snapshot_offset :: LogOffset.t()}
@callback list_shapes(keyword() | map()) :: [{shape_handle(), Shape.t()}]
@callback await_snapshot_start(shape_handle(), opts :: keyword()) :: :started | {:error, term()}
@callback handle_truncate(shape_handle(), keyword()) :: :ok
@callback clean_shape(shape_handle(), keyword()) :: :ok
@callback clean_all_shapes(keyword()) :: :ok
@callback has_shape?(shape_handle(), keyword()) :: boolean()
Expand Down Expand Up @@ -154,13 +153,6 @@ defmodule Electric.ShapeCache do
GenServer.call(server, {:clean_all})
end

@impl Electric.ShapeCacheBehaviour
@spec handle_truncate(shape_handle(), keyword()) :: :ok
def handle_truncate(shape_handle, opts \\ []) do
server = Access.get(opts, :server, name(opts))
GenStage.call(server, {:truncate, shape_handle})
end

@impl Electric.ShapeCacheBehaviour
@spec await_snapshot_start(shape_handle(), keyword()) :: :started | {:error, term()}
def await_snapshot_start(shape_handle, opts \\ []) when is_binary(shape_handle) do
Expand Down Expand Up @@ -276,16 +268,6 @@ defmodule Electric.ShapeCache do
state}
end

def handle_call({:truncate, shape_handle}, _from, state) do
with :ok <- clean_up_shape(state, shape_handle) do
Logger.info(
"Truncating and rotating shape handle, previous shape handle #{shape_handle} cleaned up"
)
end

{:reply, :ok, state}
end

def handle_call({:clean, shape_handle}, _from, state) do
# ignore errors when cleaning up non-existant shape id
with :ok <- clean_up_shape(state, shape_handle) do
Expand Down
4 changes: 1 addition & 3 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,7 @@ defmodule Electric.Shapes.Consumer do
"Truncate operation encountered while processing txn #{txn.xid} for #{shape_handle}"
)

:ok = shape_cache.handle_truncate(shape_handle, shape_cache_opts)

:ok = ShapeCache.Storage.cleanup!(storage)
cleanup(state)

{:halt, {:truncate, notify(txn, %{state | log_state: @initial_log_state})}}

Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"},
"mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"},
"mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"},
"mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"},
"mox": {:hex, :mox, "1.2.0", "a2cd96b4b80a3883e3100a221e8adc1b98e4c3a332a8fc434c39526babafd5b3", [:mix], [{:nimble_ownership, "~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}], "hexpm", "c7b92b3cc69ee24a7eeeaf944cd7be22013c52fcb580c1f33f50845ec821089a"},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_ownership": {:hex, :nimble_ownership, "1.0.0", "3f87744d42c21b2042a0aa1d48c83c77e6dd9dd357e425a038dd4b49ba8b79a1", [:mix], [], "hexpm", "7c16cc74f4e952464220a73055b557a273e8b1b7ace8489ec9d86e9ad56cb2cc"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
Expand Down
76 changes: 76 additions & 0 deletions packages/sync-service/test/electric/plug/router_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,82 @@ defmodule Electric.Plug.RouterTest do
[^shape_handle] = Plug.Conn.get_resp_header(conn, "electric-handle")
end

@tag with_sql: [
"INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')"
]
test "GET returns a 409 on a truncate and can follow a new shape afterwards", %{
opts: opts,
db_conn: db_conn
} do
conn = Router.call(conn("GET", "/v1/shape?table=items&offset=-1"), opts)

assert %{status: 200} = conn
handle = get_resp_shape_handle(conn)
offset = get_resp_last_offset(conn)
assert [%{"value" => %{"value" => "test value 1"}}] = Jason.decode!(conn.resp_body)

task =
Task.async(fn ->
Router.call(
conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{handle}&live"),
opts
)
end)

Postgrex.query!(db_conn, "INSERT INTO items VALUES (gen_random_uuid(), 'test value 2')", [])

conn = Task.await(task)

assert %{status: 200} = conn
assert ^handle = get_resp_shape_handle(conn)
offset = get_resp_last_offset(conn)
assert [%{"value" => %{"value" => "test value 2"}}, _] = Jason.decode!(conn.resp_body)

task =
Task.async(fn ->
Router.call(
conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{handle}&live"),
opts
)
end)

Postgrex.query!(db_conn, "TRUNCATE TABLE items", [])
assert %{status: 204} = Task.await(task)

conn =
Router.call(conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{handle}"), opts)

assert %{status: 409} = conn
assert [%{"headers" => %{"control" => "must-refetch"}}] = Jason.decode!(conn.resp_body)

conn =
Router.call(conn("GET", "/v1/shape?table=items&offset=-1"), opts)

assert %{status: 200} = conn
new_handle = get_resp_shape_handle(conn)
refute new_handle == handle
offset = get_resp_last_offset(conn)
assert [] = Jason.decode!(conn.resp_body)

task =
Task.async(fn ->
Router.call(
conn("GET", "/v1/shape?table=items&offset=#{offset}&handle=#{new_handle}&live"),
opts
)
end)

Postgrex.query!(db_conn, "INSERT INTO items VALUES (gen_random_uuid(), 'test value 3')", [])

conn = Task.await(task)

assert %{status: 200} = conn
assert ^new_handle = get_resp_shape_handle(conn)
# offset = get_resp_last_offset(conn)
assert [%{"value" => %{"value" => "test value 3"}}, @up_to_date] =
Jason.decode!(conn.resp_body)
end

@tag with_sql: [
"INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')"
]
Expand Down
55 changes: 0 additions & 55 deletions packages/sync-service/test/electric/shape_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -679,61 +679,6 @@ defmodule Electric.ShapeCacheTest do
end
end

describe "handle_truncate/2" do
setup [
:with_stack_id_from_test,
:with_in_memory_storage,
:with_log_chunking,
:with_registry,
:with_shape_log_collector
]

test "cleans up shape data and rotates the shape handle", ctx do
%{shape_cache_opts: opts} =
with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}),
run_with_conn_fn: &run_with_conn_noop/2,
prepare_tables_fn: @prepare_tables_noop,
create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ ->
GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 10})
Storage.make_new_snapshot!([["test"]], storage)
GenServer.cast(parent, {:snapshot_started, shape_handle})
end
)

{shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape, opts)
Process.sleep(50)
assert :started = ShapeCache.await_snapshot_start(shape_handle, opts)

storage = Storage.for_shape(shape_handle, ctx.storage)

Storage.append_to_log!(
changes_to_log_items([
%Electric.Replication.Changes.NewRecord{
relation: {"public", "items"},
record: %{"id" => "1", "value" => "Alice"},
log_offset: LogOffset.new(Electric.Postgres.Lsn.from_integer(1000), 0)
}
]),
storage
)

assert Storage.snapshot_started?(storage)
assert Enum.count(Storage.get_log_stream(@zero_offset, storage)) == 1

ref =
Shapes.Consumer.whereis(ctx.stack_id, shape_handle)
|> Process.monitor()

log = capture_log(fn -> ShapeCache.handle_truncate(shape_handle, opts) end)
assert log =~ "Truncating and rotating shape handle"

assert_receive {:DOWN, ^ref, :process, _pid, _}
# Wait a bit for the async cleanup to complete

refute Storage.snapshot_started?(storage)
end
end

describe "clean_shape/2" do
setup [
:with_stack_id_from_test,
Expand Down
8 changes: 4 additions & 4 deletions packages/sync-service/test/electric/shapes/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ defmodule Electric.Shapes.ConsumerTest do
lsn = Lsn.from_string("0/10")
last_log_offset = LogOffset.new(lsn, 0)

Mock.ShapeCache
|> expect(:handle_truncate, fn @shape_handle1, _ -> :ok end)
Mock.ShapeStatus
|> expect(:remove_shape, fn _, @shape_handle1 -> :ok end)
|> allow(
self(),
Shapes.Consumer.name(ctx.stack_id, @shape_handle1)
Expand Down Expand Up @@ -366,8 +366,8 @@ defmodule Electric.Shapes.ConsumerTest do
lsn = Lsn.from_string("0/10")
last_log_offset = LogOffset.new(lsn, 0)

Mock.ShapeCache
|> expect(:handle_truncate, fn @shape_handle1, _ -> :ok end)
Mock.ShapeStatus
|> expect(:remove_shape, fn _, @shape_handle1 -> :ok end)
|> allow(
self(),
Shapes.Consumer.name(ctx.stack_id, @shape_handle1)
Expand Down

0 comments on commit 218b7d4

Please sign in to comment.