Skip to content

Commit

Permalink
Add filter to subscription based on shape where clause
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin-dp committed Sep 10, 2024
1 parent 6895352 commit 8d7164d
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 5 deletions.
35 changes: 31 additions & 4 deletions packages/sync-service/lib/electric/postgres/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Electric.Postgres.Configuration do
a provided connection.
"""
alias Electric.Utils
alias Electric.Shapes.Shape

@doc """
Ensure that all tables are configured for replication.
Expand All @@ -16,20 +17,32 @@ defmodule Electric.Postgres.Configuration do
Raises if it fails to configure all the tables in the expected way.
"""
@spec configure_tables_for_replication!(Postgrex.conn(), [Electric.relation()], String.t()) ::
@spec configure_tables_for_replication!(
Postgrex.conn(),
[Shape.table_with_where_clause()],
String.t()
) ::
{:ok, [:ok]}
def configure_tables_for_replication!(pool, relations, publication_name) do
Postgrex.transaction(pool, fn conn ->
for relation <- relations,
dbg(relations)

for {relation, _} <- relations,
table = Utils.relation_to_sql(relation),
do: Postgrex.query!(conn, "ALTER TABLE #{table} REPLICA IDENTITY FULL", [])

for relation <- relations, table = Utils.relation_to_sql(relation) do
for {relation, rel_where_clause} <- relations, table = Utils.relation_to_sql(relation) do
Postgrex.query!(conn, "SAVEPOINT before_publication", [])

[action, where_clause] =
case get_publication_filter(conn, publication_name) do
:publication_not_found -> ["ADD", "WHERE " <> rel_where_clause]
filter -> ["SET", Utils.join_where_clauses(filter, rel_where_clause)]
end

case Postgrex.query(
conn,
"ALTER PUBLICATION #{publication_name} ADD TABLE #{table}",
"ALTER PUBLICATION #{publication_name} #{action} TABLE #{table} #{where_clause}",
[]
) do
{:ok, _} ->
Expand All @@ -48,4 +61,18 @@ defmodule Electric.Postgres.Configuration do
end
end)
end

# Returns the filter of the given publication.
# If the publication has no filter it returns `nil`.
# If the publication does not exist it returns `:publication_not_found`.
@spec get_publication_filter(Postgrex.conn(), String.t()) ::
String.t() | nil | :publication_not_found
defp get_publication_filter(conn, publication_name) do
case Postgrex.query!(conn, "SELECT rowfilter FROM pg_publication_tables WHERE pubname = $1", [
publication_name
]).rows do
[[rowfilter]] -> rowfilter
_ -> :publication_not_found
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
} = state

affected_tables = Shape.affected_tables(shape)
dbg(affected_tables)

OpenTelemetry.with_span(
"shape_cache.create_snapshot_task",
Expand Down
11 changes: 10 additions & 1 deletion packages/sync-service/lib/electric/shapes/shape.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ defmodule Electric.Shapes.Shape do
where: Electric.Replication.Eval.Expr.t() | nil
}

@type table_with_where_clause() :: {Electric.relation(), String.t() | nil}

@type json_relation() :: [String.t(), ...]
@type json_table_info() :: table_info() | json_relation()
@type json_table_list() :: [json_table_info(), ...]
Expand Down Expand Up @@ -117,7 +119,14 @@ defmodule Electric.Shapes.Shape do
@doc """
List tables that are a part of this shape.
"""
def affected_tables(%__MODULE__{root_table: table}), do: [table]
@spec affected_tables(t()) :: [table_with_where_clause()]
def affected_tables(%__MODULE__{root_table: table, where: nil}), do: [{table, nil}]

def affected_tables(%__MODULE__{
root_table: table,
where: %Electric.Replication.Eval.Expr{query: where_clause}
}),
do: [{table, "(" <> where_clause <> ")"}]

@doc """
Convert a change to be correctly represented within the shape.
Expand Down
11 changes: 11 additions & 0 deletions packages/sync-service/lib/electric/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,17 @@ defmodule Electric.Utils do
~s|"#{escape_quotes(schema)}"."#{escape_quotes(table)}"|
end

@doc """
Joins two WHERE clauses with an OR operator.
If one of the where clauses is nil that means that table has no where clause
which is equivalent to `WHERE true` and thus the resulting joined where clause
is `WHERE true OR new_where_clause` which is the same as `WHERE true` which is the same as no where clause.
"""
@spec join_where_clauses(String.t(), String.t()) :: String.t()
def join_where_clauses(nil, _), do: ""
def join_where_clauses(_, nil), do: ""
def join_where_clauses(old, new), do: "WHERE (#{old} OR #{new})"

def escape_quotes(text), do: :binary.replace(text, ~S|"|, ~S|""|, [:global])

@doc """
Expand Down

0 comments on commit 8d7164d

Please sign in to comment.