From aba7f95c296ff052c7fdda5fe90cfe6dea24adc4 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Fri, 6 Sep 2024 09:34:42 +0200 Subject: [PATCH 01/12] Rebase --- .../sync-service/lib/electric/application.ex | 4 ++ .../lib/electric/connection_manager.ex | 15 +++++ .../sync-service/lib/electric/shape_cache.ex | 29 +++++++-- packages/sync-service/lib/electric/shapes.ex | 9 +++ .../sync-service/lib/electric/timeline.ex | 54 +++++++++++++++ .../lib/electric/timeline_cache.ex | 41 ++++++++++++ .../test/electric/timeline_cache_test.exs | 20 ++++++ .../test/electric/timeline_test.exs | 65 +++++++++++++++++++ packages/sync-service/test/test_helper.exs | 2 + 9 files changed, 234 insertions(+), 5 deletions(-) create mode 100644 packages/sync-service/lib/electric/timeline.ex create mode 100644 packages/sync-service/lib/electric/timeline_cache.ex create mode 100644 packages/sync-service/test/electric/timeline_cache_test.exs create mode 100644 packages/sync-service/test/electric/timeline_test.exs diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 0486d03c06..9aa16f3b35 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -64,6 +64,10 @@ defmodule Electric.Application do pool_size: Application.fetch_env!(:electric, :db_pool_size), types: PgInterop.Postgrex.Types ], + timeline_opts: [ + shape_cache: {Electric.ShapeCache, []}, + timeline_cache: Electric.TimelineCache + ], log_collector: {Electric.Replication.ShapeLogCollector, inspector: inspector}, shape_cache: shape_cache ] diff --git a/packages/sync-service/lib/electric/connection_manager.ex b/packages/sync-service/lib/electric/connection_manager.ex index b3453df02a..9b9ad14723 100644 --- a/packages/sync-service/lib/electric/connection_manager.ex +++ b/packages/sync-service/lib/electric/connection_manager.ex @@ -35,6 +35,8 @@ defmodule Electric.ConnectionManager do :replication_opts, # Database connection pool options. :pool_opts, + # Options specific to `Electric.Timeline`. + :timeline_opts, # Configuration for the log collector :log_collector, # Configuration for the shape cache that implements `Electric.ShapeCacheBehaviour` @@ -56,6 +58,7 @@ defmodule Electric.ConnectionManager do {:connection_opts, Keyword.t()} | {:replication_opts, Keyword.t()} | {:pool_opts, Keyword.t()} + | {:timeline_opts, Keyword.t()} | {:log_collector, {module(), Keyword.t()}} | {:shape_cache, {module(), Keyword.t()}} @@ -88,11 +91,14 @@ defmodule Electric.ConnectionManager do pool_opts = Keyword.fetch!(opts, :pool_opts) + timeline_opts = Keyword.fetch!(opts, :timeline_opts) + state = %State{ connection_opts: connection_opts, replication_opts: replication_opts, pool_opts: pool_opts, + timeline_opts: timeline_opts, log_collector: Keyword.fetch!(opts, :log_collector), shape_cache: Keyword.fetch!(opts, :shape_cache), backoff: {:backoff.init(1000, 10_000), nil} @@ -134,6 +140,8 @@ defmodule Electric.ConnectionManager do def handle_continue(:start_connection_pool, state) do case start_connection_pool(state.connection_opts, state.pool_opts) do {:ok, pid} -> + Electric.Timeline.check(get_pg_timeline(pid), state.timeline_opts) + # Now we have everything ready to start accepting and processing logical messages from # Postgres. Electric.Postgres.ReplicationClient.start_streaming(state.replication_client_pid) @@ -361,4 +369,11 @@ defmodule Electric.ConnectionManager do Keyword.put(connection_opts, :socket_options, tcp_opts) end + + defp get_pg_timeline(conn) do + case Postgrex.query(conn, "SELECT timeline_id FROM pg_control_checkpoint()", []) do + {:ok, %Postgrex.Result{rows: [[timeline_id]]}} -> timeline_id + {:error, _reason} -> nil + end + end end diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index 37d0106dee..eb22efaea4 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -21,6 +21,7 @@ defmodule Electric.ShapeCacheBehaviour do @callback await_snapshot_start(shape_id(), opts :: keyword()) :: :started | {:error, term()} @callback handle_truncate(shape_id(), keyword()) :: :ok @callback clean_shape(shape_id(), keyword()) :: :ok + @callback clean_all_shapes(GenServer.name()) :: :ok @callback has_shape?(shape_id(), keyword()) :: boolean() @callback cast(term(), keyword()) :: :ok end @@ -137,6 +138,13 @@ defmodule Electric.ShapeCache do GenStage.call(server, {:clean, shape_id}) end + @impl Electric.ShapeCacheBehaviour + @spec clean_all_shapes(keyword()) :: :ok + def clean_all_shapes(opts) do + server = Access.get(opts, :server, __MODULE__) + GenServer.call(server, {:clean_all}) + end + @impl Electric.ShapeCacheBehaviour @spec handle_truncate(shape_id(), keyword()) :: :ok def handle_truncate(shape_id, opts \\ []) do @@ -313,6 +321,12 @@ defmodule Electric.ShapeCache do {:reply, :ok, [], state} end + def handle_call({:clean_all}, _from, state) do + clean_up_all_shapes(state) + Logger.info("Cleaning up all shapes") + {:reply, :ok, state} + end + @impl GenStage def handle_cast({:snapshot_xmin_known, shape_id, xmin}, %{shape_status: shape_status} = state) do unless shape_status.set_snapshot_xmin(state.persistent_state, shape_id, xmin) do @@ -353,14 +367,19 @@ defmodule Electric.ShapeCache do state.shape_status.remove_shape(state.persistent_state, shape_id) end - defp is_known_shape_id?(state, shape_id) do - if state.shape_status.get_existing_shape(state.persistent_state, shape_id) do - true - else - false + defp clean_up_all_shapes(state) do + shape_ids = + state.persistent_state |> state.shape_status.list_shapes() |> Enum.map(&elem(&1, 0)) + + for shape_id <- shape_ids do + clean_up_shape(state, shape_id) end end + defp is_known_shape_id?(state, shape_id) do + !!state.shape_status.get_existing_shape(state.persistent_state, shape_id) + end + defp add_waiter(%{awaiting_snapshot_start: waiters} = state, shape_id, waiter), do: %{ state diff --git a/packages/sync-service/lib/electric/shapes.ex b/packages/sync-service/lib/electric/shapes.ex index 859ce9fb6b..0c65132693 100644 --- a/packages/sync-service/lib/electric/shapes.ex +++ b/packages/sync-service/lib/electric/shapes.ex @@ -88,7 +88,16 @@ defmodule Electric.Shapes do for shape_id <- shape_ids do shape_cache.clean_shape(shape_id, opts) end + end + @doc """ + Clean up all data (meta data and shape log + snapshot) associated with all shapes + """ + @spec clean_all_shapes(keyword()) :: :ok + def clean_all_shapes(opts \\ []) do + {shape_cache, opts} = Access.get(opts, :shape_cache, {ShapeCache, []}) + server = Access.get(opts, :server, shape_cache) + shape_cache.clean_all_shapes(server) :ok end diff --git a/packages/sync-service/lib/electric/timeline.ex b/packages/sync-service/lib/electric/timeline.ex new file mode 100644 index 0000000000..0b57397049 --- /dev/null +++ b/packages/sync-service/lib/electric/timeline.ex @@ -0,0 +1,54 @@ +defmodule Electric.Timeline do + @moduledoc """ + Module containing helper functions for handling Postgres timelines. + """ + require Logger + alias Electric.Shapes + alias Electric.TimelineCache + + @type timeline :: integer() | nil + + @doc """ + Checks the provided `pg_timeline` against Electric's timeline. + Normally, Postgres and Electric are on the same timeline and nothing must be done. + If the timelines differ, that indicates that a Point In Time Recovery (PITR) has occurred and all shapes must be cleaned. + If we fail to fetch timeline information, we also clean all shapes for safety as we can't be sure that Postgres and Electric are on the same timeline. + """ + @spec check(timeline(), keyword()) :: :ok + def check(pg_timeline, opts) do + cache = Keyword.fetch!(opts, :timeline_cache) + electric_timeline = TimelineCache.get_timeline(cache) + handle(pg_timeline, electric_timeline, opts) + end + + # Handles the different cases of timeline comparison + @spec handle(timeline(), timeline(), keyword()) :: :ok + defp handle(nil, _, opts) do + Logger.warning("Unknown Postgres timeline; rotating shapes.") + Shapes.clean_all_shapes(opts) + cache = Keyword.fetch!(opts, :timeline_cache) + TimelineCache.store_timeline(cache, nil) + end + + defp handle(pg_timeline_id, electric_timeline_id, _opts) + when pg_timeline_id == electric_timeline_id do + Logger.info("Connected to Postgres timeline #{pg_timeline_id}") + :ok + end + + defp handle(pg_timeline_id, nil, opts) do + Logger.info("No previous timeline detected.") + Logger.info("Connected to Postgres timeline #{pg_timeline_id}") + # Store new timeline + cache = Keyword.fetch!(opts, :timeline_cache) + TimelineCache.store_timeline(cache, pg_timeline_id) + end + + defp handle(pg_timeline_id, _electric_timeline_id, opts) do + Logger.info("Detected PITR to timeline #{pg_timeline_id}; rotating shapes.") + Electric.Shapes.clean_all_shapes(opts) + # Store new timeline only after all shapes have been cleaned + cache = Keyword.fetch!(opts, :timeline_cache) + TimelineCache.store_timeline(cache, pg_timeline_id) + end +end diff --git a/packages/sync-service/lib/electric/timeline_cache.ex b/packages/sync-service/lib/electric/timeline_cache.ex new file mode 100644 index 0000000000..1df195e2cf --- /dev/null +++ b/packages/sync-service/lib/electric/timeline_cache.ex @@ -0,0 +1,41 @@ +defmodule Electric.TimelineCache do + @moduledoc """ + In-memory cache for storing the Postgres timeline on which Electric is running. + """ + use GenServer + + def start_link(timeline_id \\ nil) when is_nil(timeline_id) or is_integer(timeline_id) do + GenServer.start_link(__MODULE__, timeline_id, name: __MODULE__) + end + + @doc """ + Store the timeline ID on which Electric is running. + """ + @spec store_timeline(GenServer.name(), integer()) :: :ok + def store_timeline(server \\ __MODULE__, timeline_id) do + GenServer.call(server, {:store, timeline_id}) + end + + @doc """ + Get the timeline ID on which Electric is running. + Returns nil if the timeline ID is not set. + """ + @spec get_timeline(GenServer.name()) :: integer() | nil + def get_timeline(server \\ __MODULE__) do + GenServer.call(server, :get) + end + + @impl true + def init(timeline_id) do + {:ok, %{id: timeline_id}} + end + + @impl true + def handle_call({:store, timeline_id}, _from, state) do + {:reply, :ok, %{state | id: timeline_id}} + end + + def handle_call(:get, _from, state) do + {:reply, Map.get(state, :id, nil), state} + end +end diff --git a/packages/sync-service/test/electric/timeline_cache_test.exs b/packages/sync-service/test/electric/timeline_cache_test.exs new file mode 100644 index 0000000000..1cb2dcf6ce --- /dev/null +++ b/packages/sync-service/test/electric/timeline_cache_test.exs @@ -0,0 +1,20 @@ +defmodule Electric.TimelineCacheTest do + use ExUnit.Case, async: false + alias Electric.TimelineCache + + describe "get_timeline/1" do + test "returns the timeline ID" do + timeline = 5 + {:ok, pid} = TimelineCache.start_link(timeline) + assert TimelineCache.get_timeline(pid) == timeline + end + end + + describe "store_timeline/2" do + test "stores the timeline ID" do + {:ok, pid} = TimelineCache.start_link(3) + assert TimelineCache.store_timeline(pid, 4) == :ok + assert TimelineCache.get_timeline(pid) == 4 + end + end +end diff --git a/packages/sync-service/test/electric/timeline_test.exs b/packages/sync-service/test/electric/timeline_test.exs new file mode 100644 index 0000000000..8413b65ee4 --- /dev/null +++ b/packages/sync-service/test/electric/timeline_test.exs @@ -0,0 +1,65 @@ +defmodule Electric.TimelineTest do + use ExUnit.Case, async: true + alias Electric.Timeline + alias Electric.TimelineCache + alias Electric.ShapeCacheMock + + import Mox + + describe "check/2" do + setup context do + timeline = context[:electric_timeline] + + pid = + case timeline do + nil -> + {:ok, pid} = TimelineCache.start_link() + pid + + _ -> + {:ok, pid} = TimelineCache.start_link(timeline) + pid + end + + opts = [timeline_cache: pid, shape_cache: {ShapeCacheMock, []}] + {:ok, [timeline: timeline, opts: opts]} + end + + @tag electric_timeline: nil + test "stores the Postgres timeline if Electric has no timeline yet", %{opts: opts} do + timeline = 5 + assert :ok = Timeline.check(timeline, opts) + assert ^timeline = TimelineCache.get_timeline(opts[:timeline_cache]) + end + + @tag electric_timeline: 3 + test "proceeds without changes if Postgres' timeline matches Electric's timeline", %{ + timeline: timeline, + opts: opts + } do + assert :ok = Timeline.check(timeline, opts) + assert ^timeline = TimelineCache.get_timeline(opts[:timeline_cache]) + end + + @tag electric_timeline: 3 + test "cleans all shapes if Postgres' timeline does not match Electric's timeline", %{ + opts: opts + } do + ShapeCacheMock + |> expect(:clean_all_shapes, fn _ -> :ok end) + + pg_timeline = 4 + assert :ok = Timeline.check(pg_timeline, opts) + assert ^pg_timeline = TimelineCache.get_timeline(opts[:timeline_cache]) + end + + @tag electric_timeline: 3 + test "cleans all shapes if Postgres' timeline is unknown", %{opts: opts} do + ShapeCacheMock + |> expect(:clean_all_shapes, fn _ -> :ok end) + + assert :ok = Timeline.check(nil, opts) + assert TimelineCache.get_timeline(opts[:timeline_cache]) == nil + end + end +end diff --git a/packages/sync-service/test/test_helper.exs b/packages/sync-service/test/test_helper.exs index c9e9cb1a25..5bd5a9f076 100644 --- a/packages/sync-service/test/test_helper.exs +++ b/packages/sync-service/test/test_helper.exs @@ -1 +1,3 @@ +Mox.defmock(Electric.ShapeCacheMock, for: Electric.ShapeCacheBehaviour) + ExUnit.start(assert_receive_timeout: 400) From 24a88a3b2f8a07ce4842aa88ae86bef7fc5d3173 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 3 Sep 2024 10:45:56 +0200 Subject: [PATCH 02/12] Make timeline cache persistent. --- .../lib/electric/timeline_cache.ex | 55 +++++++++++++++++-- .../test/electric/timeline_cache_test.exs | 51 +++++++++++++++-- .../test/electric/timeline_test.exs | 14 +---- 3 files changed, 99 insertions(+), 21 deletions(-) diff --git a/packages/sync-service/lib/electric/timeline_cache.ex b/packages/sync-service/lib/electric/timeline_cache.ex index 1df195e2cf..452f4794b3 100644 --- a/packages/sync-service/lib/electric/timeline_cache.ex +++ b/packages/sync-service/lib/electric/timeline_cache.ex @@ -1,11 +1,27 @@ defmodule Electric.TimelineCache do @moduledoc """ - In-memory cache for storing the Postgres timeline on which Electric is running. + Cache storing the Postgres timeline on which Electric is running. """ + require Logger use GenServer - def start_link(timeline_id \\ nil) when is_nil(timeline_id) or is_integer(timeline_id) do - GenServer.start_link(__MODULE__, timeline_id, name: __MODULE__) + alias Electric.PersistentKV + + @timeline_key "timeline_id" + + @schema NimbleOptions.new!( + name: [ + type: {:or, [:atom, {:tuple, [:atom, :atom, :any]}]}, + default: __MODULE__ + ], + timeline_id: [type: {:or, [:integer, nil]}, default: nil], + persistent_kv: [type: :any, required: true] + ) + + def start_link(opts) do + with {:ok, config} <- NimbleOptions.validate(opts, @schema) do + GenServer.start_link(__MODULE__, config, name: config[:name]) + end end @doc """ @@ -26,16 +42,43 @@ defmodule Electric.TimelineCache do end @impl true - def init(timeline_id) do - {:ok, %{id: timeline_id}} + def init(opts) do + with {:ok, tid} <- Access.fetch(opts, :timeline_id), + {:ok, kv} <- Access.fetch(opts, :persistent_kv) do + timeline_id = load_timeline_id(tid, kv) + {:ok, %{id: timeline_id, persistent_kv: kv}} + end end @impl true - def handle_call({:store, timeline_id}, _from, state) do + def handle_call({:store, timeline_id}, _from, %{persistent_kv: kv} = state) do + PersistentKV.set(kv, @timeline_key, timeline_id) {:reply, :ok, %{state | id: timeline_id}} end def handle_call(:get, _from, state) do {:reply, Map.get(state, :id, nil), state} end + + # Loads the timeline ID from persistent storage + # if the provided timeline_id is nil. + # If it is not nil, it stores the provided timeline ID in persistent storage. + defp load_timeline_id(nil, kv) do + case PersistentKV.get(kv, @timeline_key) do + {:ok, timeline_id} -> + timeline_id + + {:error, :not_found} -> + nil + + error -> + Logger.warning("Failed to load timeline ID from persistent storage: #{error}") + nil + end + end + + defp load_timeline_id(timeline_id, kv) do + PersistentKV.set(kv, @timeline_key, timeline_id) + timeline_id + end end diff --git a/packages/sync-service/test/electric/timeline_cache_test.exs b/packages/sync-service/test/electric/timeline_cache_test.exs index 1cb2dcf6ce..800a4c90b7 100644 --- a/packages/sync-service/test/electric/timeline_cache_test.exs +++ b/packages/sync-service/test/electric/timeline_cache_test.exs @@ -3,18 +3,61 @@ defmodule Electric.TimelineCacheTest do alias Electric.TimelineCache describe "get_timeline/1" do - test "returns the timeline ID" do + setup do + %{kv: Electric.PersistentKV.Memory.new!()} + end + + test "returns nil when no timeline ID is available", %{kv: kv} do + {:ok, pid} = TimelineCache.start_link(persistent_kv: kv) + assert TimelineCache.get_timeline(pid) == nil + end + + test "returns the provided timeline ID", %{kv: kv} do timeline = 5 - {:ok, pid} = TimelineCache.start_link(timeline) + {:ok, pid} = TimelineCache.start_link(timeline_id: timeline, persistent_kv: kv) + assert TimelineCache.get_timeline(pid) == timeline + end + end + + describe "start_link/1" do + setup do + %{kv: Electric.PersistentKV.Memory.new!()} + end + + test "persists provided timeline ID and loads timeline ID from storage", %{kv: kv} do + timeline = 9 + # Start a timeline cache which will store the provided timeline ID + {:ok, _pid} = TimelineCache.start_link(timeline_id: timeline, persistent_kv: kv) + + # Start another timeline cache without provided a timeline ID + # it should load the one from persistent storage set by the timeline cache above + {:ok, pid} = TimelineCache.start_link(persistent_kv: kv, name: :timeline_cache_2) assert TimelineCache.get_timeline(pid) == timeline end end describe "store_timeline/2" do - test "stores the timeline ID" do - {:ok, pid} = TimelineCache.start_link(3) + setup do + %{kv: Electric.PersistentKV.Memory.new!()} + end + + test "stores the timeline ID", %{kv: kv} do + {:ok, pid} = TimelineCache.start_link(timeline_id: 3, persistent_kv: kv) + assert TimelineCache.get_timeline(pid) == 3 assert TimelineCache.store_timeline(pid, 4) == :ok assert TimelineCache.get_timeline(pid) == 4 end + + test "persists the timeline ID", %{kv: kv} do + {:ok, cache1} = TimelineCache.start_link(timeline_id: 3, persistent_kv: kv) + assert TimelineCache.get_timeline(cache1) == 3 + assert TimelineCache.store_timeline(cache1, 4) == :ok + assert TimelineCache.get_timeline(cache1) == 4 + + # Check that a fresh timeline cache also loads the timeline ID + # that was persisted by the latest `store_timeline/2` call of the timeline cache above + {:ok, cache2} = TimelineCache.start_link(persistent_kv: kv, name: :timeline_cache_2) + assert TimelineCache.get_timeline(cache2) == 4 + end end end diff --git a/packages/sync-service/test/electric/timeline_test.exs b/packages/sync-service/test/electric/timeline_test.exs index 8413b65ee4..107184a9f4 100644 --- a/packages/sync-service/test/electric/timeline_test.exs +++ b/packages/sync-service/test/electric/timeline_test.exs @@ -9,17 +9,9 @@ defmodule Electric.TimelineTest do describe "check/2" do setup context do timeline = context[:electric_timeline] + kv = Electric.PersistentKV.Memory.new!() - pid = - case timeline do - nil -> - {:ok, pid} = TimelineCache.start_link() - pid - - _ -> - {:ok, pid} = TimelineCache.start_link(timeline) - pid - end + {:ok, pid} = TimelineCache.start_link(timeline_id: timeline, persistent_kv: kv) opts = [timeline_cache: pid, shape_cache: {ShapeCacheMock, []}] {:ok, [timeline: timeline, opts: opts]} @@ -48,7 +40,7 @@ defmodule Electric.TimelineTest do ShapeCacheMock |> expect(:clean_all_shapes, fn _ -> :ok end) - pg_timeline = 4 + pg_timeline = 2 assert :ok = Timeline.check(pg_timeline, opts) assert ^pg_timeline = TimelineCache.get_timeline(opts[:timeline_cache]) end From d1d04d8858bc79484237eeae2e32f5cbd54f2adf Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 3 Sep 2024 10:55:46 +0200 Subject: [PATCH 03/12] Changeset --- .changeset/calm-toys-play.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/calm-toys-play.md diff --git a/.changeset/calm-toys-play.md b/.changeset/calm-toys-play.md new file mode 100644 index 0000000000..dd5380abee --- /dev/null +++ b/.changeset/calm-toys-play.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Handle Postgres Point In Time Recoveries (PITR) by cleaning all shapes. From f03ab0f122e09fc8c3d8df9d3e89e1ea9478b95e Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 3 Sep 2024 14:21:30 +0200 Subject: [PATCH 04/12] Use Jason encoder and decoder for storing timeline ID --- packages/sync-service/lib/electric/timeline_cache.ex | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/lib/electric/timeline_cache.ex b/packages/sync-service/lib/electric/timeline_cache.ex index 452f4794b3..a9211549ce 100644 --- a/packages/sync-service/lib/electric/timeline_cache.ex +++ b/packages/sync-service/lib/electric/timeline_cache.ex @@ -44,9 +44,10 @@ defmodule Electric.TimelineCache do @impl true def init(opts) do with {:ok, tid} <- Access.fetch(opts, :timeline_id), - {:ok, kv} <- Access.fetch(opts, :persistent_kv) do - timeline_id = load_timeline_id(tid, kv) - {:ok, %{id: timeline_id, persistent_kv: kv}} + {:ok, kv_backend} <- Access.fetch(opts, :persistent_kv) do + persistent_kv = PersistentKV.Serialized.new!(backend: kv_backend) + timeline_id = load_timeline_id(tid, persistent_kv) + {:ok, %{id: timeline_id, persistent_kv: persistent_kv}} end end From bd5aff62589b32d8d28b5dee4a9b7d8edd89f918 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 3 Sep 2024 14:32:20 +0200 Subject: [PATCH 05/12] Modify load_timeline_id to only load the timeline ID and introduce a separate store_timeline_id function. --- .../sync-service/lib/electric/timeline_cache.ex | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/packages/sync-service/lib/electric/timeline_cache.ex b/packages/sync-service/lib/electric/timeline_cache.ex index a9211549ce..f180478ce1 100644 --- a/packages/sync-service/lib/electric/timeline_cache.ex +++ b/packages/sync-service/lib/electric/timeline_cache.ex @@ -46,14 +46,21 @@ defmodule Electric.TimelineCache do with {:ok, tid} <- Access.fetch(opts, :timeline_id), {:ok, kv_backend} <- Access.fetch(opts, :persistent_kv) do persistent_kv = PersistentKV.Serialized.new!(backend: kv_backend) - timeline_id = load_timeline_id(tid, persistent_kv) + + timeline_id = + if tid == nil do + load_timeline_id(persistent_kv) + else + store_timeline_id(persistent_kv, tid) + end + {:ok, %{id: timeline_id, persistent_kv: persistent_kv}} end end @impl true def handle_call({:store, timeline_id}, _from, %{persistent_kv: kv} = state) do - PersistentKV.set(kv, @timeline_key, timeline_id) + store_timeline_id(kv, timeline_id) {:reply, :ok, %{state | id: timeline_id}} end @@ -62,9 +69,7 @@ defmodule Electric.TimelineCache do end # Loads the timeline ID from persistent storage - # if the provided timeline_id is nil. - # If it is not nil, it stores the provided timeline ID in persistent storage. - defp load_timeline_id(nil, kv) do + defp load_timeline_id(kv) do case PersistentKV.get(kv, @timeline_key) do {:ok, timeline_id} -> timeline_id @@ -78,7 +83,7 @@ defmodule Electric.TimelineCache do end end - defp load_timeline_id(timeline_id, kv) do + defp store_timeline_id(kv, timeline_id) do PersistentKV.set(kv, @timeline_key, timeline_id) timeline_id end From 83f4403ec7e310485dc9cebdb49f40993ef3882d Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 3 Sep 2024 14:39:08 +0200 Subject: [PATCH 06/12] Remove duplicate ShapeCache mock --- packages/sync-service/test/electric/timeline_test.exs | 8 ++++---- packages/sync-service/test/test_helper.exs | 2 -- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/sync-service/test/electric/timeline_test.exs b/packages/sync-service/test/electric/timeline_test.exs index 107184a9f4..e923713a17 100644 --- a/packages/sync-service/test/electric/timeline_test.exs +++ b/packages/sync-service/test/electric/timeline_test.exs @@ -2,7 +2,7 @@ defmodule Electric.TimelineTest do use ExUnit.Case, async: true alias Electric.Timeline alias Electric.TimelineCache - alias Electric.ShapeCacheMock + alias Support.Mock.ShapeCache import Mox @@ -13,7 +13,7 @@ defmodule Electric.TimelineTest do {:ok, pid} = TimelineCache.start_link(timeline_id: timeline, persistent_kv: kv) - opts = [timeline_cache: pid, shape_cache: {ShapeCacheMock, []}] + opts = [timeline_cache: pid, shape_cache: {ShapeCache, []}] {:ok, [timeline: timeline, opts: opts]} end @@ -37,7 +37,7 @@ defmodule Electric.TimelineTest do test "cleans all shapes if Postgres' timeline does not match Electric's timeline", %{ opts: opts } do - ShapeCacheMock + ShapeCache |> expect(:clean_all_shapes, fn _ -> :ok end) pg_timeline = 2 @@ -47,7 +47,7 @@ defmodule Electric.TimelineTest do @tag electric_timeline: 3 test "cleans all shapes if Postgres' timeline is unknown", %{opts: opts} do - ShapeCacheMock + ShapeCache |> expect(:clean_all_shapes, fn _ -> :ok end) assert :ok = Timeline.check(nil, opts) diff --git a/packages/sync-service/test/test_helper.exs b/packages/sync-service/test/test_helper.exs index 5bd5a9f076..c9e9cb1a25 100644 --- a/packages/sync-service/test/test_helper.exs +++ b/packages/sync-service/test/test_helper.exs @@ -1,3 +1 @@ -Mox.defmock(Electric.ShapeCacheMock, for: Electric.ShapeCacheBehaviour) - ExUnit.start(assert_receive_timeout: 400) From 731d2b23e54b320590c39ea0aa92caf4c9997e35 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 3 Sep 2024 15:19:27 +0200 Subject: [PATCH 07/12] Remove timeline cache and add persistence into TimeLine module. --- .../sync-service/lib/electric/timeline.ex | 49 +++++++--- .../lib/electric/timeline_cache.ex | 90 ------------------- .../test/electric/timeline_cache_test.exs | 63 ------------- .../test/electric/timeline_test.exs | 24 +++-- 4 files changed, 53 insertions(+), 173 deletions(-) delete mode 100644 packages/sync-service/lib/electric/timeline_cache.ex delete mode 100644 packages/sync-service/test/electric/timeline_cache_test.exs diff --git a/packages/sync-service/lib/electric/timeline.ex b/packages/sync-service/lib/electric/timeline.ex index 0b57397049..d74043b84c 100644 --- a/packages/sync-service/lib/electric/timeline.ex +++ b/packages/sync-service/lib/electric/timeline.ex @@ -1,13 +1,16 @@ defmodule Electric.Timeline do @moduledoc """ - Module containing helper functions for handling Postgres timelines. + Genserver that tracks the Postgres timeline ID. + Module exporting functions for handling Postgres timelines. """ require Logger alias Electric.Shapes - alias Electric.TimelineCache + alias Electric.PersistentKV @type timeline :: integer() | nil + @timeline_key "timeline_id" + @doc """ Checks the provided `pg_timeline` against Electric's timeline. Normally, Postgres and Electric are on the same timeline and nothing must be done. @@ -16,8 +19,7 @@ defmodule Electric.Timeline do """ @spec check(timeline(), keyword()) :: :ok def check(pg_timeline, opts) do - cache = Keyword.fetch!(opts, :timeline_cache) - electric_timeline = TimelineCache.get_timeline(cache) + electric_timeline = load_timeline(opts) handle(pg_timeline, electric_timeline, opts) end @@ -26,8 +28,7 @@ defmodule Electric.Timeline do defp handle(nil, _, opts) do Logger.warning("Unknown Postgres timeline; rotating shapes.") Shapes.clean_all_shapes(opts) - cache = Keyword.fetch!(opts, :timeline_cache) - TimelineCache.store_timeline(cache, nil) + store_timeline(nil, opts) end defp handle(pg_timeline_id, electric_timeline_id, _opts) @@ -40,15 +41,41 @@ defmodule Electric.Timeline do Logger.info("No previous timeline detected.") Logger.info("Connected to Postgres timeline #{pg_timeline_id}") # Store new timeline - cache = Keyword.fetch!(opts, :timeline_cache) - TimelineCache.store_timeline(cache, pg_timeline_id) + store_timeline(pg_timeline_id, opts) end - defp handle(pg_timeline_id, _electric_timeline_id, opts) do + defp handle(pg_timeline_id, _, opts) do Logger.info("Detected PITR to timeline #{pg_timeline_id}; rotating shapes.") Electric.Shapes.clean_all_shapes(opts) # Store new timeline only after all shapes have been cleaned - cache = Keyword.fetch!(opts, :timeline_cache) - TimelineCache.store_timeline(cache, pg_timeline_id) + store_timeline(pg_timeline_id, opts) + end + + # Loads the timeline ID from persistent storage + @spec load_timeline(keyword()) :: timeline() + def load_timeline(opts) do + kv_backend = Keyword.fetch!(opts, :persistent_kv) + # default to Jason decoder + kv = PersistentKV.Serialized.new!(backend: kv_backend) + + case PersistentKV.get(kv, @timeline_key) do + {:ok, timeline_id} -> + timeline_id + + {:error, :not_found} -> + nil + + error -> + Logger.warning("Failed to load timeline ID from persistent storage: #{error}") + nil + end + end + + defp store_timeline(timeline_id, opts) do + kv_backend = Keyword.fetch!(opts, :persistent_kv) + # defaults to Jason encoder + kv = PersistentKV.Serialized.new!(backend: kv_backend) + PersistentKV.set(kv, @timeline_key, timeline_id) + :ok end end diff --git a/packages/sync-service/lib/electric/timeline_cache.ex b/packages/sync-service/lib/electric/timeline_cache.ex deleted file mode 100644 index f180478ce1..0000000000 --- a/packages/sync-service/lib/electric/timeline_cache.ex +++ /dev/null @@ -1,90 +0,0 @@ -defmodule Electric.TimelineCache do - @moduledoc """ - Cache storing the Postgres timeline on which Electric is running. - """ - require Logger - use GenServer - - alias Electric.PersistentKV - - @timeline_key "timeline_id" - - @schema NimbleOptions.new!( - name: [ - type: {:or, [:atom, {:tuple, [:atom, :atom, :any]}]}, - default: __MODULE__ - ], - timeline_id: [type: {:or, [:integer, nil]}, default: nil], - persistent_kv: [type: :any, required: true] - ) - - def start_link(opts) do - with {:ok, config} <- NimbleOptions.validate(opts, @schema) do - GenServer.start_link(__MODULE__, config, name: config[:name]) - end - end - - @doc """ - Store the timeline ID on which Electric is running. - """ - @spec store_timeline(GenServer.name(), integer()) :: :ok - def store_timeline(server \\ __MODULE__, timeline_id) do - GenServer.call(server, {:store, timeline_id}) - end - - @doc """ - Get the timeline ID on which Electric is running. - Returns nil if the timeline ID is not set. - """ - @spec get_timeline(GenServer.name()) :: integer() | nil - def get_timeline(server \\ __MODULE__) do - GenServer.call(server, :get) - end - - @impl true - def init(opts) do - with {:ok, tid} <- Access.fetch(opts, :timeline_id), - {:ok, kv_backend} <- Access.fetch(opts, :persistent_kv) do - persistent_kv = PersistentKV.Serialized.new!(backend: kv_backend) - - timeline_id = - if tid == nil do - load_timeline_id(persistent_kv) - else - store_timeline_id(persistent_kv, tid) - end - - {:ok, %{id: timeline_id, persistent_kv: persistent_kv}} - end - end - - @impl true - def handle_call({:store, timeline_id}, _from, %{persistent_kv: kv} = state) do - store_timeline_id(kv, timeline_id) - {:reply, :ok, %{state | id: timeline_id}} - end - - def handle_call(:get, _from, state) do - {:reply, Map.get(state, :id, nil), state} - end - - # Loads the timeline ID from persistent storage - defp load_timeline_id(kv) do - case PersistentKV.get(kv, @timeline_key) do - {:ok, timeline_id} -> - timeline_id - - {:error, :not_found} -> - nil - - error -> - Logger.warning("Failed to load timeline ID from persistent storage: #{error}") - nil - end - end - - defp store_timeline_id(kv, timeline_id) do - PersistentKV.set(kv, @timeline_key, timeline_id) - timeline_id - end -end diff --git a/packages/sync-service/test/electric/timeline_cache_test.exs b/packages/sync-service/test/electric/timeline_cache_test.exs deleted file mode 100644 index 800a4c90b7..0000000000 --- a/packages/sync-service/test/electric/timeline_cache_test.exs +++ /dev/null @@ -1,63 +0,0 @@ -defmodule Electric.TimelineCacheTest do - use ExUnit.Case, async: false - alias Electric.TimelineCache - - describe "get_timeline/1" do - setup do - %{kv: Electric.PersistentKV.Memory.new!()} - end - - test "returns nil when no timeline ID is available", %{kv: kv} do - {:ok, pid} = TimelineCache.start_link(persistent_kv: kv) - assert TimelineCache.get_timeline(pid) == nil - end - - test "returns the provided timeline ID", %{kv: kv} do - timeline = 5 - {:ok, pid} = TimelineCache.start_link(timeline_id: timeline, persistent_kv: kv) - assert TimelineCache.get_timeline(pid) == timeline - end - end - - describe "start_link/1" do - setup do - %{kv: Electric.PersistentKV.Memory.new!()} - end - - test "persists provided timeline ID and loads timeline ID from storage", %{kv: kv} do - timeline = 9 - # Start a timeline cache which will store the provided timeline ID - {:ok, _pid} = TimelineCache.start_link(timeline_id: timeline, persistent_kv: kv) - - # Start another timeline cache without provided a timeline ID - # it should load the one from persistent storage set by the timeline cache above - {:ok, pid} = TimelineCache.start_link(persistent_kv: kv, name: :timeline_cache_2) - assert TimelineCache.get_timeline(pid) == timeline - end - end - - describe "store_timeline/2" do - setup do - %{kv: Electric.PersistentKV.Memory.new!()} - end - - test "stores the timeline ID", %{kv: kv} do - {:ok, pid} = TimelineCache.start_link(timeline_id: 3, persistent_kv: kv) - assert TimelineCache.get_timeline(pid) == 3 - assert TimelineCache.store_timeline(pid, 4) == :ok - assert TimelineCache.get_timeline(pid) == 4 - end - - test "persists the timeline ID", %{kv: kv} do - {:ok, cache1} = TimelineCache.start_link(timeline_id: 3, persistent_kv: kv) - assert TimelineCache.get_timeline(cache1) == 3 - assert TimelineCache.store_timeline(cache1, 4) == :ok - assert TimelineCache.get_timeline(cache1) == 4 - - # Check that a fresh timeline cache also loads the timeline ID - # that was persisted by the latest `store_timeline/2` call of the timeline cache above - {:ok, cache2} = TimelineCache.start_link(persistent_kv: kv, name: :timeline_cache_2) - assert TimelineCache.get_timeline(cache2) == 4 - end - end -end diff --git a/packages/sync-service/test/electric/timeline_test.exs b/packages/sync-service/test/electric/timeline_test.exs index e923713a17..1bfc3ff9ff 100644 --- a/packages/sync-service/test/electric/timeline_test.exs +++ b/packages/sync-service/test/electric/timeline_test.exs @@ -1,19 +1,25 @@ defmodule Electric.TimelineTest do use ExUnit.Case, async: true alias Electric.Timeline - alias Electric.TimelineCache alias Support.Mock.ShapeCache import Mox + describe "load_timeline/1" do + setup do + %{kv: Electric.PersistentKV.Memory.new!()} + end + + test "returns nil when no timeline ID is available", %{kv: kv} do + assert Timeline.load_timeline(persistent_kv: kv) == nil + end + end + describe "check/2" do setup context do timeline = context[:electric_timeline] kv = Electric.PersistentKV.Memory.new!() - - {:ok, pid} = TimelineCache.start_link(timeline_id: timeline, persistent_kv: kv) - - opts = [timeline_cache: pid, shape_cache: {ShapeCache, []}] + opts = [persistent_kv: kv, shape_cache: {ShapeCache, []}] {:ok, [timeline: timeline, opts: opts]} end @@ -21,7 +27,7 @@ defmodule Electric.TimelineTest do test "stores the Postgres timeline if Electric has no timeline yet", %{opts: opts} do timeline = 5 assert :ok = Timeline.check(timeline, opts) - assert ^timeline = TimelineCache.get_timeline(opts[:timeline_cache]) + assert ^timeline = Timeline.load_timeline(opts) end @tag electric_timeline: 3 @@ -30,7 +36,7 @@ defmodule Electric.TimelineTest do opts: opts } do assert :ok = Timeline.check(timeline, opts) - assert ^timeline = TimelineCache.get_timeline(opts[:timeline_cache]) + assert ^timeline = Timeline.load_timeline(opts) end @tag electric_timeline: 3 @@ -42,7 +48,7 @@ defmodule Electric.TimelineTest do pg_timeline = 2 assert :ok = Timeline.check(pg_timeline, opts) - assert ^pg_timeline = TimelineCache.get_timeline(opts[:timeline_cache]) + assert ^pg_timeline = Timeline.load_timeline(opts) end @tag electric_timeline: 3 @@ -51,7 +57,7 @@ defmodule Electric.TimelineTest do |> expect(:clean_all_shapes, fn _ -> :ok end) assert :ok = Timeline.check(nil, opts) - assert TimelineCache.get_timeline(opts[:timeline_cache]) == nil + assert Timeline.load_timeline(opts) == nil end end end From dc38968a36024c95209b687f9a587e922f464920 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 4 Sep 2024 14:22:26 +0200 Subject: [PATCH 08/12] Address Garry's review, part 1 --- packages/sync-service/lib/electric/shapes.ex | 3 +- .../sync-service/lib/electric/timeline.ex | 30 +++++++++---------- .../test/electric/timeline_test.exs | 10 +++++-- 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes.ex b/packages/sync-service/lib/electric/shapes.ex index 0c65132693..a76fb43f71 100644 --- a/packages/sync-service/lib/electric/shapes.ex +++ b/packages/sync-service/lib/electric/shapes.ex @@ -96,8 +96,7 @@ defmodule Electric.Shapes do @spec clean_all_shapes(keyword()) :: :ok def clean_all_shapes(opts \\ []) do {shape_cache, opts} = Access.get(opts, :shape_cache, {ShapeCache, []}) - server = Access.get(opts, :server, shape_cache) - shape_cache.clean_all_shapes(server) + shape_cache.clean_all_shapes(opts) :ok end diff --git a/packages/sync-service/lib/electric/timeline.ex b/packages/sync-service/lib/electric/timeline.ex index d74043b84c..a5960c812e 100644 --- a/packages/sync-service/lib/electric/timeline.ex +++ b/packages/sync-service/lib/electric/timeline.ex @@ -20,31 +20,30 @@ defmodule Electric.Timeline do @spec check(timeline(), keyword()) :: :ok def check(pg_timeline, opts) do electric_timeline = load_timeline(opts) - handle(pg_timeline, electric_timeline, opts) + verify_timeline(pg_timeline, electric_timeline, opts) end # Handles the different cases of timeline comparison - @spec handle(timeline(), timeline(), keyword()) :: :ok - defp handle(nil, _, opts) do + @spec verify_timeline(timeline(), timeline(), keyword()) :: :ok + defp verify_timeline(nil, _, opts) do Logger.warning("Unknown Postgres timeline; rotating shapes.") Shapes.clean_all_shapes(opts) store_timeline(nil, opts) end - defp handle(pg_timeline_id, electric_timeline_id, _opts) - when pg_timeline_id == electric_timeline_id do - Logger.info("Connected to Postgres timeline #{pg_timeline_id}") + defp verify_timeline(timeline_id, timeline_id, _opts) do + Logger.info("Connected to Postgres timeline #{timeline_id}") :ok end - defp handle(pg_timeline_id, nil, opts) do + defp verify_timeline(pg_timeline_id, nil, opts) do Logger.info("No previous timeline detected.") Logger.info("Connected to Postgres timeline #{pg_timeline_id}") # Store new timeline store_timeline(pg_timeline_id, opts) end - defp handle(pg_timeline_id, _, opts) do + defp verify_timeline(pg_timeline_id, _, opts) do Logger.info("Detected PITR to timeline #{pg_timeline_id}; rotating shapes.") Electric.Shapes.clean_all_shapes(opts) # Store new timeline only after all shapes have been cleaned @@ -54,9 +53,7 @@ defmodule Electric.Timeline do # Loads the timeline ID from persistent storage @spec load_timeline(keyword()) :: timeline() def load_timeline(opts) do - kv_backend = Keyword.fetch!(opts, :persistent_kv) - # default to Jason decoder - kv = PersistentKV.Serialized.new!(backend: kv_backend) + kv = make_serialized_kv(opts) case PersistentKV.get(kv, @timeline_key) do {:ok, timeline_id} -> @@ -72,10 +69,13 @@ defmodule Electric.Timeline do end defp store_timeline(timeline_id, opts) do + kv = make_serialized_kv(opts) + :ok = PersistentKV.set(kv, @timeline_key, timeline_id) + end + + defp make_serialized_kv(opts) do kv_backend = Keyword.fetch!(opts, :persistent_kv) - # defaults to Jason encoder - kv = PersistentKV.Serialized.new!(backend: kv_backend) - PersistentKV.set(kv, @timeline_key, timeline_id) - :ok + # defaults to using Jason encoder and decoder + PersistentKV.Serialized.new!(backend: kv_backend) end end diff --git a/packages/sync-service/test/electric/timeline_test.exs b/packages/sync-service/test/electric/timeline_test.exs index 1bfc3ff9ff..3dfcd4dd4e 100644 --- a/packages/sync-service/test/electric/timeline_test.exs +++ b/packages/sync-service/test/electric/timeline_test.exs @@ -6,8 +6,10 @@ defmodule Electric.TimelineTest do import Mox describe "load_timeline/1" do - setup do - %{kv: Electric.PersistentKV.Memory.new!()} + @moduletag :tmp_dir + + setup context do + %{kv: Electric.PersistentKV.Filesystem.new!(root: context.tmp_dir)} end test "returns nil when no timeline ID is available", %{kv: kv} do @@ -16,9 +18,11 @@ defmodule Electric.TimelineTest do end describe "check/2" do + @moduletag :tmp_dir + setup context do timeline = context[:electric_timeline] - kv = Electric.PersistentKV.Memory.new!() + kv = Electric.PersistentKV.Filesystem.new!(root: context.tmp_dir) opts = [persistent_kv: kv, shape_cache: {ShapeCache, []}] {:ok, [timeline: timeline, opts: opts]} end From 7b41ff5ed1c45ca0eba0aabf88d56d2274ea659f Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Fri, 6 Sep 2024 09:24:19 +0200 Subject: [PATCH 09/12] Move clean_all_shapes to be a private function of Electric.Timeline module --- packages/sync-service/lib/electric/shapes.ex | 10 ---------- packages/sync-service/lib/electric/timeline.ex | 13 ++++++++++--- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes.ex b/packages/sync-service/lib/electric/shapes.ex index a76fb43f71..b7c41535b5 100644 --- a/packages/sync-service/lib/electric/shapes.ex +++ b/packages/sync-service/lib/electric/shapes.ex @@ -90,16 +90,6 @@ defmodule Electric.Shapes do end end - @doc """ - Clean up all data (meta data and shape log + snapshot) associated with all shapes - """ - @spec clean_all_shapes(keyword()) :: :ok - def clean_all_shapes(opts \\ []) do - {shape_cache, opts} = Access.get(opts, :shape_cache, {ShapeCache, []}) - shape_cache.clean_all_shapes(opts) - :ok - end - defp shape_storage(config, shape_id) do Storage.for_shape(shape_id, Access.fetch!(config, :storage)) end diff --git a/packages/sync-service/lib/electric/timeline.ex b/packages/sync-service/lib/electric/timeline.ex index a5960c812e..8d0230aa33 100644 --- a/packages/sync-service/lib/electric/timeline.ex +++ b/packages/sync-service/lib/electric/timeline.ex @@ -4,7 +4,6 @@ defmodule Electric.Timeline do Module exporting functions for handling Postgres timelines. """ require Logger - alias Electric.Shapes alias Electric.PersistentKV @type timeline :: integer() | nil @@ -27,7 +26,7 @@ defmodule Electric.Timeline do @spec verify_timeline(timeline(), timeline(), keyword()) :: :ok defp verify_timeline(nil, _, opts) do Logger.warning("Unknown Postgres timeline; rotating shapes.") - Shapes.clean_all_shapes(opts) + clean_all_shapes(opts) store_timeline(nil, opts) end @@ -45,7 +44,7 @@ defmodule Electric.Timeline do defp verify_timeline(pg_timeline_id, _, opts) do Logger.info("Detected PITR to timeline #{pg_timeline_id}; rotating shapes.") - Electric.Shapes.clean_all_shapes(opts) + clean_all_shapes(opts) # Store new timeline only after all shapes have been cleaned store_timeline(pg_timeline_id, opts) end @@ -78,4 +77,12 @@ defmodule Electric.Timeline do # defaults to using Jason encoder and decoder PersistentKV.Serialized.new!(backend: kv_backend) end + + # Clean up all data (meta data and shape log + snapshot) associated with all shapes + @spec clean_all_shapes(keyword()) :: :ok + defp clean_all_shapes(opts) do + {shape_cache, opts} = Access.get(opts, :shape_cache, {ShapeCache, []}) + shape_cache.clean_all_shapes(opts) + :ok + end end From 54cf445fb4602d8f7b844d4016d63b4760485b06 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Fri, 6 Sep 2024 09:46:02 +0200 Subject: [PATCH 10/12] Fix error introduced by rebase --- packages/sync-service/lib/electric/application.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 9aa16f3b35..3d3f854edb 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -66,7 +66,7 @@ defmodule Electric.Application do ], timeline_opts: [ shape_cache: {Electric.ShapeCache, []}, - timeline_cache: Electric.TimelineCache + persistent_kv: persistent_kv ], log_collector: {Electric.Replication.ShapeLogCollector, inspector: inspector}, shape_cache: shape_cache From 5d54393d1b170d611af568456c0cd413a766c039 Mon Sep 17 00:00:00 2001 From: Kevin Date: Mon, 9 Sep 2024 11:24:26 +0200 Subject: [PATCH 11/12] Switch order of logging Co-authored-by: Garry Hill --- packages/sync-service/lib/electric/shape_cache.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index eb22efaea4..d78705284b 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -322,8 +322,8 @@ defmodule Electric.ShapeCache do end def handle_call({:clean_all}, _from, state) do - clean_up_all_shapes(state) Logger.info("Cleaning up all shapes") + clean_up_all_shapes(state) {:reply, :ok, state} end From 99adcfda5e437904843c9cdef1f99a108dc3d0b9 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 9 Sep 2024 13:54:31 +0200 Subject: [PATCH 12/12] Let clean_shapes return :ok atom --- packages/sync-service/lib/electric/shapes.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/sync-service/lib/electric/shapes.ex b/packages/sync-service/lib/electric/shapes.ex index b7c41535b5..859ce9fb6b 100644 --- a/packages/sync-service/lib/electric/shapes.ex +++ b/packages/sync-service/lib/electric/shapes.ex @@ -88,6 +88,8 @@ defmodule Electric.Shapes do for shape_id <- shape_ids do shape_cache.clean_shape(shape_id, opts) end + + :ok end defp shape_storage(config, shape_id) do