This repository has been archived by the owner on Oct 2, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1 from vaxine-io/triggers
Electric.migrations.deploy_migrations adds triggers to sql
- Loading branch information
Showing
9 changed files
with
1,065 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,3 +28,6 @@ electric-*.tar | |
# Release artefacts | ||
/dist/ | ||
/electric | ||
|
||
# JetBrains project files | ||
.idea/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
<%# EEx template rendered by Electric.Migrations.write_js_bundle/1. %>
<%# Binding: `migrations` — the pretty-printed JSON migrations bundle string, %>
<%# interpolated verbatim so the generated JS module exports it as `migrations`. %>
const _migrations = <%= migrations %>;

export const migrations = _migrations;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,281 @@ | ||
defmodule Electric.Migrations do
  @moduledoc """
  The `Migrations` context.

  Munges SQL migrations and uploads them to the server.
  """

  @migration_file_name "migration.sql"
  @satellite_file_name "satellite.sql"
  # @postgre_file_name "postgre.sql"
  @publish_lock_file_name ".published"
  @manifest_file_name "manifest.json"
  @bundle_file_name "manifest.bundle.json"
  @js_bundle_file_name "manifest.bundle.js"

  # Both templates are compiled once, at module compile time.
  @trigger_template EEx.compile_file("lib/electric/triggers.eex")
  @bundle_template EEx.compile_file("lib/electric/bundle_js.eex")

  @doc """
  Takes a folder which contains sql migration files and adds the SQLite triggers for any newly created tables.
  """
  def build_migrations(_src_folder) do
    # Intentionally a no-op for now; returns nil.
  end

  @doc """
  Uploads the given migration files to the server. Not implemented yet.
  """
  def send_migrations_to_api(_sql_file_paths) do
    throw(:NotImplemented)
  end

  @doc """
  Returns the compiled trigger EEx template (quoted expression).
  """
  def get_template() do
    @trigger_template
  end

  @doc """
  Takes a folder which contains sql migration files and adds the SQLite triggers for any newly created tables.

  Returns `{:error, reason}` for the first migration that fails to read,
  validate or template; otherwise returns the per-migration results.
  """
  def add_triggers(src_folder, template) do
    src_folder
    |> ordered_migration_paths()
    |> add_triggers_to_migrations(template)
  end

  @doc """
  Writes a json manifest file in the migrations root folder listing all the migration names.
  """
  def write_manifest(src_folder) do
    manifest_path = Path.join(src_folder, @manifest_file_name)

    migration_names =
      for migration_folder <- ordered_migration_paths(src_folder) do
        Path.basename(migration_folder)
      end

    manifest =
      %{"migrations" => migration_names}
      |> Jason.encode!()
      |> Jason.Formatter.pretty_print()

    # File.write/2 truncates an existing file, so deleting it first is redundant.
    File.write(manifest_path, manifest)
  end

  @doc """
  Writes a json bundle file in the migrations root folder listing all the migrations and giving their content.
  """
  def write_bundle(src_folder) do
    migrations = create_bundle(src_folder)
    bundle_path = Path.join(src_folder, @bundle_file_name)
    # File.write/2 truncates an existing file, so deleting it first is redundant.
    File.write(bundle_path, migrations)
  end

  @doc """
  Writes a js bundle file in the migrations root folder listing all the migrations and giving their content.
  Exports the migrations bundle as a js object called "migrations".
  """
  def write_js_bundle(src_folder) do
    migrations = create_bundle(src_folder)
    {result, _bindings} = Code.eval_quoted(@bundle_template, migrations: migrations)
    bundle_path = Path.join(src_folder, @js_bundle_file_name)
    File.write!(bundle_path, result)
  end

  # Migration folder names double as migration names; lexical sort order of the
  # folder names defines migration order.
  defp ordered_migration_paths(src_folder) do
    migration_names =
      [src_folder, "*", @migration_file_name]
      |> Path.join()
      |> Path.wildcard()
      |> Enum.map(fn file_path -> file_path |> Path.dirname() |> Path.basename() end)
      |> Enum.sort()

    for migration_name <- migration_names do
      Path.join(src_folder, migration_name)
    end
  end

  # Builds the pretty-printed JSON bundle from every generated satellite.sql.
  # Raises if a migration folder has no satellite.sql yet.
  defp create_bundle(src_folder) do
    migrations =
      for migration_folder <- ordered_migration_paths(src_folder) do
        satellite_sql_path = Path.join(migration_folder, @satellite_file_name)
        migration_text = File.read!(satellite_sql_path)
        %{"name" => Path.basename(migration_folder), "body" => migration_text}
      end

    %{"migrations" => migrations}
    |> Jason.encode!()
    |> Jason.Formatter.pretty_print()
  end

  # Lower-case hex SHA-256 of the templated migration body.
  defp calc_hash(with_triggers) do
    :sha256
    |> :crypto.hash(with_triggers)
    |> Base.encode16(case: :lower)
  end

  # Reads the JSON metadata out of the "ElectricDB Migration" header comment of
  # a generated satellite.sql file. Returns the decoded metadata map, or
  # `{:error, reason}` when the file cannot be read, carries no header, or the
  # header is not valid JSON.
  defp get_metadata(file_path) do
    case File.read(file_path) do
      {:ok, body} ->
        regex = ~r/ElectricDB Migration[\s]*(.*?)[\s]*\*/

        case Regex.run(regex, body) do
          nil ->
            # Previously this crashed in List.last(nil); report a clear error instead.
            {:error, :missing_metadata}

          matches ->
            case Jason.decode(List.last(matches)) do
              {:ok, metadata} -> metadata
              {:error, reason} -> {:error, reason}
            end
        end

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Header comment prepended to every generated satellite.sql; get_metadata/1
  # parses this exact shape back out, so keep the two in sync.
  defp file_header(hash, name) do
    """
    /*
    ElectricDB Migration
    {"metadata": {"name": "#{name}", "sha256": "#{hash}"}}
    */
    """
  end

  # Validates every migration, then templates each one against all migrations up
  # to and including itself. throw/catch is used purely as an early exit from
  # the comprehensions so the first failure wins.
  defp add_triggers_to_migrations(ordered_migration_paths, template) do
    try do
      ordered_migrations =
        for migration_folder_path <- ordered_migration_paths do
          migration_file_path = Path.join([migration_folder_path, @migration_file_name])

          case File.read(migration_file_path) do
            {:ok, sql} ->
              case validate_sql(sql) do
                :ok -> sql
                {:error, reason} -> throw({:error, reason})
              end

            {:error, reason} ->
              throw({:error, reason})
          end
        end

      # Needs to fail early so has to start at the first migration and go through.
      for {migration_folder_path, i} <- Enum.with_index(ordered_migration_paths) do
        # All migrations up to, and including, this migration.
        subset_of_migrations = Enum.take(ordered_migrations, i + 1)

        case add_triggers_to_migration_folder(migration_folder_path, subset_of_migrations, template) do
          :ok -> :ok
          {:error, reason} -> throw({:error, reason})
        end
      end
    catch
      {:error, reason} -> {:error, reason}
    end
  end

  @doc false
  def add_triggers_to_migration_folder(migration_folder_path, ordered_migrations, template) do
    migration_name = Path.basename(migration_folder_path)
    satellite_file_path = Path.join(migration_folder_path, @satellite_file_name)
    publish_lock_file_path = Path.join(migration_folder_path, @publish_lock_file_name)

    with_triggers = add_triggers_to_last_migration(ordered_migrations, template)
    hash = calc_hash(with_triggers)

    cond do
      # Already published: the generated file must not have drifted from source.
      File.exists?(publish_lock_file_path) ->
        metadata = get_metadata(satellite_file_path)

        if metadata["sha256"] == hash do
          # Matching hash is a happy no-op.
          :ok
        else
          # A published file has been modified; this is an error state.
          # TODO Need to think what remedy is for dev
          {:error, "Migration #{migration_name} has already been published and cannot be changed"}
        end

      # Unpublished but already decorated: same content is a no-op.
      File.exists?(satellite_file_path) ->
        metadata = get_metadata(satellite_file_path)

        if metadata["sha256"] == hash do
          :ok
        else
          # Exists but differs; fail and offer the dev to force it.
          # TODO Need to think what remedy is for dev
          {:error, "Migration #{migration_name} already exists locally if you would like to overwrite it ..."}
        end

      # Neither file exists yet: go ahead and write the decorated migration.
      true ->
        header = file_header(hash, migration_name)
        File.write(satellite_file_path, header <> with_triggers)
    end
  end

  @doc false
  def add_triggers_to_last_migration(ordered_migrations, template) do
    # Adds triggers for all tables to the end of the last migration.
    table_infos = all_tables_info(ordered_migrations)
    sql_in = List.last(ordered_migrations)
    template_all_the_things(sql_in, table_infos, template)
  end

  @doc false
  def validate_sql(sql_in) do
    {:ok, conn} = Exqlite.Sqlite3.open(":memory:")
    result = Exqlite.Sqlite3.execute(conn, sql_in)
    # Close the scratch in-memory database instead of leaking the connection.
    Exqlite.Sqlite3.close(conn)
    result
  end

  @doc false
  def created_table_names(sql_in) do
    # NOTE(review): all_tables_info/1 enumerates its argument, so `sql_in`
    # should be a list of migration SQL strings — confirm against callers.
    sql_in
    |> all_tables_info()
    |> Map.keys()
  end

  @doc false
  def all_tables_info(all_migrations) do
    namespace = "main"
    # Replay every migration into a scratch database so the schema reflects all of them.
    {:ok, conn} = Exqlite.Sqlite3.open(":memory:")

    for migration <- all_migrations do
      :ok = Exqlite.Sqlite3.execute(conn, migration)
    end

    {:ok, statement} =
      Exqlite.Sqlite3.prepare(
        conn,
        "SELECT name, sql FROM sqlite_master WHERE type='table' AND name!='_oplog';"
      )

    info = get_rows_while(conn, statement, [])
    :ok = Exqlite.Sqlite3.release(conn, statement)

    infos =
      for [table_name, _sql] <- info do
        # Column names.
        {:ok, info_statement} = Exqlite.Sqlite3.prepare(conn, "PRAGMA table_info(#{table_name});")
        columns = Enum.reverse(get_rows_while(conn, info_statement, []))
        :ok = Exqlite.Sqlite3.release(conn, info_statement)

        column_names =
          for [_cid, name, _type, _notnull, _dflt_value, _pk] <- columns do
            name
          end

        # Primary key columns. PRAGMA table_info reports `pk` as the 1-based
        # position of the column within the primary key (0 for non-PK columns),
        # so `pk > 0` captures every member of a composite primary key, where
        # the previous `pk == 1` only picked up the first one.
        primary_key_column_names =
          for [_cid, name, _type, _notnull, _dflt_value, pk] when pk > 0 <- columns do
            name
          end

        # Foreign keys.
        {:ok, foreign_statement} =
          Exqlite.Sqlite3.prepare(conn, "PRAGMA foreign_key_list(#{table_name});")

        foreign_rows = get_rows_while(conn, foreign_statement, [])
        :ok = Exqlite.Sqlite3.release(conn, foreign_statement)

        foreign_keys =
          for [_id, _seq, parent_table, child_key, parent_key, _upd, _del, _match] <- foreign_rows do
            %{:child_key => child_key, :parent_key => parent_key, :table => "#{namespace}.#{parent_table}"}
          end

        %{
          :table_name => table_name,
          :columns => column_names,
          :namespace => namespace,
          :primary => primary_key_column_names,
          :foreign_keys => foreign_keys
        }
      end

    result = Enum.into(infos, %{}, fn info -> {"#{namespace}.#{info.table_name}", info} end)
    # Release the scratch connection now that all pragmas have run.
    Exqlite.Sqlite3.close(conn)
    result
  end

  # Steps a prepared statement to completion, accumulating rows.
  # Rows are prepended, so the result is in reverse fetch order.
  defp get_rows_while(conn, statement, rows) do
    case Exqlite.Sqlite3.step(conn, statement) do
      {:row, row} ->
        get_rows_while(conn, statement, [row | rows])

      :done ->
        rows
    end
  end

  @doc false
  def template_all_the_things(original_sql, tables, template) do
    # Qualify bare " table " references with the namespace, e.g. " fish " -> " main.fish ".
    patched_sql =
      Enum.reduce(tables, original_sql, fn {table_full_name, table}, acc ->
        String.replace(acc, " #{table.table_name} ", " #{table_full_name} ")
      end)

    {result, _bindings} =
      Code.eval_quoted(template,
        is_init: true,
        original_sql: patched_sql,
        tables: tables
      )

    result
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
<%# EEx template: appends the Satellite oplog infrastructure and per-table triggers to a migration. %>
<%# Bindings: original_sql (string), tables (map of "ns.table" => table info map), is_init (boolean). %>
<%# Note: everything outside <%%# %%> comments is emitted verbatim into the generated SQL. %>
<%= original_sql %>
/*---------------------------------------------
Below are templated triggers added by Satellite
---------------------------------------------*/

