From 1803392f9e9377551752248f21bd110b18198cd3 Mon Sep 17 00:00:00 2001 From: Rob A'Court Date: Wed, 14 Aug 2024 13:59:41 +0100 Subject: [PATCH] feat(sync-service): Streaming the response to the client without waiting for the snapshot to finish (#1517) Previously, shapes with one million rows or more (170MB+) would time out waiting for the snapshot to be created. Now the snapshot is streamed from the database into storage while simultaneously being streamed from storage to the client. This means the first packets of the response can be sent without waiting for the snapshot to finish. This means we now support tables with over a million rows. Ilia is currently benchmarking this branch to see what the new limits are. This PR addresses #1438 and #1444. I think there are quite a few simplifications that could happen off the back of this change, but I have kept the refactoring to a minimum in this PR and will instead address the simplifications in separate PRs. --- .changeset/kind-starfishes-tap.md | 5 + .../lib/electric/concurrent_stream.ex | 51 ++++++ .../sync-service/lib/electric/shape_cache.ex | 48 +++--- .../electric/shape_cache/cub_db_storage.ex | 61 +++++-- .../electric/shape_cache/in_memory_storage.ex | 52 ++++-- .../lib/electric/shape_cache/storage.ex | 19 +- packages/sync-service/lib/electric/shapes.ex | 2 +- .../test/electric/concurrent_stream_test.exs | 55 ++++++ .../electric/plug/serve_shape_plug_test.exs | 6 +- .../replication/shape_log_collector_test.exs | 6 +- .../storage_implementations_test.exs | 163 +++++++++++++----- .../electric/shape_cache/storage_test.exs | 4 +- .../test/electric/shape_cache_test.exs | 150 ++++++++++------ 13 files changed, 458 insertions(+), 164 deletions(-) create mode 100644 .changeset/kind-starfishes-tap.md create mode 100644 packages/sync-service/lib/electric/concurrent_stream.ex create mode 100644 packages/sync-service/test/electric/concurrent_stream_test.exs diff --git a/.changeset/kind-starfishes-tap.md 
b/.changeset/kind-starfishes-tap.md new file mode 100644 index 0000000000..b93099e346 --- /dev/null +++ b/.changeset/kind-starfishes-tap.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Support larger shapes (1 million rows, 170MB+) and faster time to first byte diff --git a/packages/sync-service/lib/electric/concurrent_stream.ex b/packages/sync-service/lib/electric/concurrent_stream.ex new file mode 100644 index 0000000000..4d571012fa --- /dev/null +++ b/packages/sync-service/lib/electric/concurrent_stream.ex @@ -0,0 +1,51 @@ +defmodule Electric.ConcurrentStream do + @default_poll_time 10 + + @doc """ + Allows concurrent reading while writing of a stream. + There can be multiple reading processes; however, there must be only one writing process. + + The writing process must append an end marker to the end of the stream when it has finished + to signal to the reading processes that the stream has ended. + + If a read process runs out of data to read before the end marker has been written + it waits the `poll_time_in_ms` for more data to be written, then resumes the stream + with the `stream_fun`. 
+ """ + + def stream_to_end(opts) do + excluded_start_key = Keyword.fetch!(opts, :excluded_start_key) + end_marker_key = Keyword.fetch!(opts, :end_marker_key) + stream_fun = Keyword.fetch!(opts, :stream_fun) + + stream_fun.(excluded_start_key, end_marker_key) + |> continue_if_not_ended(excluded_start_key, opts) + end + + defp continue_if_not_ended(stream, latest_key, opts) do + end_marker_key = Keyword.fetch!(opts, :end_marker_key) + stream_fun = Keyword.fetch!(opts, :stream_fun) + poll_time_in_ms = Keyword.get(opts, :poll_time_in_ms, @default_poll_time) + + [stream, [:premature_end]] + |> Stream.concat() + |> Stream.transform(latest_key, fn + :premature_end, latest_key -> + # Wait for more items to be added + Process.sleep(poll_time_in_ms) + + # Continue from the latest_key + stream = + stream_fun.(latest_key, end_marker_key) + |> continue_if_not_ended(latest_key, opts) + + {stream, latest_key} + + {^end_marker_key, _}, _latest_key -> + {:halt, :end_marker_seen} + + {key, _value} = item, _latest_key -> + {[item], key} + end) + end +end diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index 971d25eef4..b676dd74e3 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -22,7 +22,7 @@ defmodule Electric.ShapeCacheBehaviour do {shape_id(), current_snapshot_offset :: LogOffset.t()} @callback list_active_shapes(opts :: keyword()) :: [{shape_id(), shape_def(), xmin()}] - @callback wait_for_snapshot(GenServer.name(), shape_id()) :: :ready | {:error, term()} + @callback await_snapshot_start(GenServer.name(), shape_id()) :: :started | {:error, term()} @callback handle_truncate(GenServer.name(), shape_id()) :: :ok @callback clean_shape(GenServer.name(), shape_id()) :: :ok end @@ -136,9 +136,9 @@ defmodule Electric.ShapeCache do GenServer.call(server, {:truncate, shape_id}) end - @spec wait_for_snapshot(GenServer.name(), String.t()) :: :ready | 
{:error, term()} - def wait_for_snapshot(server \\ __MODULE__, shape_id) when is_binary(shape_id) do - GenServer.call(server, {:wait_for_snapshot, shape_id}, 30_000) + @spec await_snapshot_start(GenServer.name(), String.t()) :: :started | {:error, term()} + def await_snapshot_start(server \\ __MODULE__, shape_id) when is_binary(shape_id) do + GenServer.call(server, {:await_snapshot_start, shape_id}) end def init(opts) do @@ -148,7 +148,7 @@ defmodule Electric.ShapeCache do state = %{ storage: opts.storage, shape_meta_table: shape_meta_table, - waiting_for_creation: %{}, + awaiting_snapshot_start: %{}, db_pool: opts.db_pool, create_snapshot_fn: opts.create_snapshot_fn, prepare_tables_fn: opts.prepare_tables_fn @@ -188,13 +188,13 @@ defmodule Electric.ShapeCache do {:reply, {shape_id, latest_offset}, state} end - def handle_call({:wait_for_snapshot, shape_id}, from, state) do + def handle_call({:await_snapshot_start, shape_id}, from, state) do cond do not is_known_shape_id?(state, shape_id) -> {:reply, {:error, :unknown}, state} - Storage.snapshot_exists?(shape_id, state.storage) -> - {:reply, :ready, state} + Storage.snapshot_started?(shape_id, state.storage) -> + {:reply, :started, state} true -> Logger.debug("Starting a wait on the snapshot #{shape_id} for #{inspect(from)}}") @@ -235,10 +235,11 @@ defmodule Electric.ShapeCache do {:noreply, state} end - def handle_cast({:snapshot_ready, shape_id}, state) do + def handle_cast({:snapshot_started, shape_id}, state) do Logger.debug("Snapshot for #{shape_id} is ready") - {waiting, state} = pop_in(state, [:waiting_for_creation, shape_id]) - for client <- List.wrap(waiting), not is_nil(client), do: GenServer.reply(client, :ready) + Storage.mark_snapshot_as_started(shape_id, state.storage) + {waiting, state} = pop_in(state, [:awaiting_snapshot_start, shape_id]) + for client <- List.wrap(waiting), not is_nil(client), do: GenServer.reply(client, :started) {:noreply, state} end @@ -248,8 +249,13 @@ defmodule 
Electric.ShapeCache do ) clean_up_shape(state, shape_id) - {waiting, state} = pop_in(state, [:waiting_for_creation, shape_id]) - for client <- waiting, not is_nil(client), do: GenServer.reply(client, {:error, error}) + {waiting, state} = pop_in(state, [:awaiting_snapshot_start, shape_id]) + + # waiting may nil here if :snapshot_failed happens after :snapshot_started + if waiting do + for client <- waiting, not is_nil(client), do: GenServer.reply(client, {:error, error}) + end + {:noreply, state} end @@ -275,12 +281,12 @@ defmodule Electric.ShapeCache do shape end - defp maybe_start_snapshot(%{waiting_for_creation: map} = state, shape_id, _) + defp maybe_start_snapshot(%{awaiting_snapshot_start: map} = state, shape_id, _) when is_map_key(map, shape_id), do: state defp maybe_start_snapshot(state, shape_id, shape) do - if not Storage.snapshot_exists?(shape_id, state.storage) do + if not Storage.snapshot_started?(shape_id, state.storage) do parent = self() %{ @@ -298,9 +304,7 @@ defmodule Electric.ShapeCache do fn -> try do Utils.apply_fn_or_mfa(prepare_tables_fn_or_mfa, [pool, affected_tables]) - apply(create_snapshot_fn, [parent, shape_id, shape, pool, storage]) - GenServer.cast(parent, {:snapshot_ready, shape_id}) rescue error -> GenServer.cast(parent, {:snapshot_failed, shape_id, error, __STACKTRACE__}) end @@ -321,10 +325,10 @@ defmodule Electric.ShapeCache do end end - defp add_waiter(%{waiting_for_creation: waiters} = state, shape_id, waiter), + defp add_waiter(%{awaiting_snapshot_start: waiters} = state, shape_id, waiter), do: %{ state - | waiting_for_creation: Map.update(waiters, shape_id, [waiter], &[waiter | &1]) + | awaiting_snapshot_start: Map.update(waiters, shape_id, [waiter], &[waiter | &1]) } @doc false @@ -340,12 +344,14 @@ defmodule Electric.ShapeCache do %{rows: [[xmin]]} = Postgrex.query!(conn, "SELECT pg_snapshot_xmin(pg_current_snapshot())", []) + GenServer.cast(parent, {:snapshot_xmin_known, shape_id, xmin}) + # Enforce display settings *before* 
querying initial data to maintain consistent # formatting between snapshot and live log entries. Enum.each(Electric.Postgres.display_settings(), &Postgrex.query!(conn, &1, [])) - GenServer.cast(parent, {:snapshot_xmin_known, shape_id, xmin}) {query, stream} = Querying.stream_initial_data(conn, shape) + GenServer.cast(parent, {:snapshot_started, shape_id}) # could pass the shape and then make_new_snapshot! can pass it to row_to_snapshot_item # that way it has the relation, but it is still missing the pk_cols @@ -355,7 +361,7 @@ defmodule Electric.ShapeCache do end defp recover_shapes(state) do - Storage.cleanup_shapes_without_xmins(state.storage) + Storage.initialise(state.storage) state.storage |> Storage.list_shapes() diff --git a/packages/sync-service/lib/electric/shape_cache/cub_db_storage.ex b/packages/sync-service/lib/electric/shape_cache/cub_db_storage.ex index b39d404c64..656e08b696 100644 --- a/packages/sync-service/lib/electric/shape_cache/cub_db_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/cub_db_storage.ex @@ -1,9 +1,14 @@ defmodule Electric.ShapeCache.CubDbStorage do + alias Electric.ConcurrentStream alias Electric.LogItems alias Electric.Replication.LogOffset alias Electric.Telemetry.OpenTelemetry @behaviour Electric.ShapeCache.Storage + # If the storage format changes, increase `@version` to prevent + # the incompatable older versions being read + @version 1 + @version_key :version @snapshot_key_type 0 @log_key_type 1 @snapshot_offset LogOffset.first() @@ -12,7 +17,7 @@ defmodule Electric.ShapeCache.CubDbStorage do file_path = Access.get(opts, :file_path, "./shapes") db = Access.get(opts, :db, :shape_db) - {:ok, %{file_path: file_path, db: db}} + {:ok, %{file_path: file_path, db: db, version: @version}} end def child_spec(opts) do @@ -29,12 +34,20 @@ defmodule Electric.ShapeCache.CubDbStorage do CubDB.start_link(data_dir: opts.file_path, name: opts.db) end - def cleanup_shapes_without_xmins(opts) do + def initialise(opts) do + 
stored_version = stored_version(opts) + opts.db |> CubDB.select(min_key: shapes_start(), max_key: shapes_end()) |> Stream.map(fn {{:shapes, shape_id}, _} -> shape_id end) - |> Stream.reject(&snapshot_xmin(&1, opts)) + |> Stream.filter(fn shape_id -> + stored_version != opts.version || + snapshot_xmin(shape_id, opts) == nil || + CubDB.has_key?(opts.db, snapshot_end(shape_id)) == false + end) |> Enum.each(&cleanup!(&1, opts)) + + CubDB.put(opts.db, @version_key, @version) end def list_shapes(opts) do @@ -77,17 +90,26 @@ defmodule Electric.ShapeCache.CubDbStorage do end end - @spec snapshot_exists?(any(), any()) :: false - def snapshot_exists?(shape_id, opts) do - CubDB.has_key?(opts.db, snapshot_meta_key(shape_id)) + @spec snapshot_started?(any(), any()) :: false + def snapshot_started?(shape_id, opts) do + CubDB.has_key?(opts.db, snapshot_start(shape_id)) end def get_snapshot(shape_id, opts) do stream = - opts.db - |> CubDB.select( - min_key: snapshot_start(shape_id), - max_key: snapshot_end(shape_id) + ConcurrentStream.stream_to_end( + excluded_start_key: snapshot_start(shape_id), + end_marker_key: snapshot_end(shape_id), + poll_time_in_ms: 10, + stream_fun: fn excluded_start_key, included_end_key -> + if !snapshot_started?(shape_id, opts), do: raise("Snapshot no longer available") + + CubDB.select(opts.db, + min_key: excluded_start_key, + max_key: included_end_key, + min_key_inclusive: false + ) + end ) |> Stream.flat_map(fn {_, items} -> items end) |> Stream.map(fn {_, item} -> item end) @@ -112,7 +134,11 @@ defmodule Electric.ShapeCache.CubDbStorage do def has_log_entry?(shape_id, offset, opts) do # FIXME: this is naive while we don't have snapshot metadata to get real offsets CubDB.has_key?(opts.db, log_key(shape_id, offset)) or - (snapshot_exists?(shape_id, opts) and offset == @snapshot_offset) + (snapshot_started?(shape_id, opts) and offset == @snapshot_offset) + end + + def mark_snapshot_as_started(shape_id, opts) do + CubDB.put(opts.db, 
snapshot_start(shape_id), 0) end def make_new_snapshot!(shape_id, shape, query_info, data_stream, opts) do @@ -127,7 +153,7 @@ defmodule Electric.ShapeCache.CubDbStorage do |> Stream.each(fn [{key, _} | _] = chunk -> CubDB.put(opts.db, key, chunk) end) |> Stream.run() - CubDB.put(opts.db, snapshot_meta_key(shape_id), 0) + CubDB.put(opts.db, snapshot_end(shape_id), 0) end) end @@ -141,7 +167,6 @@ defmodule Electric.ShapeCache.CubDbStorage do def cleanup!(shape_id, opts) do [ - snapshot_meta_key(shape_id), shape_key(shape_id), xmin_key(shape_id) ] @@ -155,10 +180,6 @@ defmodule Electric.ShapeCache.CubDbStorage do |> Stream.map(&elem(&1, 0)) end - defp snapshot_meta_key(shape_id) do - {:snapshot_metadata, shape_id} - end - defp snapshot_key(shape_id, index) do {shape_id, @snapshot_key_type, index} end @@ -187,6 +208,10 @@ defmodule Electric.ShapeCache.CubDbStorage do defp log_start(shape_id), do: log_key(shape_id, LogOffset.first()) defp log_end(shape_id), do: log_key(shape_id, LogOffset.last()) - defp snapshot_start(shape_id), do: snapshot_key(shape_id, 0) + defp snapshot_start(shape_id), do: snapshot_key(shape_id, -1) defp snapshot_end(shape_id), do: snapshot_key(shape_id, :end) + + defp stored_version(opts) do + CubDB.get(opts.db, @version_key) + end end diff --git a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex index 17f27bb4d8..5ac3d7fbbb 100644 --- a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex @@ -1,4 +1,5 @@ defmodule Electric.ShapeCache.InMemoryStorage do + alias Electric.ConcurrentStream alias Electric.LogItems alias Electric.Replication.LogOffset alias Electric.Telemetry.OpenTelemetry @@ -30,20 +31,37 @@ defmodule Electric.ShapeCache.InMemoryStorage do def list_shapes(_opts), do: [] def add_shape(_shape_id, _shape, _opts), do: :ok def set_snapshot_xmin(_shape_id, _xmin, 
_opts), do: :ok - def cleanup_shapes_without_xmins(_opts), do: :ok + def initialise(_opts), do: :ok - def snapshot_exists?(shape_id, opts) do - case :ets.match(opts.snapshot_ets_table, {{:metadata, shape_id}, :_}, 1) do - {[_], _} -> true - :"$end_of_table" -> false - end + def snapshot_started?(shape_id, opts) do + :ets.member(opts.snapshot_ets_table, snapshot_start(shape_id)) + end + + defp snapshot_key(shape_id, index) do + {:data, shape_id, index} end + @snapshot_start_index 0 + @snapshot_end_index :end + defp snapshot_start(shape_id), do: snapshot_key(shape_id, @snapshot_start_index) + defp snapshot_end(shape_id), do: snapshot_key(shape_id, @snapshot_end_index) + def get_snapshot(shape_id, opts) do stream = - :ets.select(opts.snapshot_ets_table, [ - {{{:data, shape_id, :"$1"}, :"$2"}, [], [{{:"$1", :"$2"}}]} - ]) + ConcurrentStream.stream_to_end( + excluded_start_key: @snapshot_start_index, + end_marker_key: @snapshot_end_index, + poll_time_in_ms: 10, + stream_fun: fn excluded_start_key, included_end_key -> + if !snapshot_started?(shape_id, opts), do: raise("Snapshot no longer available") + + :ets.select(opts.snapshot_ets_table, [ + {{snapshot_key(shape_id, :"$1"), :"$2"}, + [{:andalso, {:>, :"$1", excluded_start_key}, {:"=<", :"$1", included_end_key}}], + [{{:"$1", :"$2"}}]} + ]) + end + ) |> Stream.map(fn {_, item} -> item end) {@snapshot_offset, stream} @@ -76,10 +94,14 @@ defmodule Electric.ShapeCache.InMemoryStorage do ]) do [true] -> true # FIXME: this is naive while we don't have snapshot metadata to get real offset - [] -> snapshot_exists?(shape_id, opts) and offset == @snapshot_offset + [] -> snapshot_started?(shape_id, opts) and offset == @snapshot_offset end end + def mark_snapshot_as_started(shape_id, opts) do + :ets.insert(opts.snapshot_ets_table, {snapshot_start(shape_id), 0}) + end + @spec make_new_snapshot!( String.t(), Electric.Shapes.Shape.t(), @@ -93,14 +115,15 @@ defmodule Electric.ShapeCache.InMemoryStorage do data_stream |> 
LogItems.from_snapshot_row_stream(@snapshot_offset, shape, query_info) - |> Stream.map(fn log_item -> - {{:data, shape_id, log_item.key}, Jason.encode!(log_item)} + |> Stream.with_index(1) + |> Stream.map(fn {log_item, index} -> + {snapshot_key(shape_id, index), Jason.encode!(log_item)} end) |> Stream.chunk_every(500) |> Stream.each(fn chunk -> :ets.insert(ets_table, chunk) end) |> Stream.run() - :ets.insert(ets_table, {{:metadata, shape_id}, 0}) + :ets.insert(ets_table, {snapshot_end(shape_id), 0}) :ok end) end @@ -119,8 +142,7 @@ defmodule Electric.ShapeCache.InMemoryStorage do end def cleanup!(shape_id, opts) do - :ets.match_delete(opts.snapshot_ets_table, {{:data, shape_id, :_}, :_}) - :ets.match_delete(opts.snapshot_ets_table, {{:metadata, shape_id}, :_}) + :ets.match_delete(opts.snapshot_ets_table, {snapshot_key(shape_id, :_), :_}) :ets.match_delete(opts.log_ets_table, {{shape_id, :_}, :_}) :ok end diff --git a/packages/sync-service/lib/electric/shape_cache/storage.ex b/packages/sync-service/lib/electric/shape_cache/storage.ex index 0238134ee1..ef89a70f8b 100644 --- a/packages/sync-service/lib/electric/shape_cache/storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/storage.ex @@ -31,7 +31,7 @@ defmodule Electric.ShapeCache.Storage do @callback shared_opts(term()) :: {:ok, compiled_opts()} | {:error, term()} @doc "Start any processes required to run the storage backend" @callback start_link(compiled_opts()) :: GenServer.on_start() - @callback cleanup_shapes_without_xmins(storage()) :: :ok + @callback initialise(storage()) :: :ok @callback list_shapes(storage()) :: [ shape_id: shape_id(), shape: Shape.t(), @@ -41,7 +41,7 @@ defmodule Electric.ShapeCache.Storage do @callback add_shape(shape_id(), Shape.t(), storage()) :: :ok @callback set_snapshot_xmin(shape_id(), non_neg_integer(), storage()) :: :ok @doc "Check if snapshot for a given shape id already exists" - @callback snapshot_exists?(shape_id(), compiled_opts()) :: boolean() + @callback 
snapshot_started?(shape_id(), compiled_opts()) :: boolean() @doc "Get the full snapshot for a given shape, also returning the offset this snapshot includes" @callback get_snapshot(shape_id(), compiled_opts()) :: {offset :: LogOffset.t(), log()} @doc """ @@ -57,6 +57,7 @@ defmodule Electric.ShapeCache.Storage do Enumerable.t(row()), compiled_opts() ) :: :ok + @callback mark_snapshot_as_started(shape_id, compiled_opts()) :: :ok @doc "Append log items from one transaction to the log" @callback append_to_log!( shape_id(), @@ -73,9 +74,9 @@ defmodule Electric.ShapeCache.Storage do @type storage() :: {module(), compiled_opts()} - @spec cleanup_shapes_without_xmins(storage()) :: :ok - def cleanup_shapes_without_xmins({mod, opts}), - do: apply(mod, :cleanup_shapes_without_xmins, [opts]) + @spec initialise(storage()) :: :ok + def initialise({mod, opts}), + do: apply(mod, :initialise, [opts]) @spec list_shapes(storage()) :: [ shape_id: shape_id(), @@ -94,8 +95,8 @@ defmodule Electric.ShapeCache.Storage do do: apply(mod, :set_snapshot_xmin, [shape_id, xmin, opts]) @doc "Check if snapshot for a given shape id already exists" - @spec snapshot_exists?(shape_id(), storage()) :: boolean() - def snapshot_exists?(shape_id, {mod, opts}), do: mod.snapshot_exists?(shape_id, opts) + @spec snapshot_started?(shape_id(), storage()) :: boolean() + def snapshot_started?(shape_id, {mod, opts}), do: mod.snapshot_started?(shape_id, opts) @doc "Get the full snapshot for a given shape, also returning the offset this snapshot includes" @spec get_snapshot(shape_id(), storage()) :: {offset :: LogOffset.t(), log()} def get_snapshot(shape_id, {mod, opts}), do: mod.get_snapshot(shape_id, opts) @@ -114,6 +115,10 @@ defmodule Electric.ShapeCache.Storage do def make_new_snapshot!(shape_id, shape, meta, stream, {mod, opts}), do: mod.make_new_snapshot!(shape_id, shape, meta, stream, opts) + @spec mark_snapshot_as_started(shape_id, compiled_opts()) :: :ok + def mark_snapshot_as_started(shape_id, {mod, 
opts}), + do: mod.mark_snapshot_as_started(shape_id, opts) + @doc """ Append log items from one transaction to the log """ diff --git a/packages/sync-service/lib/electric/shapes.ex b/packages/sync-service/lib/electric/shapes.ex index e86b1069d8..b3ca58ee00 100644 --- a/packages/sync-service/lib/electric/shapes.ex +++ b/packages/sync-service/lib/electric/shapes.ex @@ -13,7 +13,7 @@ defmodule Electric.Shapes do storage = Access.fetch!(config, :storage) server = Access.get(opts, :server, shape_cache) - with :ready <- shape_cache.wait_for_snapshot(server, shape_id) do + with :started <- shape_cache.await_snapshot_start(server, shape_id) do {:ok, Storage.get_snapshot(shape_id, storage)} end end diff --git a/packages/sync-service/test/electric/concurrent_stream_test.exs b/packages/sync-service/test/electric/concurrent_stream_test.exs new file mode 100644 index 0000000000..d8969771e9 --- /dev/null +++ b/packages/sync-service/test/electric/concurrent_stream_test.exs @@ -0,0 +1,55 @@ +defmodule Electric.ConcurrentStreamTest do + use ExUnit.Case, async: true + alias Electric.ConcurrentStream + @item_count 10 + @end_marker_key @item_count + 1 + + describe "stream_to_end/2" do + setup %{tmp_dir: tmp_dir, test: test} do + db = :"cubdb_#{test}" + CubDB.start_link(data_dir: tmp_dir, name: db) + {:ok, %{db: db}} + end + + @tag :tmp_dir + test "returns complete stream from CubDB when it's being written to concurrently", %{db: db} do + stream = + ConcurrentStream.stream_to_end( + excluded_start_key: 0, + end_marker_key: @end_marker_key, + stream_fun: fn excluded_start_key, included_end_key -> + CubDB.select(db, + min_key: excluded_start_key, + end_key: included_end_key, + min_key_inclusive: false + ) + end + ) + + read_tasks = + for _ <- 1..10 do + Task.async(fn -> + items = Enum.to_list(stream) + + assert Enum.count(items) == @item_count + + for i <- 1..@item_count do + assert Enum.at(items, i - 1) == {i, "item_#{i}"} + end + end) + end + + # Write the stream concurrently + for i 
<- 1..@item_count do + CubDB.put(db, i, "item_#{i}") + # Sleep to give the read process time to run + Process.sleep(1) + end + + # Write the end marker to let the read process that the stream has ended + CubDB.put(db, @end_marker_key, "ended") + + Task.await_many(read_tasks) + end + end +end diff --git a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs index 1687184abb..16a4dda99a 100644 --- a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs +++ b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs @@ -99,7 +99,7 @@ defmodule Electric.Plug.ServeShapePlugTest do |> expect(:get_or_create_shape_id, fn @test_shape, _opts -> {@test_shape_id, @test_offset} end) - |> expect(:wait_for_snapshot, fn _, @test_shape_id -> :ready end) + |> expect(:await_snapshot_start, fn _, @test_shape_id -> :started end) next_offset = LogOffset.increment(@first_offset) @@ -140,7 +140,7 @@ defmodule Electric.Plug.ServeShapePlugTest do |> expect(:get_or_create_shape_id, fn @test_shape, _opts -> {@test_shape_id, @test_offset} end) - |> expect(:wait_for_snapshot, fn _, @test_shape_id -> :ready end) + |> expect(:await_snapshot_start, fn _, @test_shape_id -> :started end) next_offset = LogOffset.increment(@first_offset) @@ -173,7 +173,7 @@ defmodule Electric.Plug.ServeShapePlugTest do |> expect(:get_or_create_shape_id, fn @test_shape, _opts -> {@test_shape_id, @test_offset} end) - |> expect(:wait_for_snapshot, fn _, @test_shape_id -> :ready end) + |> expect(:await_snapshot_start, fn _, @test_shape_id -> :started end) next_offset = LogOffset.increment(@first_offset) diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index 16d2e09e0c..68c20bec38 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ 
b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -306,7 +306,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do create_snapshot_fn: fn parent, shape_id, shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage) - GenServer.cast(parent, {:snapshot_ready, shape_id}) + GenServer.cast(parent, {:snapshot_started, shape_id}) end ) @@ -333,7 +333,9 @@ defmodule Electric.Replication.ShapeLogCollectorTest do shape_cache_opts: shape_cache_opts } do {shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, shape_cache_opts) - :ready = ShapeCache.wait_for_snapshot(Keyword.fetch!(shape_cache_opts, :server), shape_id) + + :started = + ShapeCache.await_snapshot_start(Keyword.fetch!(shape_cache_opts, :server), shape_id) lsn = Lsn.from_integer(10) diff --git a/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs b/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs index a7037816a6..b60b88afa2 100644 --- a/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs +++ b/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs @@ -39,7 +39,7 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do doctest module, import: true - describe "#{module_name}.snapshot_exists?/2" do + describe "#{module_name}.snapshot_started?/2" do setup do {:ok, %{module: unquote(module)}} end @@ -47,22 +47,13 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do setup :start_storage test "returns false when shape does not exist", %{module: storage, opts: opts} do - assert storage.snapshot_exists?(@shape_id, opts) == false + assert storage.snapshot_started?(@shape_id, opts) == false end - test "returns true when shape does exist", %{module: storage, opts: opts} do - storage.make_new_snapshot!(@shape_id, @shape, @query_info, @data_stream, 
opts) + test "returns true when snapshot has started", %{module: storage, opts: opts} do + storage.mark_snapshot_as_started(@shape_id, opts) - assert storage.snapshot_exists?(@shape_id, opts) == true - end - - test "returns true when shape does exist even from empty query reeults", %{ - module: storage, - opts: opts - } do - storage.make_new_snapshot!(@shape_id, @shape, @query_info, [], opts) - - assert storage.snapshot_exists?(@shape_id, opts) == true + assert storage.snapshot_started?(@shape_id, opts) == true end end @@ -73,16 +64,8 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do setup :start_storage - test "returns empty stream when shape does not exist", %{module: storage, opts: opts} do - {_, stream} = storage.get_snapshot(@shape_id, opts) - assert [] = Enum.to_list(stream) - end - - test "returns the zero offset when shape does not exist", %{module: storage, opts: opts} do - assert {@zero_offset, _} = storage.get_snapshot(@shape_id, opts) - end - test "returns snapshot when shape does exist", %{module: storage, opts: opts} do + storage.mark_snapshot_as_started(@shape_id, opts) storage.make_new_snapshot!(@shape_id, @shape, @query_info, @data_stream, opts) {@snapshot_offset, stream} = storage.get_snapshot(@shape_id, opts) @@ -109,8 +92,11 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do [<<4::128>>, "row4"] ] + storage.mark_snapshot_as_started(@shape_id, opts) storage.make_new_snapshot!(@shape_id, @shape, @query_info, @data_stream, opts) + storage.mark_snapshot_as_started("another-shape-id", opts) + storage.make_new_snapshot!( "another-shape-id", @shape, @@ -138,12 +124,13 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do end test "returns snapshot offset when shape does exist", %{module: storage, opts: opts} do + storage.mark_snapshot_as_started(@shape_id, opts) storage.make_new_snapshot!(@shape_id, @shape, @query_info, @data_stream, opts) {@snapshot_offset, _} = storage.get_snapshot(@shape_id, opts) end - test "does 
not return log items", %{module: storage, opts: opts} do + test "does not return items not in the snapshot", %{module: storage, opts: opts} do log_items = [ %Changes.NewRecord{ @@ -154,10 +141,44 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do ] |> changes_to_log_items() + storage.mark_snapshot_as_started(@shape_id, opts) + storage.make_new_snapshot!(@shape_id, @shape, @query_info, @data_stream, opts) :ok = storage.append_to_log!(@shape_id, log_items, opts) {@snapshot_offset, stream} = storage.get_snapshot(@shape_id, opts) - assert [] = Enum.to_list(stream) + assert Enum.count(stream) == Enum.count(@data_stream) + end + + test "returns complete snapshot when the snapshot is concurrently being written", %{ + module: storage, + opts: opts + } do + row_count = 10 + + data_stream = + Stream.map(1..row_count, fn i -> + # Sleep to give the read process time to run + Process.sleep(1) + [<>, "row#{i}"] + end) + + storage.mark_snapshot_as_started(@shape_id, opts) + {@snapshot_offset, stream} = storage.get_snapshot(@shape_id, opts) + + read_task = + Task.async(fn -> + log = Enum.to_list(stream) + + assert Enum.count(log) == row_count + + for {item, i} <- Enum.with_index(log, 1) do + assert Jason.decode!(item, keys: :atoms).value.title == "row#{i}" + end + end) + + storage.make_new_snapshot!(@shape_id, @shape, @query_info, data_stream, opts) + + Task.await(read_task) end end @@ -416,30 +437,24 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do setup :start_storage - test "causes snapshot_exists?/2 to return false", %{module: storage, opts: opts} do + test "causes snapshot_started?/2 to return false", %{module: storage, opts: opts} do storage.make_new_snapshot!(@shape_id, @shape, @query_info, @data_stream, opts) storage.cleanup!(@shape_id, opts) - assert storage.snapshot_exists?(@shape_id, opts) == false + assert storage.snapshot_started?(@shape_id, opts) == false end - test "causes get_snapshot/2 to return empty stream", %{module: storage, opts: 
opts} do + test "causes get_snapshot/2 to raise an error", %{module: storage, opts: opts} do + storage.mark_snapshot_as_started(@shape_id, opts) storage.make_new_snapshot!(@shape_id, @shape, @query_info, @data_stream, opts) storage.cleanup!(@shape_id, opts) - {_, stream} = storage.get_snapshot(@shape_id, opts) - - assert [] = Enum.to_list(stream) - end - - test "causes get_snapshot/2 to return a zero offset", %{module: storage, opts: opts} do - storage.make_new_snapshot!(@shape_id, @shape, @query_info, @data_stream, opts) - - storage.cleanup!(@shape_id, opts) - - assert {@zero_offset, _} = storage.get_snapshot(@shape_id, opts) + assert_raise RuntimeError, fn -> + {@zero_offset, stream} = storage.get_snapshot(@shape_id, opts) + Stream.run(stream) + end end test "causes get_log_stream/4 to return empty stream", %{module: storage, opts: opts} do @@ -499,7 +514,7 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do opts: opts } do refute storage.has_log_entry?(@shape_id, @snapshot_offset, opts) - storage.make_new_snapshot!(@shape_id, @shape, @query_info, @data_stream, opts) + storage.mark_snapshot_as_started(@shape_id, opts) assert storage.has_log_entry?(@shape_id, @snapshot_offset, opts) end @@ -583,31 +598,87 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do end end - describe "#{module_name}.cleanup_shapes_without_xmins/1" do + describe "#{module_name}.initialise/1" do setup do {:ok, %{module: unquote(module)}} end setup :start_storage - test "cleans up the shape if the snapshot_xmin has not been set", %{ + test "removes the shape if the snapshot_xmin has not been set", %{ module: storage, opts: opts } do + storage.initialise(opts) + + storage.add_shape("shape-1", @shape, opts) + storage.add_shape("shape-2", @shape, opts) + storage.add_shape("shape-3", @shape, opts) + storage.mark_snapshot_as_started("shape-1", opts) + storage.mark_snapshot_as_started("shape-2", opts) + storage.mark_snapshot_as_started("shape-3", opts) + 
storage.make_new_snapshot!("shape-1", @shape, @query_info, @data_stream, opts) + storage.make_new_snapshot!("shape-2", @shape, @query_info, @data_stream, opts) + storage.make_new_snapshot!("shape-3", @shape, @query_info, @data_stream, opts) + storage.set_snapshot_xmin("shape-1", 11, opts) + storage.set_snapshot_xmin("shape-3", 33, opts) + + storage.initialise(opts) + + assert storage.snapshot_started?("shape-1", opts) == true + assert storage.snapshot_started?("shape-2", opts) == false + assert storage.snapshot_started?("shape-3", opts) == true + end + + test "removes the shape if the snapshot has not finished", %{ + module: storage, + opts: opts + } do + storage.initialise(opts) + storage.add_shape("shape-1", @shape, opts) storage.add_shape("shape-2", @shape, opts) storage.add_shape("shape-3", @shape, opts) + storage.mark_snapshot_as_started("shape-1", opts) + storage.mark_snapshot_as_started("shape-2", opts) + storage.mark_snapshot_as_started("shape-3", opts) + storage.make_new_snapshot!("shape-1", @shape, @query_info, @data_stream, opts) + storage.make_new_snapshot!("shape-3", @shape, @query_info, @data_stream, opts) + storage.set_snapshot_xmin("shape-1", 11, opts) + storage.set_snapshot_xmin("shape-2", 22, opts) + storage.set_snapshot_xmin("shape-3", 33, opts) + + storage.initialise(opts) + + assert storage.snapshot_started?("shape-1", opts) == true + assert storage.snapshot_started?("shape-2", opts) == false + assert storage.snapshot_started?("shape-3", opts) == true + end + + test "removes all shapes if the storage version has changed", %{ + module: storage, + opts: opts + } do + storage.initialise(opts) + + storage.add_shape("shape-1", @shape, opts) + storage.add_shape("shape-2", @shape, opts) + storage.add_shape("shape-3", @shape, opts) + storage.mark_snapshot_as_started("shape-1", opts) + storage.mark_snapshot_as_started("shape-2", opts) + storage.mark_snapshot_as_started("shape-3", opts) storage.make_new_snapshot!("shape-1", @shape, @query_info, 
@data_stream, opts) storage.make_new_snapshot!("shape-2", @shape, @query_info, @data_stream, opts) storage.make_new_snapshot!("shape-3", @shape, @query_info, @data_stream, opts) storage.set_snapshot_xmin("shape-1", 11, opts) + storage.set_snapshot_xmin("shape-2", 22, opts) storage.set_snapshot_xmin("shape-3", 33, opts) - storage.cleanup_shapes_without_xmins(opts) + storage.initialise(%{opts | version: "new-version"}) - assert storage.snapshot_exists?("shape-1", opts) == true - assert storage.snapshot_exists?("shape-2", opts) == false - assert storage.snapshot_exists?("shape-3", opts) == true + assert storage.snapshot_started?("shape-1", opts) == false + assert storage.snapshot_started?("shape-2", opts) == false + assert storage.snapshot_started?("shape-3", opts) == false end end end diff --git a/packages/sync-service/test/electric/shape_cache/storage_test.exs b/packages/sync-service/test/electric/shape_cache/storage_test.exs index 18d83aeb83..1ad17a4057 100644 --- a/packages/sync-service/test/electric/shape_cache/storage_test.exs +++ b/packages/sync-service/test/electric/shape_cache/storage_test.exs @@ -14,7 +14,7 @@ defmodule Electric.ShapeCache.StorageTest do MockStorage |> Mox.expect(:make_new_snapshot!, fn _, _, _, _, :opts -> :ok end) - |> Mox.expect(:snapshot_exists?, fn _, :opts -> true end) + |> Mox.expect(:snapshot_started?, fn _, :opts -> true end) |> Mox.expect(:get_snapshot, fn _, :opts -> {1, []} end) |> Mox.expect(:append_to_log!, fn _, _, :opts -> :ok end) |> Mox.expect(:get_log_stream, fn _, _, _, :opts -> [] end) @@ -22,7 +22,7 @@ defmodule Electric.ShapeCache.StorageTest do |> Mox.expect(:cleanup!, fn _, :opts -> :ok end) Storage.make_new_snapshot!(shape_id, %{}, %{}, [], storage) - Storage.snapshot_exists?(shape_id, storage) + Storage.snapshot_started?(shape_id, storage) Storage.get_snapshot(shape_id, storage) Storage.append_to_log!(shape_id, [], storage) Storage.get_log_stream(shape_id, LogOffset.first(), storage) diff --git 
a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index 3ef1527458..a1ebfc3a11 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -73,14 +73,14 @@ defmodule Electric.ShapeCacheTest do create_snapshot_fn: fn parent, shape_id, shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage) - GenServer.cast(parent, {:snapshot_ready, shape_id}) + GenServer.cast(parent, {:snapshot_started, shape_id}) end ) {shape_id, offset} = ShapeCache.get_or_create_shape_id(@shape, opts) assert offset == @zero_offset - assert :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) - assert Storage.snapshot_exists?(shape_id, storage) + assert :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) + assert Storage.snapshot_started?(shape_id, storage) end test "triggers table prep and snapshot creation only once", ctx do @@ -95,7 +95,7 @@ defmodule Electric.ShapeCacheTest do send(test_pid, {:called, :create_snapshot_fn}) GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage) - GenServer.cast(parent, {:snapshot_ready, shape_id}) + GenServer.cast(parent, {:snapshot_started, shape_id}) end ) @@ -104,7 +104,7 @@ defmodule Electric.ShapeCacheTest do # subsequent calls return the same shape_id for _ <- 1..10, do: assert({^shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts)) - assert :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) + assert :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) assert_received {:called, :prepare_tables_fn} assert_received {:called, :create_snapshot_fn} @@ -121,7 +121,7 @@ defmodule Electric.ShapeCacheTest do send(test_pid, {:called, :create_snapshot_fn}) 
GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage) - GenServer.cast(parent, {:snapshot_ready, shape_id}) + GenServer.cast(parent, {:snapshot_started, shape_id}) end ) @@ -147,7 +147,7 @@ defmodule Electric.ShapeCacheTest do shape_id = Task.await(create_call_1) assert shape_id == Task.await(create_call_2) - assert :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) + assert :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) # any queued calls should still return the existing shape_id # after the snapshot has been created (simulated by directly @@ -202,8 +202,7 @@ defmodule Electric.ShapeCacheTest do test "creates initial snapshot from DB data", %{storage: storage, shape_cache_opts: opts} do {shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts) - assert :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) - assert Storage.snapshot_exists?(shape_id, storage) + assert :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) assert {@zero_offset, stream} = Storage.get_snapshot(shape_id, storage) assert [%{"value" => %{"value" => "test1"}}, %{"value" => %{"value" => "test2"}}] = @@ -265,7 +264,7 @@ defmodule Electric.ShapeCacheTest do ) {shape_id, _} = ShapeCache.get_or_create_shape_id(shape, opts) - assert :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) + assert :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) assert {@zero_offset, stream} = Storage.get_snapshot(shape_id, storage) assert [ @@ -284,11 +283,9 @@ defmodule Electric.ShapeCacheTest do } = map end - test "updates latest offset correctly", - %{storage: storage, shape_cache_opts: opts} do + test "updates latest offset correctly", %{shape_cache_opts: opts, storage: storage} do {shape_id, initial_offset} = ShapeCache.get_or_create_shape_id(@shape, opts) - assert :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) - 
assert Storage.snapshot_exists?(shape_id, storage) + assert :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) assert {^shape_id, offset_after_snapshot} = ShapeCache.get_or_create_shape_id(@shape, opts) expected_offset_after_log_entry = @@ -308,6 +305,10 @@ defmodule Electric.ShapeCacheTest do assert initial_offset == offset_after_snapshot assert offset_after_log_entry > offset_after_snapshot assert offset_after_log_entry == expected_offset_after_log_entry + + # Stop snapshot process gracefully to prevent errors being logged in the test + {_, stream} = Storage.get_snapshot(shape_id, storage) + Stream.run(stream) end test "errors if appending to untracked shape_id", %{shape_cache_opts: opts} do @@ -328,7 +329,7 @@ defmodule Electric.ShapeCacheTest do {shape_id, _} = ShapeCache.get_or_create_shape_id(shape, opts) assert {:error, %Postgrex.Error{postgres: %{code: :undefined_table}}} = - ShapeCache.wait_for_snapshot(opts[:server], shape_id) + ShapeCache.await_snapshot_start(opts[:server], shape_id) shape_id end) @@ -357,12 +358,12 @@ defmodule Electric.ShapeCacheTest do create_snapshot_fn: fn parent, shape_id, shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage) - GenServer.cast(parent, {:snapshot_ready, shape_id}) + GenServer.cast(parent, {:snapshot_started, shape_id}) end ) {shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts) - assert :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) + assert :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) assert [{^shape_id, @shape, 10}] = ShapeCache.list_active_shapes(opts) end @@ -378,7 +379,7 @@ defmodule Electric.ShapeCacheTest do receive(do: ({:continue, ^ref} -> :ok)) GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage) - GenServer.cast(parent, {:snapshot_ready, 
shape_id}) + GenServer.cast(parent, {:snapshot_started, shape_id}) end ) @@ -391,15 +392,15 @@ defmodule Electric.ShapeCacheTest do send(pid, {:continue, ref}) - assert :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) + assert :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) assert [{^shape_id, @shape, 10}] = ShapeCache.list_active_shapes(opts) end end - describe "wait_for_snapshot/4" do + describe "await_snapshot_start/4" do setup :with_in_memory_storage - test "returns :ready for existing snapshot", %{storage: storage} = ctx do + test "returns :started for snapshots that have started", %{storage: storage} = ctx do %{shape_cache_opts: opts} = with_shape_cache(Map.put(ctx, :pool, nil), prepare_tables_fn: @prepare_tables_noop, @@ -408,10 +409,9 @@ defmodule Electric.ShapeCacheTest do {shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts) - # Manually create a snapshot - Storage.make_new_snapshot!(shape_id, @shape, @basic_query_meta, [["test"]], storage) + Storage.mark_snapshot_as_started(shape_id, storage) - assert ShapeCache.wait_for_snapshot(opts[:server], shape_id) == :ready + assert ShapeCache.await_snapshot_start(opts[:server], shape_id) == :started end test "returns an error if waiting is for an unknown shape id", @@ -424,13 +424,13 @@ defmodule Electric.ShapeCacheTest do create_snapshot_fn: fn parent, shape_id, shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage) - GenServer.cast(parent, {:snapshot_ready, shape_id}) + GenServer.cast(parent, {:snapshot_started, shape_id}) end ) - assert {:error, :unknown} = ShapeCache.wait_for_snapshot(opts[:server], shape_id) + assert {:error, :unknown} = ShapeCache.await_snapshot_start(opts[:server], shape_id) - refute Storage.snapshot_exists?(shape_id, storage) + refute Storage.snapshot_started?(shape_id, storage) end test "handles buffering multiple callers correctly", 
ctx do @@ -447,21 +447,73 @@ defmodule Electric.ShapeCacheTest do # Sometimes only some tasks subscribe before reaching this point, and then hang # if we don't actually have a snapshot. This is kind of part of the test, because - # `wait_for_snapshot/3` should always resolve to `:ready` in concurrent situations - Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage) - GenServer.cast(parent, {:snapshot_ready, shape_id}) + # `await_snapshot_start/3` should always resolve to `:started` in concurrent situations + Storage.mark_snapshot_as_started(shape_id, storage) + GenServer.cast(parent, {:snapshot_started, shape_id}) + Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [[1], [2]], storage) end ) {shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts) tasks = - for _ <- 1..10, do: Task.async(ShapeCache, :wait_for_snapshot, [opts[:server], shape_id]) + for _ <- 1..10 do + Task.async(fn -> + assert :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) + {_, stream} = Storage.get_snapshot(shape_id, ctx.storage) + assert Enum.count(stream) == 2 + end) + end assert_receive {:waiting_point, ref, pid} send(pid, {:continue, ref}) - assert Enum.all?(Task.await_many(tasks), &(&1 == :ready)) + Task.await_many(tasks) + end + + @tag :capture_log + test "errors while streaming from database are sent to all callers", ctx do + stream_from_database = + Stream.map(1..5, fn + 5 -> + raise "some error" + + n -> + # Sleep to allow read processes to run + Process.sleep(1) + [n] + end) + + %{shape_cache_opts: opts} = + with_shape_cache(Map.put(ctx, :pool, nil), + prepare_tables_fn: @prepare_tables_noop, + create_snapshot_fn: fn parent, shape_id, shape, _, storage -> + GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) + Storage.mark_snapshot_as_started(shape_id, storage) + GenServer.cast(parent, {:snapshot_started, shape_id}) + + Storage.make_new_snapshot!( + shape_id, + shape, + @basic_query_meta, + 
stream_from_database, + storage + ) + end + ) + + {shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts) + + tasks = + for _ <- 1..10 do + Task.async(fn -> + :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) + {_, stream} = Storage.get_snapshot(shape_id, ctx.storage) + assert_raise RuntimeError, fn -> Stream.run(stream) end + end) + end + + Task.await_many(tasks) end test "propagates error in snapshot creation to listeners", ctx do @@ -484,7 +536,7 @@ defmodule Electric.ShapeCacheTest do ) {shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts) - task = Task.async(fn -> ShapeCache.wait_for_snapshot(opts[:server], shape_id) end) + task = Task.async(fn -> ShapeCache.await_snapshot_start(opts[:server], shape_id) end) log = capture_log(fn -> @@ -510,13 +562,13 @@ defmodule Electric.ShapeCacheTest do create_snapshot_fn: fn parent, shape_id, shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage) - GenServer.cast(parent, {:snapshot_ready, shape_id}) + GenServer.cast(parent, {:snapshot_started, shape_id}) end ) {shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts) Process.sleep(50) - assert :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) + assert :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) Storage.append_to_log!( shape_id, @@ -530,7 +582,7 @@ defmodule Electric.ShapeCacheTest do storage ) - assert Storage.snapshot_exists?(shape_id, storage) + assert Storage.snapshot_started?(shape_id, storage) assert Enum.count(Storage.get_log_stream(shape_id, @zero_offset, storage)) == 1 log = capture_log(fn -> ShapeCache.handle_truncate(opts[:server], shape_id) end) @@ -539,7 +591,7 @@ defmodule Electric.ShapeCacheTest do # Wait a bit for the async cleanup to complete Process.sleep(100) - refute Storage.snapshot_exists?(shape_id, storage) + refute Storage.snapshot_started?(shape_id, storage) 
assert Enum.count(Storage.get_log_stream(shape_id, @zero_offset, storage)) == 0 {shape_id2, _} = ShapeCache.get_or_create_shape_id(@shape, opts) assert shape_id != shape_id2 @@ -557,13 +609,13 @@ defmodule Electric.ShapeCacheTest do create_snapshot_fn: fn parent, shape_id, shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage) - GenServer.cast(parent, {:snapshot_ready, shape_id}) + GenServer.cast(parent, {:snapshot_started, shape_id}) end ) {shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts) Process.sleep(50) - assert :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) + assert :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) Storage.append_to_log!( shape_id, @@ -577,7 +629,7 @@ defmodule Electric.ShapeCacheTest do storage ) - assert Storage.snapshot_exists?(shape_id, storage) + assert Storage.snapshot_started?(shape_id, storage) assert Enum.count(Storage.get_log_stream(shape_id, @zero_offset, storage)) == 1 log = capture_log(fn -> ShapeCache.clean_shape(opts[:server], shape_id) end) @@ -586,7 +638,7 @@ defmodule Electric.ShapeCacheTest do # Wait a bit for the async cleanup to complete Process.sleep(100) - refute Storage.snapshot_exists?(shape_id, storage) + refute Storage.snapshot_started?(shape_id, storage) assert Enum.count(Storage.get_log_stream(shape_id, @zero_offset, storage)) == 0 {shape_id2, _} = ShapeCache.get_or_create_shape_id(@shape, opts) assert shape_id != shape_id2 @@ -601,7 +653,7 @@ defmodule Electric.ShapeCacheTest do create_snapshot_fn: fn parent, shape_id, shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, 10}) Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage) - GenServer.cast(parent, {:snapshot_ready, shape_id}) + GenServer.cast(parent, {:snapshot_started, shape_id}) end ) @@ -625,14 +677,14 @@ defmodule Electric.ShapeCacheTest do 
create_snapshot_fn: fn parent, shape_id, shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, @snapshot_xmin}) Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage) - GenServer.cast(parent, {:snapshot_ready, shape_id}) + GenServer.cast(parent, {:snapshot_started, shape_id}) end ) ) test "restores shape_ids", %{shape_cache_opts: opts} = context do {shape_id1, _} = ShapeCache.get_or_create_shape_id(@shape, opts) - :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id1) + :started = ShapeCache.await_snapshot_start(opts[:server], shape_id1) restart_shape_cache(context) {shape_id2, _} = ShapeCache.get_or_create_shape_id(@shape, opts) assert shape_id1 == shape_id2 @@ -640,11 +692,11 @@ defmodule Electric.ShapeCacheTest do test "restores snapshot xmins", %{shape_cache_opts: opts} = context do {shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts) - :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) + :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) [{^shape_id, @shape, @snapshot_xmin}] = ShapeCache.list_active_shapes(opts) restart_shape_cache(context) - :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) + :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) assert [{^shape_id, @shape, @snapshot_xmin}] = ShapeCache.list_active_shapes(opts) end @@ -652,13 +704,13 @@ defmodule Electric.ShapeCacheTest do test "restores latest offset", %{shape_cache_opts: opts} = context do offset = @change_offset {shape_id, _} = ShapeCache.get_or_create_shape_id(@shape, opts) - :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) + :started = ShapeCache.await_snapshot_start(opts[:server], shape_id) :ok = ShapeCache.append_to_log!(shape_id, offset, @log_items, opts) {^shape_id, ^offset} = ShapeCache.get_or_create_shape_id(@shape, opts) restart_shape_cache(context) - :ready = ShapeCache.wait_for_snapshot(opts[:server], shape_id) + :started = 
ShapeCache.await_snapshot_start(opts[:server], shape_id) assert {^shape_id, ^offset} = ShapeCache.get_or_create_shape_id(@shape, opts) end @@ -673,7 +725,7 @@ defmodule Electric.ShapeCacheTest do create_snapshot_fn: fn parent, shape_id, shape, _, storage -> GenServer.cast(parent, {:snapshot_xmin_known, shape_id, @snapshot_xmin}) Storage.make_new_snapshot!(shape_id, shape, @basic_query_meta, [["test"]], storage) - GenServer.cast(parent, {:snapshot_ready, shape_id}) + GenServer.cast(parent, {:snapshot_started, shape_id}) end ) end