Skip to content

Commit

Permalink
Add partitioned tables support
Browse files Browse the repository at this point in the history
  • Loading branch information
magnetised committed Dec 19, 2024
1 parent b9d31ea commit 5e6a8f1
Show file tree
Hide file tree
Showing 20 changed files with 977 additions and 167 deletions.
3 changes: 1 addition & 2 deletions packages/sync-service/lib/electric/plug/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ defmodule Electric.Plug.Utils do
stack_ready_timeout = Access.get(conn.assigns.config, :stack_ready_timeout, 5_000)
stack_events_registry = conn.assigns.config[:stack_events_registry]

ref = make_ref()
Electric.StackSupervisor.subscribe_to_stack_events(stack_events_registry, stack_id, ref)
ref = Electric.StackSupervisor.subscribe_to_stack_events(stack_events_registry, stack_id)

if Electric.ProcessRegistry.alive?(stack_id, Electric.Replication.Supervisor) do
conn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ defmodule Electric.Postgres.Configuration do
FROM input_relations ir
JOIN pg_class pc ON pc.relname = ir.tablename
JOIN pg_namespace pn ON pn.oid = pc.relnamespace
WHERE pn.nspname = ir.schemaname AND pc.relkind = 'r';
WHERE pn.nspname = ir.schemaname AND pc.relkind IN ('r', 'p');
"""

relations = Map.keys(filters)
Expand Down
22 changes: 17 additions & 5 deletions packages/sync-service/lib/electric/postgres/inspector.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
defmodule Electric.Postgres.Inspector do
alias Electric.Replication.Eval.Parser

@type relation :: Electric.relation()
@type relation_id :: Electric.relation_id()
@type relation_kind :: :ordinary_table | :partitioned_table

@type column_info :: %{
name: String.t(),
Expand All @@ -17,15 +19,19 @@ defmodule Electric.Postgres.Inspector do

@type relation_info :: %{
relation_id: relation_id(),
relation: relation()
relation: relation(),
kind: relation_kind(),
parent: nil | relation(),
children: nil | [relation(), ...]
}

@callback load_relation(String.t(), opts :: term()) ::
@callback load_relation(String.t() | relation(), opts :: term()) ::
{:ok, relation_info()} | {:error, String.t()}

@callback load_column_info(relation(), opts :: term()) ::
{:ok, [column_info()]} | :table_not_found

# @callback introspect_relation()
@callback clean(relation(), opts :: term()) :: true

@type inspector :: {module(), opts :: term()}
Expand All @@ -40,9 +46,14 @@ defmodule Electric.Postgres.Inspector do
`"Users"` would return `{"public", "Users"}`,
`some_schema.users` would return `{"some_schema", "users"}`.
"""
@spec load_relation(String.t(), inspector()) :: {:ok, relation_info()} | {:error, String.t()}
def load_relation(table, {module, opts}),
do: module.load_relation(table, opts)
def load_relation(%{schema: schema, table: table}, inspector),
do: load_relation({schema, table}, inspector)

@spec load_relation(String.t() | relation(), inspector()) ::
{:ok, relation_info()} | {:error, String.t()}
def load_relation(table, {module, opts}) do
module.load_relation(table, opts)
end

