Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat (sync-service): add filters to PG publication #1660

Merged
merged 9 commits into from
Sep 11, 2024
5 changes: 5 additions & 0 deletions .changeset/late-monkeys-vanish.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Add shape filters to Postgres publication to reduce processing load on Electric.
58 changes: 57 additions & 1 deletion .github/workflows/elixir_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ permissions:

jobs:
build:
name: Build and test
name: Build and test with PostgreSQL 14
runs-on: ubuntu-latest
defaults:
run:
Expand Down Expand Up @@ -65,6 +65,62 @@ jobs:
run: mix compile --force --all-warnings --warnings-as-errors
- name: Run tests
run: mix test
# Second test job: runs against a PostgreSQL 15 service container.
# Only the Postgres configuration test file is executed here (see the final step).
test_pg_15:
name: Build and test with PostgreSQL 15
runs-on: ubuntu-latest
defaults:
run:
working-directory: packages/sync-service
env:
MIX_ENV: test
services:
postgres:
image: postgres:15-alpine
env:
POSTGRES_PASSWORD: password
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
# Host port 54321 is mapped to the container's PostgreSQL port 5432.
ports:
- 54321:5432
steps:
- uses: actions/checkout@v4
# Appends wal_level=logical to postgresql.conf inside the service container and
# restarts the container so the setting takes effect.
# NOTE(review): presumably needed because the tests exercise logical replication
# publications - confirm.
- name: 'Set PG settings'
run: |
docker exec ${{ job.services.postgres.id }} sh -c 'echo "wal_level=logical" >> /var/lib/postgresql/data/postgresql.conf'
docker restart ${{ job.services.postgres.id }}
- uses: erlef/setup-beam@v1
with:
version-type: strict
version-file: '.tool-versions'
# NOTE(review): this step uses actions/cache@v3 while the restore/save steps below
# use the v4 cache actions - confirm whether the mismatch is intentional.
- name: Restore dependencies cache
uses: actions/cache@v3
with:
path: packages/sync-service/deps
key: ${{ runner.os }}-mix-${{ hashFiles('packages/sync-service/mix.lock') }}
restore-keys: ${{ runner.os }}-mix-
# The compiled-code cache covers _build/*/lib but excludes the `electric` app itself.
- name: Restore compiled code
uses: actions/cache/restore@v4
with:
path: |
packages/sync-service/_build/*/lib
!packages/sync-service/_build/*/lib/electric
key: ${{ runner.os }}-build-test-${{ hashFiles('packages/sync-service/mix.lock') }}
- name: Install dependencies
run: mix deps.get && mix deps.compile
- name: Save compiled code
uses: actions/cache/save@v4
with:
path: |
packages/sync-service/_build/*/lib
!packages/sync-service/_build/*/lib/electric
key: ${{ runner.os }}-build-test-${{ hashFiles('packages/sync-service/mix.lock') }}
- name: Compiles without warnings
run: mix compile --force --all-warnings --warnings-as-errors
# Unlike a plain `mix test`, this job runs a single test file.
- name: Run tests
run: mix test test/electric/postgres/configuration_test.exs
formatting:
name: Check formatting
runs-on: ubuntu-latest
Expand Down
7 changes: 6 additions & 1 deletion packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ defmodule Electric.Application do
with {:ok, storage_opts} <- storage_module.shared_opts(storage_opts) do
storage = {storage_module, storage_opts}

get_pg_version = fn ->
Electric.ConnectionManager.get_pg_version(Electric.ConnectionManager)
end

prepare_tables_fn =
{Electric.Postgres.Configuration, :configure_tables_for_replication!, [publication_name]}
{Electric.Postgres.Configuration, :configure_tables_for_replication!,
[get_pg_version, publication_name]}

inspector =
{Electric.Postgres.Inspector.EtsInspector,
Expand Down
31 changes: 30 additions & 1 deletion packages/sync-service/lib/electric/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ defmodule Electric.ConnectionManager do
:pool_pid,
# Backoff term used for reconnection with exponential back-off.
:backoff,
# PostgreSQL server version
:pg_version,
:electric_instance_id
]
end
Expand All @@ -67,6 +69,14 @@ defmodule Electric.ConnectionManager do

@name __MODULE__

@doc """
Returns the major version of the PostgreSQL server as cached by the connection manager.

The reply is whatever the server holds under `:pg_version` in its state. That field is
filled with the integer returned by `query_pg_major_version/1` once the connection pool
has come up; before that it has not been assigned, so callers should presumably be
prepared to receive `nil`.
"""
@spec get_pg_version(GenServer.server()) :: integer() | nil
def get_pg_version(server) do
  GenServer.call(server, :get_pg_version)
end

@spec start_link(options) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: @name)
Expand Down Expand Up @@ -112,6 +122,11 @@ defmodule Electric.ConnectionManager do
{:ok, state, {:continue, :start_replication_client}}
end