-- The ops log table
CREATE TABLE IF NOT EXISTS _oplog (
rowid INTEGER PRIMARY KEY AUTOINCREMENT,
namespace String NOT NULL,
tablename String NOT NULL,
optype String NOT NULL,
primaryKey String NOT NULL,
newRow String,
oldRow String,
timestamp TEXT
);

-- Somewhere to keep our metadata
CREATE TABLE IF NOT EXISTS _satellite_meta (
key TEXT,
value TEXT
);

<%# Seed the metadata rows only on first-time initialisation. %>
<%= if is_init do %>--initialisation of the metadata table
INSERT INTO _satellite_meta(key,value) VALUES ('currRowId', '-1'), ('ackRowId','-1'), ('compensations', 0);
<% end %>
-- These are toggles for turning the triggers on and off

DROP TABLE IF EXISTS trigger_settings;
CREATE TABLE trigger_settings(tablename STRING PRIMARY KEY, flag INTEGER);
<%= for {table_full_name, _table} <- tables do %>INSERT INTO trigger_settings(tablename,flag) VALUES ('<%= table_full_name %>', 1);
<% end %>
<%= for {table_full_name, table} <- tables do %>
/* Triggers for table <%= table.table_name %> */

-- Ensures primary key is immutable
DROP TRIGGER IF EXISTS update_ensure_<%= table.namespace %>_<%= table.table_name %>_primarykey;
CREATE TRIGGER update_ensure_<%= table.namespace %>_<%= table.table_name %>_primarykey
BEFORE UPDATE ON <%= table_full_name %>
BEGIN
SELECT
CASE<%= for col <- table.primary do %>
WHEN old.<%= col %> != new.<%= col %> THEN
RAISE (ABORT,'cannot change the value of column <%= col %> as it belongs to the primary key')<% end %>
END;
END;

