From cda13d94066f948a6cedac69b029dfa8184c7f87 Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 26 Sep 2024 12:55:15 +0100 Subject: [PATCH 1/2] Move WHERE clause evaluation to single process --- .../sync-service/lib/electric/shapes/consumer.ex | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 15ec62d537..c8072acbff 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -84,9 +84,22 @@ defmodule Electric.Shapes.Consumer do monitors: [] }) - {:consumer, state, subscribe_to: [{producer, [max_demand: 1, selector: nil]}]} + {:consumer, state, + subscribe_to: [{producer, [max_demand: 1, selector: &selector(&1, config.shape)]}]} end + defp selector(%Transaction{changes: changes}, shape) do + changes + |> Stream.flat_map(&Shape.convert_change(shape, &1)) + |> Enum.take(1) + |> case do + [] -> false + [_] -> true + end + end + + defp selector(_, _), do: false + def handle_call(:initial_state, _from, %{snapshot_xmin: xmin, latest_offset: offset} = state) do {:reply, {:ok, xmin, offset}, [], state} end From 4fd17d831400347741672b4d750bde12b34cc934 Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 26 Sep 2024 13:05:11 +0100 Subject: [PATCH 2/2] Refactor --- packages/sync-service/lib/electric/shapes/consumer.ex | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index c8072acbff..416a93339e 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -91,11 +91,7 @@ defmodule Electric.Shapes.Consumer do defp selector(%Transaction{changes: changes}, shape) do changes |> Stream.flat_map(&Shape.convert_change(shape, &1)) - |> Enum.take(1) - |> case do - [] -> false - [_] -> true - end + |> Enum.any?() end defp selector(_, _), do: false