diff --git a/.changeset/eight-pugs-guess.md b/.changeset/eight-pugs-guess.md new file mode 100644 index 0000000000..6ebdb1fc4a --- /dev/null +++ b/.changeset/eight-pugs-guess.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Update acknowledged WAL on keep alive messages diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index 2e06e122a2..aa4657507e 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -192,13 +192,14 @@ defmodule Electric.Postgres.ReplicationClient do "Primary Keepalive: wal_end=#{wal_end} (#{Lsn.from_integer(wal_end)}) reply=#{reply}" end) - messages = - case reply do - 1 -> [encode_standby_status_update(state)] - 0 -> [] - end + case reply do + 1 -> + state = update_applied_wal(state, wal_end) + {:noreply, [encode_standby_status_update(state)], state} - {:noreply, messages, state} + 0 -> + {:noreply, [], state} + end end defp process_x_log_data(data, wal_end, %State{} = state) do @@ -289,6 +290,6 @@ defmodule Electric.Postgres.ReplicationClient do @epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) defp current_time(), do: System.os_time(:microsecond) - @epoch - defp update_applied_wal(state, wal) when wal > state.applied_wal, + defp update_applied_wal(state, wal) when wal >= state.applied_wal, do: %{state | applied_wal: wal} end diff --git a/packages/sync-service/test/electric/postgres/replication_client_test.exs b/packages/sync-service/test/electric/postgres/replication_client_test.exs index 46ee74b0f6..0bca05e6f9 100644 --- a/packages/sync-service/test/electric/postgres/replication_client_test.exs +++ b/packages/sync-service/test/electric/postgres/replication_client_test.exs @@ -317,8 +317,6 @@ defmodule Electric.Postgres.ReplicationClientTest do end test "correctly responds to a status update request message from PG", ctx do - pg_wal = lsn_to_wal("0/10") - state = ReplicationClient.State.new( transaction_received: nil, @@ -329,17 +327,13 @@ defmodule Electric.Postgres.ReplicationClientTest do connection_manager: ctx.dummy_pid ) - # All offsets are 0+1 until we've processed a transaction and bumped `state.applied_wal`. - assert {:noreply, [<>], state} = - ReplicationClient.handle_data(<>, state) - - ### - - state = %{state | applied_wal: lsn_to_wal("0/10")} + state = %{state | applied_wal: lsn_to_wal("0/0")} + pg_wal = lsn_to_wal("0/10") assert {:noreply, [<>], state} = ReplicationClient.handle_data(<>, state) + assert state.applied_wal == pg_wal assert app_wal == state.applied_wal + 1 end