Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: clean shapes on PITR #1541

Merged
merged 12 commits into from
Sep 9, 2024
5 changes: 5 additions & 0 deletions .changeset/calm-toys-play.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Handle Postgres Point In Time Recoveries (PITR) by cleaning all shapes.
4 changes: 4 additions & 0 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ defmodule Electric.Application do
pool_size: Application.fetch_env!(:electric, :db_pool_size),
types: PgInterop.Postgrex.Types
],
timeline_opts: [
shape_cache: {Electric.ShapeCache, []},
persistent_kv: persistent_kv
],
log_collector: {Electric.Replication.ShapeLogCollector, inspector: inspector},
shape_cache: shape_cache
]
Expand Down
15 changes: 15 additions & 0 deletions packages/sync-service/lib/electric/connection_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ defmodule Electric.ConnectionManager do
:replication_opts,
# Database connection pool options.
:pool_opts,
# Options specific to `Electric.Timeline`.
:timeline_opts,
# Configuration for the log collector
:log_collector,
# Configuration for the shape cache that implements `Electric.ShapeCacheBehaviour`
Expand All @@ -56,6 +58,7 @@ defmodule Electric.ConnectionManager do
{:connection_opts, Keyword.t()}
| {:replication_opts, Keyword.t()}
| {:pool_opts, Keyword.t()}
| {:timeline_opts, Keyword.t()}
| {:log_collector, {module(), Keyword.t()}}
| {:shape_cache, {module(), Keyword.t()}}

Expand Down Expand Up @@ -88,11 +91,14 @@ defmodule Electric.ConnectionManager do

pool_opts = Keyword.fetch!(opts, :pool_opts)

timeline_opts = Keyword.fetch!(opts, :timeline_opts)

