Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an optional DATABASE_POOL_URL config #2085

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/stupid-weeks-look.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Restore the automatic fallback to unencrypted database connections when SSL isn't available.
2 changes: 1 addition & 1 deletion examples/gatekeeper-auth/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ $ curl -sv --header "Authorization: Bearer ${AUTH_TOKEN}" \
Note that we got an empty response when successfully proxied through to Electric above because there are no `items` in the database. If you like, you can create some, e.g. using `psql`:

```console
$ psql "postgresql://postgres:password@localhost:54321/electric?sslmode=disable"
$ psql "postgresql://postgres:password@localhost:54321/electric"
psql (16.4)
Type "help" for help.

Expand Down
8 changes: 6 additions & 2 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,15 @@ database_ipv6_config =
env!("ELECTRIC_DATABASE_USE_IPV6", :boolean, false)

{:ok, database_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_url)

connection_opts = database_url_config ++ [ipv6: database_ipv6_config]

config :electric, connection_opts: Electric.Utils.obfuscate_password(connection_opts)

if database_pool_url = env!("DATABASE_POOL_URL", :string, nil) do
{:ok, database_pool_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_pool_url)
connection_opts = database_pool_url_config ++ [ipv6: database_ipv6_config]
config :electric, pool_connection_opts: Electric.Utils.obfuscate_password(connection_opts)
end

enable_integration_testing = env!("ELECTRIC_ENABLE_INTEGRATION_TESTING", :boolean, false)
cache_max_age = env!("ELECTRIC_CACHE_MAX_AGE", :integer, 60)
cache_stale_age = env!("ELECTRIC_CACHE_STALE_AGE", :integer, 60 * 5)
Expand Down
20 changes: 14 additions & 6 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ defmodule Electric.Application do
publication_name = "electric_publication_#{replication_stream_id}"
slot_name = "electric_slot_#{replication_stream_id}"

replication_connection_opts = Application.fetch_env!(:electric, :connection_opts)
pool_connection_opts = Application.get_env(:electric, :pool_connection_opts)

connection_opts = pool_connection_opts || replication_connection_opts

replication_opts = [
connection_opts: replication_connection_opts,
publication_name: publication_name,
slot_name: slot_name,
slot_temporary?: Application.fetch_env!(:electric, :replication_slot_temporary?)
]

# The root application supervisor starts the core global processes, including the HTTP
# server and the database connection manager. The latter is responsible for establishing
# all needed connections to the database (acquiring the exclusive access lock, opening a
Expand All @@ -53,13 +65,9 @@ defmodule Electric.Application do
{Electric.StackSupervisor,
stack_id: stack_id,
stack_events_registry: Registry.StackEvents,
connection_opts: Application.fetch_env!(:electric, :connection_opts),
persistent_kv: persistent_kv,
replication_opts: [
publication_name: publication_name,
slot_name: slot_name,
slot_temporary?: Application.fetch_env!(:electric, :replication_slot_temporary?)
],
connection_opts: connection_opts,
replication_opts: replication_opts,
pool_opts: [pool_size: Application.fetch_env!(:electric, :db_pool_size)],
storage: Application.fetch_env!(:electric, :storage),
chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold)},
Expand Down
95 changes: 61 additions & 34 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -242,37 +242,45 @@ defmodule Electric.Connection.Manager do

@impl true
def handle_continue(:start_lock_connection, %State{lock_connection_pid: nil} = state) do
case Electric.Postgres.LockConnection.start_link(
connection_opts: state.connection_opts,
connection_manager: self(),
lock_name: Keyword.fetch!(state.replication_opts, :slot_name)
) do
{:ok, lock_connection_pid} ->
opts = [
connection_opts: state.connection_opts,
connection_manager: self(),
lock_name: Keyword.fetch!(state.replication_opts, :slot_name)
]

case start_lock_connection(opts) do
{:ok, pid, connection_opts} ->
state = %{state | lock_connection_pid: pid, connection_opts: connection_opts}

Electric.StackSupervisor.dispatch_stack_event(
state.stack_events_registry,
state.stack_id,
:waiting_for_connection_lock
)

Process.send_after(self(), :log_lock_connection_status, @lock_status_logging_interval)
{:noreply, %{state | lock_connection_pid: lock_connection_pid}}
{:noreply, state}

{:error, reason} ->
handle_connection_error(reason, state, "lock_connection")
end
end

def handle_continue(:start_replication_client, %State{replication_client_pid: nil} = state) do
opts =
state
|> Map.take([:stack_id, :replication_opts, :connection_opts])
|> Map.to_list()

Logger.debug("Starting replication client for stack #{state.stack_id}")

{connection_opts, replication_opts} = Keyword.pop(state.replication_opts, :connection_opts)

opts = [
connection_opts: connection_opts,
replication_opts: replication_opts,
stack_id: state.stack_id
]

case start_replication_client(opts) do
{:ok, pid, connection_opts} ->
state = %{state | replication_client_pid: pid, connection_opts: connection_opts}
replication_opts = Keyword.put(replication_opts, :connection_opts, connection_opts)
state = %{state | replication_client_pid: pid, replication_opts: replication_opts}

if is_nil(state.pool_pid) do
# This is the case where Connection.Manager starts connections from the initial state.
Expand Down Expand Up @@ -435,32 +443,27 @@ defmodule Electric.Connection.Manager do
}}
end

defp start_replication_client(opts) do
case Electric.Postgres.ReplicationClient.start_link(opts) do
defp start_lock_connection(opts) do
case Electric.Postgres.LockConnection.start_link(opts) do
{:ok, pid} ->
{:ok, pid, Keyword.fetch!(opts, :connection_opts)}
{:ok, pid, opts[:connection_opts]}

{:error, %Postgrex.Error{message: "ssl not available"}} = error ->
sslmode = get_in(opts, [:connection_opts, :sslmode])

if sslmode == :require do
error
else
if not is_nil(sslmode) do
# Only log a warning when there's an explicit sslmode parameter in the database
# config, meaning the user has requested a certain sslmode.
Logger.warning(
"Failed to connect to the database using SSL. Trying again, using an unencrypted connection."
)
end

opts
|> Keyword.update!(:connection_opts, &Keyword.put(&1, :ssl, false))
|> start_replication_client()
error ->
with {:ok, opts} <- maybe_fallback_to_no_ssl(error, opts) do
start_lock_connection(opts)
end
end
end

defp start_replication_client(opts) do
case Electric.Postgres.ReplicationClient.start_link(opts) do
{:ok, pid} ->
{:ok, pid, opts[:connection_opts]}

error ->
error
with {:ok, opts} <- maybe_fallback_to_no_ssl(error, opts) do
start_replication_client(opts)
end
end
end

Expand All @@ -478,6 +481,30 @@ defmodule Electric.Connection.Manager do
)
end

defp maybe_fallback_to_no_ssl(
{:error, %Postgrex.Error{message: "ssl not available"}} = error,
opts
) do
sslmode = get_in(opts, [:connection_opts, :sslmode])

if sslmode == :require do
error
else
if not is_nil(sslmode) do
# Only log a warning when there's an explicit sslmode parameter in the database
# config, meaning the user has requested a certain sslmode.
Logger.warning(
"Failed to connect to the database using SSL. Trying again, using an unencrypted connection."
)
end

opts = Keyword.update!(opts, :connection_opts, &Keyword.put(&1, :ssl, false))
{:ok, opts}
end
end

defp maybe_fallback_to_no_ssl(error, _opts), do: error

defp handle_connection_error(
{:shutdown, {:failed_to_start_child, Electric.Postgres.ReplicationClient, error}},
state,
Expand Down
29 changes: 16 additions & 13 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,31 @@ defmodule Electric.StackSupervisor do
"""
use Supervisor, restart: :transient

@connection_opts_schema [
type: :keyword_list,
required: true,
keys: [
hostname: [type: :string, required: true],
port: [type: :integer, required: true],
database: [type: :string, required: true],
username: [type: :string, required: true],
password: [type: {:fun, 0}, required: true],
sslmode: [type: :atom, required: false],
ipv6: [type: :boolean, required: false]
]
]

@opts_schema NimbleOptions.new!(
name: [type: :any, required: false],
stack_id: [type: :string, required: true],
persistent_kv: [type: :any, required: true],
stack_events_registry: [type: :atom, required: true],
connection_opts: [
type: :keyword_list,
required: true,
keys: [
hostname: [type: :string, required: true],
port: [type: :integer, required: true],
database: [type: :string, required: true],
username: [type: :string, required: true],
password: [type: {:fun, 0}, required: true],
sslmode: [type: :atom, required: false],
ipv6: [type: :boolean, required: false]
]
],
connection_opts: @connection_opts_schema,
replication_opts: [
type: :keyword_list,
required: true,
keys: [
connection_opts: @connection_opts_schema,
publication_name: [type: :string, required: true],
slot_name: [type: :string, required: true],
slot_temporary?: [type: :boolean, default: false],
Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/test/support/component_setup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ defmodule Support.ComponentSetup do
storage: storage,
connection_opts: ctx.db_config,
replication_opts: [
connection_opts: ctx.db_config,
slot_name: "electric_test_slot_#{:erlang.phash2(stack_id)}",
publication_name: "electric_test_pub_#{:erlang.phash2(stack_id)}",
try_creating_publication?: true,
Expand Down
Loading