Skip to content

Commit

Permalink
feat: Move operations from ShapeCache to Shape.Consumer (#1787)
Browse files Browse the repository at this point in the history
Addresses #1785 and
partially addresses #1770

Moves a lot of the operations that went through `ShapeCache` directly
into the `Shape.Consumer`, so that requests can be replied to directly
from the shape consumers rather than flooding the `ShapeCache` with
casts that take a while to reach the requesters.

I've tried to keep changes to a minimum in order to do this
incrementally and keep these PRs easily reviewable - the `ShapeStatus`
still persists data on every call, the relations and truncates still go
through `ShapeCache` rather than individual shapes, etc

I've also caught the `DBConnection.ConnectionError`s for queue timeouts
and converted them to 429 errors.
We need to also handle `GenServer.call` timeouts as sometimes the PG
query might not fail but take longer than the default 5 seconds for the
GenServer call.


NOTE: I have not updated any tests yet as I first want to ensure people
agree with the approach

PERFORMANCE CHECK:
- On my local machine, using in memory stores, running 1000 concurrent
new shape connections consistently took ~20sec with these changes,
compared to the ~33sec on main, so a ~30% improvement.
- I was also able to succesfully run 10k concurrent connections with
this, although it took ~10min to serve, but on main I wasn't able to
succsefully run it (@robacourt I think that was the case for you too?) -
at least we know it does not get into an unrecoverable state.
  • Loading branch information
msfstef authored Oct 9, 2024
1 parent 6831efc commit edb0f72
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 152 deletions.
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ defmodule Electric.Application do
{Electric.Plug.Router,
storage: storage,
registry: Registry.ShapeChanges,
shape_cache: {Electric.ShapeCache, []},
shape_cache: shape_cache,
get_service_status: get_service_status,
inspector: inspector,
long_poll_timeout: 20_000,
Expand Down
11 changes: 8 additions & 3 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,15 @@ defmodule Electric.Plug.ServeShapePlug do
Logger.warning(error_msg)
OpenTelemetry.record_exception(error_msg)

{status_code, message} =
if match?(%DBConnection.ConnectionError{reason: :queue_timeout}, reason),
do: {429, "Could not establish connection to database - try again later"},
else: {500, "Failed creating or fetching the snapshot"}

send_resp(
conn,
500,
Jason.encode_to_iodata!(%{error: "Failed creating or fetching the snapshot"})
status_code,
Jason.encode_to_iodata!(%{error: message})
)
end
end
Expand Down Expand Up @@ -643,7 +648,7 @@ defmodule Electric.Plug.ServeShapePlug do

@impl Plug.ErrorHandler
def handle_errors(conn, error) do
OpenTelemetry.record_exception(error.reason, error.stack)
OpenTelemetry.record_exception(error.kind, error.reason, error.stack)

error_str = Exception.format(error.kind, error.reason)

Expand Down
116 changes: 23 additions & 93 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Electric.ShapeCacheBehaviour do
@moduledoc """
Behaviour defining the ShapeCache functions to be used in mocks
"""
alias Electric.Postgres.LogicalReplication.Messages
alias Electric.Shapes.Shape
alias Electric.Replication.Changes
alias Electric.Replication.LogOffset
Expand All @@ -25,13 +26,14 @@ defmodule Electric.ShapeCacheBehaviour do
@callback clean_shape(shape_id(), keyword()) :: :ok
@callback clean_all_shapes(GenServer.name()) :: :ok
@callback has_shape?(shape_id(), keyword()) :: boolean()
@callback cast(term(), keyword()) :: :ok
end

defmodule Electric.ShapeCache do
use GenStage

alias Electric.Postgres.LogicalReplication.Messages
alias Electric.Replication.Changes
alias Electric.Replication.LogOffset
alias Electric.ShapeCache.ShapeStatus
alias Electric.Shapes
alias Electric.Shapes.Shape
Expand Down Expand Up @@ -65,6 +67,7 @@ defmodule Electric.ShapeCache do
# NimbleOptions has no "implementation of protocol" type
persistent_kv: [type: :any, required: true],
db_pool: [type: {:or, [:atom, :pid]}, default: Electric.DbPool],
run_with_conn_fn: [type: {:fun, 2}, default: &DBConnection.run/2],
prepare_tables_fn: [type: {:or, [:mfa, {:fun, 2}]}, required: true],
create_snapshot_fn: [
type: {:fun, 5},
Expand All @@ -78,12 +81,6 @@ defmodule Electric.ShapeCache do
end
end

@impl Electric.ShapeCacheBehaviour
def cast(message, opts) do
server = Access.get(opts, :server, __MODULE__)
GenStage.cast(server, message)
end

@impl Electric.ShapeCacheBehaviour
def get_shape(shape, opts \\ []) do
table = Access.get(opts, :shape_meta_table, @default_shape_meta_table)
Expand Down Expand Up @@ -158,12 +155,18 @@ defmodule Electric.ShapeCache do
def await_snapshot_start(shape_id, opts \\ []) when is_binary(shape_id) do
table = Access.get(opts, :shape_meta_table, @default_shape_meta_table)
shape_status = Access.get(opts, :shape_status, ShapeStatus)
electric_instance_id = Access.fetch!(opts, :electric_instance_id)

if shape_status.snapshot_started?(table, shape_id) do
:started
else
server = Access.get(opts, :server, __MODULE__)
GenStage.call(server, {:await_snapshot_start, shape_id})
cond do
shape_status.snapshot_started?(table, shape_id) ->
:started

!shape_status.get_existing_shape(table, shape_id) ->
{:error, :unknown}

true ->
server = Electric.Shapes.Consumer.name(electric_instance_id, shape_id)
GenServer.call(server, :await_snapshot_start)
end
end

Expand Down Expand Up @@ -196,9 +199,9 @@ defmodule Electric.ShapeCache do
inspector: opts.inspector,
shape_meta_table: opts.shape_meta_table,
shape_status: opts.shape_status,
awaiting_snapshot_start: %{},
db_pool: opts.db_pool,
persistent_state: persistent_state,
run_with_conn_fn: opts.run_with_conn_fn,
create_snapshot_fn: opts.create_snapshot_fn,
prepare_tables_fn: opts.prepare_tables_fn,
log_producer: opts.log_producer,
Expand Down Expand Up @@ -284,40 +287,23 @@ defmodule Electric.ShapeCache do
{:reply, {shape_id, latest_offset}, [], state}
end

def handle_call({:await_snapshot_start, shape_id}, from, %{shape_status: shape_status} = state) do
cond do
not is_known_shape_id?(state, shape_id) ->
{:reply, {:error, :unknown}, [], state}

shape_status.snapshot_started?(state.persistent_state, shape_id) ->
{:reply, :started, [], state}

true ->
Logger.debug("Starting a wait on the snapshot #{shape_id} for #{inspect(from)}}")

{:noreply, [], add_waiter(state, shape_id, from)}
end
end

def handle_call({:wait_shape_id, shape_id}, _from, %{shape_status: shape_status} = state) do
{:reply, !is_nil(shape_status.get_existing_shape(state.persistent_state, shape_id)), [],
state}
end

def handle_call({:truncate, shape_id}, _from, state) do
with {:ok, cleaned_up_shape} <- clean_up_shape(state, shape_id) do
Logger.info(
"Truncating and rotating shape id, previous shape id #{shape_id}, definition: #{inspect(cleaned_up_shape)}"
)
with :ok <- clean_up_shape(state, shape_id) do
Logger.info("Truncating and rotating shape id, previous shape id #{shape_id} cleaned up")
end

{:reply, :ok, [], state}
end

def handle_call({:clean, shape_id}, _from, state) do
# ignore errors when cleaning up non-existant shape id
with {:ok, cleaned_up_shape} <- clean_up_shape(state, shape_id) do
Logger.info("Cleaning up shape #{shape_id}, definition: #{inspect(cleaned_up_shape)}")
with :ok <- clean_up_shape(state, shape_id) do
Logger.info("Cleaning up shape #{shape_id}")
end

{:reply, :ok, [], state}
Expand All @@ -329,54 +315,14 @@ defmodule Electric.ShapeCache do
{:reply, :ok, [], state}
end

@impl GenStage
def handle_cast({:snapshot_xmin_known, shape_id, xmin}, %{shape_status: shape_status} = state) do
unless shape_status.set_snapshot_xmin(state.persistent_state, shape_id, xmin) do
Logger.warning(
"Got snapshot information for a #{shape_id}, that shape id is no longer valid. Ignoring."
)
end

{:noreply, [], state}
end

def handle_cast({:snapshot_started, shape_id}, %{shape_status: shape_status} = state) do
Logger.debug("Snapshot for #{shape_id} is ready")
:ok = shape_status.mark_snapshot_started(state.persistent_state, shape_id)
{waiting, state} = pop_in(state, [:awaiting_snapshot_start, shape_id])
for client <- List.wrap(waiting), not is_nil(client), do: GenStage.reply(client, :started)
{:noreply, [], state}
end

def handle_cast({:snapshot_failed, shape_id, error, _stacktrace}, state) do
Logger.error(
"Removing shape #{shape_id} due to #{Exception.format_banner(:error, error, [])}"
)

clean_up_shape(state, shape_id)
{waiting, state} = pop_in(state, [:awaiting_snapshot_start, shape_id])

# waiting may nil here if :snapshot_failed happens after :snapshot_started
if waiting do
for client <- waiting, not is_nil(client), do: GenStage.reply(client, {:error, error})
end

{:noreply, [], state}
end

defp clean_up_shape(state, shape_id) do
if state.shape_status.get_existing_shape(state.persistent_state, shape_id) !== nil do
shape_opts = Electric.ShapeCache.Storage.for_shape(shape_id, state.storage)
Electric.ShapeCache.Storage.cleanup!(shape_opts)
end

Electric.Shapes.ConsumerSupervisor.stop_shape_consumer(
state.consumer_supervisor,
state.electric_instance_id,
shape_id
)

state.shape_status.remove_shape(state.persistent_state, shape_id)
:ok
end

defp clean_up_all_shapes(state) do
Expand All @@ -388,16 +334,6 @@ defmodule Electric.ShapeCache do
end
end

defp is_known_shape_id?(state, shape_id) do
!!state.shape_status.get_existing_shape(state.persistent_state, shape_id)
end

defp add_waiter(%{awaiting_snapshot_start: waiters} = state, shape_id, waiter),
do: %{
state
| awaiting_snapshot_start: Map.update(waiters, shape_id, [waiter], &[waiter | &1])
}

defp recover_shapes(state) do
state.persistent_state
|> state.shape_status.list_shapes()
Expand All @@ -413,28 +349,22 @@ defmodule Electric.ShapeCache do
electric_instance_id: state.electric_instance_id,
shape_id: shape_id,
shape: shape,
shape_status: {state.shape_status, state.persistent_state},
storage: state.storage,
chunk_bytes_threshold: state.chunk_bytes_threshold,
log_producer: state.log_producer,
shape_cache:
{__MODULE__, %{server: state.name, shape_meta_table: state.shape_meta_table}},
registry: state.registry,
db_pool: state.db_pool,
run_with_conn_fn: state.run_with_conn_fn,
prepare_tables_fn: state.prepare_tables_fn,
create_snapshot_fn: state.create_snapshot_fn
) do
consumer = Shapes.Consumer.name(state.electric_instance_id, shape_id)

{:ok, snapshot_xmin, latest_offset} = Shapes.Consumer.initial_state(consumer)

:ok =
state.shape_status.initialise_shape(
state.persistent_state,
shape_id,
snapshot_xmin,
latest_offset
)

{:ok, pid, snapshot_xmin, latest_offset}
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ defmodule Electric.ShapeCache.ShapeStatusBehaviour do
{:ok, shape_id()} | {:error, term()}
@callback initialise_shape(ShapeStatus.t(), shape_id(), xmin(), LogOffset.t()) ::
:ok
@callback set_snapshot_xmin(ShapeStatus.t(), shape_id(), xmin()) :: :ok
@callback set_latest_offset(ShapeStatus.t(), shape_id(), LogOffset.t()) :: :ok
@callback mark_snapshot_started(ShapeStatus.t(), shape_id()) :: :ok
@callback snapshot_started?(ShapeStatus.t(), shape_id()) :: boolean()
@callback remove_shape(ShapeStatus.t(), shape_id()) ::
{:ok, Shape.t()} | {:error, term()}
Expand Down
Loading

0 comments on commit edb0f72

Please sign in to comment.