Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Insert #50

Merged
merged 19 commits into from
Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 67 additions & 6 deletions lib/alog.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,77 @@ defmodule Alog do
driver: :postgrex,
migration_lock: "FOR UPDATE"

@impl true
def supports_ddl_transaction? do
true
end
alias Ecto.Adapters.Postgres, as: EAP

@behaviour Ecto.Adapter.Storage

# Why did we define our own version of this function?
# Sorry if I have missed something that has been explained already.
@impl true
def supports_ddl_transaction?, do: true

@impl true
defdelegate storage_up(opts), to: EAP

@impl true
defdelegate storage_down(opts), to: EAP

@impl true
defdelegate storage_up(opts), to: Ecto.Adapters.Postgres
defdelegate structure_dump(default, config), to: EAP

@impl true
defdelegate storage_down(opts), to: Ecto.Adapters.Postgres
defdelegate structure_load(default, config), to: EAP

# overrides insert/6 defined in Ecto.Adapters.SQL
def insert(adapter_meta, %{source: source, prefix: prefix}, params, on_conflict, returning, opts) do
# converts params from a keyword list to a map
params_map = Enum.into(params, %{})

# removes inserted_at and updated_at from map (will not error if keys are not in map)
map_for_cid = Map.drop(params_map, [:inserted_at, :updated_at])

# creates a cid from the map witout the inserted_at and updated_at_values
cid = Cid.cid(map_for_cid)

# creates a unique entry_id for the data based on the CID generated
entry_id = create_entry_id(source, adapter_meta, cid, 2)

# updates params to ensure that timestamps, cid, and entry_id are all added.
# then converts the map back into a list for use in existing functionality (original format)
params =
map_for_cid
|> add_timestamps()
|> Map.put(:cid, cid) # <==== Should this be Map.put(:id, cid)??????????
|> Map.put(:entry_id, entry_id)
|> Enum.into([])

{kind, conflict_params, _} = on_conflict
{fields, values} = :lists.unzip(params)
sql = @conn.insert(prefix, source, fields, [fields], on_conflict, returning)
Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :insert, source, [], values ++ conflict_params, kind, returning, opts)
end

# I think that this step need to also make sure that the data is not an exact copy.
# if the full cid already exists then this is duplicate data.
# Should we insert duplicate data.
# i was thinking maybe if it was existing data but not the most recent data we should re-insert the data
# e.g. if the comment was hi, edited to hey, and then changed back to hi.
defp create_entry_id(source, adapter_meta, cid, n) do
entry_id = String.slice(cid, 0..n)
entry_id_query = "SELECT * FROM #{source} where entry_id='#{entry_id}'"
{:ok, results} = Ecto.Adapters.SQL.query(adapter_meta, entry_id_query, [])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might be able to enhance this existing cid lookup later on. Maybe we could use a cache system to store all the current cid (with ETS)? If we can manage to not send another query to Postgres that would be ideal but it's working for now 👍


if results.num_rows == 0 do
entry_id
else
create_entry_id(source, adapter_meta, cid, n+1)
end
end

defp add_timestamps(params) do
params
|> Enum.into(%{})
|> Map.put_new(:inserted_at, NaiveDateTime.utc_now())
|> Map.put_new(:updated_at, NaiveDateTime.utc_now())
end
end
46 changes: 41 additions & 5 deletions lib/alog/connection.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,52 @@
defmodule Alog.Connection do
alias Ecto.Adapters.Postgres.Connection, as: EAPC

@behaviour Ecto.Adapters.SQL.Connection
@default_port 5432

@impl true
def child_spec(opts) do
opts
|> Keyword.put_new(:port, @default_port)
|> Postgrex.child_spec()
end

@impl true
defdelegate prepare_execute(conn, name, statement, params, opts), to: EAPC

@impl true
defdelegate execute(conn, query, params, opts), to: EAPC

@impl true
defdelegate query(conn, statement, params, opts), to: EAPC

@impl true
defdelegate stream(conn, statement, params, opts), to: EAPC

@impl true
defdelegate to_constraints(error_struct), to: EAPC

@impl true
defdelegate all(query), to: EAPC

@impl true
defdelegate update_all(query, prefix \\ nil), to: EAPC

@impl true
defdelegate delete_all(query), to: EAPC

@impl true
defdelegate insert(prefix, table, header, rows, on_conflict, returning), to: EAPC

@impl true
defdelegate ddl_logs(result), to: Ecto.Adapter.Postgres
defdelegate update(prefix, table, fields, filters, returning), to: EAPC

@impl true
defdelegate prepare_execute(connection, name, statement, params, options),
to: Ecto.Adapter.Postgres
defdelegate delete(prefix, table, filters, returning), to: EAPC

@impl true
defdelegate query(connection, statement, params, options), to: Ecto.Adapter.Postgres
defdelegate execute_ddl(arg), to: EAPC

@impl true
defdelegate stream(connection, statement, params, options), to: Ecto.Adapter.Postgres
defdelegate ddl_logs(result), to: EAPC
end
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ defmodule Alog.MixProject do
defp deps do
[
{:ecto_sql, "~> 3.0.5"},
{:postgrex, ">= 0.0.0"}
{:postgrex, ">= 0.0.0"},
{:excid, "~> 0.1.0"}
]
end
end
4 changes: 4 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
%{
"b58": {:hex, :b58, "0.1.1", "950563b90ad95be214143aa807d16c98d2ac7bab915097cb1461c936128c6b90", [:mix], [], "hexpm"},
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm"},
"db_connection": {:hex, :db_connection, "2.0.5", "ddb2ba6761a08b2bb9ca0e7d260e8f4dd39067426d835c24491a321b7f92a4da", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"},
"decimal": {:hex, :decimal, "1.6.0", "bfd84d90ff966e1f5d4370bdd3943432d8f65f07d3bab48001aebd7030590dcc", [:mix], [], "hexpm"},
"ecto": {:hex, :ecto, "3.0.7", "44dda84ac6b17bbbdeb8ac5dfef08b7da253b37a453c34ab1a98de7f7e5fec7f", [:mix], [{:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm"},
"ecto_sql": {:hex, :ecto_sql, "3.0.5", "7e44172b4f7aca4469f38d7f6a3da394dbf43a1bcf0ca975e958cb957becd74e", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.0.6", [hex: :ecto, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.9.1", [hex: :mariaex, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.14.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.3.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"ex_multihash": {:hex, :ex_multihash, "2.0.0", "7fb36f842a2ec1c6bbba550f28fcd16d3c62981781b9466c9c1975c43d7db43c", [:mix], [], "hexpm"},
"excid": {:hex, :excid, "0.1.0", "85fd8ac6466660bc8c4c0b52275e7a255389c683a18f70aaed8cd2ac7735cfab", [:mix], [{:b58, "~> 0.1.1", [hex: :b58, repo: "hexpm", optional: false]}, {:ex_multihash, "~> 2.0", [hex: :ex_multihash, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
"postgrex": {:hex, :postgrex, "0.14.1", "63247d4a5ad6b9de57a0bac5d807e1c32d41e39c04b8a4156a26c63bcd8a2e49", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
"telemetry": {:hex, :telemetry, "0.3.0", "099a7f3ce31e4780f971b4630a3c22ec66d22208bc090fe33a2a3a6a67754a73", [:rebar3], [], "hexpm"},
}