diff --git a/components/electric/lib/electric/postgres/extension.ex b/components/electric/lib/electric/postgres/extension.ex index a474d6d26b..1ab994bb4e 100644 --- a/components/electric/lib/electric/postgres/extension.ex +++ b/components/electric/lib/electric/postgres/extension.ex @@ -7,6 +7,7 @@ defmodule Electric.Postgres.Extension do Replication, Schema, Schema.Proto, + Extension.Functions, Extension.Migration } @@ -238,6 +239,16 @@ defmodule Electric.Postgres.Extension do end end + @spec define_functions(conn) :: :ok + def define_functions(conn) do + Enum.each(Functions.list(), fn {name, sql} -> + case :epgsql.squery(conn, sql) do + {:ok, [], []} -> :ok + error -> raise "Failed to define function '#{name}' with error: #{inspect(error)}" + end + end) + end + @spec migrations() :: [module(), ...] def migrations do alias Electric.Postgres.Extension.Migrations @@ -267,6 +278,13 @@ defmodule Electric.Postgres.Extension do create_migration_table(txconn) with_migration_lock(txconn, fn -> + # NOTE(alco): This is currently called BEFORE running any internal migrations because we're only defining the + # type-checking function that the later defined `electrify()` function depends on. + # + # Once we move all function definitions out of migrations, we should call this AFTER all internal migrations + # have been applied. + define_functions(txconn) + existing_migrations = existing_migrations(txconn) versions = diff --git a/components/electric/lib/electric/postgres/extension/functions.ex b/components/electric/lib/electric/postgres/extension/functions.ex new file mode 100644 index 0000000000..48beaf1e98 --- /dev/null +++ b/components/electric/lib/electric/postgres/extension/functions.ex @@ -0,0 +1,26 @@ +defmodule Electric.Postgres.Extension.Functions do + require EEx + + sql_files = + "functions/*.sql.eex" + |> Path.expand(__DIR__) + |> Path.wildcard() + + for path <- sql_files, do: @external_resource(path) + + @function_defs Map.new(sql_files, fn path -> + {Path.basename(path, ".sql.eex"), {Path.basename(path), File.read!(path)}} + end) + + @doc """ + Get a list of SQL statements that create various internal SQL functions in the `electric` schema. + + Every function in the list is defined as `CREATE OR REPLACE FUNCTION`. + """ + def list do + for {name, args} <- [{"validate_table_column_types", []}] do + {filename, sql} = @function_defs[name] + {name, EEx.eval_string(sql, args, file: filename)} + end + end +end diff --git a/components/electric/lib/electric/postgres/extension/functions/validate_table_column_types.sql.eex b/components/electric/lib/electric/postgres/extension/functions/validate_table_column_types.sql.eex new file mode 100644 index 0000000000..fdb9ee9cdc --- /dev/null +++ b/components/electric/lib/electric/postgres/extension/functions/validate_table_column_types.sql.eex @@ -0,0 +1,47 @@ +<% + valid_column_types = + ~w[ + bool + date + float8 + int2 int4 + text + time + timestamp timestamptz + uuid + varchar + ]a + |> Enum.map(&"'#{&1}'") + |> Enum.join(",") +%> + +CREATE OR REPLACE FUNCTION electric.__validate_table_column_types(table_name text) +RETURNS VOID AS $function$ +DECLARE + _col_name text; + _col_type text; + _col_typmod int; + _col_type_pretty text; + _invalid_cols text[]; +BEGIN + FOR _col_name, _col_type, _col_typmod, _col_type_pretty IN + SELECT attname, typname, atttypmod, format_type(atttypid, atttypmod) + FROM pg_attribute + JOIN pg_type on atttypid = pg_type.oid + WHERE attrelid = table_name::regclass AND attnum > 0 AND NOT attisdropped + ORDER BY attnum + LOOP + IF _col_type NOT IN (<%= valid_column_types %>) + -- We only support unsized varchar type + OR ('varchar' IN (<%= valid_column_types %>) AND _col_type = 'varchar' AND _col_typmod <> -1) + THEN + _invalid_cols = array_append(_invalid_cols, format('"%s" %s', _col_name, _col_type_pretty)); + END IF; + END LOOP; + + IF _invalid_cols IS NOT NULL THEN + RAISE EXCEPTION E'Cannot electrify "%" because some of its columns have types not supported by Electric:\n %', + table_name, array_to_string(_invalid_cols, E'\n '); + END IF; +END; +$function$ LANGUAGE PLPGSQL; diff --git a/components/electric/lib/electric/postgres/extension/migrations/20230605141256_electrify_function.ex b/components/electric/lib/electric/postgres/extension/migrations/20230605141256_electrify_function.ex index e3c57bbef3..418fc14fb3 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20230605141256_electrify_function.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20230605141256_electrify_function.ex @@ -20,19 +20,13 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230605141256_Electr event_triggers = Extension.event_triggers() event_trigger_tags = ["'ALTER TABLE'", "'DROP TABLE'", "'DROP INDEX'", "'DROP VIEW'"] - supported_types_sql = - Electric.Satellite.Serialization.supported_pg_types() - |> Enum.map(&"'#{&1}'") - |> Enum.join(",") - electrify_function = electrify_function_sql( schema, electrified_tracking_table, Extension.electrified_index_table(), publication, - Extension.add_table_to_publication_sql("%I.%I"), - supported_types_sql + Extension.add_table_to_publication_sql("%I.%I") ) [ @@ -77,7 +71,6 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230605141256_Electr :electrified_tracking_table, :electrified_index_table, :publication_name, - :publication_sql, - :valid_column_types + :publication_sql ]) end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20230605141256_electrify_function/electrify.sql.eex b/components/electric/lib/electric/postgres/extension/migrations/20230605141256_electrify_function/electrify.sql.eex index aea7a404dc..8069f10369 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20230605141256_electrify_function/electrify.sql.eex +++ b/components/electric/lib/electric/postgres/extension/migrations/20230605141256_electrify_function/electrify.sql.eex @@ -91,39 +91,6 @@ $function$ LANGUAGE PLPGSQL STABLE; ------------------------------------------------- -CREATE OR REPLACE FUNCTION <%= schema %>.__validate_table_column_types(table_name text) -RETURNS VOID AS $function$ -DECLARE - _col_name text; - _col_type text; - _col_typmod int; - _col_type_pretty text; - _invalid_cols text[]; -BEGIN - FOR _col_name, _col_type, _col_typmod, _col_type_pretty IN - SELECT attname, typname, atttypmod, format_type(atttypid, atttypmod) - FROM pg_attribute - JOIN pg_type on atttypid = pg_type.oid - WHERE attrelid = table_name::regclass AND attnum > 0 AND NOT attisdropped - ORDER BY attnum - LOOP - IF _col_type NOT IN (<%= valid_column_types %>) - -- We only support unsized varchar type - OR ('varchar' IN (<%= valid_column_types %>) AND _col_type = 'varchar' AND _col_typmod <> -1) - THEN - _invalid_cols = array_append(_invalid_cols, format('"%s" %s', _col_name, _col_type_pretty)); - END IF; - END LOOP; - - IF _invalid_cols IS NOT NULL THEN - RAISE EXCEPTION E'Cannot electrify "%" because some of its columns have types not supported by Electric:\n %', - table_name, array_to_string(_invalid_cols, E'\n '); - END IF; -END; -$function$ LANGUAGE PLPGSQL; - -------------------------------------------------- - CREATE OR REPLACE PROCEDURE <%= schema %>.electrify( name1 text, name2 text DEFAULT NULL diff --git a/components/electric/lib/electric/satellite/serialization.ex b/components/electric/lib/electric/satellite/serialization.ex index 46363fe049..0962891cd8 100644 --- a/components/electric/lib/electric/satellite/serialization.ex +++ b/components/electric/lib/electric/satellite/serialization.ex @@ -21,21 +21,6 @@ defmodule Electric.Satellite.Serialization do @type relation_mapping() :: %{Changes.relation() => {PB.relation_id(), [Replication.Column.name()]}} - @spec supported_pg_types :: [atom] - def supported_pg_types do - ~w[ - bool - date - float8 - int2 int4 int8 - text - time - timestamp timestamptz - uuid - varchar - ]a - end - @doc """ Serialize from internal format to Satellite PB format """