diff --git a/.changeset/weak-chairs-type.md b/.changeset/weak-chairs-type.md new file mode 100644 index 0000000000..41462a4a8b --- /dev/null +++ b/.changeset/weak-chairs-type.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Store shape definitions along with shape data and use that to restore them instead of persisted cached metadata. This removes the unified serilization and persistence of all shape metadata and allows better scaling of speed of shape creation. diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index 444c1f921d..d67c64425d 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -188,7 +188,8 @@ defmodule Electric.ShapeCache do {:ok, persistent_state} = opts.shape_status.initialise( persistent_kv: opts.persistent_kv, - shape_meta_table: opts.shape_meta_table + shape_meta_table: opts.shape_meta_table, + storage: opts.storage ) state = %{ diff --git a/packages/sync-service/lib/electric/shape_cache/file_storage.ex b/packages/sync-service/lib/electric/shape_cache/file_storage.ex index fd21568ef8..6ba2dff57f 100644 --- a/packages/sync-service/lib/electric/shape_cache/file_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/file_storage.ex @@ -10,13 +10,23 @@ defmodule Electric.ShapeCache.FileStorage do @version 2 @version_key :version + @shape_definition_file_name "shape_defintion.json" + @xmin_key :snapshot_xmin @snapshot_meta_key :snapshot_meta @snapshot_started_key :snapshot_started @behaviour Electric.ShapeCache.Storage - defstruct [:base_path, :shape_id, :db, :cubdb_dir, :snapshot_dir, version: @version] + defstruct [ + :base_path, + :shape_id, + :db, + :cubdb_dir, + :shape_definition_dir, + :snapshot_dir, + version: @version + ] @impl Electric.ShapeCache.Storage def shared_opts(opts) do @@ -37,7 +47,8 @@ defmodule Electric.ShapeCache.FileStorage do shape_id: shape_id, db: name(electric_instance_id, shape_id), cubdb_dir: Path.join([base_path, shape_id, "cubdb"]), - snapshot_dir: Path.join([base_path, shape_id, "snapshots"]) + snapshot_dir: Path.join([base_path, shape_id, "snapshots"]), + shape_definition_dir: Path.join([base_path, shape_id]) } end @@ -62,7 +73,8 @@ defmodule Electric.ShapeCache.FileStorage do end defp initialise_filesystem(opts) do - with :ok <- File.mkdir_p(opts.cubdb_dir), + with :ok <- File.mkdir_p(opts.shape_definition_dir), + :ok <- File.mkdir_p(opts.cubdb_dir), :ok <- File.mkdir_p(opts.snapshot_dir) do :ok end @@ -73,6 +85,7 @@ defmodule Electric.ShapeCache.FileStorage do stored_version = stored_version(opts) if stored_version != opts.version || snapshot_xmin(opts) == nil || + not File.exists?(shape_definition_path(opts)) || not CubDB.has_key?(opts.db, @snapshot_meta_key) do cleanup!(opts) end @@ -80,6 +93,53 @@ defmodule Electric.ShapeCache.FileStorage do CubDB.put(opts.db, @version_key, @version) end + @impl Electric.ShapeCache.Storage + def set_shape_definition(shape, %FS{} = opts) do + file_path = shape_definition_path(opts) + encoded_shape = Jason.encode!(shape) + + case File.write(file_path, encoded_shape, [:exclusive]) do + :ok -> + :ok + + {:error, :eexist} -> + # file already exists - by virtue of the shape ID being the hash of the + # definition we do not need to compare them + :ok + + {:error, reason} -> + raise "Failed to write shape definition to file: #{reason}" + end + end + + @impl Electric.ShapeCache.Storage + def get_all_stored_shapes(%{base_path: base_path}) do + case File.ls(base_path) do + {:ok, shape_ids} -> + Enum.reduce(shape_ids, %{}, fn shape_id, acc -> + shape_def_path = + shape_definition_path(%{shape_definition_dir: Path.join(base_path, shape_id)}) + + with {:ok, shape_def_encoded} <- File.read(shape_def_path), + {:ok, shape_def_json} <- Jason.decode(shape_def_encoded), + shape = Electric.Shapes.Shape.from_json_safe!(shape_def_json) do + Map.put(acc, shape_id, shape) + else + # if the shape definition file cannot be read/decoded, just ignore it + {:error, _reason} -> acc + end + end) + |> then(&{:ok, &1}) + + {:error, :enoent} -> + # if not present, there's no stored shapes + {:ok, %{}} + + {:error, reason} -> + {:error, reason} + end + end + @impl Electric.ShapeCache.Storage def get_current_position(%FS{} = opts) do {:ok, latest_offset(opts), snapshot_xmin(opts)} @@ -253,9 +313,15 @@ defmodule Electric.ShapeCache.FileStorage do {:ok, _} = File.rm_rf(shape_snapshot_path(opts)) + {:ok, _} = File.rm_rf(shape_definition_path(opts)) + :ok end + defp shape_definition_path(%{shape_definition_dir: shape_definition_dir} = _opts) do + Path.join(shape_definition_dir, @shape_definition_file_name) + end + defp keys_from_range(min_key, max_key, opts) do CubDB.select(opts.db, min_key: min_key, max_key: max_key) |> Stream.map(&elem(&1, 0)) diff --git a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex index f052dc5e10..ef212ba901 100644 --- a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex @@ -106,6 +106,18 @@ defmodule Electric.ShapeCache.InMemoryStorage do @impl Electric.ShapeCache.Storage def initialise(%MS{} = _opts), do: :ok + @impl Electric.ShapeCache.Storage + def set_shape_definition(_shape, %MS{} = _opts) do + # no-op - only used to restore shapes between sessions + :ok + end + + @impl Electric.ShapeCache.Storage + def get_all_stored_shapes(_opts) do + # shapes not stored, empty map returned + {:ok, %{}} + end + @impl Electric.ShapeCache.Storage def snapshot_started?(%MS{} = opts) do try do diff --git a/packages/sync-service/lib/electric/shape_cache/shape_status.ex b/packages/sync-service/lib/electric/shape_cache/shape_status.ex index 1834264b3b..c8783b925d 100644 --- a/packages/sync-service/lib/electric/shape_cache/shape_status.ex +++ b/packages/sync-service/lib/electric/shape_cache/shape_status.ex @@ -48,16 +48,18 @@ defmodule Electric.ShapeCache.ShapeStatus do """ alias Electric.PersistentKV alias Electric.Shapes.Shape + alias Electric.ShapeCache.Storage alias Electric.Replication.LogOffset alias Electric.Replication.Changes.{Column, Relation} @schema NimbleOptions.new!( persistent_kv: [type: :any, required: true], shape_meta_table: [type: {:or, [:atom, :reference]}, required: true], + storage: [type: :mod_arg, required: true], root: [type: :string, default: "./shape_cache"] ) - defstruct [:persistent_kv, :root, :shape_meta_table] + defstruct [:persistent_kv, :root, :shape_meta_table, :storage] @type shape_id() :: Electric.ShapeCacheBehaviour.shape_id() @type xmin() :: Electric.ShapeCacheBehaviour.xmin() @@ -65,6 +67,7 @@ defmodule Electric.ShapeCache.ShapeStatus do @type t() :: %__MODULE__{ persistent_kv: PersistentKV.t(), root: String.t(), + storage: Storage.storage(), shape_meta_table: table() } @type option() :: unquote(NimbleOptions.option_typespec(@schema)) @@ -82,11 +85,12 @@ defmodule Electric.ShapeCache.ShapeStatus do def initialise(opts) do with {:ok, config} <- NimbleOptions.validate(opts, @schema), {:ok, kv_backend} <- Access.fetch(config, :persistent_kv), - {:ok, table_name} = Access.fetch(config, :shape_meta_table) do + {:ok, table_name} = Access.fetch(config, :shape_meta_table), + {:ok, storage} = Access.fetch(config, :storage) do persistent_kv = PersistentKV.Serialized.new!( backend: kv_backend, - decoder: {__MODULE__, :decode_shapes, []} + decoder: {__MODULE__, :decode_relations, []} ) meta_table = :ets.new(table_name, [:named_table, :public, :ordered_set]) @@ -94,14 +98,18 @@ defmodule Electric.ShapeCache.ShapeStatus do state = struct( __MODULE__, - Keyword.merge(config, persistent_kv: persistent_kv, shape_meta_table: meta_table) + Keyword.merge(config, + persistent_kv: persistent_kv, + shape_meta_table: meta_table, + storage: storage + ) ) load(state) end end - @spec add_shape(t(), Shape.t()) :: {:ok, shape_id(), LogOffset.t()} | {:error, term()} + @spec add_shape(t(), Shape.t()) :: {:ok, shape_id()} | {:error, term()} def add_shape(state, shape) do {hash, shape_id} = Shape.generate_id(shape) # fresh snapshots always start with a zero offset - only once they @@ -117,9 +125,7 @@ defmodule Electric.ShapeCache.ShapeStatus do ] ) - with :ok <- save(state) do - {:ok, shape_id} - end + {:ok, shape_id} end @spec list_shapes(t()) :: [{shape_id(), Shape.t()}] @@ -151,9 +157,7 @@ defmodule Electric.ShapeCache.ShapeStatus do ] ) - with :ok <- save(state) do - {:ok, shape} - end + {:ok, shape} rescue # Sometimes we're calling cleanup when snapshot creation has failed for # some reason. In those cases we're not sure about the state of the ETS @@ -295,11 +299,10 @@ defmodule Electric.ShapeCache.ShapeStatus do end @doc false - def decode_shapes(json) do - with {:ok, %{"shapes" => shapes, "relations" => relations}} <- Jason.decode(json) do + def decode_relations(json) do + with {:ok, %{"relations" => relations}} <- Jason.decode(json) do {:ok, %{ - shapes: Map.new(shapes, fn {id, shape} -> {id, Shape.from_json_safe!(shape)} end), relations: Map.new(relations, fn %{"id" => id} = relation -> {id, relation_from_json(relation)} @@ -325,21 +328,20 @@ defmodule Electric.ShapeCache.ShapeStatus do end defp save(state) do - shapes = Map.new(list_shapes(state)) relations = list_relations(state) PersistentKV.set( state.persistent_kv, key(state), %{ - shapes: shapes, relations: relations } ) end defp load(state) do - with {:ok, %{shapes: shapes, relations: relations}} <- load_shapes(state) do + with {:ok, %{relations: relations}} <- load_relations(state), + {:ok, shapes} <- Storage.get_all_stored_shapes(state.storage) do :ets.insert( state.shape_meta_table, Enum.concat([ @@ -363,9 +365,9 @@ defmodule Electric.ShapeCache.ShapeStatus do end end - defp load_shapes(state) do + defp load_relations(state) do case PersistentKV.get(state.persistent_kv, key(state)) do - {:ok, %{shapes: _shapes, relations: _relations} = data} -> + {:ok, %{relations: _relations} = data} -> {:ok, data} {:error, :not_found} -> diff --git a/packages/sync-service/lib/electric/shape_cache/storage.ex b/packages/sync-service/lib/electric/shape_cache/storage.ex index 9a33b4f343..acdf75ac6d 100644 --- a/packages/sync-service/lib/electric/shape_cache/storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/storage.ex @@ -1,6 +1,7 @@ defmodule Electric.ShapeCache.Storage do import Electric.Replication.LogOffset, only: [is_log_offset_lt: 2] + alias Electric.Shapes.Shape alias Electric.Shapes.Querying alias Electric.Replication.LogOffset @@ -32,6 +33,13 @@ defmodule Electric.ShapeCache.Storage do @doc "Run any initial setup tasks" @callback initialise(shape_opts()) :: :ok + @doc "Store the shape definition" + @callback set_shape_definition(Shape.t(), shape_opts()) :: :ok + + @doc "Retrieve all stored shapes" + @callback get_all_stored_shapes(compiled_opts()) :: + {:ok, %{shape_id() => Shape.t()}} | {:error, term()} + @doc """ Get the current xmin and offset for the shape storage. @@ -121,6 +129,16 @@ defmodule Electric.ShapeCache.Storage do mod.initialise(shape_opts) end + @impl __MODULE__ + def set_shape_definition(shape, {mod, shape_opts}) do + mod.set_shape_definition(shape, shape_opts) + end + + @impl __MODULE__ + def get_all_stored_shapes({mod, opts}) do + mod.get_all_stored_shapes(opts) + end + @impl __MODULE__ def get_current_position({mod, shape_opts}) do mod.get_current_position(shape_opts) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 228d580c10..3085cb696c 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -58,6 +58,9 @@ defmodule Electric.Shapes.Consumer do :ok = ShapeCache.Storage.initialise(storage) + # Store the shape definition to ensure we can restore it + :ok = ShapeCache.Storage.set_shape_definition(config.shape, storage) + {:ok, latest_offset, snapshot_xmin} = ShapeCache.Storage.get_current_position(storage) :ok = diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index b17c8aacbd..854a043e91 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -8,14 +8,14 @@ defmodule Electric.Replication.ShapeLogCollectorTest do alias Electric.Replication.LogOffset alias Support.Mock - import Support.ComponentSetup, only: [with_electric_instance_id: 1] + import Support.ComponentSetup, only: [with_electric_instance_id: 1, with_in_memory_storage: 1] import Mox @moduletag :capture_log setup :verify_on_exit! - setup :with_electric_instance_id + setup [:with_electric_instance_id, :with_in_memory_storage] setup(ctx) do # Start a test Registry @@ -32,7 +32,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do {:ok, pid} = start_supervised({ShapeLogCollector, opts}) Mock.ShapeStatus - |> expect(:initialise, 1, fn opts -> Electric.ShapeCache.ShapeStatus.initialise(opts) end) + |> expect(:initialise, 1, fn _opts -> {:ok, %{}} end) |> expect(:list_shapes, 1, fn _ -> [] end) # allow the ShapeCache to call this mock |> allow(self(), fn -> GenServer.whereis(Electric.ShapeCache) end) diff --git a/packages/sync-service/test/electric/shape_cache/shape_status_test.exs b/packages/sync-service/test/electric/shape_cache/shape_status_test.exs index 5d378e9050..e7deca21cd 100644 --- a/packages/sync-service/test/electric/shape_cache/shape_status_test.exs +++ b/packages/sync-service/test/electric/shape_cache/shape_status_test.exs @@ -8,6 +8,11 @@ defmodule Electric.ShapeCache.ShapeStatusTest do alias Electric.Shapes.Shape alias Support.StubInspector + alias Support.Mock + import Mox + + setup :verify_on_exit! + defp shape! do assert {:ok, %Shape{where: %{query: "value = 'test'"}} = shape} = Shape.new("other_table", inspector: {__MODULE__, nil}, where: "value = 'test'") @@ -25,13 +30,21 @@ defmodule Electric.ShapeCache.ShapeStatusTest do defp table_name, do: :"#{__MODULE__}-#{System.unique_integer([:positive, :monotonic])}" setup do - [kv: PersistentKV.Memory.new!()] + [persistent_kv: PersistentKV.Memory.new!()] end defp new_state(ctx, opts \\ []) do table = Keyword.get(opts, :table, table_name()) - {:ok, state} = ShapeStatus.initialise(persistent_kv: ctx.kv, shape_meta_table: table) + Mock.Storage + |> expect(:get_all_stored_shapes, 1, fn _ -> {:ok, Access.get(opts, :stored_shapes, %{})} end) + + {:ok, state} = + ShapeStatus.initialise( + persistent_kv: ctx.persistent_kv, + storage: {Mock.Storage, []}, + shape_meta_table: table + ) shapes = Keyword.get(opts, :shapes, []) @@ -49,13 +62,25 @@ defmodule Electric.ShapeCache.ShapeStatusTest do assert [] = ShapeStatus.list_shapes(state) end - test "can add and retain shapes", ctx do + test "can recover shapes from storage", ctx do {:ok, state, []} = new_state(ctx) shape = shape!() assert {:ok, shape_id} = ShapeStatus.add_shape(state, shape) + + {:ok, state, []} = + new_state(ctx, + stored_shapes: %{ + shape_id => shape + } + ) + assert [{^shape_id, ^shape}] = ShapeStatus.list_shapes(state) + end + test "can add shapes", ctx do {:ok, state, []} = new_state(ctx) + shape = shape!() + assert {:ok, shape_id} = ShapeStatus.add_shape(state, shape) assert [{^shape_id, ^shape}] = ShapeStatus.list_shapes(state) end @@ -73,9 +98,6 @@ defmodule Electric.ShapeCache.ShapeStatusTest do assert {:ok, ^shape_1} = ShapeStatus.remove_shape(state, shape_id_1) assert [{^shape_id_2, ^shape_2}] = ShapeStatus.list_shapes(state) - - {:ok, state, []} = new_state(ctx) - assert [{^shape_id_2, ^shape_2}] = ShapeStatus.list_shapes(state) end test "get_existing_shape/2 with %Shape{}", ctx do @@ -87,9 +109,6 @@ defmodule Electric.ShapeCache.ShapeStatusTest do assert {:ok, shape_id} = ShapeStatus.add_shape(state, shape) assert {^shape_id, _} = ShapeStatus.get_existing_shape(state, shape) - {:ok, state, []} = new_state(ctx) - assert {^shape_id, _} = ShapeStatus.get_existing_shape(state, shape) - assert {:ok, ^shape} = ShapeStatus.remove_shape(state, shape_id) refute ShapeStatus.get_existing_shape(state, shape) end @@ -103,10 +122,6 @@ defmodule Electric.ShapeCache.ShapeStatusTest do assert {^shape_id, _} = ShapeStatus.get_existing_shape(state, shape) assert {^shape_id, _} = ShapeStatus.get_existing_shape(state, shape_id) - {:ok, state, []} = new_state(ctx) - assert {^shape_id, _} = ShapeStatus.get_existing_shape(state, shape) - assert {^shape_id, _} = ShapeStatus.get_existing_shape(state, shape_id) - assert {:ok, ^shape} = ShapeStatus.remove_shape(state, shape_id) refute ShapeStatus.get_existing_shape(state, shape) refute ShapeStatus.get_existing_shape(state, shape_id) @@ -116,20 +131,13 @@ defmodule Electric.ShapeCache.ShapeStatusTest do shape = shape!() table = table_name() - {:ok, _state, [shape_id]} = new_state(ctx, table: table, shapes: [shape]) + {:ok, state, [shape_id]} = new_state(ctx, table: table, shapes: [shape]) refute ShapeStatus.get_existing_shape(table, "1234") assert {^shape_id, _} = ShapeStatus.get_existing_shape(table, shape) assert {^shape_id, _} = ShapeStatus.get_existing_shape(table, shape_id) - table = table_name() - - {:ok, state, []} = new_state(ctx, table: table) - - assert {^shape_id, _} = ShapeStatus.get_existing_shape(table, shape) - assert {^shape_id, _} = ShapeStatus.get_existing_shape(table, shape_id) - assert {:ok, ^shape} = ShapeStatus.remove_shape(state, shape_id) refute ShapeStatus.get_existing_shape(table, shape) refute ShapeStatus.get_existing_shape(table, shape_id) diff --git a/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs b/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs index da1cc3d276..26c04a0e3f 100644 --- a/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs +++ b/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs @@ -1,6 +1,7 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do use ExUnit.Case, async: true + alias Electric.Shapes.Shape alias Electric.ShapeCache.FileStorage alias Electric.Postgres.Lsn alias Electric.Replication.LogOffset @@ -14,6 +15,15 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do @moduletag :tmp_dir @shape_id "the-shape-id" + @shape %Shape{ + root_table: {"public", "items"}, + table_info: %{ + {"public", "items"} => %{ + columns: [%{name: "id", type: :text}, %{name: "value", type: :text}], + pk: ["id"] + } + } + } @snapshot_offset LogOffset.first() @snapshot_offset_encoded to_string(@snapshot_offset) @@ -429,12 +439,30 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do setup :start_storage + test "removes the shape if the shape definition has not been set", %{ + module: storage, + opts: opts + } do + storage.initialise(opts) + + # storage.set_shape_definition(@shape, opts) + storage.mark_snapshot_as_started(opts) + storage.make_new_snapshot!(@data_stream, opts) + storage.set_snapshot_xmin(11, opts) + assert storage.snapshot_started?(opts) + + storage.initialise(opts) + + refute storage.snapshot_started?(opts) + end + test "removes the shape if the snapshot_xmin has not been set", %{ module: storage, opts: opts } do storage.initialise(opts) + storage.set_shape_definition(@shape, opts) storage.mark_snapshot_as_started(opts) storage.make_new_snapshot!(@data_stream, opts) # storage.set_snapshot_xmin(11, opts) @@ -451,6 +479,7 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do } do storage.initialise(opts) + storage.set_shape_definition(@shape, opts) storage.mark_snapshot_as_started(opts) storage.set_snapshot_xmin(22, opts) @@ -465,6 +494,7 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do } do storage.initialise(opts) + storage.set_shape_definition(@shape, opts) storage.mark_snapshot_as_started(opts) storage.make_new_snapshot!(@data_stream, opts) storage.set_snapshot_xmin(11, opts) @@ -474,6 +504,32 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do refute storage.snapshot_started?(opts) end end + + describe "#{module_name}.get_all_stored_shapes/1" do + setup do + {:ok, %{module: unquote(module)}} + end + + setup :start_storage + + test "retrieves no shapes if no shapes persisted", %{ + module: storage, + opts: opts + } do + assert {:ok, %{}} = Electric.ShapeCache.Storage.get_all_stored_shapes({storage, opts}) + end + + test "retrieves stored shapes", %{ + module: storage, + opts: opts + } do + storage.initialise(opts) + storage.set_shape_definition(@shape, opts) + + assert {:ok, %{@shape_id => @shape}} = + Electric.ShapeCache.Storage.get_all_stored_shapes({storage, opts}) + end + end end defp start_storage(%{module: module} = context) do diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index df77f46e3c..e4a0754187 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -150,6 +150,8 @@ defmodule Electric.Shapes.ConsumerTest do id: {Shapes.Consumer.Supervisor, shape_id} ) + assert_receive {Support.TestStorage, :set_shape_definition, ^shape_id, ^shape} + consumer end @@ -547,7 +549,7 @@ defmodule Electric.Shapes.ConsumerTest do end def init(_opts) do - {:ok, %{shapes: %{}, crashing: %{}}} + {:ok, %{shapes: %{}, crashing: %{}, stored_shapes: %{}}} end def handle_call({:crash_once, shape_id}, _from, state) do @@ -560,6 +562,14 @@ defmodule Electric.Shapes.ConsumerTest do {:reply, crash?, %{state | crashing: crashing}} end + def handle_call({:set_shape_definition, shape_id, shape}, _from, state) do + {:reply, :ok, %{state | stored_shapes: Map.put(state.stored_shapes, shape_id, shape)}} + end + + def handle_call({:get_all_stored_shapes}, _from, state) do + {:reply, {:ok, state.stored_shapes}, state} + end + def handle_call({:get_current_position, shape_id}, _from, state) do %{latest_offset: offset, snapshot_xmin: xmin} = Map.get(state.shapes, shape_id, %{latest_offset: LogOffset.first(), snapshot_xmin: nil}) @@ -603,6 +613,14 @@ defmodule Electric.Shapes.ConsumerTest do :ok end + def get_all_stored_shapes(opts) do + GenServer.call(opts.backend, {:get_all_stored_shapes}) + end + + def set_shape_definition(shape, opts) do + GenServer.call(opts.backend, {:set_shape_definition, opts.shape_id, shape}) + end + def snapshot_started?(opts) do GenServer.call(opts.backend, {:snapshot_started?, opts.shape_id}) end diff --git a/packages/sync-service/test/support/test_storage.ex b/packages/sync-service/test/support/test_storage.ex index f4cfec8360..ccb253c1b2 100644 --- a/packages/sync-service/test/support/test_storage.ex +++ b/packages/sync-service/test/support/test_storage.ex @@ -64,6 +64,18 @@ defmodule Support.TestStorage do end end + @impl Electric.ShapeCache.Storage + def set_shape_definition(shape, {parent, shape_id, _, storage}) do + send(parent, {__MODULE__, :set_shape_definition, shape_id, shape}) + Storage.set_shape_definition(shape, storage) + end + + @impl Electric.ShapeCache.Storage + def get_all_stored_shapes({parent, _init, storage}) do + send(parent, {__MODULE__, :get_all_stored_shapes}) + Storage.get_all_stored_shapes(storage) + end + @impl Electric.ShapeCache.Storage def get_current_position({parent, shape_id, _, storage}) do send(parent, {__MODULE__, :get_current_position, shape_id})