diff --git a/README.md b/README.md index 4ef930a..0d99952 100644 --- a/README.md +++ b/README.md @@ -1,59 +1,26 @@ # alog alog (Append-only Log) is an easy way to start using the Lambda/Kappa architecture in your Elixir/Phoenix Apps while still using PostgreSQL (with Ecto). -This module provides some helper functions to make it easy to insert and retrieve the data you need. +This module is an Ecto Adapter that extends the default Postgres adapter with functionality to ensure data is only ever appended, never deleted or edited. ## Usage - At the top of the schema you wish to use append only functions for, `use` this module: + In your Repo module, when defining your Ecto Repo, set the adapter to be this module, Alog - ``` elixir - use Alog - ``` - - The append only functions will then be available to call as part of your schema. - - ## Example - - ``` elixir - defmodule MyApp.User do - use Ecto.Schema - use Alog - - import Ecto.Changeset - - schema "users" do - ... - end - - def changeset(user, attrs) do - ... - end + ``` elixir + defmodule MyApp.Repo do + use Ecto.Repo, + otp_app: :my_app, + adapter: Alog end - ``` - - ## Repo - - You can set the repo you want Alog to use in a config file: - - ``` elixir - config :alog, Alog, - repo: MyApp.Repo - ``` - - If you do not explicitly set a Repo, Alog will try to find it using your application name. - So if your app is `MyApp` and your schema is `MyApp.User`, or `MyApp.Accounts.User`, your Repo should be `MyApp.Repo`. - - ## Uniqueness + ``` - Due to the append only manner in which Alog stores data, it is not compatible with tables that have Unique Indexes applied to any of their columns. If you wish to use alog, you will have to remove these indexes. +## Considerations - For example, the following in a migration file would remove a unique index on the `email` column from the `users` table. +- When inserting or updating an item, the return value of the insert/update function is currently incorrect. The updates and inserts are however done correctly, as you will see if you get all items from the database using `Repo.all`. - ``` - drop(unique_index(:users, :email)) - ``` +- We exclude the `schema_migrations` file from all alog functions, instead forwarding them on to the original Postgres Adapter. - See https://hexdocs.pm/ecto_sql/Ecto.Migration.html#content for more details. +- The autogenerated cid is used as the primary key. There is no way currently to define a custom primary key. - If you want to ensure each entry in your database has a unique field, you can use the [`Ecto.Changeset.unique_constraint/3`](https://hexdocs.pm/ecto/Ecto.Changeset.html#unique_constraint/3) function as normal, and Alog will ensure there are no repeated fields, other than those of the same entry, returning an invalid changeset if there are. \ No newline at end of file +Hopefully these issues can later be resolved by looking at defining/extending our own version of the the `Ecto.Schema` macro. \ No newline at end of file diff --git a/lib/alog.ex b/lib/alog.ex index bdff4b5..f46b3be 100644 --- a/lib/alog.ex +++ b/lib/alog.ex @@ -6,6 +6,7 @@ defmodule Alog do alias Ecto.Adapters.Postgres, as: EAP @behaviour Ecto.Adapter.Storage + @behaviour Ecto.Adapter.Schema @impl true def supports_ddl_transaction?, do: true @@ -16,12 +17,76 @@ defmodule Alog do @impl true defdelegate storage_down(opts), to: EAP + @impl true + defdelegate structure_dump(default, config), to: EAP + + @impl true + defdelegate structure_load(default, config), to: EAP + + @impl true + def update(adapter_meta, %{source: source, prefix: prefix}, fields, params, returning, opts) do + cid = Keyword.get(params, :cid) + query = "SELECT * FROM #{source} where cid='#{cid}'" + {:ok, old} = Ecto.Adapters.SQL.query(adapter_meta, query, []) + + new_params = + Enum.with_index(old.columns) + |> Enum.map(fn {c, i} -> + case Keyword.get(fields, String.to_existing_atom(c)) do + _ when c == "cid" -> + nil + + nil -> + {String.to_existing_atom(c), old.rows |> List.first() |> Enum.at(i)} + + new -> + {String.to_existing_atom(c), new} + end + end) + |> Enum.filter(&(not is_nil(&1))) + |> Keyword.new() + + insert( + adapter_meta, + %{source: source, prefix: prefix}, + new_params, + {:raise, [], []}, + returning, + opts + ) + end + + @impl true + def autogenerate(:binary_id), do: nil + + @impl true + def loaders(:binary_id, type), do: [:binary, type] + def loaders(_primitive, type), do: [type] + + @impl true + def dumpers(:binary_id, type), do: [:binary, type] + def dumpers(_primitive, type), do: [type] + # overrides insert/6 defined in Ecto.Adapters.SQL - def insert(adapter_meta, %{source: "schema_migrations", prefix: prefix}, params, on_conflict, returning, opts) do + def insert( + adapter_meta, + %{source: "schema_migrations", prefix: prefix}, + params, + on_conflict, + returning, + opts + ) do insert_logic(adapter_meta, "schema_migrations", prefix, params, on_conflict, returning, opts) end - def insert(adapter_meta, %{source: source, prefix: prefix}, params, on_conflict, returning, opts) do + def insert( + adapter_meta, + %{source: source, prefix: prefix}, + params, + on_conflict, + returning, + opts + ) do # converts params from a keyword list to a map params_map = Enum.into(params, %{}) @@ -54,7 +119,7 @@ defmodule Alog do if results.num_rows == 0 do entry_id else - create_entry_id(source, adapter_meta, cid, n+1) + create_entry_id(source, adapter_meta, cid, n + 1) end end @@ -69,6 +134,18 @@ defmodule Alog do {kind, conflict_params, _} = on_conflict {fields, values} = :lists.unzip(params) sql = Alog.Connection.insert(prefix, source, fields, [fields], on_conflict, returning) - Ecto.Adapters.SQL.struct(adapter_meta, Alog.Connection, sql, :insert, source, [], values ++ conflict_params, kind, returning, opts) + + Ecto.Adapters.SQL.struct( + adapter_meta, + Alog.Connection, + sql, + :insert, + source, + [], + values ++ conflict_params, + kind, + returning, + opts + ) end end diff --git a/lib/alog/connection.ex b/lib/alog/connection.ex index 7e9e81e..2fc173b 100644 --- a/lib/alog/connection.ex +++ b/lib/alog/connection.ex @@ -26,6 +26,9 @@ defmodule Alog.Connection do @impl true defdelegate to_constraints(error_struct), to: EAPC + @impl true + defdelegate ddl_logs(result), to: EAPC + @impl true def all(query) do iodata_query = EAPC.all(query) @@ -86,23 +89,89 @@ defmodule Alog.Connection do end @impl true - defdelegate update_all(query, prefix \\ nil), to: EAPC + def execute_ddl({c, %Ecto.Migration.Table{} = table, columns}) + when c in [:create, :create_if_not_exists] do + # TODO: need to determine if migration_source has been set in config + # else name is :schema_migrations + with name when name != :schema_migrations <- Map.get(table, :name), + true <- + Enum.any?( + columns, + fn + {:add, field, _type, [primary_key: true]} when field != :cid -> true + _ -> false + end + ) do + raise ArgumentError, "you cannot add a primary key" + else + :schema_migrations -> + EAPC.execute_ddl({c, table, columns}) + + _ -> + EAPC.execute_ddl({c, table, update_columns(columns)}) + end + end + + def execute_ddl({:alter, %Ecto.Migration.Table{}, changes} = command) do + with :ok <- + Enum.each( + changes, + fn + {:remove, :cid, _, _} -> + raise ArgumentError, "you cannot remove cid" + + {_, _, _, [primary_key: true]} -> + raise ArgumentError, "you cannot add a primary key" + + _ -> + nil + end + ) do + EAPC.execute_ddl(command) + end + end + + def execute_ddl({c, %Ecto.Migration.Index{unique: true}}) + when c in [:create, :create_if_not_exists] do + raise ArgumentError, "you cannot create a unique index" + end + + defdelegate execute_ddl(command), to: EAPC + + # Add required columns if they are missing + defp update_columns(columns) do + [ + {:add, :cid, :binary, [primary_key: true]}, + {:add, :entry_id, :string, [null: false]}, + {:add, :deleted, :boolean, [default: false]}, + {:add, :inserted_at, :naive_datetime_usec, [null: false]}, + {:add, :updated_at, :naive_datetime_usec, [null: false]} + ] + |> Enum.reduce(columns, fn {_, c, _, _} = col, acc -> + case Enum.find(acc, fn {_, a, _, _} -> a == c end) do + nil -> acc ++ [col] + _ -> acc + end + end) + end + + # Temporary delegate functions to make tests work @impl true - defdelegate delete_all(query), to: EAPC + defdelegate all(query), to: EAPC @impl true defdelegate insert(prefix, table, header, rows, on_conflict, returning), to: EAPC @impl true - defdelegate update(prefix, table, fields, filters, returning), to: EAPC + defdelegate delete_all(query), to: EAPC @impl true - defdelegate delete(prefix, table, filters, returning), to: EAPC + defdelegate update(prefix, table, fields, filters, returning), to: EAPC @impl true - defdelegate execute_ddl(arg), to: EAPC + defdelegate delete(prefix, table, filters, returning), to: EAPC @impl true - defdelegate ddl_logs(result), to: EAPC + defdelegate update_all(query, prefix \\ nil), to: EAPC end diff --git a/test/insert_test.exs b/test/insert_test.exs index d3059aa..de8ea22 100644 --- a/test/insert_test.exs +++ b/test/insert_test.exs @@ -36,6 +36,7 @@ defmodule AlogTest.InsertTest do # assert User.get(user.entry_id) == user # end end + # # describe "insert/1 - with struct:" do # test "succeeds" do diff --git a/test/update_test.exs b/test/update_test.exs index bd22a9f..1a7d845 100644 --- a/test/update_test.exs +++ b/test/update_test.exs @@ -1,38 +1,15 @@ defmodule AlogTest.UpdateTest do - # use Alog.TestApp.DataCase - # - # alias Alog.TestApp.{User, Helpers} - # - # describe "update/2:" do - # test "succeeds" do - # {:ok, user} = %User{} |> User.changeset(Helpers.user_1_params()) |> User.insert() - # - # assert {:ok, updated_user} = user |> User.changeset(%{postcode: "W2 3EC"}) |> User.update() - # end - # - # test "updates" do - # {:ok, user} = %User{} |> User.changeset(Helpers.user_1_params()) |> User.insert() - # - # {:ok, updated_user} = user |> User.changeset(%{postcode: "W2 3EC"}) |> User.update() - # - # assert updated_user.postcode == "W2 3EC" - # end - # - # test "'get' returns most recently updated item" do - # {:ok, user} = %User{} |> User.changeset(Helpers.user_1_params()) |> User.insert() - # - # {:ok, updated_user} = user |> User.changeset(%{postcode: "W2 3EC"}) |> User.update() - # - # assert User.get(user.entry_id) |> User.preload(:items) == updated_user - # assert User.get(user.entry_id).postcode == "W2 3EC" - # end - # - # test "associations remain after update" do - # {:ok, user, _item} = Helpers.seed_data() - # - # {:ok, _updated_user} = user |> User.changeset(%{postcode: "W2 3EC"}) |> User.update() - # - # assert User.get(user.entry_id) |> User.preload(:items) |> Map.get(:items) |> length == 1 - # end - # end + use Alog.TestApp.DataCase + + alias Alog.TestApp.{Comment, Helpers} + + test "adds new record" do + {:ok, _} = Repo.insert(%Comment{} |> Comment.changeset(%{comment: "hi"})) + + [c | []] = Repo.all(Comment) + + {:ok, _} = Repo.update(Comment.changeset(c, %{comment: "hello"})) + + assert Repo.all(Comment) |> length == 2 + end end