Skip to content

Commit

Permalink
Revert "Move WHERE clause evaluation to single process"
Browse files Browse the repository at this point in the history
This reverts commit 8781654.
  • Loading branch information
robacourt committed Sep 26, 2024
1 parent 8781654 commit afa44d1
Showing 1 changed file with 7 additions and 14 deletions.
21 changes: 7 additions & 14 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,9 @@ defmodule Electric.Shapes.Consumer do
monitors: []
})

{:consumer, state,
subscribe_to: [{producer, [max_demand: 1, selector: &selector(&1, config.shape)]}]}
{:consumer, state, subscribe_to: [{producer, [max_demand: 1, selector: nil]}]}
end

defp selector(%Transaction{changes: changes}, shape) do
changes
|> Stream.flat_map(&Shape.convert_change(shape, &1))
|> Enum.take(1)
|> case do
[] -> false
[_] -> true
end
end

defp selector(_, _), do: false

def handle_call(:initial_state, _from, %{snapshot_xmin: xmin, latest_offset: offset} = state) do
{:reply, {:ok, xmin, offset}, [], state}
end
Expand Down Expand Up @@ -124,6 +111,12 @@ defmodule Electric.Shapes.Consumer do
{:noreply, [], state}
end

# `Shapes.Dispatcher` only works with single-events, so we can safely assert
# that here
def handle_events([%Changes.Relation{}], _from, state) do
{:noreply, [], state}
end

# Buffer incoming transactions until we know our xmin
def handle_events([%Transaction{xid: xid}] = txns, _from, %{snapshot_xmin: nil} = state) do
Logger.debug(fn ->
Expand Down

0 comments on commit afa44d1

Please sign in to comment.