diff --git a/.changeset/spotty-ears-build.md b/.changeset/spotty-ears-build.md new file mode 100644 index 0000000000..14dae5cd32 --- /dev/null +++ b/.changeset/spotty-ears-build.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Improved replication stream processing for where clauses in the form `field = const` or `field = const AND another_condition` diff --git a/packages/sync-service/lib/electric/replication/eval/parser.ex b/packages/sync-service/lib/electric/replication/eval/parser.ex index ad1a90cb49..20b776aba1 100644 --- a/packages/sync-service/lib/electric/replication/eval/parser.ex +++ b/packages/sync-service/lib/electric/replication/eval/parser.ex @@ -1063,14 +1063,14 @@ defmodule Electric.Replication.Eval.Parser do defp internal_node_to_error(%Func{type: type, name: name}), do: "function #{name} returning #{type}" - defp find_refs(tree, acc \\ %{}) - defp find_refs(%Const{}, acc), do: acc - defp find_refs(%Ref{path: path, type: type}, acc), do: Map.put_new(acc, path, type) + def find_refs(tree, acc \\ %{}) + def find_refs(%Const{}, acc), do: acc + def find_refs(%Ref{path: path, type: type}, acc), do: Map.put_new(acc, path, type) - defp find_refs(%Func{args: args, variadic_arg: nil}, acc), + def find_refs(%Func{args: args, variadic_arg: nil}, acc), do: Enum.reduce(args, acc, &find_refs/2) - defp find_refs(%Func{args: args, variadic_arg: position}, acc), + def find_refs(%Func{args: args, variadic_arg: position}, acc), do: Enum.reduce(Enum.with_index(args), acc, fn {arg, ^position}, acc -> Enum.reduce(arg, acc, &find_refs/2) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 7e694eba33..5d067be7db 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -87,27 +87,7 @@ defmodule Electric.Shapes.Consumer do monitors: [] }) - {:consumer, state, - subscribe_to: [{producer, [max_demand: 1, selector: 
&selector(&1, config.shape)]}]} - end - - defp selector(event, shape) do - process_event?(event, shape) - rescue - # Swallow errors here to avoid crashing the ShapeLogCollector. - # Return `true` so the event is processed, which will then error - # for the same reason and cleanup the shape. - _ -> true - end - - defp process_event?(%Transaction{changes: changes}, shape) do - changes - |> Stream.flat_map(&Shape.convert_change(shape, &1)) - |> Enum.any?() - end - - defp process_event?(%Changes.Relation{} = relation_change, shape) do - Shape.is_affected_by_relation_change?(shape, relation_change) + {:consumer, state, subscribe_to: [{producer, [max_demand: 1, shape: config.shape]}]} end def handle_call(:initial_state, _from, %{snapshot_xmin: xmin, latest_offset: offset} = state) do diff --git a/packages/sync-service/lib/electric/shapes/dispatcher.ex b/packages/sync-service/lib/electric/shapes/dispatcher.ex index 92228bbdcd..65743dc6d1 100644 --- a/packages/sync-service/lib/electric/shapes/dispatcher.ex +++ b/packages/sync-service/lib/electric/shapes/dispatcher.ex @@ -1,16 +1,7 @@ defmodule Electric.Shapes.Dispatcher do @moduledoc """ - Dispatches transactions and relations to consumers filtered according to the - subscriber's `selector` function. - - To receive all messages, don't pass a selector function or use `nil`, e.g. - - ``` - def init(producer) do - {:consumer, :nostate, subscribe_to: [{producer, [max_demand: 1, selector: nil]}]} - end - - ``` + Dispatches transactions and relations to consumers filtered using `Filter` + and the consumer's shape. 
The essential behaviour is that the dispatcher only asks the producer for more demand once all relevant subscribers have processed the last message and @@ -31,16 +22,29 @@ defmodule Electric.Shapes.Dispatcher do """ require Logger + alias Electric.Shapes.Filter + + defmodule State do + defstruct [:waiting, :pending, :subscribers, :filter, :pids] + end @behaviour GenStage.Dispatcher @impl GenStage.Dispatcher + def init(_opts) do - {:ok, {0, 0, nil, [], MapSet.new()}} + {:ok, + %State{ + waiting: 0, + pending: nil, + subscribers: [], + filter: Filter.new(), + pids: MapSet.new() + }} end @impl GenStage.Dispatcher - def subscribe(opts, {pid, _ref} = from, {n, waiting, pending, subs, pids}) do + def subscribe(opts, {pid, _ref} = from, %State{pids: pids} = state) do if MapSet.member?(pids, pid) do Logger.error(fn -> "#{inspect(pid)} is already registered with #{inspect(self())}. " <> @@ -49,48 +53,68 @@ defmodule Electric.Shapes.Dispatcher do {:error, :already_subscribed} else - selector = - case Keyword.get(opts, :selector) do - nil -> - nil + shape = Keyword.fetch!(opts, :shape) - selector when is_function(selector, 1) -> - selector + demand = if state.subscribers == [], do: 1, else: 0 - other -> - raise ArgumentError, - ":selector option must be passed a unary function, got: #{inspect(other)}" - end - - subs = [{from, selector} | subs] + subscribers = [{from, shape} | state.subscribers] - demand = if n == 0, do: 1, else: 0 + filter = Filter.add_shape(state.filter, from, shape) - {:ok, demand, {n + 1, waiting, pending, subs, MapSet.put(pids, pid)}} + {:ok, demand, + %State{ + state + | subscribers: subscribers, + filter: filter, + pids: MapSet.put(state.pids, pid) + }} end end @impl GenStage.Dispatcher - def cancel({pid, _ref} = from, {n, waiting, pending, subs, pids}) do - if MapSet.member?(pids, pid) do - subs = List.keydelete(subs, from, 0) + def cancel({pid, _ref} = from, %State{waiting: waiting, pending: pending} = state) do + if MapSet.member?(state.pids, pid) do 
+ subscribers = List.keydelete(state.subscribers, from, 0) + + filter = Filter.remove_shape(state.filter, from) if pending && MapSet.member?(pending, from) do case waiting - 1 do 0 -> # the only remaining unacked subscriber has cancelled, so we # return some demand - {:ok, 1, {n - 1, 0, nil, subs, MapSet.delete(pids, pid)}} + {:ok, 1, + %State{ + state + | waiting: 0, + pending: nil, + subscribers: subscribers, + filter: filter, + pids: MapSet.delete(state.pids, pid) + }} new_waiting -> {:ok, 0, - {n - 1, new_waiting, MapSet.delete(pending, from), subs, MapSet.delete(pids, pid)}} + %State{ + state + | waiting: new_waiting, + pending: MapSet.delete(pending, from), + subscribers: subscribers, + filter: filter, + pids: MapSet.delete(state.pids, pid) + }} end else - {:ok, 0, {n - 1, waiting, pending, subs, MapSet.delete(pids, pid)}} + {:ok, 0, + %State{ + state + | subscribers: subscribers, + filter: filter, + pids: MapSet.delete(state.pids, pid) + }} end else - {:ok, 0, {n, waiting, pending, subs, pids}} + {:ok, 0, state} end end @@ -98,48 +122,45 @@ defmodule Electric.Shapes.Dispatcher do # consumers sending demand before we have produced a message just ignore as # we have already sent initial demand of 1 to the producer when the first # consumer subscribed. 
- def ask(1, {_pid, _ref}, {n, 0, nil, subs, pids}) do - {:ok, 0, {n, 0, nil, subs, pids}} + def ask(1, {_pid, _ref}, %State{waiting: 0, pending: nil} = state) do + {:ok, 0, state} end - def ask(1, {_pid, _ref}, {n, 1, _pending, subs, pids}) do - {:ok, 1, {n, 0, nil, subs, pids}} + def ask(1, {_pid, _ref}, %State{waiting: 1} = state) do + {:ok, 1, %State{state | waiting: 0, pending: nil}} end - def ask(1, from, {n, waiting, pending, subs, pids}) when waiting > 1 do - {:ok, 0, {n, waiting - 1, MapSet.delete(pending, from), subs, pids}} + def ask(1, from, %State{waiting: waiting, pending: pending} = state) when waiting > 1 do + {:ok, 0, %State{state | waiting: waiting - 1, pending: MapSet.delete(pending, from)}} end @impl GenStage.Dispatcher # handle the no subscribers case here to make the real dispatch impl easier - def dispatch([event], _length, {_n, 0, _pending, [], _pids} = state) do + def dispatch([event], _length, %State{waiting: 0, subscribers: []} = state) do {:ok, [event], state} end - def dispatch([event], _length, {n, 0, _pending, subs, pids}) do + def dispatch([event], _length, %State{waiting: 0, subscribers: subscribers} = state) do {waiting, pending} = - subs - |> Enum.reduce({0, MapSet.new()}, fn {{pid, ref} = sub, selector}, {waiting, pending} -> - if subscriber_wants_message?(event, selector) do - Process.send(pid, {:"$gen_consumer", {self(), ref}, [event]}, [:noconnect]) - {waiting + 1, MapSet.put(pending, sub)} - else - {waiting, pending} - end + state.filter + |> Filter.affected_shapes(event) + |> Enum.reduce({0, MapSet.new()}, fn {pid, ref} = subscriber, {waiting, pending} -> + Process.send(pid, {:"$gen_consumer", {self(), ref}, [event]}, [:noconnect]) + {waiting + 1, MapSet.put(pending, subscriber)} end) |> case do {0, _pending} -> # even though no subscriber wants the event, we still need to generate demand # so that we can complete the loop in the log collector - [{sub, _selector} | _] = subs - send(self(), {:"$gen_producer", sub, {:ask, 1}}) 
- {1, MapSet.new([sub])} + [{subscriber, _selector} | _] = subscribers + send(self(), {:"$gen_producer", subscriber, {:ask, 1}}) + {1, MapSet.new([subscriber])} {waiting, pending} -> {waiting, pending} end - {:ok, [], {n, waiting, pending, subs, pids}} + {:ok, [], %State{state | waiting: waiting, pending: pending}} end @impl GenStage.Dispatcher @@ -147,7 +168,4 @@ defmodule Electric.Shapes.Dispatcher do send(self(), msg) {:ok, state} end - - defp subscriber_wants_message?(_event, nil), do: true - defp subscriber_wants_message?(event, selector), do: selector.(event) end diff --git a/packages/sync-service/lib/electric/shapes/filter.ex b/packages/sync-service/lib/electric/shapes/filter.ex new file mode 100644 index 0000000000..e43de40e18 --- /dev/null +++ b/packages/sync-service/lib/electric/shapes/filter.ex @@ -0,0 +1,154 @@ +defmodule Electric.Shapes.Filter do + @moduledoc """ + Responsible for knowing which shapes are affected by a change. + + `affected_shapes(filter, change)` will return a set of IDs for the shapes that are affected by the change + considering all the shapes that have been added to the filter using `add_shape/3`. + + + The `Filter` module keeps track of what tables are referenced by the shapes and changes and delegates + the table specific logic to the `Filter.Table` module. + """ + + alias Electric.Replication.Changes + alias Electric.Replication.Changes.DeletedRecord + alias Electric.Replication.Changes.NewRecord + alias Electric.Replication.Changes.Relation + alias Electric.Replication.Changes.Transaction + alias Electric.Replication.Changes.TruncatedRelation + alias Electric.Replication.Changes.UpdatedRecord + alias Electric.Shapes.Filter + alias Electric.Shapes.Filter.Table + alias Electric.Shapes.Shape + require Logger + + defstruct tables: %{} + + @type t :: %Filter{} + @type shape_id :: any() + + @spec new() :: Filter.t() + def new, do: %Filter{} + + @doc """ + Add a shape for the filter to track. 
+ + The `shape_id` can be any term you like to identify the shape. Whatever you use will be returned + by `affected_shapes/2` when the shape is affected by a change. + """ + @spec add_shape(Filter.t(), shape_id(), Shape.t()) :: Filter.t() + def add_shape(%Filter{tables: tables}, shape_id, shape) do + %Filter{ + tables: + Map.update( + tables, + shape.root_table, + Table.add_shape(Table.new(), {shape_id, shape}), + fn table -> + Table.add_shape(table, {shape_id, shape}) + end + ) + } + end + + @doc """ + Remove a shape from the filter. + """ + @spec remove_shape(Filter.t(), shape_id()) :: Filter.t() + def remove_shape(%Filter{tables: tables}, shape_id) do + %Filter{ + tables: + tables + |> Enum.map(fn {table_name, table} -> + {table_name, Table.remove_shape(table, shape_id)} + end) + |> Enum.reject(fn {_table, table} -> Table.empty?(table) end) + |> Map.new() + } + end + + @doc """ + Returns the shape IDs for all shapes that have been added to the filter + that are affected by the given change. 
+ """ + @spec affected_shapes(Filter.t(), Changes.change()) :: MapSet.t(shape_id()) + def affected_shapes(%Filter{} = filter, change) do + shapes_affected_by_change(filter, change) + rescue + error -> + Logger.error(""" + Unexpected error in Filter.affected_shapes: + #{Exception.format(:error, error, __STACKTRACE__)} + """) + + # We can't tell which shapes are affected, the safest thing to do is return all shapes + filter + |> all_shapes() + |> MapSet.new(fn {shape_id, _shape} -> shape_id end) + end + + defp shapes_affected_by_change(%Filter{} = filter, %Relation{} = relation) do + # Check all shapes in all tables because the table may have been renamed + for {shape_id, shape} <- all_shapes(filter), + Shape.is_affected_by_relation_change?(shape, relation), + into: MapSet.new() do + shape_id + end + end + + defp shapes_affected_by_change(%Filter{} = filter, %Transaction{changes: changes}) do + changes + |> Enum.map(&affected_shapes(filter, &1)) + |> Enum.reduce(MapSet.new(), &MapSet.union(&1, &2)) + end + + defp shapes_affected_by_change(%Filter{} = filter, %NewRecord{ + relation: relation, + record: record + }) do + shapes_affected_by_record(filter, relation, record) + end + + defp shapes_affected_by_change(%Filter{} = filter, %DeletedRecord{ + relation: relation, + old_record: record + }) do + shapes_affected_by_record(filter, relation, record) + end + + defp shapes_affected_by_change(%Filter{} = filter, %UpdatedRecord{relation: relation} = change) do + MapSet.union( + shapes_affected_by_record(filter, relation, change.record), + shapes_affected_by_record(filter, relation, change.old_record) + ) + end + + defp shapes_affected_by_change(%Filter{} = filter, %TruncatedRelation{relation: table_name}) do + for {shape_id, _shape} <- all_shapes_for_table(filter, table_name), + into: MapSet.new() do + shape_id + end + end + + defp shapes_affected_by_record(filter, table_name, record) do + case Map.get(filter.tables, table_name) do + nil -> MapSet.new() + table -> 
+ Table.affected_shapes(table, record) + end + end + + defp all_shapes(%Filter{} = filter) do + for {_table, table} <- filter.tables, + {shape_id, shape} <- Table.all_shapes(table), + into: %{} do + {shape_id, shape} + end + end + + defp all_shapes_for_table(%Filter{} = filter, table_name) do + case Map.get(filter.tables, table_name) do + nil -> %{} + table -> Table.all_shapes(table) + end + end +end diff --git a/packages/sync-service/lib/electric/shapes/filter/index.ex b/packages/sync-service/lib/electric/shapes/filter/index.ex new file mode 100644 index 0000000000..7a374e481a --- /dev/null +++ b/packages/sync-service/lib/electric/shapes/filter/index.ex @@ -0,0 +1,81 @@ +defmodule Electric.Shapes.Filter.Index do + @moduledoc """ + Responsible for knowing which shapes are affected by a change to a specific field. + + The `%Index{}` struct contains `values`, a map from the values of a specific field to the shapes that are affected by that field value. + This acts as an index for the shapes, providing a fast way to know which shapes have been affected without having to + iterate over all the shapes. + + Currently only `=` operations are indexed. 
+ """ + + alias Electric.Replication.Eval.Env + alias Electric.Shapes.Filter.Index + alias Electric.Shapes.WhereClause + require Logger + + defstruct [:type, :values] + + def new(type), do: %Index{type: type, values: %{}} + + def empty?(%Index{values: values}), do: values == %{} + + def add_shape(%Index{} = index, value, {shape_id, shape}, and_where) do + %{ + index + | values: + Map.update( + index.values, + value, + [%{shape_id: shape_id, and_where: and_where, shape: shape}], + fn shapes -> [%{shape_id: shape_id, and_where: and_where, shape: shape} | shapes] end + ) + } + end + + def remove_shape(%Index{} = index, shape_id) do + %{ + index + | values: + index.values + |> Map.new(fn {value, shapes} -> + {value, shapes |> Enum.reject(&(&1.shape_id == shape_id))} + end) + |> Enum.reject(fn {_value, shapes} -> shapes == [] end) + |> Map.new() + } + end + + def affected_shapes(%Index{values: values, type: type}, field, record) do + case Map.get(values, value_from_record(record, field, type)) do + nil -> + MapSet.new() + + shapes -> + shapes + |> Enum.filter(&WhereClause.includes_record?(&1.and_where, record)) + |> Enum.map(& &1.shape_id) + |> MapSet.new() + end + end + + @env Env.new() + defp value_from_record(record, field, type) do + case Env.parse_const(@env, record[field], type) do + {:ok, value} -> + value + + :error -> + raise RuntimeError, + message: "Could not parse value for field #{inspect(field)} of type #{inspect(type)}" + end + end + + def all_shapes(%Index{values: values}) do + for {_value, shapes} <- values, + %{shape_id: shape_id, shape: shape} <- shapes, + into: %{} do + {shape_id, shape} + end + end +end diff --git a/packages/sync-service/lib/electric/shapes/filter/table.ex b/packages/sync-service/lib/electric/shapes/filter/table.ex new file mode 100644 index 0000000000..898fe4c049 --- /dev/null +++ b/packages/sync-service/lib/electric/shapes/filter/table.ex @@ -0,0 +1,142 @@ +defmodule Electric.Shapes.Filter.Table do + @moduledoc """ + Responsible 
for knowing which shapes are affected by a change to a specific table. + + The `%Table{}` struct contains `indexes`, a map of indexes for shapes that have been optimised, and `other_shapes` for shapes + that have not been optimised. The logic for specific indexes is delegated to the `Filter.Index` module. + + """ + + alias Electric.Replication.Eval.Expr + alias Electric.Replication.Eval.Parser + alias Electric.Replication.Eval.Parser.Const + alias Electric.Replication.Eval.Parser.Func + alias Electric.Replication.Eval.Parser.Ref + alias Electric.Shapes.Filter.Index + alias Electric.Shapes.Filter.Table + alias Electric.Shapes.WhereClause + + require Logger + + defstruct indexes: %{}, other_shapes: %{} + + def new, do: %Table{} + + def empty?(%Table{indexes: indexes, other_shapes: other_shapes}) do + indexes == %{} && other_shapes == %{} + end + + def add_shape(%Table{} = table, {shape_id, shape} = shape_instance) do + case optimise_where(shape.where) do + %{operation: "=", field: field, type: type, value: value, and_where: and_where} -> + %{ + table + | indexes: + add_shape_to_indexes( + field, + type, + value, + shape_instance, + table.indexes, + and_where + ) + } + + :not_optimised -> + %{table | other_shapes: Map.put(table.other_shapes, shape_id, shape)} + end + end + + defp add_shape_to_indexes(field, type, value, shape_instance, indexes, and_where) do + Map.update( + indexes, + field, + Index.add_shape(Index.new(type), value, shape_instance, and_where), + fn index -> + Index.add_shape(index, value, shape_instance, and_where) + end + ) + end + + defp optimise_where(%Expr{eval: eval}), do: optimise_where(eval) + + defp optimise_where(%Func{ + name: ~s("="), + args: [%Ref{path: [field], type: type}, %Const{value: value}] + }) do + %{operation: "=", field: field, type: type, value: value, and_where: nil} + end + + defp optimise_where(%Func{ + name: ~s("="), + args: [%Const{value: value}, %Ref{path: [field], type: type}] + }) do + %{operation: "=", field: field, 
type: type, value: value, and_where: nil} + end + + defp optimise_where(%Func{name: "and", args: [arg1, arg2]}) do + case {optimise_where(arg1), optimise_where(arg2)} do + {%{operation: "=", and_where: nil} = params, _} -> + %{params | and_where: where_expr(arg2)} + + {_, %{operation: "=", and_where: nil} = params} -> + %{params | and_where: where_expr(arg1)} + + _ -> + :not_optimised + end + end + + defp optimise_where(_), do: :not_optimised + + defp where_expr(eval) do + %Expr{eval: eval, used_refs: Parser.find_refs(eval), returns: :bool} + end + + def remove_shape(%Table{indexes: indexes, other_shapes: other_shapes}, shape_id) do + %Table{ + indexes: remove_shape_from_indexes(indexes, shape_id), + other_shapes: Map.delete(other_shapes, shape_id) + } + end + + defp remove_shape_from_indexes(indexes, shape_id) do + indexes + |> Map.new(fn {field, index} -> {field, Index.remove_shape(index, shape_id)} end) + |> Enum.reject(fn {_field, index} -> Index.empty?(index) end) + |> Map.new() + end + + def affected_shapes(%Table{indexes: indexes} = table, record) do + indexes + |> Enum.map(fn {field, index} -> Index.affected_shapes(index, field, record) end) + |> Enum.reduce(MapSet.new(), &MapSet.union(&1, &2)) + |> MapSet.union(other_shapes_affected(table, record)) + rescue + error -> + Logger.error(""" + Unexpected error in Filter.Table.affected_shapes: + #{Exception.format(:error, error, __STACKTRACE__)} + """) + + # We can't tell which shapes are affected, the safest thing to do is return all shapes + table + |> all_shapes() + |> MapSet.new(fn {shape_id, _shape} -> shape_id end) + end + + defp other_shapes_affected(%{other_shapes: shapes}, record) do + for {shape_id, shape} <- shapes, + WhereClause.includes_record?(shape.where, record), + into: MapSet.new() do + shape_id + end + end + + def all_shapes(%Table{indexes: indexes, other_shapes: other_shapes}) do + for {_field, index} <- indexes, {shape_id, shape} <- Index.all_shapes(index), into: %{} do + {shape_id, shape} + 
end + |> Map.merge(other_shapes) + end +end diff --git a/packages/sync-service/lib/electric/shapes/shape.ex b/packages/sync-service/lib/electric/shapes/shape.ex index a28faf7892..9333ffa398 100644 --- a/packages/sync-service/lib/electric/shapes/shape.ex +++ b/packages/sync-service/lib/electric/shapes/shape.ex @@ -5,8 +5,8 @@ defmodule Electric.Shapes.Shape do require Logger alias Electric.Postgres.Inspector alias Electric.Replication.Eval.Parser - alias Electric.Replication.Eval.Runner alias Electric.Replication.Changes + alias Electric.Shapes.WhereClause @enforce_keys [:root_table, :root_table_id] defstruct [ @@ -218,7 +218,7 @@ defmodule Electric.Shapes.Shape do when is_struct(change, Changes.DeletedRecord) do record = if is_struct(change, Changes.NewRecord), do: change.record, else: change.old_record - if record_in_shape?(where, record), + if WhereClause.includes_record?(where, record), do: [filter_change_columns(selected_columns, change)], else: [] end @@ -227,8 +227,8 @@ defmodule Electric.Shapes.Shape do %__MODULE__{where: where, selected_columns: selected_columns}, %Changes.UpdatedRecord{old_record: old_record, record: record} = change ) do - old_record_in_shape = record_in_shape?(where, old_record) - new_record_in_shape = record_in_shape?(where, record) + old_record_in_shape = WhereClause.includes_record?(where, old_record) + new_record_in_shape = WhereClause.includes_record?(where, record) converted_changes = case {old_record_in_shape, new_record_in_shape} do @@ -254,17 +254,6 @@ defmodule Electric.Shapes.Shape do defp filtered_columns_changed(_), do: true - defp record_in_shape?(nil, _record), do: true - - defp record_in_shape?(where, record) do - with {:ok, refs} <- Runner.record_to_ref_values(where.used_refs, record), - {:ok, evaluated} <- Runner.execute(where, refs) do - if is_nil(evaluated), do: false, else: evaluated - else - _ -> false - end - end - # If relation OID matches, but qualified table name does not, then shape is affected def 
is_affected_by_relation_change?( %__MODULE__{root_table_id: id, root_table: {shape_schema, shape_table}}, diff --git a/packages/sync-service/lib/electric/shapes/where_clause.ex b/packages/sync-service/lib/electric/shapes/where_clause.ex new file mode 100644 index 0000000000..f3bb276cea --- /dev/null +++ b/packages/sync-service/lib/electric/shapes/where_clause.ex @@ -0,0 +1,14 @@ +defmodule Electric.Shapes.WhereClause do + alias Electric.Replication.Eval.Runner + + def includes_record?(nil = _where_clause, _record), do: true + + def includes_record?(where_clause, record) do + with {:ok, refs} <- Runner.record_to_ref_values(where_clause.used_refs, record), + {:ok, evaluated} <- Runner.execute(where_clause, refs) do + if is_nil(evaluated), do: false, else: evaluated + else + _ -> false + end + end +end diff --git a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs index 9d7cebcef7..3260248b44 100644 --- a/packages/sync-service/test/electric/replication/shape_log_collector_test.exs +++ b/packages/sync-service/test/electric/replication/shape_log_collector_test.exs @@ -6,8 +6,10 @@ defmodule Electric.Replication.ShapeLogCollectorTest do alias Electric.Replication.Changes.{Transaction, Relation} alias Electric.Replication.Changes alias Electric.Replication.LogOffset + alias Electric.Shapes.Shape alias Support.Mock + alias Support.StubInspector import Support.ComponentSetup, only: [with_in_memory_storage: 1, with_stack_id_from_test: 1] import Mox @@ -58,13 +60,21 @@ defmodule Electric.Replication.ShapeLogCollectorTest do end describe "store_transaction/2" do + @inspector StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]) + @shape Shape.new!("test_table", where: "id = 2", inspector: @inspector) + setup ctx do parent = self() consumers = Enum.map(1..3, fn id -> {:ok, consumer} = - Support.TransactionConsumer.start_link(id: id, parent: parent, producer: 
ctx.server) + Support.TransactionConsumer.start_link( + id: id, + parent: parent, + producer: ctx.server, + shape: @shape + ) {id, consumer} end) @@ -88,7 +98,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do %Transaction{xid: xmin, lsn: lsn, last_log_offset: last_log_offset} |> Transaction.prepend_change(%Changes.NewRecord{ relation: {"public", "test_table"}, - record: %{"id" => "1"} + record: %{"id" => "2", "name" => "foo"} }) assert :ok = ShapeLogCollector.store_transaction(txn, ctx.server) @@ -102,7 +112,7 @@ defmodule Electric.Replication.ShapeLogCollectorTest do %Transaction{xid: xid, lsn: lsn, last_log_offset: last_log_offset} |> Transaction.prepend_change(%Changes.NewRecord{ relation: {"public", "test_table"}, - record: %{"id" => "2"} + record: %{"id" => "2", "name" => "bar"} }) assert :ok = ShapeLogCollector.store_transaction(txn2, ctx.server) @@ -120,7 +130,12 @@ defmodule Electric.Replication.ShapeLogCollectorTest do consumers = Enum.map(1..3, fn id -> {:ok, consumer} = - Support.TransactionConsumer.start_link(id: id, parent: parent, producer: ctx.server) + Support.TransactionConsumer.start_link( + id: id, + parent: parent, + producer: ctx.server, + shape: @shape + ) {id, consumer} end) @@ -129,11 +144,13 @@ defmodule Electric.Replication.ShapeLogCollectorTest do end test "should handle new relations", ctx do - relation1 = %Relation{id: 1, table: "test_table", schema: "public"} + id = @shape.root_table_id + + relation1 = %Relation{id: id, table: "test_table", schema: "public", columns: []} assert :ok = ShapeLogCollector.handle_relation_msg(relation1, ctx.server) - relation2 = %Relation{id: 2, table: "bar", schema: "public"} + relation2 = %Relation{id: id, table: "bar", schema: "public", columns: []} assert :ok = ShapeLogCollector.handle_relation_msg(relation2, ctx.server) diff --git a/packages/sync-service/test/electric/shapes/dispatcher_test.exs b/packages/sync-service/test/electric/shapes/dispatcher_test.exs index fa930dbb74..db55d9c532 
100644 --- a/packages/sync-service/test/electric/shapes/dispatcher_test.exs +++ b/packages/sync-service/test/electric/shapes/dispatcher_test.exs @@ -1,10 +1,27 @@ defmodule Electric.Shapes.DispatcherTest do use ExUnit.Case, async: true + alias Electric.Shapes.Shape + alias Electric.Replication.Changes.NewRecord + alias Electric.Replication.Changes.Transaction alias Electric.Shapes.Dispatcher, as: D + alias Support.StubInspector - defp dispatcher(opts \\ []) do - {:ok, state} = D.init(opts) + @inspector StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]) + @shape Shape.new!("the_table", where: "id = 1", inspector: @inspector) + @other_shape Shape.new!("the_table", where: "id = 2", inspector: @inspector) + + @transaction %Transaction{ + changes: [ + %NewRecord{ + relation: {"public", "the_table"}, + record: %{"id" => "1"} + } + ] + } + + defp dispatcher() do + {:ok, state} = D.init([]) state end @@ -32,9 +49,6 @@ defmodule Electric.Shapes.DispatcherTest do {pid, ref} end - defp is_even(n), do: rem(n, 2) == 0 - defp is_odd(n), do: rem(n, 2) == 1 - test "demand is only sent to producer once all subscribers have processed the message" do dispatcher = dispatcher() @@ -43,11 +57,11 @@ defmodule Electric.Shapes.DispatcherTest do c3 = {_pid3, ref3} = consumer(3) # we only want to send a single event for any number of consumers - {:ok, 1, dispatcher} = D.subscribe([selector: &is_even/1], c1, dispatcher) - {:ok, 0, dispatcher} = D.subscribe([selector: &is_even/1], c2, dispatcher) - {:ok, 0, dispatcher} = D.subscribe([selector: &is_even/1], c3, dispatcher) + {:ok, 1, dispatcher} = D.subscribe([shape: @shape], c1, dispatcher) + {:ok, 0, dispatcher} = D.subscribe([shape: @shape], c2, dispatcher) + {:ok, 0, dispatcher} = D.subscribe([shape: @shape], c3, dispatcher) - event = 2 + event = @transaction {:ok, [], dispatcher} = D.dispatch([event], 1, dispatcher) @@ -68,11 +82,11 @@ defmodule Electric.Shapes.DispatcherTest do c2 = {_pid2, ref2} = consumer(2) c3 = {_pid3, 
ref3} = consumer(3) - {:ok, 1, dispatcher} = D.subscribe([selector: &is_odd/1], c1, dispatcher) - {:ok, 0, dispatcher} = D.subscribe([selector: &is_even/1], c2, dispatcher) - {:ok, 0, dispatcher} = D.subscribe([selector: &is_even/1], c3, dispatcher) + {:ok, 1, dispatcher} = D.subscribe([shape: @other_shape], c1, dispatcher) + {:ok, 0, dispatcher} = D.subscribe([shape: @shape], c2, dispatcher) + {:ok, 0, dispatcher} = D.subscribe([shape: @shape], c3, dispatcher) - event = 2 + event = @transaction {:ok, [], dispatcher} = D.dispatch([event], 1, dispatcher) @@ -91,11 +105,11 @@ defmodule Electric.Shapes.DispatcherTest do c2 = {_pid2, ref2} = consumer(2) c3 = {_pid3, ref3} = consumer(3) - {:ok, 1, dispatcher} = D.subscribe([selector: &is_even/1], c1, dispatcher) - {:ok, 0, dispatcher} = D.subscribe([selector: &is_even/1], c2, dispatcher) - {:ok, 0, dispatcher} = D.subscribe([selector: &is_even/1], c3, dispatcher) + {:ok, 1, dispatcher} = D.subscribe([shape: @shape], c1, dispatcher) + {:ok, 0, dispatcher} = D.subscribe([shape: @shape], c2, dispatcher) + {:ok, 0, dispatcher} = D.subscribe([shape: @shape], c3, dispatcher) - event = 2 + event = @transaction {:ok, [], dispatcher} = D.dispatch([event], 1, dispatcher) @@ -117,11 +131,11 @@ defmodule Electric.Shapes.DispatcherTest do c2 = {_pid2, ref2} = consumer(2) c3 = {_pid3, ref3} = consumer(3) - {:ok, 1, dispatcher} = D.subscribe([], c1, dispatcher) - {:ok, 0, dispatcher} = D.subscribe([selector: &is_even/1], c2, dispatcher) - {:ok, 0, dispatcher} = D.subscribe([selector: &is_even/1], c3, dispatcher) + {:ok, 1, dispatcher} = D.subscribe([shape: @shape], c1, dispatcher) + {:ok, 0, dispatcher} = D.subscribe([shape: @shape], c2, dispatcher) + {:ok, 0, dispatcher} = D.subscribe([shape: @shape], c3, dispatcher) - event = 2 + event = @transaction {:ok, [], dispatcher} = D.dispatch([event], 1, dispatcher) @@ -146,11 +160,11 @@ defmodule Electric.Shapes.DispatcherTest do c3 = {_pid3, _ref3} = consumer(3) # we only want to send a 
single event for any number of consumers - {:ok, 1, dispatcher} = D.subscribe([], c1, dispatcher) - {:ok, 0, dispatcher} = D.subscribe([selector: &is_even/1], c2, dispatcher) - {:ok, 0, dispatcher} = D.subscribe([selector: &is_even/1], c3, dispatcher) + {:ok, 1, dispatcher} = D.subscribe([shape: @shape], c1, dispatcher) + {:ok, 0, dispatcher} = D.subscribe([shape: @shape], c2, dispatcher) + {:ok, 0, dispatcher} = D.subscribe([shape: @shape], c3, dispatcher) - event = 2 + event = @transaction {:ok, [], dispatcher} = D.dispatch([event], 1, dispatcher) @@ -168,11 +182,11 @@ defmodule Electric.Shapes.DispatcherTest do c2 = {_pid2, ref2} = consumer(2) c3 = {_pid3, ref3} = consumer(3) - {:ok, 1, dispatcher} = D.subscribe([selector: &is_even/1], c1, dispatcher) - {:ok, 0, dispatcher} = D.subscribe([selector: &is_even/1], c2, dispatcher) - {:ok, 0, dispatcher} = D.subscribe([selector: &is_even/1], c3, dispatcher) + {:ok, 1, dispatcher} = D.subscribe([shape: @other_shape], c1, dispatcher) + {:ok, 0, dispatcher} = D.subscribe([shape: @other_shape], c2, dispatcher) + {:ok, 0, dispatcher} = D.subscribe([shape: @other_shape], c3, dispatcher) - event = 3 + event = @transaction {:ok, [], dispatcher} = D.dispatch([event], 1, dispatcher) refute_receive {C, ^ref1, [^event]} diff --git a/packages/sync-service/test/electric/shapes/filter_test.exs b/packages/sync-service/test/electric/shapes/filter_test.exs new file mode 100644 index 0000000000..4b114d184f --- /dev/null +++ b/packages/sync-service/test/electric/shapes/filter_test.exs @@ -0,0 +1,310 @@ +defmodule Electric.Shapes.FilterTest do + use ExUnit.Case + + import ExUnit.CaptureLog + + alias Electric.Replication.Changes.DeletedRecord + alias Electric.Replication.Changes.NewRecord + alias Electric.Replication.Changes.Relation + alias Electric.Replication.Changes.Transaction + alias Electric.Replication.Changes.TruncatedRelation + alias Electric.Replication.Changes.UpdatedRecord + alias Electric.Shapes.Filter + alias 
Electric.Shapes.Shape + alias Support.StubInspector + + @inspector StubInspector.new([ + %{name: "id", type: "int8", pk_position: 0}, + %{name: "an_array", array_type: "int8"} + ]) + + describe "affected_shapes/2" do + test "returns shapes affected by insert" do + filter = + Filter.new() + |> Filter.add_shape("s1", Shape.new!("t1", where: "id = 1", inspector: @inspector)) + |> Filter.add_shape("s2", Shape.new!("t1", where: "id = 2", inspector: @inspector)) + |> Filter.add_shape("s3", Shape.new!("t1", where: "id = 3", inspector: @inspector)) + |> Filter.add_shape("s4", Shape.new!("t2", where: "id = 2", inspector: @inspector)) + + insert = + %Transaction{ + changes: [ + %NewRecord{ + relation: {"public", "t1"}, + record: %{"id" => "2"} + } + ] + } + + assert Filter.affected_shapes(filter, insert) == MapSet.new(["s2"]) + end + + test "returns shapes affected by delete" do + filter = + Filter.new() + |> Filter.add_shape("s1", Shape.new!("t1", where: "id = 1", inspector: @inspector)) + |> Filter.add_shape("s2", Shape.new!("t1", where: "id = 2", inspector: @inspector)) + |> Filter.add_shape("s3", Shape.new!("t1", where: "id = 3", inspector: @inspector)) + |> Filter.add_shape("s4", Shape.new!("t2", where: "id = 2", inspector: @inspector)) + + delete = + %Transaction{ + changes: [ + %DeletedRecord{ + relation: {"public", "t1"}, + old_record: %{"id" => "2"} + } + ] + } + + assert Filter.affected_shapes(filter, delete) == MapSet.new(["s2"]) + end + + test "returns shapes affected by update" do + filter = + Filter.new() + |> Filter.add_shape("s1", Shape.new!("t1", where: "id = 1", inspector: @inspector)) + |> Filter.add_shape("s2", Shape.new!("t1", where: "id = 2", inspector: @inspector)) + |> Filter.add_shape("s3", Shape.new!("t1", where: "id = 3", inspector: @inspector)) + |> Filter.add_shape("s4", Shape.new!("t1", where: "id = 4", inspector: @inspector)) + |> Filter.add_shape("s2", Shape.new!("t2", where: "id = 2", inspector: @inspector)) + + update = + %Transaction{ + 
changes: [ + %UpdatedRecord{ + relation: {"public", "t1"}, + record: %{"id" => "2"}, + old_record: %{"id" => "3"} + } + ] + } + + assert Filter.affected_shapes(filter, update) == MapSet.new(["s2", "s3"]) + end + + test "returns shapes affected by relation change" do + filter = + Filter.new() + |> Filter.add_shape("s1", Shape.new!("t1", where: "id = 1", inspector: @inspector)) + |> Filter.add_shape("s2", Shape.new!("t1", where: "id = 2", inspector: @inspector)) + |> Filter.add_shape("s3", Shape.new!("t1", where: "id > 7", inspector: @inspector)) + |> Filter.add_shape("s4", Shape.new!("t1", where: "id > 8", inspector: @inspector)) + |> Filter.add_shape("s5", Shape.new!("t2", where: "id = 1", inspector: @inspector)) + |> Filter.add_shape("s6", Shape.new!("t2", where: "id = 2", inspector: @inspector)) + |> Filter.add_shape("s7", Shape.new!("t2", where: "id > 7", inspector: @inspector)) + |> Filter.add_shape("s8", Shape.new!("t2", where: "id > 8", inspector: @inspector)) + + relation = %Relation{schema: "public", table: "t1"} + + assert Filter.affected_shapes(filter, relation) == MapSet.new(["s1", "s2", "s3", "s4"]) + end + + test "returns shapes affected by relation rename" do + table_id = 123 + s1 = Shape.new!("t1", inspector: @inspector) + s2 = Shape.new!("t2", inspector: @inspector) |> Map.put(:root_table_id, table_id) + s3 = Shape.new!("t3", inspector: @inspector) + + filter = + Filter.new() + |> Filter.add_shape("s1", s1) + |> Filter.add_shape("s2", s2) + |> Filter.add_shape("s3", s3) + + rename = %Relation{schema: "public", table: "new_name", id: table_id} + + assert Filter.affected_shapes(filter, rename) == MapSet.new(["s2"]) + end + + test "returns shapes affected by truncation" do + filter = + Filter.new() + |> Filter.add_shape("s1", Shape.new!("t1", where: "id = 1", inspector: @inspector)) + |> Filter.add_shape("s2", Shape.new!("t1", where: "id = 2", inspector: @inspector)) + |> Filter.add_shape("s3", Shape.new!("t1", where: "id > 7", inspector: @inspector)) 
+ |> Filter.add_shape("s4", Shape.new!("t1", where: "id > 8", inspector: @inspector)) + |> Filter.add_shape("s5", Shape.new!("t2", where: "id = 1", inspector: @inspector)) + |> Filter.add_shape("s6", Shape.new!("t2", where: "id = 2", inspector: @inspector)) + |> Filter.add_shape("s7", Shape.new!("t2", where: "id > 7", inspector: @inspector)) + |> Filter.add_shape("s8", Shape.new!("t2", where: "id > 8", inspector: @inspector)) + + truncation = %Transaction{changes: [%TruncatedRelation{relation: {"public", "t1"}}]} + + assert Filter.affected_shapes(filter, truncation) == MapSet.new(["s1", "s2", "s3", "s4"]) + end + + test "where clause in the form `field = const` is optimised" do + filter = + 1..1000 + |> Enum.reduce(Filter.new(), fn i, filter -> + Filter.add_shape(filter, i, Shape.new!("t1", where: "id = #{i}", inspector: @inspector)) + end) + + reductions = + reductions(fn -> + assert Filter.affected_shapes(filter, change("t1", %{"id" => "7"})) == MapSet.new([7]) + end) + + assert reductions < 500 + end + + test "where clause in the form `field = const AND another_condition` is optimised" do + filter = + 1..1000 + |> Enum.reduce(Filter.new(), fn i, filter -> + Filter.add_shape( + filter, + i, + Shape.new!("t1", where: "id = #{i} AND id > 6", inspector: @inspector) + ) + end) + + reductions = + reductions(fn -> + assert Filter.affected_shapes(filter, change("t1", %{"id" => "7"})) == MapSet.new([7]) + end) + + assert reductions < 500 + end + + test "where clause in the form `a_condition AND field = const` is optimised" do + filter = + 1..1000 + |> Enum.reduce(Filter.new(), fn i, filter -> + Filter.add_shape( + filter, + i, + Shape.new!("t1", where: "id > 6 AND id = #{i}", inspector: @inspector) + ) + end) + + reductions = + reductions(fn -> + assert Filter.affected_shapes(filter, change("t1", %{"id" => "7"})) == MapSet.new([7]) + end) + + assert reductions < 500 + end + end + + test "shape with no where clause is affected by all changes for the same table" do + shape 
= Shape.new!("t1", inspector: @inspector) + filter = Filter.new() |> Filter.add_shape("s", shape) + + assert Filter.affected_shapes(filter, change("t1", %{"id" => "7"})) == MapSet.new(["s"]) + assert Filter.affected_shapes(filter, change("t1", %{"id" => "8"})) == MapSet.new(["s"]) + assert Filter.affected_shapes(filter, change("t2", %{"id" => "8"})) == MapSet.new([]) + end + + test "shape with a where clause is affected by changes that match that where clause" do + shape = Shape.new!("t1", where: "id = 7", inspector: @inspector) + filter = Filter.new() |> Filter.add_shape("s", shape) + + assert Filter.affected_shapes(filter, change("t1", %{"id" => "7"})) == MapSet.new(["s"]) + assert Filter.affected_shapes(filter, change("t1", %{"id" => "8"})) == MapSet.new([]) + assert Filter.affected_shapes(filter, change("t2", %{"id" => "8"})) == MapSet.new([]) + end + + test "invalid record value logs an error and says all shapes for the table are affected" do + filter = + Filter.new() + |> Filter.add_shape("shape1", Shape.new!("table", inspector: @inspector)) + |> Filter.add_shape("shape2", Shape.new!("table", where: "id = 7", inspector: @inspector)) + |> Filter.add_shape("shape3", Shape.new!("table", where: "id = 8", inspector: @inspector)) + |> Filter.add_shape("shape4", Shape.new!("table", where: "id > 9", inspector: @inspector)) + |> Filter.add_shape("shape5", Shape.new!("another_table", inspector: @inspector)) + + log = + capture_log(fn -> + assert Filter.affected_shapes(filter, change("table", %{"id" => "invalid_value"})) == + MapSet.new(["shape1", "shape2", "shape3", "shape4"]) + end) + + assert log =~ ~s(Could not parse value for field "id" of type :int8) + end + + test "Filter.remove_shape/2" do + empty = Filter.new() + + filter1 = + empty + |> Filter.add_shape("shape1", Shape.new!("table", inspector: @inspector)) + + filter2 = + filter1 + |> Filter.add_shape("shape2", Shape.new!("another_table", inspector: @inspector)) + + filter3 = + filter2 + |> 
Filter.add_shape("shape3", Shape.new!("table", where: "id = 1", inspector: @inspector)) + + filter4 = + filter3 + |> Filter.add_shape("shape4", Shape.new!("table", where: "id = 2", inspector: @inspector)) + + filter5 = + filter4 + |> Filter.add_shape("shape5", Shape.new!("table", where: "id > 2", inspector: @inspector)) + + filter6 = + filter5 + |> Filter.add_shape("shape6", Shape.new!("table", where: "id > 7", inspector: @inspector)) + + assert Filter.remove_shape(filter6, "shape6") == filter5 + assert Filter.remove_shape(filter5, "shape5") == filter4 + assert Filter.remove_shape(filter4, "shape4") == filter3 + assert Filter.remove_shape(filter3, "shape3") == filter2 + assert Filter.remove_shape(filter2, "shape2") == filter1 + assert Filter.remove_shape(filter1, "shape1") == empty + end + + for test <- [ + %{where: "id = 7", record: %{"id" => "7"}, affected: true}, + %{where: "id = 7", record: %{"id" => "8"}, affected: false}, + %{where: "id = 7", record: %{"id" => nil}, affected: false}, + %{where: "7 = id", record: %{"id" => "7"}, affected: true}, + %{where: "7 = id", record: %{"id" => "8"}, affected: false}, + %{where: "7 = id", record: %{"id" => nil}, affected: false}, + %{where: "id = 7 AND id > 1", record: %{"id" => "7"}, affected: true}, + %{where: "id = 7 AND id > 1", record: %{"id" => "8"}, affected: false}, + %{where: "id = 7 AND id > 8", record: %{"id" => "7"}, affected: false}, + %{where: "id > 1 AND id = 7", record: %{"id" => "7"}, affected: true}, + %{where: "id > 1 AND id = 7", record: %{"id" => "8"}, affected: false}, + %{where: "id > 8 AND id = 7", record: %{"id" => "7"}, affected: false}, + %{where: "an_array = '{1}'", record: %{"an_array" => "{1}"}, affected: true}, + %{where: "an_array = '{1}'", record: %{"an_array" => "{2}"}, affected: false}, + %{where: "an_array = '{1}'", record: %{"an_array" => "{1,2}"}, affected: false} + ] do + test "where: #{test.where}, record: #{inspect(test.record)}" do + %{where: where, record: record, affected: 
affected} = unquote(Macro.escape(test)) + + shape = Shape.new!("the_table", where: where, inspector: @inspector) + + transaction = change("the_table", record) + + assert Filter.new() + |> Filter.add_shape("the-shape", shape) + |> Filter.affected_shapes(transaction) == MapSet.new(["the-shape"]) == affected + end + end + + defp change(table, record) do + %Transaction{ + changes: [ + %NewRecord{ + relation: {"public", table}, + record: record + } + ] + } + end + + defp reductions(fun) do + {:reductions, reductions_before} = :erlang.process_info(self(), :reductions) + fun.() + {:reductions, reductions_after} = :erlang.process_info(self(), :reductions) + reductions_after - reductions_before + end +end diff --git a/packages/sync-service/test/support/transaction_consumer.ex b/packages/sync-service/test/support/transaction_consumer.ex index 6c51b61928..d6fe94e9c0 100644 --- a/packages/sync-service/test/support/transaction_consumer.ex +++ b/packages/sync-service/test/support/transaction_consumer.ex @@ -35,9 +35,9 @@ defmodule Support.TransactionConsumer do {:ok, producer} = Keyword.fetch(opts, :producer) {:ok, parent} = Keyword.fetch(opts, :parent) {:ok, id} = Keyword.fetch(opts, :id) - partition = Keyword.get(opts, :partition, :transaction) + shape = Keyword.fetch!(opts, :shape) - {:consumer, {id, nil, parent}, subscribe_to: [{producer, [partition: partition]}]} + {:consumer, {id, nil, parent}, subscribe_to: [{producer, [shape: shape]}]} end def handle_subscribe(:producer, _options, from, {id, _, parent}) do