Skip to content

Commit

Permalink
Adding ability to query tenants from an Electric API
Browse files Browse the repository at this point in the history
  • Loading branch information
icehaunter committed Nov 12, 2024
1 parent ae18f4a commit 9683683
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 0 deletions.
125 changes: 125 additions & 0 deletions packages/sync-service/lib/electric/control_plane.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
defmodule Electric.ControlPlane do
@moduledoc """
Functions that interact with the control plane that exists outside Electric.
"""
require Logger

defstruct [
:base_url,
:auth,
paths: %{
"tenant_shape" => %{
"url" => "/v1/shape",
"params" => %{
"offset" => "-1",
"table" => "databases",
"where" => "electric_url ILIKE '%%{instance_id}'",
"select" => "id,connection_url"
}
}
}
]

@type t() :: %__MODULE__{
base_url: String.t(),
auth: nil | String.t() | {:basic, String.t()} | {:bearer, String.t()},
paths: %{optional(String.t()) => map()}
}

@spec list_tenants(t(), keyword()) ::
{:ok, included :: list(map()), deleted :: list(map())} | {:error, :unreachable}
def list_tenants(%__MODULE__{} = plane, opts) do
%{electric_instance_id: instance_id} =
Keyword.get_lazy(opts, :app_config, fn -> Electric.Application.Configuration.get() end)

plane
|> build_req("tenant_shape", instance_id)
|> read_electric_api_until_done()
|> case do
{:ok, result} when is_list(result) ->
# We expect the control plane to fulfill the Electric API, so we can decode it here from the complete response
{ins_acc, del_acc} =
result
|> Stream.reject(&get_in(&1, ["headers", "control"]))
|> Stream.map(
&{get_in(&1, ["headers", "operation"]), Map.fetch!(&1, "key"),
Map.fetch!(&1, "value")}
)
|> collect_ops()

{:ok, Map.values(ins_acc), Map.values(del_acc)}

{:ok, %Req.Response{status: status, body: body}} ->
Logger.error(
"Could not reach the control plane while trying to list tenants. Latest response has status #{status} and body #{inspect(body)}"
)

{:error, :unreachable}

{:error, error} ->
Logger.error(
"Could not reach the control plane while trying to list tenants. Latest response was #{inspect(error)}"
)

{:error, :unreachable}
end
end

# We need to read the Electric stream until complete
defp read_electric_api_until_done(req, agg \\ []) do
with {:ok, %Req.Response{status: 200} = resp} <-
Req.get(req, max_retries: 4, retry_delay: 1_000) do
if Req.Response.get_header(resp, "electric-up-to-date") != [] do
{:ok, agg ++ resp.body}
else
[electric_handle] = Req.Response.get_header(resp, "electric-handle")
[electric_offset] = Req.Response.get_header(resp, "electric-offset")

req
|> Req.merge(params: [handle: electric_handle, offset: electric_offset])
|> read_electric_api_until_done(agg ++ resp.body)
end
end
end

defp build_req(%__MODULE__{} = plane, path_name, instance_id) do
%{"url" => url} = path_spec = Map.fetch!(plane.paths, path_name)
url = insert_instance_id(url, instance_id)

params =
path_spec
|> Map.get("params", [])
|> Enum.map(fn {k, v} -> {k, insert_instance_id(v, instance_id)} end)

headers =
path_spec
|> Map.get("headers", [])
|> Enum.map(fn {k, v} -> {k, insert_instance_id(v, instance_id)} end)

Req.new(
base_url: plane.base_url,
url: url,
params: params,
auth: plane.auth,
headers: headers
)
end

defp insert_instance_id(string, instance_id),
do: String.replace(string, "%{instance_id}", instance_id)

@spec collect_ops(list({String.t(), String.t(), %{String.t() => String.t()}})) ::
{map(), map()}
defp collect_ops(ops) do
Enum.reduce(ops, {%{}, %{}}, fn
{"insert", key, value}, {ins_acc, del_acc} ->
{Map.put(ins_acc, key, value), del_acc}

{"update", key, value}, {ins_acc, del_acc} ->
{Map.update!(ins_acc, key, &Map.merge(&1, value)), del_acc}

{"delete", key, value}, {ins_acc, del_acc} ->
{Map.delete(ins_acc, key), Map.put(del_acc, key, value)}
end)
end
end
1 change: 1 addition & 0 deletions packages/sync-service/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ defmodule Electric.MixProject do
{:plug, "~> 1.16"},
{:postgrex, "~> 0.19"},
{:retry, "~> 0.18"},
{:req, "~> 0.5"},
{:telemetry_metrics_prometheus_core, "~> 1.1"},
{:telemetry_metrics_statsd, "~> 0.7"},
{:telemetry_poller, "~> 1.1"},
Expand Down
4 changes: 4 additions & 0 deletions packages/sync-service/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"},
"excoveralls": {:hex, :excoveralls, "0.18.2", "86efd87a0676a3198ff50b8c77620ea2f445e7d414afa9ec6c4ba84c9f8bdcc2", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "230262c418f0de64077626a498bd4fdf1126d5c2559bb0e6b43deac3005225a4"},
"finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"},
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
"gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"},
"grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"},
Expand All @@ -25,9 +26,11 @@
"makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"},
"mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"},
"mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"},
"mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
"opentelemetry": {:hex, :opentelemetry, "1.5.0", "7dda6551edfc3050ea4b0b40c0d2570423d6372b97e9c60793263ef62c53c3c2", [:rebar3], [{:opentelemetry_api, "~> 1.4", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "cdf4f51d17b592fc592b9a75f86a6f808c23044ba7cf7b9534debbcc5c23b0ee"},
"opentelemetry_api": {:hex, :opentelemetry_api, "1.4.0", "63ca1742f92f00059298f478048dfb826f4b20d49534493d6919a0db39b6db04", [:mix, :rebar3], [], "hexpm", "3dfbbfaa2c2ed3121c5c483162836c4f9027def469c41578af5ef32589fcfc58"},
"opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.8.0", "5d546123230771ef4174e37bedfd77e3374913304cd6ea3ca82a2add49cd5d56", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.5.0", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.4.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.18", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "a1f9f271f8d3b02b81462a6bfef7075fd8457fdb06adff5d2537df5e2264d9af"},
Expand All @@ -38,6 +41,7 @@
"plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"},
"postgrex": {:hex, :postgrex, "0.19.0", "f7d50e50cb42e0a185f5b9a6095125a9ab7e4abccfbe2ab820ab9aa92b71dbab", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "dba2d2a0a8637defbf2307e8629cb2526388ba7348f67d04ec77a5d6a72ecfae"},
"protox": {:hex, :protox, "1.7.3", "dff5488a648850c95cbd1cca5430be7ccedc99e4102aa934dbf60abfa30e64c1", [:mix], [{:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "b936c0654b68b306c4be853db23bb5623e2be89e11238908f2ff6da10fc0275f"},
"req": {:hex, :req, "0.5.7", "b722680e03d531a2947282adff474362a48a02aa54b131196fbf7acaff5e4cee", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "c6035374615120a8923e8089d0c21a3496cf9eda2d287b806081b8f323ceee29"},
"retry": {:hex, :retry, "0.18.0", "dc58ebe22c95aa00bc2459f9e0c5400e6005541cf8539925af0aa027dc860543", [:mix], [], "hexpm", "9483959cc7bf69c9e576d9dfb2b678b71c045d3e6f39ab7c9aa1489df4492d73"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
Expand Down

0 comments on commit 9683683

Please sign in to comment.