@doc """
Load column information about a given table using a provided inspector.
Expand All @@ -55,6 +66,7 @@ defmodule Electric.Postgres.Inspector do
@doc """
Clean up all information about a given relation using a provided inspector.
"""

@spec clean(relation(), inspector()) :: true
def clean(relation, {module, opts}), do: module.clean(relation, opts)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,79 @@ defmodule Electric.Postgres.Inspector.DirectInspector do
@doc """
Returns the PG relation from the table name.
"""
def load_relation(table, conn) do
def load_relation(table, conn) when is_binary(table) do
# The extra cast from $1 to text is needed because of Postgrex' OID type encoding
# see: https://github.com/elixir-ecto/postgrex#oid-type-encoding
query = """
SELECT nspname, relname, pg_class.oid
FROM pg_class
JOIN pg_namespace ON relnamespace = pg_namespace.oid
WHERE
relkind = 'r' AND
pg_class.oid = $1::text::regclass
"""
query = load_relation_query("$1::text::regclass")
do_load_relation(conn, query, [table])
end

def load_relation({schema, name}, conn) when is_binary(schema) and is_binary(name) do
query = load_relation_query("format('%I.%I', $1::text, $2::text)::regclass")
do_load_relation(conn, query, [schema, name])
end

case Postgrex.query(conn, query, [table]) do
defp do_load_relation(conn, query, params) do
case Postgrex.query(conn, query, params) do
{:ok, result} ->
# We expect exactly one row because the query didn't fail
# so the relation exists since we could cast it to a regclass
[[schema, table, oid]] = result.rows
{:ok, %{relation_id: oid, relation: {schema, table}}}
[[schema, table, oid, kind, parent, children]] = result.rows

{:ok,
%{
relation_id: oid,
relation: {schema, table},
kind: resolve_kind(kind),
parent: map_relations(parent),
children: map_relations(children)
}}

{:error, err} ->
{:error, Exception.message(err)}
end
end

defp load_relation_query(match) do
# partitions can live in other namespaces from the parent/root table, so we
# need to keep track of them
[
"""
SELECT pn.nspname, pc.relname, pc.oid, pc.relkind, pi_parent.parent, pi_children.children
FROM pg_catalog.pg_class pc
JOIN pg_catalog.pg_namespace pn ON pc.relnamespace = pn.oid
LEFT OUTER JOIN ( -- get schema and name of parent table (if any)
SELECT pi.inhrelid, ARRAY[pn.nspname, pc.relname] parent
FROM pg_catalog.pg_inherits pi
JOIN pg_catalog.pg_class pc ON pi.inhparent = pc.oid
JOIN pg_catalog.pg_namespace pn ON pc.relnamespace = pn.oid
) pi_parent ON pc.oid = pi_parent.inhrelid
LEFT OUTER JOIN ( -- get list of child partitions (if any)
SELECT pi.inhparent, ARRAY_AGG(ARRAY[pn.nspname, pc.relname]) AS children
FROM pg_catalog.pg_inherits pi
JOIN pg_catalog.pg_class pc ON pi.inhrelid = pc.oid
JOIN pg_catalog.pg_namespace pn ON pc.relnamespace = pn.oid
GROUP BY pi.inhparent
) pi_children ON pc.oid = pi_children.inhparent
WHERE
pc.relkind IN ('r', 'p') AND
""",
"pc.oid = ",
match
]
end

defp resolve_kind("r"), do: :ordinary_table
defp resolve_kind("p"), do: :partitioned_table

defp map_relations(nil), do: nil

defp map_relations([schema, name]) when is_binary(schema) and is_binary(name),
do: {schema, name}

defp map_relations(relations) when is_list(relations),
do: Enum.map(relations, &map_relations/1)

@doc """
Load table information (refs) from the database
"""
Expand All @@ -49,7 +98,7 @@ defmodule Electric.Postgres.Inspector.DirectInspector do
JOIN pg_type ON atttypid = pg_type.oid
LEFT JOIN pg_index ON indrelid = pg_class.oid AND indisprimary
LEFT JOIN pg_type AS elem_pg_type ON pg_type.typelem = elem_pg_type.oid
WHERE relname = $1 AND nspname = $2 AND relkind = 'r'
WHERE relname = $1 AND nspname = $2 AND relkind IN ('r', 'p')
ORDER BY pg_class.oid, attnum
"""

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Electric.Postgres.Inspector.EtsInspector do
alias Electric.Postgres.Inspector.DirectInspector
use GenServer

alias Electric.Postgres.Inspector.DirectInspector

@behaviour Electric.Postgres.Inspector

## Public API
Expand Down Expand Up @@ -117,7 +119,7 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
{:error, err} ->
{:reply, {:error, err}, state}

{:ok, relation} ->
{:ok, %{relation: relation} = info} ->
# We keep the mapping in both directions:
# - Forward: user-provided table name -> PG relation (many-to-one)
# e.g. `~s|users|` -> `{"public", "users"}`
Expand All @@ -127,9 +129,11 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
#
# The forward direction allows for efficient lookup (based on user-provided table name)
# the backward direction allows for efficient cleanup (based on PG relation)
:ets.insert(state.pg_info_table, {{table, :table_to_relation}, relation})
:ets.insert(state.pg_relation_table, {{relation, :relation_to_table}, table})
{:reply, {:ok, relation}, state}
:ets.insert(state.pg_info_table, {{table, :table_to_relation}, info})
:ets.insert(state.pg_info_table, {{relation, :table_to_relation}, info})
:ets.insert(state.pg_relation_table, {{info, :relation_to_table}, table})
:ets.insert(state.pg_relation_table, {{info, :relation_to_table}, relation})
{:reply, {:ok, info}, state}
end

relation ->
Expand Down Expand Up @@ -159,12 +163,26 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
end

@pg_rel_position 2
defp relation_from_ets(table, opts_or_state) do
defp relation_from_ets(table, opts_or_state) when is_binary(table) do
ets_table = get_column_info_table(opts_or_state)

:ets.lookup_element(ets_table, {table, :table_to_relation}, @pg_rel_position, :not_found)
end

defp relation_from_ets({_schema, _name} = relation, opts_or_state) do
ets_table = get_column_info_table(opts_or_state)

with info when is_map(info) <-
:ets.lookup_element(
ets_table,
{relation, :table_to_relation},
@pg_rel_position,
:not_found
) do
info
end
end

