Skip to content

Commit

Permalink
feat (sync-service): Prevent shape consumer errors from affecting oth…
Browse files Browse the repository at this point in the history
…er shapes (#2009)

Fixes #1925 . Errors that occur while consuming the replication stream
that are to do with a specific shape, cause that shape's consumer to
remove the shape and shut down, leaving the other shapes unaffected.

Errors can occur:
1. In the selector which happens on a process common to all shapes. 
2. On the consumer process.

This PR addresses both types of error. Previous to this PR, these errors
would cause the sync service to get into an infinite crash loop and
would stop responding to HTTP requests.
  • Loading branch information
robacourt authored Nov 21, 2024
1 parent fed0761 commit 598aa28
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 39 deletions.
5 changes: 5 additions & 0 deletions .changeset/clever-dots-admire.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Improve reliability: Shapes that error while processing the replication stream will now be removed leaving other shapes unaffected
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ defmodule Electric.Replication.ShapeLogCollector do
stack_id: [type: :string, required: true],
inspector: [type: :mod_arg, required: true],
# see https://hexdocs.pm/gen_stage/GenStage.html#c:init/1-options
demand: [type: {:in, [:forward, :accumulate]}, default: :accumulate],
# should this log collector process shutdown when one of its consumers crashes?
link_consumers: [type: :boolean, default: true]
demand: [type: {:in, [:forward, :accumulate]}, default: :accumulate]
)

def start_link(opts) do
Expand Down Expand Up @@ -91,29 +89,7 @@ defmodule Electric.Replication.ShapeLogCollector do
{:noreply, [], remove_subscription(from, state)}
end

def handle_cancel({:down, reason}, from, %{link_consumers: true} = state) do
# See: https://hexdocs.pm/elixir/Supervisor.html#module-exit-reasons-and-restarts
# If the consumer's shutdown is unexpected, due to some error, then exit with
# this error and let the supervisor bring us back up.
state = remove_subscription(from, state)

case reason do
{:shutdown, _} ->
{:noreply, [], state}

:shutdown ->
{:noreply, [], state}

:normal ->
{:noreply, [], state}

error ->
Logger.warning("Terminating LogCollector due to error from consumer: #{inspect(error)}")
{:stop, {:error, error}, state}
end
end

def handle_cancel({:down, _reason}, from, %{link_consumers: false} = state) do
def handle_cancel({:down, _reason}, from, state) do
{:noreply, [], remove_subscription(from, state)}
end

Expand Down
39 changes: 30 additions & 9 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Electric.Shapes.Consumer do
use GenStage,
restart: :transient,
restart: :temporary,
significant: true

alias Electric.ShapeCache.LogChunker
Expand Down Expand Up @@ -91,13 +91,22 @@ defmodule Electric.Shapes.Consumer do
subscribe_to: [{producer, [max_demand: 1, selector: &selector(&1, config.shape)]}]}
end

defp selector(%Transaction{changes: changes}, shape) do
defp selector(event, shape) do
process_event?(event, shape)
rescue
# Swallow errors here to avoid crashing the ShapeLogCollector.
# Return `true` so the event is processed, which will then error
# for the same reason and cleanup the shape.
_ -> true
end

defp process_event?(%Transaction{changes: changes}, shape) do
changes
|> Stream.flat_map(&Shape.convert_change(shape, &1))
|> Enum.any?()
end

defp selector(%Changes.Relation{} = relation_change, shape) do
defp process_event?(%Changes.Relation{} = relation_change, shape) do
Shape.is_affected_by_relation_change?(shape, relation_change)
end

Expand All @@ -116,7 +125,7 @@ defmodule Electric.Shapes.Consumer do

# TODO: ensure cleanup occurs after snapshot is done/failed/interrupted to avoid
# any race conditions and leftover data
state = cleanup(state)
cleanup(state)
{:stop, :normal, :ok, state}
end

Expand Down Expand Up @@ -160,7 +169,7 @@ defmodule Electric.Shapes.Consumer do
)

state = reply_to_snapshot_waiters({:error, error}, state)
state = cleanup(state)
cleanup(state)
{:stop, :normal, state}
end

Expand All @@ -170,10 +179,23 @@ defmodule Electric.Shapes.Consumer do
{:noreply, [], state}
end

def terminate(_reason, state) do
reply_to_snapshot_waiters({:error, "Shape terminated before snapshot was ready"}, state)
def terminate(reason, state) do
state =
reply_to_snapshot_waiters({:error, "Shape terminated before snapshot was ready"}, state)

