From d7e7c72234e98ef2cb7e123bbba66e87d4d317d2 Mon Sep 17 00:00:00 2001 From: Stefanos Mousafeiris Date: Tue, 17 Dec 2024 12:54:23 +0200 Subject: [PATCH] feat(sync-service): Clean up publication filters (#2154) Closes https://github.com/electric-sql/electric/issues/1774 This work started to introduce column filters (see https://github.com/electric-sql/electric/issues/1831) but ended up on a road block because of us using `REPLICA IDENTITY FULL` - however the work also takes care of cleaning up filters. - Introduced singular process for updating publication - we were locking on it before anyway, might as well linearise it ourselves. - Process maintains reference counted structure for the filters per relation, including where clauses and filtered columns, in order to produce correct overall filters per relation - Update to the publication is debounced to allow batching together many shape creations - Every update does a complete rewrite of the publication filters so they are maintained clean - but also introduced a `remove_shape` call so that if electric remains with no shapes it should also have no subscriptions to tables. 
## TODOs - [x] Write tests for `PublicationManager` - [x] Write procedure for recovering in-memory state from `shape_status.list_shapes` in `recover_shapes` - [ ] Split where clauses at top-level `AND`s to improve filter optimality (suggested by @icehaunter) - [edit: not doing this now, as we can be smart about this and do even more "merging" of where clauses like `x = 1` and `x = 2` to `x in (1, 2)` - separate PR] --- .changeset/wild-bugs-raise.md | 5 + .../lib/electric/connection/manager.ex | 2 + .../lib/electric/connection/supervisor.ex | 9 + .../lib/electric/postgres/configuration.ex | 247 +++++++---- .../lib/electric/replication/eval/expr.ex | 22 + .../replication/publication_manager.ex | 400 ++++++++++++++++++ .../lib/electric/replication/supervisor.ex | 3 +- .../sync-service/lib/electric/shape_cache.ex | 15 +- .../lib/electric/shapes/consumer.ex | 7 +- .../electric/shapes/consumer/snapshotter.ex | 49 +-- .../electric/shapes/consumer_supervisor.ex | 2 +- .../sync-service/lib/electric/shapes/shape.ex | 12 +- .../lib/electric/stack_supervisor.ex | 24 +- .../electric/postgres/configuration_test.exs | 188 +++++--- .../electric/replication/eval/expr_test.exs | 7 + .../replication/publication_manager_test.exs | 256 +++++++++++ .../replication/shape_log_collector_test.exs | 12 +- .../test/electric/shape_cache_test.exs | 85 ++-- .../test/electric/shapes/consumer_test.exs | 65 ++- .../test/support/component_setup.ex | 47 +- .../sync-service/test/support/db_setup.ex | 2 +- packages/sync-service/test/support/mocks.ex | 4 + .../typescript-client/test/client.test.ts | 6 + 23 files changed, 1200 insertions(+), 269 deletions(-) create mode 100644 .changeset/wild-bugs-raise.md create mode 100644 packages/sync-service/lib/electric/replication/publication_manager.ex create mode 100644 packages/sync-service/test/electric/replication/eval/expr_test.exs create mode 100644 packages/sync-service/test/electric/replication/publication_manager_test.exs diff --git 
a/.changeset/wild-bugs-raise.md b/.changeset/wild-bugs-raise.md new file mode 100644 index 0000000000..705df56558 --- /dev/null +++ b/.changeset/wild-bugs-raise.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Introduced `PublicationManager` process to create and clean up publication filters. diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index ded8707935..2939d19e29 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -319,6 +319,8 @@ defmodule Electric.Connection.Manager do Electric.Connection.Supervisor.start_shapes_supervisor( stack_id: state.stack_id, shape_cache_opts: shape_cache_opts, + pool_opts: state.pool_opts, + replication_opts: state.replication_opts, stack_events_registry: state.stack_events_registry, tweaks: state.tweaks ) diff --git a/packages/sync-service/lib/electric/connection/supervisor.ex b/packages/sync-service/lib/electric/connection/supervisor.ex index f4acdd0968..2fca1ef689 100644 --- a/packages/sync-service/lib/electric/connection/supervisor.ex +++ b/packages/sync-service/lib/electric/connection/supervisor.ex @@ -38,10 +38,18 @@ defmodule Electric.Connection.Supervisor do def start_shapes_supervisor(opts) do stack_id = Keyword.fetch!(opts, :stack_id) shape_cache_opts = Keyword.fetch!(opts, :shape_cache_opts) + db_pool_opts = Keyword.fetch!(opts, :pool_opts) + replication_opts = Keyword.fetch!(opts, :replication_opts) inspector = Keyword.fetch!(shape_cache_opts, :inspector) shape_cache_spec = {Electric.ShapeCache, shape_cache_opts} + publication_manager_spec = + {Electric.Replication.PublicationManager, + stack_id: stack_id, + publication_name: Keyword.fetch!(replication_opts, :publication_name), + db_pool: Keyword.fetch!(db_pool_opts, :name)} + shape_log_collector_spec = {Electric.Replication.ShapeLogCollector, stack_id: stack_id, inspector: inspector} @@ -51,6 
+59,7 @@ defmodule Electric.Connection.Supervisor do Electric.Replication.Supervisor, stack_id: stack_id, shape_cache: shape_cache_spec, + publication_manager: publication_manager_spec, log_collector: shape_log_collector_spec }, restart: :temporary diff --git a/packages/sync-service/lib/electric/postgres/configuration.ex b/packages/sync-service/lib/electric/postgres/configuration.ex index 04e050e87b..c14fae16d8 100644 --- a/packages/sync-service/lib/electric/postgres/configuration.ex +++ b/packages/sync-service/lib/electric/postgres/configuration.ex @@ -4,11 +4,10 @@ defmodule Electric.Postgres.Configuration do a provided connection. """ require Logger + alias Electric.Replication.PublicationManager.RelationFilter alias Electric.Utils - alias Electric.Shapes.Shape - @type filter() :: String.t() | nil - @type filters() :: %{Electric.relation() => filter()} + @type filters() :: %{Electric.relation() => RelationFilter.t()} @pg_15 150_000 @@ -25,34 +24,75 @@ defmodule Electric.Postgres.Configuration do """ @spec configure_tables_for_replication!( Postgrex.conn(), - [Shape.table_with_where_clause()], - (-> String.t()), + filters(), + String.t(), float() ) :: {:ok, [:ok]} - def configure_tables_for_replication!(pool, relations, get_pg_version, publication_name) do + def configure_tables_for_replication!(pool, relation_filters, pg_version, publication_name) do configure_tables_for_replication_internal!( pool, - relations, - get_pg_version.(), + relation_filters, + pg_version, publication_name ) end - defp configure_tables_for_replication_internal!(pool, relations, pg_version, publication_name) + @doc """ + Get Postgres server version + """ + @spec get_pg_version(Postgrex.conn()) :: {:ok, non_neg_integer()} | {:error, term()} + def get_pg_version(conn) do + case Postgrex.query( + conn, + "SELECT current_setting('server_version_num') server_version_num", + [] + ) do + {:ok, result} when result.num_rows == 1 -> + [[version_str]] = result.rows + {:ok, 
String.to_integer(version_str)} + + {:error, err} -> + {:error, err} + end + end + + defp configure_tables_for_replication_internal!( + pool, + relation_filters, + pg_version, + publication_name + ) when pg_version < @pg_15 do Postgrex.transaction(pool, fn conn -> - for {relation, _} <- relations, - table = Utils.relation_to_sql(relation), - publication = Utils.quote_name(publication_name) do + publication = Utils.quote_name(publication_name) + + relation_filters = filter_for_existing_relations(conn, relation_filters) + + prev_published_tables = + get_publication_tables(conn, publication_name) + |> Enum.map(&Utils.relation_to_sql/1) + |> MapSet.new() + + new_published_tables = + relation_filters + |> Map.keys() + |> Enum.map(&Utils.relation_to_sql/1) + |> MapSet.new() + + alter_ops = + Enum.concat( + MapSet.difference(new_published_tables, prev_published_tables) + |> Enum.map(&{&1, "ADD"}), + MapSet.difference(prev_published_tables, new_published_tables) + |> Enum.map(&{&1, "DROP"}) + ) + + for {table, op} <- alter_ops do Postgrex.query!(conn, "SAVEPOINT before_publication", []) # PG 14 and below do not support filters on tables of publications - case Postgrex.query( - conn, - "ALTER PUBLICATION #{publication} ADD TABLE #{table}", - [] - ) do + case Postgrex.query(conn, "ALTER PUBLICATION #{publication} #{op} TABLE #{table}", []) do {:ok, _} -> Postgrex.query!(conn, "RELEASE SAVEPOINT before_publication", []) :ok @@ -68,41 +108,38 @@ defmodule Electric.Postgres.Configuration do end end - set_replica_identity!(conn, relations) + set_replica_identity!(conn, relation_filters) end) end - defp configure_tables_for_replication_internal!(pool, relations, _pg_version, publication_name) do + defp configure_tables_for_replication_internal!( + pool, + relation_filters, + _pg_version, + publication_name + ) do Postgrex.transaction(pool, fn conn -> - # We're using advisory locks to prevent race conditions when multiple - # processes try to read-then-update the publication 
configuration. We're not using `SELECT FOR UPDATE` - # because it doesn't read the value that was updated by other transaction holding the lock. This lock - # is thus acquired before reading the existing configuration, so the first read sees the latest value. - Postgrex.query!(conn, "SELECT pg_advisory_xact_lock($1)", [:erlang.phash2(publication_name)]) + # Ensure that all tables are present in the publication + relation_filters = filter_for_existing_relations(conn, relation_filters) - filters = get_publication_filters(conn, publication_name) - - # Get the existing filter for the table - # and extend it with the where clause for the table - # and update the table in the map with the new filter - filters = - Enum.reduce(relations, filters, fn {relation, clause}, acc -> - Map.update(acc, relation, clause, &extend_where_clause(&1, clause)) - end) - - Postgrex.query!(conn, make_alter_publication_query(publication_name, filters), []) + # Update the entire publication with the new filters + Postgrex.query!( + conn, + make_alter_publication_query(publication_name, relation_filters), + [] + ) # `ALTER TABLE` should be after the publication altering, because it takes out an exclusive lock over this table, # but the publication altering takes out a shared lock on all mentioned tables, so a concurrent transaction will # deadlock if the order is reversed. - set_replica_identity!(conn, relations) + set_replica_identity!(conn, relation_filters) [:ok] end) end - defp set_replica_identity!(conn, relations) do - for {relation, _} <- relations, + defp set_replica_identity!(conn, relation_filters) do + for %RelationFilter{relation: relation} <- Map.values(relation_filters), table = Utils.relation_to_sql(relation) do %Postgrex.Result{rows: [[correct_identity?]]} = Postgrex.query!( @@ -118,72 +155,100 @@ defmodule Electric.Postgres.Configuration do end end - # Returns the filters grouped by table for the given publication. 
- @spec get_publication_filters(Postgrex.conn(), String.t()) :: filters() - defp get_publication_filters(conn, publication) do + @spec get_publication_tables(Postgrex.conn(), String.t()) :: list(Electric.relation()) + defp get_publication_tables(conn, publication) do Postgrex.query!( conn, - "SELECT schemaname, tablename, rowfilter FROM pg_publication_tables WHERE pubname = $1", + "SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = $1", [publication] ) |> Map.fetch!(:rows) - |> Enum.map(&{Enum.take(&1, 2) |> List.to_tuple(), Enum.at(&1, 2)}) - |> Map.new() - end - - @doc """ - Drops all tables from the given publication. - """ - @spec drop_all_publication_tables(Postgrex.conn(), String.t()) :: Postgrex.Result.t() - def drop_all_publication_tables(conn, publication_name) do - Postgrex.query!( - conn, - " - DO $$ - DECLARE - r RECORD; - BEGIN - FOR r IN (SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = '#{publication_name}') - LOOP - EXECUTE 'ALTER PUBLICATION #{Utils.quote_name(publication_name)} DROP TABLE ' || r.schemaname || '.' || r.tablename || ';'; - END LOOP; - END $$; - ", - [] - ) - end - - # Joins the existing filter for the table with the where clause for the table. - # If one of them is `nil` (i.e. no filter) then the resulting filter is `nil`. - @spec extend_where_clause(filter(), filter()) :: filter() - defp extend_where_clause(filter, where_clause) when is_nil(filter) or is_nil(where_clause) do - nil - end - - defp extend_where_clause(filter, where_clause) do - "(#{filter} OR #{where_clause})" + |> Enum.map(&(Enum.take(&1, 2) |> List.to_tuple())) end # Makes an SQL query that alters the given publication whith the given tables and filters. 
@spec make_alter_publication_query(String.t(), filters()) :: String.t() defp make_alter_publication_query(publication_name, filters) do - base_sql = "ALTER PUBLICATION #{Utils.quote_name(publication_name)} SET TABLE " - - tables = - filters - |> Enum.map(&make_table_clause/1) - |> Enum.join(", ") - - base_sql <> tables + case Map.values(filters) do + [] -> + """ + DO $$ + DECLARE + tables TEXT; + BEGIN + SELECT string_agg(format('%I.%I', schemaname, tablename), ', ') + INTO tables + FROM pg_publication_tables + WHERE pubname = '#{publication_name}' ; + + IF tables IS NOT NULL THEN + EXECUTE format('ALTER PUBLICATION #{Utils.quote_name(publication_name)} DROP TABLE %s', tables); + END IF; + END $$; + """ + + filters -> + base_sql = "ALTER PUBLICATION #{Utils.quote_name(publication_name)} SET TABLE " + + tables = + filters + |> Enum.map(&make_table_clause/1) + |> Enum.join(", ") + + base_sql <> tables + end end - @spec make_table_clause({Electric.relation(), filter()}) :: String.t() - defp make_table_clause({{schema, tbl}, nil}) do - Utils.relation_to_sql({schema, tbl}) + @spec filter_for_existing_relations(Postgrex.conn(), filters()) :: filters() + defp filter_for_existing_relations(conn, filters) do + query = """ + WITH input_relations AS ( + SELECT + UNNEST($1::text[]) AS schemaname, + UNNEST($2::text[]) AS tablename + ) + SELECT ir.schemaname, ir.tablename + FROM input_relations ir + JOIN pg_class pc ON pc.relname = ir.tablename + JOIN pg_namespace pn ON pn.oid = pc.relnamespace + WHERE pn.nspname = ir.schemaname AND pc.relkind = 'r'; + """ + + relations = Map.keys(filters) + + Postgrex.query!(conn, query, [ + Enum.map(relations, &elem(&1, 0)), + Enum.map(relations, &elem(&1, 1)) + ]) + |> Map.fetch!(:rows) + |> Enum.map(&List.to_tuple/1) + |> Enum.reduce(%{}, fn rel, new_filters -> + case Map.get(filters, rel) do + nil -> new_filters + filter -> Map.put(new_filters, rel, filter) + end + end) end - defp make_table_clause({{schema, tbl}, where}) do - table = 
Utils.relation_to_sql({schema, tbl}) - table <> " WHERE " <> where + @spec make_table_clause(RelationFilter.t()) :: String.t() + defp make_table_clause(%RelationFilter{ + relation: relation, + where_clauses: where_clauses + # selected_columns: cols + }) do + table = Utils.relation_to_sql(relation) + + # NOTE: cannot filter on columns with REPLICA IDENTITY FULL + # cols = if cols == nil, do: "", else: " (#{Enum.join(cols, ", ")})" + cols = "" + + where = + if where_clauses == nil, + do: "", + else: + " WHERE " <> + "(#{where_clauses |> Enum.map(& &1.query) |> Enum.join(" OR ")})" + + table <> cols <> where end end diff --git a/packages/sync-service/lib/electric/replication/eval/expr.ex b/packages/sync-service/lib/electric/replication/eval/expr.ex index 56729e7541..ac2d27f30b 100644 --- a/packages/sync-service/lib/electric/replication/eval/expr.ex +++ b/packages/sync-service/lib/electric/replication/eval/expr.ex @@ -15,4 +15,26 @@ defmodule Electric.Replication.Eval.Expr do used_refs: used_refs(), returns: Env.pg_type() } + + @doc """ + Returns a flat list of all used refs used in the expression + that point to the current table + + ## Examples + + iex> used_refs = %{["id"] => :int8, ["created_at"] => :timestamp} + iex> unqualified_refs(%Expr{query: "id = 1", used_refs: used_refs}) + ["created_at", "id"] + + iex> used_refs = %{["id"] => :int8, ["potato", "created_at"] => :timestamp} + iex> unqualified_refs(%Expr{query: "id = 1", used_refs: used_refs, returns: :int8}) + ["id"] + """ + @spec unqualified_refs(t()) :: [String.t()] + def unqualified_refs(%__MODULE__{used_refs: used_refs}) do + used_refs + # Keep only used refs that are pointing to current table + |> Enum.filter(&match?({[_], _}, &1)) + |> Enum.map(fn {[key], _} -> key end) + end end diff --git a/packages/sync-service/lib/electric/replication/publication_manager.ex b/packages/sync-service/lib/electric/replication/publication_manager.ex new file mode 100644 index 0000000000..e26a64251e --- /dev/null +++ 
b/packages/sync-service/lib/electric/replication/publication_manager.ex @@ -0,0 +1,400 @@ +defmodule Electric.Replication.PublicationManager do + require Logger + use GenServer + + alias Electric.Postgres.Configuration + alias Electric.Replication.Eval.Expr + alias Electric.Shapes.Shape + + @callback name(binary() | Keyword.t()) :: atom() + @callback recover_shape(Shape.t(), Keyword.t()) :: :ok + @callback add_shape(Shape.t(), Keyword.t()) :: :ok + @callback remove_shape(Shape.t(), Keyword.t()) :: :ok + @callback refresh_publication(Keyword.t()) :: :ok + + defstruct [ + :relation_filter_counters, + :prepared_relation_filters, + :committed_relation_filters, + :update_debounce_timeout, + :scheduled_updated_ref, + :retries, + :waiters, + :publication_name, + :db_pool, + :pg_version, + :configure_tables_for_replication_fn + ] + + @typep state() :: %__MODULE__{ + relation_filter_counters: %{Electric.relation() => map()}, + prepared_relation_filters: %{Electric.relation() => __MODULE__.RelationFilter.t()}, + committed_relation_filters: %{Electric.relation() => __MODULE__.RelationFilter.t()}, + update_debounce_timeout: timeout(), + scheduled_updated_ref: nil | reference(), + waiters: list(GenServer.from()), + publication_name: String.t(), + db_pool: term(), + pg_version: non_neg_integer(), + configure_tables_for_replication_fn: fun() + } + @typep filter_operation :: :add | :remove + + defmodule RelationFilter do + defstruct [:relation, :where_clauses, :selected_columns] + + @type t :: %__MODULE__{ + relation: Electric.relation(), + where_clauses: [Electric.Replication.Eval.Expr.t()] | nil, + selected_columns: [String.t()] | nil + } + end + + @retry_timeout 300 + @max_retries 3 + @default_debounce_timeout 50 + + @relation_counter :relation_counter + @relation_where :relation_where + @relation_column :relation_column + + @name_schema_tuple {:tuple, [:atom, :atom, :any]} + @genserver_name_schema {:or, [:atom, @name_schema_tuple]} + @schema NimbleOptions.new!( + name: [type: 
@genserver_name_schema, required: false], + stack_id: [type: :string, required: true], + publication_name: [type: :string, required: true], + db_pool: [type: {:or, [:atom, :pid, @name_schema_tuple]}], + pg_version: [type: {:or, [:integer, :atom]}, required: false, default: nil], + update_debounce_timeout: [type: :timeout, default: @default_debounce_timeout], + configure_tables_for_replication_fn: [ + type: {:fun, 4}, + required: false, + default: &Configuration.configure_tables_for_replication!/4 + ], + server: [type: :any, required: false] + ) + + def name(stack_id) when not is_map(stack_id) and not is_list(stack_id) do + Electric.ProcessRegistry.name(stack_id, __MODULE__) + end + + def name(opts) do + stack_id = Access.fetch!(opts, :stack_id) + name(stack_id) + end + + @spec add_shape(Shape.t(), Keyword.t()) :: :ok + def add_shape(shape, opts \\ []) do + server = Access.get(opts, :server, name(opts)) + + case GenServer.call(server, {:add_shape, shape}) do + :ok -> :ok + {:error, err} -> raise err + end + end + + @spec recover_shape(Shape.t(), Keyword.t()) :: :ok + def recover_shape(shape, opts \\ []) do + server = Access.get(opts, :server, name(opts)) + GenServer.call(server, {:recover_shape, shape}) + end + + @spec remove_shape(Shape.t(), Keyword.t()) :: :ok + def remove_shape(shape, opts \\ []) do + server = Access.get(opts, :server, name(opts)) + + case GenServer.call(server, {:remove_shape, shape}) do + :ok -> :ok + {:error, err} -> raise err + end + end + + @spec refresh_publication(Keyword.t()) :: :ok + def refresh_publication(opts \\ []) do + server = Access.get(opts, :server, name(opts)) + + case GenServer.call(server, :refresh_publication) do + :ok -> :ok + {:error, err} -> raise err + end + end + + def start_link(opts) do + with {:ok, opts} <- NimbleOptions.validate(opts, @schema) do + stack_id = Keyword.fetch!(opts, :stack_id) + name = Keyword.get(opts, :name, name(stack_id)) + + db_pool = + Keyword.get( + opts, + :db_pool, + 
Electric.ProcessRegistry.name(stack_id, Electric.DbPool) + ) + + GenServer.start_link( + __MODULE__, + Map.new(opts) |> Map.put(:db_pool, db_pool) |> Map.put(:name, name), + name: name + ) + end + end + + @impl true + def init(opts) do + state = %__MODULE__{ + relation_filter_counters: %{}, + prepared_relation_filters: %{}, + committed_relation_filters: %{}, + scheduled_updated_ref: nil, + retries: 0, + waiters: [], + update_debounce_timeout: + Access.get(opts, :update_debounce_timeout, @default_debounce_timeout), + publication_name: Access.fetch!(opts, :publication_name), + db_pool: Access.fetch!(opts, :db_pool), + pg_version: Access.fetch!(opts, :pg_version), + configure_tables_for_replication_fn: + Access.fetch!(opts, :configure_tables_for_replication_fn) + } + + {:ok, state, {:continue, :get_pg_version}} + end + + @impl true + def handle_continue(:get_pg_version, state) do + state = get_pg_version(state) + {:noreply, state} + end + + @impl true + def handle_call({:add_shape, shape}, from, state) do + state = update_relation_filters_for_shape(shape, :add, state) + state = add_waiter(from, state) + state = schedule_update_publication(state.update_debounce_timeout, state) + {:noreply, state} + end + + def handle_call({:recover_shape, shape}, _from, state) do + state = update_relation_filters_for_shape(shape, :add, state) + {:reply, :ok, state} + end + + def handle_call({:remove_shape, shape}, from, state) do + state = update_relation_filters_for_shape(shape, :remove, state) + state = add_waiter(from, state) + state = schedule_update_publication(state.update_debounce_timeout, state) + {:noreply, state} + end + + def handle_call(:refresh_publication, from, state) do + state = add_waiter(from, state) + state = schedule_update_publication(state.update_debounce_timeout, state) + {:noreply, state} + end + + @impl true + def handle_info( + :update_publication, + %__MODULE__{prepared_relation_filters: relation_filters, retries: retries} = state + ) do + state = %{state | 
scheduled_updated_ref: nil, retries: 0} + + case update_publication(state) do + :ok -> + state = reply_to_waiters(:ok, state) + {:noreply, %{state | committed_relation_filters: relation_filters}} + + {:error, err} when retries < @max_retries -> + Logger.warning("Failed to configure publication, retrying: #{inspect(err)}") + state = schedule_update_publication(@retry_timeout, %{state | retries: retries + 1}) + {:noreply, state} + + {:error, err} -> + Logger.error("Failed to configure publication: #{inspect(err)}") + state = reply_to_waiters({:error, err}, state) + {:noreply, state} + end + end + + @spec schedule_update_publication(timeout(), state()) :: state() + defp schedule_update_publication(timeout, %__MODULE__{scheduled_updated_ref: nil} = state) do + ref = Process.send_after(self(), :update_publication, timeout) + %{state | scheduled_updated_ref: ref} + end + + defp schedule_update_publication(_timeout, %__MODULE__{scheduled_updated_ref: _} = state), + do: state + + @spec update_publication(state()) :: :ok | {:error, term()} + defp update_publication( + %__MODULE__{ + committed_relation_filters: committed_filters, + prepared_relation_filters: current_filters + } = _state + ) + when current_filters == committed_filters, + do: :ok + + defp update_publication( + %__MODULE__{ + prepared_relation_filters: relation_filters, + publication_name: publication_name, + db_pool: db_pool, + pg_version: pg_version, + configure_tables_for_replication_fn: configure_tables_for_replication_fn + } = _state + ) do + configure_tables_for_replication_fn.( + db_pool, + relation_filters, + pg_version, + publication_name + ) + + :ok + rescue + err -> {:error, err} + end + + defp get_pg_version(%{pg_version: pg_version} = state) when not is_nil(pg_version), do: state + + defp get_pg_version(%{pg_version: nil, db_pool: db_pool} = state) do + case Configuration.get_pg_version(db_pool) do + {:ok, pg_version} -> + %{state | pg_version: pg_version} + + {:error, err} -> + 
Logger.error("Failed to get PG version, retrying after timeout: #{inspect(err)}") + Process.sleep(@retry_timeout) + get_pg_version(state) + end + end + + @spec update_relation_filters_for_shape(Shape.t(), filter_operation(), state()) :: state() + defp update_relation_filters_for_shape( + %Shape{root_table: relation} = shape, + operation, + %__MODULE__{prepared_relation_filters: prepared_relation_filters} = state + ) do + state = update_relation_filter_counters(shape, operation, state) + new_relation_filter = get_relation_filter(relation, state) + + new_relation_filters = + if new_relation_filter == nil, + do: Map.delete(prepared_relation_filters, relation), + else: Map.put(prepared_relation_filters, relation, new_relation_filter) + + %{state | prepared_relation_filters: new_relation_filters} + end + + @spec get_relation_filter(Electric.relation(), state()) :: RelationFilter.t() | nil + defp get_relation_filter( + relation, + %__MODULE__{relation_filter_counters: relation_filter_counters} = _state + ) do + case Map.get(relation_filter_counters, relation) do + nil -> + nil + + filter_counters -> + Enum.reduce( + Map.keys(filter_counters), + %RelationFilter{relation: relation, where_clauses: [], selected_columns: []}, + fn + @relation_counter, acc -> + acc + + {@relation_column, nil}, acc -> + %RelationFilter{acc | selected_columns: nil} + + {@relation_column, _col}, %{selected_columns: nil} = acc -> + acc + + {@relation_column, col}, %{selected_columns: cols} = acc -> + %RelationFilter{acc | selected_columns: [col | cols]} + + {@relation_where, nil}, acc -> + %RelationFilter{acc | where_clauses: nil} + + {@relation_where, _where}, %{where_clauses: nil} = acc -> + acc + + {@relation_where, where}, %{where_clauses: wheres} = acc -> + %RelationFilter{acc | where_clauses: [where | wheres]} + end + ) + end + end + + @spec update_relation_filter_counters(Shape.t(), filter_operation(), state()) :: state() + defp update_relation_filter_counters( + %Shape{root_table: table} = 
shape, + operation, + %__MODULE__{relation_filter_counters: relation_filter_counters} = state + ) do + increment = if operation == :add, do: 1, else: -1 + filter_counters = Map.get(relation_filter_counters, table, %{}) + + {relation_ctr, filter_counters} = + update_map_counter(filter_counters, @relation_counter, increment) + + if relation_ctr > 0 do + filter_counters = + Enum.concat( + get_selected_columns_for_shape(shape) |> Enum.map(&{@relation_column, &1}), + get_where_clauses_for_shape(shape) |> Enum.map(&{@relation_where, &1}) + ) + |> Enum.reduce(filter_counters, fn col, filter -> + {_, filter} = update_map_counter(filter, col, increment) + filter + end) + + %{ + state + | relation_filter_counters: Map.put(relation_filter_counters, table, filter_counters) + } + else + %{state | relation_filter_counters: Map.delete(relation_filter_counters, table)} + end + end + + @spec update_map_counter(map(), any(), integer()) :: {any(), map()} + defp update_map_counter(map, key, inc) do + Map.get_and_update(map, key, fn + nil when inc < 0 -> {nil, nil} + ctr when ctr + inc < 0 -> :pop + nil -> {inc, inc} + ctr -> {ctr + inc, ctr + inc} + end) + end + + @spec get_selected_columns_for_shape(Shape.t()) :: MapSet.t(String.t() | nil) + defp get_selected_columns_for_shape(%Shape{where: _, selected_columns: nil}), + do: MapSet.new([nil]) + + defp get_selected_columns_for_shape(%Shape{where: nil, selected_columns: columns}), + do: MapSet.new(columns) + + defp get_selected_columns_for_shape(%Shape{where: where, selected_columns: columns}) do + # If columns are selected, include columns used in the where clause + where_cols = where |> Expr.unqualified_refs() |> MapSet.new() + MapSet.union(MapSet.new(columns), where_cols) + end + + @spec get_where_clauses_for_shape(Shape.t()) :: + MapSet.t(Electric.Replication.Eval.Expr.t() | nil) + defp get_where_clauses_for_shape(%Shape{where: nil}), do: MapSet.new([nil]) + # TODO: flatten where clauses by splitting top level ANDs + defp 
get_where_clauses_for_shape(%Shape{where: where}), do: MapSet.new([where]) + + @spec add_waiter(GenServer.from(), state()) :: state() + defp add_waiter(from, %__MODULE__{waiters: waiters} = state), + do: %{state | waiters: [from | waiters]} + + @spec reply_to_waiters(any(), state()) :: state() + defp reply_to_waiters(reply, %__MODULE__{waiters: waiters} = state) do + for from <- waiters, do: GenServer.reply(from, reply) + %{state | waiters: []} + end +end diff --git a/packages/sync-service/lib/electric/replication/supervisor.ex b/packages/sync-service/lib/electric/replication/supervisor.ex index e93a774aa7..447fbe983e 100644 --- a/packages/sync-service/lib/electric/replication/supervisor.ex +++ b/packages/sync-service/lib/electric/replication/supervisor.ex @@ -23,6 +23,7 @@ defmodule Electric.Replication.Supervisor do # TODO: weird to have these without defaults but `consumer_supervisor` with a default shape_cache = Keyword.fetch!(opts, :shape_cache) + publication_manager = Keyword.fetch!(opts, :publication_manager) log_collector = Keyword.fetch!(opts, :log_collector) stack_id = Keyword.fetch!(opts, :stack_id) @@ -33,7 +34,7 @@ defmodule Electric.Replication.Supervisor do {Electric.Shapes.DynamicConsumerSupervisor, [stack_id: stack_id]} ) - children = [consumer_supervisor, log_collector, shape_cache] + children = [consumer_supervisor, publication_manager, log_collector, shape_cache] Supervisor.init(children, strategy: :one_for_all) end end diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index 88c4febde1..b5277c6d0d 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -48,6 +48,7 @@ defmodule Electric.ShapeCache do log_producer: [type: @genserver_name_schema, required: true], consumer_supervisor: [type: @genserver_name_schema, required: true], storage: [type: :mod_arg, required: true], + publication_manager: [type: :mod_arg, 
required: true], chunk_bytes_threshold: [type: :non_neg_integer, required: true], inspector: [type: :mod_arg, required: true], shape_status: [type: :atom, default: Electric.ShapeCache.ShapeStatus], @@ -57,7 +58,6 @@ defmodule Electric.ShapeCache do type: {:fun, 2}, default: &Shapes.Consumer.Snapshotter.run_with_conn/2 ], - prepare_tables_fn: [type: {:or, [:mfa, {:fun, 2}]}, required: true], create_snapshot_fn: [ type: {:fun, 7}, default: &Shapes.Consumer.Snapshotter.query_in_readonly_txn/7 @@ -205,6 +205,7 @@ defmodule Electric.ShapeCache do name: opts.name, stack_id: opts.stack_id, storage: opts.storage, + publication_manager: opts.publication_manager, chunk_bytes_threshold: opts.chunk_bytes_threshold, inspector: opts.inspector, shape_meta_table: meta_table, @@ -213,7 +214,6 @@ defmodule Electric.ShapeCache do shape_status_state: shape_status_state, run_with_conn_fn: opts.run_with_conn_fn, create_snapshot_fn: opts.create_snapshot_fn, - prepare_tables_fn: opts.prepare_tables_fn, log_producer: opts.log_producer, registry: opts.registry, consumer_supervisor: opts.consumer_supervisor, @@ -226,6 +226,10 @@ defmodule Electric.ShapeCache do recover_shapes(state) end + # ensure publication filters are in line with existing shapes + {publication_manager, publication_manager_opts} = opts.publication_manager + publication_manager.refresh_publication(publication_manager_opts) + # do this after finishing this function so that we're subscribed to the # producer before it starts forwarding its demand send(self(), :consumers_ready) @@ -303,11 +307,16 @@ defmodule Electric.ShapeCache do end defp recover_shapes(state) do + %{publication_manager: {publication_manager, publication_manager_opts}} = state + state.shape_status_state |> state.shape_status.list_shapes() |> Enum.each(fn {shape_handle, shape} -> try do {:ok, _pid, _snapshot_xmin, _latest_offset} = start_shape(shape_handle, shape, state) + + # recover publication filter state + :ok = publication_manager.recover_shape(shape, 
publication_manager_opts) rescue e -> Logger.error("Failed to recover shape #{shape_handle}: #{inspect(e)}") @@ -329,6 +338,7 @@ defmodule Electric.ShapeCache do shape: shape, shape_status: {state.shape_status, state.shape_status_state}, storage: state.storage, + publication_manager: state.publication_manager, chunk_bytes_threshold: state.chunk_bytes_threshold, log_producer: state.log_producer, shape_cache: @@ -341,7 +351,6 @@ defmodule Electric.ShapeCache do registry: state.registry, db_pool: state.db_pool, run_with_conn_fn: state.run_with_conn_fn, - prepare_tables_fn: state.prepare_tables_fn, create_snapshot_fn: state.create_snapshot_fn ) do consumer = Shapes.Consumer.name(state.stack_id, shape_handle) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index be9c352fa2..1362200913 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -370,8 +370,13 @@ defmodule Electric.Shapes.Consumer do end defp cleanup(state) do - %{shape_status: {shape_status, shape_status_state}} = state + %{ + shape_status: {shape_status, shape_status_state}, + publication_manager: {publication_manager, publication_manager_opts} + } = state + shape_status.remove_shape(shape_status_state, state.shape_handle) + publication_manager.remove_shape(state.shape, publication_manager_opts) ShapeCache.Storage.cleanup!(state.storage) end diff --git a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex index b575bd2631..5815fff3c3 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex @@ -4,9 +4,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do alias Electric.ShapeCache.Storage alias Electric.Shapes alias Electric.Shapes.Querying - alias Electric.Shapes.Shape alias 
Electric.Telemetry.OpenTelemetry - alias Electric.Utils require Logger @@ -48,48 +46,35 @@ defmodule Electric.Shapes.Consumer.Snapshotter do %{ db_pool: pool, storage: storage, - run_with_conn_fn: run_with_conn_fn, create_snapshot_fn: create_snapshot_fn, - prepare_tables_fn: prepare_tables_fn_or_mfa, + publication_manager: {publication_manager, publication_manager_opts}, stack_id: stack_id, chunk_bytes_threshold: chunk_bytes_threshold } = state - affected_tables = Shape.affected_tables(shape) - OpenTelemetry.with_span( "shape_snapshot.create_snapshot_task", shape_attrs(shape_handle, shape), stack_id, fn -> try do - # Grab the same connection from the pool for both operations to - # ensure we only queue for it once. - apply(run_with_conn_fn, [ - pool, - fn pool_conn -> - OpenTelemetry.with_span( - "shape_snapshot.prepare_tables", - shape_attrs(shape_handle, shape), - stack_id, - fn -> - Utils.apply_fn_or_mfa(prepare_tables_fn_or_mfa, [ - pool_conn, - affected_tables - ]) - end - ) - - apply(create_snapshot_fn, [ - consumer, - shape_handle, - shape, - pool_conn, - storage, - stack_id, - chunk_bytes_threshold - ]) + OpenTelemetry.with_span( + "shape_snapshot.prepare_tables", + shape_attrs(shape_handle, shape), + stack_id, + fn -> + publication_manager.add_shape(shape, publication_manager_opts) end + ) + + apply(create_snapshot_fn, [ + consumer, + shape_handle, + shape, + pool, + storage, + stack_id, + chunk_bytes_threshold ]) rescue error -> diff --git a/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex b/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex index a99f76232b..4172be8135 100644 --- a/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex +++ b/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex @@ -16,10 +16,10 @@ defmodule Electric.Shapes.ConsumerSupervisor do registry: [type: :atom, required: true], shape_status: [type: :mod_arg, required: true], storage: [type: :mod_arg, required: true], + 
publication_manager: [type: :mod_arg, required: true], chunk_bytes_threshold: [type: :non_neg_integer, required: true], run_with_conn_fn: [type: {:fun, 2}, default: &DBConnection.run/2], db_pool: [type: {:or, [:atom, :pid, @name_schema_tuple]}, required: true], - prepare_tables_fn: [type: {:or, [:mfa, {:fun, 2}]}, required: true], create_snapshot_fn: [ type: {:fun, 7}, default: &Electric.Shapes.Consumer.Snapshotter.query_in_readonly_txn/7 diff --git a/packages/sync-service/lib/electric/shapes/shape.ex b/packages/sync-service/lib/electric/shapes/shape.ex index 9333ffa398..6cda57f408 100644 --- a/packages/sync-service/lib/electric/shapes/shape.ex +++ b/packages/sync-service/lib/electric/shapes/shape.ex @@ -34,8 +34,6 @@ defmodule Electric.Shapes.Shape do replica: replica() } - @type table_with_where_clause() :: {Electric.relation(), String.t() | nil} - @type json_relation() :: [String.t(), ...] @type json_table_info() :: table_info() | json_relation() @type json_table_list() :: [json_table_info(), ...] @@ -189,14 +187,8 @@ defmodule Electric.Shapes.Shape do @doc """ List tables that are a part of this shape. """ - @spec affected_tables(t()) :: [table_with_where_clause()] - def affected_tables(%__MODULE__{root_table: table, where: nil}), do: [{table, nil}] - - def affected_tables(%__MODULE__{ - root_table: table, - where: %Electric.Replication.Eval.Expr{query: where_clause} - }), - do: [{table, "(" <> where_clause <> ")"}] + @spec affected_tables(t()) :: [Electric.relation()] + def affected_tables(%__MODULE__{root_table: table}), do: [table] @doc """ Convert a change to be correctly represented within the shape. diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index 43998d3258..e8d94c872a 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -24,6 +24,7 @@ defmodule Electric.StackSupervisor do 1. 
`Electric.ShapeCache.Storage` is a process that knows how to write to disk. Takes configuration options for the underlying storage, is an end point 2. `Electric.Shapes.Consumer` is GenStage consumer, subscribing to `LogCollector`, which acts a shared producer for all shapes. It passes any incoming operation along to the storage. 3. `Electric.Shapes.Consumer.Snapshotter` is a temporary GenServer that executes initial snapshot query and writes that to storage + 3. `Electric.Replication.PublicationManager` manages all filters on the publication for the replication 2. `Electric.Replication.ShapeLogCollector` collects transactions from the replication connection, fanning them out to `Electric.Shapes.Consumer` (4.1.1.2) 3. `Electric.ShapeCache` coordinates shape creation and handle allocation, shape metadata """ @@ -148,6 +149,14 @@ defmodule Electric.StackSupervisor do {Electric.ShapeCache, stack_id: stack_id, server: Electric.ShapeCache.name(stack_id)} ) + publication_manager = + Access.get( + opts, + :publication_manager, + {Electric.Replication.PublicationManager, + stack_id: stack_id, server: Electric.Replication.PublicationManager.name(stack_id)} + ) + inspector = Access.get( opts, @@ -159,6 +168,7 @@ defmodule Electric.StackSupervisor do [ shape_cache: shape_cache, + publication_manager: publication_manager, registry: shape_changes_registry_name, stack_events_registry: opts[:stack_events_registry], storage: storage_mod_arg(opts), @@ -197,25 +207,13 @@ defmodule Electric.StackSupervisor do db_pool = Electric.ProcessRegistry.name(stack_id, Electric.DbPool) - get_pg_version_fn = fn -> - server = Electric.Connection.Manager.name(stack_id) - Electric.Connection.Manager.get_pg_version(server) - end - - prepare_tables_mfa = - { - Electric.Postgres.Configuration, - :configure_tables_for_replication!, - [get_pg_version_fn, config.replication_opts[:publication_name]] - } - shape_changes_registry_name = :"#{Registry.ShapeChanges}:#{stack_id}" shape_cache_opts = [ stack_id: 
stack_id, storage: storage, inspector: inspector, - prepare_tables_fn: prepare_tables_mfa, + publication_manager: {Electric.Replication.PublicationManager, stack_id: stack_id}, chunk_bytes_threshold: config.chunk_bytes_threshold, log_producer: shape_log_collector, consumer_supervisor: Electric.Shapes.DynamicConsumerSupervisor.name(stack_id), diff --git a/packages/sync-service/test/electric/postgres/configuration_test.exs b/packages/sync-service/test/electric/postgres/configuration_test.exs index e902210ab7..9d462bb099 100644 --- a/packages/sync-service/test/electric/postgres/configuration_test.exs +++ b/packages/sync-service/test/electric/postgres/configuration_test.exs @@ -2,6 +2,8 @@ defmodule Electric.Postgres.ConfigurationTest do use ExUnit.Case, async: true import ExUnit.CaptureLog + alias Electric.Replication.PublicationManager.RelationFilter + alias Electric.Replication.Eval alias Electric.Postgres.Configuration @pg_15 150_000 @@ -49,21 +51,24 @@ defmodule Electric.Postgres.ConfigurationTest do describe "configure_tables_for_replication!/3" do test "sets REPLICA IDENTITY on the table and adds it to the publication", - %{pool: conn, publication_name: publication, get_pg_version: get_pg_version} do + %{pool: conn, publication_name: publication, pg_version: pg_version} do assert get_table_identity(conn, {"public", "items"}) == "d" assert list_tables_in_publication(conn, publication) == [] Configuration.configure_tables_for_replication!( conn, - [{{"public", "items"}, "(value ILIKE 'yes%')"}], - get_pg_version, + %{ + {"public", "items"} => %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}] + } + }, + pg_version, publication ) assert get_table_identity(conn, {"public", "items"}) == "f" - pg_version = get_pg_version.() - assert list_tables_in_publication(conn, publication) == expected_filters( [ @@ -74,15 +79,20 @@ defmodule Electric.Postgres.ConfigurationTest do end test "doesn't execute `ALTER TABLE` 
if table identity is already full", - %{pool: conn, publication_name: publication, get_pg_version: get_pg_version} do + %{pool: conn, publication_name: publication, pg_version: pg_version} do assert get_table_identity(conn, {"public", "items"}) == "d" assert list_tables_in_publication(conn, publication) == [] assert capture_log(fn -> Configuration.configure_tables_for_replication!( conn, - [{{"public", "items"}, "(value ILIKE 'yes%')"}], - get_pg_version, + %{ + {"public", "items"} => %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}] + } + }, + pg_version, publication ) end) =~ "Altering identity" @@ -92,8 +102,13 @@ defmodule Electric.Postgres.ConfigurationTest do refute capture_log(fn -> Configuration.configure_tables_for_replication!( conn, - [{{"public", "items"}, "(value ILIKE 'no%')"}], - get_pg_version, + %{ + {"public", "items"} => %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE 'no%')"}] + } + }, + pg_version, publication ) end) =~ "Altering identity" @@ -102,7 +117,7 @@ defmodule Electric.Postgres.ConfigurationTest do test "works with multiple tables", %{ pool: conn, publication_name: publication, - get_pg_version: get_pg_version + pg_version: pg_version } do assert get_table_identity(conn, {"public", "items"}) == "d" assert get_table_identity(conn, {"public", "other_table"}) == "d" @@ -110,19 +125,23 @@ defmodule Electric.Postgres.ConfigurationTest do Configuration.configure_tables_for_replication!( conn, - [ - {{"public", "items"}, "(value ILIKE 'yes%')"}, - {{"public", "other_table"}, "(value ILIKE 'no%')"} - ], - get_pg_version, + %{ + {"public", "items"} => %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}] + }, + {"public", "other_table"} => %RelationFilter{ + relation: {"public", "other_table"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE 'no%')"}] + } + }, + pg_version, 
publication ) assert get_table_identity(conn, {"public", "items"}) == "f" assert get_table_identity(conn, {"public", "other_table"}) == "f" - pg_version = get_pg_version.() - assert list_tables_in_publication(conn, publication) == expected_filters( [ @@ -133,10 +152,10 @@ defmodule Electric.Postgres.ConfigurationTest do ) end - test "keeps all tables when updating one of them", %{ + test "can update existing where clauses by updating all tables", %{ pool: conn, publication_name: publication, - get_pg_version: get_pg_version + pg_version: pg_version } do assert get_table_identity(conn, {"public", "items"}) == "d" assert get_table_identity(conn, {"public", "other_table"}) == "d" @@ -144,19 +163,23 @@ defmodule Electric.Postgres.ConfigurationTest do Configuration.configure_tables_for_replication!( conn, - [ - {{"public", "items"}, "(value ILIKE 'yes%')"}, - {{"public", "other_table"}, "(value ILIKE 'no%')"} - ], - get_pg_version, + %{ + {"public", "items"} => %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}] + }, + {"public", "other_table"} => %RelationFilter{ + relation: {"public", "other_table"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE 'no%')"}] + } + }, + pg_version, publication ) assert get_table_identity(conn, {"public", "items"}) == "f" assert get_table_identity(conn, {"public", "other_table"}) == "f" - pg_version = get_pg_version.() - assert list_tables_in_publication(conn, publication) == expected_filters( [ @@ -168,10 +191,17 @@ defmodule Electric.Postgres.ConfigurationTest do Configuration.configure_tables_for_replication!( conn, - [ - {{"public", "other_table"}, "(value ILIKE 'yes%')"} - ], - get_pg_version, + %{ + {"public", "items"} => %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}] + }, + {"public", "other_table"} => %RelationFilter{ + relation: {"public", "other_table"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE 
'yes%')"}] + } + }, + pg_version, publication ) @@ -179,26 +209,28 @@ defmodule Electric.Postgres.ConfigurationTest do expected_filters( [ {"public", "items", "(value ~~* 'yes%'::text)"}, - {"public", "other_table", - "((value ~~* 'no%'::text) OR (value ~~* 'yes%'::text))"} + {"public", "other_table", "(value ~~* 'yes%'::text)"} ], pg_version ) end test "doesn't fail when one of the tables is already configured", - %{pool: conn, publication_name: publication, get_pg_version: get_pg_version} do + %{pool: conn, publication_name: publication, pg_version: pg_version} do Configuration.configure_tables_for_replication!( conn, - [{{"public", "items"}, "(value ILIKE 'yes%')"}], - get_pg_version, + %{ + {"public", "items"} => %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}] + } + }, + pg_version, publication ) assert get_table_identity(conn, {"public", "other_table"}) == "d" - pg_version = get_pg_version.() - assert list_tables_in_publication(conn, publication) == expected_filters( [ @@ -210,8 +242,14 @@ defmodule Electric.Postgres.ConfigurationTest do # Configure `items` table again but with a different where clause Configuration.configure_tables_for_replication!( conn, - [{{"public", "items"}, "(value ILIKE 'no%')"}, {{"public", "other_table"}, nil}], - get_pg_version, + %{ + {"public", "items"} => %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE 'no%')"}] + }, + {"public", "other_table"} => %RelationFilter{relation: {"public", "other_table"}} + }, + pg_version, publication ) @@ -221,7 +259,7 @@ defmodule Electric.Postgres.ConfigurationTest do assert list_tables_in_publication(conn, publication) == expected_filters( [ - {"public", "items", "((value ~~* 'yes%'::text) OR (value ~~* 'no%'::text))"}, + {"public", "items", "(value ~~* 'no%'::text)"}, {"public", "other_table", nil} ], pg_version @@ -231,8 +269,11 @@ defmodule Electric.Postgres.ConfigurationTest do # 
the resulting publication should no longer have a filter for that table Configuration.configure_tables_for_replication!( conn, - [{{"public", "items"}, nil}, {{"public", "other_table"}, "(value ILIKE 'no%')"}], - get_pg_version, + %{ + {"public", "items"} => %RelationFilter{relation: {"public", "items"}}, + {"public", "other_table"} => %RelationFilter{relation: {"public", "other_table"}} + }, + pg_version, publication ) @@ -246,12 +287,14 @@ defmodule Electric.Postgres.ConfigurationTest do ) end - test "fails when a publication doesn't exist", %{pool: conn, get_pg_version: get_pg_version} do + test "fails when a publication doesn't exist", %{pool: conn, pg_version: pg_version} do assert_raise Postgrex.Error, ~r/undefined_object/, fn -> Configuration.configure_tables_for_replication!( conn, - [{{"public", "items"}, nil}], - get_pg_version, + %{ + {"public", "items"} => %RelationFilter{relation: {"public", "items"}} + }, + pg_version, "nonexistent" ) end @@ -260,27 +303,51 @@ defmodule Electric.Postgres.ConfigurationTest do test "concurrent alters to the publication don't deadlock and run correctly", %{ pool: conn, publication_name: publication, - get_pg_version: get_pg_version + pg_version: pg_version } do # Create the publication first Configuration.configure_tables_for_replication!( conn, - [ - {{"public", "items"}, "(value ILIKE 'yes%')"}, - {{"public", "other_table"}, "(value ILIKE '1%')"}, - {{"public", "other_other_table"}, "(value ILIKE '1%')"} - ], - get_pg_version, + %{ + {"public", "items"} => %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}] + }, + {"public", "other_table"} => %RelationFilter{ + relation: {"public", "other_table"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE '1%')"}] + }, + {"public", "other_other_table"} => %RelationFilter{ + relation: {"public", "other_other_table"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE '1%')"}] + } + }, + pg_version, publication ) + 
new_filters = %{ + {"public", "items"} => %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE 'yes%')"}] + }, + {"public", "other_table"} => %RelationFilter{ + relation: {"public", "other_table"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE '2%')"}] + }, + {"public", "other_other_table"} => %RelationFilter{ + relation: {"public", "other_other_table"}, + where_clauses: [%Eval.Expr{query: "(value ILIKE '2%')"}] + } + } + task1 = Task.async(fn -> Postgrex.transaction(conn, fn conn -> Configuration.configure_tables_for_replication!( conn, - [{{"public", "other_other_table"}, "(value ILIKE '2%')"}], - get_pg_version, + new_filters, + pg_version, publication ) @@ -293,8 +360,8 @@ defmodule Electric.Postgres.ConfigurationTest do Postgrex.transaction(conn, fn conn -> Configuration.configure_tables_for_replication!( conn, - [{{"public", "other_table"}, "(value ILIKE '2%')"}], - get_pg_version, + new_filters, + pg_version, publication ) @@ -310,11 +377,10 @@ defmodule Electric.Postgres.ConfigurationTest do expected_filters( [ {"public", "items", "(value ~~* 'yes%'::text)"}, - {"public", "other_other_table", - "((value ~~* '1%'::text) OR (value ~~* '2%'::text))"}, - {"public", "other_table", "((value ~~* '1%'::text) OR (value ~~* '2%'::text))"} + {"public", "other_other_table", "(value ~~* '2%'::text)"}, + {"public", "other_table", "(value ~~* '2%'::text)"} ], - get_pg_version.() + pg_version ) end end diff --git a/packages/sync-service/test/electric/replication/eval/expr_test.exs b/packages/sync-service/test/electric/replication/eval/expr_test.exs new file mode 100644 index 0000000000..241b615bb6 --- /dev/null +++ b/packages/sync-service/test/electric/replication/eval/expr_test.exs @@ -0,0 +1,7 @@ +defmodule Electric.Replication.Eval.ExprTest do + use ExUnit.Case, async: true + + alias Electric.Replication.Eval.Expr + + doctest Electric.Replication.Eval.Expr, import: true +end diff --git 
a/packages/sync-service/test/electric/replication/publication_manager_test.exs b/packages/sync-service/test/electric/replication/publication_manager_test.exs new file mode 100644 index 0000000000..fd754fa984 --- /dev/null +++ b/packages/sync-service/test/electric/replication/publication_manager_test.exs @@ -0,0 +1,256 @@ +defmodule Electric.Replication.PublicationManagerTest do + alias Electric.Replication.Eval.Expr + alias Electric.Replication.PublicationManager.RelationFilter + alias Electric.Shapes.Shape + alias Electric.Replication.PublicationManager + + use ExUnit.Case, async: true + + import Support.ComponentSetup + + defp generate_shape(relation, where_clause \\ nil, selected_columns \\ nil) do + %Shape{ + root_table: relation, + root_table_id: 1, + table_info: %{ + relation => %{ + columns: + ([ + %{name: "id", type: :text, type_id: {25, 1}}, + %{name: "value", type: :text, type_id: {25, 1}} + ] ++ (selected_columns || [])) + |> Enum.map(fn col -> %{name: col, type: :text, type_id: {25, 1}} end), + pk: ["id"] + } + }, + where: where_clause, + selected_columns: selected_columns + } + end + + setup :with_stack_id_from_test + + setup ctx do + test_pid = self() + + configure_tables_fn = fn _, filters, _, _ -> + send(test_pid, {:filters, Map.values(filters)}) + end + + %{publication_manager: {_, publication_manager_opts}} = + with_publication_manager(%{ + module: ctx.module, + test: ctx.test, + stack_id: ctx.stack_id, + update_debounce_timeout: Access.get(ctx, :update_debounce_timeout, 0), + publication_name: "pub_#{ctx.stack_id}", + pool: :no_pool, + pg_version: 150_001, + configure_tables_for_replication_fn: configure_tables_fn + }) + + %{opts: publication_manager_opts} + end + + describe "add_shape/2" do + test "should add filters for single shape", %{opts: opts} do + shape = generate_shape({"public", "items"}, %{query: "id = 1"}) + assert :ok == PublicationManager.add_shape(shape, opts) + + assert_receive {:filters, + [ + %RelationFilter{ + relation: 
{"public", "items"}, + where_clauses: [%{query: "id = 1"}] + } + ]} + end + + test "should accept multiple shapes for different relations", %{opts: opts} do + shape1 = generate_shape({"public", "items"}, %{query: "id = 1"}) + shape2 = generate_shape({"public", "other"}) + assert :ok == PublicationManager.add_shape(shape1, opts) + assert :ok == PublicationManager.add_shape(shape2, opts) + + assert_receive {:filters, + [ + %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%{query: "id = 1"}] + }, + %RelationFilter{relation: {"public", "other"}} + ]} + end + + test "should merge where clauses for same relation", %{opts: opts} do + shape1 = generate_shape({"public", "items"}, %{query: "id = 1"}) + shape2 = generate_shape({"public", "items"}, %{query: "id = 2"}) + shape3 = generate_shape({"public", "items"}, %{query: "id = 1"}) + assert :ok == PublicationManager.add_shape(shape1, opts) + assert :ok == PublicationManager.add_shape(shape2, opts) + assert :ok == PublicationManager.add_shape(shape3, opts) + + assert_receive {:filters, + [ + %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%{query: "id = 2"}, %{query: "id = 1"}] + } + ]} + end + + test "should remove where clauses when one covers everything", %{opts: opts} do + shape1 = generate_shape({"public", "items"}, %{query: "id = 1"}) + shape2 = generate_shape({"public", "items"}, nil) + assert :ok == PublicationManager.add_shape(shape1, opts) + assert :ok == PublicationManager.add_shape(shape2, opts) + + assert_receive {:filters, + [ + %RelationFilter{ + relation: {"public", "items"}, + where_clauses: nil + } + ]} + end + + test "should merge selected columns for same relation", %{opts: opts} do + shape1 = generate_shape({"public", "items"}, nil, ["id", "value"]) + shape2 = generate_shape({"public", "items"}, nil, ["id", "potato"]) + assert :ok == PublicationManager.add_shape(shape1, opts) + assert :ok == PublicationManager.add_shape(shape2, opts) + + assert_receive {:filters, + [ 
+ %RelationFilter{ + relation: {"public", "items"}, + selected_columns: ["value", "potato", "id"] + } + ]} + end + + test "should remove selected columns when all selected by shape", %{opts: opts} do + shape1 = generate_shape({"public", "items"}, nil, ["id", "value"]) + shape2 = generate_shape({"public", "items"}, nil, nil) + assert :ok == PublicationManager.add_shape(shape1, opts) + assert :ok == PublicationManager.add_shape(shape2, opts) + + assert_receive {:filters, + [ + %RelationFilter{ + relation: {"public", "items"}, + selected_columns: nil + } + ]} + end + + test "should include selected columns referenced in where clauses", %{opts: opts} do + shape = + generate_shape( + {"public", "items"}, + %Expr{ + query: "id = 1", + used_refs: %{["id"] => :int8, ["created_at"] => :timestamp} + }, + ["id", "value"] + ) + + assert :ok == PublicationManager.add_shape(shape, opts) + + assert_receive {:filters, + [ + %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%{query: "id = 1"}], + selected_columns: ["value", "id", "created_at"] + } + ]} + end + + @tag update_debounce_timeout: 50 + test "should not update publication if new shape adds nothing", %{opts: opts} do + shape1 = generate_shape({"public", "items"}, %{query: "id = 1"}) + shape2 = generate_shape({"public", "items"}, %{query: "id = 2"}) + shape3 = generate_shape({"public", "items"}, %{query: "id = 1"}) + + task1 = Task.async(fn -> PublicationManager.add_shape(shape1, opts) end) + task2 = Task.async(fn -> PublicationManager.add_shape(shape2, opts) end) + + Task.await_many([task1, task2]) + + assert_receive {:filters, + [ + %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%{query: "id = 2"}, %{query: "id = 1"}] + } + ]} + + assert :ok == PublicationManager.add_shape(shape3, opts) + + refute_receive {:filters, _}, 500 + end + end + + describe "remove_shape/2" do + test "should remove single shape", %{opts: opts} do + shape = generate_shape({"public", "items"}, %{query: "id = 
1"}) + assert :ok == PublicationManager.add_shape(shape, opts) + assert :ok == PublicationManager.remove_shape(shape, opts) + + assert_receive {:filters, []} + end + + @tag update_debounce_timeout: 50 + test "should reference count to avoid removing needed filters", %{opts: opts} do + shape1 = generate_shape({"public", "items"}, %{query: "id = 1"}) + shape2 = generate_shape({"public", "items"}, %{query: "id = 2"}) + shape3 = generate_shape({"public", "items"}, %{query: "id = 1"}) + task1 = Task.async(fn -> PublicationManager.add_shape(shape1, opts) end) + task2 = Task.async(fn -> PublicationManager.add_shape(shape2, opts) end) + task3 = Task.async(fn -> PublicationManager.add_shape(shape3, opts) end) + + Task.await_many([task1, task2, task3]) + + assert_receive {:filters, + [ + %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%{query: "id = 2"}, %{query: "id = 1"}] + } + ]} + + assert :ok == PublicationManager.remove_shape(shape1, opts) + + refute_receive {:filters, _}, 500 + end + end + + describe "recover_shape/2" do + test "should add filters for single shape without updating anything", %{opts: opts} do + shape = generate_shape({"public", "items"}, %{query: "id = 1"}) + assert :ok == PublicationManager.recover_shape(shape, opts) + + refute_receive {:filters, _}, 500 + end + end + + describe "refresh_publication/2" do + test "should update publication if there are changes to add", %{opts: opts} do + shape = generate_shape({"public", "items"}, %{query: "id = 1"}) + assert :ok == PublicationManager.recover_shape(shape, opts) + + refute_receive {:filters, _}, 500 + + assert :ok == PublicationManager.refresh_publication(opts) + + assert_receive {:filters, + [ + %RelationFilter{ + relation: {"public", "items"}, + where_clauses: [%{query: "id = 1"}] + } + ]} + end + end +end diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index 
3260248b44..21265e61f5 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -10,14 +10,20 @@ defmodule Electric.Replication.ShapeLogCollectorTest do alias Support.Mock alias Support.StubInspector - import Support.ComponentSetup, only: [with_in_memory_storage: 1, with_stack_id_from_test: 1] + + import Support.ComponentSetup, + only: [ + with_in_memory_storage: 1, + with_stack_id_from_test: 1, + with_noop_publication_manager: 1 + ] import Mox @moduletag :capture_log setup :verify_on_exit! - setup [:with_stack_id_from_test, :with_in_memory_storage] + setup [:with_stack_id_from_test, :with_in_memory_storage, :with_noop_publication_manager] setup(ctx) do # Start a test Registry @@ -47,7 +53,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(), inspector: {Mock.Inspector, []}, shape_status: Mock.ShapeStatus, - prepare_tables_fn: fn _, _ -> {:ok, [:ok]} end, + publication_manager: ctx.publication_manager, log_producer: ShapeLogCollector.name(ctx.stack_id), stack_id: ctx.stack_id, consumer_supervisor: Electric.Shapes.DynamicConsumerSupervisor.name(ctx.stack_id), diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index 9a6ed196a9..c66b5e1537 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -17,6 +17,7 @@ defmodule Electric.ShapeCacheTest do import Support.DbSetup import Support.DbStructureSetup import Support.TestUtils + alias Support.Mock @moduletag :capture_log @@ -49,8 +50,6 @@ defmodule Electric.ShapeCacheTest do @zero_offset LogOffset.last_before_real_offsets() - @prepare_tables_noop {__MODULE__, :prepare_tables_noop, []} - @stub_inspector StubInspector.new([ %{name: "id", type: "int8", type_id: 
{20, 1}, pk_position: 0}, %{name: "value", type: "text", type_id: {25, 1}} @@ -69,14 +68,14 @@ defmodule Electric.ShapeCacheTest do :with_log_chunking, :with_no_pool, :with_registry, - :with_shape_log_collector + :with_shape_log_collector, + :with_noop_publication_manager ] setup ctx do with_shape_cache( Map.put(ctx, :inspector, @stub_inspector), - create_snapshot_fn: fn _, _, _, _, _, _, _ -> nil end, - prepare_tables_fn: @prepare_tables_noop + create_snapshot_fn: fn _, _, _, _, _, _, _ -> nil end ) end @@ -98,14 +97,14 @@ defmodule Electric.ShapeCacheTest do :with_in_memory_storage, :with_log_chunking, :with_registry, - :with_shape_log_collector + :with_shape_log_collector, + :with_noop_publication_manager ] test "creates initial snapshot if one doesn't exist", %{storage: storage} = ctx do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 10}) Storage.make_new_snapshot!([["test"]], storage) @@ -124,12 +123,18 @@ defmodule Electric.ShapeCacheTest do test "triggers table prep and snapshot creation only once", ctx do test_pid = self() + defmodule TempPubManager do + def add_shape(_, opts) do + send(opts[:test_pid], {:called, :prepare_tables_fn}) + end + + def refresh_publication(_), do: :ok + end + %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: fn nil, [{{"public", "items"}, nil}] -> - send(test_pid, {:called, :prepare_tables_fn}) - end, + publication_manager: {TempPubManager, [test_pid: test_pid]}, create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> send(test_pid, {:called, :create_snapshot_fn}) GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 10}) @@ 
-157,7 +162,6 @@ defmodule Electric.ShapeCacheTest do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> send(test_pid, {:called, :create_snapshot_fn}) GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 10}) @@ -211,6 +215,7 @@ defmodule Electric.ShapeCacheTest do :with_basic_tables, :with_inspector, :with_shape_log_collector, + :with_publication_manager, :with_shape_cache ] @@ -370,7 +375,14 @@ defmodule Electric.ShapeCacheTest do end test "correctly propagates the error", %{shape_cache_opts: opts} do - shape = %Shape{root_table: {"public", "nonexistent"}, root_table_id: 2} + shape = %Shape{ + @shape + | root_table: {"public", "nonexistent"}, + root_table_id: 2, + table_info: %{ + {"public", "nonexistent"} => Map.fetch!(@shape.table_info, @shape.root_table) + } + } {shape_handle, log} = with_log(fn -> @@ -395,14 +407,14 @@ defmodule Electric.ShapeCacheTest do :with_in_memory_storage, :with_log_chunking, :with_registry, - :with_shape_log_collector + :with_shape_log_collector, + :with_noop_publication_manager ] test "returns empty list initially", ctx do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), - run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop + run_with_conn_fn: &run_with_conn_noop/2 ) meta_table = Access.fetch!(opts, :shape_meta_table) @@ -414,7 +426,6 @@ defmodule Electric.ShapeCacheTest do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 10}) Storage.make_new_snapshot!([["test"]], 
storage) @@ -435,7 +446,6 @@ defmodule Electric.ShapeCacheTest do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> ref = make_ref() send(test_pid, {:waiting_point, ref, self()}) @@ -467,14 +477,14 @@ defmodule Electric.ShapeCacheTest do :with_in_memory_storage, :with_log_chunking, :with_registry, - :with_shape_log_collector + :with_shape_log_collector, + :with_noop_publication_manager ] test "returns true for known shape handle", ctx do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _, _, _, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 100}) GenServer.cast(parent, {:snapshot_started, shape_handle}) @@ -490,7 +500,6 @@ defmodule Electric.ShapeCacheTest do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _, _, _, _, _ -> Process.sleep(100) GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 100}) @@ -509,14 +518,14 @@ defmodule Electric.ShapeCacheTest do :with_in_memory_storage, :with_log_chunking, :with_registry, - :with_shape_log_collector + :with_shape_log_collector, + :with_noop_publication_manager ] test "returns :started for snapshots that have started", ctx do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _, _, _, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, 
shape_handle, 100}) GenServer.cast(parent, {:snapshot_started, shape_handle}) @@ -536,7 +545,6 @@ defmodule Electric.ShapeCacheTest do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 10}) Storage.make_new_snapshot!([["test"]], storage) @@ -555,7 +563,6 @@ defmodule Electric.ShapeCacheTest do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> ref = make_ref() send(test_pid, {:waiting_point, ref, self()}) @@ -611,7 +618,6 @@ defmodule Electric.ShapeCacheTest do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 10}) GenServer.cast(parent, {:snapshot_started, shape_handle}) @@ -649,7 +655,6 @@ defmodule Electric.ShapeCacheTest do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _shape, _, _storage, _, _ -> ref = make_ref() send(test_pid, {:waiting_point, ref, self()}) @@ -685,14 +690,14 @@ defmodule Electric.ShapeCacheTest do :with_in_memory_storage, :with_log_chunking, :with_registry, - :with_shape_log_collector + :with_shape_log_collector, + :with_noop_publication_manager ] test "cleans up shape data and rotates the shape handle", ctx do 
%{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 10}) Storage.make_new_snapshot!([["test"]], storage) @@ -749,7 +754,6 @@ defmodule Electric.ShapeCacheTest do %{shape_cache_opts: opts} = with_shape_cache(Map.merge(ctx, %{pool: nil, inspector: @stub_inspector}), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 10}) Storage.make_new_snapshot!([["test"]], storage) @@ -780,6 +784,7 @@ defmodule Electric.ShapeCacheTest do :with_log_chunking, :with_registry, :with_shape_log_collector, + :with_noop_publication_manager, :with_no_pool ] @@ -787,7 +792,6 @@ defmodule Electric.ShapeCacheTest do do: with_shape_cache(Map.put(ctx, :inspector, @stub_inspector), run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, @snapshot_xmin}) Storage.make_new_snapshot!([["test"]], storage) @@ -819,6 +823,24 @@ defmodule Electric.ShapeCacheTest do {:ok, @snapshot_xmin} = ShapeStatus.snapshot_xmin(meta_table, shape_handle) end + test "restores publication filters", %{shape_cache_opts: opts} = context do + {shape_handle1, _} = ShapeCache.get_or_create_shape_handle(@shape, opts) + :started = ShapeCache.await_snapshot_start(shape_handle1, opts) + + Mock.PublicationManager + |> expect(:recover_shape, 1, fn _, _ -> :ok end) + |> expect(:refresh_publication, 1, fn _ -> :ok end) + |> allow(self(), fn -> Process.whereis(opts[:server]) end) + + restart_shape_cache(%{ + context + | publication_manager: 
{Mock.PublicationManager, []} + }) + + {shape_handle2, _} = ShapeCache.get_or_create_shape_handle(@shape, opts) + assert shape_handle1 == shape_handle2 + end + test "restores latest offset", %{shape_cache_opts: opts} = context do offset = @change_offset {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape, opts) @@ -861,7 +883,6 @@ defmodule Electric.ShapeCacheTest do with_cub_db_storage(context) with_shape_cache(Map.put(context, :inspector, @stub_inspector), - prepare_tables_fn: @prepare_tables_noop, create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ -> GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, @snapshot_xmin}) Storage.make_new_snapshot!([["test"]], storage) @@ -907,8 +928,6 @@ defmodule Electric.ShapeCacheTest do end end - def prepare_tables_noop(_, _), do: :ok - def run_with_conn_noop(conn, cb), do: cb.(conn) defp stream_to_list(stream) do diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index abade9c501..9f83d43031 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -73,8 +73,6 @@ defmodule Electric.Shapes.ConsumerTest do Lsn.from_integer(offset) end - defp prepare_tables_fn(_pool, _affected_tables), do: :ok - defp run_with_conn_noop(conn, cb), do: cb.(conn) describe "event handling" do @@ -146,11 +144,11 @@ defmodule Electric.Shapes.ConsumerTest do registry: registry_name, shape_cache: {Mock.ShapeCache, []}, shape_status: {Mock.ShapeStatus, []}, + publication_manager: {Mock.PublicationManager, []}, storage: storage, chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(), - run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: &prepare_tables_fn/2}, + run_with_conn_fn: &run_with_conn_noop/2}, id: {Shapes.ConsumerSupervisor, shape_handle} ) @@ -314,11 +312,14 @@ defmodule Electric.Shapes.ConsumerTest 
do last_log_offset = LogOffset.new(lsn, 0) Mock.ShapeStatus - |> expect(:remove_shape, fn _, @shape_handle1 -> :ok end) - |> allow( - self(), - Shapes.Consumer.name(ctx.stack_id, @shape_handle1) - ) + |> expect(:remove_shape, 1, fn _, @shape_handle1 -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + + shape1 = ctx.shapes[@shape_handle1] + + Mock.PublicationManager + |> expect(:remove_shape, 1, fn ^shape1, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) txn = %Transaction{xid: xid, lsn: lsn, last_log_offset: last_log_offset} @@ -373,6 +374,15 @@ defmodule Electric.Shapes.ConsumerTest do Shapes.Consumer.name(ctx.stack_id, @shape_handle1) ) + shape = ctx.shapes[@shape_handle1] + + Mock.PublicationManager + |> expect(:remove_shape, 1, fn ^shape, _ -> :ok end) + |> allow( + self(), + Shapes.Consumer.name(ctx.stack_id, @shape_handle1) + ) + txn = %Transaction{ xid: xid, @@ -512,6 +522,15 @@ defmodule Electric.Shapes.ConsumerTest do |> expect(:remove_shape, 0, fn _, _ -> :ok end) |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + shape1 = ctx.shapes[@shape_handle1] + shape2 = ctx.shapes[@shape_handle2] + + Mock.PublicationManager + |> expect(:remove_shape, 1, fn ^shape1, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> expect(:remove_shape, 0, fn ^shape2, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + assert :ok = ShapeLogCollector.handle_relation_msg(rel, ctx.producer) assert_receive {:DOWN, ^ref1, :process, _, _} @@ -553,6 +572,15 @@ defmodule Electric.Shapes.ConsumerTest do |> expect(:remove_shape, 0, fn _, _ -> :ok end) |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + shape1 = ctx.shapes[@shape_handle1] + shape2 = ctx.shapes[@shape_handle2] + + Mock.PublicationManager + |> expect(:remove_shape, 1, fn ^shape1, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> expect(:remove_shape, 0, fn 
^shape2, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + :ok = ShapeLogCollector.store_transaction(txn, ctx.producer) assert_receive {Support.TestStorage, :cleanup!, @shape_handle1} @@ -575,6 +603,15 @@ defmodule Electric.Shapes.ConsumerTest do |> expect(:remove_shape, 0, fn _, _ -> :ok end) |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + shape1 = ctx.shapes[@shape_handle1] + shape2 = ctx.shapes[@shape_handle2] + + Mock.PublicationManager + |> expect(:remove_shape, 1, fn ^shape1, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> expect(:remove_shape, 0, fn ^shape2, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + GenServer.cast(Consumer.name(ctx.stack_id, @shape_handle1), :unexpected_cast) assert_receive {Support.TestStorage, :cleanup!, @shape_handle1} @@ -592,6 +629,12 @@ defmodule Electric.Shapes.ConsumerTest do |> expect(:remove_shape, 0, fn _, _ -> :ok end) |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + shape1 = ctx.shapes[@shape_handle1] + + Mock.PublicationManager + |> expect(:remove_shape, 0, fn ^shape1, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + GenServer.stop(Consumer.name(ctx.stack_id, @shape_handle1)) refute_receive {Support.TestStorage, :cleanup!, @shape_handle1} @@ -610,7 +653,8 @@ defmodule Electric.Shapes.ConsumerTest do {Support.ComponentSetup, :with_registry}, {Support.ComponentSetup, :with_cub_db_storage}, {Support.ComponentSetup, :with_log_chunking}, - {Support.ComponentSetup, :with_shape_log_collector} + {Support.ComponentSetup, :with_shape_log_collector}, + {Support.ComponentSetup, :with_noop_publication_manager} ] setup(ctx) do @@ -624,7 +668,6 @@ defmodule Electric.Shapes.ConsumerTest do }), log_producer: ctx.shape_log_collector, run_with_conn_fn: &run_with_conn_noop/2, - prepare_tables_fn: fn _, _ -> :ok end, create_snapshot_fn: fn parent, shape_handle, _shape, _, storage, _, _ 
-> if is_integer(snapshot_delay), do: Process.sleep(snapshot_delay) GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, 10}) diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 7dc47a28c4..9483e80f17 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -9,6 +9,15 @@ defmodule Support.ComponentSetup do alias Electric.ShapeCache.InMemoryStorage alias Electric.Postgres.Inspector.EtsInspector + defmodule NoopPublicationManager do + @behaviour Electric.Replication.PublicationManager + def name(_), do: :pub_man + def add_shape(_shape, _opts), do: :ok + def recover_shape(_shape, _opts), do: :ok + def remove_shape(_shape, _opts), do: :ok + def refresh_publication(_opts), do: :ok + end + def with_stack_id_from_test(ctx) do stack_id = full_test_name(ctx) registry = start_link_supervised!({Electric.ProcessRegistry, stack_id: stack_id}) @@ -55,10 +64,38 @@ defmodule Support.ComponentSetup do %{chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold()} end + def with_publication_manager(ctx) do + server = :"publication_manager_#{full_test_name(ctx)}" + + {:ok, _} = + Electric.Replication.PublicationManager.start_link( + name: server, + stack_id: ctx.stack_id, + publication_name: ctx.publication_name, + update_debounce_timeout: Access.get(ctx, :update_debounce_timeout, 0), + db_pool: ctx.pool, + pg_version: Access.get(ctx, :pg_version, nil), + configure_tables_for_replication_fn: + Access.get( + ctx, + :configure_tables_for_replication_fn, + &Electric.Postgres.Configuration.configure_tables_for_replication!/4 + ) + ) + + %{ + publication_manager: + {Electric.Replication.PublicationManager, stack_id: ctx.stack_id, server: server} + } + end + + def with_noop_publication_manager(_ctx) do + %{publication_manager: {NoopPublicationManager, []}} + end + def with_shape_cache(ctx, additional_opts \\ 
[]) do server = :"shape_cache_#{full_test_name(ctx)}" consumer_supervisor = :"consumer_supervisor_#{full_test_name(ctx)}" - get_pg_version = fn -> Application.fetch_env!(:electric, :pg_version_for_tests) end start_opts = [ @@ -66,6 +103,7 @@ defmodule Support.ComponentSetup do stack_id: ctx.stack_id, inspector: ctx.inspector, storage: ctx.storage, + publication_manager: ctx.publication_manager, chunk_bytes_threshold: ctx.chunk_bytes_threshold, db_pool: ctx.pool, registry: ctx.registry, @@ -73,13 +111,6 @@ defmodule Support.ComponentSetup do consumer_supervisor: consumer_supervisor ] |> Keyword.merge(additional_opts) - |> Keyword.put_new_lazy(:prepare_tables_fn, fn -> - { - Electric.Postgres.Configuration, - :configure_tables_for_replication!, - [get_pg_version, ctx.publication_name] - } - end) {:ok, _pid} = Electric.Shapes.DynamicConsumerSupervisor.start_link( diff --git a/packages/sync-service/test/support/db_setup.ex b/packages/sync-service/test/support/db_setup.ex index 5bee120ce4..1493ba6ac9 100644 --- a/packages/sync-service/test/support/db_setup.ex +++ b/packages/sync-service/test/support/db_setup.ex @@ -52,7 +52,7 @@ defmodule Support.DbSetup do %{rows: [[pg_version]]} = Postgrex.query!(ctx.db_conn, "SELECT current_setting('server_version_num')::integer", []) - {:ok, %{get_pg_version: fn -> pg_version end}} + {:ok, %{pg_version: pg_version}} end def with_shared_db(_ctx) do diff --git a/packages/sync-service/test/support/mocks.ex b/packages/sync-service/test/support/mocks.ex index 8899b38ec3..96ca0530d8 100644 --- a/packages/sync-service/test/support/mocks.ex +++ b/packages/sync-service/test/support/mocks.ex @@ -4,4 +4,8 @@ defmodule Support.Mock do Mox.defmock(Support.Mock.Inspector, for: Electric.Postgres.Inspector) Mox.defmock(Support.Mock.ShapeStatus, for: Electric.ShapeCache.ShapeStatusBehaviour) Mox.defmock(Support.Mock.PersistentKV, for: Electric.PersistentKV) + + Mox.defmock(Support.Mock.PublicationManager, + for: 
Electric.Replication.PublicationManager + ) end diff --git a/packages/typescript-client/test/client.test.ts b/packages/typescript-client/test/client.test.ts index bef73bb4bf..d227fb1669 100644 --- a/packages/typescript-client/test/client.test.ts +++ b/packages/typescript-client/test/client.test.ts @@ -452,9 +452,14 @@ describe(`Shape`, () => { }) it(`should support async error handler`, async ({ issuesTableUrl }) => { + let authChanged: () => void + const authChangePromise = new Promise((res) => { + authChanged = res + }) const mockErrorHandler = vi.fn().mockImplementation(async (error) => { if (error instanceof FetchError && error.status === 401) { await sleep(200) + authChanged() return { headers: { Authorization: `valid credentials`, @@ -489,6 +494,7 @@ describe(`Shape`, () => { expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError) expect(shapeStream.isConnected()).toBe(false) + await authChangePromise await sleep(200) // give some time for the error handler to modify the authorization header expect(shapeStream.isConnected()).toBe(true) })