From c2076c832f11280f8d7684d214020b5dcf428dde Mon Sep 17 00:00:00 2001 From: Rob A'Court Date: Thu, 10 Oct 2024 14:02:11 +0100 Subject: [PATCH] feat(sync-service): Move WHERE clause evaluation to single process (#1761) This is the simplest change to meet the criteria of #1744 . It's faster and uses less memory: Screenshot 2024-10-10 at 13 48 21 Than our current version: Screenshot 2024-10-10 at 13 48 38 Because of the reduction in memory use it can handle bigger transactions at 100k shapes: Screenshot 2024-10-10 at 13 36 12 vs our la Screenshot 2024-10-10 at 13 35 59 test version: --- packages/sync-service/lib/electric/shapes/consumer.ex | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 15ec62d537..416a93339e 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -84,9 +84,18 @@ defmodule Electric.Shapes.Consumer do monitors: [] }) - {:consumer, state, subscribe_to: [{producer, [max_demand: 1, selector: nil]}]} + {:consumer, state, + subscribe_to: [{producer, [max_demand: 1, selector: &selector(&1, config.shape)]}]} end + defp selector(%Transaction{changes: changes}, shape) do + changes + |> Stream.flat_map(&Shape.convert_change(shape, &1)) + |> Enum.any?() + end + + defp selector(_, _), do: false + def handle_call(:initial_state, _from, %{snapshot_xmin: xmin, latest_offset: offset} = state) do {:reply, {:ok, xmin, offset}, [], state} end