From 8d7164dc126a2fc0c0b7265318650d26b62a52eb Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 10 Sep 2024 11:43:36 +0200 Subject: [PATCH] Add filter to subscription based on shape where clause --- .../lib/electric/postgres/configuration.ex | 35 ++++++++++++++++--- .../electric/shapes/consumer/snapshotter.ex | 1 + .../sync-service/lib/electric/shapes/shape.ex | 11 +++++- packages/sync-service/lib/electric/utils.ex | 11 ++++++ 4 files changed, 53 insertions(+), 5 deletions(-) diff --git a/packages/sync-service/lib/electric/postgres/configuration.ex b/packages/sync-service/lib/electric/postgres/configuration.ex index 3273fcad59..9139cba990 100644 --- a/packages/sync-service/lib/electric/postgres/configuration.ex +++ b/packages/sync-service/lib/electric/postgres/configuration.ex @@ -4,6 +4,7 @@ defmodule Electric.Postgres.Configuration do a provided connection. """ alias Electric.Utils + alias Electric.Shapes.Shape @doc """ Ensure that all tables are configured for replication. @@ -16,20 +17,32 @@ defmodule Electric.Postgres.Configuration do Raises if it fails to configure all the tables in the expected way. """ - @spec configure_tables_for_replication!(Postgrex.conn(), [Electric.relation()], String.t()) :: + @spec configure_tables_for_replication!( + Postgrex.conn(), + [Shape.table_with_where_clause()], + String.t() + ) :: {:ok, [:ok]} def configure_tables_for_replication!(pool, relations, publication_name) do Postgrex.transaction(pool, fn conn -> - for relation <- relations, + dbg(relations) + + for {relation, _} <- relations, table = Utils.relation_to_sql(relation), do: Postgrex.query!(conn, "ALTER TABLE #{table} REPLICA IDENTITY FULL", []) - for relation <- relations, table = Utils.relation_to_sql(relation) do + for {relation, rel_where_clause} <- relations, table = Utils.relation_to_sql(relation) do Postgrex.query!(conn, "SAVEPOINT before_publication", []) + [action, where_clause] = + case get_publication_filter(conn, publication_name) do + :publication_not_found -> ["ADD", "WHERE " <> rel_where_clause] + filter -> ["SET", Utils.join_where_clauses(filter, rel_where_clause)] + end + case Postgrex.query( conn, - "ALTER PUBLICATION #{publication_name} ADD TABLE #{table}", + "ALTER PUBLICATION #{publication_name} #{action} TABLE #{table} #{where_clause}", [] ) do {:ok, _} -> @@ -48,4 +61,18 @@ defmodule Electric.Postgres.Configuration do end end) end + + # Returns the filter of the given publication. + # If the publication has no filter it returns `nil`. + # If the publication does not exist it returns `:publication_not_found`. + @spec get_publication_filter(Postgrex.conn(), String.t()) :: + String.t() | nil | :publication_not_found + defp get_publication_filter(conn, publication_name) do + case Postgrex.query!(conn, "SELECT rowfilter FROM pg_publication_tables WHERE pubname = $1", [ + publication_name + ]).rows do + [[rowfilter]] -> rowfilter + _ -> :publication_not_found + end + end end diff --git a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex index 202d5e336a..0777108ece 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex @@ -40,6 +40,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do } = state affected_tables = Shape.affected_tables(shape) + dbg(affected_tables) OpenTelemetry.with_span( "shape_cache.create_snapshot_task", diff --git a/packages/sync-service/lib/electric/shapes/shape.ex b/packages/sync-service/lib/electric/shapes/shape.ex index e3ada4f28f..b20c94e9b4 100644 --- a/packages/sync-service/lib/electric/shapes/shape.ex +++ b/packages/sync-service/lib/electric/shapes/shape.ex @@ -24,6 +24,8 @@ defmodule Electric.Shapes.Shape do where: Electric.Replication.Eval.Expr.t() | nil } + @type table_with_where_clause() :: {Electric.relation(), String.t() | nil} + @type json_relation() :: [String.t(), ...] @type json_table_info() :: table_info() | json_relation() @type json_table_list() :: [json_table_info(), ...] @@ -117,7 +119,14 @@ defmodule Electric.Shapes.Shape do @doc """ List tables that are a part of this shape. """ - def affected_tables(%__MODULE__{root_table: table}), do: [table] + @spec affected_tables(t()) :: [table_with_where_clause()] + def affected_tables(%__MODULE__{root_table: table, where: nil}), do: [{table, nil}] + + def affected_tables(%__MODULE__{ + root_table: table, + where: %Electric.Replication.Eval.Expr{query: where_clause} + }), + do: [{table, "(" <> where_clause <> ")"}] @doc """ Convert a change to be correctly represented within the shape. diff --git a/packages/sync-service/lib/electric/utils.ex b/packages/sync-service/lib/electric/utils.ex index 7f3a5c02bb..897052a67e 100644 --- a/packages/sync-service/lib/electric/utils.ex +++ b/packages/sync-service/lib/electric/utils.ex @@ -197,6 +197,17 @@ defmodule Electric.Utils do ~s|"#{escape_quotes(schema)}"."#{escape_quotes(table)}"| end + @doc """ + Joins two WHERE clauses with an OR operator. + If one of the where clauses is nil that means that table has no where clause + which is equivalent to `WHERE true` and thus the resulting joined where clause + is `WHERE true OR new_where_clause` which is the same as `WHERE true` which is the same as no where clause. + """ + @spec join_where_clauses(String.t(), String.t()) :: String.t() + def join_where_clauses(nil, _), do: "" + def join_where_clauses(_, nil), do: "" + def join_where_clauses(old, new), do: "WHERE (#{old} OR #{new})" + def escape_quotes(text), do: :binary.replace(text, ~S|"|, ~S|""|, [:global]) @doc """