Skip to content

Commit

Permalink
merges with insert
Browse files Browse the repository at this point in the history
  • Loading branch information
Danwhy committed Feb 28, 2019
2 parents 7735134 + 44f96fc commit 19f0529
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 22 deletions.
94 changes: 88 additions & 6 deletions lib/alog.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,27 @@ defmodule Alog do
driver: :postgrex,
migration_lock: "FOR UPDATE"

@impl true
def supports_ddl_transaction? do
true
end
alias Ecto.Adapters.Postgres, as: EAP

@behaviour Ecto.Adapter.Storage
@behaviour Ecto.Adapter.Schema

# Why did we define our own version of this function?
# Sorry if I have missed something that has been explained already.
@impl true
def supports_ddl_transaction?, do: true

@impl true
defdelegate storage_up(opts), to: Ecto.Adapters.Postgres
defdelegate storage_up(opts), to: EAP

@impl true
defdelegate storage_down(opts), to: Ecto.Adapters.Postgres
defdelegate storage_down(opts), to: EAP

@impl true
defdelegate structure_dump(default, config), to: EAP

@impl true
defdelegate structure_load(default, config), to: EAP

@impl true
def update(adapter_meta, %{source: source, prefix: prefix}, fields, params, returning, opts) do
Expand Down Expand Up @@ -60,4 +68,78 @@ defmodule Alog do
@impl true
def dumpers(:binary_id, type), do: [:binary, type]
def dumpers(_primitive, type), do: [type]

# overrides insert/6 defined in Ecto.Adapters.SQL
@impl true
def insert(
adapter_meta,
%{source: source, prefix: prefix},
params,
on_conflict,
returning,
opts
) do
# converts params from a keyword list to a map
params_map = Enum.into(params, %{})

# removes inserted_at and updated_at from map (will not error if keys are not in map)
map_for_cid = Map.drop(params_map, [:inserted_at, :updated_at])

# creates a cid from the map witout the inserted_at and updated_at_values
cid = Cid.cid(map_for_cid)

# creates a unique entry_id for the data based on the CID generated
entry_id = create_entry_id(source, adapter_meta, cid, 2)

# updates params to ensure that timestamps, cid, and entry_id are all added.
# then converts the map back into a list for use in existing functionality (original format)
params =
map_for_cid
|> add_timestamps()
# <==== Should this be Map.put(:id, cid)??????????
|> Map.put(:cid, cid)
|> Map.put(:entry_id, entry_id)
|> Enum.into([])

{kind, conflict_params, _} = on_conflict
{fields, values} = :lists.unzip(params)
sql = @conn.insert(prefix, source, fields, [fields], on_conflict, returning)

Ecto.Adapters.SQL.struct(
adapter_meta,
@conn,
sql,
:insert,
source,
[],
values ++ conflict_params,
kind,
returning,
opts
)
end

# I think that this step need to also make sure that the data is not an exact copy.
# if the full cid already exists then this is duplicate data.
# Should we insert duplicate data.
# i was thinking maybe if it was existing data but not the most recent data we should re-insert the data
# e.g. if the comment was hi, edited to hey, and then changed back to hi.
defp create_entry_id(source, adapter_meta, cid, n) do
entry_id = String.slice(cid, 0..n)
entry_id_query = "SELECT * FROM #{source} where entry_id='#{entry_id}'"
{:ok, results} = Ecto.Adapters.SQL.query(adapter_meta, entry_id_query, [])

if results.num_rows == 0 do
entry_id
else
create_entry_id(source, adapter_meta, cid, n + 1)
end
end

defp add_timestamps(params) do
params
|> Enum.into(%{})
|> Map.put_new(:inserted_at, NaiveDateTime.utc_now())
|> Map.put_new(:updated_at, NaiveDateTime.utc_now())
end
end
52 changes: 37 additions & 15 deletions lib/alog/connection.ex
Original file line number Diff line number Diff line change
@@ -1,22 +1,44 @@
defmodule Alog.Connection do
alias Ecto.Adapters.Postgres.Connection, as: EAPC

@behaviour Ecto.Adapters.SQL.Connection
@default_port 5432

@impl true
def child_spec(opts) do
opts
|> Keyword.put_new(:port, @default_port)
|> Postgrex.child_spec()
end

@impl true
defdelegate prepare_execute(conn, name, statement, params, opts), to: EAPC

@impl true
defdelegate execute(conn, query, params, opts), to: EAPC

@impl true
defdelegate query(conn, statement, params, opts), to: EAPC

@impl true
defdelegate stream(conn, statement, params, opts), to: EAPC

@impl true
defdelegate child_spec(opts), to: Ecto.Adapters.Postgres.Connection
defdelegate to_constraints(error_struct), to: EAPC

@impl true
defdelegate ddl_logs(result), to: Ecto.Adapters.Postgres.Connection
defdelegate ddl_logs(result), to: EAPC

@impl true
defdelegate prepare_execute(connection, name, statement, params, options),
to: Ecto.Adapters.Postgres.Connection
to: EAPC

