Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimise where clause filtering #2076

Merged
merged 62 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 61 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
9cb9f5a
Add filter
robacourt Nov 27, 2024
d1d1adf
Make Shapes.record_in_shape? take a shape
robacourt Nov 27, 2024
f299dc0
Make add_shape public
robacourt Nov 27, 2024
9327bbd
Support other types of change
robacourt Nov 27, 2024
5ca7267
Test id not found
robacourt Nov 27, 2024
285105d
Add Filter struct
robacourt Nov 27, 2024
f4a5e39
Turn other_shapes into a map
robacourt Nov 27, 2024
5012b0c
Support relation changes
robacourt Nov 27, 2024
0586a0e
Support TruncateRelation
robacourt Nov 27, 2024
e97d803
Optimise where clauses
robacourt Nov 27, 2024
e4ccdcf
Optimise where clauses
robacourt Nov 27, 2024
54809ee
Add Filter.remove_shape
robacourt Nov 28, 2024
c697ce8
Update dispatcher
robacourt Nov 28, 2024
319ede5
Add TODOs
robacourt Nov 29, 2024
1e7f050
Support renames
robacourt Nov 29, 2024
1cb01da
Fix remaining tests
robacourt Nov 29, 2024
2881ad2
Remove redundant arg
robacourt Nov 29, 2024
8c3bf5f
Pass inspector to dispatcher
robacourt Nov 29, 2024
9f03c18
Refactor dispatcher so that state is a struct
robacourt Nov 29, 2024
bb2e4fa
Add inspector to dispatcher state
robacourt Nov 29, 2024
55322a6
Add TODO
robacourt Nov 29, 2024
ecf54f9
Parse record rather than converting the const to a string
robacourt Nov 29, 2024
33c8a8b
Create expr on Filter.new
robacourt Nov 29, 2024
684cd47
Move functions into TableFilter
robacourt Nov 30, 2024
04c04e9
Rename TableFilter to Table
robacourt Nov 30, 2024
ac11f0d
Rename public Table functions
robacourt Nov 30, 2024
3db9970
Introduce Table struct
robacourt Nov 30, 2024
7538087
Extract code into Field
robacourt Nov 30, 2024
8a484ab
Rename table_filter to table
robacourt Nov 30, 2024
2d2c3f6
Rename field to index
robacourt Nov 30, 2024
47fc530
Ensure table is first param of Table public functions
robacourt Nov 30, 2024
ba275c5
Neaten up remove_shape
robacourt Nov 30, 2024
f8c2703
Refactor affected_shapes function to improve argument structure and u…
robacourt Nov 30, 2024
c4af878
Refactor: Use Table.empty?/1 for cleaner table emptiness check
robacourt Dec 1, 2024
b604482
Use Index struct
robacourt Dec 1, 2024
001e0d3
Refactor add_shape function to reorder parameters for consistency.
robacourt Dec 1, 2024
f4d3f9b
Remove unused inspector option from Dispatcher and related modules.
robacourt Dec 1, 2024
be41674
Refactor: Rename handle to shape_id
robacourt Dec 1, 2024
7c22ad7
Refactor all_shapes
robacourt Dec 2, 2024
dbf1192
Gracefully handle parsing errors
robacourt Dec 2, 2024
32e75e9
Hide deliberate error in test
robacourt Dec 2, 2024
b150fed
Test error logging for invalid error
robacourt Dec 2, 2024
c660449
Update TODOs
robacourt Dec 2, 2024
6dd4a48
Refactor code into WhereClause module
robacourt Dec 2, 2024
81be707
Remove Filter.new/1
robacourt Dec 2, 2024
e9822d6
Test arrays
robacourt Dec 2, 2024
7f474a1
Test shape with no where clause
robacourt Dec 2, 2024
ab3b970
Update TODOs
robacourt Dec 2, 2024
29fb39d
Update moduledoc
robacourt Dec 2, 2024
c452bc4
Add moduledoc
robacourt Dec 2, 2024
0ac30a6
Add moduledocs
robacourt Dec 2, 2024
efe3613
Remove TODO
robacourt Dec 2, 2024
aa4e7c2
Rescue all errors in affected_shapes
robacourt Dec 2, 2024
f91c830
Remove unnecessary field from dispatcher state
robacourt Dec 2, 2024
c0fba1a
Revert unnecessary change
robacourt Dec 2, 2024
8997ce6
Add typespecs
robacourt Dec 2, 2024
3e40fd7
Add documentation
robacourt Dec 2, 2024
e478fee
Decouple Filter tests from the Filter data structure
robacourt Dec 3, 2024
93ee95e
Add optimisation tests
robacourt Dec 3, 2024
51cd083
Refactor reductions/1
robacourt Dec 3, 2024
0f8ef27
Rename Filter.empty() to Filter.new()
robacourt Dec 3, 2024
12c0644
Add changeset
robacourt Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions packages/sync-service/lib/electric/replication/eval/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1063,14 +1063,14 @@ defmodule Electric.Replication.Eval.Parser do
defp internal_node_to_error(%Func{type: type, name: name}),
do: "function #{name} returning #{type}"

