Skip to content

Commit

Permalink
chore (sync service): refactor tenant manager to store tenant info in…
Browse files Browse the repository at this point in the history
… ETS table (#1953)

This PR applies the refactoring proposed by @icehaunter which consists
of storing the tenant information in a protected ETS table. Any process
can read the tenant information from the ETS table.

#### Concurrent reads
On every shape request the tenant information is loaded from the ETS
table.
Concurrent shape requests can read this information from the ETS table
concurrently.

#### Serialised writes
Writes to the ETS table are less frequent (only when adding or removing
tenants) and are still serialised through the tenant manager's genserver
and thus are not prone to race conditions.
  • Loading branch information
kevin-dp authored Nov 7, 2024
1 parent 38afb0c commit 5e60e71
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 51 deletions.
5 changes: 5 additions & 0 deletions .changeset/funny-spoons-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Refactored the tenant manager to store tenant information in an ETS table for improved read performance.
2 changes: 1 addition & 1 deletion integration-tests/tests/multi-tenancy.lux
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,4 @@
[cleanup]
[invoke teardown]
# Also tear down the first tenant
[invoke teardown_container $tenant1_pg_container_name]
[invoke teardown_container $tenant1_pg_container_name]
124 changes: 81 additions & 43 deletions packages/sync-service/lib/electric/tenant_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule Electric.TenantManager do

alias Electric.Tenant.Persistence

@tenant_info_pos 2

# Public API

def name(electric_instance_id)
Expand All @@ -20,6 +22,16 @@ defmodule Electric.TenantManager do
|> name()
end

def tenants_ets_table_name(electric_instance_id)
when is_binary(electric_instance_id) or is_atom(electric_instance_id) do
:"tenants_ets_table_#{electric_instance_id}"
end

def tenants_ets_table_name(opts) do
Access.fetch!(opts, :electric_instance_id)
|> tenants_ets_table_name()
end

def start_link(opts) do
{:ok, pid} =
GenServer.start_link(__MODULE__, opts,
Expand All @@ -39,18 +51,39 @@ defmodule Electric.TenantManager do
"""
@spec get_only_tenant(Keyword.t()) ::
{:ok, Keyword.t()} | {:error, :not_found} | {:error, :several_tenants}
def get_only_tenant(opts \\ []) do
server = Keyword.get(opts, :tenant_manager, name(opts))
GenServer.call(server, :get_only_tenant)
def get_only_tenant(opts) do
tenants = tenants_ets_table_name(opts)

case :ets.first(tenants) do
:"$end_of_table" ->
# the ETS table does not contain any tenant
{:error, :not_found}

tenant_id ->
case :ets.next(tenants, tenant_id) do
:"$end_of_table" ->
# There is no next key, so this is the only tenant
tenant = :ets.lookup_element(tenants, tenant_id, @tenant_info_pos)
{:ok, tenant}

_ ->
{:error, :several_tenants}
end
end
end

@doc """
Retrieves a tenant by its ID.
"""
@spec get_tenant(String.t(), Keyword.t()) :: {:ok, Keyword.t()} | {:error, :not_found}
def get_tenant(tenant_id, opts \\ []) do
server = Keyword.get(opts, :tenant_manager, name(opts))
GenServer.call(server, {:get_tenant, tenant_id})
def get_tenant(tenant_id, opts) do
tenants = tenants_ets_table_name(opts)

if :ets.member(tenants, tenant_id) do
{:ok, :ets.lookup_element(tenants, tenant_id, @tenant_info_pos)}
else
{:error, :not_found}
end
end

@doc """
Expand Down Expand Up @@ -196,15 +229,21 @@ defmodule Electric.TenantManager do
Deletes a tenant by its ID.
"""
@spec delete_tenant(String.t(), Keyword.t()) :: :ok | :not_found
def delete_tenant(tenant_id, opts \\ []) do
def delete_tenant(tenant_id, opts) do
server = Keyword.get(opts, :tenant_manager, name(opts))

case GenServer.call(server, {:get_tenant, tenant_id}) do
case get_tenant(tenant_id, opts) do
{:ok, tenant} ->
pg_id = Access.fetch!(tenant, :pg_id)
:ok = GenServer.call(server, {:delete_tenant, tenant_id, pg_id})
:ok = Electric.TenantSupervisor.stop_tenant(opts ++ [tenant_id: tenant_id])
:ok = Electric.Tenant.Persistence.delete_tenant!(tenant_id, opts)

case GenServer.call(server, {:delete_tenant, tenant_id, pg_id}) do
:ok ->
:ok = Electric.TenantSupervisor.stop_tenant(opts ++ [tenant_id: tenant_id])
:ok = Electric.Tenant.Persistence.delete_tenant!(tenant_id, opts)

:not_found ->
:not_found
end

{:error, :not_found} ->
:not_found
Expand All @@ -214,62 +253,61 @@ defmodule Electric.TenantManager do
## Internal API

@impl GenServer
def init(_opts) do
# state contains an index `tenants` of tenant_id -> tenant
# and a set `dbs` of PG identifiers used by tenants
{:ok, %{tenants: Map.new(), dbs: MapSet.new()}}
def init(opts) do
# information about all tenants is kept in an ETS table
# that maps tenant_id to tenant information.
# it is stored in an ETS table to allow concurrent reads.
# the table is protected such that only this genserver can write to it
# which ensures that all writes are serialised
tenants_ets_table =
:ets.new(tenants_ets_table_name(opts), [
:named_table,
:protected,
:set,
{:read_concurrency, true}
])

# state is a set `dbs` of PG identifiers used by tenants
# such that we can reject any request to store a tenant
# that uses a DB that is already in use
{:ok, %{tenants_ets: tenants_ets_table, dbs: MapSet.new()}}
end

@impl GenServer
def handle_call(
{:store_tenant, tenant},
_from,
%{tenants: tenants, dbs: dbs} = state
%{dbs: dbs, tenants_ets: tenants} = state
) do
tenant_id = tenant[:tenant_id]
pg_id = tenant[:pg_id]

if Map.has_key?(tenants, tenant_id) do
if :ets.member(tenants, tenant_id) do
{:reply, {:tenant_already_exists, tenant_id}, state}
else
if MapSet.member?(dbs, pg_id) do
{:reply, {:db_already_in_use, pg_id}, state}
else
{:reply, :ok,
%{tenants: Map.put(tenants, tenant_id, tenant), dbs: MapSet.put(dbs, pg_id)}}
true = :ets.insert_new(tenants, {tenant_id, tenant})
{:reply, :ok, %{state | dbs: MapSet.put(dbs, pg_id)}}
end
end
end

@impl GenServer
def handle_call(:get_only_tenant, _from, %{tenants: tenants} = state) do
case map_size(tenants) do
1 ->
tenant = tenants |> Map.values() |> Enum.at(0)
{:reply, {:ok, tenant}, state}

0 ->
{:reply, {:error, :not_found}, state}

_ ->
{:reply, {:error, :several_tenants}, state}
end
end

@impl GenServer
def handle_call({:get_tenant, tenant_id}, _from, %{tenants: tenants} = state) do
if Map.has_key?(tenants, tenant_id) do
{:reply, {:ok, Map.get(tenants, tenant_id)}, state}
def handle_call(
{:delete_tenant, tenant_id, pg_id},
_from,
%{tenants_ets: tenants, dbs: dbs} = state
) do
if :ets.member(tenants, tenant_id) do
:ets.delete(tenants, tenant_id)
{:reply, :ok, %{state | dbs: MapSet.delete(dbs, pg_id)}}
else
{:reply, {:error, :not_found}, state}
{:reply, :not_found, state}
end
end

@impl GenServer
def handle_call({:delete_tenant, tenant_id, pg_id}, _from, %{tenants: tenants, dbs: dbs}) do
{:reply, :ok, %{tenants: Map.delete(tenants, tenant_id), dbs: MapSet.delete(dbs, pg_id)}}
end

defp recreate_tenants_from_disk!(opts) do
# Load the tenants from the persistent KV store
tenants = Persistence.load_tenants!(opts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ defmodule Electric.Plug.DeleteShapePlugTest do
store_tenant(tenant, ctx)

config = [
electric_instance_id: ctx.electric_instance_id,
storage: {Mock.Storage, []},
tenant_manager: ctx.tenant_manager,
allow_shape_deletion: allow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ defmodule Electric.Plug.HealthCheckPlugTest do
def conn(ctx) do
# Pass mock dependencies to the plug
config = [
electric_instance_id: ctx.electric_instance_id,
tenant_manager: ctx.tenant_manager
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
store_tenant(tenant, ctx)

config = [
electric_instance_id: ctx.electric_instance_id,
storage: {Mock.Storage, []},
tenant_manager: ctx.tenant_manager
]
Expand Down
36 changes: 29 additions & 7 deletions packages/sync-service/test/electric/tenant_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ defmodule Electric.TenantManagerTest do
# Check that it recreated the tenant
{:ok, tenant} =
TenantManager.get_tenant(@tenant_id,
electric_instance_id: ctx.electric_instance_id,
tenant_manager: ctx.tenant_manager,
tenant_tables_name: ctx.tenant_tables_name
)
Expand Down Expand Up @@ -142,12 +143,18 @@ defmodule Electric.TenantManagerTest do

test "get_only_tenant/1 complains if there are no tenants", ctx do
assert {:error, :not_found} =
TenantManager.get_only_tenant(tenant_manager: ctx.tenant_manager)
TenantManager.get_only_tenant(
tenant_manager: ctx.tenant_manager,
electric_instance_id: ctx.electric_instance_id
)
end

test "get_tenant/2 complains if the tenant does not exist", ctx do
assert {:error, :not_found} =
TenantManager.get_tenant("non-existing tenant", tenant_manager: ctx.tenant_manager)
TenantManager.get_tenant("non-existing tenant",
tenant_manager: ctx.tenant_manager,
electric_instance_id: ctx.electric_instance_id
)
end
end

Expand All @@ -162,14 +169,20 @@ defmodule Electric.TenantManagerTest do

test "get_only_tenant/1 returns the only tenant", ctx do
{:ok, tenant_config} =
TenantManager.get_only_tenant(tenant_manager: ctx.tenant_manager)
TenantManager.get_only_tenant(
tenant_manager: ctx.tenant_manager,
electric_instance_id: ctx.electric_instance_id
)

assert tenant_config[:tenant_id] == ctx.tenant_id
end

test "get_tenant/2 returns the requested tenant", ctx do
{:ok, tenant_config} =
TenantManager.get_tenant(ctx.tenant_id, tenant_manager: ctx.tenant_manager)
TenantManager.get_tenant(ctx.tenant_id,
tenant_manager: ctx.tenant_manager,
electric_instance_id: ctx.electric_instance_id
)

assert tenant_config[:tenant_id] == ctx.tenant_id
end
Expand All @@ -194,12 +207,18 @@ defmodule Electric.TenantManagerTest do

test "get_only_tenant/1 complains if there are several tenants", ctx do
assert {:error, :several_tenants} =
TenantManager.get_only_tenant(tenant_manager: ctx.tenant_manager)
TenantManager.get_only_tenant(
tenant_manager: ctx.tenant_manager,
electric_instance_id: ctx.electric_instance_id
)
end

test "get_tenant/2 returns the requested tenant", ctx do
{:ok, tenant_config} =
TenantManager.get_tenant("another_tenant", tenant_manager: ctx.tenant_manager)
TenantManager.get_tenant("another_tenant",
tenant_manager: ctx.tenant_manager,
electric_instance_id: ctx.electric_instance_id
)

assert tenant_config[:tenant_id] == "another_tenant"
end
Expand Down Expand Up @@ -247,7 +266,10 @@ defmodule Electric.TenantManagerTest do
# Check that the tenant is now unknown to the tenant manager
# and that it is fully shut down and removed from the ETS table
assert {:error, :not_found} =
TenantManager.get_tenant(tenant_id, tenant_manager: tenant_manager)
TenantManager.get_tenant(tenant_id,
tenant_manager: tenant_manager,
electric_instance_id: electric_instance_id
)

# Verify process was terminated
refute Process.alive?(tenant_supervisor_pid)
Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/test/support/component_setup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ defmodule Support.ComponentSetup do

def build_router_opts(ctx, overrides \\ []) do
[
electric_instance_id: ctx.electric_instance_id,
tenant_manager: ctx.tenant_manager,
storage: ctx.storage,
registry: ctx.registry,
Expand Down

0 comments on commit 5e60e71

Please sign in to comment.