From afa44d1fbdd4a8d0b28605d509069091cd5400f2 Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 26 Sep 2024 13:21:04 +0100 Subject: [PATCH] Revert "Move WHERE clause evaluation to single process" This reverts commit 878165477a9e70c4ecc7d7836dfd5e7c8989b1d3. --- .../lib/electric/shapes/consumer.ex | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index bec109b526..4de5292ccd 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -65,22 +65,9 @@ defmodule Electric.Shapes.Consumer do monitors: [] }) - {:consumer, state, - subscribe_to: [{producer, [max_demand: 1, selector: &selector(&1, config.shape)]}]} + {:consumer, state, subscribe_to: [{producer, [max_demand: 1, selector: nil]}]} end - defp selector(%Transaction{changes: changes}, shape) do - changes - |> Stream.flat_map(&Shape.convert_change(shape, &1)) - |> Enum.take(1) - |> case do - [] -> false - [_] -> true - end - end - - defp selector(_, _), do: false - def handle_call(:initial_state, _from, %{snapshot_xmin: xmin, latest_offset: offset} = state) do {:reply, {:ok, xmin, offset}, [], state} end @@ -124,6 +111,12 @@ defmodule Electric.Shapes.Consumer do {:noreply, [], state} end + # `Shapes.Dispatcher` only works with single-events, so we can safely assert + # that here + def handle_events([%Changes.Relation{}], _from, state) do + {:noreply, [], state} + end + # Buffer incoming transactions until we know our xmin def handle_events([%Transaction{xid: xid}] = txns, _from, %{snapshot_xmin: nil} = state) do Logger.debug(fn ->