-- Triggers that add INSERT, UPDATE, DELETE operation to the _opslog table
<%# NOTE(review): the SQL comment above says "_opslog" but the table created earlier is "_oplog" — presumably a typo in the comment; confirm before changing emitted text. %>

DROP TRIGGER IF EXISTS insert_<%= table.namespace %>_<%= table.table_name %>_into_oplog;
CREATE TRIGGER insert_<%= table.namespace %>_<%= table.table_name %>_into_oplog
AFTER INSERT ON <%= table_full_name %>
WHEN 1 == (SELECT flag from trigger_settings WHERE tablename == '<%= table_full_name %>')
BEGIN
INSERT INTO _oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
VALUES ('<%= table.namespace %>', '<%= table.table_name %>', 'INSERT', json_object(<%= Enum.join(for col <- table.primary do "'#{col}', new.#{col}" end, ", ") %>), json_object(<%= Enum.join(for col <- table[:columns] do "'#{col}', new.#{col}" end, ", ") %>), NULL, NULL);
END;

DROP TRIGGER IF EXISTS update_<%= table.namespace %>_<%= table.table_name %>_into_oplog;
CREATE TRIGGER update_<%= table.namespace %>_<%= table.table_name %>_into_oplog
AFTER UPDATE ON <%= table_full_name %>
WHEN 1 == (SELECT flag from trigger_settings WHERE tablename == '<%= table_full_name %>')
BEGIN
INSERT INTO _oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
VALUES ('<%= table.namespace %>', '<%= table.table_name %>', 'UPDATE', json_object(<%= Enum.join(for col <- table.primary do "'#{col}', new.#{col}" end, ", ") %>), json_object(<%= Enum.join(for col <- table.columns do "'#{col}', new.#{col}" end, ", ") %>), json_object(<%= Enum.join(for col <- table.columns do "'#{col}', old.#{col}" end, ", ") %>), NULL);
END;

