diff --git a/.changeset/sour-oranges-clap.md b/.changeset/sour-oranges-clap.md new file mode 100644 index 0000000000..2c57890e3b --- /dev/null +++ b/.changeset/sour-oranges-clap.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Fix a bug in ReplicationClient caused by an invalid assumption about cross-transaction operation LSN ordering. diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index 3b0b33a14d..c71b7dbade 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -5,6 +5,7 @@ defmodule Electric.Postgres.ReplicationClient do use Postgrex.ReplicationConnection alias Electric.Postgres.LogicalReplication.Decoder + alias Electric.Postgres.Lsn alias Electric.Postgres.ReplicationClient.Collector alias Electric.Postgres.ReplicationClient.ConnectionSetup alias Electric.Replication.Changes.Relation @@ -33,14 +34,14 @@ defmodule Electric.Postgres.ReplicationClient do origin: "postgres", txn_collector: %Collector{}, step: :disconnected, - # Keep track of the latest received and applied WAL offsets so that we can report them - # back to Postgres in standby status update messages - + # Cache the end_lsn of the last processed Commit message to report it back to Postgres + # on demand via standby status update messages - # https://www.postgresql.org/docs/current/protocol-replication.html#PROTOCOL-REPLICATION-STANDBY-STATUS-UPDATE # - # Postgres defines separate "flushed" and "applied" offsets but we merge those into one - # concept of "applied WAL" which is defined as the offset we have successfully processed - # and persisted in our shape log storage. - received_wal: 0, + # Postgres defines separate "received and written to disk", "flushed to disk" and + # "applied" offsets but we only keep track of the "applied" offset which we define as the + # end LSN of the last transaction that we have successfully processed and persisted in the + # shape log storage. applied_wal: 0 ] @@ -55,7 +56,6 @@ defmodule Electric.Postgres.ReplicationClient do txn_collector: Collector.t(), step: Electric.Postgres.ReplicationClient.step(), display_settings: [String.t()], - received_wal: non_neg_integer, applied_wal: non_neg_integer } @@ -149,15 +149,21 @@ defmodule Electric.Postgres.ReplicationClient do @spec handle_data(binary(), State.t()) :: {:noreply, State.t()} | {:noreply, list(binary()), State.t()} def handle_data( - <<@repl_msg_x_log_data, wal_start::64, wal_end::64, _clock::64, rest::binary>>, + <<@repl_msg_x_log_data, _wal_start::64, wal_end::64, _clock::64, rest::binary>>, %State{} = state ) do - Logger.debug("XLogData: wal_start=#{wal_start}, wal_end=#{wal_end}") - - state = update_received_wal(:xlog_data, state, wal_start, wal_end) - rest |> Decoder.decode() + # # Useful for debugging: + # |> tap(fn %struct{} = msg -> + # message_type = struct |> to_string() |> String.split(".") |> List.last() + # + # Logger.debug( + # "XLogData: wal_start=#{wal_start} (#{Lsn.from_integer(wal_start)}), " <> + # "wal_end=#{wal_end} (#{Lsn.from_integer(wal_end)})\n" <> + # message_type <> " :: " <> inspect(Map.from_struct(msg)) + # ) + # end) |> Collector.handle_message(state.txn_collector) |> case do %Collector{} = txn_collector -> @@ -194,9 +200,9 @@ defmodule Electric.Postgres.ReplicationClient do end def handle_data(<<@repl_msg_primary_keepalive, wal_end::64, _clock::64, reply>>, state) do - Logger.debug("Primary Keepalive: wal_end=#{wal_end} reply=#{reply}") - - state = update_received_wal(:keepalive, state, nil, wal_end) + Logger.debug(fn -> + "Primary Keepalive: wal_end=#{wal_end} (#{Lsn.from_integer(wal_end)}) reply=#{reply}" + end) messages = case reply do @@ -208,20 +214,11 @@ defmodule Electric.Postgres.ReplicationClient do end defp encode_standby_status_update(state) do - # Even though Postgres docs say[1] that these values need to be incremented by 1, - # Postgres' own walreceiver process does not seem to be doing that. - # Given the fact that `state.applied_wal` is set to the `wal_end` value of the most - # recently processed XLogData message (which itself appears to be the end LSN + 1 of the last - # transaction's Commit message) I'm worried about Postgres skipping the next transaction by - # treating the "flushed LSN" we're reporting back to it as having passed the transaction. - # TODO: needs more testing/PG source reading/whatever. - # - # [1]: https://www.postgresql.org/docs/current/protocol-replication.html#PROTOCOL-REPLICATION-STANDBY-STATUS-UPDATE << @repl_msg_standby_status_update, - state.received_wal::64, - state.applied_wal::64, - state.applied_wal::64, + state.applied_wal + 1::64, + state.applied_wal + 1::64, + state.applied_wal + 1::64, current_time()::64, 0 >> @@ -230,17 +227,6 @@ defmodule Electric.Postgres.ReplicationClient do @epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) defp current_time(), do: System.os_time(:microsecond) - @epoch - # wal can be 0 if the incoming logical message is e.g. Relation. - defp update_received_wal(_step, state, _, 0), do: state - - defp update_received_wal(_step, %{received_wal: wal} = state, _, wal), do: state - - defp update_received_wal(_step, state, _, wal) when wal > state.received_wal, - do: %{state | received_wal: wal} - defp update_applied_wal(state, wal) when wal > state.applied_wal, do: %{state | applied_wal: wal} - - # wal can be 0 if the incoming logical message is e.g. Relation. - defp update_applied_wal(state, 0), do: state end diff --git a/packages/sync-service/lib/electric/postgres/replication_client/collector.ex b/packages/sync-service/lib/electric/postgres/replication_client/collector.ex index 2573e0035e..f7bf5d1e1e 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client/collector.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client/collector.ex @@ -144,45 +144,11 @@ defmodule Electric.Postgres.ReplicationClient.Collector do |> Enum.reduce(state, &prepend_change/2) end - def handle_message( - %LR.Commit{lsn: commit_lsn, end_lsn: end_lsn}, - %__MODULE__{transaction: txn} = state - ) + def handle_message(%LR.Commit{lsn: commit_lsn}, %__MODULE__{transaction: txn} = state) when not is_nil(txn) and commit_lsn == txn.lsn do - # Here, `commit_lsn` is the same value as `final_lsn` on the Begin message that preceded - # this Commit. To better understand the difference between `commit_lsn` and `end_lsn`, - # let's consult Postgres' source code[1]: - # - # /* ---- - # * LSN of the record that lead to this xact to be prepared or committed or - # * aborted. This can be a - # * * plain commit record - # * * plain commit record, of a parent transaction - # * * prepared transaction - # * * prepared transaction commit - # * * plain abort record - # * * prepared transaction abort - # * - # * This can also become set to earlier values than transaction end when - # * a transaction is spilled to disk; specifically it's set to the LSN of - # * the latest change written to disk so far. - # * ---- - # */ - # XLogRecPtr final_lsn; - # - # /* - # * LSN pointing to the end of the commit record + 1. - # */ - # XLogRecPtr end_lsn; - # - # [1]: https://github.com/postgres/postgres/blob/c671e142bf4b568434eb8559baff34d32eed5b29/src/include/replication/reorderbuffer.h#L276-L296 - # - # For our purposes, we're setting transaction's LSN to `end_lsn` which seems like a more - # stable value based on the above comments. {%Transaction{ txn - | lsn: end_lsn, - changes: Enum.reverse(txn.changes), + | changes: Enum.reverse(txn.changes), last_log_offset: LogOffset.new(txn.lsn, max(0, state.tx_op_index - 2)) }, %__MODULE__{state | transaction: nil, tx_op_index: nil}} end diff --git a/packages/sync-service/test/electric/postgres/replication_client/collector_test.exs b/packages/sync-service/test/electric/postgres/replication_client/collector_test.exs index 127a53f0b0..b5b94ea1d8 100644 --- a/packages/sync-service/test/electric/postgres/replication_client/collector_test.exs +++ b/packages/sync-service/test/electric/postgres/replication_client/collector_test.exs @@ -227,7 +227,7 @@ defmodule Electric.Postgres.ReplicationClient.CollectorTest do {completed_txn, updated_collector} = Collector.handle_message(commit_msg, collector) - assert %Transaction{xid: 456, lsn: @test_end_lsn, last_log_offset: @test_log_offset} = + assert %Transaction{xid: 456, lsn: @test_lsn, last_log_offset: @test_log_offset} = completed_txn assert %Collector{transaction: nil, tx_op_index: nil} = updated_collector diff --git a/packages/sync-service/test/electric/postgres/replication_client_test.exs b/packages/sync-service/test/electric/postgres/replication_client_test.exs index 736128fc7b..bf31d5abc9 100644 --- a/packages/sync-service/test/electric/postgres/replication_client_test.exs +++ b/packages/sync-service/test/electric/postgres/replication_client_test.exs @@ -143,6 +143,54 @@ defmodule Electric.Postgres.ReplicationClientTest do refute_receive _ end + # Regression test for https://github.com/electric-sql/electric/issues/1548 + test "fares well when multiple concurrent transactions are writing to WAL", %{ + db_config: config, + replication_opts: replication_opts, + db_conn: conn + } do + assert {:ok, _pid} = ReplicationClient.start_link(config, replication_opts) + + num_txn = 2 + num_ops = 8 + max_sleep = 20 + receive_timeout = (num_txn + num_ops) * max_sleep * 2 + + # Insert `num_txn` transactions, each in a separate process. Every transaction has + # `num_ops` INSERTs with a random delay between each operation. + # The end result is that INSERTs from different transactions get interleaved in + # the WAL, challenging any assumptions in ReplicationClient about cross-transaction operation + # ordering. + Enum.each(1..num_txn, fn i -> + tx_fun = fn conn -> + pid_str = inspect(self()) + + Enum.each(1..num_ops, fn j -> + insert_item(conn, "#{i}-#{j} in process #{pid_str}") + Process.sleep(:rand.uniform(max_sleep)) + end) + end + + spawn_link(Postgrex, :transaction, [conn, tx_fun]) + end) + + # Receive every transaction sent by ReplicationClient to the test process. + set = + Enum.reduce(1..num_txn, MapSet.new(1..num_txn), fn _, set -> + assert_receive {:from_replication, %Transaction{changes: records}}, receive_timeout + assert num_ops == length(records) + + [%NewRecord{record: %{"value" => val}} | _] = records + {i, _} = Integer.parse(val) + + MapSet.delete(set, i) + end) + + # Make sure there are no extraneous messages left. + assert MapSet.size(set) == 0 + refute_receive _ + end + # Set the DB's display settings to something else than Electric.Postgres.display_settings @tag database_settings: [ "DateStyle='Postgres, DMY'", @@ -285,22 +333,18 @@ defmodule Electric.Postgres.ReplicationClientTest do slot_name: "" ) - # Received WAL is PG WAL while "applied" and "flushed" WAL are still at zero based on the `state`. - assert {:noreply, [<>], state} = + # All offsets are 0+1 until we've processed a transaction and bumped `state.applied_wal`. + assert {:noreply, [<>], state} = ReplicationClient.handle_data(<>, state) - assert wal == pg_wal - ### state = %{state | applied_wal: lsn_to_wal("0/10")} - pg_wal = lsn_to_wal("1/20") - assert {:noreply, [<>], state} = + assert {:noreply, [<>], state} = ReplicationClient.handle_data(<>, state) - assert wal == pg_wal - assert app_wal == state.applied_wal + assert app_wal == state.applied_wal + 1 end defp with_publication(%{db_conn: conn}) do