Skip to content

Commit

Permalink
chore(electric): Reject tables that have columns with DEFAULT express…
Browse files Browse the repository at this point in the history
…ions (#509)

Fixes VAX-1100.
  • Loading branch information
alco authored Oct 16, 2023
1 parent 4092a9d commit aa7c265
Show file tree
Hide file tree
Showing 20 changed files with 157 additions and 80 deletions.
5 changes: 5 additions & 0 deletions .changeset/ninety-lions-kick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/electric": patch
---

Reject tables that have columns with DEFAULT expressions.
34 changes: 16 additions & 18 deletions components/electric/lib/electric/postgres/extension.ex
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,12 @@ defmodule Electric.Postgres.Extension do
def define_functions(conn) do
Enum.each(Functions.list(), fn {name, sql} ->
case :epgsql.squery(conn, sql) do
{:ok, [], []} -> :ok
error -> raise "Failed to define function '#{name}' with error: #{inspect(error)}"
{:ok, [], []} ->
Logger.debug("Successfully (re)defined SQL function/procedure '#{name}'")
:ok

error ->
raise "Failed to define function '#{name}' with error: #{inspect(error)}"
end
end)
end
Expand All @@ -312,7 +316,8 @@ defmodule Electric.Postgres.Extension do
Migrations.Migration_20230921161045_DropEventTriggers,
Migrations.Migration_20230921161418_ProxyCompatibility,
Migrations.Migration_20231009121515_AllowLargeMigrations,
Migrations.Migration_20231010123118_AddPriorityToVersion
Migrations.Migration_20231010123118_AddPriorityToVersion,
Migrations.Migration_20231016141000_ConvertFunctionToProcedure
]
end

Expand Down Expand Up @@ -361,22 +366,15 @@ defmodule Electric.Postgres.Extension do
Logger.info("Running extension migration: #{version}")

for sql <- module.up(@schema) do
case :epgsql.squery(txconn, sql) do
results when is_list(results) ->
errors = Enum.filter(results, &(elem(&1, 0) == :error))

unless(Enum.empty?(errors)) do
raise RuntimeError,
message: "Migration #{version}/#{module} returned errors: #{inspect(errors)}"
end

:ok

{:ok, _} ->
:ok
results = :epgsql.squery(txconn, sql) |> List.wrap()
errors = Enum.filter(results, &(elem(&1, 0) == :error))

{:ok, _cols, _rows} ->
:ok
if errors == [] do
:ok
else
raise RuntimeError,
message:
"Migration #{version}/#{inspect(module)} returned errors:\n#{inspect(errors, pretty: true)}"
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ BEGIN
) THEN
RAISE NOTICE 'Electrify table %', _quoted_name;

PERFORM <%= @schema %>.__validate_table_column_types(_quoted_name);
CALL <%= @schema %>.__validate_table_column_types(_quoted_name);
CALL <%= @schema %>.__validate_table_column_defaults(_quoted_name);

-- insert the required ddl into the migrations table
SELECT <%= @schema %>.ddlgen_create(_oid) INTO _create_sql;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- This function validates each column of the table that's being electrified
-- and aborts electrification if any column has DEFAULT expression.

CREATE OR REPLACE PROCEDURE <%= @schema %>.__validate_table_column_defaults(table_name text)
SECURITY DEFINER AS $function$
DECLARE
_col_name text;
_col_has_default boolean;
_invalid_cols text[];
BEGIN
FOR _col_name, _col_has_default IN
SELECT attname, atthasdef
FROM pg_attribute
WHERE attrelid = table_name::regclass AND attnum > 0 AND NOT attisdropped
ORDER BY attnum
LOOP
IF _col_has_default THEN
_invalid_cols = array_append(_invalid_cols, format('%I', _col_name));
END IF;
END LOOP;

IF _invalid_cols IS NOT NULL THEN
RAISE EXCEPTION E'Cannot electrify "%" because some of its columns have DEFAULT expression which is not currently supported by Electric:\n %',
table_name, array_to_string(_invalid_cols, E'\n ');
END IF;
END;
$function$ LANGUAGE PLPGSQL;
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
-- This function validates the type of each column of the table that's being electrified,
-- to check if the type is supported by Electric.

<%
valid_column_types =
Electric.Postgres.supported_types()
|> Enum.map(&"'#{&1}'")
|> Enum.join(",")
%>

CREATE OR REPLACE FUNCTION <%= @schema %>.__validate_table_column_types(table_name text)
RETURNS VOID AS $function$
CREATE OR REPLACE PROCEDURE <%= @schema %>.__validate_table_column_types(table_name text)
SECURITY DEFINER AS $function$
DECLARE
_col_name text;
_col_type text;
Expand All @@ -25,7 +28,7 @@ BEGIN
-- We only support unsized varchar type
OR ('varchar' IN (<%= valid_column_types %>) AND _col_type = 'varchar' AND _col_typmod <> -1)
THEN
_invalid_cols = array_append(_invalid_cols, format('"%s" %s', _col_name, _col_type_pretty));
_invalid_cols = array_append(_invalid_cols, format('%I %s', _col_name, _col_type_pretty));
END IF;
END LOOP;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Electric.Postgres.Extension.Migrations.Migration_20231016141000_ConvertFunctionToProcedure do
alias Electric.Postgres.Extension

