Skip to content

Commit

Permalink
Merge pull request #51 from dwyl/update
Browse files Browse the repository at this point in the history
Update
  • Loading branch information
RobStallion authored Mar 13, 2019
2 parents fd63ef5 + 052bd7e commit e0e08be
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 92 deletions.
59 changes: 13 additions & 46 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,59 +1,26 @@
# alog
alog (Append-only Log) is an easy way to start using the Lambda/Kappa architecture in your Elixir/Phoenix Apps while still using PostgreSQL (with Ecto).

This module provides some helper functions to make it easy to insert and retrieve the data you need.
This module is an Ecto Adapter that extends the default Postgres adapter with functionality to ensure data is only ever appended, never deleted or edited.

## Usage

At the top of the schema you wish to use append only functions for, `use` this module:
In your Repo module, when defining your Ecto Repo, set the adapter to be this module, Alog

``` elixir
use Alog
```

The append only functions will then be available to call as part of your schema.

## Example

``` elixir
defmodule MyApp.User do
use Ecto.Schema
use Alog

import Ecto.Changeset

schema "users" do
...
end

def changeset(user, attrs) do
...
end
``` elixir
defmodule MyApp.Repo do
use Ecto.Repo,
otp_app: :my_app,
adapter: Alog
end
```

## Repo

You can set the repo you want Alog to use in a config file:

``` elixir
config :alog, Alog,
repo: MyApp.Repo
```

If you do not explicitly set a Repo, Alog will try to find it using your application name.
So if your app is `MyApp` and your schema is `MyApp.User`, or `MyApp.Accounts.User`, your Repo should be `MyApp.Repo`.

## Uniqueness
```

Due to the append only manner in which Alog stores data, it is not compatible with tables that have Unique Indexes applied to any of their columns. If you wish to use alog, you will have to remove these indexes.
## Considerations

For example, the following in a migration file would remove a unique index on the `email` column from the `users` table.
- When inserting or updating an item, the return value of the insert/update function is currently incorrect. The updates and inserts are however done correctly, as you will see if you get all items from the database using `Repo.all`.

```
drop(unique_index(:users, :email))
```
- We exclude the `schema_migrations` file from all alog functions, instead forwarding them on to the original Postgres Adapter.

See https://hexdocs.pm/ecto_sql/Ecto.Migration.html#content for more details.
- The autogenerated cid is used as the primary key. There is no way currently to define a custom primary key.

