diff --git a/.changeset/calm-toys-play.md b/.changeset/calm-toys-play.md
new file mode 100644
index 0000000000..dd5380abee
--- /dev/null
+++ b/.changeset/calm-toys-play.md
@@ -0,0 +1,5 @@
+---
+"@core/sync-service": patch
+---
+
+Handle Postgres Point In Time Recoveries (PITR) by cleaning all shapes.
diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex
index 0486d03c06..3d3f854edb 100644
--- a/packages/sync-service/lib/electric/application.ex
+++ b/packages/sync-service/lib/electric/application.ex
@@ -64,6 +64,10 @@ defmodule Electric.Application do
               pool_size: Application.fetch_env!(:electric, :db_pool_size),
               types: PgInterop.Postgrex.Types
             ],
+            timeline_opts: [
+              shape_cache: {Electric.ShapeCache, []},
+              persistent_kv: persistent_kv
+            ],
             log_collector: {Electric.Replication.ShapeLogCollector, inspector: inspector},
             shape_cache: shape_cache
           ]
diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex
index b3453df02a..9b9ad14723 100644
--- a/packages/sync-service/lib/electric/connection_manager.ex
+++ b/packages/sync-service/lib/electric/connection_manager.ex
@@ -35,6 +35,8 @@ defmodule Electric.ConnectionManager do
       :replication_opts,
       # Database connection pool options.
       :pool_opts,
+      # Options specific to `Electric.Timeline`.
+      :timeline_opts,
       # Configuration for the log collector
       :log_collector,
       # Configuration for the shape cache that implements `Electric.ShapeCacheBehaviour`
@@ -56,6 +58,7 @@ defmodule Electric.ConnectionManager do
           {:connection_opts, Keyword.t()}
           | {:replication_opts, Keyword.t()}
           | {:pool_opts, Keyword.t()}
+          | {:timeline_opts, Keyword.t()}
           | {:log_collector, {module(), Keyword.t()}}
           | {:shape_cache, {module(), Keyword.t()}}
 
@@ -88,11 +91,14 @@ defmodule Electric.ConnectionManager do
 
     pool_opts = Keyword.fetch!(opts, :pool_opts)
 
