From 1b8dce04b2eb4079e1a7fd076699a9f5f135c631 Mon Sep 17 00:00:00 2001 From: Stefanos Mousafeiris Date: Mon, 16 Dec 2024 15:35:26 +0200 Subject: [PATCH] fix(elixir-client): Fix monitoring registration race condition (#2171) Fixes race condition where the monitoring process would start, initialise the request process, and the request would complete before the listener registered itself with the monitoring process. This would normally trigger an error and a retry, so overall it's not a critical bug for production, but in our tests where our responses are mocked exactly once, if a response was missed the test would time out. My approach was to "hold" the response in the monitor until at least one subscriber is present - I don't love it cause then for the monitor to shut down it needs to have something registered to it after initialization, but it felt like a decent fix with the assunmption that each monitor process will be accompanied with a registration. --- .changeset/beige-mice-notice.md | 5 ++ .../lib/electric/client/fetch/monitor.ex | 50 +++++++++++++------ .../elixir-client/lib/electric/client/mock.ex | 20 ++++---- .../test/electric/client_test.exs | 2 + 4 files changed, 52 insertions(+), 25 deletions(-) create mode 100644 .changeset/beige-mice-notice.md diff --git a/.changeset/beige-mice-notice.md b/.changeset/beige-mice-notice.md new file mode 100644 index 0000000000..8698c71cb4 --- /dev/null +++ b/.changeset/beige-mice-notice.md @@ -0,0 +1,5 @@ +--- +"@core/elixir-client": patch +--- + +Fix race condition where response comes before listener has monitored itself. diff --git a/packages/elixir-client/lib/electric/client/fetch/monitor.ex b/packages/elixir-client/lib/electric/client/fetch/monitor.ex index 450a8d2d2f..fb8453f0aa 100644 --- a/packages/elixir-client/lib/electric/client/fetch/monitor.ex +++ b/packages/elixir-client/lib/electric/client/fetch/monitor.ex @@ -70,7 +70,8 @@ defmodule Electric.Client.Fetch.Monitor do state = %{ request_id: request_id, - subscribers: [] + subscribers: [], + response: nil } {:ok, state, {:continue, {:start_request, request_id, request, client}}} @@ -83,21 +84,16 @@ defmodule Electric.Client.Fetch.Monitor do {:noreply, state} end - @impl true - def handle_call({:register, listener_pid}, _from, state) do - ref = Process.monitor(listener_pid) - - Logger.debug( - fn -> "Registering listener pid #{inspect(listener_pid)}" end, - request_id: state.request_id - ) - - state = Map.update!(state, :subscribers, &[{listener_pid, ref} | &1]) + def handle_continue(:handle_response, %{subscribers: _, response: nil} = state) do + {:noreply, state} + end - {:reply, ref, state} + def handle_continue(:handle_response, %{subscribers: [], response: _} = state) do + Logger.debug("Got response with no subscribers - deferring until subscribers are present") + {:noreply, state} end - def handle_call({:reply, response}, _from, state) do + def handle_continue(:handle_response, %{subscribers: subscribers, response: response} = state) do case response do %{status: status} -> Logger.debug( @@ -124,11 +120,29 @@ defmodule Electric.Client.Fetch.Monitor do ) end - for {pid, ref} <- state.subscribers do + for {pid, ref} <- subscribers do send(pid, {:response, ref, response}) end - {:stop, {:shutdown, :normal}, :ok, state} + {:stop, {:shutdown, :normal}, state} + end + + @impl true + def handle_call({:register, listener_pid}, _from, state) do + ref = Process.monitor(listener_pid) + + Logger.debug( + fn -> "Registering listener pid #{inspect(listener_pid)}" end, + request_id: state.request_id + ) + + state = Map.update!(state, :subscribers, &[{listener_pid, ref} | &1]) + + {:reply, ref, state, {:continue, :handle_response}} + end + + def handle_call({:reply, response}, _from, state) do + {:reply, :ok, %{state | response: response}, {:continue, :handle_response}} end @impl true @@ -149,7 +163,7 @@ defmodule Electric.Client.Fetch.Monitor do {:noreply, state} end - def handle_info({:EXIT, pid, reason}, state) do + def handle_info({:EXIT, pid, reason}, %{response: nil} = state) do Logger.debug(fn -> "Request process #{inspect(pid)} exited with reason #{inspect(reason)} before issuing a reply. Using reason as an error and exiting." end) @@ -160,4 +174,8 @@ defmodule Electric.Client.Fetch.Monitor do {:stop, {:shutdown, :normal}, state} end + + def handle_info({:EXIT, _pid, _reason}, state) do + {:noreply, state} + end end diff --git a/packages/elixir-client/lib/electric/client/mock.ex b/packages/elixir-client/lib/electric/client/mock.ex index d0ff235ad0..bd203f12bb 100644 --- a/packages/elixir-client/lib/electric/client/mock.ex +++ b/packages/elixir-client/lib/electric/client/mock.ex @@ -43,7 +43,7 @@ defmodule Electric.Client.Mock do end def init(parent) do - {:ok, %{parent: parent, from: nil, request: nil, response: nil}} + {:ok, %{parent: parent, requests: [], responses: []}} end def request(pid, request) do @@ -58,22 +58,24 @@ defmodule Electric.Client.Mock do GenServer.call(pid, {:response, response}) end - def handle_call({:request, request}, from, %{response: nil} = state) do - {:noreply, %{state | from: from, request: request}} + def handle_call({:request, request}, from, %{responses: []} = state) do + {:noreply, %{state | requests: state.requests ++ [{from, request}]}} end - def handle_call({:request, request}, _from, %{from: from, response: %{} = response} = state) do + def handle_call({:request, request}, _from, %{responses: [{from, response} | rest]} = state) do GenServer.reply(from, {:ok, request}) - {:reply, {:ok, response}, %{state | from: nil, response: nil}} + + {:reply, {:ok, response}, %{state | responses: rest}} end - def handle_call({:response, response}, from, %{from: nil} = state) do - {:noreply, %{state | from: from, response: response}} + def handle_call({:response, response}, from, %{requests: []} = state) do + {:noreply, %{state | responses: state.responses ++ [{from, response}]}} end - def handle_call({:response, response}, _from, %{from: from} = state) when not is_nil(from) do + def handle_call({:response, response}, _from, %{requests: [{from, request} | rest]} = state) do GenServer.reply(from, {:ok, response}) - {:reply, {:ok, state.request}, %{state | from: nil, request: nil}} + + {:reply, {:ok, request}, %{state | requests: rest}} end end diff --git a/packages/elixir-client/test/electric/client_test.exs b/packages/elixir-client/test/electric/client_test.exs index b0b697ca5a..981691d391 100644 --- a/packages/elixir-client/test/electric/client_test.exs +++ b/packages/elixir-client/test/electric/client_test.exs @@ -1,6 +1,8 @@ defmodule Electric.ClientTest do use ExUnit.Case, async: true + @moduletag :capture_log + import Support.DbSetup import Support.ClientHelpers