@behaviour Extension.Migration

@impl true
def version, do: 2023_10_16_14_10_00

@impl true
def up(schema) do
["DROP ROUTINE IF EXISTS #{schema}.__validate_table_column_types(text)"]
end

@impl true
def down(_schema) do
[]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Electric.Postgres.Extension.ElectrifyTest do
end

test_tx "inserts a row into the electrified table", fn conn ->
sql = "CREATE TABLE buttercup (id uuid PRIMARY KEY DEFAULT uuid_generate_v4());"
sql = "CREATE TABLE buttercup (id uuid PRIMARY KEY);"
{:ok, _cols, _rows} = :epgsql.squery(conn, sql)

sql = "CALL electric.electrify('buttercup');"
Expand All @@ -32,7 +32,7 @@ defmodule Electric.Postgres.Extension.ElectrifyTest do
test_tx "inserts the DDL for the table into the migration table", fn conn ->
sql = """
CREATE TABLE buttercup (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
id uuid PRIMARY KEY,
value text,
secret integer
);
Expand All @@ -56,7 +56,7 @@ defmodule Electric.Postgres.Extension.ElectrifyTest do
test_tx "duplicate call does not raise", fn conn ->
sql = """
CREATE TABLE buttercup (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
id uuid PRIMARY KEY,
value text,
secret integer
);
Expand All @@ -73,10 +73,10 @@ defmodule Electric.Postgres.Extension.ElectrifyTest do
end

test_tx "handles quoted table names", fn conn ->
sql = "CREATE TABLE \"la la daisy\" (id uuid PRIMARY KEY DEFAULT uuid_generate_v4());"
sql = "CREATE TABLE \"la la daisy\" (id uuid PRIMARY KEY);"
{:ok, _cols, _rows} = :epgsql.squery(conn, sql)

sql = "CREATE TABLE \"la la buttercup\" (id uuid PRIMARY KEY DEFAULT uuid_generate_v4());"
sql = "CREATE TABLE \"la la buttercup\" (id uuid PRIMARY KEY);"
{:ok, _cols, _rows} = :epgsql.squery(conn, sql)

sql = "CALL electric.electrify('public', 'la la daisy');"
Expand All @@ -94,7 +94,7 @@ defmodule Electric.Postgres.Extension.ElectrifyTest do
end

test_tx "allows for namespaced table names", fn conn ->
sql = "CREATE TABLE daisy (id uuid PRIMARY KEY DEFAULT uuid_generate_v4());"
sql = "CREATE TABLE daisy (id uuid PRIMARY KEY);"
{:ok, _cols, _rows} = :epgsql.squery(conn, sql)

sql = "CALL electric.electrify('public.daisy');"
Expand All @@ -121,7 +121,7 @@ defmodule Electric.Postgres.Extension.ElectrifyTest do

sql = """
CREATE TABLE balloons.buttercup (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
id uuid PRIMARY KEY,
value text,
secret integer
);
Expand Down Expand Up @@ -165,7 +165,7 @@ defmodule Electric.Postgres.Extension.ElectrifyTest do
test_tx "adds the electrified table to the publication", fn conn ->
assert published_tables(conn) == []

sql = "CREATE TABLE buttercup (id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), value text);"
sql = "CREATE TABLE buttercup (id uuid PRIMARY KEY, value text);"

{:ok, _cols, _rows} = :epgsql.squery(conn, sql)

Expand All @@ -183,7 +183,7 @@ defmodule Electric.Postgres.Extension.ElectrifyTest do

sql = """
CREATE TABLE buttercup (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
id uuid PRIMARY KEY,
value text,
secret integer
);
Expand Down
45 changes: 35 additions & 10 deletions components/electric/test/electric/postgres/extension_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -456,11 +456,11 @@ defmodule Electric.Postgres.ExtensionTest do
id UUID PRIMARY KEY,
c1 CHARACTER,
c2 CHARACTER(11),
c3 VARCHAR(11),
"C3" VARCHAR(11),
num8a INT8,
num8b BIGINT,
real4a FLOAT4,
real4b REAL,
"Real4b" REAL,
created_at TIMETZ
);
CALL electric.electrify('public.t1');
Expand All @@ -469,14 +469,14 @@ defmodule Electric.Postgres.ExtensionTest do
assert error_msg ==
"""
Cannot electrify "public.t1" because some of its columns have types not supported by Electric:
"c1" character(1)
"c2" character(11)
"c3" character varying(11)
"num8a" bigint
"num8b" bigint
"real4a" real
"real4b" real
"created_at" time with time zone
c1 character(1)
c2 character(11)
"C3" character varying(11)
num8a bigint
num8b bigint
real4a real
"Real4b" real
created_at time with time zone
"""
|> String.trim()
end
Expand All @@ -496,6 +496,31 @@ defmodule Electric.Postgres.ExtensionTest do
assert {:ok, false} = Extension.electrified?(conn, "daisy")
assert {:ok, false} = Extension.electrified?(conn, "public", "daisy")
end

