diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index b1b6b79f2b..2fb6e87c1c 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -105,11 +105,15 @@ database_ipv6_config = env!("ELECTRIC_DATABASE_USE_IPV6", :boolean, false) {:ok, database_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_url) - connection_opts = database_url_config ++ [ipv6: database_ipv6_config] - config :electric, connection_opts: Electric.Utils.obfuscate_password(connection_opts) +if database_pool_url = env!("DATABASE_POOL_URL", :string) do + {:ok, database_pool_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_pool_url) + connection_opts = database_pool_url_config ++ [ipv6: database_ipv6_config] + config :electric, pool_connection_opts: Electric.Utils.obfuscate_password(connection_opts) +end + enable_integration_testing = env!("ELECTRIC_ENABLE_INTEGRATION_TESTING", :boolean, false) cache_max_age = env!("ELECTRIC_CACHE_MAX_AGE", :integer, 60) cache_stale_age = env!("ELECTRIC_CACHE_STALE_AGE", :integer, 60 * 5) diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 44bf27b09a..9a83ec340f 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -36,6 +36,18 @@ defmodule Electric.Application do publication_name = "electric_publication_#{replication_stream_id}" slot_name = "electric_slot_#{replication_stream_id}" + replication_connection_opts = Application.fetch_env!(:electric, :connection_opts) + pool_connection_opts = Application.get_env(:electric, :pool_connection_opts) + + connection_opts = pool_connection_opts || replication_connection_opts + + replication_opts = [ + connection_opts: replication_connection_opts, + publication_name: publication_name, + slot_name: slot_name, + slot_temporary?: Application.fetch_env!(:electric, :replication_slot_temporary?) + ] + # The root application supervisor starts the core global processes, including the HTTP # server and the database connection manager. The latter is responsible for establishing # all needed connections to the database (acquiring the exclusive access lock, opening a @@ -53,13 +65,9 @@ defmodule Electric.Application do {Electric.StackSupervisor, stack_id: stack_id, stack_events_registry: Registry.StackEvents, - connection_opts: Application.fetch_env!(:electric, :connection_opts), persistent_kv: persistent_kv, - replication_opts: [ - publication_name: publication_name, - slot_name: slot_name, - slot_temporary?: Application.fetch_env!(:electric, :replication_slot_temporary?) - ], + connection_opts: connection_opts, + replication_opts: replication_opts, pool_opts: [pool_size: Application.fetch_env!(:electric, :db_pool_size)], storage: Application.fetch_env!(:electric, :storage), chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold)}, diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 343121e957..fc8de64246 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -267,16 +267,18 @@ defmodule Electric.Connection.Manager do end def handle_continue(:start_replication_client, %State{replication_client_pid: nil} = state) do - opts = - state - |> Map.take([:stack_id, :replication_opts, :connection_opts]) - |> Map.to_list() - Logger.debug("Starting replication client for stack #{state.stack_id}") + opts = [ + connection_opts: state.replication_opts[:connection_opts], + replication_opts: state.replication_opts, + stack_id: state.stack_it + ] + case start_replication_client(opts) do {:ok, pid, connection_opts} -> - state = %{state | replication_client_pid: pid, connection_opts: connection_opts} + replication_opts = Keyword.put(state.replication_opts, :connection_opts, connection_opts) + state = %{state | replication_client_pid: pid, replication_opts: replication_opts} if is_nil(state.pool_pid) do # This is the case where Connection.Manager starts connections from the initial state.