diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 15ec62d537..416a93339e 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -84,9 +84,18 @@ defmodule Electric.Shapes.Consumer do monitors: [] }) - {:consumer, state, subscribe_to: [{producer, [max_demand: 1, selector: nil]}]} + {:consumer, state, + subscribe_to: [{producer, [max_demand: 1, selector: &selector(&1, config.shape)]}]} end + defp selector(%Transaction{changes: changes}, shape) do + changes + |> Stream.flat_map(&Shape.convert_change(shape, &1)) + |> Enum.any?() + end + + defp selector(_, _), do: false + def handle_call(:initial_state, _from, %{snapshot_xmin: xmin, latest_offset: offset} = state) do {:reply, {:ok, xmin, offset}, [], state} end