diff --git a/.changeset/cyan-guests-lie.md b/.changeset/cyan-guests-lie.md new file mode 100644 index 0000000000..1dee217c6b --- /dev/null +++ b/.changeset/cyan-guests-lie.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Add `CLEANUP_REPLICATION_SLOTS_ON_SHUTDOWN` env var option to configure whether temporary replication slots are used, to allow easier cleanups on test deploys diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 1e08d6a7a7..509d8f885d 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -196,6 +196,7 @@ config :electric, telemetry_statsd_host: statsd_host, db_pool_size: env!("DB_POOL_SIZE", :integer, 20), replication_stream_id: replication_stream_id, + replication_slot_temporary?: env!("CLEANUP_REPLICATION_SLOTS_ON_SHUTDOWN", :boolean, false), service_port: env!("PORT", :integer, 3000), prometheus_port: prometheus_port, storage: storage, diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index efb2f7974f..fe16ec77c9 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -29,6 +29,7 @@ defmodule Electric.Application do publication_name: config.replication_opts.publication_name, try_creating_publication?: true, slot_name: config.replication_opts.slot_name, + slot_temporary?: config.replication_opts.slot_temporary?, transaction_received: {Electric.Replication.ShapeLogCollector, :store_transaction, [shape_log_collector]}, relation_received: @@ -102,6 +103,7 @@ defmodule Electric.Application do replication_stream_id = Application.fetch_env!(:electric, :replication_stream_id) publication_name = "electric_publication_#{replication_stream_id}" slot_name = "electric_slot_#{replication_stream_id}" + slot_temporary? = Application.get_env(:electric, :replication_slot_temporary?, false) get_pg_version_fn = fn -> Electric.Connection.Manager.get_pg_version(Electric.Connection.Manager) @@ -133,7 +135,8 @@ defmodule Electric.Application do replication_opts: %{ stream_id: replication_stream_id, publication_name: publication_name, - slot_name: slot_name + slot_name: slot_name, + slot_temporary?: slot_temporary? }, pool_opts: %{ size: Application.fetch_env!(:electric, :db_pool_size) diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index 639e2db26a..2e06e122a2 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -33,6 +33,7 @@ defmodule Electric.Postgres.ReplicationClient do :try_creating_publication?, :start_streaming?, :slot_name, + :slot_temporary?, :display_settings, origin: "postgres", txn_collector: %Collector{}, @@ -56,6 +57,7 @@ defmodule Electric.Postgres.ReplicationClient do try_creating_publication?: boolean(), start_streaming?: boolean(), slot_name: String.t(), + slot_temporary?: boolean(), origin: String.t(), txn_collector: Collector.t(), step: Electric.Postgres.ReplicationClient.step(), @@ -70,7 +72,8 @@ defmodule Electric.Postgres.ReplicationClient do publication_name: [required: true, type: :string], try_creating_publication?: [required: true, type: :boolean], start_streaming?: [type: :boolean, default: true], - slot_name: [required: true, type: :string] + slot_name: [required: true, type: :string], + slot_temporary?: [type: :boolean, default: false] ) @spec new(Access.t()) :: t() diff --git a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex index 7d97e412cd..f4f02b5eb7 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex @@ -8,6 +8,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do module focused on the handling of logical messages. """ alias Electric.Utils + alias Electric.Postgres.ReplicationClient.State require Logger @@ -100,9 +101,15 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do ### - defp create_slot_query(state) do - query = - "CREATE_REPLICATION_SLOT #{Utils.quote_name(state.slot_name)} LOGICAL pgoutput NOEXPORT_SNAPSHOT" + @slot_options "LOGICAL pgoutput NOEXPORT_SNAPSHOT" + @temp_slot_options "TEMPORARY #{@slot_options}" + defp create_slot_query(%State{slot_name: slot_name, slot_temporary?: true} = state) do + query = "CREATE_REPLICATION_SLOT #{Utils.quote_name(slot_name)} #{@temp_slot_options}" + {:query, query, state} + end + + defp create_slot_query(%State{slot_name: slot_name} = state) do + query = "CREATE_REPLICATION_SLOT #{Utils.quote_name(slot_name)} #{@slot_options}" {:query, query, state} end