DROP TRIGGER IF EXISTS delete_<%= table.namespace %>_<%= table.table_name %>_into_oplog;
CREATE TRIGGER delete_<%= table.namespace %>_<%= table.table_name %>_into_oplog
AFTER DELETE ON <%= table_full_name %>
WHEN 1 == (SELECT flag from trigger_settings WHERE tablename == '<%= table_full_name %>')
BEGIN
INSERT INTO _oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
VALUES ('<%= table.namespace %>', '<%= table.table_name %>', 'DELETE', json_object(<%= Enum.join(for col <- table.primary do "'#{col}', old.#{col}" end, ", ") %>), NULL, json_object(<%= Enum.join(for col <- table.columns do "'#{col}', old.#{col}" end, ", ") %>), NULL);
END;

<%# Compensation triggers record the referenced parent row whenever a child row points at it, %>
<%# so concurrent deletes of the parent can be reconciled. Only active when the 'compensations' %>
<%# metadata flag is 1. %>
<%= if Enum.count(table.foreign_keys) > 0 do %>-- Triggers for foreign key compensations<% end %>
<%= for foreign_key <- table.foreign_keys do %>
DROP TRIGGER IF EXISTS compensation_insert_<%= table.namespace %>_<%= table.table_name %>_<%= foreign_key.child_key %>_into_oplog;
CREATE TRIGGER compensation_insert_<%= table.namespace %>_<%= table.table_name %>_<%= foreign_key.child_key %>_into_oplog
AFTER INSERT ON <%= table_full_name %>
WHEN 1 == (SELECT flag from trigger_settings WHERE tablename == '<%= tables[foreign_key.table].namespace %>.<%= tables[foreign_key.table].table_name %>') AND
1 == (SELECT value from _satellite_meta WHERE key == 'compensations')
BEGIN
INSERT INTO _oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
SELECT '<%= tables[foreign_key.table].namespace %>', '<%= tables[foreign_key.table].table_name %>', 'UPDATE', json_object(<%= Enum.join(for col <- tables[foreign_key.table].primary do "'#{col}', #{col}" end, ", ") %>), json_object(<%= Enum.join(for col <- tables[foreign_key.table].columns do "'#{col}', #{col}" end, ", ") %>), NULL, NULL
FROM <%= tables[foreign_key.table].namespace %>.<%= tables[foreign_key.table].table_name %> WHERE <%= foreign_key.parent_key %> = new.<%= foreign_key.child_key %>;
END;

