Skip to content

Commit

Permalink
fix: address deadlock with concurrent snapshot creation
Browse files Browse the repository at this point in the history
This commit addresses 2 issues:

First, the deadlock occurred when trying to add where clauses to existing tables - we executed `ALTER TABLE` that took out an exclusive access lock over one table definition, then executed `SET TABLES` on a publication that requested shared access lock over all tables mentioned in the publication which includes tables that are already exclusively locked by another concurrent shape creation. This caused a deadlock. This was addressed by switching the order of alter table and alter publication.

Second, there was a race condition when 2 new shapes tried to update the publication where clauses concurrently: because the update is done in a read-then-update way, not as a single statement, one transaction's read could miss the other's concurrent update, leaving the publication in a wrong state. This was addressed by taking an advisory lock before reading the existing configuration.
  • Loading branch information
icehaunter committed Oct 31, 2024
1 parent b40c425 commit afc20b6
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 33 deletions.
5 changes: 5 additions & 0 deletions .changeset/tall-chairs-punch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Fix a possible deadlock issue when creating or updating multiple where-claused shapes that occurred while updating the Postgres publication (only on PG 15+). Fix a possible race condition between reading the existing publication and writing the updated version.
57 changes: 24 additions & 33 deletions packages/sync-service/lib/electric/postgres/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule Electric.Postgres.Configuration do
alias Electric.Shapes.Shape

@type filter() :: String.t() | nil
@type maybe_filter() :: filter() | :relation_not_found
@type filters() :: %{Electric.relation() => filter()}

@pg_15 150_000
Expand Down Expand Up @@ -42,8 +41,6 @@ defmodule Electric.Postgres.Configuration do
defp configure_tables_for_replication_internal!(pool, relations, pg_version, publication_name)
when pg_version < @pg_15 do
Postgrex.transaction(pool, fn conn ->
set_replica_identity!(conn, relations)

for {relation, _} <- relations,
table = Utils.relation_to_sql(relation),
publication = Utils.quote_name(publication_name) do
Expand Down Expand Up @@ -74,32 +71,30 @@ defmodule Electric.Postgres.Configuration do

defp configure_tables_for_replication_internal!(pool, relations, _pg_version, publication_name) do
Postgrex.transaction(pool, fn conn ->
# We're using an advisory lock to prevent race conditions when multiple
# processes try to read-then-update the publication configuration. We're not using `SELECT FOR UPDATE`
# because it doesn't read the value that was updated by another transaction holding the lock. This lock
# is thus acquired before reading the existing configuration, so the first read sees the latest value.
Postgrex.query!(conn, "SELECT pg_advisory_xact_lock($1)", [:erlang.phash2(publication_name)])

filters = get_publication_filters(conn, publication_name)

# Get the existing filter for the table
# and extend it with the where clause for the table
# and update the table in the map with the new filter
filters =
Enum.reduce(relations, filters, fn {relation, clause}, acc ->
Map.update(acc, relation, clause, &extend_where_clause(&1, clause))
end)

Postgrex.query!(conn, make_alter_publication_query(publication_name, filters), [])

# `ALTER TABLE` should be after the publication altering, because it takes out an exclusive lock over this table,
# but the publication altering takes out a shared lock on all mentioned tables, so a concurrent transaction will
# deadlock if the order is reversed.
set_replica_identity!(conn, relations)

for {relation, rel_where_clause} <- relations do
Postgrex.query!(conn, "SAVEPOINT before_publication", [])

filters = get_publication_filters(conn, publication_name)

# Get the existing filter for the table
# and extend it with the where clause for the table
# and update the table in the map with the new filter
filter = Map.get(filters, relation, :relation_not_found)
rel_filter = extend_where_clause(filter, rel_where_clause)
filters = Map.put(filters, relation, rel_filter)

alter_publication_sql =
make_alter_publication_query(publication_name, filters)

case Postgrex.query(conn, alter_publication_sql, []) do
{:ok, _} ->
Postgrex.query!(conn, "RELEASE SAVEPOINT before_publication", [])
:ok

{:error, reason} ->
raise reason
end
end
[:ok]
end)
end

Expand Down Expand Up @@ -147,11 +142,7 @@ defmodule Electric.Postgres.Configuration do

# Joins the existing filter for the table with the where clause for the table.
# If one of them is `nil` (i.e. no filter) then the resulting filter is `nil`.
@spec extend_where_clause(maybe_filter(), filter()) :: filter()
defp extend_where_clause(:relation_not_found, where_clause) do
where_clause
end

@spec extend_where_clause(filter(), filter()) :: filter()
defp extend_where_clause(filter, where_clause) when is_nil(filter) or is_nil(where_clause) do
nil
end
Expand All @@ -173,7 +164,7 @@ defmodule Electric.Postgres.Configuration do
base_sql <> tables
end

@spec make_table_clause(filter()) :: String.t()
@spec make_table_clause({Electric.relation(), filter()}) :: String.t()
defp make_table_clause({{schema, tbl}, nil}) do
Utils.relation_to_sql({schema, tbl})
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ defmodule Electric.Postgres.ConfigurationTest do
[]
)

