Skip to content

Commit

Permalink
Move the definition of __validate_table_column_types() SQL function i…
Browse files Browse the repository at this point in the history
…nto a standalone module
  • Loading branch information
alco committed Sep 18, 2023
1 parent 9b81d97 commit ac8cdae
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 57 deletions.
18 changes: 18 additions & 0 deletions components/electric/lib/electric/postgres/extension.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Electric.Postgres.Extension do
Replication,
Schema,
Schema.Proto,
Extension.Functions,
Extension.Migration
}

Expand Down Expand Up @@ -238,6 +239,16 @@ defmodule Electric.Postgres.Extension do
end
end

@spec define_functions(conn) :: :ok
def define_functions(conn) do
Enum.each(Functions.list(), fn {name, sql} ->
case :epgsql.squery(conn, sql) do
{:ok, [], []} -> :ok
error -> raise "Failed to define function '#{name}' with error: #{inspect(error)}"
end
end)
end

@spec migrations() :: [module(), ...]
def migrations do
alias Electric.Postgres.Extension.Migrations
Expand Down Expand Up @@ -267,6 +278,13 @@ defmodule Electric.Postgres.Extension do
create_migration_table(txconn)

with_migration_lock(txconn, fn ->
# NOTE(alco): This is currently called BEFORE running any internal migrations because we're only defining the
# type-checking function that the later defined `electrify()` function depends on.
#
# Once we move all function definitions out of migrations, we should call this AFTER all internal migrations
# have been applied.
define_functions(txconn)

existing_migrations = existing_migrations(txconn)

versions =
Expand Down
26 changes: 26 additions & 0 deletions components/electric/lib/electric/postgres/extension/functions.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Electric.Postgres.Extension.Functions do
require EEx

sql_files =
"functions/*.sql.eex"
|> Path.expand(__DIR__)
|> Path.wildcard()

for path <- sql_files, do: @external_resource(path)

@function_defs Map.new(sql_files, fn path ->
{Path.basename(path, ".sql.eex"), {Path.basename(path), File.read!(path)}}
end)

@doc """
Get a list of SQL statements that create various internal SQL functions in the `electric` schema.
Every function in the list is defined as `CREATE OR REPLACE FUNCTION`.
"""
def list do
for {name, args} <- [{"validate_table_column_types", []}] do
{filename, sql} = @function_defs[name]
{name, EEx.eval_string(sql, args, file: filename)}
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<%
valid_column_types =
~w[
bool
date
float8
int2 int4
text
time
timestamp timestamptz
uuid
varchar
]a
|> Enum.map(&"'#{&1}'")
|> Enum.join(",")
%>

CREATE OR REPLACE FUNCTION electric.__validate_table_column_types(table_name text)
RETURNS VOID AS $function$
DECLARE
_col_name text;
_col_type text;
_col_typmod int;
_col_type_pretty text;
_invalid_cols text[];
BEGIN
FOR _col_name, _col_type, _col_typmod, _col_type_pretty IN
SELECT attname, typname, atttypmod, format_type(atttypid, atttypmod)
FROM pg_attribute
JOIN pg_type on atttypid = pg_type.oid
WHERE attrelid = table_name::regclass AND attnum > 0 AND NOT attisdropped
ORDER BY attnum
LOOP
IF _col_type NOT IN (<%= valid_column_types %>)
-- We only support unsized varchar type
OR ('varchar' IN (<%= valid_column_types %>) AND _col_type = 'varchar' AND _col_typmod <> -1)
THEN
_invalid_cols = array_append(_invalid_cols, format('"%s" %s', _col_name, _col_type_pretty));
END IF;
END LOOP;

IF _invalid_cols IS NOT NULL THEN
RAISE EXCEPTION E'Cannot electrify "%" because some of its columns have types not supported by Electric:\n %',
table_name, array_to_string(_invalid_cols, E'\n ');
END IF;
END;
$function$ LANGUAGE PLPGSQL;
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,13 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230605141256_Electr
event_triggers = Extension.event_triggers()
event_trigger_tags = ["'ALTER TABLE'", "'DROP TABLE'", "'DROP INDEX'", "'DROP VIEW'"]

supported_types_sql =
Electric.Satellite.Serialization.supported_pg_types()
|> Enum.map(&"'#{&1}'")
|> Enum.join(",")

electrify_function =
electrify_function_sql(
schema,
electrified_tracking_table,
Extension.electrified_index_table(),
publication,
Extension.add_table_to_publication_sql("%I.%I"),
supported_types_sql
Extension.add_table_to_publication_sql("%I.%I")
)

[
Expand Down Expand Up @@ -77,7 +71,6 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230605141256_Electr
:electrified_tracking_table,
:electrified_index_table,
:publication_name,
:publication_sql,
:valid_column_types
:publication_sql
])
end
Original file line number Diff line number Diff line change
Expand Up @@ -91,39 +91,6 @@ $function$ LANGUAGE PLPGSQL STABLE;

-------------------------------------------------

CREATE OR REPLACE FUNCTION <%= schema %>.__validate_table_column_types(table_name text)
RETURNS VOID AS $function$
DECLARE
_col_name text;
_col_type text;
_col_typmod int;
_col_type_pretty text;
_invalid_cols text[];
BEGIN
FOR _col_name, _col_type, _col_typmod, _col_type_pretty IN
SELECT attname, typname, atttypmod, format_type(atttypid, atttypmod)
FROM pg_attribute
JOIN pg_type on atttypid = pg_type.oid
WHERE attrelid = table_name::regclass AND attnum > 0 AND NOT attisdropped
ORDER BY attnum
LOOP
IF _col_type NOT IN (<%= valid_column_types %>)
-- We only support unsized varchar type
OR ('varchar' IN (<%= valid_column_types %>) AND _col_type = 'varchar' AND _col_typmod <> -1)
THEN
_invalid_cols = array_append(_invalid_cols, format('"%s" %s', _col_name, _col_type_pretty));
END IF;
END LOOP;

IF _invalid_cols IS NOT NULL THEN
RAISE EXCEPTION E'Cannot electrify "%" because some of its columns have types not supported by Electric:\n %',
table_name, array_to_string(_invalid_cols, E'\n ');
END IF;
END;
$function$ LANGUAGE PLPGSQL;

-------------------------------------------------

CREATE OR REPLACE PROCEDURE <%= schema %>.electrify(
name1 text,
name2 text DEFAULT NULL
Expand Down
15 changes: 0 additions & 15 deletions components/electric/lib/electric/satellite/serialization.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,6 @@ defmodule Electric.Satellite.Serialization do
@type relation_mapping() ::
%{Changes.relation() => {PB.relation_id(), [Replication.Column.name()]}}

@spec supported_pg_types :: [atom]
def supported_pg_types do
~w[
bool
date
float8
int2 int4 int8
text
time
timestamp timestamptz
uuid
varchar
]a
end

@doc """
Serialize from internal format to Satellite PB format
"""
Expand Down

0 comments on commit ac8cdae

Please sign in to comment.