Skip to content

Commit

Permalink
Make Timeline's internal logic clearer through fewer indirections in …
Browse files Browse the repository at this point in the history
…the code
  • Loading branch information
alco committed Oct 14, 2024
1 parent b2bc90a commit fe25522
Showing 1 changed file with 18 additions and 11 deletions.
29 changes: 18 additions & 11 deletions packages/sync-service/lib/electric/timeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ defmodule Electric.Timeline do
@type timeline_id :: integer()
@type timeline :: {pg_id(), timeline_id()} | nil

@type check_result :: :ok | :timeline_changed

@timeline_key "timeline_id"

@doc """
Expand All @@ -20,37 +22,42 @@ defmodule Electric.Timeline do
If the timelines differ, that indicates that a Point In Time Recovery (PITR) has occurred and all shapes must be cleaned.
If we fail to fetch timeline information, we also clean all shapes for safety as we can't be sure that Postgres and Electric are on the same timeline.
"""
@spec check(timeline(), map()) :: :ok | :timeline_changed
@spec check(timeline(), map()) :: check_result()
def check(pg_timeline, persistent_kv) do
electric_timeline = load_timeline(persistent_kv)
verify_timeline(pg_timeline, electric_timeline, persistent_kv)

# In any situation where the newly fetched timeline is different from the one we had
# stored previously, overwrite the old one with the new one in our persistent KV store.
if pg_timeline != electric_timeline do
:ok = store_timeline(pg_timeline, persistent_kv)
end

# Now check for specific differences between the two timelines.
verify_timeline(pg_timeline, electric_timeline)
end

@spec verify_timeline(timeline(), timeline(), map()) :: :ok
defp verify_timeline({pg_id, timeline_id} = timeline, timeline, _) do
@spec verify_timeline(timeline(), timeline()) :: check_result()
defp verify_timeline({pg_id, timeline_id} = timeline, timeline) do
Logger.info("Connected to Postgres #{pg_id} and timeline #{timeline_id}")
:ok
end

defp verify_timeline({pg_id, timeline_id} = timeline, nil, persistent_kv) do
defp verify_timeline({pg_id, timeline_id}, nil) do
Logger.info("No previous timeline detected.")
Logger.info("Connected to Postgres #{pg_id} and timeline #{timeline_id}")
store_timeline(timeline, persistent_kv)
:ok
end

defp verify_timeline({pg_id, _} = timeline, {electric_pg_id, _}, persistent_kv)
when pg_id != electric_pg_id do
defp verify_timeline({pg_id, _}, {electric_pg_id, _}) when pg_id != electric_pg_id do
Logger.warning(
"Detected different Postgres DB, with ID: #{pg_id}. Old Postgres DB had ID #{electric_pg_id}. Will purge all shapes."
)

:ok = store_timeline(timeline, persistent_kv)
:timeline_changed
end

defp verify_timeline({_, timeline_id} = timeline, _, persistent_kv) do
defp verify_timeline({_, timeline_id}, _) do
Logger.warning("Detected PITR to timeline #{timeline_id}; will purge all shapes.")
:ok = store_timeline(timeline, persistent_kv)
:timeline_changed
end

Expand Down

0 comments on commit fe25522

Please sign in to comment.