Skip to content

Commit

Permalink
Allow configuration of temporary replication slots (#1896)
Browse files Browse the repository at this point in the history
Fixes #1854

For test environments it is convenient to have the option to use
temporary replication slots such that no subsequent cleanup work is
required.
  • Loading branch information
msfstef authored Oct 29, 2024
1 parent a503c21 commit 9652bca
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 5 deletions.
5 changes: 5 additions & 0 deletions .changeset/cyan-guests-lie.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Add `CLEANUP_REPLICATION_SLOTS_ON_SHUTDOWN` env var option to configure whether temporary replication slots are used, to allow easier cleanups on test deploys
1 change: 1 addition & 0 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ config :electric,
telemetry_statsd_host: statsd_host,
db_pool_size: env!("DB_POOL_SIZE", :integer, 20),
replication_stream_id: replication_stream_id,
replication_slot_temporary?: env!("CLEANUP_REPLICATION_SLOTS_ON_SHUTDOWN", :boolean, false),
service_port: env!("PORT", :integer, 3000),
prometheus_port: prometheus_port,
storage: storage,
Expand Down
5 changes: 4 additions & 1 deletion packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Electric.Application do
publication_name: config.replication_opts.publication_name,
try_creating_publication?: true,
slot_name: config.replication_opts.slot_name,
slot_temporary?: config.replication_opts.slot_temporary?,
transaction_received:
{Electric.Replication.ShapeLogCollector, :store_transaction, [shape_log_collector]},
relation_received:
Expand Down Expand Up @@ -102,6 +103,7 @@ defmodule Electric.Application do
replication_stream_id = Application.fetch_env!(:electric, :replication_stream_id)
publication_name = "electric_publication_#{replication_stream_id}"
slot_name = "electric_slot_#{replication_stream_id}"
slot_temporary? = Application.get_env(:electric, :replication_slot_temporary?, false)

get_pg_version_fn = fn ->
Electric.Connection.Manager.get_pg_version(Electric.Connection.Manager)
Expand Down Expand Up @@ -133,7 +135,8 @@ defmodule Electric.Application do
replication_opts: %{
stream_id: replication_stream_id,
publication_name: publication_name,
slot_name: slot_name
slot_name: slot_name,
slot_temporary?: slot_temporary?
},
pool_opts: %{
size: Application.fetch_env!(:electric, :db_pool_size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ defmodule Electric.Postgres.ReplicationClient do
:try_creating_publication?,
:start_streaming?,
:slot_name,
:slot_temporary?,
:display_settings,
origin: "postgres",
txn_collector: %Collector{},
Expand All @@ -56,6 +57,7 @@ defmodule Electric.Postgres.ReplicationClient do
try_creating_publication?: boolean(),
start_streaming?: boolean(),
slot_name: String.t(),
slot_temporary?: boolean(),
origin: String.t(),
txn_collector: Collector.t(),
step: Electric.Postgres.ReplicationClient.step(),
Expand All @@ -70,7 +72,8 @@ defmodule Electric.Postgres.ReplicationClient do
publication_name: [required: true, type: :string],
try_creating_publication?: [required: true, type: :boolean],
start_streaming?: [type: :boolean, default: true],
slot_name: [required: true, type: :string]
slot_name: [required: true, type: :string],
slot_temporary?: [type: :boolean, default: false]
)

@spec new(Access.t()) :: t()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
module focused on the handling of logical messages.
"""
alias Electric.Utils
alias Electric.Postgres.ReplicationClient.State

require Logger

Expand Down Expand Up @@ -100,9 +101,15 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do

###

defp create_slot_query(state) do
query =
"CREATE_REPLICATION_SLOT #{Utils.quote_name(state.slot_name)} LOGICAL pgoutput NOEXPORT_SNAPSHOT"
@slot_options "LOGICAL pgoutput NOEXPORT_SNAPSHOT"
@temp_slot_options "TEMPORARY #{@slot_options}"
defp create_slot_query(%State{slot_name: slot_name, slot_temporary?: true} = state) do
query = "CREATE_REPLICATION_SLOT #{Utils.quote_name(slot_name)} #{@temp_slot_options}"
{:query, query, state}
end

defp create_slot_query(%State{slot_name: slot_name} = state) do
query = "CREATE_REPLICATION_SLOT #{Utils.quote_name(slot_name)} #{@slot_options}"

{:query, query, state}
end
Expand Down

0 comments on commit 9652bca

Please sign in to comment.