diff --git a/components/electric/Makefile b/components/electric/Makefile index 850f788fd4..efa0e4758f 100644 --- a/components/electric/Makefile +++ b/components/electric/Makefile @@ -51,7 +51,7 @@ export UID=$(shell id -u) export GID=$(shell id -g) stop_dev_env: - docker compose -f ${DC_CONFIG} down + docker compose -f ${DC_CONFIG} down --volumes DOCKER_PREFIX:=$(shell basename $(CURDIR)) docker-pgsql-%: diff --git a/components/electric/dev/compose.yaml b/components/electric/dev/compose.yaml index 733f3e5039..c21f3bfdd8 100644 --- a/components/electric/dev/compose.yaml +++ b/components/electric/dev/compose.yaml @@ -1,8 +1,8 @@ -# Run using `docker compose -f databases.yaml up`. version: "3.1" +name: "electric_dev" services: - db_a: + postgres: image: postgres:14-alpine environment: POSTGRES_DB: electric diff --git a/components/electric/lib/electric/postgres/extension.ex b/components/electric/lib/electric/postgres/extension.ex index 91eb4bd554..82f9eb7a80 100644 --- a/components/electric/lib/electric/postgres/extension.ex +++ b/components/electric/lib/electric/postgres/extension.ex @@ -4,6 +4,7 @@ defmodule Electric.Postgres.Extension do """ alias Electric.Postgres.{Schema, Schema.Proto, Extension.Functions, Extension.Migration} + alias Electric.Replication.Postgres.Client alias Electric.Utils require Logger @@ -391,16 +392,13 @@ defmodule Electric.Postgres.Extension do defp ensure_transaction(conn, fun) when is_function(fun, 1) do case :epgsql.squery(conn, @is_transaction_sql) do - {:ok, _cols, [{"t"}]} -> - fun.(conn) - - {:ok, _cols, [{"f"}]} -> - :epgsql.with_transaction(conn, fun) + {:ok, _cols, [{"t"}]} -> fun.(conn) + {:ok, _cols, [{"f"}]} -> Client.with_transaction(conn, fun) end end def create_schema(conn) do - ddl(conn, ~s|CREATE SCHEMA IF NOT EXISTS "#{@schema}"|) + {:ok, [], []} = :epgsql.squery(conn, ~s|CREATE SCHEMA IF NOT EXISTS "#{@schema}"|) end @create_migration_table_sql """ @@ -411,11 +409,13 @@ defmodule Electric.Postgres.Extension do """ def create_migration_table(conn) do - ddl(conn, @create_migration_table_sql) + {:ok, [], []} = :epgsql.squery(conn, @create_migration_table_sql) end defp with_migration_lock(conn, fun) do - ddl(conn, "LOCK TABLE #{@migration_table} IN SHARE UPDATE EXCLUSIVE MODE") + {:ok, [], []} = + :epgsql.squery(conn, "LOCK TABLE #{@migration_table} IN SHARE UPDATE EXCLUSIVE MODE") + fun.() end @@ -426,14 +426,6 @@ defmodule Electric.Postgres.Extension do Enum.map(rows, fn {version} -> String.to_integer(version) end) end - defp ddl(conn, sql, _bind \\ []) do - case :epgsql.squery(conn, sql) do - {:ok, _count} -> conn - {:ok, _count, _cols, _rows} -> conn - {:ok, _cols, _rows} -> conn - end - end - def migration_versions(module) when is_atom(module) do unless function_exported?(module, :migrations, 0), do: raise(ArgumentError, message: "Module #{module} does not have a migrations/0 function") diff --git a/components/electric/lib/electric/postgres/extension/schema_loader/epgsql.ex b/components/electric/lib/electric/postgres/extension/schema_loader/epgsql.ex index 2f97bbd288..5fe49fecb8 100644 --- a/components/electric/lib/electric/postgres/extension/schema_loader/epgsql.ex +++ b/components/electric/lib/electric/postgres/extension/schema_loader/epgsql.ex @@ -26,7 +26,7 @@ defmodule Electric.Postgres.Extension.SchemaLoader.Epgsql do :error -> conn_config - |> Connectors.get_connection_opts(replication: false) + |> Connectors.get_connection_opts() |> Electric.Utils.epgsql_config() |> :epgsql.connect() end diff --git a/components/electric/lib/electric/replication/connectors.ex b/components/electric/lib/electric/replication/connectors.ex index d22a4bd125..f93b90e858 100644 --- a/components/electric/lib/electric/replication/connectors.ex +++ b/components/electric/lib/electric/replication/connectors.ex @@ -105,7 +105,7 @@ defmodule Electric.Replication.Connectors do @spec get_connection_opts(config()) :: connection_opts() def get_connection_opts(config, opts \\ []) do - replication? = Keyword.get(opts, :replication, true) + replication? = Keyword.get(opts, :replication, false) config |> Keyword.fetch!(:connection) diff --git a/components/electric/lib/electric/replication/initial_sync.ex b/components/electric/lib/electric/replication/initial_sync.ex index 5c156440da..d389731b72 100644 --- a/components/electric/lib/electric/replication/initial_sync.ex +++ b/components/electric/lib/electric/replication/initial_sync.ex @@ -98,11 +98,12 @@ defmodule Electric.Replication.InitialSync do connection: opts, telemetry_span: span ) do - Client.with_conn(Connectors.get_connection_opts(opts, replication: false), fn conn -> + Client.with_conn(Connectors.get_connection_opts(opts), fn conn -> origin = Connectors.origin(opts) {:ok, _, schema} = Extension.SchemaCache.load(origin) - :epgsql.with_transaction( + Client.with_transaction( + "ISOLATION LEVEL REPEATABLE READ READ ONLY", conn, fn conn -> # Do the magic write described in the function docs. It's important that this is @@ -144,14 +145,14 @@ defmodule Electric.Replication.InitialSync do results -> send(parent, {:subscription_data, subscription_id, results}) end - end, - begin_opts: "ISOLATION LEVEL REPEATABLE READ READ ONLY" + end ) end) end defp perform_magic_write(opts, subscription_id) do - Connectors.get_connection_opts(opts, replication: false) + opts + |> Connectors.get_connection_opts() |> Client.with_conn( &Extension.update_transaction_marker(&1, "subscription:" <> subscription_id) ) diff --git a/components/electric/lib/electric/replication/postgres/client.ex b/components/electric/lib/electric/replication/postgres/client.ex index a2c478329c..9dd22c4c45 100644 --- a/components/electric/lib/electric/replication/postgres/client.ex +++ b/components/electric/lib/electric/replication/postgres/client.ex @@ -53,6 +53,16 @@ defmodule Electric.Replication.Postgres.Client do end end + @doc """ + Wrapper for :epgsql.with_transaction/3 that always sets `reraise` to `true` by default and makes `begin_opts` a + standalone function argument for easier code reading. + """ + def with_transaction(mode \\ "", conn, fun, in_opts \\ []) + when is_binary(mode) and is_list(in_opts) do + opts = Keyword.merge([reraise: true, begin_opts: mode], in_opts) + :epgsql.with_transaction(conn, fun, opts) + end + def close(conn) do :epgsql.close(conn) end @@ -89,31 +99,6 @@ defmodule Electric.Replication.Postgres.Client do end end - @spec create_publication(connection(), publication(), :all | binary | [binary]) :: - {:ok, String.t()} - def create_publication(conn, name, :all) do - # squery(conn, "CREATE PUBLICATION #{name} FOR ALL TABLES") - create_publication(conn, name, "ALL TABLES") - end - - def create_publication(conn, name, tables) when is_list(tables) do - # squery(conn, "CREATE PUBLICATION #{name} FOR TABLE t1, t2") - table_list = - tables - |> Enum.map(&~s|"#{&1}"|) - |> Enum.join(", ") - - create_publication(conn, name, "TABLE #{table_list}") - end - - def create_publication(conn, name, table_spec) when is_binary(table_spec) do - case squery(conn, ~s|CREATE PUBLICATION "#{name}" FOR #{table_spec}|) do - {:ok, _, _} -> {:ok, name} - # TODO: Verify that the publication has the correct tables - {:error, {_, _, _, :duplicate_object, _, _}} -> {:ok, name} - end - end - defp squery(conn, query) do Logger.debug("#{__MODULE__}: #{query}") :epgsql.squery(conn, query) diff --git a/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex b/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex index cbc47dc8f3..73ab15ccbd 100644 --- a/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex +++ b/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex @@ -80,7 +80,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do @impl true def init(conn_config) do origin = Connectors.origin(conn_config) - conn_opts = Connectors.get_connection_opts(conn_config) + conn_opts = Connectors.get_connection_opts(conn_config, replication: true) repl_opts = Connectors.get_replication_opts(conn_config) :gproc.reg(name(origin)) diff --git a/components/electric/lib/electric/satellite/ws_server.ex b/components/electric/lib/electric/satellite/ws_server.ex index 516cb2071b..43967d2a44 100644 --- a/components/electric/lib/electric/satellite/ws_server.ex +++ b/components/electric/lib/electric/satellite/ws_server.ex @@ -500,7 +500,7 @@ defmodule Electric.Satellite.WebsocketServer do def fetch_last_acked_client_lsn(state) do state.pg_connector_opts - |> Electric.Replication.Connectors.get_connection_opts(replication: false) + |> Electric.Replication.Connectors.get_connection_opts() |> Electric.Replication.Postgres.Client.with_conn(fn conn -> Electric.Postgres.Extension.fetch_last_acked_client_lsn(conn, state.client_id) end) diff --git a/components/electric/test/electric/postgres_test.exs b/components/electric/test/electric/postgres_test.exs index 57f94846bc..8856fd7fcb 100644 --- a/components/electric/test/electric/postgres_test.exs +++ b/components/electric/test/electric/postgres_test.exs @@ -3,6 +3,7 @@ defmodule Electric.PostgresTest do use ExUnitProperties import Electric.Postgres.TestConnection + alias Electric.Replication.Postgres.Client setup do context = create_test_db() @@ -67,7 +68,7 @@ defmodule Electric.PostgresTest do trace("> " <> sql <> ";") if sql_file, do: IO.write(sql_file, sql <> ";\n\n") - :epgsql.with_transaction(conn, fn tx -> + Client.with_transaction(conn, fn tx -> {:ok, _count, _rows} = :epgsql.squery(tx, sql <> ";") end) diff --git a/components/electric/test/electric/replication/initial_sync_test.exs b/components/electric/test/electric/replication/initial_sync_test.exs index 50953b7281..fa4d4e8fd1 100644 --- a/components/electric/test/electric/replication/initial_sync_test.exs +++ b/components/electric/test/electric/replication/initial_sync_test.exs @@ -6,6 +6,7 @@ defmodule Electric.Replication.InitialSyncTest do alias Electric.Postgres.{CachedWal, Extension, Lsn} alias Electric.Replication.Changes.{NewRecord, Transaction} alias Electric.Replication.InitialSync + alias Electric.Replication.Postgres.Client @origin "initial-sync-test" @sleep_timeout 500 @@ -119,7 +120,7 @@ defmodule Electric.Replication.InitialSyncTest do end defp electrify_table(conn, name, version) do - :epgsql.with_transaction(conn, fn tx_conn -> + Client.with_transaction(conn, fn tx_conn -> {:ok, [], []} = :epgsql.squery(tx_conn, "CALL electric.electrify('#{name}')") {:ok, [], []} = diff --git a/components/electric/test/electric/replication/postgres/logical_replication_producer_test.exs b/components/electric/test/electric/replication/postgres/logical_replication_producer_test.exs index 2ea6a47b63..84b3e59799 100644 --- a/components/electric/test/electric/replication/postgres/logical_replication_producer_test.exs +++ b/components/electric/test/electric/replication/postgres/logical_replication_producer_test.exs @@ -24,7 +24,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducerTest do {Connectors, [:passthrough], [ get_replication_opts: fn _ -> %{publication: "mock_pub", slot: "mock_slot"} end, - get_connection_opts: fn _ -> %{} end + get_connection_opts: fn _, _ -> %{} end ]}, {SchemaLoader, [:passthrough], [ diff --git a/components/electric/test/support/extension_case.ex b/components/electric/test/support/extension_case.ex index 26ad72128d..e528ed0178 100644 --- a/components/electric/test/support/extension_case.ex +++ b/components/electric/test/support/extension_case.ex @@ -1,6 +1,7 @@ defmodule Electric.Extension.Case.Helpers do alias Electric.Postgres.Extension + alias Electric.Replication.Postgres.Client require ExUnit.Assertions @doc """ @@ -34,14 +35,10 @@ defmodule Electric.Extension.Case.Helpers do def tx(fun, cxt) do try do - :epgsql.with_transaction( - cxt.conn, - fn tx -> - fun.(tx) - raise RollbackError, message: "rollback" - end, - reraise: true - ) + Client.with_transaction(cxt.conn, fn tx -> + fun.(tx) + raise RollbackError, message: "rollback" + end) rescue RollbackError -> :ok end