Skip to content

Commit

Permalink
fix(electric): Fix column name quoting in one of the trigger functions (
Browse files Browse the repository at this point in the history
#474)

Fixes VAX-1062.

I had to refactor the way the affected SQL function is defined in order
for it to work both when running Electric for the first time and when
upgrading from an earlier version. I recommend reviewing this PR one
commit at a time to follow the progression of function code repackaging.
  • Loading branch information
alco authored Sep 26, 2023
1 parent 5f2e1ea commit f045ec8
Show file tree
Hide file tree
Showing 10 changed files with 375 additions and 228 deletions.
5 changes: 5 additions & 0 deletions .changeset/pink-snails-push.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/electric": patch
---

[VAX-1062] Fix column name quoting in a trigger function.
94 changes: 46 additions & 48 deletions components/electric/lib/electric/postgres/extension.ex
Original file line number Diff line number Diff line change
Expand Up @@ -275,60 +275,58 @@ defmodule Electric.Postgres.Extension do
create_schema(txconn)
create_migration_table(txconn)

with_migration_lock(txconn, fn ->
# NOTE(alco): This is currently called BEFORE running any internal migrations because we're only defining the
# type-checking function that the later defined `electrify()` function depends on.
#
# Once we move all function definitions out of migrations, we should call this AFTER all internal migrations
# have been applied.
define_functions(txconn)
newly_applied_versions =
with_migration_lock(txconn, fn ->
existing_versions = txconn |> existing_migration_versions() |> MapSet.new()

existing_migrations = existing_migrations(txconn)

versions =
migrations
|> Enum.reject(fn {version, _module} -> version in existing_migrations end)
|> Enum.reduce([], fn {version, module}, v ->
Logger.info("Running extension migration: #{version}")

disabling_event_triggers(txconn, module, fn ->
for sql <- module.up(@schema) do
case :epgsql.squery(txconn, sql) do
results when is_list(results) ->
errors = Enum.filter(results, &(elem(&1, 0) == :error))

unless(Enum.empty?(errors)) do
raise RuntimeError,
message:
"Migration #{version}/#{module} returned errors: #{inspect(errors)}"
end

:ok

{:ok, _} ->
:ok

{:ok, _cols, _rows} ->
:ok
end
end
end)

{:ok, _count} =
:epgsql.squery(
txconn,
"INSERT INTO #{@migration_table} (version) VALUES ('#{version}')"
)

[version | v]
|> Enum.reject(fn {version, _module} -> version in existing_versions end)
|> Enum.map(fn {version, module} ->
:ok = apply_migration(txconn, version, module)
version
end)
|> Enum.reverse()
end)

{:ok, versions}
end)
:ok = define_functions(txconn)

{:ok, newly_applied_versions}
end)
end

defp apply_migration(txconn, version, module) do
Logger.info("Running extension migration: #{version}")

disabling_event_triggers(txconn, module, fn ->
for sql <- module.up(@schema) do
case :epgsql.squery(txconn, sql) do
results when is_list(results) ->
errors = Enum.filter(results, &(elem(&1, 0) == :error))

unless(Enum.empty?(errors)) do
raise RuntimeError,
message: "Migration #{version}/#{module} returned errors: #{inspect(errors)}"
end

:ok

{:ok, _} ->
:ok

{:ok, _cols, _rows} ->
:ok
end
end
end)

{:ok, 1} =
:epgsql.squery(
txconn,
"INSERT INTO #{@migration_table} (version) VALUES ('#{version}')"
)

:ok
end

# https://dba.stackexchange.com/a/311714
@is_transaction_sql "SELECT transaction_timestamp() != statement_timestamp() AS is_transaction"

Expand Down Expand Up @@ -391,7 +389,7 @@ defmodule Electric.Postgres.Extension do
{:ok, [], []} = :epgsql.squery(conn, query)
end

defp existing_migrations(conn) do
defp existing_migration_versions(conn) do
{:ok, _cols, rows} =
:epgsql.squery(conn, "SELECT version FROM #{@migration_table} ORDER BY version ASC")

Expand Down
58 changes: 50 additions & 8 deletions components/electric/lib/electric/postgres/extension/functions.ex
Original file line number Diff line number Diff line change
@@ -1,26 +1,68 @@
defmodule Electric.Postgres.Extension.Functions do
@moduledoc """
This module organizes SQL functions that are to be defined in Electric's internal database schema.
"""

alias Electric.Postgres.Extension
require EEx

sql_files =
"functions/*.sql.eex"
|> Path.expand(__DIR__)
|> Path.wildcard()

for path <- sql_files, do: @external_resource(path)
function_names =
for path <- sql_files do
@external_resource path

name = path |> Path.basename(".sql.eex") |> String.to_atom()
_ = EEx.function_from_file(:def, name, path, [:assigns])

name
end

fn_name_type =
Enum.reduce(function_names, fn name, code ->
quote do
unquote(name) | unquote(code)
end
end)

@typep name :: unquote(fn_name_type)
@typep sql :: String.t()
@type function_list :: [{name, sql}]

@function_defs Map.new(sql_files, fn path ->
{Path.basename(path, ".sql.eex"), {Path.basename(path), File.read!(path)}}
end)
@function_names function_names

@doc """
Get a list of SQL statements that create various internal SQL functions in the `electric` schema.
Get a list of `{name, SQL}` pairs where the the SQL code contains the definition of a function (or multiple functions).
Every function in the list is defined as `CREATE OR REPLACE FUNCTION`.
"""
# NOTE(alco): Eventually, we're hoping to move all function definitions out of migrations and define them all
# here. See VAX-1016 for details.
@spec list :: function_list
def list do
for {name, args} <- [{"validate_table_column_types", []}] do
{filename, sql} = @function_defs[name]
{name, EEx.eval_string(sql, args, file: filename)}
for name <- @function_names do
{name, by_name(name)}
end
end

@doc """
Look up the SQL code for a function by its canonical name (basename without extension).
We catalog all function definitions as files inside the `functions/` subdirectory. A single file usually contains a
single function definition but may have more than one if they are all meant to be evaluated as a unit.
"""
@spec by_name(name) :: sql
def by_name(name) when name in @function_names do
apply(__MODULE__, name, [assigns()])
end

# This map of assigns is the same for all function templates.
defp assigns do
%{
schema: Extension.schema()
}
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
CREATE OR REPLACE FUNCTION <%= @schema %>.install_function__perform_reordered_op(schema_name TEXT, table_name TEXT, primary_key_list TEXT[], non_pk_column_list TEXT[])
RETURNS TEXT
LANGUAGE PLPGSQL AS $outer_function$
DECLARE
function_name TEXT := 'perform_reordered_op___' || schema_name || '__' || table_name;
shadow_table_name TEXT := 'shadow__' || schema_name || '__' || table_name;
tombstone_table_name TEXT := 'tombstone__' || schema_name || '__' || table_name;
tag_column_list TEXT[] := electric.format_every(non_pk_column_list, '_tag_%s');
reordered_column_list TEXT[];
where_pks_equal_shadow TEXT;
built_row_fill_pks TEXT;
built_row_fill_from_reordered TEXT;
all_pks_present_formatter TEXT;
case_columns_formatter TEXT;
case_pks TEXT;
case_columns TEXT;
on_primary_keys TEXT;
where_pk_main_or_tomb_clause TEXT;
current_row_fill_from_reordered TEXT;
built_row_overrides TEXT;
update_clause TEXT;
BEGIN
reordered_column_list := electric.format_every(non_pk_column_list, '__reordered_%s');

where_pks_equal_shadow := electric.format_every_and_join(primary_key_list, '%1$I = shadow_row.%1$I', ' AND ');

built_row_fill_pks := electric.format_every_and_join(primary_key_list, E'\n built_row.%1$I := shadow_row.%1$I;', '');
built_row_fill_from_reordered := electric.zip_format_every_and_join(non_pk_column_list, reordered_column_list, E'\n built_row.%1$I := shadow_row.%2$I;', '');

case_pks := electric.format_every_and_join(primary_key_list,
$$
CASE
WHEN main.%1$I IS NOT NULL THEN main.%1$I
ELSE NULL
END as %1$I$$, ',');

all_pks_present_formatter := electric.format_every_and_join(primary_key_list, '%%1$I.%1$I IS NOT NULL', ' AND ');
case_columns_formatter := format(
$$
CASE
WHEN %s THEN main.%%1$I
WHEN %s THEN tomb.%%1$I
END as %%1$I$$, format(all_pks_present_formatter, 'main'), format(all_pks_present_formatter, 'tomb'));
case_columns := electric.format_every_and_join(non_pk_column_list, case_columns_formatter, ',');

on_primary_keys := electric.format_every_and_join(primary_key_list, 'main.%1$I = tomb.%1$I', ' AND ');
where_pk_main_or_tomb_clause :=
'(' || electric.format_every_and_join(primary_key_list, 'main.%1$I = shadow_row.%1$I', ' AND ')
|| ') OR ('
|| electric.format_every_and_join(primary_key_list, 'tomb.%1$I = shadow_row.%1$I', ' AND ') || ')';

current_row_fill_from_reordered := electric.zip_format_every_and_join(non_pk_column_list, reordered_column_list, E'\n current_row.%1$I = shadow_row.%2$I;', '');

built_row_overrides := electric.zip_format_every_and_join(tag_column_list, non_pk_column_list,
$$
IF shadow_row.%1$I != shadow_row._tag OR NOT shadow_row._modified_columns_bit_mask[%3$s] THEN
built_row.%2$I = current_row.%2$I;
END IF;
$$, '');

IF array_length(non_pk_column_list, 1) > 0 THEN
update_clause := format($$
UPDATE %1$s SET
-- REPEATED BLOCK PER COLUMN
%3$s
WHERE %2$s;
$$,
format('%I.%I', schema_name, table_name),
where_pks_equal_shadow,
electric.format_every_and_join(non_pk_column_list, '%1$I = built_row.%1$I'));
ELSE
update_clause := 'NULL;'; -- No-op, since there are no non-pk columns
END IF;

-- The `%n$I` placeholders use n-th argument for formatting.
-- Generally, 1 is a function name, 2 is a shadow table name, 3 is a tombstone table name
EXECUTE format($injected$
CREATE OR REPLACE FUNCTION electric.%1$I(shadow_row electric.%2$I)
RETURNS VOID
LANGUAGE PLPGSQL SECURITY DEFINER AS
$function$
DECLARE
built_row %4$s%%ROWTYPE;
current_row %4$s%%ROWTYPE;
tombstone_row electric.%3$I%%ROWTYPE;
old_row_found boolean;
BEGIN
RAISE DEBUG ' Preparing a real operation based on shadow row %%', to_json(shadow_row);

-- Tags are empty: process as a DELETE
IF COALESCE(array_length(shadow_row._tags, 1), 0) = 0 THEN
DELETE FROM %4$s WHERE %5$s;
RAISE DEBUG ' Handled as DELETE';
RETURN;
END IF;

-- Tags are not empty, process as UPSERT
-- We accept ALL operations from Electric as INSERTs to correctly process them,
-- and we need to convert the insert to UPSERT (with possible conflict resolution against an already-deleted row)

-- Reconstruct row-to-be-inserted from the reordered values
%6$s
%7$s

RAISE DEBUG ' Starting from %%', to_json(built_row);

-- We do a join here to avoid a race between main table & tombstone, just in case
SELECT
-- REPEATED BLOCK PER COLUMN
%8$s
INTO current_row
FROM %4$s AS main
FULL OUTER JOIN electric.%3$I AS tomb
ON %9$s
WHERE %10$s;
IF NOT FOUND THEN
-- REPEATED BLOCK PER COLUMN
%11$s
END IF;

old_row_found := FOUND AND %12$s;

-- If tag of the column differs from the tag of the entire operation, prefer saved value
-- otherwise, prefer what has been sent in the reordered operation,
-- but if the column isn't marked as modified, always take the saved value.
-- Tags being equal for non-modified columns can occur if the column has been modified by another operation in the same transaction.
-- REPEATED BLOCK PER COLUMN
%13$s

RAISE DEBUG ' After resolution %%', to_json(built_row);

IF NOT old_row_found THEN
-- Handle as INSERT
-- USES COLUMN LIST
INSERT INTO %4$s
(%14$s)
VALUES
(%15$s);
RAISE DEBUG ' Handled as INSERT';
ELSE
-- Handle as UPDATE
%16$s
RAISE DEBUG ' Handled as UPDATE';
END IF;
END;
$function$;$injected$,
function_name, -- 1
shadow_table_name, -- 2
tombstone_table_name, -- 3
format('%I.%I', schema_name, table_name), -- 4
where_pks_equal_shadow, -- 5
built_row_fill_pks, -- 6
built_row_fill_from_reordered, -- 7
electric.append_string_unless_empty(case_pks, case_columns), -- 8
on_primary_keys, -- 9
where_pk_main_or_tomb_clause, -- 10
current_row_fill_from_reordered, -- 11
format(all_pks_present_formatter, 'current_row'), -- 12
built_row_overrides, -- 13
electric.format_every_and_join(primary_key_list || non_pk_column_list, '%I'), -- 14
electric.format_every_and_join(primary_key_list || non_pk_column_list, 'built_row.%I'), -- 15
update_clause -- 16
);

RETURN function_name;
END;
$outer_function$;
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
|> Enum.join(",")
%>

CREATE OR REPLACE FUNCTION electric.__validate_table_column_types(table_name text)
CREATE OR REPLACE FUNCTION <%= @schema %>.__validate_table_column_types(table_name text)
RETURNS VOID AS $function$
DECLARE
_col_name text;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230512000000_confli
[
@contents["electric_tag_type_and_operators"],
@contents["utility_functions"],
# This function definition is included here because it is referenced in the definition of
# "trigger_function_installers" below it.
Extension.Functions.by_name(:perform_reordered_op_installer_function),
@contents["trigger_function_installers"],
@contents["shadow_table_creation_and_update"]
# We need to actually run shadow table creation/updates, but that's handled in the next migration.
Expand Down
Loading

0 comments on commit f045ec8

Please sign in to comment.