@impl true
defdelegate query(connection, statement, params, options), to: Ecto.Adapters.Postgres.Connection
defdelegate query(connection, statement, params, options), to: EAPC

@impl true
defdelegate stream(connection, statement, params, options),
to: Ecto.Adapters.Postgres.Connection
to: EAPC

@impl true
def execute_ddl({c, %Ecto.Migration.Table{} = table, columns} = command)
Expand All @@ -35,10 +57,10 @@ defmodule Alog.Connection do
raise ArgumentError, "you cannot add a primary key"
else
:schema_migrations ->
Ecto.Adapters.Postgres.Connection.execute_ddl({c, table, columns})
EAPC.execute_ddl({c, table, columns})

_ ->
Ecto.Adapters.Postgres.Connection.execute_ddl({c, table, update_columns(columns)})
EAPC.execute_ddl({c, table, update_columns(columns)})
end
end

Expand All @@ -57,7 +79,7 @@ defmodule Alog.Connection do
nil
end
) do
Ecto.Adapters.Postgres.Connection.execute_ddl(command)
EAPC.execute_ddl(command)
end
end

Expand All @@ -66,7 +88,7 @@ defmodule Alog.Connection do
raise ArgumentError, "you cannot create a unique index"
end

defdelegate execute_ddl(command), to: Ecto.Adapters.Postgres.Connection
defdelegate execute_ddl(command), to: EAPC

# Add required columns if they are missing
defp update_columns(columns) do
Expand All @@ -88,20 +110,20 @@ defmodule Alog.Connection do
# Temporary delegate functions to make tests work

@impl true
defdelegate all(a), to: Ecto.Adapters.Postgres.Connection
defdelegate all(query), to: EAPC

@impl true
defdelegate insert(a, b, c, d, e, f), to: Ecto.Adapters.Postgres.Connection
defdelegate insert(prefix, table, header, rows, on_conflict, returning), to: EAPC

@impl true
defdelegate execute(a, b, c, d), to: Ecto.Adapters.Postgres.Connection
defdelegate delete_all(query), to: EAPC

@impl true
defdelegate delete_all(a), to: Ecto.Adapters.Postgres.Connection
defdelegate update(prefix, table, fields, filters, returning), to: EAPC

@impl true
defdelegate to_constraints(a), to: Ecto.Adapters.Postgres.Connection
defdelegate delete(prefix, table, filters, returning), to: EAPC

@impl true
defdelegate update(a, b, c, d, e), to: Ecto.Adapters.Postgres.Connection
defdelegate update_all(query, prefix \\ nil), to: EAPC
end
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ defmodule Alog.MixProject do
defp deps do
[
{:ecto_sql, "~> 3.0.5"},
{:postgrex, ">= 0.0.0"}
{:postgrex, ">= 0.0.0"},
{:excid, "~> 0.1.0"}
]
end
end
4 changes: 4 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
%{
"b58": {:hex, :b58, "0.1.1", "950563b90ad95be214143aa807d16c98d2ac7bab915097cb1461c936128c6b90", [:mix], [], "hexpm"},
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm"},
"db_connection": {:hex, :db_connection, "2.0.5", "ddb2ba6761a08b2bb9ca0e7d260e8f4dd39067426d835c24491a321b7f92a4da", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"},
"decimal": {:hex, :decimal, "1.6.0", "bfd84d90ff966e1f5d4370bdd3943432d8f65f07d3bab48001aebd7030590dcc", [:mix], [], "hexpm"},
"ecto": {:hex, :ecto, "3.0.7", "44dda84ac6b17bbbdeb8ac5dfef08b7da253b37a453c34ab1a98de7f7e5fec7f", [:mix], [{:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm"},
"ecto_sql": {:hex, :ecto_sql, "3.0.5", "7e44172b4f7aca4469f38d7f6a3da394dbf43a1bcf0ca975e958cb957becd74e", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.0.6", [hex: :ecto, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.9.1", [hex: :mariaex, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.14.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.3.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"ex_multihash": {:hex, :ex_multihash, "2.0.0", "7fb36f842a2ec1c6bbba550f28fcd16d3c62981781b9466c9c1975c43d7db43c", [:mix], [], "hexpm"},
"excid": {:hex, :excid, "0.1.0", "85fd8ac6466660bc8c4c0b52275e7a255389c683a18f70aaed8cd2ac7735cfab", [:mix], [{:b58, "~> 0.1.1", [hex: :b58, repo: "hexpm", optional: false]}, {:ex_multihash, "~> 2.0", [hex: :ex_multihash, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
"postgrex": {:hex, :postgrex, "0.14.1", "63247d4a5ad6b9de57a0bac5d807e1c32d41e39c04b8a4156a26c63bcd8a2e49", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
"telemetry": {:hex, :telemetry, "0.3.0", "099a7f3ce31e4780f971b4630a3c22ec66d22208bc090fe33a2a3a6a67754a73", [:rebar3], [], "hexpm"},
}

0 comments on commit 19f0529

Please sign in to comment.