test_tx "table electrification rejects default column expressions", fn conn ->
assert [
{:ok, [], []},
{:error, {:error, :error, _, :raise_exception, error_msg, _}}
] =
:epgsql.squery(conn, """
CREATE TABLE public.t1 (
id UUID PRIMARY KEY,
t1 TEXT DEFAULT '',
num INTEGER NOT NULL,
"Ts" TIMESTAMP DEFAULT now(),
name VARCHAR
);
CALL electric.electrify('public.t1');
""")

assert error_msg ==
"""
Cannot electrify "public.t1" because some of its columns have DEFAULT expression which is not currently supported by Electric:
t1
"Ts"
"""
|> String.trim()
end
end

defp migration_history(conn, after_version \\ nil) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule Electric.Replication.Shapes.ShapeRequestTest do
setup :load_schema

@tag with_sql: """
INSERT INTO public.my_entries (content) VALUES ('test content');
INSERT INTO public.my_entries (id, content) VALUES (gen_random_uuid(), 'test content');
"""
test "should be able to fulfill full-table request", %{
origin: origin,
Expand Down Expand Up @@ -84,7 +84,7 @@ defmodule Electric.Replication.Shapes.ShapeRequestTest do
end

@tag with_sql: """
INSERT INTO public.my_entries (content) VALUES ('test content');
INSERT INTO public.my_entries (id, content) VALUES (gen_random_uuid(), 'test content');
"""
test "should not fulfill full-table requests if the table has already been sent", %{
origin: origin,
Expand Down
15 changes: 10 additions & 5 deletions components/electric/test/electric/satellite/subscriptions_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ defmodule Electric.Satellite.SubscriptionsTest do
assert [%SatOpInsert{row_data: %{values: [_, "John"]}}] = received[request_id]

{:ok, 1} =
:epgsql.equery(pg_conn, "INSERT INTO public.my_entries (content) VALUES ($1)", ["WRONG"])
:epgsql.equery(pg_conn, "INSERT INTO public.my_entries (id, content) VALUES ($1, $2)", [
uuid4(),
"WRONG"
])

{:ok, 1} =
:epgsql.equery(pg_conn, "INSERT INTO public.users (id, name) VALUES ($1, $2)", [
Expand Down Expand Up @@ -196,7 +199,8 @@ defmodule Electric.Satellite.SubscriptionsTest do
assert [] = received[request_id2]

{:ok, 1} =
:epgsql.equery(pg_conn, "INSERT INTO public.my_entries (content) VALUES ($1)", [
:epgsql.equery(pg_conn, "INSERT INTO public.my_entries (id, content) VALUES ($1, $2)", [
uuid4(),
"Correct"
])

Expand Down Expand Up @@ -229,8 +233,8 @@ defmodule Electric.Satellite.SubscriptionsTest do
end

@tag with_sql: """
INSERT INTO public.users (id, name) VALUES ('#{uuid4()}', 'John');
INSERT INTO public.my_entries (content) VALUES ('Old');
INSERT INTO public.users (id, name) VALUES (gen_random_uuid(), 'John');
INSERT INTO public.my_entries (id, content) VALUES (gen_random_uuid(), 'Old');
"""
test "The client can connect and subscribe, and that works with multiple subscriptions",
%{conn: pg_conn} = ctx do
Expand Down Expand Up @@ -282,7 +286,8 @@ defmodule Electric.Satellite.SubscriptionsTest do
assert [%SatOpInsert{row_data: %{values: [_, "Old", _]}}] = received[request_id2]

{:ok, 1} =
:epgsql.equery(pg_conn, "INSERT INTO public.my_entries (content) VALUES ($1)", [
:epgsql.equery(pg_conn, "INSERT INTO public.my_entries (id, content) VALUES ($1, $2)", [
uuid4(),
"Correct"
])

Expand Down
6 changes: 4 additions & 2 deletions components/electric/test/support/extension_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule Electric.Extension.Case.Helpers do
end

def tx(fun, cxt) do
ExUnit.Assertions.assert_raise(RollbackError, fn ->
try do
:epgsql.with_transaction(
cxt.conn,
fn tx ->
Expand All @@ -42,7 +42,9 @@ defmodule Electric.Extension.Case.Helpers do
end,
reraise: true
)
end)
rescue
RollbackError -> :ok
end
end

def migrate(conn, opts \\ []) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ defmodule Electric.Postgres.TestConnection do
{:ok, [], []} =
:epgsql.squery(conn, """
CREATE TABLE public.my_entries (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
id UUID PRIMARY KEY,
content VARCHAR NOT NULL,
content_b TEXT
);
Expand Down
4 changes: 2 additions & 2 deletions e2e/init.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
CREATE TABLE entries (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
id UUID PRIMARY KEY,
content VARCHAR NOT NULL,
content_b TEXT
);

CREATE TABLE owned_entries (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
id UUID PRIMARY KEY,
electric_user_id TEXT NOT NULL,
content VARCHAR NOT NULL
);
Loading

0 comments on commit aa7c265

Please sign in to comment.