Skip to content

Commit

Permalink
Make Timeline's dependency on PersistentKV more prominent
Browse files Browse the repository at this point in the history
  • Loading branch information
alco committed Oct 14, 2024
1 parent bc20451 commit b2bc90a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 28 deletions.
4 changes: 1 addition & 3 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ defmodule Electric.Application do
pool_size: config.pool_opts.size,
types: PgInterop.Postgrex.Types
],
timeline_opts: [
persistent_kv: config.persistent_kv
]
persistent_kv: config.persistent_kv
]

# The root application supervisor starts the core global processes, including the HTTP
Expand Down
12 changes: 6 additions & 6 deletions packages/sync-service/lib/electric/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ defmodule Electric.ConnectionManager do
:replication_opts,
# Database connection pool options.
:pool_opts,
# Options specific to `Electric.Timeline`.
:timeline_opts,
# Application's persistent key-value storage reference.
:persistent_kv,
# PID of the replication client.
:replication_client_pid,
# PID of the Postgres connection lock.
Expand Down Expand Up @@ -66,7 +66,7 @@ defmodule Electric.ConnectionManager do
{:connection_opts, Keyword.t()}
| {:replication_opts, Keyword.t()}
| {:pool_opts, Keyword.t()}
| {:timeline_opts, Keyword.t()}
| {:persistent_kv, map()}

@type options :: [option]

Expand Down Expand Up @@ -124,14 +124,14 @@ defmodule Electric.ConnectionManager do

pool_opts = Keyword.fetch!(opts, :pool_opts)

timeline_opts = Keyword.fetch!(opts, :timeline_opts)
persistent_kv = Keyword.fetch!(opts, :persistent_kv)

state =
%State{
connection_opts: connection_opts,
replication_opts: replication_opts,
pool_opts: pool_opts,
timeline_opts: timeline_opts,
persistent_kv: persistent_kv,
pg_lock_acquired: false,
backoff: {:backoff.init(1000, 10_000), nil}
}
Expand Down Expand Up @@ -200,7 +200,7 @@ defmodule Electric.ConnectionManager do
check_result =
Electric.Timeline.check(
{state.pg_system_identifier, state.pg_timeline_id},
state.timeline_opts
state.persistent_kv
)

{:ok, shapes_sup_pid} =
Expand Down
37 changes: 18 additions & 19 deletions packages/sync-service/lib/electric/timeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,44 +20,44 @@ defmodule Electric.Timeline do
If the timelines differ, that indicates that a Point In Time Recovery (PITR) has occurred and all shapes must be cleaned.
If we fail to fetch timeline information, we also clean all shapes for safety as we can't be sure that Postgres and Electric are on the same timeline.
"""
@spec check(timeline(), keyword()) :: :ok | :timeline_changed
def check(pg_timeline, opts) do
electric_timeline = load_timeline(opts)
verify_timeline(pg_timeline, electric_timeline, opts)
@spec check(timeline(), map()) :: :ok | :timeline_changed
def check(pg_timeline, persistent_kv) do
electric_timeline = load_timeline(persistent_kv)
verify_timeline(pg_timeline, electric_timeline, persistent_kv)
end

@spec verify_timeline(timeline(), timeline(), keyword()) :: :ok
@spec verify_timeline(timeline(), timeline(), map()) :: :ok
defp verify_timeline({pg_id, timeline_id} = timeline, timeline, _) do
Logger.info("Connected to Postgres #{pg_id} and timeline #{timeline_id}")
:ok
end

defp verify_timeline({pg_id, timeline_id} = timeline, nil, opts) do
defp verify_timeline({pg_id, timeline_id} = timeline, nil, persistent_kv) do
Logger.info("No previous timeline detected.")
Logger.info("Connected to Postgres #{pg_id} and timeline #{timeline_id}")
store_timeline(timeline, opts)
store_timeline(timeline, persistent_kv)
end

defp verify_timeline({pg_id, _} = timeline, {electric_pg_id, _}, opts)
defp verify_timeline({pg_id, _} = timeline, {electric_pg_id, _}, persistent_kv)
when pg_id != electric_pg_id do
Logger.warning(
"Detected different Postgres DB, with ID: #{pg_id}. Old Postgres DB had ID #{electric_pg_id}. Will purge all shapes."
)

:ok = store_timeline(timeline, opts)
:ok = store_timeline(timeline, persistent_kv)
:timeline_changed
end

defp verify_timeline({_, timeline_id} = timeline, _, opts) do
defp verify_timeline({_, timeline_id} = timeline, _, persistent_kv) do
Logger.warning("Detected PITR to timeline #{timeline_id}; will purge all shapes.")
:ok = store_timeline(timeline, opts)
:ok = store_timeline(timeline, persistent_kv)
:timeline_changed
end

# Loads the PG ID and timeline ID from persistent storage
@spec load_timeline(keyword()) :: timeline()
def load_timeline(opts) do
kv = make_serialized_kv(opts)
@spec load_timeline(map()) :: timeline()
def load_timeline(persistent_kv) do
kv = make_serialized_kv(persistent_kv)

case PersistentKV.get(kv, @timeline_key) do
{:ok, [pg_id, timeline_id]} ->
Expand All @@ -72,14 +72,13 @@ defmodule Electric.Timeline do
end
end

def store_timeline({pg_id, timeline_id}, opts) do
kv = make_serialized_kv(opts)
def store_timeline({pg_id, timeline_id}, persistent_kv) do
kv = make_serialized_kv(persistent_kv)
:ok = PersistentKV.set(kv, @timeline_key, [pg_id, timeline_id])
end

defp make_serialized_kv(opts) do
kv_backend = Keyword.fetch!(opts, :persistent_kv)
defp make_serialized_kv(persistent_kv) do
# defaults to using Jason encoder and decoder
PersistentKV.Serialized.new!(backend: kv_backend)
PersistentKV.Serialized.new!(backend: persistent_kv)
end
end

0 comments on commit b2bc90a

Please sign in to comment.