Postgrex.query!(
conn,
"""
CREATE TABLE other_other_table (
id UUID PRIMARY KEY,
value TEXT NOT NULL
)
""",
[]
)

:ok
end

Expand Down Expand Up @@ -218,6 +229,67 @@ defmodule Electric.Postgres.ConfigurationTest do
)
end
end

# Regression test for concurrent publication updates: after seeding a publication
# with three filtered tables, two tasks each add a second where clause to a
# different table at the same time. The test passes only if both tasks finish
# and both new clauses end up in the publication.
test "concurrent alters to the publication don't deadlock and run correctly", %{
pool: conn,
publication_name: publication,
get_pg_version: get_pg_version
} do
# Create the publication first, with one where clause per table
Configuration.configure_tables_for_replication!(
conn,
[
{{"public", "items"}, "(value ILIKE 'yes%')"},
{{"public", "other_table"}, "(value ILIKE '1%')"},
{{"public", "other_other_table"}, "(value ILIKE '1%')"}
],
get_pg_version,
publication
)

# Each task opens its own transaction on the pool, so the two updates below
# may run concurrently. This one adds a second clause to `other_other_table`.
task1 =
Task.async(fn ->
Postgrex.transaction(conn, fn conn ->
Configuration.configure_tables_for_replication!(
conn,
[{{"public", "other_other_table"}, "(value ILIKE '2%')"}],
get_pg_version,
publication
)

:ok
end)
end)

# Same as task1, but targeting `other_table`, so the two tasks touch different
# tables within the same publication.
task2 =
Task.async(fn ->
Postgrex.transaction(conn, fn conn ->
Configuration.configure_tables_for_replication!(
conn,
[{{"public", "other_table"}, "(value ILIKE '2%')"}],
get_pg_version,
publication
)

:ok
end)
end)

# First check: both tasks completed successfully, that means there were no deadlocks
assert [{:ok, :ok}, {:ok, :ok}] == Task.await_many([task1, task2])

# Second check: the publication has the correct filters, that means one didn't override the other.
# `items` keeps its single clause; each of the other two tables has its original '1%' clause
# OR-ed with the '2%' clause added by its task.
# NOTE(review): the expected strings use `~~*` where the inputs use ILIKE - presumably this is how
# Postgres prints the stored filter; confirm against `list_tables_in_publication`.
assert list_tables_in_publication(conn, publication) |> Enum.sort() ==
expected_filters(
[
{"public", "items", "(value ~~* 'yes%'::text)"},
{"public", "other_other_table",
"((value ~~* '1%'::text) OR (value ~~* '2%'::text))"},
{"public", "other_table", "((value ~~* '1%'::text) OR (value ~~* '2%'::text))"}
],
get_pg_version.()
)
end
end

defp get_table_identity(conn, {schema, table}) do
Expand Down

0 comments on commit afc20b6

Please sign in to comment.