From 664f80c6cf1e483415295c9d83080284e3946058 Mon Sep 17 00:00:00 2001 From: Ilia Borovitinov Date: Thu, 31 Oct 2024 14:48:24 +0300 Subject: [PATCH] fix: address deadlock with concurrent snapshot creation This commit addresses 2 issues: First, the deadlock occurred when trying to add where clauses to existing tables - we executed `ALTER TABLE` that took out an exclusive access lock over one table definition, then executed `SET TABLES` on a publication that requested shared access lock over all tables mentioned in the publication which includes tables that are already exclusively locked by another concurrent shape creation. This caused a deadlock. This was addressed by switching the order of alter table and alter publication. Second, there was a race condition when 2 new shapes tried updating the publication where clause and because it's done in a read-then-update way, not as a single statement, the read might have missed the concurrent update and the resulting state was wrong. This was addressed by a lock. --- .changeset/tall-chairs-punch.md | 5 ++ .../lib/electric/postgres/configuration.ex | 59 +++++++-------- .../electric/postgres/configuration_test.exs | 72 +++++++++++++++++++ 3 files changed, 103 insertions(+), 33 deletions(-) create mode 100644 .changeset/tall-chairs-punch.md diff --git a/.changeset/tall-chairs-punch.md b/.changeset/tall-chairs-punch.md new file mode 100644 index 0000000000..408d38d20a --- /dev/null +++ b/.changeset/tall-chairs-punch.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Fix a possible deadlock issue when creating or updating multiple where-claused shapes that occurred while updating the Postgres publication (only on PG 15+). Fix a possible race condition between reading the existing publication and writing the updated version. 
diff --git a/packages/sync-service/lib/electric/postgres/configuration.ex b/packages/sync-service/lib/electric/postgres/configuration.ex index 56ce721f38..7c74fb5aa7 100644 --- a/packages/sync-service/lib/electric/postgres/configuration.ex +++ b/packages/sync-service/lib/electric/postgres/configuration.ex @@ -7,7 +7,6 @@ defmodule Electric.Postgres.Configuration do alias Electric.Shapes.Shape @type filter() :: String.t() | nil - @type maybe_filter() :: filter() | :relation_not_found @type filters() :: %{Electric.relation() => filter()} @pg_15 150_000 @@ -42,8 +41,6 @@ defmodule Electric.Postgres.Configuration do defp configure_tables_for_replication_internal!(pool, relations, pg_version, publication_name) when pg_version < @pg_15 do Postgrex.transaction(pool, fn conn -> - set_replica_identity!(conn, relations) - for {relation, _} <- relations, table = Utils.relation_to_sql(relation), publication = Utils.quote_name(publication_name) do @@ -69,37 +66,37 @@ defmodule Electric.Postgres.Configuration do raise reason end end + + set_replica_identity!(conn, relations) end) end defp configure_tables_for_replication_internal!(pool, relations, _pg_version, publication_name) do Postgrex.transaction(pool, fn conn -> + # We're using advisory locks to prevent race conditions when multiple + # processes try to read-then-update the publication configuration. We're not using `SELECT FOR UPDATE` + # because it doesn't read the value that was updated by other transaction holding the lock. This lock + # is thus acquired before reading the existing configuration, so the first read sees the latest value. 
+ Postgrex.query!(conn, "SELECT pg_advisory_xact_lock($1)", [:erlang.phash2(publication_name)]) + + filters = get_publication_filters(conn, publication_name) + + # Get the existing filter for the table + # and extend it with the where clause for the table + # and update the table in the map with the new filter + filters = + Enum.reduce(relations, filters, fn {relation, clause}, acc -> + Map.update(acc, relation, clause, &extend_where_clause(&1, clause)) + end) + + Postgrex.query!(conn, make_alter_publication_query(publication_name, filters), []) + + # `ALTER TABLE` should be after the publication altering, because it takes out an exclusive lock over this table, + # but the publication altering takes out a shared lock on all mentioned tables, so a concurrent transaction will + # deadlock if the order is reversed. set_replica_identity!(conn, relations) - for {relation, rel_where_clause} <- relations do - Postgrex.query!(conn, "SAVEPOINT before_publication", []) - - filters = get_publication_filters(conn, publication_name) - - # Get the existing filter for the table - # and extend it with the where clause for the table - # and update the table in the map with the new filter - filter = Map.get(filters, relation, :relation_not_found) - rel_filter = extend_where_clause(filter, rel_where_clause) - filters = Map.put(filters, relation, rel_filter) - - alter_publication_sql = - make_alter_publication_query(publication_name, filters) - - case Postgrex.query(conn, alter_publication_sql, []) do - {:ok, _} -> - Postgrex.query!(conn, "RELEASE SAVEPOINT before_publication", []) - :ok - - {:error, reason} -> - raise reason - end - end + [:ok] end) end @@ -147,11 +144,7 @@ defmodule Electric.Postgres.Configuration do # Joins the existing filter for the table with the where clause for the table. # If one of them is `nil` (i.e. no filter) then the resulting filter is `nil`. 
- @spec extend_where_clause(maybe_filter(), filter()) :: filter() - defp extend_where_clause(:relation_not_found, where_clause) do - where_clause - end - + @spec extend_where_clause(filter(), filter()) :: filter() defp extend_where_clause(filter, where_clause) when is_nil(filter) or is_nil(where_clause) do nil end @@ -173,7 +166,7 @@ defmodule Electric.Postgres.Configuration do base_sql <> tables end - @spec make_table_clause(filter()) :: String.t() + @spec make_table_clause({Electric.relation(), filter()}) :: String.t() defp make_table_clause({{schema, tbl}, nil}) do Utils.relation_to_sql({schema, tbl}) end diff --git a/packages/sync-service/test/electric/postgres/configuration_test.exs b/packages/sync-service/test/electric/postgres/configuration_test.exs index fab57ab085..dd67f42d79 100644 --- a/packages/sync-service/test/electric/postgres/configuration_test.exs +++ b/packages/sync-service/test/electric/postgres/configuration_test.exs @@ -32,6 +32,17 @@ defmodule Electric.Postgres.ConfigurationTest do [] ) + Postgrex.query!( + conn, + """ + CREATE TABLE other_other_table ( + id UUID PRIMARY KEY, + value TEXT NOT NULL + ) + """, + [] + ) + :ok end @@ -218,6 +229,67 @@ defmodule Electric.Postgres.ConfigurationTest do ) end end + + test "concurrent alters to the publication don't deadlock and run correctly", %{ + pool: conn, + publication_name: publication, + get_pg_version: get_pg_version + } do + # Create the publication first + Configuration.configure_tables_for_replication!( + conn, + [ + {{"public", "items"}, "(value ILIKE 'yes%')"}, + {{"public", "other_table"}, "(value ILIKE '1%')"}, + {{"public", "other_other_table"}, "(value ILIKE '1%')"} + ], + get_pg_version, + publication + ) + + task1 = + Task.async(fn -> + Postgrex.transaction(conn, fn conn -> + Configuration.configure_tables_for_replication!( + conn, + [{{"public", "other_other_table"}, "(value ILIKE '2%')"}], + get_pg_version, + publication + ) + + :ok + end) + end) + + task2 = + Task.async(fn -> + 
Postgrex.transaction(conn, fn conn -> + Configuration.configure_tables_for_replication!( + conn, + [{{"public", "other_table"}, "(value ILIKE '2%')"}], + get_pg_version, + publication + ) + + :ok + end) + end) + + # First check: both tasks completed successfully, that means there were no deadlocks + assert [{:ok, :ok}, {:ok, :ok}] == Task.await_many([task1, task2]) + + # Second check: the publication has the correct filters, that means one didn't override the other + assert list_tables_in_publication(conn, publication) |> Enum.sort() == + expected_filters( + [ + {"public", "items", "(value ~~* 'yes%'::text)"}, + {"public", "other_other_table", + "((value ~~* '1%'::text) OR (value ~~* '2%'::text))"}, + {"public", "other_table", "((value ~~* '1%'::text) OR (value ~~* '2%'::text))"} + ], + get_pg_version.() + ) + end end defp get_table_identity(conn, {schema, table}) do