fix: Remove invalid assumption about cross-transaction LSN monotonicity (#1556)

`ReplicationClient` was trying to be strict about LSN ordering of
operations it was receiving from Postgres. But since LSNs of operations
belonging to different concurrent transactions may be interleaved in the
WAL, we should only make assumptions about ordering between complete
transactions.

This change fixes #1548.
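
For illustration, here is a minimal standalone sketch of the ordering assumption this commit removes. The module `StrictWal` and its `update/2` are simplified stand-ins, not code from this repository; they mirror the shape of the `update_received_wal/4` clauses deleted below, and all numbers are made up:

defmodule StrictWal do
  # Same shape as the removed clauses: accept only an equal or strictly
  # increasing WAL position.
  def update(%{received_wal: wal} = state, wal), do: state

  def update(state, wal) when wal > state.received_wal,
    do: %{state | received_wal: wal}
end

StrictWal.update(%{received_wal: 200}, 250)
#=> %{received_wal: 250}

StrictWal.update(%{received_wal: 200}, 180)
#=> ** (FunctionClauseError) no function clause matching in StrictWal.update/2
# With concurrent transactions, an incoming replication message can
# legitimately reference a lower LSN than one already seen, so no clause
# matches and the connection process crashes.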
alco authored Aug 26, 2024
1 parent fa88719 commit e3b0040
Showing 5 changed files with 84 additions and 83 deletions.
5 changes: 5 additions & 0 deletions .changeset/sour-oranges-clap.md
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Fix a bug in ReplicationClient caused by an invalid assumption about cross-transaction operation LSN ordering.
62 changes: 24 additions & 38 deletions packages/sync-service/lib/electric/postgres/replication_client.ex
@@ -5,6 +5,7 @@ defmodule Electric.Postgres.ReplicationClient do
use Postgrex.ReplicationConnection

alias Electric.Postgres.LogicalReplication.Decoder
alias Electric.Postgres.Lsn
alias Electric.Postgres.ReplicationClient.Collector
alias Electric.Postgres.ReplicationClient.ConnectionSetup
alias Electric.Replication.Changes.Relation
@@ -33,14 +34,14 @@ defmodule Electric.Postgres.ReplicationClient do
origin: "postgres",
txn_collector: %Collector{},
step: :disconnected,
# Keep track of the latest received and applied WAL offsets so that we can report them
# back to Postgres in standby status update messages -
# Cache the end_lsn of the last processed Commit message to report it back to Postgres
# on demand via standby status update messages -
# https://www.postgresql.org/docs/current/protocol-replication.html#PROTOCOL-REPLICATION-STANDBY-STATUS-UPDATE
#
# Postgres defines separate "flushed" and "applied" offsets but we merge those into one
# concept of "applied WAL" which is defined as the offset we have successfully processed
# and persisted in our shape log storage.
received_wal: 0,
# Postgres defines separate "received and written to disk", "flushed to disk" and
# "applied" offsets but we only keep track of the "applied" offset which we define as the
# end LSN of the last transaction that we have successfully processed and persisted in the
# shape log storage.
applied_wal: 0
]

@@ -55,7 +56,6 @@ defmodule Electric.Postgres.ReplicationClient do
txn_collector: Collector.t(),
step: Electric.Postgres.ReplicationClient.step(),
display_settings: [String.t()],
received_wal: non_neg_integer,
applied_wal: non_neg_integer
}

@@ -149,15 +149,21 @@ defmodule Electric.Postgres.ReplicationClient do
@spec handle_data(binary(), State.t()) ::
{:noreply, State.t()} | {:noreply, list(binary()), State.t()}
def handle_data(
<<@repl_msg_x_log_data, wal_start::64, wal_end::64, _clock::64, rest::binary>>,
<<@repl_msg_x_log_data, _wal_start::64, wal_end::64, _clock::64, rest::binary>>,
%State{} = state
) do
Logger.debug("XLogData: wal_start=#{wal_start}, wal_end=#{wal_end}")

state = update_received_wal(:xlog_data, state, wal_start, wal_end)

rest
|> Decoder.decode()
# # Useful for debugging:
# |> tap(fn %struct{} = msg ->
# message_type = struct |> to_string() |> String.split(".") |> List.last()
#
# Logger.debug(
# "XLogData: wal_start=#{wal_start} (#{Lsn.from_integer(wal_start)}), " <>
# "wal_end=#{wal_end} (#{Lsn.from_integer(wal_end)})\n" <>
# message_type <> " :: " <> inspect(Map.from_struct(msg))
# )
# end)
|> Collector.handle_message(state.txn_collector)
|> case do
%Collector{} = txn_collector ->
@@ -194,9 +200,9 @@ defmodule Electric.Postgres.ReplicationClient do
end

def handle_data(<<@repl_msg_primary_keepalive, wal_end::64, _clock::64, reply>>, state) do
Logger.debug("Primary Keepalive: wal_end=#{wal_end} reply=#{reply}")

state = update_received_wal(:keepalive, state, nil, wal_end)
Logger.debug(fn ->
"Primary Keepalive: wal_end=#{wal_end} (#{Lsn.from_integer(wal_end)}) reply=#{reply}"
end)

messages =
case reply do
@@ -208,20 +214,11 @@ defmodule Electric.Postgres.ReplicationClient do
end

defp encode_standby_status_update(state) do
# Even though Postgres docs say[1] that these values need to be incremented by 1,
# Postgres' own walreceiver process does not seem to be doing that.
# Given the fact that `state.applied_wal` is set to the `wal_end` value of the most
# recently processed XLogData message (which itself appears to be the end LSN + 1 of the last
# transaction's Commit message) I'm worried about Postgres skipping the next transaction by
# treating the "flushed LSN" we're reporting back to it as having passed the transaction.
# TODO: needs more testing/PG source reading/whatever.
#
# [1]: https://www.postgresql.org/docs/current/protocol-replication.html#PROTOCOL-REPLICATION-STANDBY-STATUS-UPDATE
<<
@repl_msg_standby_status_update,
state.received_wal::64,
state.applied_wal::64,
state.applied_wal::64,
state.applied_wal + 1::64,
state.applied_wal + 1::64,
state.applied_wal + 1::64,
current_time()::64,
0
>>
@@ -230,17 +227,6 @@ defmodule Electric.Postgres.ReplicationClient do
@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
defp current_time(), do: System.os_time(:microsecond) - @epoch

# wal can be 0 if the incoming logical message is e.g. Relation.
defp update_received_wal(_step, state, _, 0), do: state

defp update_received_wal(_step, %{received_wal: wal} = state, _, wal), do: state

defp update_received_wal(_step, state, _, wal) when wal > state.received_wal,
do: %{state | received_wal: wal}

defp update_applied_wal(state, wal) when wal > state.applied_wal,
do: %{state | applied_wal: wal}

# wal can be 0 if the incoming logical message is e.g. Relation.
defp update_applied_wal(state, 0), do: state
end
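
As an aside, here is a small standalone sketch (not part of this commit) of the standby status update message that `encode_standby_status_update/1` above now builds. It assumes `@repl_msg_standby_status_update` is the `?r` byte that the keepalive test further down also matches on; the WAL position and timestamp are made up:

# Illustration only; applied_wal is a made-up WAL position.
applied_wal = 22_331_128
epoch = DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
now = System.os_time(:microsecond) - epoch

msg = <<?r, applied_wal + 1::64, applied_wal + 1::64, applied_wal + 1::64, now::64, 0>>

# The three 64-bit fields are the "written", "flushed" and "applied" positions
# of the standby status update; this client reports the same value for all three.
<<?r, written::64, flushed::64, applied::64, _clock::64, _reply_requested>> = msg
written == applied_wal + 1 and flushed == written and applied == written
#=> true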
@@ -144,45 +144,11 @@ defmodule Electric.Postgres.ReplicationClient.Collector do
|> Enum.reduce(state, &prepend_change/2)
end

def handle_message(
%LR.Commit{lsn: commit_lsn, end_lsn: end_lsn},
%__MODULE__{transaction: txn} = state
)
def handle_message(%LR.Commit{lsn: commit_lsn}, %__MODULE__{transaction: txn} = state)
when not is_nil(txn) and commit_lsn == txn.lsn do
# Here, `commit_lsn` is the same value as `final_lsn` on the Begin message that preceded
# this Commit. To better understand the difference between `commit_lsn` and `end_lsn`,
# let's consult Postgres' source code[1]:
#
# /* ----
# * LSN of the record that lead to this xact to be prepared or committed or
# * aborted. This can be a
# * * plain commit record
# * * plain commit record, of a parent transaction
# * * prepared transaction
# * * prepared transaction commit
# * * plain abort record
# * * prepared transaction abort
# *
# * This can also become set to earlier values than transaction end when
# * a transaction is spilled to disk; specifically it's set to the LSN of
# * the latest change written to disk so far.
# * ----
# */
# XLogRecPtr final_lsn;
#
# /*
# * LSN pointing to the end of the commit record + 1.
# */
# XLogRecPtr end_lsn;
#
# [1]: https://github.com/postgres/postgres/blob/c671e142bf4b568434eb8559baff34d32eed5b29/src/include/replication/reorderbuffer.h#L276-L296
#
# For our purposes, we're setting transaction's LSN to `end_lsn` which seems like a more
# stable value based on the above comments.
{%Transaction{
txn
| lsn: end_lsn,
changes: Enum.reverse(txn.changes),
| changes: Enum.reverse(txn.changes),
last_log_offset: LogOffset.new(txn.lsn, max(0, state.tx_op_index - 2))
}, %__MODULE__{state | transaction: nil, tx_op_index: nil}}
end
@@ -227,7 +227,7 @@ defmodule Electric.Postgres.ReplicationClient.CollectorTest do

{completed_txn, updated_collector} = Collector.handle_message(commit_msg, collector)

assert %Transaction{xid: 456, lsn: @test_end_lsn, last_log_offset: @test_log_offset} =
assert %Transaction{xid: 456, lsn: @test_lsn, last_log_offset: @test_log_offset} =
completed_txn

assert %Collector{transaction: nil, tx_op_index: nil} = updated_collector
@@ -143,6 +143,54 @@ defmodule Electric.Postgres.ReplicationClientTest do
refute_receive _
end

# Regression test for https://github.com/electric-sql/electric/issues/1548
test "fares well when multiple concurrent transactions are writing to WAL", %{
db_config: config,
replication_opts: replication_opts,
db_conn: conn
} do
assert {:ok, _pid} = ReplicationClient.start_link(config, replication_opts)

num_txn = 2
num_ops = 8
max_sleep = 20
receive_timeout = (num_txn + num_ops) * max_sleep * 2

# Insert `num_txn` transactions, each in a separate process. Every transaction has
# `num_ops` INSERTs with a random delay between each operation.
# The end result is that INSERTs from different transactions get interleaved in
# the WAL, challenging any assumptions in ReplicationClient about cross-transaction operation
# ordering.
Enum.each(1..num_txn, fn i ->
tx_fun = fn conn ->
pid_str = inspect(self())

Enum.each(1..num_ops, fn j ->
insert_item(conn, "#{i}-#{j} in process #{pid_str}")
Process.sleep(:rand.uniform(max_sleep))
end)
end

spawn_link(Postgrex, :transaction, [conn, tx_fun])
end)

# Receive every transaction sent by ReplicationClient to the test process.
set =
Enum.reduce(1..num_txn, MapSet.new(1..num_txn), fn _, set ->
assert_receive {:from_replication, %Transaction{changes: records}}, receive_timeout
assert num_ops == length(records)

[%NewRecord{record: %{"value" => val}} | _] = records
{i, _} = Integer.parse(val)

MapSet.delete(set, i)
end)

# Make sure there are no extraneous messages left.
assert MapSet.size(set) == 0
refute_receive _
end

# Set the DB's display settings to something else than Electric.Postgres.display_settings
@tag database_settings: [
"DateStyle='Postgres, DMY'",
@@ -285,22 +333,18 @@ defmodule Electric.Postgres.ReplicationClientTest do
slot_name: ""
)

# Received WAL is PG WAL while "applied" and "flushed" WAL are still at zero based on the `state`.
assert {:noreply, [<<?r, wal::64, 0::64, 0::64, _time::64, 0::8>>], state} =
# All offsets are 0+1 until we've processed a transaction and bumped `state.applied_wal`.
assert {:noreply, [<<?r, 1::64, 1::64, 1::64, _time::64, 0::8>>], state} =
ReplicationClient.handle_data(<<?k, pg_wal::64, 0::64, 1::8>>, state)

assert wal == pg_wal

###

state = %{state | applied_wal: lsn_to_wal("0/10")}
pg_wal = lsn_to_wal("1/20")

assert {:noreply, [<<?r, wal::64, app_wal::64, app_wal::64, _time::64, 0::8>>], state} =
assert {:noreply, [<<?r, app_wal::64, app_wal::64, app_wal::64, _time::64, 0::8>>], state} =
ReplicationClient.handle_data(<<?k, pg_wal::64, 0::64, 1::8>>, state)

assert wal == pg_wal
assert app_wal == state.applied_wal
assert app_wal == state.applied_wal + 1
end

defp with_publication(%{db_conn: conn}) do
