Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alco/acquire lock in replication client #1850

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 59 additions & 58 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ defmodule Electric.Application do
shape_log_collector = Electric.Replication.ShapeLogCollector.name(config.electric_instance_id)

connection_manager_opts = [
electric_instance_id: config.electric_instance_id,
connection_opts: config.connection_opts,
replication_opts: [
publication_name: config.replication_opts.publication_name,
Expand All @@ -39,41 +38,46 @@ defmodule Electric.Application do
pool_size: config.pool_opts.size,
types: PgInterop.Postgrex.Types
],
timeline_opts: [
shape_cache: {Electric.ShapeCache, []},
persistent_kv: config.persistent_kv
],
log_collector:
{Electric.Replication.ShapeLogCollector,
electric_instance_id: config.electric_instance_id, inspector: config.inspector},
shape_cache: config.child_specs.shape_cache
persistent_kv: config.persistent_kv
]

# The root application supervisor starts the core global processes, including the HTTP
# server and the database connection manager. The latter is responsible for establishing
# all needed connections to the database (acquiring the exclusive access lock, opening a
# replication connection, starting a connection pool).
#
# Once there is a DB connection pool running, ConnectionManager will start the singleton
# `Electric.Shapes.Supervisor` which is responsible for starting the shape log collector
# and individual shape consumer process trees.
#
# See the moduledoc in `Electric.Connection.Supervisor` for more info.
children =
[
Electric.Telemetry,
{Registry,
name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()},
{Registry,
name: Registry.ShapeChanges, keys: :duplicate, partitions: System.schedulers_online()},
{Electric.ConnectionManager, connection_manager_opts},
{Electric.Postgres.Inspector.EtsInspector, pool: Electric.DbPool},
{Bandit,
plug:
{Electric.Plug.Router,
storage: config.storage,
registry: Registry.ShapeChanges,
shape_cache: config.child_specs.shape_cache,
get_service_status: &Electric.ServiceStatus.check/0,
inspector: config.inspector,
long_poll_timeout: 20_000,
max_age: Application.fetch_env!(:electric, :cache_max_age),
stale_age: Application.fetch_env!(:electric, :cache_stale_age),
allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)},
port: Application.fetch_env!(:electric, :service_port),
thousand_island_options: http_listener_options()}
]
|> add_prometheus_router(Application.fetch_env!(:electric, :prometheus_port))
Enum.concat([
[
Electric.Telemetry,
{Registry,
name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()},
{Registry,
name: Registry.ShapeChanges, keys: :duplicate, partitions: System.schedulers_online()},
{Electric.Postgres.Inspector.EtsInspector, pool: Electric.DbPool},
{Bandit,
plug:
{Electric.Plug.Router,
storage: config.storage,
registry: Registry.ShapeChanges,
shape_cache: {Electric.ShapeCache, config.shape_cache_opts},
get_service_status: &Electric.ServiceStatus.check/0,
inspector: config.inspector,
long_poll_timeout: 20_000,
max_age: Application.fetch_env!(:electric, :cache_max_age),
stale_age: Application.fetch_env!(:electric, :cache_stale_age),
allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)},
port: Application.fetch_env!(:electric, :service_port),
thousand_island_options: http_listener_options()}
],
prometheus_endpoint(Application.fetch_env!(:electric, :prometheus_port)),
[{Electric.Connection.Supervisor, connection_manager_opts}]
])

Supervisor.start_link(children,
strategy: :one_for_one,
Expand Down Expand Up @@ -106,16 +110,16 @@ defmodule Electric.Application do
inspector =
{Electric.Postgres.Inspector.EtsInspector, server: Electric.Postgres.Inspector.EtsInspector}

shape_cache_spec =
{Electric.ShapeCache,
electric_instance_id: electric_instance_id,
storage: storage,
inspector: inspector,
prepare_tables_fn: prepare_tables_mfa,
chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold),
log_producer: Electric.Replication.ShapeLogCollector.name(electric_instance_id),
consumer_supervisor: Electric.Shapes.ConsumerSupervisor.name(electric_instance_id),
registry: Registry.ShapeChanges}
shape_cache_opts = [
electric_instance_id: electric_instance_id,
storage: storage,
inspector: inspector,
prepare_tables_fn: prepare_tables_mfa,
chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold),
log_producer: Electric.Replication.ShapeLogCollector.name(electric_instance_id),
consumer_supervisor: Electric.Shapes.ConsumerSupervisor.name(electric_instance_id),
registry: Registry.ShapeChanges
]

config = %Electric.Application.Configuration{
electric_instance_id: electric_instance_id,
Expand All @@ -131,26 +135,23 @@ defmodule Electric.Application do
size: Application.fetch_env!(:electric, :db_pool_size)
},
inspector: inspector,
child_specs: %{
shape_cache: shape_cache_spec
}
shape_cache_opts: shape_cache_opts
}

Electric.Application.Configuration.save(config)
end

defp add_prometheus_router(children, nil), do: children

defp add_prometheus_router(children, port) do
children ++
[
{
Bandit,
plug: {Electric.Plug.UtilityRouter, []},
port: port,
thousand_island_options: http_listener_options()
}
]
defp prometheus_endpoint(nil), do: []

defp prometheus_endpoint(port) do
[
{
Bandit,
plug: {Electric.Plug.UtilityRouter, []},
port: port,
thousand_island_options: http_listener_options()
}
]
end

defp http_listener_options do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Electric.Application.Configuration do
replication_opts
pool_opts
inspector
child_specs
shape_cache_opts
]a

@type t :: %__MODULE__{}
Expand Down
56 changes: 56 additions & 0 deletions packages/sync-service/lib/electric/connection/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
defmodule Electric.Connection.Supervisor do
@moduledoc """
The connection supervisor is a rest-for-one supervisor that starts `ConnectionManager`,
followed by `Shapes.Supervisor`.

ConnectionManager monitors all of the connection process that it starts and if any one of
the goes down with a critical error (such as Postgres shutting down), the connection manager
itself will shut down. This will cause the shutdown of Shapes.Supervisor, due to the nature
of the rest-for-one supervision strategy, and, since the latter supervisor is started as a
`temporary` child of the connection supervisor, it won't be restarted until its child spec is
re-added by a new call to `start_shapes_supervisor/0`.

This supervision design is deliberate: none of the "shapes" processes can function without a
working DB pool and we only have a DB pool when the ConnectionManager process can see that
all of its database connections are healthy. ConnectionManager tries to reopen connections
when they are closed, with an exponential backoff, so it is the first process to know when a
connection has been restored and it's also the one that starts Shapes.Supervisor once it
has successfully initialized a database connection pool.
"""

use Supervisor

@name __MODULE__

def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: @name)
end

def init(opts) do
Supervisor.init([{Electric.ConnectionManager, opts}], strategy: :rest_for_one)
end

def start_shapes_supervisor(opts) do
app_config = Electric.Application.Configuration.get()

shape_cache_opts = app_config.shape_cache_opts ++ Keyword.take(opts, [:purge_all_shapes?])
shape_cache_spec = {Electric.ShapeCache, shape_cache_opts}

shape_log_collector_spec =
{Electric.Replication.ShapeLogCollector,
electric_instance_id: app_config.electric_instance_id, inspector: app_config.inspector}

child_spec =
Supervisor.child_spec(
{
Electric.Shapes.Supervisor,
electric_instance_id: app_config.electric_instance_id,
shape_cache: shape_cache_spec,
log_collector: shape_log_collector_spec
},
restart: :temporary
)

Supervisor.start_child(@name, child_spec)
end
end
Loading
Loading