Skip to content

Commit

Permalink
fix(elixir-client): Fix monitoring registration race condition (#2171)
Browse files Browse the repository at this point in the history
Fixes race condition where the monitoring process would start,
initialise the request process, and the request would complete before
the listener registered itself with the monitoring process.

This would normally trigger an error and a retry, so overall it's not a
critical bug for production, but in our tests where our responses are
mocked exactly once, if a response was missed the test would time out.

My approach was to "hold" the response in the monitor until at least one
subscriber is present - I don't love it cause then for the monitor to
shut down it needs to have something registered to it after
initialization, but it felt like a decent fix with the assunmption that
each monitor process will be accompanied with a registration.
  • Loading branch information
msfstef authored Dec 16, 2024
1 parent ea4a4e9 commit 1b8dce0
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 25 deletions.
5 changes: 5 additions & 0 deletions .changeset/beige-mice-notice.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/elixir-client": patch
---

Fix race condition where response comes before listener has monitored itself.
50 changes: 34 additions & 16 deletions packages/elixir-client/lib/electric/client/fetch/monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ defmodule Electric.Client.Fetch.Monitor do

state = %{
request_id: request_id,
subscribers: []
subscribers: [],
response: nil
}

{:ok, state, {:continue, {:start_request, request_id, request, client}}}
Expand All @@ -83,21 +84,16 @@ defmodule Electric.Client.Fetch.Monitor do
{:noreply, state}
end

@impl true
def handle_call({:register, listener_pid}, _from, state) do
ref = Process.monitor(listener_pid)

Logger.debug(
fn -> "Registering listener pid #{inspect(listener_pid)}" end,
request_id: state.request_id
)

state = Map.update!(state, :subscribers, &[{listener_pid, ref} | &1])
def handle_continue(:handle_response, %{subscribers: _, response: nil} = state) do
{:noreply, state}
end

{:reply, ref, state}
def handle_continue(:handle_response, %{subscribers: [], response: _} = state) do
Logger.debug("Got response with no subscribers - deferring until subscribers are present")
{:noreply, state}
end

def handle_call({:reply, response}, _from, state) do
def handle_continue(:handle_response, %{subscribers: subscribers, response: response} = state) do
case response do
%{status: status} ->
Logger.debug(
Expand All @@ -124,11 +120,29 @@ defmodule Electric.Client.Fetch.Monitor do
)
end

for {pid, ref} <- state.subscribers do
for {pid, ref} <- subscribers do
send(pid, {:response, ref, response})
end

{:stop, {:shutdown, :normal}, :ok, state}
{:stop, {:shutdown, :normal}, state}
end

@impl true
def handle_call({:register, listener_pid}, _from, state) do
ref = Process.monitor(listener_pid)

Logger.debug(
fn -> "Registering listener pid #{inspect(listener_pid)}" end,
request_id: state.request_id
)

state = Map.update!(state, :subscribers, &[{listener_pid, ref} | &1])

{:reply, ref, state, {:continue, :handle_response}}
end

def handle_call({:reply, response}, _from, state) do
{:reply, :ok, %{state | response: response}, {:continue, :handle_response}}
end

@impl true
Expand All @@ -149,7 +163,7 @@ defmodule Electric.Client.Fetch.Monitor do
{:noreply, state}
end

def handle_info({:EXIT, pid, reason}, state) do
def handle_info({:EXIT, pid, reason}, %{response: nil} = state) do
Logger.debug(fn ->
"Request process #{inspect(pid)} exited with reason #{inspect(reason)} before issuing a reply. Using reason as an error and exiting."
end)
Expand All @@ -160,4 +174,8 @@ defmodule Electric.Client.Fetch.Monitor do

{:stop, {:shutdown, :normal}, state}
end

def handle_info({:EXIT, _pid, _reason}, state) do
{:noreply, state}
end
end
20 changes: 11 additions & 9 deletions packages/elixir-client/lib/electric/client/mock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule Electric.Client.Mock do
end

def init(parent) do
{:ok, %{parent: parent, from: nil, request: nil, response: nil}}
{:ok, %{parent: parent, requests: [], responses: []}}
end

def request(pid, request) do
Expand All @@ -58,22 +58,24 @@ defmodule Electric.Client.Mock do
GenServer.call(pid, {:response, response})
end

def handle_call({:request, request}, from, %{response: nil} = state) do
{:noreply, %{state | from: from, request: request}}
def handle_call({:request, request}, from, %{responses: []} = state) do
{:noreply, %{state | requests: state.requests ++ [{from, request}]}}
end

def handle_call({:request, request}, _from, %{from: from, response: %{} = response} = state) do
def handle_call({:request, request}, _from, %{responses: [{from, response} | rest]} = state) do
GenServer.reply(from, {:ok, request})
{:reply, {:ok, response}, %{state | from: nil, response: nil}}

{:reply, {:ok, response}, %{state | responses: rest}}
end

def handle_call({:response, response}, from, %{from: nil} = state) do
{:noreply, %{state | from: from, response: response}}
def handle_call({:response, response}, from, %{requests: []} = state) do
{:noreply, %{state | responses: state.responses ++ [{from, response}]}}
end

def handle_call({:response, response}, _from, %{from: from} = state) when not is_nil(from) do
def handle_call({:response, response}, _from, %{requests: [{from, request} | rest]} = state) do
GenServer.reply(from, {:ok, response})
{:reply, {:ok, state.request}, %{state | from: nil, request: nil}}

{:reply, {:ok, request}, %{state | requests: rest}}
end
end

Expand Down
2 changes: 2 additions & 0 deletions packages/elixir-client/test/electric/client_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Electric.ClientTest do
use ExUnit.Case, async: true

@moduletag :capture_log

import Support.DbSetup
import Support.ClientHelpers

Expand Down

0 comments on commit 1b8dce0

Please sign in to comment.