Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimise where clause filtering #2076

Merged
merged 62 commits into from
Dec 3, 2024
Merged
Changes from 1 commit
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
9cb9f5a
Add filter
robacourt Nov 27, 2024
d1d1adf
Make Shapes.record_in_shape? take a shape
robacourt Nov 27, 2024
f299dc0
Make add_shape public
robacourt Nov 27, 2024
9327bbd
Support other types of change
robacourt Nov 27, 2024
5ca7267
Test id not found
robacourt Nov 27, 2024
285105d
Add Filter struct
robacourt Nov 27, 2024
f4a5e39
Turn other_shapes into a map
robacourt Nov 27, 2024
5012b0c
Support relation changes
robacourt Nov 27, 2024
0586a0e
Support TruncateRelation
robacourt Nov 27, 2024
e97d803
Optimise where clauses
robacourt Nov 27, 2024
e4ccdcf
Optimise where clauses
robacourt Nov 27, 2024
54809ee
Add Filter.remove_shape
robacourt Nov 28, 2024
c697ce8
Update dispatcher
robacourt Nov 28, 2024
319ede5
Add TODOs
robacourt Nov 29, 2024
1e7f050
Support renames
robacourt Nov 29, 2024
1cb01da
Fix remaining tests
robacourt Nov 29, 2024
2881ad2
Remove redundant arg
robacourt Nov 29, 2024
8c3bf5f
Pass inspector to dispatcher
robacourt Nov 29, 2024
9f03c18
Refactor dispatcher so that state is a struct
robacourt Nov 29, 2024
bb2e4fa
Add inspector to dispatcher state
robacourt Nov 29, 2024
55322a6
Add TODO
robacourt Nov 29, 2024
ecf54f9
Parse record rather than converting the const to a string
robacourt Nov 29, 2024
33c8a8b
Create expr on Filter.new
robacourt Nov 29, 2024
684cd47
Move functions into TableFilter
robacourt Nov 30, 2024
04c04e9
Rename TableFilter to Table
robacourt Nov 30, 2024
ac11f0d
Rename public Table functions
robacourt Nov 30, 2024
3db9970
Introduce Table struct
robacourt Nov 30, 2024
7538087
Extract code into Field
robacourt Nov 30, 2024
8a484ab
Rename table_filter to table
robacourt Nov 30, 2024
2d2c3f6
Rename field to index
robacourt Nov 30, 2024
47fc530
Ensure table is first param of Table public functions
robacourt Nov 30, 2024
ba275c5
Neaten up remove_shape
robacourt Nov 30, 2024
f8c2703
Refactor affected_shapes function to improve argument structure and u…
robacourt Nov 30, 2024
c4af878
Refactor: Use Table.empty?/1 for cleaner table emptiness check
robacourt Dec 1, 2024
b604482
Use Index struct
robacourt Dec 1, 2024
001e0d3
Refactor add_shape function to reorder parameters for consistency.
robacourt Dec 1, 2024
f4d3f9b
Remove unused inspector option from Dispatcher and related modules.
robacourt Dec 1, 2024
be41674
Refactor: Rename handle to shape_id
robacourt Dec 1, 2024
7c22ad7
Refactor all_shapes
robacourt Dec 2, 2024
dbf1192
Gracefully handle parsing errors
robacourt Dec 2, 2024
32e75e9
Hide deliberate error in test
robacourt Dec 2, 2024
b150fed
Test error logging for invalid error
robacourt Dec 2, 2024
c660449
Update TODOs
robacourt Dec 2, 2024
6dd4a48
Refactor code into WhereClause module
robacourt Dec 2, 2024
81be707
Remove Filter.new/1
robacourt Dec 2, 2024
e9822d6
Test arrays
robacourt Dec 2, 2024
7f474a1
Test shape with no where clause
robacourt Dec 2, 2024
ab3b970
Update TODOs
robacourt Dec 2, 2024
29fb39d
Update moduledoc
robacourt Dec 2, 2024
c452bc4
Add moduledoc
robacourt Dec 2, 2024
0ac30a6
Add moduledocs
robacourt Dec 2, 2024
efe3613
Remove TODO
robacourt Dec 2, 2024
aa4e7c2
Rescue all errors in affected_shapes
robacourt Dec 2, 2024
f91c830
Remove unnecessary field from dispatcher state
robacourt Dec 2, 2024
c0fba1a
Revert unnecessary change
robacourt Dec 2, 2024
8997ce6
Add typespecs
robacourt Dec 2, 2024
3e40fd7
Add documentation
robacourt Dec 2, 2024
e478fee
Decouple Filter tests from the Filter data structure
robacourt Dec 3, 2024
93ee95e
Add optimisation tests
robacourt Dec 3, 2024
51cd083
Refactor reductions/1
robacourt Dec 3, 2024
0f8ef27
Rename Filter.empty() to Filter.new()
robacourt Dec 3, 2024
12c0644
Add changeset
robacourt Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 35 additions & 13 deletions packages/sync-service/lib/electric/shapes/filter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ defmodule Electric.Shapes.Filter do
alias Electric.Shapes.Filter
alias Electric.Shapes.Filter.Table
alias Electric.Shapes.Shape
require Logger

