Skip to content

Commit

Permalink
chore(electric): redefine column checks in electrify on every start…
Browse files Browse the repository at this point in the history
…up to use new types (#431)

Closes VAX-1033.
  • Loading branch information
alco authored Sep 19, 2023
1 parent 4bd9ea2 commit da9a718
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 59 deletions.
5 changes: 5 additions & 0 deletions .changeset/happy-buttons-end.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/electric": patch
---

Rewrite the type-validating part of `electify` function to expand allowed types when Electric instance is upgraded
19 changes: 18 additions & 1 deletion components/electric/lib/electric/postgres/extension.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Electric.Postgres.Extension do
Manages our pseudo-extension code
"""

alias Electric.Postgres.{Extension.Migration, Schema, Schema.Proto}
alias Electric.Postgres.{Schema, Schema.Proto, Extension.Functions, Extension.Migration}
alias Electric.Utils

require Logger
Expand Down Expand Up @@ -227,6 +227,16 @@ defmodule Electric.Postgres.Extension do
end
end

@spec define_functions(conn) :: :ok
def define_functions(conn) do
Enum.each(Functions.list(), fn {name, sql} ->
case :epgsql.squery(conn, sql) do
{:ok, [], []} -> :ok
error -> raise "Failed to define function '#{name}' with error: #{inspect(error)}"
end
end)
end

@spec migrations() :: [module(), ...]
def migrations do
alias Electric.Postgres.Extension.Migrations
Expand Down Expand Up @@ -266,6 +276,13 @@ defmodule Electric.Postgres.Extension do
create_migration_table(txconn)

with_migration_lock(txconn, fn ->
# NOTE(alco): This is currently called BEFORE running any internal migrations because we're only defining the
# type-checking function that the later defined `electrify()` function depends on.
#
# Once we move all function definitions out of migrations, we should call this AFTER all internal migrations
# have been applied.
define_functions(txconn)

existing_migrations = existing_migrations(txconn)

versions =
Expand Down
26 changes: 26 additions & 0 deletions components/electric/lib/electric/postgres/extension/functions.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Electric.Postgres.Extension.Functions do
require EEx

sql_files =
"functions/*.sql.eex"
|> Path.expand(__DIR__)
|> Path.wildcard()

for path <- sql_files, do: @external_resource(path)

@function_defs Map.new(sql_files, fn path ->
{Path.basename(path, ".sql.eex"), {Path.basename(path), File.read!(path)}}
end)

@doc """
Get a list of SQL statements that create various internal SQL functions in the `electric` schema.
Every function in the list is defined as `CREATE OR REPLACE FUNCTION`.
"""
def list do
for {name, args} <- [{"validate_table_column_types", []}] do
{filename, sql} = @function_defs[name]
{name, EEx.eval_string(sql, args, file: filename)}
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<%
valid_column_types =
~w[
bool
date
float8
int2 int4
text
time
timestamp timestamptz
uuid
varchar
]a
|> Enum.map(&"'#{&1}'")
|> Enum.join(",")
%>

CREATE OR REPLACE FUNCTION electric.__validate_table_column_types(table_name text)
RETURNS VOID AS $function$
DECLARE
_col_name text;
_col_type text;
_col_typmod int;
_col_type_pretty text;
_invalid_cols text[];
BEGIN
FOR _col_name, _col_type, _col_typmod, _col_type_pretty IN
SELECT attname, typname, atttypmod, format_type(atttypid, atttypmod)
FROM pg_attribute
JOIN pg_type on atttypid = pg_type.oid
WHERE attrelid = table_name::regclass AND attnum > 0 AND NOT attisdropped
ORDER BY attnum
LOOP
IF _col_type NOT IN (<%= valid_column_types %>)
-- We only support unsized varchar type
OR ('varchar' IN (<%= valid_column_types %>) AND _col_type = 'varchar' AND _col_typmod <> -1)
THEN
_invalid_cols = array_append(_invalid_cols, format('"%s" %s', _col_name, _col_type_pretty));
END IF;
END LOOP;

IF _invalid_cols IS NOT NULL THEN
RAISE EXCEPTION E'Cannot electrify "%" because some of its columns have types not supported by Electric:\n %',
table_name, array_to_string(_invalid_cols, E'\n ');
END IF;
END;
$function$ LANGUAGE PLPGSQL;
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,13 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230605141256_Electr
event_triggers = Extension.event_triggers()
event_trigger_tags = ["'ALTER TABLE'", "'DROP TABLE'", "'DROP INDEX'", "'DROP VIEW'"]

supported_types_sql =
Electric.Satellite.Serialization.supported_pg_types()
|> Enum.map(&"'#{&1}'")
|> Enum.join(",")

electrify_function =
electrify_function_sql(
schema,
electrified_tracking_table,
Extension.electrified_index_table(),
publication,
Extension.add_table_to_publication_sql("%I.%I"),
supported_types_sql
Extension.add_table_to_publication_sql("%I.%I")
)

[
Expand Down Expand Up @@ -77,7 +71,6 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230605141256_Electr
:electrified_tracking_table,
:electrified_index_table,
:publication_name,
:publication_sql,
:valid_column_types
:publication_sql
])
end
Original file line number Diff line number Diff line change
Expand Up @@ -91,39 +91,6 @@ $function$ LANGUAGE PLPGSQL STABLE;

-------------------------------------------------

CREATE OR REPLACE FUNCTION <%= schema %>.__validate_table_column_types(table_name text)
RETURNS VOID AS $function$
DECLARE
_col_name text;
_col_type text;
_col_typmod int;
_col_type_pretty text;
_invalid_cols text[];
BEGIN
FOR _col_name, _col_type, _col_typmod, _col_type_pretty IN
SELECT attname, typname, atttypmod, format_type(atttypid, atttypmod)
FROM pg_attribute
JOIN pg_type on atttypid = pg_type.oid
WHERE attrelid = table_name::regclass AND attnum > 0 AND NOT attisdropped
ORDER BY attnum
LOOP
IF _col_type NOT IN (<%= valid_column_types %>)
-- We only support unsized varchar type
OR ('varchar' IN (<%= valid_column_types %>) AND _col_type = 'varchar' AND _col_typmod <> -1)
THEN
_invalid_cols = array_append(_invalid_cols, format('"%s" %s', _col_name, _col_type_pretty));
END IF;
END LOOP;

IF _invalid_cols IS NOT NULL THEN
RAISE EXCEPTION E'Cannot electrify "%" because some of its columns have types not supported by Electric:\n %',
table_name, array_to_string(_invalid_cols, E'\n ');
END IF;
END;
$function$ LANGUAGE PLPGSQL;

-------------------------------------------------

CREATE OR REPLACE PROCEDURE <%= schema %>.electrify(
name1 text,
name2 text DEFAULT NULL
Expand Down
15 changes: 0 additions & 15 deletions components/electric/lib/electric/satellite/serialization.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,6 @@ defmodule Electric.Satellite.Serialization do
@type relation_mapping() ::
%{Changes.relation() => {PB.relation_id(), [Replication.Column.name()]}}

@spec supported_pg_types :: [atom]
def supported_pg_types do
~w[
bool
date
float8
int2 int4
text
time
timestamp timestamptz
uuid
varchar
]a
end

@doc """
Serialize from internal format to Satellite PB format
"""
Expand Down
2 changes: 1 addition & 1 deletion docs/usage/data-modelling/types.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ You are responsible for ensuring the uniqueness of your primary keys. If you som
- `boolean`
- `uuid`

The authoritative list of supported data types is maintained in the [`supported_pg_types/0` function](https://github.com/search?q=repo%3Aelectric-sql%2Felectric+symbol%3Asupported_pg_types&type=code).
The authoritative list of supported data types is maintained in the [`validate_table_column_types.sql.eex` file](https://github.com/electric-sql/electric/blob/main/components/electric/lib/electric/postgres/extension/functions/validate_table_column_types.sql.eex).

:::caution Work in progress
We are actively working on building out data type support. If you need a type we don't yet support, please [let us know on Discord](https://discord.electric-sql.com).
Expand Down

0 comments on commit da9a718

Please sign in to comment.