if is_error?(reason) do
cleanup(state)
end

state
end

defp is_error?(:normal), do: false
defp is_error?(:killed), do: false
defp is_error?(:shutdown), do: false
defp is_error?({:shutdown, _}), do: false
defp is_error?(_), do: true

# `Shapes.Dispatcher` only works with single-events, so we can safely assert
# that here
def handle_events([%Changes.Relation{}], _from, state) do
Expand All @@ -193,7 +215,7 @@ defmodule Electric.Shapes.Consumer do
state
)

state = cleanup(state)
cleanup(state)
{:stop, :normal, state}
end

Expand Down Expand Up @@ -336,7 +358,6 @@ defmodule Electric.Shapes.Consumer do
%{shape_status: {shape_status, shape_status_state}} = state
shape_status.remove_shape(shape_status_state, state.shape_handle)
ShapeCache.Storage.cleanup!(state.storage)
state
end

defp reply_to_snapshot_waiters(_reply, %{awaiting_snapshot_start: []} = state) do
Expand Down
2 changes: 0 additions & 2 deletions packages/sync-service/test/electric/shape_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -795,8 +795,6 @@ defmodule Electric.ShapeCacheTest do

setup do
%{
# don't crash the log collector when the shape consumers get killed by our tests
link_log_collector: false,
inspector: Support.StubInspector.new([%{name: "id", type: "int8", pk_position: 0}])
}
end
Expand Down
76 changes: 76 additions & 0 deletions packages/sync-service/test/electric/shapes/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,82 @@ defmodule Electric.Shapes.ConsumerTest do
assert_receive {:DOWN, ^ref1, :process, _, _}
refute_receive {:DOWN, ^ref2, :process, _, _}
end

test "unexpected error while handling events stops affected consumer and cleans affected shape",
ctx do
Mock.ShapeCache
|> expect(:update_shape_latest_offset, fn @shape_handle1, _, _ ->
raise "The unexpected error"
end)
|> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1))

lsn = Lsn.from_string("0/10")

txn =
%Transaction{xid: 150, lsn: lsn, last_log_offset: LogOffset.new(lsn, 0)}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "1"},
log_offset: LogOffset.first()
})

ref1 =
Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle1)))

ref2 =
Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle2)))

Mock.ShapeStatus
|> expect(:remove_shape, 1, fn _, _ -> :ok end)
|> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1))
|> expect(:remove_shape, 0, fn _, _ -> :ok end)
|> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2))

:ok = ShapeLogCollector.store_transaction(txn, ctx.producer)

assert_receive {Support.TestStorage, :cleanup!, @shape_handle1}
refute_receive {Support.TestStorage, :cleanup!, @shape_handle2}

assert_receive {:DOWN, ^ref1, :process, _, _}
refute_receive {:DOWN, ^ref2, :process, _, _}
end

test "consumer crashing stops affected consumer and cleans affected shape", ctx do
ref1 =
Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle1)))

ref2 =
Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle2)))

Mock.ShapeStatus
|> expect(:remove_shape, 1, fn _, _ -> :ok end)
|> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1))
|> expect(:remove_shape, 0, fn _, _ -> :ok end)
|> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2))

GenServer.cast(Consumer.name(ctx.stack_id, @shape_handle1), :unexpected_cast)

assert_receive {Support.TestStorage, :cleanup!, @shape_handle1}
refute_receive {Support.TestStorage, :cleanup!, @shape_handle2}

assert_receive {:DOWN, ^ref1, :process, _, _}
refute_receive {:DOWN, ^ref2, :process, _, _}
end

test "consumer exiting normally does not clean up the shape", ctx do
ref =
Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle1)))

Mock.ShapeStatus
|> expect(:remove_shape, 0, fn _, _ -> :ok end)
|> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1))

GenServer.stop(Consumer.name(ctx.stack_id, @shape_handle1))

refute_receive {Support.TestStorage, :cleanup!, @shape_handle1}

assert_receive {:DOWN, ^ref, :process, _, _}
end
end

describe "transaction handling with real storage" do
Expand Down
3 changes: 1 addition & 2 deletions packages/sync-service/test/support/component_setup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ defmodule Support.ComponentSetup do
{:ok, _} =
ShapeLogCollector.start_link(
stack_id: ctx.stack_id,
inspector: ctx.inspector,
link_consumers: Map.get(ctx, :link_log_collector, true)
inspector: ctx.inspector
)

%{shape_log_collector: ShapeLogCollector.name(ctx.stack_id)}
Expand Down

0 comments on commit 598aa28

Please sign in to comment.