Skip to content

Commit

Permalink
feat(sync-service): Clean up publication filters (#2154)
Browse files Browse the repository at this point in the history
Closes #1774

This work started to introduce column filters (see
#1831) but ended up hitting a
roadblock because we use `REPLICA IDENTITY FULL` - however, the
work also takes care of cleaning up the filters.

- Introduced a single process for updating the publication - we were locking
on it before anyway, so we might as well linearise it ourselves.
- Process maintains reference counted structure for the filters per
relation, including where clauses and filtered columns, in order to
produce correct overall filters per relation
- Update to the publication is debounced to allow batching together many
shape creations
- Every update does a complete rewrite of the publication filters so
they are maintained clean - but also introduced a `remove_shape` call so
that if electric remains with no shapes it should also have no
subscriptions to tables.


## TODOs
- [x] Write tests for `PublicationManager`
- [x] Write procedure for recovering in-memory state from
`shape_status.list_shapes` in `recover_shapes`
- [ ] Split where clauses at top-level `AND`s to improve filter
optimality (suggested by @icehaunter ) - [edit: not doing this now, as
we can be smart about this and do even more "merging" of where clauses
like `x = 1` and `x = 2` to `x in (1, 2)` - separate PR]
  • Loading branch information
msfstef authored Dec 17, 2024
1 parent 0565b46 commit d7e7c72
Show file tree
Hide file tree
Showing 23 changed files with 1,200 additions and 269 deletions.
5 changes: 5 additions & 0 deletions .changeset/wild-bugs-raise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Introduced `PublicationManager` process to create and clean up publication filters.
2 changes: 2 additions & 0 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ defmodule Electric.Connection.Manager do
Electric.Connection.Supervisor.start_shapes_supervisor(
stack_id: state.stack_id,
shape_cache_opts: shape_cache_opts,
pool_opts: state.pool_opts,
replication_opts: state.replication_opts,
stack_events_registry: state.stack_events_registry,
tweaks: state.tweaks
)
Expand Down
9 changes: 9 additions & 0 deletions packages/sync-service/lib/electric/connection/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,18 @@ defmodule Electric.Connection.Supervisor do
def start_shapes_supervisor(opts) do
stack_id = Keyword.fetch!(opts, :stack_id)
shape_cache_opts = Keyword.fetch!(opts, :shape_cache_opts)
db_pool_opts = Keyword.fetch!(opts, :pool_opts)
replication_opts = Keyword.fetch!(opts, :replication_opts)
inspector = Keyword.fetch!(shape_cache_opts, :inspector)

shape_cache_spec = {Electric.ShapeCache, shape_cache_opts}

publication_manager_spec =
{Electric.Replication.PublicationManager,
stack_id: stack_id,
publication_name: Keyword.fetch!(replication_opts, :publication_name),
db_pool: Keyword.fetch!(db_pool_opts, :name)}

shape_log_collector_spec =
{Electric.Replication.ShapeLogCollector, stack_id: stack_id, inspector: inspector}

Expand All @@ -51,6 +59,7 @@ defmodule Electric.Connection.Supervisor do
Electric.Replication.Supervisor,
stack_id: stack_id,
shape_cache: shape_cache_spec,
publication_manager: publication_manager_spec,
log_collector: shape_log_collector_spec
},
restart: :temporary
Expand Down
247 changes: 156 additions & 91 deletions packages/sync-service/lib/electric/postgres/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ defmodule Electric.Postgres.Configuration do
a provided connection.
"""
require Logger
alias Electric.Replication.PublicationManager.RelationFilter
alias Electric.Utils
alias Electric.Shapes.Shape

@type filter() :: String.t() | nil
@type filters() :: %{Electric.relation() => filter()}
@type filters() :: %{Electric.relation() => RelationFilter.t()}

@pg_15 150_000

Expand All @@ -25,34 +24,75 @@ defmodule Electric.Postgres.Configuration do
"""
@spec configure_tables_for_replication!(
Postgrex.conn(),
[Shape.table_with_where_clause()],
(-> String.t()),
filters(),
String.t(),
float()
) ::
{:ok, [:ok]}
def configure_tables_for_replication!(pool, relations, get_pg_version, publication_name) do
def configure_tables_for_replication!(pool, relation_filters, pg_version, publication_name) do
configure_tables_for_replication_internal!(
pool,
relations,
get_pg_version.(),
relation_filters,
pg_version,
publication_name
)
end

defp configure_tables_for_replication_internal!(pool, relations, pg_version, publication_name)
@doc """
Get Postgres server version

Queries `server_version_num` on the given connection and returns it as an
integer (e.g. `150001` for PG 15.0.1), or the `Postgrex` error on failure.
"""
@spec get_pg_version(Postgrex.conn()) :: {:ok, non_neg_integer()} | {:error, term()}
def get_pg_version(conn) do
  query = "SELECT current_setting('server_version_num') server_version_num"

  case Postgrex.query(conn, query, []) do
    # Match the expected single-row, single-column result shape directly;
    # any other successful result shape is a bug and should crash here.
    {:ok, %{num_rows: 1, rows: [[version_str]]}} ->
      {:ok, String.to_integer(version_str)}

    {:error, error} ->
      {:error, error}
  end
end

defp configure_tables_for_replication_internal!(
pool,
relation_filters,
pg_version,
publication_name
)
when pg_version < @pg_15 do
Postgrex.transaction(pool, fn conn ->
for {relation, _} <- relations,
table = Utils.relation_to_sql(relation),
publication = Utils.quote_name(publication_name) do
publication = Utils.quote_name(publication_name)

relation_filters = filter_for_existing_relations(conn, relation_filters)

prev_published_tables =
get_publication_tables(conn, publication_name)
|> Enum.map(&Utils.relation_to_sql/1)
|> MapSet.new()

new_published_tables =
relation_filters
|> Map.keys()
|> Enum.map(&Utils.relation_to_sql/1)
|> MapSet.new()

alter_ops =
Enum.concat(
MapSet.difference(new_published_tables, prev_published_tables)
|> Enum.map(&{&1, "ADD"}),
MapSet.difference(prev_published_tables, new_published_tables)
|> Enum.map(&{&1, "DROP"})
)

for {table, op} <- alter_ops do
Postgrex.query!(conn, "SAVEPOINT before_publication", [])

# PG 14 and below do not support filters on tables of publications
case Postgrex.query(
conn,
"ALTER PUBLICATION #{publication} ADD TABLE #{table}",
[]
) do
case Postgrex.query(conn, "ALTER PUBLICATION #{publication} #{op} TABLE #{table}", []) do
{:ok, _} ->
Postgrex.query!(conn, "RELEASE SAVEPOINT before_publication", [])
:ok
Expand All @@ -68,41 +108,38 @@ defmodule Electric.Postgres.Configuration do
end
end

set_replica_identity!(conn, relations)
set_replica_identity!(conn, relation_filters)
end)
end

defp configure_tables_for_replication_internal!(pool, relations, _pg_version, publication_name) do
defp configure_tables_for_replication_internal!(
pool,
relation_filters,
_pg_version,
publication_name
) do
Postgrex.transaction(pool, fn conn ->
# We're using advisory locks to prevent race conditions when multiple
# processes try to read-then-update the publication configuration. We're not using `SELECT FOR UPDATE`
# because it doesn't read the value that was updated by other transaction holding the lock. This lock
# is thus acquired before reading the existing configuration, so the first read sees the latest value.
Postgrex.query!(conn, "SELECT pg_advisory_xact_lock($1)", [:erlang.phash2(publication_name)])
# Ensure that all tables are present in the publication
relation_filters = filter_for_existing_relations(conn, relation_filters)

filters = get_publication_filters(conn, publication_name)

# Get the existing filter for the table
# and extend it with the where clause for the table
# and update the table in the map with the new filter
filters =
Enum.reduce(relations, filters, fn {relation, clause}, acc ->
Map.update(acc, relation, clause, &extend_where_clause(&1, clause))
end)

Postgrex.query!(conn, make_alter_publication_query(publication_name, filters), [])
# Update the entire publication with the new filters
Postgrex.query!(
conn,
make_alter_publication_query(publication_name, relation_filters),
[]
)

# `ALTER TABLE` should be after the publication altering, because it takes out an exclusive lock over this table,
# but the publication altering takes out a shared lock on all mentioned tables, so a concurrent transaction will
# deadlock if the order is reversed.
set_replica_identity!(conn, relations)
set_replica_identity!(conn, relation_filters)

[:ok]
end)
end

defp set_replica_identity!(conn, relations) do
for {relation, _} <- relations,
defp set_replica_identity!(conn, relation_filters) do
for %RelationFilter{relation: relation} <- Map.values(relation_filters),
table = Utils.relation_to_sql(relation) do
%Postgrex.Result{rows: [[correct_identity?]]} =
Postgrex.query!(
Expand All @@ -118,72 +155,100 @@ defmodule Electric.Postgres.Configuration do
end
end

# Returns the filters grouped by table for the given publication.
@spec get_publication_filters(Postgrex.conn(), String.t()) :: filters()
defp get_publication_filters(conn, publication) do
@spec get_publication_tables(Postgrex.conn(), String.t()) :: list(Electric.relation())
defp get_publication_tables(conn, publication) do
Postgrex.query!(
conn,
"SELECT schemaname, tablename, rowfilter FROM pg_publication_tables WHERE pubname = $1",
"SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = $1",
[publication]
)
|> Map.fetch!(:rows)
|> Enum.map(&{Enum.take(&1, 2) |> List.to_tuple(), Enum.at(&1, 2)})
|> Map.new()
end

@doc """
Drops all tables from the given publication.

Executes a single `DO` block that iterates over every table currently listed
in `pg_publication_tables` for the publication and removes it.

NOTE(review): `publication_name` is interpolated into the SQL directly, so it
is assumed to come from trusted configuration, not user input — confirm.
"""
@spec drop_all_publication_tables(Postgrex.conn(), String.t()) :: Postgrex.Result.t()
def drop_all_publication_tables(conn, publication_name) do
  Postgrex.query!(
    conn,
    """
    DO $$
    DECLARE
      r RECORD;
    BEGIN
      FOR r IN (SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = '#{publication_name}')
      LOOP
        -- Use format('%I.%I', ...) to quote identifiers: plain string
        -- concatenation (schemaname || '.' || tablename) produces invalid or
        -- wrong SQL for mixed-case or special-character table names.
        EXECUTE format(
          'ALTER PUBLICATION #{Utils.quote_name(publication_name)} DROP TABLE %I.%I',
          r.schemaname,
          r.tablename
        );
      END LOOP;
    END $$;
    """,
    []
  )
end

# Joins the existing filter for the table with the where clause for the table.
# If one of them is `nil` (i.e. no filter) then the resulting filter is `nil`.
@spec extend_where_clause(filter(), filter()) :: filter()
defp extend_where_clause(filter, where_clause) when is_nil(filter) or is_nil(where_clause) do
nil
end

defp extend_where_clause(filter, where_clause) do
"(#{filter} OR #{where_clause})"
|> Enum.map(&(Enum.take(&1, 2) |> List.to_tuple()))
end

# Makes an SQL query that alters the given publication whith the given tables and filters.
@spec make_alter_publication_query(String.t(), filters()) :: String.t()
defp make_alter_publication_query(publication_name, filters) do
base_sql = "ALTER PUBLICATION #{Utils.quote_name(publication_name)} SET TABLE "

tables =
filters
|> Enum.map(&make_table_clause/1)
|> Enum.join(", ")

base_sql <> tables
case Map.values(filters) do
[] ->
"""
DO $$
DECLARE
tables TEXT;
BEGIN
SELECT string_agg(format('%I.%I', schemaname, tablename), ', ')
INTO tables
FROM pg_publication_tables
WHERE pubname = '#{publication_name}' ;
IF tables IS NOT NULL THEN
EXECUTE format('ALTER PUBLICATION #{Utils.quote_name(publication_name)} DROP TABLE %s', tables);
END IF;
END $$;
"""

filters ->
base_sql = "ALTER PUBLICATION #{Utils.quote_name(publication_name)} SET TABLE "

tables =
filters
|> Enum.map(&make_table_clause/1)
|> Enum.join(", ")

base_sql <> tables
end
end

@spec make_table_clause({Electric.relation(), filter()}) :: String.t()
defp make_table_clause({{schema, tbl}, nil}) do
Utils.relation_to_sql({schema, tbl})
# Restricts the given filter map to relations that actually exist in the
# database as ordinary tables (`pg_class.relkind = 'r'`), dropping filters
# for relations that have been deleted or never existed.
@spec filter_for_existing_relations(Postgrex.conn(), filters()) :: filters()
defp filter_for_existing_relations(conn, filters) do
  query = """
  WITH input_relations AS (
    SELECT
      UNNEST($1::text[]) AS schemaname,
      UNNEST($2::text[]) AS tablename
  )
  SELECT ir.schemaname, ir.tablename
  FROM input_relations ir
  JOIN pg_class pc ON pc.relname = ir.tablename
  JOIN pg_namespace pn ON pn.oid = pc.relnamespace
  WHERE pn.nspname = ir.schemaname AND pc.relkind = 'r';
  """

  relations = Map.keys(filters)

  existing_relations =
    Postgrex.query!(conn, query, [
      Enum.map(relations, &elem(&1, 0)),
      Enum.map(relations, &elem(&1, 1))
    ])
    |> Map.fetch!(:rows)
    |> Enum.map(&List.to_tuple/1)

  # Map.take/2 keeps exactly the keys present in `existing_relations`,
  # replacing the previous hand-rolled reduce + Map.get/Map.put loop.
  Map.take(filters, existing_relations)
end

defp make_table_clause({{schema, tbl}, where}) do
table = Utils.relation_to_sql({schema, tbl})
table <> " WHERE " <> where
# Renders a single table entry for an `ALTER PUBLICATION ... SET TABLE`
# statement: the quoted relation, optionally followed by a WHERE clause that
# ORs together all the filter's where clauses.
@spec make_table_clause(RelationFilter.t()) :: String.t()
defp make_table_clause(%RelationFilter{relation: relation, where_clauses: where_clauses}) do
  table = Utils.relation_to_sql(relation)

  # NOTE: cannot filter on columns with REPLICA IDENTITY FULL, so no
  # column list (`(col1, col2)`) is emitted even when columns are selected.
  case where_clauses do
    nil ->
      table

    clauses ->
      condition = Enum.map_join(clauses, " OR ", & &1.query)
      table <> " WHERE (" <> condition <> ")"
  end
end
end
22 changes: 22 additions & 0 deletions packages/sync-service/lib/electric/replication/eval/expr.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,26 @@ defmodule Electric.Replication.Eval.Expr do
used_refs: used_refs(),
returns: Env.pg_type()
}

@doc """
Returns a flat list of all used refs used in the expression
that point to the current table
## Examples
iex> used_refs = %{["id"] => :int8, ["created_at"] => :timestamp}
iex> unqualified_refs(%Expr{query: "id = 1", used_refs: used_refs})
["created_at", "id"]
iex> used_refs = %{["id"] => :int8, ["potato", "created_at"] => :timestamp}
iex> unqualified_refs(%Expr{query: "id = 1", used_refs: used_refs, returns: :int8})
["id"]
"""
@spec unqualified_refs(t()) :: [String.t()]
def unqualified_refs(%__MODULE__{used_refs: used_refs}) do
  # The comprehension's pattern only admits single-element key lists, i.e.
  # refs that are unqualified and therefore point at the current table;
  # qualified refs like ["potato", "created_at"] are filtered out.
  for {[name], _type} <- used_refs, do: name
end
end
Loading

0 comments on commit d7e7c72

Please sign in to comment.