Skip to content

Commit

Permalink
Add an optional DATABASE_POOL_URL config
Browse files Browse the repository at this point in the history
When configured, the lock connection and the database pool will connect
to this pooled URL.

By default, when DATABASE_POOL_URL is not set, all connections are open
using the direct database connection URL configured with DATABASE_URL.
  • Loading branch information
alco committed Dec 3, 2024
1 parent ae20ae1 commit 63676b9
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 27 deletions.
8 changes: 6 additions & 2 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,15 @@ database_ipv6_config =
env!("ELECTRIC_DATABASE_USE_IPV6", :boolean, false)

{:ok, database_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_url)

connection_opts = database_url_config ++ [ipv6: database_ipv6_config]

config :electric, connection_opts: Electric.Utils.obfuscate_password(connection_opts)

if database_pool_url = env!("DATABASE_POOL_URL", :string, nil) do
{:ok, database_pool_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_pool_url)
connection_opts = database_pool_url_config ++ [ipv6: database_ipv6_config]
config :electric, pool_connection_opts: Electric.Utils.obfuscate_password(connection_opts)
end

enable_integration_testing = env!("ELECTRIC_ENABLE_INTEGRATION_TESTING", :boolean, false)
cache_max_age = env!("ELECTRIC_CACHE_MAX_AGE", :integer, 60)
cache_stale_age = env!("ELECTRIC_CACHE_STALE_AGE", :integer, 60 * 5)
Expand Down
20 changes: 14 additions & 6 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ defmodule Electric.Application do
publication_name = "electric_publication_#{replication_stream_id}"
slot_name = "electric_slot_#{replication_stream_id}"

replication_connection_opts = Application.fetch_env!(:electric, :connection_opts)
pool_connection_opts = Application.get_env(:electric, :pool_connection_opts)

connection_opts = pool_connection_opts || replication_connection_opts

replication_opts = [
connection_opts: replication_connection_opts,
publication_name: publication_name,
slot_name: slot_name,
slot_temporary?: Application.fetch_env!(:electric, :replication_slot_temporary?)
]

# The root application supervisor starts the core global processes, including the HTTP
# server and the database connection manager. The latter is responsible for establishing
# all needed connections to the database (acquiring the exclusive access lock, opening a
Expand All @@ -53,13 +65,9 @@ defmodule Electric.Application do
{Electric.StackSupervisor,
stack_id: stack_id,
stack_events_registry: Registry.StackEvents,
connection_opts: Application.fetch_env!(:electric, :connection_opts),
persistent_kv: persistent_kv,
replication_opts: [
publication_name: publication_name,
slot_name: slot_name,
slot_temporary?: Application.fetch_env!(:electric, :replication_slot_temporary?)
],
connection_opts: connection_opts,
replication_opts: replication_opts,
pool_opts: [pool_size: Application.fetch_env!(:electric, :db_pool_size)],
storage: Application.fetch_env!(:electric, :storage),
chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold)},
Expand Down
16 changes: 10 additions & 6 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -267,16 +267,20 @@ defmodule Electric.Connection.Manager do
end

def handle_continue(:start_replication_client, %State{replication_client_pid: nil} = state) do
opts =
state
|> Map.take([:stack_id, :replication_opts, :connection_opts])
|> Map.to_list()

Logger.debug("Starting replication client for stack #{state.stack_id}")

{connection_opts, replication_opts} = Keyword.pop(state.replication_opts, :connection_opts)

opts = [
connection_opts: connection_opts,
replication_opts: replication_opts,
stack_id: state.stack_id
]

case start_replication_client(opts) do
{:ok, pid, connection_opts} ->
state = %{state | replication_client_pid: pid, connection_opts: connection_opts}
replication_opts = Keyword.put(replication_opts, :connection_opts, connection_opts)
state = %{state | replication_client_pid: pid, replication_opts: replication_opts}

if is_nil(state.pool_pid) do
# This is the case where Connection.Manager starts connections from the initial state.
Expand Down
29 changes: 16 additions & 13 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,31 @@ defmodule Electric.StackSupervisor do
"""
use Supervisor, restart: :transient

@connection_opts_schema [
type: :keyword_list,
required: true,
keys: [
hostname: [type: :string, required: true],
port: [type: :integer, required: true],
database: [type: :string, required: true],
username: [type: :string, required: true],
password: [type: {:fun, 0}, required: true],
sslmode: [type: :atom, required: false],
ipv6: [type: :boolean, required: false]
]
]

@opts_schema NimbleOptions.new!(
name: [type: :any, required: false],
stack_id: [type: :string, required: true],
persistent_kv: [type: :any, required: true],
stack_events_registry: [type: :atom, required: true],
connection_opts: [
type: :keyword_list,
required: true,
keys: [
hostname: [type: :string, required: true],
port: [type: :integer, required: true],
database: [type: :string, required: true],
username: [type: :string, required: true],
password: [type: {:fun, 0}, required: true],
sslmode: [type: :atom, required: false],
ipv6: [type: :boolean, required: false]
]
],
connection_opts: @connection_opts_schema,
replication_opts: [
type: :keyword_list,
required: true,
keys: [
connection_opts: @connection_opts_schema,
publication_name: [type: :string, required: true],
slot_name: [type: :string, required: true],
slot_temporary?: [type: :boolean, default: false],
Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/test/support/component_setup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ defmodule Support.ComponentSetup do
storage: storage,
connection_opts: ctx.db_config,
replication_opts: [
connection_opts: ctx.db_config,
slot_name: "electric_test_slot_#{:erlang.phash2(stack_id)}",
publication_name: "electric_test_pub_#{:erlang.phash2(stack_id)}",
try_creating_publication?: true,
Expand Down

0 comments on commit 63676b9

Please sign in to comment.