+    timeline_opts = Keyword.fetch!(opts, :timeline_opts)
+
     state =
       %State{
         connection_opts: connection_opts,
         replication_opts: replication_opts,
         pool_opts: pool_opts,
+        timeline_opts: timeline_opts,
         log_collector: Keyword.fetch!(opts, :log_collector),
         shape_cache: Keyword.fetch!(opts, :shape_cache),
         backoff: {:backoff.init(1000, 10_000), nil}
@@ -134,6 +140,8 @@ defmodule Electric.ConnectionManager do
   def handle_continue(:start_connection_pool, state) do
     case start_connection_pool(state.connection_opts, state.pool_opts) do
       {:ok, pid} ->
+        Electric.Timeline.check(get_pg_timeline(pid), state.timeline_opts)
+
         # Now we have everything ready to start accepting and processing logical messages from
         # Postgres.
         Electric.Postgres.ReplicationClient.start_streaming(state.replication_client_pid)
@@ -361,4 +369,13 @@ defmodule Electric.ConnectionManager do
 
     Keyword.put(connection_opts, :socket_options, tcp_opts)
   end
+
+  # Fetches the current timeline ID from Postgres' checkpoint control data.
+  # Returns `nil` when the query fails so that the caller treats the timeline as unknown.
+  defp get_pg_timeline(conn) do
+    case Postgrex.query(conn, "SELECT timeline_id FROM pg_control_checkpoint()", []) do
+      {:ok, %Postgrex.Result{rows: [[timeline_id]]}} -> timeline_id
+      {:error, _reason} -> nil
+    end
+  end
 end
diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex
index 37d0106dee..d78705284b 100644
--- a/packages/sync-service/lib/electric/shape_cache.ex
+++ b/packages/sync-service/lib/electric/shape_cache.ex
@@ -21,6 +21,7 @@ defmodule Electric.ShapeCacheBehaviour do
   @callback await_snapshot_start(shape_id(), opts :: keyword()) :: :started | {:error, term()}
   @callback handle_truncate(shape_id(), keyword()) :: :ok
   @callback clean_shape(shape_id(), keyword()) :: :ok
+  @callback clean_all_shapes(keyword()) :: :ok
   @callback has_shape?(shape_id(), keyword()) :: boolean()
   @callback cast(term(), keyword()) :: :ok
 end
@@ -137,6 +138,13 @@ defmodule Electric.ShapeCache do
     GenStage.call(server, {:clean, shape_id})
   end
 
+  @impl Electric.ShapeCacheBehaviour
+  @spec clean_all_shapes(keyword()) :: :ok
+  def clean_all_shapes(opts) do
+    server = Access.get(opts, :server, __MODULE__)
+    GenStage.call(server, {:clean_all})
+  end
+
   @impl Electric.ShapeCacheBehaviour
   @spec handle_truncate(shape_id(), keyword()) :: :ok
   def handle_truncate(shape_id, opts \\ []) do
@@ -313,6 +321,12 @@ defmodule Electric.ShapeCache do
     {:reply, :ok, [], state}
   end
 
+  def handle_call({:clean_all}, _from, state) do
+    Logger.info("Cleaning up all shapes")
+    clean_up_all_shapes(state)
+    {:reply, :ok, [], state}
+  end
+
   @impl GenStage
   def handle_cast({:snapshot_xmin_known, shape_id, xmin}, %{shape_status: shape_status} = state) do
     unless shape_status.set_snapshot_xmin(state.persistent_state, shape_id, xmin) do
@@ -353,14 +367,19 @@ defmodule Electric.ShapeCache do
     state.shape_status.remove_shape(state.persistent_state, shape_id)
   end
 
-  defp is_known_shape_id?(state, shape_id) do
-    if state.shape_status.get_existing_shape(state.persistent_state, shape_id) do
-      true
-    else
-      false
+  defp clean_up_all_shapes(state) do
+    shape_ids =
+      state.persistent_state |> state.shape_status.list_shapes() |> Enum.map(&elem(&1, 0))
+
+    for shape_id <- shape_ids do
+      clean_up_shape(state, shape_id)
     end
   end
 
+  defp is_known_shape_id?(state, shape_id) do
+    !!state.shape_status.get_existing_shape(state.persistent_state, shape_id)
+  end
+
   defp add_waiter(%{awaiting_snapshot_start: waiters} = state, shape_id, waiter),
     do: %{
       state
diff --git a/packages/sync-service/lib/electric/timeline.ex b/packages/sync-service/lib/electric/timeline.ex
new file mode 100644
index 0000000000..8d0230aa33
--- /dev/null
+++ b/packages/sync-service/lib/electric/timeline.ex
@@ -0,0 +1,97 @@
+defmodule Electric.Timeline do
+  @moduledoc """
+  Module exporting functions for handling Postgres timelines.
+
+  Electric persists the ID of the Postgres timeline it last connected to so that a
+  Point In Time Recovery (PITR) can be detected the next time a timeline is checked.
+  """
+  require Logger
+  alias Electric.PersistentKV
+
+  @type timeline :: integer() | nil
+
+  @timeline_key "timeline_id"
+
+  @doc """
+  Checks the provided `pg_timeline` against Electric's timeline.
+  Normally, Postgres and Electric are on the same timeline and nothing must be done.
+  If the timelines differ, that indicates that a Point In Time Recovery (PITR) has occurred and all shapes must be cleaned.
+  If we fail to fetch timeline information, we also clean all shapes for safety as we can't be sure that Postgres and Electric are on the same timeline.
+
+  `opts` must contain `:persistent_kv` and may contain `:shape_cache` as a `{module, opts}` tuple.
+  """
+  @spec check(timeline(), keyword()) :: :ok
+  def check(pg_timeline, opts) do
+    electric_timeline = load_timeline(opts)
+    verify_timeline(pg_timeline, electric_timeline, opts)
+  end
+
+  # Handles the different cases of timeline comparison
+  @spec verify_timeline(timeline(), timeline(), keyword()) :: :ok
+  defp verify_timeline(nil, _, opts) do
+    Logger.warning("Unknown Postgres timeline; rotating shapes.")
+    clean_all_shapes(opts)
+    store_timeline(nil, opts)
+  end
+
+  defp verify_timeline(timeline_id, timeline_id, _opts) do
+    Logger.info("Connected to Postgres timeline #{timeline_id}")
+    :ok
+  end
+
+  defp verify_timeline(pg_timeline_id, nil, opts) do
+    Logger.info("No previous timeline detected.")
+    Logger.info("Connected to Postgres timeline #{pg_timeline_id}")
+    # Store new timeline
+    store_timeline(pg_timeline_id, opts)
+  end
+
+  defp verify_timeline(pg_timeline_id, _, opts) do
+    Logger.info("Detected PITR to timeline #{pg_timeline_id}; rotating shapes.")
+    clean_all_shapes(opts)
+    # Store new timeline only after all shapes have been cleaned
+    store_timeline(pg_timeline_id, opts)
+  end
+
+  @doc """
+  Loads the timeline ID from persistent storage.
+  Returns `nil` if no timeline ID is stored or if it cannot be read.
+  """
+  @spec load_timeline(keyword()) :: timeline()
+  def load_timeline(opts) do
+    kv = make_serialized_kv(opts)
+
+    case PersistentKV.get(kv, @timeline_key) do
+      {:ok, timeline_id} ->
+        timeline_id
+
+      {:error, :not_found} ->
+        nil
+
+      error ->
+        # `error` is an arbitrary term (e.g. an `{:error, reason}` tuple), so it must be
+        # inspected: interpolating it directly would raise `Protocol.UndefinedError`.
+        Logger.warning("Failed to load timeline ID from persistent storage: #{inspect(error)}")
+        nil
+    end
+  end
+
+  defp store_timeline(timeline_id, opts) do
+    kv = make_serialized_kv(opts)
+    :ok = PersistentKV.set(kv, @timeline_key, timeline_id)
+  end
+
+  defp make_serialized_kv(opts) do
+    kv_backend = Keyword.fetch!(opts, :persistent_kv)
+    # defaults to using Jason encoder and decoder
+    PersistentKV.Serialized.new!(backend: kv_backend)
+  end
+
+  # Clean up all data (meta data and shape log + snapshot) associated with all shapes
+  @spec clean_all_shapes(keyword()) :: :ok
+  defp clean_all_shapes(opts) do
+    {shape_cache, shape_cache_opts} = Access.get(opts, :shape_cache, {Electric.ShapeCache, []})
+    shape_cache.clean_all_shapes(shape_cache_opts)
+    :ok
+  end
+end
diff --git a/packages/sync-service/test/electric/timeline_test.exs b/packages/sync-service/test/electric/timeline_test.exs
new file mode 100644
index 0000000000..3dfcd4dd4e
--- /dev/null
+++ b/packages/sync-service/test/electric/timeline_test.exs
@@ -0,0 +1,75 @@
+defmodule Electric.TimelineTest do
+  use ExUnit.Case, async: true
+  alias Electric.Timeline
+  alias Support.Mock.ShapeCache
+
+  import Mox
+
+  # Fail any test whose `expect/3` expectations were not met.
+  setup :verify_on_exit!
+
+  describe "load_timeline/1" do
+    @moduletag :tmp_dir
+
+    setup context do
+      %{kv: Electric.PersistentKV.Filesystem.new!(root: context.tmp_dir)}
+    end
+
+    test "returns nil when no timeline ID is available", %{kv: kv} do
+      assert Timeline.load_timeline(persistent_kv: kv) == nil
+    end
+  end
+
+  describe "check/2" do
+    @moduletag :tmp_dir
+
+    setup context do
+      timeline = context[:electric_timeline]
+      kv = Electric.PersistentKV.Filesystem.new!(root: context.tmp_dir)
+      opts = [persistent_kv: kv, shape_cache: {ShapeCache, []}]
+
+      # Seed Electric's timeline through the public API: checking a timeline against an
+      # empty store persists it without cleaning any shapes.
+      if timeline, do: :ok = Timeline.check(timeline, opts)
+
+      {:ok, [timeline: timeline, opts: opts]}
+    end
+
+    @tag electric_timeline: nil
+    test "stores the Postgres timeline if Electric has no timeline yet", %{opts: opts} do
+      timeline = 5
+      assert :ok = Timeline.check(timeline, opts)
+      assert ^timeline = Timeline.load_timeline(opts)
+    end
+
+    @tag electric_timeline: 3
+    test "proceeds without changes if Postgres' timeline matches Electric's timeline", %{
+      timeline: timeline,
+      opts: opts
+    } do
+      assert :ok = Timeline.check(timeline, opts)
+      assert ^timeline = Timeline.load_timeline(opts)
+    end
+
+    @tag electric_timeline: 3
+    test "cleans all shapes if Postgres' timeline does not match Electric's timeline", %{
+      opts: opts
+    } do
+      ShapeCache
+      |> expect(:clean_all_shapes, fn _ -> :ok end)
+
+      pg_timeline = 2
+      assert :ok = Timeline.check(pg_timeline, opts)
+      assert ^pg_timeline = Timeline.load_timeline(opts)
+    end
+
+    @tag electric_timeline: 3
+    test "cleans all shapes if Postgres' timeline is unknown", %{opts: opts} do
+      ShapeCache
+      |> expect(:clean_all_shapes, fn _ -> :ok end)
+
+      assert :ok = Timeline.check(nil, opts)
+      assert Timeline.load_timeline(opts) == nil
+    end
+  end
+end