diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index ef5424ca97..0ffce62d93 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -106,8 +106,7 @@ case {database_url, default_tenant} do config :electric, default_connection_opts: Electric.Utils.obfuscate_password(connection_opts) - # if `default_tenant` is nil, generate a random UUID for it - tenant_id = default_tenant || Electric.Utils.uuid4() + tenant_id = default_tenant || "00000000-0000-0000-0000-000000000000" config :electric, default_tenant: tenant_id end @@ -209,4 +208,6 @@ config :electric, prometheus_port: prometheus_port, storage: storage, persistent_kv: persistent_kv, - listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false) + control_plane: env!("ELECTRIC_CONTROL_PLANE", &Electric.ControlPlane.parse_config/1, nil), + listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false), + tenant_tables_name: :tenant_tables diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 9022349cb1..75483f1913 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -105,7 +105,8 @@ defmodule Electric.Application do }, pool_opts: %{ size: Application.fetch_env!(:electric, :db_pool_size) - } + }, + control_plane: Application.get_env(:electric, :control_plane, nil) } Electric.Application.Configuration.save(config) diff --git a/packages/sync-service/lib/electric/application/configuration.ex b/packages/sync-service/lib/electric/application/configuration.ex index 709def2d45..227a727d46 100644 --- a/packages/sync-service/lib/electric/application/configuration.ex +++ b/packages/sync-service/lib/electric/application/configuration.ex @@ -9,6 +9,7 @@ defmodule Electric.Application.Configuration do persistent_kv replication_opts pool_opts + control_plane ]a @type t :: %__MODULE__{} diff --git a/packages/sync-service/lib/electric/control_plane.ex b/packages/sync-service/lib/electric/control_plane.ex index 4f82e0646b..c9d189bfaa 100644 --- a/packages/sync-service/lib/electric/control_plane.ex +++ b/packages/sync-service/lib/electric/control_plane.ex @@ -26,6 +26,17 @@ defmodule Electric.ControlPlane do paths: %{optional(String.t()) => map()} } + def parse_config(""), do: nil + def parse_config(config_string) do + result = Jason.decode!(config_string) + + %__MODULE__{ + base_url: Map.fetch!(result, "base_url"), + auth: Map.get(result, "auth", nil), + paths: Map.get(result, "paths", %__MODULE__{}.paths), + } + end + @spec list_tenants(t(), keyword()) :: {:ok, included :: list(map()), deleted :: list(map())} | {:error, :unreachable} def list_tenants(%__MODULE__{} = plane, opts) do @@ -106,10 +117,9 @@ defmodule Electric.ControlPlane do end defp insert_instance_id(string, instance_id), - do: String.replace(string, "%{instance_id}", instance_id) + do: String.replace(string, "%{instance_id}", to_string(instance_id)) - @spec collect_ops(list({String.t(), String.t(), %{String.t() => String.t()}})) :: - {map(), map()} + @spec collect_ops(Enumerable.t()) :: {map(), map()} defp collect_ops(ops) do Enum.reduce(ops, {%{}, %{}}, fn {"insert", key, value}, {ins_acc, del_acc} -> diff --git a/packages/sync-service/lib/electric/plug/remove_database_plug.ex b/packages/sync-service/lib/electric/plug/remove_database_plug.ex index cd8adec1ce..53fe54a441 100644 --- a/packages/sync-service/lib/electric/plug/remove_database_plug.ex +++ b/packages/sync-service/lib/electric/plug/remove_database_plug.ex @@ -16,7 +16,7 @@ defmodule Electric.Plug.RemoveDatabasePlug do |> send_resp(200, Jason.encode_to_iodata!(tenant_id)) |> halt() - :not_found -> + {:error, :not_found} -> conn |> send_resp(404, Jason.encode_to_iodata!("Database #{tenant_id} not found.")) |> halt() diff --git a/packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex b/packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex index f3026ddbb5..918ee10554 100644 --- a/packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex +++ b/packages/sync-service/lib/electric/tenant/dynamic_supervisor.ex @@ -10,6 +10,7 @@ defmodule Electric.TenantSupervisor do def start_link(opts) do DynamicSupervisor.start_link(__MODULE__, [], name: name(opts)) + end def start_tenant(opts) do diff --git a/packages/sync-service/lib/electric/tenant_manager.ex b/packages/sync-service/lib/electric/tenant_manager.ex index d5b1fb6986..74bf8f8ee9 100644 --- a/packages/sync-service/lib/electric/tenant_manager.ex +++ b/packages/sync-service/lib/electric/tenant_manager.ex @@ -2,8 +2,6 @@ defmodule Electric.TenantManager do use GenServer require Logger - alias Electric.Tenant.Persistence - @tenant_info_pos 2 # Public API @@ -38,8 +36,6 @@ defmodule Electric.TenantManager do name: Keyword.get_lazy(opts, :name, fn -> name(opts) end) ) - recreate_tenants_from_disk!(opts) - {:ok, pid} end @@ -92,8 +88,183 @@ defmodule Electric.TenantManager do @spec create_tenant(String.t(), Keyword.t(), Keyword.t()) :: :ok | {:error, atom()} def create_tenant(tenant_id, connection_opts, opts \\ []) do + server = Keyword.get(opts, :tenant_manager, name(opts)) + GenServer.call(server, {:create_tenant, tenant_id, connection_opts, opts}) + end + + @doc """ + Deletes a tenant by its ID. + """ + @spec delete_tenant(String.t(), Keyword.t()) :: :ok | {:error, :not_found} + def delete_tenant(tenant_id, opts) do + server = Keyword.get(opts, :tenant_manager, name(opts)) + + with {:ok, _} <- get_tenant(tenant_id, opts) do + GenServer.call(server, {:delete_tenant, tenant_id}) + end + end + + defp do_stop_and_delete_tenant(tenant_id, %{dbs: dbs, tenants_ets: tenants} = state) do + state.init_opts + |> Keyword.fetch!(:electric_instance_id) + |> Electric.Connection.Manager.name(tenant_id) + |> Electric.Connection.Manager.drop_replication_slot() + + with {:ok, tenant} <- get_tenant(tenant_id, state.init_opts) do + pg_id = Access.fetch!(tenant, :pg_id) + :ets.delete(tenants, tenant_id) + state = %{state | dbs: MapSet.delete(dbs, pg_id)} + + # TODO: This leaves orphaned shapes with data on disk + :ok = + Electric.TenantSupervisor.stop_tenant( + tenant_id: tenant_id, + electric_instance_id: Access.fetch!(state.init_opts, :electric_instance_id) + ) + + {:ok, state} + end + end + + defp do_create_and_store_tenant( + tenant_id, + connection_opts, + opts, + %{dbs: dbs, tenants_ets: tenants} = state + ) do + {tenant, start_tenant_opts} = create_tenant_spec(tenant_id, connection_opts, opts) + tenant_id = tenant[:tenant_id] + pg_id = tenant[:pg_id] + + cond do + :ets.member(tenants, tenant_id) -> + {:error, {:tenant_already_exists, tenant_id}} + + MapSet.member?(dbs, pg_id) -> + {:error, {:db_already_in_use, pg_id}} + + true -> + with {:ok, _} <- Electric.TenantSupervisor.start_tenant(start_tenant_opts) do + true = :ets.insert_new(tenants, {tenant_id, tenant}) + {:ok, %{state | dbs: MapSet.put(dbs, pg_id)}} + end + end + end + + ## Internal API + + @impl GenServer + def init(opts) do + # information about all tenants is kept in an ETS table + # that maps tenant_id to tenant information. + # it is stored in an ETS table to allow concurrent reads. + # the table is protected such that only this genserver can write to it + # which ensures that all writes are serialised + tenants_ets_table = + :ets.new(tenants_ets_table_name(opts), [ + :named_table, + :protected, + :set, + {:read_concurrency, true} + ]) + + # state is a set `dbs` of PG identifiers used by tenants + # such that we can reject any request to store a tenant + # that uses a DB that is already in use + {:ok, + %{ + tenants_ets: tenants_ets_table, + dbs: MapSet.new(), + init_opts: opts, + load_successful?: false + }, {:continue, :load_from_control_plane}} + end + + @impl GenServer + def handle_continue(:load_from_control_plane, state) do + case initialize_tenants_from_control_plane(state) do + {:ok, state} -> + {:noreply, %{state | load_successful?: true}} + + {:error, :unreachable} -> + Process.send_after(self(), :retry_initialization, 500) + {:noreply, state} + end + end + + @impl GenServer + def handle_info(:retry_initialization, state) do + {:noreply, state, {:continue, :load_from_control_plane}} + end + + @impl GenServer + def handle_call(_, _, %{load_successful?: false} = state) do + {:reply, {:error, :not_ready}, state} + end + + def handle_call({:create_tenant, tenant_id, connection_opts, opts}, _, state) do + case do_create_and_store_tenant(tenant_id, connection_opts, opts, state) do + {:ok, state} -> {:reply, :ok, state} + {:error, reason} -> {:reply, {:error, reason}, state} + end + end + + def handle_call({:delete_tenant, tenant_id}, _from, state) do + case do_stop_and_delete_tenant(tenant_id, state) do + {:ok, state} -> {:reply, :ok, state} + {:error, reason} -> {:reply, {:error, reason}, state} + end + end + + defp initialize_tenants_from_control_plane(state) do + with {:load, control_plane} <- get_control_plane(state), + {:ok, to_add, to_remove} <- + Electric.ControlPlane.list_tenants(control_plane, state.init_opts) do + state = + Enum.reduce(to_remove, state, fn %{"id" => tenant_id}, state -> + case do_stop_and_delete_tenant(tenant_id, state) do + {:ok, state} -> state + _ -> state + end + end) + + Enum.reduce(to_add, state, fn %{"id" => tenant_id, "connection_uri" => connection_url}, + state -> + {:ok, result} = Electric.ConfigParser.parse_postgresql_uri(connection_url) + connection_opts = Electric.Utils.obfuscate_password(result) + + case do_create_and_store_tenant( + tenant_id, + connection_opts, + state.init_opts, + state + ) do + {:ok, state} -> + {:ok, state} + + {:error, error} -> + raise """ + Error while trying to initialize a tenant #{tenant_id} from the control plane: + #{inspect(error)} + Connection opts: #{inspect(result)} + """ + end + end) + end + end + + defp get_control_plane(state) do + %{control_plane: control_plane} = + Keyword.get_lazy(state.init_opts, :app_config, fn -> + Electric.Application.Configuration.get() + end) + + if is_nil(control_plane), do: {:ok, state}, else: {:load, control_plane} + end + + defp create_tenant_spec(tenant_id, connection_opts, opts) do app_config = - %{electric_instance_id: electric_instance_id, persistent_kv: persistent_kv} = + %{electric_instance_id: electric_instance_id} = Keyword.get_lazy(opts, :app_config, fn -> Electric.Application.Configuration.get() end) inspector = @@ -178,15 +349,6 @@ defmodule Electric.TenantManager do allow_shape_deletion: allow_shape_deletion ] - # Store the tenant in the tenant manager - store_tenant_opts = - opts ++ - [ - electric_instance_id: electric_instance_id, - persistent_kv: persistent_kv, - connection_opts: connection_opts - ] - start_tenant_opts = [ app_config: app_config, electric_instance_id: electric_instance_id, @@ -196,132 +358,6 @@ defmodule Electric.TenantManager do storage: storage ] - with :ok <- store_tenant(tenant, store_tenant_opts), - {:ok, _} <- Electric.TenantSupervisor.start_tenant(start_tenant_opts) do - :ok - end - end - - @doc """ - Stores the provided tenant in the tenant manager. - """ - @spec store_tenant(Keyword.t(), Keyword.t()) :: :ok | {:error, atom()} - def store_tenant(tenant, opts) do - server = Keyword.get(opts, :tenant_manager, name(opts)) - - case GenServer.call(server, {:store_tenant, tenant}) do - {:tenant_already_exists, tenant_id} -> - {:error, {:tenant_already_exists, tenant_id}} - - {:db_already_in_use, pg_id} -> - {:error, {:db_already_in_use, pg_id}} - - :ok -> - Electric.Tenant.Persistence.persist_tenant!( - Keyword.fetch!(tenant, :tenant_id), - Keyword.fetch!(opts, :connection_opts), - opts - ) - end - end - - @doc """ - Deletes a tenant by its ID. - """ - @spec delete_tenant(String.t(), Keyword.t()) :: :ok | :not_found - def delete_tenant(tenant_id, opts) do - server = Keyword.get(opts, :tenant_manager, name(opts)) - - case get_tenant(tenant_id, opts) do - {:ok, tenant} -> - pg_id = Access.fetch!(tenant, :pg_id) - - case GenServer.call(server, {:delete_tenant, tenant_id, pg_id}) do - :ok -> - :ok = - opts - |> Keyword.fetch!(:electric_instance_id) - |> Electric.Connection.Manager.name(tenant_id) - |> Electric.Connection.Manager.drop_replication_slot() - - :ok = Electric.TenantSupervisor.stop_tenant(opts ++ [tenant_id: tenant_id]) - :ok = Electric.Tenant.Persistence.delete_tenant!(tenant_id, opts) - - :not_found -> - :not_found - end - - {:error, :not_found} -> - :not_found - end - end - - ## Internal API - - @impl GenServer - def init(opts) do - # information about all tenants is kept in an ETS table - # that maps tenant_id to tenant information. - # it is stored in an ETS table to allow concurrent reads. - # the table is protected such that only this genserver can write to it - # which ensures that all writes are serialised - tenants_ets_table = - :ets.new(tenants_ets_table_name(opts), [ - :named_table, - :protected, - :set, - {:read_concurrency, true} - ]) - - # state is a set `dbs` of PG identifiers used by tenants - # such that we can reject any request to store a tenant - # that uses a DB that is already in use - {:ok, %{tenants_ets: tenants_ets_table, dbs: MapSet.new()}} - end - - @impl GenServer - def handle_call( - {:store_tenant, tenant}, - _from, - %{dbs: dbs, tenants_ets: tenants} = state - ) do - tenant_id = tenant[:tenant_id] - pg_id = tenant[:pg_id] - - if :ets.member(tenants, tenant_id) do - {:reply, {:tenant_already_exists, tenant_id}, state} - else - if MapSet.member?(dbs, pg_id) do - {:reply, {:db_already_in_use, pg_id}, state} - else - true = :ets.insert_new(tenants, {tenant_id, tenant}) - {:reply, :ok, %{state | dbs: MapSet.put(dbs, pg_id)}} - end - end - end - - @impl GenServer - def handle_call( - {:delete_tenant, tenant_id, pg_id}, - _from, - %{tenants_ets: tenants, dbs: dbs} = state - ) do - if :ets.member(tenants, tenant_id) do - :ets.delete(tenants, tenant_id) - {:reply, :ok, %{state | dbs: MapSet.delete(dbs, pg_id)}} - else - {:reply, :not_found, state} - end - end - - defp recreate_tenants_from_disk!(opts) do - # Load the tenants from the persistent KV store - tenants = Persistence.load_tenants!(opts) - - # Recreate all tenants - Enum.each(tenants, fn {tenant_id, conn_opts} -> - Logger.info("Reloading tenant #{tenant_id} from storage") - :ok = create_tenant(tenant_id, conn_opts, opts) - end) + {tenant, start_tenant_opts} end end