diff --git a/packages/sync-service/lib/electric/plug/utils.ex b/packages/sync-service/lib/electric/plug/utils.ex index 9dd7d1104a..c36d032baa 100644 --- a/packages/sync-service/lib/electric/plug/utils.ex +++ b/packages/sync-service/lib/electric/plug/utils.ex @@ -127,8 +127,7 @@ defmodule Electric.Plug.Utils do stack_ready_timeout = Access.get(conn.assigns.config, :stack_ready_timeout, 5_000) stack_events_registry = conn.assigns.config[:stack_events_registry] - ref = make_ref() - Electric.StackSupervisor.subscribe_to_stack_events(stack_events_registry, stack_id, ref) + ref = Electric.StackSupervisor.subscribe_to_stack_events(stack_events_registry, stack_id) if Electric.ProcessRegistry.alive?(stack_id, Electric.Replication.Supervisor) do conn diff --git a/packages/sync-service/lib/electric/postgres/configuration.ex b/packages/sync-service/lib/electric/postgres/configuration.ex index c14fae16d8..f1494af940 100644 --- a/packages/sync-service/lib/electric/postgres/configuration.ex +++ b/packages/sync-service/lib/electric/postgres/configuration.ex @@ -211,7 +211,7 @@ defmodule Electric.Postgres.Configuration do FROM input_relations ir JOIN pg_class pc ON pc.relname = ir.tablename JOIN pg_namespace pn ON pn.oid = pc.relnamespace - WHERE pn.nspname = ir.schemaname AND pc.relkind = 'r'; + WHERE pn.nspname = ir.schemaname AND pc.relkind IN ('r', 'p'); """ relations = Map.keys(filters) diff --git a/packages/sync-service/lib/electric/postgres/inspector.ex b/packages/sync-service/lib/electric/postgres/inspector.ex index b2924579fa..a9a6ec5255 100644 --- a/packages/sync-service/lib/electric/postgres/inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector.ex @@ -1,7 +1,9 @@ defmodule Electric.Postgres.Inspector do alias Electric.Replication.Eval.Parser + @type relation :: Electric.relation() @type relation_id :: Electric.relation_id() + @type relation_kind :: :ordinary_table | :partitioned_table @type column_info :: %{ name: String.t(), @@ -17,15 +19,19 @@ defmodule Electric.Postgres.Inspector do @type relation_info :: %{ relation_id: relation_id(), - relation: relation() + relation: relation(), + kind: relation_kind(), + parent: nil | relation(), + children: nil | [relation(), ...] } - @callback load_relation(String.t(), opts :: term()) :: + @callback load_relation(String.t() | relation(), opts :: term()) :: {:ok, relation_info()} | {:error, String.t()} @callback load_column_info(relation(), opts :: term()) :: {:ok, [column_info()]} | :table_not_found + # @callback introspect_relation() @callback clean(relation(), opts :: term()) :: true @type inspector :: {module(), opts :: term()} @@ -40,9 +46,14 @@ defmodule Electric.Postgres.Inspector do `"Users"` would return `{"public", "Users"}`, `some_schema.users` would return `{"some_schema", "users"}`. """ - @spec load_relation(String.t(), inspector()) :: {:ok, relation_info()} | {:error, String.t()} - def load_relation(table, {module, opts}), - do: module.load_relation(table, opts) + def load_relation(%{schema: schema, table: table}, inspector), + do: load_relation({schema, table}, inspector) + + @spec load_relation(String.t() | relation(), inspector()) :: + {:ok, relation_info()} | {:error, String.t()} + def load_relation(table, {module, opts}) do + module.load_relation(table, opts) + end @doc """ Load column information about a given table using a provided inspector. @@ -55,6 +66,7 @@ defmodule Electric.Postgres.Inspector do @doc """ Clean up all information about a given relation using a provided inspector. """ + @spec clean(relation(), inspector()) :: true def clean(relation, {module, opts}), do: module.clean(relation, opts) diff --git a/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex b/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex index a16b99018e..2030376672 100644 --- a/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector/direct_inspector.ex @@ -4,30 +4,79 @@ defmodule Electric.Postgres.Inspector.DirectInspector do @doc """ Returns the PG relation from the table name. """ - def load_relation(table, conn) do + def load_relation(table, conn) when is_binary(table) do # The extra cast from $1 to text is needed because of Postgrex' OID type encoding # see: https://github.com/elixir-ecto/postgrex#oid-type-encoding - query = """ - SELECT nspname, relname, pg_class.oid - FROM pg_class - JOIN pg_namespace ON relnamespace = pg_namespace.oid - WHERE - relkind = 'r' AND - pg_class.oid = $1::text::regclass - """ + query = load_relation_query("$1::text::regclass") + do_load_relation(conn, query, [table]) + end + + def load_relation({schema, name}, conn) when is_binary(schema) and is_binary(name) do + query = load_relation_query("format('%I.%I', $1::text, $2::text)::regclass") + do_load_relation(conn, query, [schema, name]) + end - case Postgrex.query(conn, query, [table]) do + defp do_load_relation(conn, query, params) do + case Postgrex.query(conn, query, params) do {:ok, result} -> # We expect exactly one row because the query didn't fail # so the relation exists since we could cast it to a regclass - [[schema, table, oid]] = result.rows - {:ok, %{relation_id: oid, relation: {schema, table}}} + [[schema, table, oid, kind, parent, children]] = result.rows + + {:ok, + %{ + relation_id: oid, + relation: {schema, table}, + kind: resolve_kind(kind), + parent: map_relations(parent), + children: map_relations(children) + }} {:error, err} -> {:error, Exception.message(err)} end end + defp load_relation_query(match) do + # partitions can live in other namespaces from the parent/root table, so we + # need to keep track of them + [ + """ + SELECT pn.nspname, pc.relname, pc.oid, pc.relkind, pi_parent.parent, pi_children.children + FROM pg_catalog.pg_class pc + JOIN pg_catalog.pg_namespace pn ON pc.relnamespace = pn.oid + LEFT OUTER JOIN ( -- get schema and name of parent table (if any) + SELECT pi.inhrelid, ARRAY[pn.nspname, pc.relname] parent + FROM pg_catalog.pg_inherits pi + JOIN pg_catalog.pg_class pc ON pi.inhparent = pc.oid + JOIN pg_catalog.pg_namespace pn ON pc.relnamespace = pn.oid + ) pi_parent ON pc.oid = pi_parent.inhrelid + LEFT OUTER JOIN ( -- get list of child partitions (if any) + SELECT pi.inhparent, ARRAY_AGG(ARRAY[pn.nspname, pc.relname]) AS children + FROM pg_catalog.pg_inherits pi + JOIN pg_catalog.pg_class pc ON pi.inhrelid = pc.oid + JOIN pg_catalog.pg_namespace pn ON pc.relnamespace = pn.oid + GROUP BY pi.inhparent + ) pi_children ON pc.oid = pi_children.inhparent + WHERE + pc.relkind IN ('r', 'p') AND + """, + "pc.oid = ", + match + ] + end + + defp resolve_kind("r"), do: :ordinary_table + defp resolve_kind("p"), do: :partitioned_table + + defp map_relations(nil), do: nil + + defp map_relations([schema, name]) when is_binary(schema) and is_binary(name), + do: {schema, name} + + defp map_relations(relations) when is_list(relations), + do: Enum.map(relations, &map_relations/1) + @doc """ Load table information (refs) from the database """ @@ -49,7 +98,7 @@ defmodule Electric.Postgres.Inspector.DirectInspector do JOIN pg_type ON atttypid = pg_type.oid LEFT JOIN pg_index ON indrelid = pg_class.oid AND indisprimary LEFT JOIN pg_type AS elem_pg_type ON pg_type.typelem = elem_pg_type.oid - WHERE relname = $1 AND nspname = $2 AND relkind = 'r' + WHERE relname = $1 AND nspname = $2 AND relkind IN ('r', 'p') ORDER BY pg_class.oid, attnum """ diff --git a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex index 270c6dec8e..5695457dec 100644 --- a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex @@ -1,6 +1,8 @@ defmodule Electric.Postgres.Inspector.EtsInspector do - alias Electric.Postgres.Inspector.DirectInspector use GenServer + + alias Electric.Postgres.Inspector.DirectInspector + @behaviour Electric.Postgres.Inspector ## Public API @@ -117,7 +119,7 @@ defmodule Electric.Postgres.Inspector.EtsInspector do {:error, err} -> {:reply, {:error, err}, state} - {:ok, relation} -> + {:ok, %{relation: relation} = info} -> # We keep the mapping in both directions: # - Forward: user-provided table name -> PG relation (many-to-one) # e.g. `~s|users|` -> `{"public", "users"}` @@ -127,9 +129,11 @@ defmodule Electric.Postgres.Inspector.EtsInspector do # # The forward direction allows for efficient lookup (based on user-provided table name) # the backward direction allows for efficient cleanup (based on PG relation) - :ets.insert(state.pg_info_table, {{table, :table_to_relation}, relation}) - :ets.insert(state.pg_relation_table, {{relation, :relation_to_table}, table}) - {:reply, {:ok, relation}, state} + :ets.insert(state.pg_info_table, {{table, :table_to_relation}, info}) + :ets.insert(state.pg_info_table, {{relation, :table_to_relation}, info}) + :ets.insert(state.pg_relation_table, {{info, :relation_to_table}, table}) + :ets.insert(state.pg_relation_table, {{info, :relation_to_table}, relation}) + {:reply, {:ok, info}, state} end relation -> @@ -159,12 +163,26 @@ defmodule Electric.Postgres.Inspector.EtsInspector do end @pg_rel_position 2 - defp relation_from_ets(table, opts_or_state) do + defp relation_from_ets(table, opts_or_state) when is_binary(table) do ets_table = get_column_info_table(opts_or_state) :ets.lookup_element(ets_table, {table, :table_to_relation}, @pg_rel_position, :not_found) end + defp relation_from_ets({_schema, _name} = relation, opts_or_state) do + ets_table = get_column_info_table(opts_or_state) + + with info when is_map(info) <- + :ets.lookup_element( + ets_table, + {relation, :table_to_relation}, + @pg_rel_position, + :not_found + ) do + info + end + end + @pg_table_idx 1 defp tables_from_ets(relation, opts_or_state) do ets_table = get_relation_table(opts_or_state) diff --git a/packages/sync-service/lib/electric/replication/shape_log_collector.ex b/packages/sync-service/lib/electric/replication/shape_log_collector.ex index 8964c18e37..4f0e6f3a6b 100644 --- a/packages/sync-service/lib/electric/replication/shape_log_collector.ex +++ b/packages/sync-service/lib/electric/replication/shape_log_collector.ex @@ -58,7 +58,8 @@ defmodule Electric.Replication.ShapeLogCollector do state = Map.merge(opts, %{producer: nil, subscriptions: {0, MapSet.new()}}) # start in demand: :accumulate mode so that the ShapeCache is able to start # all active consumers before we start sending transactions - {:producer, state, dispatcher: Electric.Shapes.Dispatcher, demand: opts.demand} + {:producer, state, + dispatcher: {Electric.Shapes.Dispatcher, inspector: state.inspector}, demand: opts.demand} end def handle_subscribe(:consumer, _opts, from, state) do @@ -148,14 +149,43 @@ defmodule Electric.Replication.ShapeLogCollector do OpenTelemetry.add_span_attributes("rel.is_dropped": true) + reload_partitioned_table(rel, state) + {:reply, :ok, [], state} end defp handle_relation(rel, from, state) do OpenTelemetry.add_span_attributes("rel.is_dropped": false) + reload_partitioned_table(rel, state) {:noreply, [rel], %{state | producer: from}} end + defp reload_partitioned_table(rel, state) do + case Inspector.load_relation(rel, state.inspector) do + {:ok, %{parent: nil}} -> + :ok + + {:ok, %{parent: {_, _} = parent}} -> + # probably a new partition for an existing partitioned table + # so force a reload of the relation info + + # TODO: we should probabaly have a way to clean the inspector cache + # just based on the relation, there's a chance that this results in + # a query to pg just to then drop the info + with {:ok, info} <- Inspector.load_relation(parent, state.inspector) do + Inspector.clean(info, state.inspector) + end + + {:ok, _} -> + # probably a malformed value from a test inspector + :ok + + {:error, _} -> + # just ignore errors here, they're unlikely anyway + :ok + end + end + defp remove_subscription(from, %{subscriptions: {count, set}} = state) do subscriptions = if MapSet.member?(set, from) do diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 985bfb78dc..17633c0668 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -3,11 +3,12 @@ defmodule Electric.Shapes.Consumer do restart: :temporary, significant: true - alias Electric.ShapeCache.LogChunker alias Electric.LogItems + alias Electric.Postgres.Inspector alias Electric.Replication.Changes alias Electric.Replication.Changes.Transaction alias Electric.ShapeCache + alias Electric.ShapeCache.LogChunker alias Electric.Shapes.Shape alias Electric.Telemetry.OpenTelemetry alias Electric.Utils @@ -182,25 +183,51 @@ defmodule Electric.Shapes.Consumer do # `Shapes.Dispatcher` only works with single-events, so we can safely assert # that here - def handle_events([%Changes.Relation{}], _from, state) do - %{shape: %{root_table: root_table}, inspector: {inspector, inspector_opts}} = state + def handle_events([%Changes.Relation{} = relation], _from, state) do + %{shape: %{root_table: root_table} = shape, inspector: inspector} = state + + # we now recelve relation messages from partitions, as well as ones + # affecting our root table so we need to be clear what we're getting -- if + # the relation message refers to our root table then we need to drop the + # shape as something has changed. if the relation is a new partition, so + # it's parent is our root table, then we need to just add that partition to + # our shape so txns from the new partition are properly mapped to our root + # table. + if relation.id == shape.root_table_id do + Logger.info( + "Schema for the table #{Utils.inspect_relation(root_table)} changed - terminating shape #{state.shape_handle}" + ) - Logger.info( - "Schema for the table #{Utils.inspect_relation(root_table)} changed - terminating shape #{state.shape_handle}" - ) + # We clean up the relation info from ETS as it has changed and we want + # to source the fresh info from postgres for the next shape creation + Inspector.clean(root_table, inspector) - # We clean up the relation info from ETS as it has changed and we want - # to source the fresh info from postgres for the next shape creation - inspector.clean(root_table, inspector_opts) + state = + reply_to_snapshot_waiters( + {:error, "Shape relation changed before snapshot was ready"}, + state + ) - state = - reply_to_snapshot_waiters( - {:error, "Shape relation changed before snapshot was ready"}, - state + cleanup(state) + + {:stop, :normal, state} + else + # if we're receiving this relation message but the relation doesn't refer + # to the root table for the shape, then it **must** be because of the addition of a partition + # to the root table + + {:ok, %{parent: ^root_table, relation: table}} = + Inspector.load_relation({relation.schema, relation.table}, inspector) + + # a new partition has been added + Logger.info( + "New partition #{Utils.inspect_relation(table)} for table #{Utils.inspect_relation(root_table)}" ) - cleanup(state) - {:stop, :normal, state} + shape = Shape.add_partition(shape, root_table, table) + + {:noreply, [], %{state | shape: shape}} + end end # Buffer incoming transactions until we know our xmin diff --git a/packages/sync-service/lib/electric/shapes/dispatcher.ex b/packages/sync-service/lib/electric/shapes/dispatcher.ex index 65743dc6d1..9e4d83b231 100644 --- a/packages/sync-service/lib/electric/shapes/dispatcher.ex +++ b/packages/sync-service/lib/electric/shapes/dispatcher.ex @@ -22,6 +22,7 @@ defmodule Electric.Shapes.Dispatcher do """ require Logger + alias Electric.Shapes.Filter defmodule State do @@ -32,13 +33,13 @@ defmodule Electric.Shapes.Dispatcher do @impl GenStage.Dispatcher - def init(_opts) do + def init(opts) do {:ok, %State{ waiting: 0, pending: nil, subscribers: [], - filter: Filter.new(), + filter: Filter.new(opts), pids: MapSet.new() }} end @@ -141,9 +142,14 @@ defmodule Electric.Shapes.Dispatcher do end def dispatch([event], _length, %State{waiting: 0, subscribers: subscribers} = state) do + {filter, affected_shapes} = + Filter.affected_shapes( + state.filter, + event + ) + {waiting, pending} = - state.filter - |> Filter.affected_shapes(event) + affected_shapes |> Enum.reduce({0, MapSet.new()}, fn {pid, ref} = subscriber, {waiting, pending} -> Process.send(pid, {:"$gen_consumer", {self(), ref}, [event]}, [:noconnect]) {waiting + 1, MapSet.put(pending, subscriber)} @@ -160,7 +166,7 @@ defmodule Electric.Shapes.Dispatcher do {waiting, pending} end - {:ok, [], %State{state | waiting: waiting, pending: pending}} + {:ok, [], %State{state | filter: filter, waiting: waiting, pending: pending}} end @impl GenStage.Dispatcher diff --git a/packages/sync-service/lib/electric/shapes/filter.ex b/packages/sync-service/lib/electric/shapes/filter.ex index e43de40e18..7f7be575b2 100644 --- a/packages/sync-service/lib/electric/shapes/filter.ex +++ b/packages/sync-service/lib/electric/shapes/filter.ex @@ -10,6 +10,7 @@ defmodule Electric.Shapes.Filter do the table specific logic to the `Filter.Table` module. """ + alias Electric.Postgres.Inspector alias Electric.Replication.Changes alias Electric.Replication.Changes.DeletedRecord alias Electric.Replication.Changes.NewRecord @@ -20,15 +21,20 @@ defmodule Electric.Shapes.Filter do alias Electric.Shapes.Filter alias Electric.Shapes.Filter.Table alias Electric.Shapes.Shape + require Logger - defstruct tables: %{} + @enforce_keys [:inspector] + defstruct [:inspector, tables: %{}, partitions: %{}, partition_ownership: %{}] @type t :: %Filter{} @type shape_id :: any() - @spec new() :: Filter.t() - def new, do: %Filter{} + @spec new(keyword()) :: Filter.t() + def new(opts) do + {:ok, inspector} = Keyword.fetch(opts, :inspector) + %Filter{inspector: inspector} + end @doc """ Add a shape for the filter to track. @@ -37,41 +43,98 @@ defmodule Electric.Shapes.Filter do by `affected_shapes/2` when the shape is affected by a change. """ @spec add_shape(Filter.t(), shape_id(), Shape.t()) :: Filter.t() - def add_shape(%Filter{tables: tables}, shape_id, shape) do - %Filter{ - tables: - Map.update( - tables, - shape.root_table, - Table.add_shape(Table.new(), {shape_id, shape}), - fn table -> - Table.add_shape(table, {shape_id, shape}) + def add_shape(%Filter{} = filter, shape_id, shape) do + case Inspector.load_relation(shape.root_table, filter.inspector) do + {:ok, relation} -> + filter + |> Map.update!(:tables, fn tables -> + Map.update( + tables, + shape.root_table, + Table.add_shape(Table.new(), {shape_id, shape}), + fn table -> + Table.add_shape(table, {shape_id, shape}) + end + ) + end) + |> Map.update!(:partitions, fn partitions -> + case relation do + %{children: [_ | _] = children} -> + Enum.reduce(children, partitions, fn child, partitions -> + Map.put(partitions, child, [shape.root_table]) + end) + + _ -> + partitions end - ) - } + end) + |> Map.update!(:partition_ownership, fn ownership -> + relations = + case relation do + %{children: [_ | _] = children} -> + [shape.root_table | children] + + _ -> + [shape.root_table] + end + + Enum.reduce(relations, ownership, fn relation, relation_ownership -> + Map.update( + relation_ownership, + relation, + MapSet.new([shape_id]), + &MapSet.put(&1, shape_id) + ) + end) + end) + + {:error, _reason} -> + # error will be handled by other parts of the stack, our job is not to + # crash + filter + end end @doc """ Remove a shape from the filter. """ @spec remove_shape(Filter.t(), shape_id()) :: Filter.t() - def remove_shape(%Filter{tables: tables}, shape_id) do - %Filter{ - tables: - tables - |> Enum.map(fn {table_name, table} -> - {table_name, Table.remove_shape(table, shape_id)} - end) - |> Enum.reject(fn {_table, table} -> Table.empty?(table) end) - |> Map.new() - } + def remove_shape(%Filter{} = filter, shape_id) do + Map.update!(filter, :tables, fn tables -> + tables + |> Enum.map(fn {table_name, table} -> + {table_name, Table.remove_shape(table, shape_id)} + end) + |> Enum.reject(fn {_table, table} -> Table.empty?(table) end) + |> Map.new() + end) + |> Map.update!(:partition_ownership, fn ownership -> + Map.new(ownership, fn {relation, shape_ids} -> + {relation, MapSet.delete(shape_ids, shape_id)} + end) + end) + |> clean_up_partitions() + end + + defp clean_up_partitions(filter) do + {empty, full} = + Enum.split_with(filter.partition_ownership, fn {_relation, shape_ids} -> + Enum.empty?(shape_ids) + end) + + remove_relations = Enum.map(empty, &elem(&1, 0)) + + %{filter | partition_ownership: Map.new(full)} + |> Map.update!(:partitions, fn partitions -> + Enum.reduce(remove_relations, partitions, &Map.delete(&2, &1)) + end) end @doc """ Returns the shape IDs for all shapes that have been added to the filter that are affected by the given change. """ - @spec affected_shapes(Filter.t(), Changes.change()) :: MapSet.t(shape_id()) + @spec affected_shapes(Filter.t(), Changes.change()) :: {t(), MapSet.t(shape_id())} def affected_shapes(%Filter{} = filter, change) do shapes_affected_by_change(filter, change) rescue @@ -82,24 +145,43 @@ defmodule Electric.Shapes.Filter do """) # We can't tell which shapes are affected, the safest thing to do is return all shapes - filter - |> all_shapes() - |> MapSet.new(fn {shape_id, _shape} -> shape_id end) + { + filter, + filter + |> all_shapes() + |> MapSet.new(fn {shape_id, _shape} -> shape_id end) + } end defp shapes_affected_by_change(%Filter{} = filter, %Relation{} = relation) do + table = {relation.schema, relation.table} + + filter = update_partitions(filter, table) + # Check all shapes is all tables becuase the table may have been renamed - for {shape_id, shape} <- all_shapes(filter), - Shape.is_affected_by_relation_change?(shape, relation), - into: MapSet.new() do - shape_id - end + affected = + for {shape_id, shape} <- all_shapes(filter), + relation <- [relation | Map.get(filter.partitions, table, [])], + Shape.is_affected_by_relation_change?(shape, relation), + into: MapSet.new() do + shape_id + end + + { + filter, + affected + } end - defp shapes_affected_by_change(%Filter{} = filter, %Transaction{changes: changes}) do - changes - |> Enum.map(&affected_shapes(filter, &1)) - |> Enum.reduce(MapSet.new(), &MapSet.union(&1, &2)) + defp shapes_affected_by_change(%Filter{} = filter, %Transaction{} = tx) do + %{changes: changes} = tx + + { + filter, + changes + |> Enum.map(&affected_shapes(filter, &1)) + |> Enum.reduce(MapSet.new(), &MapSet.union(&1, &2)) + } end defp shapes_affected_by_change(%Filter{} = filter, %NewRecord{ @@ -130,11 +212,15 @@ defmodule Electric.Shapes.Filter do end end - defp shapes_affected_by_record(filter, table_name, record) do - case Map.get(filter.tables, table_name) do - nil -> MapSet.new() - table -> Table.affected_shapes(table, record) - end + defp shapes_affected_by_record(filter, relation, record) do + relations = [relation | Map.get(filter.partitions, relation, [])] + + Enum.reduce(relations, MapSet.new(), fn relation, affected -> + case Map.get(filter.tables, relation) do + nil -> affected + table -> MapSet.union(affected, Table.affected_shapes(table, record)) + end + end) end defp all_shapes(%Filter{} = filter) do @@ -151,4 +237,14 @@ defmodule Electric.Shapes.Filter do table -> Table.all_shapes(table) end end + + defp update_partitions(filter, relation) do + case Inspector.load_relation(relation, filter.inspector) do + {:ok, %{parent: {_, _} = parent}} -> + Map.update!(filter, :partitions, &Map.put(&1, relation, [parent])) + + _ -> + filter + end + end end diff --git a/packages/sync-service/lib/electric/shapes/shape.ex b/packages/sync-service/lib/electric/shapes/shape.ex index 6cda57f408..e14b967c9c 100644 --- a/packages/sync-service/lib/electric/shapes/shape.ex +++ b/packages/sync-service/lib/electric/shapes/shape.ex @@ -15,7 +15,8 @@ defmodule Electric.Shapes.Shape do :table_info, :where, :selected_columns, - replica: :default + replica: :default, + partitions: %{} ] @type replica() :: :full | :default @@ -29,6 +30,7 @@ defmodule Electric.Shapes.Shape do table_info: %{ Electric.relation() => table_info() }, + partitions: %{Electric.relation() => Electric.relation()}, where: Electric.Replication.Eval.Expr.t() | nil, selected_columns: [String.t(), ...] | nil, replica: replica() @@ -79,17 +81,21 @@ defmodule Electric.Shapes.Shape do def new(table, opts) do with {:ok, opts} <- NimbleOptions.validate(opts, @shape_schema), inspector <- Access.fetch!(opts, :inspector), - {:ok, %{relation: table, relation_id: relation_id}} <- validate_table(table, inspector), + {:ok, relation} <- validate_table(table, inspector), + %{relation: table, relation_id: relation_id} <- relation, {:ok, column_info, pk_cols} <- load_column_info(table, inspector), {:ok, selected_columns} <- validate_selected_columns(column_info, pk_cols, Access.get(opts, :columns)), refs = Inspector.columns_to_expr(column_info), {:ok, where} <- maybe_parse_where_clause(Access.get(opts, :where), refs) do + children = relation |> Map.get(:children, []) |> List.wrap() + {:ok, %__MODULE__{ root_table: table, root_table_id: relation_id, table_info: %{table => %{pk: pk_cols, columns: column_info}}, + partitions: Map.new(children, &{&1, table}), where: where, selected_columns: selected_columns, replica: Access.get(opts, :replica, :default) @@ -188,7 +194,17 @@ defmodule Electric.Shapes.Shape do List tables that are a part of this shape. """ @spec affected_tables(t()) :: [Electric.relation()] - def affected_tables(%__MODULE__{root_table: table}), do: [table] + def affected_tables(%__MODULE__{root_table: table, partitions: partitions}) do + [table | Map.keys(partitions)] + end + + def add_partition( + %__MODULE__{partitions: partitions} = shape, + {_, _} = root, + {_, _} = partition + ) do + %{shape | partitions: Map.put(partitions, partition, root)} + end @doc """ Convert a change to be correctly represented within the shape. @@ -197,9 +213,22 @@ defmodule Electric.Shapes.Shape do Updates, on the other hand, may be converted to an "new record" or a "deleted record" if the previous/new version of the updated row isn't in the shape. """ - def convert_change(%__MODULE__{root_table: table}, %{relation: relation}) - when table != relation, - do: [] + def convert_change(%__MODULE__{root_table: table} = shape, %{relation: relation} = change) + when table != relation do + %{partitions: partitions} = shape + + # if the change has reached here because its an update to a partition child + # on a root table, and the shape is on the root table, then re-write the + # change to come from the shape's root table + case Map.fetch(partitions, relation) do + {:ok, ^table} -> + # This does not re-write the change's key. Is that a problem? + convert_change(shape, %{change | relation: table}) + + _ -> + [] + end + end def convert_change(%__MODULE__{where: nil, selected_columns: nil}, change), do: [change] @@ -286,7 +315,16 @@ defmodule Electric.Shapes.Shape do when old_id !== new_id, do: true - def is_affected_by_relation_change?(_, _), do: false + # the relation in this case is the parent table of a partition and we're + # handling the case where a new partition has been added to an existing + # partitioned table - the new partition arrives as a relation message and is + # handled by the clauses above, but the the link between the new partition + # and the partitioned table is handled with raw relation tuples + def is_affected_by_relation_change?(%__MODULE__{root_table: relation}, {_, _} = relation) do + true + end + + def is_affected_by_relation_change?(_shape, _relation), do: false @spec to_json_safe(t()) :: json_safe() def to_json_safe(%__MODULE__{} = shape) do diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index 71c355a3cd..69104ee773 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -115,8 +115,9 @@ defmodule Electric.StackSupervisor do end end - def subscribe_to_stack_events(registry, stack_id, value) do - Registry.register(registry, {:stack_status, stack_id}, value) + def subscribe_to_stack_events(registry, stack_id, ref \\ make_ref()) do + {:ok, _pid} = Registry.register(registry, {:stack_status, stack_id}, ref) + ref end def dispatch_stack_event(registry, stack_id, event) do @@ -132,7 +133,7 @@ defmodule Electric.StackSupervisor do opts = Map.new(opts) stack_id = opts[:stack_id] - shape_changes_registry_name = :"#{Registry.ShapeChanges}:#{stack_id}" + shape_changes_registry_name = registry_name(stack_id) shape_cache = Access.get( @@ -175,6 +176,10 @@ defmodule Electric.StackSupervisor do {mod, arg |> Keyword.put(:stack_id, stack_id) |> mod.shared_opts()} end + def registry_name(stack_id) do + :"#{Registry.ShapeChanges}:#{stack_id}" + end + @impl true def init(%{stack_id: stack_id} = config) do Process.set_label({:stack_supervisor, stack_id}) @@ -199,7 +204,7 @@ defmodule Electric.StackSupervisor do db_pool = Electric.ProcessRegistry.name(stack_id, Electric.DbPool) - shape_changes_registry_name = :"#{Registry.ShapeChanges}:#{stack_id}" + shape_changes_registry_name = registry_name(stack_id) shape_cache_opts = [ stack_id: stack_id, diff --git a/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs b/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs index da4f608677..9713c614b7 100644 --- a/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs +++ b/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs @@ -18,10 +18,18 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do end test "returns relation from table name", %{opts: opts, table: table} do - assert {:ok, %{relation: ^table, relation_id: _}} = + assert {:ok, %{relation: ^table, relation_id: _, kind: :ordinary_table}} = EtsInspector.load_relation("PuBliC.ItEmS", opts) end + test "can reload the info using the relation", %{opts: opts, table: table} do + assert {:ok, %{relation: ^table, relation_id: _, kind: :ordinary_table}} = + EtsInspector.load_relation("public.items", opts) + + assert {:ok, %{relation: ^table, relation_id: _, kind: :ordinary_table}} = + EtsInspector.load_relation(table, opts) + end + test "returns same value from ETS cache as the original call", %{opts: opts, table: table} do original = EtsInspector.load_relation("PuBliC.ItEmS", opts) from_cache = EtsInspector.load_relation("PuBliC.ItEmS", opts) @@ -57,6 +65,53 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do assert original2 == from_cache2 assert {:ok, %{relation: {"public", "ITEMS"}, relation_id: _}} = original2 end + + @tag with_sql: [ + ~s|CREATE TABLE "just_normal_john" (a INT PRIMARY KEY)| + ] + test "returns blank children and parent for non-partitioned tables", %{ + opts: opts + } do + assert {:ok, %{relation: {"public", "just_normal_john"}, parent: nil, children: nil}} = + EtsInspector.load_relation("public.just_normal_john", opts) + end + + @tag with_sql: [ + ~s|CREATE SCHEMA other|, + ~s|CREATE TABLE "partitioned_items" (a INT, b INT, PRIMARY KEY (a, b)) PARTITION BY RANGE (b)|, + ~s|CREATE TABLE "partitioned_items_100" PARTITION OF "partitioned_items" FOR VALUES FROM (0) TO (99)|, + ~s|CREATE TABLE "partitioned_items_200" PARTITION OF "partitioned_items" FOR VALUES FROM (100) TO (199)|, + ~s|CREATE TABLE other."partitioned_items_300" PARTITION OF "partitioned_items" FOR VALUES FROM (200) TO (299)| + ] + test "returns the partitioned table heirarchy", %{ + opts: opts + } do + partitions = [ + {"public", "partitioned_items_100"}, + {"public", "partitioned_items_200"}, + {"other", "partitioned_items_300"} + ] + + assert {:ok, + %{ + parent: nil, + relation: {"public", "partitioned_items"}, + relation_id: _, + kind: :partitioned_table, + children: ^partitions + }} = EtsInspector.load_relation("public.partitioned_items", opts) + + for {schema, name} = relation <- partitions do + assert {:ok, + %{ + parent: {"public", "partitioned_items"}, + relation: ^relation, + relation_id: _, + kind: :ordinary_table, + children: nil + }} = EtsInspector.load_relation("#{schema}.#{name}", opts) + end + end end describe "clean/2" do @@ -85,7 +140,7 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do # Another table table3 = ~s|"ITEMS"| - assert {:ok, relation} = EtsInspector.load_relation(table1, opts) + assert {:ok, %{relation: rel} = relation} = EtsInspector.load_relation(table1, opts) assert {:ok, ^relation} = EtsInspector.load_relation(table2, opts) assert {:ok, relation2} = EtsInspector.load_relation(table3, opts) assert relation != relation2 @@ -93,11 +148,13 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do # Check that the relations are in the ETS cache assert :ets.lookup(pg_relation_table, {relation, :relation_to_table}) == [ {{relation, :relation_to_table}, "public.items"}, + {{relation, :relation_to_table}, {"public", "items"}}, {{relation, :relation_to_table}, "PUBLIC.ITEMS"} ] assert :ets.lookup(pg_relation_table, {relation2, :relation_to_table}) == [ - {{relation2, :relation_to_table}, ~s|"ITEMS"|} + {{relation2, :relation_to_table}, ~s|"ITEMS"|}, + {{relation2, :relation_to_table}, {"public", "ITEMS"}} ] assert :ets.lookup_element(pg_info_table, {table1, :table_to_relation}, 2, :not_found) == @@ -116,6 +173,8 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do assert :ets.member(pg_relation_table, {relation, :relation_to_table}) == false assert :ets.member(pg_info_table, {table1, :table_to_relation}) == false assert :ets.member(pg_info_table, {table2, :table_to_relation}) == false + # we also remove the info cached under the {schema, name} relation + assert :ets.member(pg_info_table, {rel, :table_to_relation}) == false # relation2 should still be in the cache assert :ets.member(pg_relation_table, {relation2, :relation_to_table}) == true @@ -124,7 +183,7 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do end describe "load_column_info/2" do - setup [:with_inspector, :with_basic_tables] + setup [:with_inspector, :with_basic_tables, :with_sql_execute] setup %{inspector: {EtsInspector, opts}} do {:ok, %{opts: opts, table: {"public", "items"}}} @@ -149,5 +208,22 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do from_cache = Task.await(task) assert from_cache == original end + + @tag with_sql: [ + ~s|CREATE TABLE "partitioned_items" (a INT, b INT, c TEXT, PRIMARY KEY (a, b)) PARTITION BY RANGE (b)| + ] + test "can introspect partitioned tables", %{opts: opts} do + assert {:ok, [%{name: "a"}, %{name: "b"}, %{name: "c"}]} = + EtsInspector.load_column_info({"public", "partitioned_items"}, opts) + end + + @tag with_sql: [ + ~s|CREATE TABLE "partitioned_items" (a INT, b INT, c TEXT, PRIMARY KEY (a, b)) PARTITION BY RANGE (b)|, + ~s|CREATE TABLE "partitioned_items_100" PARTITION OF "partitioned_items" FOR VALUES FROM (0) TO (99)| + ] + test "can introspect partitions", %{opts: opts} do + assert {:ok, [%{name: "a"}, %{name: "b"}, %{name: "c"}]} = + EtsInspector.load_column_info({"public", "partitioned_items_100"}, opts) + end end end diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index 21265e61f5..02986b59a0 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -72,6 +72,12 @@ defmodule Electric.Replication.ShapeLogCollectorTest do setup ctx do parent = self() + Mock.Inspector + |> stub(:load_relation, fn {"public", "test_table"}, _ -> + {:ok, %{id: 1234, schema: "public", name: "test_table", parent: nil, children: nil}} + end) + |> allow(self(), ctx.server) + consumers = Enum.map(1..3, fn id -> {:ok, consumer} = @@ -95,7 +101,11 @@ defmodule Electric.Replication.ShapeLogCollectorTest do last_log_offset = LogOffset.new(lsn, 0) Mock.Inspector - |> expect(:load_column_info, 2, fn {"public", "test_table"}, _ -> + |> stub(:load_relation, fn + {"public", "test_table"}, _ -> + {:ok, %{id: 1234, schema: "public", name: "test_table", parent: nil, children: nil}} + end) + |> stub(:load_column_info, fn {"public", "test_table"}, _ -> {:ok, [%{pk_position: 0, name: "id"}]} end) |> allow(self(), ctx.server) @@ -133,6 +143,12 @@ defmodule Electric.Replication.ShapeLogCollectorTest do setup ctx do parent = self() + Mock.Inspector + |> stub(:load_relation, fn {"public", "test_table"}, _ -> + {:ok, %{id: 1234, schema: "public", name: "test_table", parent: nil, children: nil}} + end) + |> allow(self(), ctx.server) + consumers = Enum.map(1..3, fn id -> {:ok, consumer} = @@ -152,6 +168,16 @@ defmodule Electric.Replication.ShapeLogCollectorTest do test "should handle new relations", ctx do id = @shape.root_table_id + Mock.Inspector + |> stub(:load_relation, fn + {"public", "test_table"}, _ -> + {:ok, %{id: 1234, schema: "public", name: "test_table", parent: nil, children: nil}} + + {"public", "bar"}, _ -> + {:ok, %{id: 1235, schema: "public", name: "bar", parent: nil, children: nil}} + end) + |> allow(self(), ctx.server) + relation1 = %Relation{id: id, table: "test_table", schema: "public", columns: []} assert :ok = ShapeLogCollector.handle_relation_msg(relation1, ctx.server) diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index ad33ae144c..3557f92ced 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -216,7 +216,8 @@ defmodule Electric.ShapeCacheTest do :with_inspector, :with_shape_log_collector, :with_publication_manager, - :with_shape_cache + :with_shape_cache, + :with_sql_execute ] setup %{pool: pool} do @@ -399,6 +400,50 @@ defmodule Electric.ShapeCacheTest do log =~ ~S|** (Postgrex.Error) ERROR 42P01 (undefined_table) relation "public.nonexistent" does not exist| end + + @tag with_sql: [ + ~s|CREATE TABLE "partitioned_items" (a INT, b INT, PRIMARY KEY (a, b)) PARTITION BY RANGE (b)|, + ~s|CREATE TABLE "partitioned_items_100" PARTITION OF "partitioned_items" FOR VALUES FROM (0) TO (99)|, + ~s|CREATE TABLE "partitioned_items_200" PARTITION OF "partitioned_items" FOR VALUES FROM (100) TO (199)| + ] + test "can create shape from partitioned table", ctx do + Postgrex.query!( + ctx.pool, + "INSERT INTO partitioned_items (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)", + [1, 50, 2, 150, 3, 10] + ) + + shape = %Shape{ + root_table: {"public", "partitioned_items"}, + root_table_id: 1, + table_info: %{ + {"public", "partitioned_items"} => %{ + columns: [ + %{name: "a", type: "int4", type_id: {23, -1}, pk_position: 0}, + %{name: "b", type: "int4", type_id: {23, -1}, pk_position: 1} + ], + pk: ["a", "b"] + } + } + } + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(shape, ctx.shape_cache_opts) + assert :started = ShapeCache.await_snapshot_start(shape_handle, ctx.shape_cache_opts) + storage = Storage.for_shape(shape_handle, ctx.storage) + + stream = + Storage.get_log_stream( + LogOffset.before_all(), + LogOffset.last_before_real_offsets(), + storage + ) + + assert [ + %{"value" => %{"a" => "1"}}, + %{"value" => %{"a" => "2"}}, + %{"value" => %{"a" => "3"}} + ] = stream_to_list(stream, "a") + end end describe "list_shapes/1" do @@ -929,9 +974,9 @@ defmodule Electric.ShapeCacheTest do def run_with_conn_noop(conn, cb), do: cb.(conn) - defp stream_to_list(stream) do + defp stream_to_list(stream, sort_col \\ "value") do stream |> Enum.map(&Jason.decode!/1) - |> Enum.sort_by(fn %{"value" => %{"value" => val}} -> val end) + |> Enum.sort_by(fn %{"value" => value} -> value[sort_col] end) end end diff --git a/packages/sync-service/test/electric/shapes/dispatcher_test.exs b/packages/sync-service/test/electric/shapes/dispatcher_test.exs index db55d9c532..bfdcad5a80 100644 --- a/packages/sync-service/test/electric/shapes/dispatcher_test.exs +++ b/packages/sync-service/test/electric/shapes/dispatcher_test.exs @@ -21,7 +21,7 @@ defmodule Electric.Shapes.DispatcherTest do } defp dispatcher() do - {:ok, state} = D.init([]) + {:ok, state} = D.init(inspector: @inspector) state end diff --git a/packages/sync-service/test/electric/shapes/filter_test.exs b/packages/sync-service/test/electric/shapes/filter_test.exs index d4ee168d5a..a85200838e 100644 --- a/packages/sync-service/test/electric/shapes/filter_test.exs +++ b/packages/sync-service/test/electric/shapes/filter_test.exs @@ -18,10 +18,19 @@ defmodule Electric.Shapes.FilterTest do %{name: "an_array", array_type: "int8"} ]) + defp filter(inspector \\ @inspector) do + Filter.new(inspector: inspector) + end + + defp assert_affected(filter, changes, expected) do + {_, affected} = Filter.affected_shapes(filter, changes) + assert affected == expected + end + describe "affected_shapes/2" do test "returns shapes affected by insert" do filter = - Filter.new() + filter() |> Filter.add_shape("s1", Shape.new!("t1", where: "id = 1", inspector: @inspector)) |> Filter.add_shape("s2", Shape.new!("t1", where: "id = 2", inspector: @inspector)) |> Filter.add_shape("s3", Shape.new!("t1", where: "id = 3", inspector: @inspector)) @@ -37,12 +46,12 @@ defmodule Electric.Shapes.FilterTest do ] } - assert Filter.affected_shapes(filter, insert) == MapSet.new(["s2"]) + assert_affected(filter, insert, MapSet.new(["s2"])) end test "returns shapes affected by delete" do filter = - Filter.new() + filter() |> Filter.add_shape("s1", Shape.new!("t1", where: "id = 1", inspector: @inspector)) |> Filter.add_shape("s2", Shape.new!("t1", where: "id = 2", inspector: @inspector)) |> Filter.add_shape("s3", Shape.new!("t1", where: "id = 3", inspector: @inspector)) @@ -58,12 +67,12 @@ defmodule Electric.Shapes.FilterTest do ] } - assert Filter.affected_shapes(filter, delete) == MapSet.new(["s2"]) + assert_affected(filter, delete, MapSet.new(["s2"])) end test "returns shapes affected by update" do filter = - Filter.new() + filter() |> Filter.add_shape("s1", Shape.new!("t1", where: "id = 1", inspector: @inspector)) |> Filter.add_shape("s2", Shape.new!("t1", where: "id = 2", inspector: @inspector)) |> Filter.add_shape("s3", Shape.new!("t1", where: "id = 3", inspector: @inspector)) @@ -81,12 +90,12 @@ defmodule Electric.Shapes.FilterTest do ] } - assert Filter.affected_shapes(filter, update) == MapSet.new(["s2", "s3"]) + assert_affected(filter, update, MapSet.new(["s2", "s3"])) end test "returns shapes affected by relation change" do filter = - Filter.new() + filter() |> Filter.add_shape("s1", Shape.new!("t1", where: "id = 1", inspector: @inspector)) |> Filter.add_shape("s2", Shape.new!("t1", where: "id = 2", inspector: @inspector)) |> Filter.add_shape("s3", Shape.new!("t1", where: "id > 7", inspector: @inspector)) @@ -98,7 +107,7 @@ defmodule Electric.Shapes.FilterTest do relation = %Relation{schema: "public", table: "t1"} - assert Filter.affected_shapes(filter, relation) == MapSet.new(["s1", "s2", "s3", "s4"]) + assert_affected(filter, relation, MapSet.new(["s1", "s2", "s3", "s4"])) end test "returns shapes affected by relation rename" do @@ -108,19 +117,19 @@ defmodule Electric.Shapes.FilterTest do s3 = Shape.new!("t3", inspector: @inspector) filter = - Filter.new() + filter() |> Filter.add_shape("s1", s1) |> Filter.add_shape("s2", s2) |> Filter.add_shape("s3", s3) rename = %Relation{schema: "public", table: "new_name", id: table_id} - assert Filter.affected_shapes(filter, rename) == MapSet.new(["s2"]) + assert_affected(filter, rename, MapSet.new(["s2"])) end test "returns shapes affected by truncation" do filter = - Filter.new() + filter() |> Filter.add_shape("s1", Shape.new!("t1", where: "id = 1", inspector: @inspector)) |> Filter.add_shape("s2", Shape.new!("t1", where: "id = 2", inspector: @inspector)) |> Filter.add_shape("s3", Shape.new!("t1", where: "id > 7", inspector: @inspector)) @@ -132,31 +141,31 @@ defmodule Electric.Shapes.FilterTest do truncation = %Transaction{changes: [%TruncatedRelation{relation: {"public", "t1"}}]} - assert Filter.affected_shapes(filter, truncation) == MapSet.new(["s1", "s2", "s3", "s4"]) + assert_affected(filter, truncation, MapSet.new(["s1", "s2", "s3", "s4"])) end end test "shape with no where clause is affected by all changes for the same table" do shape = Shape.new!("t1", inspector: @inspector) - filter = Filter.new() |> Filter.add_shape("s", shape) + filter = filter() |> Filter.add_shape("s", shape) - assert Filter.affected_shapes(filter, change("t1", %{"id" => "7"})) == MapSet.new(["s"]) - assert Filter.affected_shapes(filter, change("t1", %{"id" => "8"})) == MapSet.new(["s"]) - assert Filter.affected_shapes(filter, change("t2", %{"id" => "8"})) == MapSet.new([]) + assert_affected(filter, change("t1", %{"id" => "7"}), MapSet.new(["s"])) + assert_affected(filter, change("t1", %{"id" => "8"}), MapSet.new(["s"])) + assert_affected(filter, change("t2", %{"id" => "8"}), MapSet.new([])) end test "shape with a where clause is affected by changes that match that where clause" do shape = Shape.new!("t1", where: "id = 7", inspector: @inspector) - filter = Filter.new() |> Filter.add_shape("s", shape) + filter = filter() |> Filter.add_shape("s", shape) - assert Filter.affected_shapes(filter, change("t1", %{"id" => "7"})) == MapSet.new(["s"]) - assert Filter.affected_shapes(filter, change("t1", %{"id" => "8"})) == MapSet.new([]) - assert Filter.affected_shapes(filter, change("t2", %{"id" => "8"})) == MapSet.new([]) + assert_affected(filter, change("t1", %{"id" => "7"}), MapSet.new(["s"])) + assert_affected(filter, change("t1", %{"id" => "8"}), MapSet.new([])) + assert_affected(filter, change("t2", %{"id" => "8"}), MapSet.new([])) end test "invalid record value logs an error and says all shapes for the table are affected" do filter = - Filter.new() + filter() |> Filter.add_shape("shape1", Shape.new!("table", inspector: @inspector)) |> Filter.add_shape("shape2", Shape.new!("table", where: "id = 7", inspector: @inspector)) |> Filter.add_shape("shape3", Shape.new!("table", where: "id = 8", inspector: @inspector)) @@ -165,15 +174,18 @@ defmodule Electric.Shapes.FilterTest do log = capture_log(fn -> - assert Filter.affected_shapes(filter, change("table", %{"id" => "invalid_value"})) == - MapSet.new(["shape1", "shape2", "shape3", "shape4"]) + assert_affected( + filter, + change("table", %{"id" => "invalid_value"}), + MapSet.new(["shape1", "shape2", "shape3", "shape4"]) + ) end) assert log =~ ~s(Could not parse value for field "id" of type :int8) end test "Filter.remove_shape/2" do - empty = Filter.new() + empty = filter() filter1 = empty @@ -231,9 +243,12 @@ defmodule Electric.Shapes.FilterTest do transaction = change("the_table", record) - assert Filter.new() - |> Filter.add_shape("the-shape", shape) - |> Filter.affected_shapes(transaction) == MapSet.new(["the-shape"]) == affected + {_filter, shapes} = + filter() + |> Filter.add_shape("the-shape", shape) + |> Filter.affected_shapes(transaction) + + assert shapes == MapSet.new(["the-shape"]) == affected end end @@ -263,11 +278,11 @@ defmodule Electric.Shapes.FilterTest do test "where clause in the form `field = const` is optimised" do filter = 1..@shape_count - |> Enum.reduce(Filter.new(), fn i, filter -> + |> Enum.reduce(filter(), fn i, filter -> Filter.add_shape(filter, i, Shape.new!("t1", where: "id = #{i}", inspector: @inspector)) end) - assert Filter.affected_shapes(filter, change("t1", %{"id" => "7"})) == MapSet.new([7]) + assert_affected(filter, change("t1", %{"id" => "7"}), MapSet.new([7])) reductions = reductions(fn -> @@ -280,7 +295,7 @@ defmodule Electric.Shapes.FilterTest do test "where clause in the form `field = const AND another_condition` is optimised" do filter = 1..@shape_count - |> Enum.reduce(Filter.new(), fn i, filter -> + |> Enum.reduce(filter(), fn i, filter -> Filter.add_shape( filter, i, @@ -288,7 +303,7 @@ defmodule Electric.Shapes.FilterTest do ) end) - assert Filter.affected_shapes(filter, change("t1", %{"id" => "7"})) == MapSet.new([7]) + assert_affected(filter, change("t1", %{"id" => "7"}), MapSet.new([7])) reductions = reductions(fn -> @@ -301,7 +316,7 @@ defmodule Electric.Shapes.FilterTest do test "where clause in the form `a_condition AND field = const` is optimised" do filter = 1..@shape_count - |> Enum.reduce(Filter.new(), fn i, filter -> + |> Enum.reduce(filter(), fn i, filter -> Filter.add_shape( filter, i, @@ -309,7 +324,7 @@ defmodule Electric.Shapes.FilterTest do ) end) - assert Filter.affected_shapes(filter, change("t1", %{"id" => "7"})) == MapSet.new([7]) + assert_affected(filter, change("t1", %{"id" => "7"}), MapSet.new([7])) reductions = reductions(fn -> @@ -327,6 +342,197 @@ defmodule Electric.Shapes.FilterTest do end end + @partition_inspector StubInspector.new(%{ + {"public", "partitioned"} => %{ + relation: %{ + children: [{"public", "partition_01"}, {"public", "partition_02"}] + }, + columns: [ + %{name: "id", type: "int8", pk_position: 0}, + %{name: "an_array", array_type: "int8"} + ] + }, + {"public", "partition_01"} => %{ + relation: %{ + children: nil, + parent: {"public", "partitioned"} + }, + columns: [ + %{name: "id", type: "int8", pk_position: 0}, + %{name: "an_array", array_type: "int8"} + ] + }, + {"public", "partition_02"} => %{ + relation: %{ + children: nil, + parent: {"public", "partitioned"} + }, + columns: [ + %{name: "id", type: "int8", pk_position: 0}, + %{name: "an_array", array_type: "int8"} + ] + }, + {"public", "partition_03"} => %{ + relation: %{ + children: nil, + parent: {"public", "partitioned"} + }, + columns: [ + %{name: "id", type: "int8", pk_position: 0}, + %{name: "an_array", array_type: "int8"} + ] + } + }) + + describe "partitioned tables" do + test "changes to table partition are sent to root" do + filter = + Filter.new(inspector: @partition_inspector) + |> Filter.add_shape("s1", Shape.new!("partitioned", inspector: @partition_inspector)) + |> Filter.add_shape( + "s2", + Shape.new!("partitioned", where: "id = 2", inspector: @partition_inspector) + ) + |> Filter.add_shape( + "s3", + Shape.new!("partitioned", where: "id = 3", inspector: @partition_inspector) + ) + + insert = + %Transaction{ + changes: [ + %NewRecord{ + relation: {"public", "partition_01"}, + record: %{"id" => "2"} + } + ] + } + + assert_affected(filter, insert, MapSet.new(["s1", "s2"])) + end + + test "changes to table partition are always sent to partition shape" do + filter = + Filter.new(inspector: @partition_inspector) + |> Filter.add_shape("s1", Shape.new!("partitioned", inspector: @partition_inspector)) + |> Filter.add_shape( + "s2", + Shape.new!("partitioned", where: "id = 2", inspector: @partition_inspector) + ) + |> Filter.add_shape( + "s3", + Shape.new!("partition_01", inspector: @partition_inspector) + ) + + insert = + %Transaction{ + changes: [ + %NewRecord{ + relation: {"public", "partition_01"}, + record: %{"id" => "2"} + } + ] + } + + assert_affected(filter, insert, MapSet.new(["s1", "s2", "s3"])) + + insert = + %Transaction{ + changes: [ + %NewRecord{ + relation: {"public", "partition_02"}, + record: %{"id" => "2"} + } + ] + } + + assert_affected(filter, insert, MapSet.new(["s1", "s2"])) + end + + @tag :wip + test "root shape is affected by partition addition" do + filter = + Filter.new(inspector: @partition_inspector) + |> Filter.add_shape("s1", Shape.new!("partitioned", inspector: @partition_inspector)) + |> Filter.add_shape( + "s2", + Shape.new!("partitioned", where: "id = 2", inspector: @partition_inspector) + ) + |> Filter.add_shape( + "s3", + Shape.new!("partition_01", inspector: @partition_inspector) + ) + + relation = %Relation{schema: "public", table: "partition_03"} + + assert_affected(filter, relation, MapSet.new(["s1", "s2"])) + end + + test "after addition of new partition, shape receives updates" do + filter = + Filter.new(inspector: @partition_inspector) + |> Filter.add_shape("s1", Shape.new!("partitioned", inspector: @partition_inspector)) + |> Filter.add_shape( + "s2", + Shape.new!("partitioned", where: "id = 2", inspector: @partition_inspector) + ) + |> Filter.add_shape( + "s3", + Shape.new!("partition_01", inspector: @partition_inspector) + ) + + relation = %Relation{schema: "public", table: "partition_03"} + + {filter, _} = Filter.affected_shapes(filter, relation) + + insert = + %Transaction{ + changes: [ + %NewRecord{ + relation: {"public", "partition_03"}, + record: %{"id" => "2"} + } + ] + } + + assert_affected(filter, insert, MapSet.new(["s1", "s2"])) + end + + test "remove_shape/2 cleans up partition information" do + empty = Filter.new(inspector: @partition_inspector) + + filter = + empty + |> Filter.add_shape("s1", Shape.new!("partitioned", inspector: @partition_inspector)) + |> Filter.add_shape( + "s2", + Shape.new!("partitioned", where: "id = 1", inspector: @partition_inspector) + ) + |> Filter.add_shape( + "s3", + Shape.new!("partition_01", where: "id = 2", inspector: @partition_inspector) + ) + |> Filter.add_shape( + "s4", + Shape.new!("partition_02", where: "id > 2", inspector: @partition_inspector) + ) + |> Filter.add_shape( + "s5", + Shape.new!("partition_03", where: "id > 7", inspector: @partition_inspector) + ) + + clean_filter = + filter + |> Filter.remove_shape("s2") + |> Filter.remove_shape("s1") + |> Filter.remove_shape("s4") + |> Filter.remove_shape("s5") + |> Filter.remove_shape("s3") + + assert clean_filter == empty + end + end + defp change(table, record) do %Transaction{ changes: [ diff --git a/packages/sync-service/test/electric/shapes/partitioned_tables_test.exs b/packages/sync-service/test/electric/shapes/partitioned_tables_test.exs new file mode 100644 index 0000000000..b15b2f8b84 --- /dev/null +++ b/packages/sync-service/test/electric/shapes/partitioned_tables_test.exs @@ -0,0 +1,136 @@ +defmodule Electric.Shapes.PartitionedTablesTest do + use ExUnit.Case, async: true + + alias Electric.Shapes.Shape + alias Electric.ShapeCache + alias Electric.Postgres.Inspector + + import Support.ComponentSetup + import Support.DbSetup + import Support.DbStructureSetup + + @partition_schema [ + ~s|CREATE TABLE "partitioned_items" (a INT, b INT, PRIMARY KEY (a, b)) PARTITION BY RANGE (b)|, + ~s|CREATE TABLE "partitioned_items_100" PARTITION OF "partitioned_items" FOR VALUES FROM (0) TO (99)|, + ~s|CREATE TABLE "partitioned_items_200" PARTITION OF "partitioned_items" FOR VALUES FROM (100) TO (199)| + ] + + @moduletag :tmp_dir + @moduletag with_sql: @partition_schema + + setup [:with_unique_db, :with_complete_stack, :with_sql_execute] + + defp subscribe(shape_handle, ctx) do + ref = make_ref() + + Registry.register(ctx.registry, shape_handle, ref) + ref + end + + test "subscriptions to root shape receive updates", ctx do + {:ok, shape} = Shape.new("public.partitioned_items", inspector: ctx.inspector) + + {shape_handle, _} = + ShapeCache.get_or_create_shape_handle(shape, stack_id: ctx.stack_id) + + :started = ShapeCache.await_snapshot_start(shape_handle, stack_id: ctx.stack_id) + + ref = subscribe(shape_handle, ctx) + + Postgrex.query!( + ctx.db_conn, + "INSERT INTO partitioned_items (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)", + [1, 50, 2, 150, 3, 10] + ) + + assert_receive {^ref, :new_changes, _latest_log_offset}, 5000 + end + + test "new partition tables are accepted by root", ctx do + {:ok, shape} = Shape.new("public.partitioned_items", inspector: ctx.inspector) + + {shape_handle, _} = + ShapeCache.get_or_create_shape_handle(shape, stack_id: ctx.stack_id) + + :started = ShapeCache.await_snapshot_start(shape_handle, stack_id: ctx.stack_id) + + Postgrex.query!( + ctx.db_conn, + ~s|CREATE TABLE "partitioned_items_300" PARTITION OF "partitioned_items" FOR VALUES FROM (200) TO (299)|, + [] + ) + + ref = subscribe(shape_handle, ctx) + + Postgrex.query!( + ctx.db_conn, + "INSERT INTO partitioned_items (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)", + [1, 250, 2, 260, 3, 200] + ) + + assert_receive {^ref, :new_changes, _latest_log_offset}, 5000 + end + + test "subscriptions to partitions receive updates", ctx do + {:ok, shape} = Shape.new("public.partitioned_items_100", inspector: ctx.inspector) + + {shape_handle, _} = + ShapeCache.get_or_create_shape_handle(shape, stack_id: ctx.stack_id) + + :started = ShapeCache.await_snapshot_start(shape_handle, stack_id: ctx.stack_id) + + ref = subscribe(shape_handle, ctx) + + Postgrex.query!( + ctx.db_conn, + "INSERT INTO partitioned_items (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)", + [1, 50, 2, 150, 3, 10] + ) + + assert_receive {^ref, :new_changes, _latest_log_offset}, 5000 + end + + test "new partition tables prompt reload of relation info", ctx do + {:ok, shape} = Shape.new("public.partitioned_items", inspector: ctx.inspector) + + {shape_handle, _} = + ShapeCache.get_or_create_shape_handle(shape, stack_id: ctx.stack_id) + + :started = ShapeCache.await_snapshot_start(shape_handle, stack_id: ctx.stack_id) + + {:ok, relation} = Inspector.load_relation("partitioned_items", ctx.inspector) + + assert %{ + children: [ + {"public", "partitioned_items_100"}, + {"public", "partitioned_items_200"} + ] + } = relation + + Postgrex.query!( + ctx.db_conn, + ~s|CREATE TABLE "partitioned_items_300" PARTITION OF "partitioned_items" FOR VALUES FROM (200) TO (299)|, + [] + ) + + ref = subscribe(shape_handle, ctx) + + Postgrex.query!( + ctx.db_conn, + "INSERT INTO partitioned_items (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)", + [1, 50, 2, 250, 3, 10] + ) + + assert_receive {^ref, :new_changes, _latest_log_offset}, 5000 + + {:ok, relation} = Inspector.load_relation("partitioned_items", ctx.inspector) + + assert %{ + children: [ + {"public", "partitioned_items_100"}, + {"public", "partitioned_items_200"}, + {"public", "partitioned_items_300"} + ] + } = relation + end +end diff --git a/packages/sync-service/test/electric/shapes/shape_test.exs b/packages/sync-service/test/electric/shapes/shape_test.exs index 2aefad7157..1f9b04b744 100644 --- a/packages/sync-service/test/electric/shapes/shape_test.exs +++ b/packages/sync-service/test/electric/shapes/shape_test.exs @@ -207,6 +207,31 @@ defmodule Electric.Shapes.ShapeTest do assert Shape.convert_change(shape, non_matching_update) == [] end + + test "re-writes changes to partition on shape" do + shape = %Shape{ + root_table: {"public", "partition_root"}, + root_table_id: @relation_id, + partitions: %{ + {"public", "partition_01"} => {"public", "partition_root"}, + {"public", "partition_02"} => {"public", "partition_root"} + } + } + + partition_update = %UpdatedRecord{ + relation: {"public", "partition_02"}, + old_record: %{"id" => 1, "value" => "same", "other_value" => "old"}, + record: %{"id" => 1, "value" => "same", "other_value" => "new"} + } + + assert Shape.convert_change(shape, partition_update) == [ + %UpdatedRecord{ + relation: {"public", "partition_root"}, + old_record: %{"id" => 1, "value" => "same", "other_value" => "old"}, + record: %{"id" => 1, "value" => "same", "other_value" => "new"} + } + ] + end end describe "new/2" do diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 7082f44565..c076de0b7c 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -182,8 +182,7 @@ defmodule Support.ComponentSetup do stack_events_registry = Registry.StackEvents - ref = make_ref() - Electric.StackSupervisor.subscribe_to_stack_events(stack_events_registry, stack_id, ref) + ref = Electric.StackSupervisor.subscribe_to_stack_events(stack_events_registry, stack_id) stack_supervisor = start_supervised!( @@ -214,10 +213,13 @@ defmodule Support.ComponentSetup do %{ stack_id: stack_id, + registry: Electric.StackSupervisor.registry_name(stack_id), stack_events_registry: stack_events_registry, + shape_cache: {ShapeCache, [stack_id: stack_id]}, persistent_kv: kv, stack_supervisor: stack_supervisor, - storage: storage + storage: storage, + inspector: {EtsInspector, stack_id: stack_id, server: EtsInspector.name(stack_id: stack_id)} } end diff --git a/packages/sync-service/test/support/stub_inspector.ex b/packages/sync-service/test/support/stub_inspector.ex index 83d745815f..a2e5c5487b 100644 --- a/packages/sync-service/test/support/stub_inspector.ex +++ b/packages/sync-service/test/support/stub_inspector.ex @@ -1,12 +1,13 @@ defmodule Support.StubInspector do - alias Electric.Utils @behaviour Electric.Postgres.Inspector + # the opts is either a list of column details which will be applied to every table + # or a map of %{{schema, name} => [columns: column_info, relation: relation_info]} def new(opts), do: {__MODULE__, opts} @impl true - def load_column_info(_relation, column_list) when is_list(column_list) do - column_list + def load_column_info(_relation, column_info) when is_list(column_info) do + column_info |> Enum.map(fn column -> column |> Map.put_new(:pk_position, nil) @@ -19,31 +20,44 @@ defmodule Support.StubInspector do def load_column_info(relation, opts) when is_map(opts) and is_map_key(opts, relation) do opts |> Map.fetch!(relation) + |> Access.fetch!(:columns) |> then(&load_column_info(relation, &1)) end @impl true - def load_relation(table, _) do - regex = - ~r/^((?([\p{L}_][\p{L}0-9_$]*|"(""|[^"])+"))\.)?(?([\p{L}_][\p{L}0-9_$]*|"(""|[^"])+"))$/u - - case Regex.run(regex, table, capture: :all_names) do - ["", table_name] when table_name != "" -> - table_name = Utils.parse_quoted_name(table_name) - rel = {"public", table_name} - {:ok, %{relation: rel, relation_id: :erlang.phash2(rel)}} - - [schema_name, table_name] when table_name != "" -> - schema_name = Utils.parse_quoted_name(schema_name) - table_name = Utils.parse_quoted_name(table_name) - rel = {schema_name, table_name} - {:ok, %{relation: rel, relation_id: :erlang.phash2(rel)}} - - _ -> - {:error, "invalid name syntax"} + def load_relation(table, opts) when is_map(opts) do + with {:ok, rel} <- parse_relation(table), + {:ok, config} <- Map.fetch(opts, rel), + {:ok, info} <- Access.fetch(config, :relation) do + {:ok, + info + |> Map.put_new(:relation, rel) + |> Map.put_new(:relation_id, :erlang.phash2(rel)) + |> Map.put_new(:parent, nil) + |> Map.put_new(:children, nil)} + else + :error -> + raise "Invalid StubInspector config #{inspect(opts)}" + + error -> + error + end + end + + def load_relation(table, _opts) do + with {:ok, rel} <- parse_relation(table) do + {:ok, %{relation: rel, relation_id: :erlang.phash2(rel)}} end end @impl true def clean(_, _), do: true + + defp parse_relation(table) when is_binary(table) do + Electric.Postgres.Identifiers.parse_relation(table) + end + + defp parse_relation({_, _} = rel) do + {:ok, rel} + end end