If you want to ensure each entry in your database has a unique field, you can use the [`Ecto.Changeset.unique_constraint/3`](https://hexdocs.pm/ecto/Ecto.Changeset.html#unique_constraint/3) function as normal, and Alog will ensure there are no repeated fields, other than those of the same entry, returning an invalid changeset if there are.
Hopefully these issues can later be resolved by looking at defining/extending our own version of the the `Ecto.Schema` macro.
85 changes: 81 additions & 4 deletions lib/alog.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Alog do
alias Ecto.Adapters.Postgres, as: EAP

@behaviour Ecto.Adapter.Storage
@behaviour Ecto.Adapter.Schema

@impl true
def supports_ddl_transaction?, do: true
Expand All @@ -16,12 +17,76 @@ defmodule Alog do
@impl true
defdelegate storage_down(opts), to: EAP

@impl true
defdelegate structure_dump(default, config), to: EAP

@impl true
defdelegate structure_load(default, config), to: EAP

@impl true
def update(adapter_meta, %{source: source, prefix: prefix}, fields, params, returning, opts) do
cid = Keyword.get(params, :cid)
query = "SELECT * FROM #{source} where cid='#{cid}'"
{:ok, old} = Ecto.Adapters.SQL.query(adapter_meta, query, [])

new_params =
Enum.with_index(old.columns)
|> Enum.map(fn {c, i} ->
case Keyword.get(fields, String.to_existing_atom(c)) do
_ when c == "cid" ->
nil

nil ->
{String.to_existing_atom(c), old.rows |> List.first() |> Enum.at(i)}

new ->
{String.to_existing_atom(c), new}
end
end)
|> Enum.filter(&(not is_nil(&1)))
|> Keyword.new()

insert(
adapter_meta,
%{source: source, prefix: prefix},
new_params,
{:raise, [], []},
returning,
opts
)
end

@impl true
def autogenerate(:binary_id), do: nil

@impl true
def loaders(:binary_id, type), do: [:binary, type]
def loaders(_primitive, type), do: [type]

@impl true
def dumpers(:binary_id, type), do: [:binary, type]
def dumpers(_primitive, type), do: [type]

# overrides insert/6 defined in Ecto.Adapters.SQL
def insert(adapter_meta, %{source: "schema_migrations", prefix: prefix}, params, on_conflict, returning, opts) do
def insert(
adapter_meta,
%{source: "schema_migrations", prefix: prefix},
params,
on_conflict,
returning,
opts
) do
insert_logic(adapter_meta, "schema_migrations", prefix, params, on_conflict, returning, opts)
end

def insert(adapter_meta, %{source: source, prefix: prefix}, params, on_conflict, returning, opts) do
def insert(
adapter_meta,
%{source: source, prefix: prefix},
params,
on_conflict,
returning,
opts
) do
# converts params from a keyword list to a map
params_map = Enum.into(params, %{})

Expand Down Expand Up @@ -54,7 +119,7 @@ defmodule Alog do
if results.num_rows == 0 do
entry_id
else
create_entry_id(source, adapter_meta, cid, n+1)
create_entry_id(source, adapter_meta, cid, n + 1)
end
end

Expand All @@ -69,6 +134,18 @@ defmodule Alog do
{kind, conflict_params, _} = on_conflict
{fields, values} = :lists.unzip(params)
sql = Alog.Connection.insert(prefix, source, fields, [fields], on_conflict, returning)
Ecto.Adapters.SQL.struct(adapter_meta, Alog.Connection, sql, :insert, source, [], values ++ conflict_params, kind, returning, opts)

Ecto.Adapters.SQL.struct(
adapter_meta,
Alog.Connection,
sql,
:insert,
source,
[],
values ++ conflict_params,
kind,
returning,
opts
)
end
end
81 changes: 75 additions & 6 deletions lib/alog/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ defmodule Alog.Connection do
@impl true
defdelegate to_constraints(error_struct), to: EAPC

@impl true
defdelegate ddl_logs(result), to: EAPC

@impl true
def all(query) do
iodata_query = EAPC.all(query)
Expand Down Expand Up @@ -86,23 +89,89 @@ defmodule Alog.Connection do
end

@impl true
defdelegate update_all(query, prefix \\ nil), to: EAPC
def execute_ddl({c, %Ecto.Migration.Table{} = table, columns})
when c in [:create, :create_if_not_exists] do
# TODO: need to determine if migration_source has been set in config
# else name is :schema_migrations
with name when name != :schema_migrations <- Map.get(table, :name),
true <-
Enum.any?(
columns,
fn
{:add, field, _type, [primary_key: true]} when field != :cid -> true
_ -> false
end
) do
raise ArgumentError, "you cannot add a primary key"
else
:schema_migrations ->
EAPC.execute_ddl({c, table, columns})

_ ->
EAPC.execute_ddl({c, table, update_columns(columns)})
end
end

def execute_ddl({:alter, %Ecto.Migration.Table{}, changes} = command) do
with :ok <-
Enum.each(
changes,
fn
{:remove, :cid, _, _} ->
raise ArgumentError, "you cannot remove cid"

{_, _, _, [primary_key: true]} ->
raise ArgumentError, "you cannot add a primary key"

_ ->
nil
end
) do
EAPC.execute_ddl(command)
end
end

def execute_ddl({c, %Ecto.Migration.Index{unique: true}})
when c in [:create, :create_if_not_exists] do
raise ArgumentError, "you cannot create a unique index"
end

defdelegate execute_ddl(command), to: EAPC

# Add required columns if they are missing
defp update_columns(columns) do
[
{:add, :cid, :binary, [primary_key: true]},
{:add, :entry_id, :string, [null: false]},
{:add, :deleted, :boolean, [default: false]},
{:add, :inserted_at, :naive_datetime_usec, [null: false]},
{:add, :updated_at, :naive_datetime_usec, [null: false]}
]
|> Enum.reduce(columns, fn {_, c, _, _} = col, acc ->
case Enum.find(acc, fn {_, a, _, _} -> a == c end) do
nil -> acc ++ [col]
_ -> acc
end
end)
end

# Temporary delegate functions to make tests work

@impl true
defdelegate delete_all(query), to: EAPC
defdelegate all(query), to: EAPC

@impl true
defdelegate insert(prefix, table, header, rows, on_conflict, returning), to: EAPC

@impl true
defdelegate update(prefix, table, fields, filters, returning), to: EAPC
defdelegate delete_all(query), to: EAPC

@impl true
defdelegate delete(prefix, table, filters, returning), to: EAPC
defdelegate update(prefix, table, fields, filters, returning), to: EAPC

@impl true
defdelegate execute_ddl(arg), to: EAPC
defdelegate delete(prefix, table, filters, returning), to: EAPC

@impl true
defdelegate ddl_logs(result), to: EAPC
defdelegate update_all(query, prefix \\ nil), to: EAPC
end
1 change: 1 addition & 0 deletions test/insert_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ defmodule AlogTest.InsertTest do
# assert User.get(user.entry_id) == user
# end
end

#
# describe "insert/1 - with struct:" do
# test "succeeds" do
Expand Down
49 changes: 13 additions & 36 deletions test/update_test.exs
Original file line number Diff line number Diff line change
@@ -1,38 +1,15 @@
defmodule AlogTest.UpdateTest do
# use Alog.TestApp.DataCase
#
# alias Alog.TestApp.{User, Helpers}
#
# describe "update/2:" do
# test "succeeds" do
# {:ok, user} = %User{} |> User.changeset(Helpers.user_1_params()) |> User.insert()
#
# assert {:ok, updated_user} = user |> User.changeset(%{postcode: "W2 3EC"}) |> User.update()
# end
#
# test "updates" do
# {:ok, user} = %User{} |> User.changeset(Helpers.user_1_params()) |> User.insert()
#
# {:ok, updated_user} = user |> User.changeset(%{postcode: "W2 3EC"}) |> User.update()
#
# assert updated_user.postcode == "W2 3EC"
# end
#
# test "'get' returns most recently updated item" do
# {:ok, user} = %User{} |> User.changeset(Helpers.user_1_params()) |> User.insert()
#
# {:ok, updated_user} = user |> User.changeset(%{postcode: "W2 3EC"}) |> User.update()
#
# assert User.get(user.entry_id) |> User.preload(:items) == updated_user
# assert User.get(user.entry_id).postcode == "W2 3EC"
# end
#
# test "associations remain after update" do
# {:ok, user, _item} = Helpers.seed_data()
#
# {:ok, _updated_user} = user |> User.changeset(%{postcode: "W2 3EC"}) |> User.update()
#
# assert User.get(user.entry_id) |> User.preload(:items) |> Map.get(:items) |> length == 1
# end
# end
use Alog.TestApp.DataCase

alias Alog.TestApp.{Comment, Helpers}

test "adds new record" do
{:ok, _} = Repo.insert(%Comment{} |> Comment.changeset(%{comment: "hi"}))

[c | []] = Repo.all(Comment)

{:ok, _} = Repo.update(Comment.changeset(c, %{comment: "hello"}))

assert Repo.all(Comment) |> length == 2
end
end

0 comments on commit e0e08be

Please sign in to comment.