# Replies with the PostgreSQL version cached in the state; the state is left untouched.
@impl true
def handle_call(:get_pg_version, _from, %{pg_version: version} = state),
  do: {:reply, version, state}

@impl true
def handle_continue(:start_replication_client, state) do
case start_replication_client(state) do
Expand Down Expand Up @@ -144,11 +159,13 @@ defmodule Electric.ConnectionManager do
{:ok, pid} ->
Electric.Timeline.check(get_pg_timeline(pid), state.timeline_opts)

pg_version = query_pg_major_version(pid)

# Now we have everything ready to start accepting and processing logical messages from
# Postgres.
Electric.Postgres.ReplicationClient.start_streaming(state.replication_client_pid)

state = %{state | pool_pid: pid}
state = %{state | pool_pid: pid, pg_version: pg_version}
{:noreply, state}

{:error, reason} ->
Expand Down Expand Up @@ -379,4 +396,16 @@ defmodule Electric.ConnectionManager do
{:error, _reason} -> nil
end
end

@doc """
Queries the major version number of the PostgreSQL server behind `conn`.

Returns an integer such as `15`. For servers older than PostgreSQL 10 only the first
component of the version is returned (e.g. `9` for 9.6).

Raises if the query fails, since it is issued with `Postgrex.query!/3`.
"""
@spec query_pg_major_version(Postgrex.conn()) :: integer()
def query_pg_major_version(conn) do
  # `server_version` is free-form text: packaged builds report values such as
  # "15.4 (Debian 15.4-1.pgdg120+1)" and pre-releases "17beta1", neither of which can be
  # cast to numeric. `server_version_num` is always a plain integer (150004 for 15.4),
  # so integer division by 10000 yields the major version reliably.
  [[major_version]] =
    conn
    |> Postgrex.query!("SELECT current_setting('server_version_num')::integer / 10000", [])
    |> Map.fetch!(:rows)

  major_version
end
end
120 changes: 114 additions & 6 deletions packages/sync-service/lib/electric/postgres/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ defmodule Electric.Postgres.Configuration do
a provided connection.
"""
alias Electric.Utils
alias Electric.Shapes.Shape

@type filter() :: String.t() | nil
@type maybe_filter() :: filter() | :relation_not_found
@type filters() :: %{Electric.relation() => filter()}

@doc """
Ensure that all tables are configured for replication.
Expand All @@ -16,17 +21,31 @@ defmodule Electric.Postgres.Configuration do