defstruct tables: %{}

Expand Down Expand Up @@ -50,51 +51,72 @@ defmodule Electric.Shapes.Filter do
}
end

def affected_shapes(%Filter{} = filter, %Relation{} = relation) do
def affected_shapes(%Filter{} = filter, change) do
shapes_affected_by_change(filter, change)
rescue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed? What kinds of errors do we expect to see here?

Copy link
Contributor Author

@robacourt robacourt Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super safety 😄 My reasoning is that if filtering crashes, all the shapes crash, so we need to be more careful than if this was shape specific code. There's a much more targeted rescue in Table that deals with record parsing errors which is much more likely to be hit. This one is more of a catch-all just to be extra safe.

Types of error it could catch:

  1. Shape.is_affected_by_relation_change? exceptions
  2. change types the filter is not expecting are passed to affected_shapes/2

These hopefully will never happen, but if they do, we'd rather the filter didn't crash.

error ->
Logger.error("""
Unexpected error in Filter.affected_shapes:
#{Exception.format(:error, error, __STACKTRACE__)}
""")

# We can't tell which shapes are affected, the safest thing to do is return all shapes
filter
|> all_shapes()
|> MapSet.new(fn {shape_id, _shape} -> shape_id end)
end

defp shapes_affected_by_change(%Filter{} = filter, %Relation{} = relation) do
# Check all shapes is all tables becuase the table may have been renamed
for {shape_id, shape} <- all_shapes_in_filter(filter),
for {shape_id, shape} <- all_shapes(filter),
Shape.is_affected_by_relation_change?(shape, relation),
into: MapSet.new() do
shape_id
end
end

def affected_shapes(%Filter{} = filter, %Transaction{changes: changes}) do
defp shapes_affected_by_change(%Filter{} = filter, %Transaction{changes: changes}) do
changes
|> Enum.map(&affected_shapes(filter, &1))
|> Enum.reduce(MapSet.new(), &MapSet.union(&1, &2))
end

def affected_shapes(%Filter{} = filter, %NewRecord{relation: relation, record: record}) do
affected_shapes_by_record(filter, relation, record)
defp shapes_affected_by_change(%Filter{} = filter, %NewRecord{
relation: relation,
record: record
}) do
shapes_affected_by_record(filter, relation, record)
end

def affected_shapes(%Filter{} = filter, %DeletedRecord{relation: relation, old_record: record}) do
affected_shapes_by_record(filter, relation, record)
defp shapes_affected_by_change(%Filter{} = filter, %DeletedRecord{
relation: relation,
old_record: record
}) do
shapes_affected_by_record(filter, relation, record)
end

def affected_shapes(%Filter{} = filter, %UpdatedRecord{relation: relation} = change) do
defp shapes_affected_by_change(%Filter{} = filter, %UpdatedRecord{relation: relation} = change) do
MapSet.union(
affected_shapes_by_record(filter, relation, change.record),
affected_shapes_by_record(filter, relation, change.old_record)
shapes_affected_by_record(filter, relation, change.record),
shapes_affected_by_record(filter, relation, change.old_record)
)
end

def affected_shapes(%Filter{} = filter, %TruncatedRelation{relation: table_name}) do
defp shapes_affected_by_change(%Filter{} = filter, %TruncatedRelation{relation: table_name}) do
for {shape_id, _shape} <- all_shapes_for_table(filter, table_name),
into: MapSet.new() do
shape_id
end
end

defp affected_shapes_by_record(filter, table_name, record) do
defp shapes_affected_by_record(filter, table_name, record) do
case Map.get(filter.tables, table_name) do
nil -> MapSet.new()
table -> Table.affected_shapes(table, record)
end
end

defp all_shapes_in_filter(%Filter{} = filter) do
defp all_shapes(%Filter{} = filter) do
for {_table, table} <- filter.tables,
{shape_id, shape} <- Table.all_shapes(table),
into: %{} do
Expand Down
Loading