@pg_table_idx 1
defp tables_from_ets(relation, opts_or_state) do
ets_table = get_relation_table(opts_or_state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ defmodule Electric.Replication.ShapeLogCollector do
state = Map.merge(opts, %{producer: nil, subscriptions: {0, MapSet.new()}})
# start in demand: :accumulate mode so that the ShapeCache is able to start
# all active consumers before we start sending transactions
{:producer, state, dispatcher: Electric.Shapes.Dispatcher, demand: opts.demand}
{:producer, state,
dispatcher: {Electric.Shapes.Dispatcher, inspector: state.inspector}, demand: opts.demand}
end

def handle_subscribe(:consumer, _opts, from, state) do
Expand Down Expand Up @@ -148,14 +149,43 @@ defmodule Electric.Replication.ShapeLogCollector do

OpenTelemetry.add_span_attributes("rel.is_dropped": true)

reload_partitioned_table(rel, state)

{:reply, :ok, [], state}
end

defp handle_relation(rel, from, state) do
OpenTelemetry.add_span_attributes("rel.is_dropped": false)
reload_partitioned_table(rel, state)
{:noreply, [rel], %{state | producer: from}}
end

defp reload_partitioned_table(rel, state) do
case Inspector.load_relation(rel, state.inspector) do
{:ok, %{parent: nil}} ->
:ok

{:ok, %{parent: {_, _} = parent}} ->
# probably a new partition for an existing partitioned table
# so force a reload of the relation info

# TODO: we should probabaly have a way to clean the inspector cache
# just based on the relation, there's a chance that this results in
# a query to pg just to then drop the info
with {:ok, info} <- Inspector.load_relation(parent, state.inspector) do
Inspector.clean(info, state.inspector)
end

{:ok, _} ->
# probably a malformed value from a test inspector
:ok

{:error, _} ->
# just ignore errors here, they're unlikely anyway
:ok
end
end

defp remove_subscription(from, %{subscriptions: {count, set}} = state) do
subscriptions =
if MapSet.member?(set, from) do
Expand Down
57 changes: 42 additions & 15 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ defmodule Electric.Shapes.Consumer do
restart: :temporary,
significant: true

alias Electric.ShapeCache.LogChunker
alias Electric.LogItems
alias Electric.Postgres.Inspector
alias Electric.Replication.Changes
alias Electric.Replication.Changes.Transaction
alias Electric.ShapeCache
alias Electric.ShapeCache.LogChunker
alias Electric.Shapes.Shape
alias Electric.Telemetry.OpenTelemetry
alias Electric.Utils
Expand Down Expand Up @@ -182,25 +183,51 @@ defmodule Electric.Shapes.Consumer do

# `Shapes.Dispatcher` only works with single-events, so we can safely assert
# that here
def handle_events([%Changes.Relation{}], _from, state) do
%{shape: %{root_table: root_table}, inspector: {inspector, inspector_opts}} = state
def handle_events([%Changes.Relation{} = relation], _from, state) do
%{shape: %{root_table: root_table} = shape, inspector: inspector} = state

# we now recelve relation messages from partitions, as well as ones
# affecting our root table so we need to be clear what we're getting -- if
# the relation message refers to our root table then we need to drop the
# shape as something has changed. if the relation is a new partition, so
# it's parent is our root table, then we need to just add that partition to
# our shape so txns from the new partition are properly mapped to our root
# table.
if relation.id == shape.root_table_id do
Logger.info(
"Schema for the table #{Utils.inspect_relation(root_table)} changed - terminating shape #{state.shape_handle}"
)

Logger.info(
"Schema for the table #{Utils.inspect_relation(root_table)} changed - terminating shape #{state.shape_handle}"
)
# We clean up the relation info from ETS as it has changed and we want
# to source the fresh info from postgres for the next shape creation
Inspector.clean(root_table, inspector)

# We clean up the relation info from ETS as it has changed and we want
# to source the fresh info from postgres for the next shape creation
inspector.clean(root_table, inspector_opts)
state =
reply_to_snapshot_waiters(
{:error, "Shape relation changed before snapshot was ready"},
state
)

state =
reply_to_snapshot_waiters(
{:error, "Shape relation changed before snapshot was ready"},
state
cleanup(state)

{:stop, :normal, state}
else
# if we're receiving this relation message but the relation doesn't refer
# to the root table for the shape, then it **must** be because of the addition of a partition
# to the root table

{:ok, %{parent: ^root_table, relation: table}} =
Inspector.load_relation({relation.schema, relation.table}, inspector)

# a new partition has been added
Logger.info(
"New partition #{Utils.inspect_relation(table)} for table #{Utils.inspect_relation(root_table)}"
)

cleanup(state)
{:stop, :normal, state}
shape = Shape.add_partition(shape, root_table, table)

{:noreply, [], %{state | shape: shape}}
end
end

# Buffer incoming transactions until we know our xmin
Expand Down
Loading

0 comments on commit 5e6a8f1

Please sign in to comment.