Skip to content

Commit

Permalink
chore(electric): Cleanups in code that deals with Postgres connections (
Browse files Browse the repository at this point in the history
  • Loading branch information
alco authored Nov 13, 2023
1 parent 0ad1867 commit 0673596
Show file tree
Hide file tree
Showing 13 changed files with 41 additions and 64 deletions.
2 changes: 1 addition & 1 deletion components/electric/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export UID=$(shell id -u)
export GID=$(shell id -g)

stop_dev_env:
docker compose -f ${DC_CONFIG} down
docker compose -f ${DC_CONFIG} down --volumes

DOCKER_PREFIX:=$(shell basename $(CURDIR))
docker-pgsql-%:
Expand Down
4 changes: 2 additions & 2 deletions components/electric/dev/compose.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Run using `docker compose -f databases.yaml up`.
version: "3.1"
name: "electric_dev"

services:
db_a:
postgres:
image: postgres:14-alpine
environment:
POSTGRES_DB: electric
Expand Down
24 changes: 8 additions & 16 deletions components/electric/lib/electric/postgres/extension.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Electric.Postgres.Extension do
"""

alias Electric.Postgres.{Schema, Schema.Proto, Extension.Functions, Extension.Migration}
alias Electric.Replication.Postgres.Client
alias Electric.Utils

require Logger
Expand Down Expand Up @@ -391,16 +392,13 @@ defmodule Electric.Postgres.Extension do

defp ensure_transaction(conn, fun) when is_function(fun, 1) do
case :epgsql.squery(conn, @is_transaction_sql) do
{:ok, _cols, [{"t"}]} ->
fun.(conn)

{:ok, _cols, [{"f"}]} ->
:epgsql.with_transaction(conn, fun)
{:ok, _cols, [{"t"}]} -> fun.(conn)
{:ok, _cols, [{"f"}]} -> Client.with_transaction(conn, fun)
end
end

def create_schema(conn) do
ddl(conn, ~s|CREATE SCHEMA IF NOT EXISTS "#{@schema}"|)
{:ok, [], []} = :epgsql.squery(conn, ~s|CREATE SCHEMA IF NOT EXISTS "#{@schema}"|)
end

@create_migration_table_sql """
Expand All @@ -411,11 +409,13 @@ defmodule Electric.Postgres.Extension do
"""

def create_migration_table(conn) do
ddl(conn, @create_migration_table_sql)
{:ok, [], []} = :epgsql.squery(conn, @create_migration_table_sql)
end

defp with_migration_lock(conn, fun) do
ddl(conn, "LOCK TABLE #{@migration_table} IN SHARE UPDATE EXCLUSIVE MODE")
{:ok, [], []} =
:epgsql.squery(conn, "LOCK TABLE #{@migration_table} IN SHARE UPDATE EXCLUSIVE MODE")

fun.()
end

Expand All @@ -426,14 +426,6 @@ defmodule Electric.Postgres.Extension do
Enum.map(rows, fn {version} -> String.to_integer(version) end)
end

defp ddl(conn, sql, _bind \\ []) do
case :epgsql.squery(conn, sql) do
{:ok, _count} -> conn
{:ok, _count, _cols, _rows} -> conn
{:ok, _cols, _rows} -> conn
end
end

def migration_versions(module) when is_atom(module) do
unless function_exported?(module, :migrations, 0),
do: raise(ArgumentError, message: "Module #{module} does not have a migrations/0 function")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule Electric.Postgres.Extension.SchemaLoader.Epgsql do

:error ->
conn_config
|> Connectors.get_connection_opts(replication: false)
|> Connectors.get_connection_opts()
|> Electric.Utils.epgsql_config()
|> :epgsql.connect()
end
Expand Down
2 changes: 1 addition & 1 deletion components/electric/lib/electric/replication/connectors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ defmodule Electric.Replication.Connectors do

@spec get_connection_opts(config()) :: connection_opts()
def get_connection_opts(config, opts \\ []) do
replication? = Keyword.get(opts, :replication, true)
replication? = Keyword.get(opts, :replication, false)

config
|> Keyword.fetch!(:connection)
Expand Down
11 changes: 6 additions & 5 deletions components/electric/lib/electric/replication/initial_sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,12 @@ defmodule Electric.Replication.InitialSync do
connection: opts,
telemetry_span: span
) do
Client.with_conn(Connectors.get_connection_opts(opts, replication: false), fn conn ->
Client.with_conn(Connectors.get_connection_opts(opts), fn conn ->
origin = Connectors.origin(opts)
{:ok, _, schema} = Extension.SchemaCache.load(origin)

:epgsql.with_transaction(
Client.with_transaction(
"ISOLATION LEVEL REPEATABLE READ READ ONLY",
conn,
fn conn ->
# Do the magic write described in the function docs. It's important that this is
Expand Down Expand Up @@ -144,14 +145,14 @@ defmodule Electric.Replication.InitialSync do
results ->
send(parent, {:subscription_data, subscription_id, results})
end
end,
begin_opts: "ISOLATION LEVEL REPEATABLE READ READ ONLY"
end
)
end)
end

defp perform_magic_write(opts, subscription_id) do
Connectors.get_connection_opts(opts, replication: false)
opts
|> Connectors.get_connection_opts()
|> Client.with_conn(
&Extension.update_transaction_marker(&1, "subscription:" <> subscription_id)
)
Expand Down
35 changes: 10 additions & 25 deletions components/electric/lib/electric/replication/postgres/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ defmodule Electric.Replication.Postgres.Client do
end
end

@doc """
Wrapper for :epgsql.with_transaction/3 that always sets `reraise` to `true` by default and makes `begin_opts` a
standalone function argument for easier code reading.
"""
def with_transaction(mode \\ "", conn, fun, in_opts \\ [])
when is_binary(mode) and is_list(in_opts) do
opts = Keyword.merge([reraise: true, begin_opts: mode], in_opts)
:epgsql.with_transaction(conn, fun, opts)
end

def close(conn) do
:epgsql.close(conn)
end
Expand Down Expand Up @@ -89,31 +99,6 @@ defmodule Electric.Replication.Postgres.Client do
end
end

@spec create_publication(connection(), publication(), :all | binary | [binary]) ::
{:ok, String.t()}
def create_publication(conn, name, :all) do
# squery(conn, "CREATE PUBLICATION #{name} FOR ALL TABLES")
create_publication(conn, name, "ALL TABLES")
end

def create_publication(conn, name, tables) when is_list(tables) do
# squery(conn, "CREATE PUBLICATION #{name} FOR TABLE t1, t2")
table_list =
tables
|> Enum.map(&~s|"#{&1}"|)
|> Enum.join(", ")

create_publication(conn, name, "TABLE #{table_list}")
end

def create_publication(conn, name, table_spec) when is_binary(table_spec) do
case squery(conn, ~s|CREATE PUBLICATION "#{name}" FOR #{table_spec}|) do
{:ok, _, _} -> {:ok, name}
# TODO: Verify that the publication has the correct tables
{:error, {_, _, _, :duplicate_object, _, _}} -> {:ok, name}
end
end

defp squery(conn, query) do
Logger.debug("#{__MODULE__}: #{query}")
:epgsql.squery(conn, query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
@impl true
def init(conn_config) do
origin = Connectors.origin(conn_config)
conn_opts = Connectors.get_connection_opts(conn_config)
conn_opts = Connectors.get_connection_opts(conn_config, replication: true)
repl_opts = Connectors.get_replication_opts(conn_config)

:gproc.reg(name(origin))
Expand Down
2 changes: 1 addition & 1 deletion components/electric/lib/electric/satellite/ws_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ defmodule Electric.Satellite.WebsocketServer do

def fetch_last_acked_client_lsn(state) do
state.pg_connector_opts
|> Electric.Replication.Connectors.get_connection_opts(replication: false)
|> Electric.Replication.Connectors.get_connection_opts()
|> Electric.Replication.Postgres.Client.with_conn(fn conn ->
Electric.Postgres.Extension.fetch_last_acked_client_lsn(conn, state.client_id)
end)
Expand Down
3 changes: 2 additions & 1 deletion components/electric/test/electric/postgres_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Electric.PostgresTest do
use ExUnitProperties

import Electric.Postgres.TestConnection
alias Electric.Replication.Postgres.Client

setup do
context = create_test_db()
Expand Down Expand Up @@ -67,7 +68,7 @@ defmodule Electric.PostgresTest do
trace("> " <> sql <> ";")
if sql_file, do: IO.write(sql_file, sql <> ";\n\n")

:epgsql.with_transaction(conn, fn tx ->
Client.with_transaction(conn, fn tx ->
{:ok, _count, _rows} = :epgsql.squery(tx, sql <> ";")
end)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Electric.Replication.InitialSyncTest do
alias Electric.Postgres.{CachedWal, Extension, Lsn}
alias Electric.Replication.Changes.{NewRecord, Transaction}
alias Electric.Replication.InitialSync
alias Electric.Replication.Postgres.Client

@origin "initial-sync-test"
@sleep_timeout 500
Expand Down Expand Up @@ -119,7 +120,7 @@ defmodule Electric.Replication.InitialSyncTest do
end

defp electrify_table(conn, name, version) do
:epgsql.with_transaction(conn, fn tx_conn ->
Client.with_transaction(conn, fn tx_conn ->
{:ok, [], []} = :epgsql.squery(tx_conn, "CALL electric.electrify('#{name}')")

{:ok, [], []} =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducerTest do
{Connectors, [:passthrough],
[
get_replication_opts: fn _ -> %{publication: "mock_pub", slot: "mock_slot"} end,
get_connection_opts: fn _ -> %{} end
get_connection_opts: fn _, _ -> %{} end
]},
{SchemaLoader, [:passthrough],
[
Expand Down
13 changes: 5 additions & 8 deletions components/electric/test/support/extension_case.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Electric.Extension.Case.Helpers do
alias Electric.Postgres.Extension

alias Electric.Replication.Postgres.Client
require ExUnit.Assertions

@doc """
Expand Down Expand Up @@ -34,14 +35,10 @@ defmodule Electric.Extension.Case.Helpers do

def tx(fun, cxt) do
try do
:epgsql.with_transaction(
cxt.conn,
fn tx ->
fun.(tx)
raise RollbackError, message: "rollback"
end,
reraise: true
)
Client.with_transaction(cxt.conn, fn tx ->
fun.(tx)
raise RollbackError, message: "rollback"
end)
rescue
RollbackError -> :ok
end
Expand Down

0 comments on commit 0673596

Please sign in to comment.