defp find_refs(tree, acc \\ %{})
defp find_refs(%Const{}, acc), do: acc
defp find_refs(%Ref{path: path, type: type}, acc), do: Map.put_new(acc, path, type)
def find_refs(tree, acc \\ %{})
def find_refs(%Const{}, acc), do: acc
def find_refs(%Ref{path: path, type: type}, acc), do: Map.put_new(acc, path, type)

defp find_refs(%Func{args: args, variadic_arg: nil}, acc),
def find_refs(%Func{args: args, variadic_arg: nil}, acc),
do: Enum.reduce(args, acc, &find_refs/2)

defp find_refs(%Func{args: args, variadic_arg: position}, acc),
def find_refs(%Func{args: args, variadic_arg: position}, acc),
do:
Enum.reduce(Enum.with_index(args), acc, fn
{arg, ^position}, acc -> Enum.reduce(arg, acc, &find_refs/2)
Expand Down
22 changes: 1 addition & 21 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,27 +87,7 @@ defmodule Electric.Shapes.Consumer do
monitors: []
})

{:consumer, state,
subscribe_to: [{producer, [max_demand: 1, selector: &selector(&1, config.shape)]}]}
end

defp selector(event, shape) do
process_event?(event, shape)
rescue
# Swallow errors here to avoid crashing the ShapeLogCollector.
# Return `true` so the event is processed, which will then error
# for the same reason and cleanup the shape.
_ -> true
end

defp process_event?(%Transaction{changes: changes}, shape) do
changes
|> Stream.flat_map(&Shape.convert_change(shape, &1))
|> Enum.any?()
end

defp process_event?(%Changes.Relation{} = relation_change, shape) do
Shape.is_affected_by_relation_change?(shape, relation_change)
{:consumer, state, subscribe_to: [{producer, [max_demand: 1, shape: config.shape]}]}
end

def handle_call(:initial_state, _from, %{snapshot_xmin: xmin, latest_offset: offset} = state) do
Expand Down
132 changes: 75 additions & 57 deletions packages/sync-service/lib/electric/shapes/dispatcher.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
defmodule Electric.Shapes.Dispatcher do
@moduledoc """
Dispatches transactions and relations to consumers filtered according to the
subscriber's `selector` function.

To receive all messages, don't pass a selector function or use `nil`, e.g.

```
def init(producer) do
{:consumer, :nostate, subscribe_to: [{producer, [max_demand: 1, selector: nil]}]}
end

```
Dispatches transactions and relations to consumers filtered using `Filter`
and the consumer's shape.

The essential behaviour is that the dispatcher only asks the producer for
more demand once all relevant subscribers have processed the last message and
Expand All @@ -31,16 +22,29 @@ defmodule Electric.Shapes.Dispatcher do
"""

require Logger
alias Electric.Shapes.Filter

defmodule State do
defstruct [:waiting, :pending, :subscribers, :filter, :pids]
end

@behaviour GenStage.Dispatcher

@impl GenStage.Dispatcher

def init(_opts) do
{:ok, {0, 0, nil, [], MapSet.new()}}
{:ok,
%State{
waiting: 0,
pending: nil,
subscribers: [],
filter: Filter.new(),
pids: MapSet.new()
}}
end