Raises if it fails to configure all the tables in the expected way.
"""
@spec configure_tables_for_replication!(Postgrex.conn(), [Electric.relation()], String.t()) ::
@spec configure_tables_for_replication!(
Postgrex.conn(),
[Shape.table_with_where_clause()],
(-> String.t()),
float()
) ::
{:ok, [:ok]}
def configure_tables_for_replication!(pool, relations, publication_name) do
def configure_tables_for_replication!(pool, relations, get_pg_version, publication_name) do
configure_tables_for_replication_internal!(
pool,
relations,
get_pg_version.(),
publication_name
)
end

defp configure_tables_for_replication_internal!(pool, relations, pg_version, publication_name)
when pg_version <= 14 do
Postgrex.transaction(pool, fn conn ->
for relation <- relations,
table = Utils.relation_to_sql(relation),
do: Postgrex.query!(conn, "ALTER TABLE #{table} REPLICA IDENTITY FULL", [])
set_replica_identity!(conn, relations)

for relation <- relations, table = Utils.relation_to_sql(relation) do
for {relation, _} <- relations, table = Utils.relation_to_sql(relation) do
Postgrex.query!(conn, "SAVEPOINT before_publication", [])

# PG 14 and below do not support filters on tables of publications
case Postgrex.query(
conn,
"ALTER PUBLICATION #{publication_name} ADD TABLE #{table}",
Expand All @@ -48,4 +67,93 @@ defmodule Electric.Postgres.Configuration do
end
end)
end

# Fallback clause, reached when the `pg_version <= 14` guard of the preceding clause does
# not hold, i.e. for servers where publication tables can carry a row filter.
#
# Inside a single transaction it sets the replica identity for every relation and then,
# one relation at a time, re-reads the publication's current filters, merges the
# relation's where clause into them and rewrites the publication's table list.
# Raises if altering the publication fails.
defp configure_tables_for_replication_internal!(pool, relations, _pg_version, publication_name) do
  Postgrex.transaction(pool, fn conn ->
    set_replica_identity!(conn, relations)

    for {relation, rel_where_clause} <- relations do
      Postgrex.query!(conn, "SAVEPOINT before_publication", [])

      # The filters are read again on every iteration, so each pass sees the
      # publication as left by the previous ALTER PUBLICATION.
      current_filters = get_publication_filters(conn, publication_name)

      # A table that is not yet in the publication is marked with `:relation_not_found`
      # so that the merge simply adopts the new where clause.
      merged_filter =
        current_filters
        |> Map.get(relation, :relation_not_found)
        |> extend_where_clause(rel_where_clause)

      alter_publication_sql =
        make_alter_publication_query(
          publication_name,
          Map.put(current_filters, relation, merged_filter)
        )

      case Postgrex.query(conn, alter_publication_sql, []) do
        {:error, reason} ->
          raise reason

        {:ok, _} ->
          Postgrex.query!(conn, "RELEASE SAVEPOINT before_publication", [])
          :ok
      end
    end
  end)
end

# Issues `ALTER TABLE ... REPLICA IDENTITY FULL` for the table of every
# `{relation, where_clause}` pair; the where clause is ignored here.
# Raises if any of the statements fails.
defp set_replica_identity!(conn, relations) do
  for {relation, _where_clause} <- relations,
      table_sql = Utils.relation_to_sql(relation),
      do: Postgrex.query!(conn, "ALTER TABLE #{table_sql} REPLICA IDENTITY FULL", [])
end

# Reads `pg_publication_tables` for the given publication and returns a map from
# `{schema, table}` to that table's row filter (per the `filter()` type, `nil` when
# the table has no filter).
@spec get_publication_filters(Postgrex.conn(), String.t()) :: filters()
defp get_publication_filters(conn, publication) do
  rows =
    conn
    |> Postgrex.query!(
      "SELECT schemaname, tablename, rowfilter FROM pg_publication_tables WHERE pubname = $1",
      [publication]
    )
    |> Map.fetch!(:rows)

  # Each row is [schema, table, rowfilter]: the first two columns form the key.
  Map.new(rows, fn row ->
    {row |> Enum.take(2) |> List.to_tuple(), Enum.at(row, 2)}
  end)
end

# Combines a table's existing publication filter with a new where clause.
#
#   * `:relation_not_found` - the table has no entry yet, so the new where clause
#     (possibly `nil`) is used as is.
#   * either side is `nil` (no filter) - the result is `nil`.
#   * otherwise - both conditions are OR-ed together inside one pair of parentheses.
@spec extend_where_clause(maybe_filter(), filter()) :: filter()
defp extend_where_clause(:relation_not_found, where_clause), do: where_clause

defp extend_where_clause(filter, where_clause) when is_nil(filter) or is_nil(where_clause),
  do: nil

defp extend_where_clause(filter, where_clause), do: "(#{filter} OR #{where_clause})"

# Builds the `ALTER PUBLICATION ... SET TABLE ...` statement that replaces the
# publication's table list with the given tables and their filters.
# Every `{relation, filter}` entry of `filters` is rendered by `make_table_clause/1` and
# the resulting clauses are joined with ", ".
@spec make_alter_publication_query(String.t(), filters()) :: String.t()
defp make_alter_publication_query(publication_name, filters) do
  # map_join renders and joins in one pass instead of building an intermediate list.
  tables = Enum.map_join(filters, ", ", &make_table_clause/1)

  "ALTER PUBLICATION #{publication_name} SET TABLE " <> tables
end

# Renders one `{relation, filter}` entry as a table clause for ALTER PUBLICATION:
# just the table name when there is no filter, otherwise `<table> WHERE <filter>`.
#
# The filter is emitted verbatim, without adding parentheses.
# NOTE(review): this assumes the filter text already arrives parenthesized - confirm
# this also holds for filters read back from the catalog.
@spec make_table_clause({Electric.relation(), filter()}) :: String.t()
defp make_table_clause({{schema, tbl}, nil}) do
  Utils.relation_to_sql({schema, tbl})
end

defp make_table_clause({{schema, tbl}, where}) do
  table = Utils.relation_to_sql({schema, tbl})
  table <> " WHERE " <> where
end
end
11 changes: 10 additions & 1 deletion packages/sync-service/lib/electric/shapes/shape.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ defmodule Electric.Shapes.Shape do
where: Electric.Replication.Eval.Expr.t() | nil
}

@type table_with_where_clause() :: {Electric.relation(), String.t() | nil}

@type json_relation() :: [String.t(), ...]
@type json_table_info() :: table_info() | json_relation()
@type json_table_list() :: [json_table_info(), ...]
Expand Down Expand Up @@ -117,7 +119,14 @@ defmodule Electric.Shapes.Shape do
@doc """
List tables that are a part of this shape.
"""
def affected_tables(%__MODULE__{root_table: table}), do: [table]
@spec affected_tables(t()) :: [table_with_where_clause()]
# A shape without a where expression contributes its root table with no filter.
def affected_tables(%__MODULE__{root_table: table, where: nil}) do
  [{table, nil}]
end

# Otherwise the expression's original query text is wrapped in parentheses and
# returned alongside the root table.
def affected_tables(%__MODULE__{
      root_table: table,
      where: %Electric.Replication.Eval.Expr{query: query}
    }) do
  [{table, "(" <> query <> ")"}]
end

@doc """
Convert a change to be correctly represented within the shape.
Expand Down
Loading
Loading