DROP TRIGGER IF EXISTS compensation_update_<%= table.namespace %>_<%= table.table_name %>_<%= foreign_key.child_key %>_into_oplog;
CREATE TRIGGER compensation_update_<%= table.namespace %>_<%= table.table_name %>_<%= foreign_key.child_key %>_into_oplog
AFTER UPDATE ON <%= table.namespace %>.<%= table.table_name %>
<%# NOTE(review): the line above spells the table as namespace.table_name while the sibling %>
<%# insert trigger uses table_full_name — the rendered text is identical, but the inconsistency %>
<%# is worth unifying. %>
WHEN 1 == (SELECT flag from trigger_settings WHERE tablename == '<%= tables[foreign_key.table].namespace %>.<%= tables[foreign_key.table].table_name %>') AND
1 == (SELECT value from _satellite_meta WHERE key == 'compensations')
BEGIN
INSERT INTO _oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp)
SELECT '<%= tables[foreign_key.table].namespace %>', '<%= tables[foreign_key.table].table_name %>', 'UPDATE', json_object(<%= Enum.join(for col <- tables[foreign_key.table].primary do "'#{col}', #{col}" end, ", ") %>), json_object(<%= Enum.join(for col <- tables[foreign_key.table].columns do "'#{col}', #{col}" end, ", ") %>), NULL, NULL
FROM <%= tables[foreign_key.table].namespace %>.<%= tables[foreign_key.table].table_name %> WHERE <%= foreign_key.parent_key %> = new.<%= foreign_key.child_key %>;
END;
<% end %>
<% end %>
Oops, something went wrong.