Skip to content

Commit

Permalink
Add an optional DATABASE_POOL_URL config
Browse files Browse the repository at this point in the history
When configured, the lock connection and the database pool will connect
to this pooled URL.

By default, when DATABASE_POOL_URL is not set, all connections are open
using the direct database connection URL configured with DATABASE_URL.
  • Loading branch information
alco committed Dec 3, 2024
1 parent ae20ae1 commit a53b820
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
8 changes: 6 additions & 2 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,15 @@ database_ipv6_config =
env!("ELECTRIC_DATABASE_USE_IPV6", :boolean, false)

{:ok, database_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_url)

connection_opts = database_url_config ++ [ipv6: database_ipv6_config]

config :electric, connection_opts: Electric.Utils.obfuscate_password(connection_opts)

if database_pool_url = env!("DATABASE_POOL_URL", :string) do
{:ok, database_pool_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_pool_url)
connection_opts = database_pool_url_config ++ [ipv6: database_ipv6_config]
config :electric, pool_connection_opts: Electric.Utils.obfuscate_password(connection_opts)
end

enable_integration_testing = env!("ELECTRIC_ENABLE_INTEGRATION_TESTING", :boolean, false)
cache_max_age = env!("ELECTRIC_CACHE_MAX_AGE", :integer, 60)
cache_stale_age = env!("ELECTRIC_CACHE_STALE_AGE", :integer, 60 * 5)
Expand Down
20 changes: 14 additions & 6 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ defmodule Electric.Application do
publication_name = "electric_publication_#{replication_stream_id}"
slot_name = "electric_slot_#{replication_stream_id}"

replication_connection_opts = Application.fetch_env!(:electric, :connection_opts)
pool_connection_opts = Application.get_env(:electric, :pool_connection_opts)

connection_opts = pool_connection_opts || replication_connection_opts

replication_opts = [
connection_opts: replication_connection_opts,
publication_name: publication_name,
slot_name: slot_name,
slot_temporary?: Application.fetch_env!(:electric, :replication_slot_temporary?)
]

# The root application supervisor starts the core global processes, including the HTTP
# server and the database connection manager. The latter is responsible for establishing
# all needed connections to the database (acquiring the exclusive access lock, opening a
Expand All @@ -53,13 +65,9 @@ defmodule Electric.Application do
{Electric.StackSupervisor,
stack_id: stack_id,
stack_events_registry: Registry.StackEvents,
connection_opts: Application.fetch_env!(:electric, :connection_opts),
persistent_kv: persistent_kv,
replication_opts: [
publication_name: publication_name,
slot_name: slot_name,
slot_temporary?: Application.fetch_env!(:electric, :replication_slot_temporary?)
],
connection_opts: connection_opts,
replication_opts: replication_opts,
pool_opts: [pool_size: Application.fetch_env!(:electric, :db_pool_size)],
storage: Application.fetch_env!(:electric, :storage),
chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold)},
Expand Down
14 changes: 8 additions & 6 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -267,16 +267,18 @@ defmodule Electric.Connection.Manager do
end

def handle_continue(:start_replication_client, %State{replication_client_pid: nil} = state) do
opts =
state
|> Map.take([:stack_id, :replication_opts, :connection_opts])
|> Map.to_list()

Logger.debug("Starting replication client for stack #{state.stack_id}")

opts = [
connection_opts: state.replication_opts[:connection_opts],
replication_opts: state.replication_opts,
stack_id: state.stack_it
]

case start_replication_client(opts) do
{:ok, pid, connection_opts} ->
state = %{state | replication_client_pid: pid, connection_opts: connection_opts}
replication_opts = Keyword.put(state.replication_opts, :connection_opts, connection_opts)
state = %{state | replication_client_pid: pid, replication_opts: replication_opts}

if is_nil(state.pool_pid) do
# This is the case where Connection.Manager starts connections from the initial state.
Expand Down

0 comments on commit a53b820

Please sign in to comment.