Skip to content

Commit

Permalink
Move WHERE clause evaluation to single process
Browse files Browse the repository at this point in the history
  • Loading branch information
robacourt committed Sep 26, 2024
1 parent afa44d1 commit b8deb91
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,22 @@ defmodule Electric.Shapes.Consumer do
monitors: []
})

{:consumer, state, subscribe_to: [{producer, [max_demand: 1, selector: nil]}]}
{:consumer, state,
subscribe_to: [{producer, [max_demand: 1, selector: &selector(&1, config.shape)]}]}
end

defp selector(%Transaction{changes: changes}, shape) do
changes
|> Stream.flat_map(&Shape.convert_change(shape, &1))
|> Enum.take(1)
|> case do
[] -> false
[_] -> true
end
end

defp selector(_, _), do: false

def handle_call(:initial_state, _from, %{snapshot_xmin: xmin, latest_offset: offset} = state) do
{:reply, {:ok, xmin, offset}, [], state}
end
Expand Down Expand Up @@ -111,12 +124,6 @@ defmodule Electric.Shapes.Consumer do
{:noreply, [], state}
end

# `Shapes.Dispatcher` only works with single-events, so we can safely assert
# that here
def handle_events([%Changes.Relation{}], _from, state) do
{:noreply, [], state}
end

# Buffer incoming transactions until we know our xmin
def handle_events([%Transaction{xid: xid}] = txns, _from, %{snapshot_xmin: nil} = state) do
Logger.debug(fn ->
Expand Down

0 comments on commit b8deb91

Please sign in to comment.