@impl GenStage.Dispatcher
def subscribe(opts, {pid, _ref} = from, {n, waiting, pending, subs, pids}) do
def subscribe(opts, {pid, _ref} = from, %State{pids: pids} = state) do
if MapSet.member?(pids, pid) do
Logger.error(fn ->
"#{inspect(pid)} is already registered with #{inspect(self())}. " <>
Expand All @@ -49,105 +53,119 @@ defmodule Electric.Shapes.Dispatcher do

{:error, :already_subscribed}
else
selector =
case Keyword.get(opts, :selector) do
nil ->
nil
shape = Keyword.fetch!(opts, :shape)

selector when is_function(selector, 1) ->
selector
demand = if state.subscribers == [], do: 1, else: 0

other ->
raise ArgumentError,
":selector option must be passed a unary function, got: #{inspect(other)}"
end

subs = [{from, selector} | subs]
subscribers = [{from, shape} | state.subscribers]

demand = if n == 0, do: 1, else: 0
filter = Filter.add_shape(state.filter, from, shape)

{:ok, demand, {n + 1, waiting, pending, subs, MapSet.put(pids, pid)}}
{:ok, demand,
%State{
state
| subscribers: subscribers,
filter: filter,
pids: MapSet.put(state.pids, pid)
}}
end
end

@impl GenStage.Dispatcher
def cancel({pid, _ref} = from, {n, waiting, pending, subs, pids}) do
if MapSet.member?(pids, pid) do
subs = List.keydelete(subs, from, 0)
def cancel({pid, _ref} = from, %State{waiting: waiting, pending: pending} = state) do
if MapSet.member?(state.pids, pid) do
subscribers = List.keydelete(state.subscribers, from, 0)

filter = Filter.remove_shape(state.filter, from)

if pending && MapSet.member?(pending, from) do
case waiting - 1 do
0 ->
# the only remaining unacked subscriber has cancelled, so we
# return some demand
{:ok, 1, {n - 1, 0, nil, subs, MapSet.delete(pids, pid)}}
{:ok, 1,
%State{
state
| waiting: 0,
pending: nil,
subscribers: subscribers,
filter: filter,
pids: MapSet.delete(state.pids, pid)
}}

new_waiting ->
{:ok, 0,
{n - 1, new_waiting, MapSet.delete(pending, from), subs, MapSet.delete(pids, pid)}}
%State{
state
| waiting: new_waiting,
pending: MapSet.delete(pending, from),
subscribers: subscribers,
filter: filter,
pids: MapSet.delete(state.pids, pid)
}}
end
else
{:ok, 0, {n - 1, waiting, pending, subs, MapSet.delete(pids, pid)}}
{:ok, 0,
%State{
state
| subscribers: subscribers,
filter: filter,
pids: MapSet.delete(state.pids, pid)
}}
end
else
{:ok, 0, {n, waiting, pending, subs, pids}}
{:ok, 0, state}
end
end

@impl GenStage.Dispatcher
# consumers sending demand before we have produced a message just ignore as
# we have already sent initial demand of 1 to the producer when the first
# consumer subscribed.
def ask(1, {_pid, _ref}, {n, 0, nil, subs, pids}) do
{:ok, 0, {n, 0, nil, subs, pids}}
def ask(1, {_pid, _ref}, %State{waiting: 0, pending: nil} = state) do
{:ok, 0, state}
end

def ask(1, {_pid, _ref}, {n, 1, _pending, subs, pids}) do
{:ok, 1, {n, 0, nil, subs, pids}}
def ask(1, {_pid, _ref}, %State{waiting: 1} = state) do
{:ok, 1, %State{state | waiting: 0, pending: nil}}
end

def ask(1, from, {n, waiting, pending, subs, pids}) when waiting > 1 do
{:ok, 0, {n, waiting - 1, MapSet.delete(pending, from), subs, pids}}
def ask(1, from, %State{waiting: waiting, pending: pending} = state) when waiting > 1 do
{:ok, 0, %State{state | waiting: waiting - 1, pending: MapSet.delete(pending, from)}}
end

@impl GenStage.Dispatcher
# handle the no subscribers case here to make the real dispatch impl easier
def dispatch([event], _length, {_n, 0, _pending, [], _pids} = state) do
def dispatch([event], _length, %State{waiting: 0, subscribers: []} = state) do
{:ok, [event], state}
end

def dispatch([event], _length, {n, 0, _pending, subs, pids}) do
def dispatch([event], _length, %State{waiting: 0, subscribers: subscribers} = state) do
{waiting, pending} =
subs
|> Enum.reduce({0, MapSet.new()}, fn {{pid, ref} = sub, selector}, {waiting, pending} ->
if subscriber_wants_message?(event, selector) do
Process.send(pid, {:"$gen_consumer", {self(), ref}, [event]}, [:noconnect])
{waiting + 1, MapSet.put(pending, sub)}
else
{waiting, pending}
end
state.filter
|> Filter.affected_shapes(event)
|> Enum.reduce({0, MapSet.new()}, fn {pid, ref} = subscriber, {waiting, pending} ->
Process.send(pid, {:"$gen_consumer", {self(), ref}, [event]}, [:noconnect])
{waiting + 1, MapSet.put(pending, subscriber)}
end)
|> case do
{0, _pending} ->
# even though no subscriber wants the event, we still need to generate demand
# so that we can complete the loop in the log collector
[{sub, _selector} | _] = subs
send(self(), {:"$gen_producer", sub, {:ask, 1}})
{1, MapSet.new([sub])}
[{subscriber, _selector} | _] = subscribers
send(self(), {:"$gen_producer", subscriber, {:ask, 1}})
{1, MapSet.new([subscriber])}

{waiting, pending} ->
{waiting, pending}
end

{:ok, [], {n, waiting, pending, subs, pids}}
{:ok, [], %State{state | waiting: waiting, pending: pending}}
end

@impl GenStage.Dispatcher
def info(msg, state) do
send(self(), msg)
{:ok, state}
end

defp subscriber_wants_message?(_event, nil), do: true
defp subscriber_wants_message?(event, selector), do: selector.(event)
end
Loading
Loading