state =
%State{
connection_opts: connection_opts,
replication_opts: replication_opts,
pool_opts: pool_opts,
timeline_opts: timeline_opts,
log_collector: Keyword.fetch!(opts, :log_collector),
shape_cache: Keyword.fetch!(opts, :shape_cache),
backoff: {:backoff.init(1000, 10_000), nil}
Expand Down Expand Up @@ -134,6 +140,8 @@ defmodule Electric.ConnectionManager do
def handle_continue(:start_connection_pool, state) do
case start_connection_pool(state.connection_opts, state.pool_opts) do
{:ok, pid} ->
Electric.Timeline.check(get_pg_timeline(pid), state.timeline_opts)

# Now we have everything ready to start accepting and processing logical messages from
# Postgres.
Electric.Postgres.ReplicationClient.start_streaming(state.replication_client_pid)
Expand Down Expand Up @@ -361,4 +369,11 @@ defmodule Electric.ConnectionManager do

Keyword.put(connection_opts, :socket_options, tcp_opts)
end

defp get_pg_timeline(conn) do
case Postgrex.query(conn, "SELECT timeline_id FROM pg_control_checkpoint()", []) do
{:ok, %Postgrex.Result{rows: [[timeline_id]]}} -> timeline_id
{:error, _reason} -> nil
end
end
end
29 changes: 24 additions & 5 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ defmodule Electric.ShapeCacheBehaviour do
@callback await_snapshot_start(shape_id(), opts :: keyword()) :: :started | {:error, term()}
@callback handle_truncate(shape_id(), keyword()) :: :ok
@callback clean_shape(shape_id(), keyword()) :: :ok
@callback clean_all_shapes(GenServer.name()) :: :ok
@callback has_shape?(shape_id(), keyword()) :: boolean()
@callback cast(term(), keyword()) :: :ok
end
Expand Down Expand Up @@ -137,6 +138,13 @@ defmodule Electric.ShapeCache do
GenStage.call(server, {:clean, shape_id})
end

@impl Electric.ShapeCacheBehaviour
@spec clean_all_shapes(keyword()) :: :ok
def clean_all_shapes(opts) do
server = Access.get(opts, :server, __MODULE__)
GenServer.call(server, {:clean_all})
end

@impl Electric.ShapeCacheBehaviour
@spec handle_truncate(shape_id(), keyword()) :: :ok
def handle_truncate(shape_id, opts \\ []) do
Expand Down Expand Up @@ -313,6 +321,12 @@ defmodule Electric.ShapeCache do
{:reply, :ok, [], state}
end

def handle_call({:clean_all}, _from, state) do
clean_up_all_shapes(state)
Logger.info("Cleaning up all shapes")
kevin-dp marked this conversation as resolved.
Show resolved Hide resolved
{:reply, :ok, state}
end

@impl GenStage
def handle_cast({:snapshot_xmin_known, shape_id, xmin}, %{shape_status: shape_status} = state) do
unless shape_status.set_snapshot_xmin(state.persistent_state, shape_id, xmin) do
Expand Down Expand Up @@ -353,14 +367,19 @@ defmodule Electric.ShapeCache do
state.shape_status.remove_shape(state.persistent_state, shape_id)
end

defp is_known_shape_id?(state, shape_id) do
if state.shape_status.get_existing_shape(state.persistent_state, shape_id) do
true
else
false
defp clean_up_all_shapes(state) do
shape_ids =
state.persistent_state |> state.shape_status.list_shapes() |> Enum.map(&elem(&1, 0))

for shape_id <- shape_ids do
clean_up_shape(state, shape_id)
end
end

defp is_known_shape_id?(state, shape_id) do
!!state.shape_status.get_existing_shape(state.persistent_state, shape_id)
end

defp add_waiter(%{awaiting_snapshot_start: waiters} = state, shape_id, waiter),
do: %{
state
Expand Down
2 changes: 0 additions & 2 deletions packages/sync-service/lib/electric/shapes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ defmodule Electric.Shapes do
for shape_id <- shape_ids do
shape_cache.clean_shape(shape_id, opts)
end

kevin-dp marked this conversation as resolved.
Show resolved Hide resolved
:ok
end

defp shape_storage(config, shape_id) do
Expand Down
88 changes: 88 additions & 0 deletions packages/sync-service/lib/electric/timeline.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
defmodule Electric.Timeline do
kevin-dp marked this conversation as resolved.
Show resolved Hide resolved
@moduledoc """
Genserver that tracks the Postgres timeline ID.
Module exporting functions for handling Postgres timelines.
"""
require Logger
alias Electric.PersistentKV

@type timeline :: integer() | nil

@timeline_key "timeline_id"

@doc """
Checks the provided `pg_timeline` against Electric's timeline.
Normally, Postgres and Electric are on the same timeline and nothing must be done.
If the timelines differ, that indicates that a Point In Time Recovery (PITR) has occurred and all shapes must be cleaned.
If we fail to fetch timeline information, we also clean all shapes for safety as we can't be sure that Postgres and Electric are on the same timeline.
"""
@spec check(timeline(), keyword()) :: :ok
def check(pg_timeline, opts) do
electric_timeline = load_timeline(opts)
verify_timeline(pg_timeline, electric_timeline, opts)
end

# Handles the different cases of timeline comparison
@spec verify_timeline(timeline(), timeline(), keyword()) :: :ok
defp verify_timeline(nil, _, opts) do
Logger.warning("Unknown Postgres timeline; rotating shapes.")
clean_all_shapes(opts)
store_timeline(nil, opts)
end

defp verify_timeline(timeline_id, timeline_id, _opts) do
Logger.info("Connected to Postgres timeline #{timeline_id}")
:ok
end

defp verify_timeline(pg_timeline_id, nil, opts) do
Logger.info("No previous timeline detected.")
Logger.info("Connected to Postgres timeline #{pg_timeline_id}")
# Store new timeline
store_timeline(pg_timeline_id, opts)
end

defp verify_timeline(pg_timeline_id, _, opts) do
Logger.info("Detected PITR to timeline #{pg_timeline_id}; rotating shapes.")
clean_all_shapes(opts)
# Store new timeline only after all shapes have been cleaned
store_timeline(pg_timeline_id, opts)
end

# Loads the timeline ID from persistent storage
@spec load_timeline(keyword()) :: timeline()
def load_timeline(opts) do
kv = make_serialized_kv(opts)

case PersistentKV.get(kv, @timeline_key) do
{:ok, timeline_id} ->
timeline_id

{:error, :not_found} ->
nil

error ->
Logger.warning("Failed to load timeline ID from persistent storage: #{error}")
nil
end
end

defp store_timeline(timeline_id, opts) do
kv = make_serialized_kv(opts)
:ok = PersistentKV.set(kv, @timeline_key, timeline_id)
end

defp make_serialized_kv(opts) do
kv_backend = Keyword.fetch!(opts, :persistent_kv)
kevin-dp marked this conversation as resolved.
Show resolved Hide resolved
# defaults to using Jason encoder and decoder
PersistentKV.Serialized.new!(backend: kv_backend)
end

# Clean up all data (meta data and shape log + snapshot) associated with all shapes
@spec clean_all_shapes(keyword()) :: :ok
defp clean_all_shapes(opts) do
{shape_cache, opts} = Access.get(opts, :shape_cache, {ShapeCache, []})
shape_cache.clean_all_shapes(opts)
:ok
end
end
67 changes: 67 additions & 0 deletions packages/sync-service/test/electric/timeline_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
defmodule Electric.TimelineTest do
use ExUnit.Case, async: true
alias Electric.Timeline
alias Support.Mock.ShapeCache

import Mox

describe "load_timeline/1" do
@moduletag :tmp_dir

setup context do
%{kv: Electric.PersistentKV.Filesystem.new!(root: context.tmp_dir)}
end

test "returns nil when no timeline ID is available", %{kv: kv} do
assert Timeline.load_timeline(persistent_kv: kv) == nil
end
end

describe "check/2" do
@moduletag :tmp_dir

setup context do
timeline = context[:electric_timeline]
kv = Electric.PersistentKV.Filesystem.new!(root: context.tmp_dir)
opts = [persistent_kv: kv, shape_cache: {ShapeCache, []}]
{:ok, [timeline: timeline, opts: opts]}
end

@tag electric_timeline: nil
test "stores the Postgres timeline if Electric has no timeline yet", %{opts: opts} do
timeline = 5
assert :ok = Timeline.check(timeline, opts)
assert ^timeline = Timeline.load_timeline(opts)
end

@tag electric_timeline: 3
test "proceeds without changes if Postgres' timeline matches Electric's timeline", %{
timeline: timeline,
opts: opts
} do
assert :ok = Timeline.check(timeline, opts)
assert ^timeline = Timeline.load_timeline(opts)
end

@tag electric_timeline: 3
test "cleans all shapes if Postgres' timeline does not match Electric's timeline", %{
opts: opts
} do
ShapeCache
|> expect(:clean_all_shapes, fn _ -> :ok end)

pg_timeline = 2
assert :ok = Timeline.check(pg_timeline, opts)
assert ^pg_timeline = Timeline.load_timeline(opts)
end

@tag electric_timeline: 3
test "cleans all shapes if Postgres' timeline is unknown", %{opts: opts} do
ShapeCache
|> expect(:clean_all_shapes, fn _ -> :ok end)

assert :ok = Timeline.check(nil, opts)
assert Timeline.load_timeline(opts) == nil
end
end
end
Loading