diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 4de5292ccd..bec109b526 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -65,9 +65,22 @@ defmodule Electric.Shapes.Consumer do monitors: [] }) - {:consumer, state, subscribe_to: [{producer, [max_demand: 1, selector: nil]}]} + {:consumer, state, + subscribe_to: [{producer, [max_demand: 1, selector: &selector(&1, config.shape)]}]} end + defp selector(%Transaction{changes: changes}, shape) do + changes + |> Stream.flat_map(&Shape.convert_change(shape, &1)) + |> Enum.take(1) + |> case do + [] -> false + [_] -> true + end + end + + defp selector(_, _), do: false + def handle_call(:initial_state, _from, %{snapshot_xmin: xmin, latest_offset: offset} = state) do {:reply, {:ok, xmin, offset}, [], state} end @@ -111,12 +124,6 @@ defmodule Electric.Shapes.Consumer do {:noreply, [], state} end - # `Shapes.Dispatcher` only works with single-events, so we can safely assert - # that here - def handle_events([%Changes.Relation{}], _from, state) do - {:noreply, [], state} - end - # Buffer incoming transactions until we know our xmin def handle_events([%Transaction{xid: xid}] = txns, _from, %{snapshot_xmin: nil} = state) do Logger.debug(fn ->