Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Ecto support based on ClickhouseEcto #69

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
8 changes: 8 additions & 0 deletions lib/pillar/ecto.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
defmodule Pillar.Ecto do
use Ecto.Adapters.SQL,
driver: Pillar.Ecto.Driver

def supports_ddl_transaction?(), do: false

def lock_for_migrations(_, _, _), do: nil
end
165 changes: 165 additions & 0 deletions lib/pillar/ecto/conn_mod.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
defmodule Pillar.Ecto.ConnMod do
@moduledoc false

use DBConnection

alias Pillar.Connection
alias Pillar.HttpClient
alias Pillar.HttpClient.Response
alias Pillar.HttpClient.TransportError
alias Pillar.Ecto.Helpers

def connect(opts) do
url = Keyword.get(opts, :url, "http://localhost:8123")
conn = Pillar.Connection.new(url)
{:ok, %{conn: conn}}
end

def disconnect(_err, _state) do
:ok
end

@doc false
def ping(state) do
{:ok, state}
end

@doc false
def reconnect(new_opts, state) do
with :ok <- disconnect("Reconnecting", state),
do: connect(new_opts)
end

@doc false
def checkin(state) do
{:ok, state}
end

@doc false
def checkout(state) do
{:ok, state}
end

@doc false
def handle_status(_, state) do
{:idle, state}
end

@doc false
def handle_prepare(query, _, state) do
{:ok, query, state}
end

@doc false
def handle_execute(query, _params, _opts, state) do
params = Enum.join(query.params, "&")

url =
state.conn
|> Connection.url_from_connection()

url = url <> "&" <> params

url
|> HttpClient.post(
query.statement <>
" FORMAT JSONCompactEachRowWithNamesAndTypes SETTINGS date_time_output_format='iso', output_format_json_quote_64bit_integers=0",
timeout: 60_000
)
|> parse()
|> case do
{:error, reason} ->
{:error, reason, state}

{:ok, body} ->
[types | rows] =
body
|> String.split("\n", trim: true)
|> Enum.map(&Jason.decode!(&1))
|> Enum.drop(1)

rows =
rows
|> Enum.map(fn row ->
Enum.zip(row, types)
|> Enum.map(fn {data, type} ->
Helpers.parse_type(type, data)
end)
end)

{
:ok,
query,
to_result(rows),
state
}
end
end

defp parse(%Response{status_code: 200, body: body}) do
{:ok, body}
end

defp parse(%Response{status_code: _any, body: _body} = resp) do
{:error, resp}
end

defp parse(%TransportError{} = error) do
{:error, error}
end

defp parse(%RuntimeError{} = error) do
{:error, error}
end

defp to_result(res) do
case res do
xs when is_list(xs) -> %{num_rows: Enum.count(xs), rows: Enum.map(xs, &to_row/1)}
nil -> %{num_rows: 0, rows: [nil]}
_ -> %{num_rows: 1, rows: [res]}
end
end

defp to_row(xs) when is_list(xs), do: xs
defp to_row(x) when is_map(x), do: Map.values(x)
defp to_row(x), do: x

@doc false
def handle_declare(_query, _params, _opts, state) do
{:error, :cursors_not_supported, state}
end

@doc false
def handle_deallocate(_query, _cursor, _opts, state) do
{:error, :cursors_not_supported, state}
end

def handle_fetch(_query, _cursor, _opts, state) do
{:error, :cursors_not_supported, state}
end

@doc false
def handle_begin(_opts, state) do
{:error, :cursors_not_supported, state}
end

@doc false
def handle_close(_query, _opts, state) do
{:error, :cursors_not_supported, state}
end

@doc false
def handle_commit(_opts, state) do
{:error, :cursors_not_supported, state}
end

@doc false
def handle_info(_msg, state) do
{:error, :cursors_not_supported, state}
end

@doc false
def handle_rollback(_opts, state) do
{:error, :cursors_not_supported, state}
end
end
59 changes: 59 additions & 0 deletions lib/pillar/ecto/connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
defmodule Pillar.Ecto.Connection do
alias Pillar.Ecto.Query

def child_spec(opts) do
DBConnection.child_spec(Pillar.Ecto.ConnMod, opts)
end

def query(conn, query, params, _) when is_binary(query) do
query = %Query{name: "", statement: query, params: params}
execute(conn, query, [], [])
end

def prepare_execute(conn, name, prepared_query, params, options) do
query = %Query{name: name, statement: prepared_query, params: params}

case DBConnection.prepare_execute(conn, query, params, options) do
{:ok, query, result} ->
{:ok, %{query | statement: prepared_query}, result}

{:error, error} ->
raise error
end
end

def execute(conn, query, params, options) do
case DBConnection.prepare_execute(conn, query, params, options) do
{:ok, _query, result} ->
{:ok, result}

{:error, error} ->
raise error
end
end

def to_constraints(_error), do: []

def stream(_conn, _prepared, _params, _options), do: raise("not implemented")

## Queries
def all(query) do
Query.all(query)
end

def update_all(query, prefix \\ nil), do: Query.update_all(query, prefix)

def delete_all(query), do: Query.delete_all(query)

def insert(prefix, table, header, rows, on_conflict, returning, _),
do: Query.insert(prefix, table, header, rows, on_conflict, returning)

def update(prefix, table, fields, filters, returning),
do: Query.update(prefix, table, fields, filters, returning)

def delete(prefix, table, filters, returning),
do: Query.delete(prefix, table, filters, returning)

## Migration
def execute_ddl(_), do: raise("No")
end
22 changes: 22 additions & 0 deletions lib/pillar/ecto/driver.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Pillar.Ecto.Driver do
alias Pillar.Ecto.Query

def start_link(opts \\ []) do
DBConnection.start_link(
Pillar.Ecto.ConnMod,
opts |> Keyword.put(:show_sensitive_data_on_connection_error, true)
)
end

def child_spec(opts) do
DBConnection.child_spec(Pillar.Ecto.ConnMod, opts)
end

def query(conn, statement, params \\ [], opts \\ []) do
DBConnection.prepare_execute(conn, %Query{name: "", statement: statement}, params, opts)
end

def query!(conn, statement, params \\ [], opts \\ []) do
DBConnection.prepare_execute!(conn, %Query{name: "", statement: statement}, params, opts)
end
end
Loading