feat (sync service): multi tenancy (#1886)
_Closes #1868, closes #1844. I kept that PR open to consult its diff until
all the issues were resolved here._

This PR fixes #1591.
It enables Electric to work with multiple databases (i.e. tenants).

### Architectural overview


![multi-tenant-electric](https://github.com/user-attachments/assets/9129d146-288d-43ea-8a9d-879656173443)

TODO:

- [x] rebase on top of main
- [x] review commented-out code and either remove it or make sure the
place it was moved to has been updated for multi-tenancy
- [x] add a `database_id` query param to the health endpoint to request
the health of a given tenant
- [x] extract `validate_tenant_id`, `load_tenant` and `assign_tenant`,
which are currently duplicated in the create shape plug and the delete
shape plug.
- [x] Implement two-tier ETS storage for column info and relation
data. In this PR we replace two named tables with anonymous ones,
necessitating a roundtrip to a single gen server just to fetch the table
handle, and that gen server becomes a bottleneck. We should either create
another static ETS table in which to look up the individual ETS table
handles, or store all column info and relation info in two static (named)
tables with a key design that keeps tenants from clobbering each other's
entries (see the sketch after this list).
- [x] unit tests for multi-tenancy
- [x] unit tests for add DB plug
- [x] unit tests for delete DB plug
- [x] e2e tests for multi-tenancy
  - [x] debug flaky cache tests
- [x] add a diagram explaining the new architecture
- [x] update the OpenAPI spec (for the add DB and remove DB endpoints)
- [x] persist tenant information and restore it on startup
- [ ] e2e test for persistence of tenants
- [x] Generate a random database id (i.e. tenant id) if only a
`DATABASE_URL` env var is provided
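
For the two-tier ETS item above, here is a minimal sketch of the registry-table variant, assuming a single static named table (the `:tenant_tables` name configured in `runtime.exs` below) that maps `{tenant_id, kind}` to each tenant's anonymous table handle, so readers resolve a handle with one lookup instead of a gen server call. Module and function names are illustrative, not the ones used in this PR:

```elixir
defmodule TenantTables do
  @moduledoc """
  Illustrative sketch only: a static, named ETS table acting as a registry
  of per-tenant anonymous ETS tables for column info and relation data.
  """

  @registry :tenant_tables

  # Created once at application start (compare the `:ets.new/2` call added
  # to `configure/0` in `application.ex` in this diff).
  def init_registry do
    :ets.new(@registry, [:public, :named_table, :set, {:read_concurrency, true}])
  end

  # The per-tenant owner process registers the anonymous tables it created.
  def register_table(tenant_id, kind, table) when kind in [:column_info, :relation_data] do
    :ets.insert(@registry, {{tenant_id, kind}, table})
  end

  # Hot path: a single lookup in the named table, no gen server roundtrip.
  def fetch_table(tenant_id, kind) do
    case :ets.lookup(@registry, {tenant_id, kind}) do
      [{_key, table}] -> {:ok, table}
      [] -> :error
    end
  end
end
```

The alternative variant would skip the indirection and keep the rows themselves in two static named tables keyed by `{tenant_id, relation}`, trading the extra lookup for shared tables whose key design must keep tenants apart.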

---------

Co-authored-by: Oleksii Sholik <[email protected]>
Co-authored-by: msfstef <[email protected]>
Co-authored-by: Ilia Borovitinov <[email protected]>
4 people authored Nov 5, 2024
1 parent 0d23660 commit 3775f1c
Showing 75 changed files with 3,020 additions and 631 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ts_test.yml
@@ -70,6 +70,8 @@ jobs:
defaults:
run:
working-directory: ${{ matrix.package_dir }}
env:
DATABASE_ID: ci_test_tenant
steps:
- uses: actions/checkout@v4
- uses: erlef/setup-beam@v1
@@ -111,7 +113,7 @@ jobs:
mix run --no-halt &
wait-on: |
http-get://localhost:3000/v1/health
http-get://localhost:3000/v1/health?database_id=${{ env.DATABASE_ID }}
tail: true
log-output-resume: stderr
2 changes: 1 addition & 1 deletion integration-tests/tests/_macros.luxinc
@@ -91,7 +91,7 @@
[shell $shell_name]
-$fail_pattern

!DATABASE_URL=$database_url PORT=$port $env ../scripts/electric_dev.sh
!DATABASE_ID=integration_test_tenant DATABASE_URL=$database_url PORT=$port $env ../scripts/electric_dev.sh
[endmacro]

[macro teardown]
2 changes: 1 addition & 1 deletion integration-tests/tests/invalidated-replication-slot.lux
@@ -6,7 +6,7 @@

[my invalidated_slot_error=
"""
[error] GenServer Electric.Connection.Manager terminating
[error] :gen_statem {Electric.Registry.Processes, {Electric.Postgres.ReplicationClient, :default, "integration_test_tenant"}} terminating
** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot_integration"

This slot has been invalidated because it exceeded the maximum reserved size.
@@ -75,15 +75,15 @@
?Txn received in Shapes.Consumer: %Electric.Replication.Changes.Transaction{xid: $xid

# Both consumers hit their call limit and exit with simulated storage failures.
?\[error\] GenServer {Electric\.Registry\.Processes, {Electric\.Shapes\.Consumer, :default, "[0-9-]+"}} terminating
?\[error\] GenServer {Electric\.Registry\.Processes, {Electric\.Shapes\.Consumer, :default, "integration_test_tenant", "[0-9-]+"}} terminating
??Simulated storage failure
?\[error\] GenServer {Electric\.Registry\.Processes, {Electric\.Shapes\.Consumer, :default, "[0-9-]+"}} terminating
?\[error\] GenServer {Electric\.Registry\.Processes, {Electric\.Shapes\.Consumer, :default, "integration_test_tenant", "[0-9-]+"}} terminating
??Simulated storage failure

# The log collector process and the replication client both exit, as their lifetimes are tied
# together by the supervision tree design.
??[error] GenServer {Electric.Registry.Processes, {Electric.Replication.ShapeLogCollector, :default}} terminating
??[error] :gen_statem {Electric.Registry.Processes, {Electric.Postgres.ReplicationClient, :default}} terminating
??[error] GenServer {Electric.Registry.Processes, {Electric.Replication.ShapeLogCollector, :default, "integration_test_tenant"}} terminating
??[error] :gen_statem {Electric.Registry.Processes, {Electric.Postgres.ReplicationClient, :default, "integration_test_tenant"}} terminating

# Observe that both shape consumers and the replication client have restarted.
??[debug] Found existing replication slot
8 changes: 4 additions & 4 deletions integration-tests/tests/rolling-deploy.lux
@@ -19,7 +19,7 @@

# First service should be health and active
[shell orchestator]
!curl -X GET http://localhost:3000/v1/health
!curl -X GET http://localhost:3000/v1/health?database_id=integration_test_tenant
??{"status":"active"}

## Start the second sync service.
@@ -35,9 +35,9 @@

# Second service should be in waiting state, ready to take over
[shell orchestator]
!curl -X GET http://localhost:3000/v1/health
!curl -X GET http://localhost:3000/v1/health?database_id=integration_test_tenant
??{"status":"active"}
!curl -X GET http://localhost:3001/v1/health
!curl -X GET http://localhost:3001/v1/health?database_id=integration_test_tenant
??{"status":"waiting"}

## Terminate first electric
@@ -55,7 +55,7 @@

# Second service is now healthy and active
[shell orchestator]
!curl -X GET http://localhost:3001/v1/health
!curl -X GET http://localhost:3001/v1/health?database_id=integration_test_tenant
??{"status":"active"}

[cleanup]
3 changes: 2 additions & 1 deletion packages/react-hooks/test/support/global-setup.ts
@@ -3,6 +3,7 @@ import { makePgClient } from './test-helpers'

const url = process.env.ELECTRIC_URL ?? `http://localhost:3000`
const proxyUrl = process.env.ELECTRIC_PROXY_CACHE_URL ?? `http://localhost:3002`
const databaseId = process.env.DATABASE_ID ?? `test_tenant`

// name of proxy cache container to execute commands against,
// see docker-compose.yml that spins it up for details
@@ -29,7 +30,7 @@ function waitForElectric(url: string): Promise<void> {
)

const tryHealth = async () =>
fetch(`${url}/v1/health`)
fetch(`${url}/v1/health?database_id=${databaseId}`)
.then(async (res): Promise<void> => {
if (!res.ok) return tryHealth()
const { status } = (await res.json()) as { status: string }
3 changes: 3 additions & 0 deletions packages/sync-service/.env.dev
@@ -5,3 +5,6 @@ CACHE_MAX_AGE=1
CACHE_STALE_AGE=3
# using a small chunk size of 10kB for dev to speed up tests
LOG_CHUNK_BYTES_THRESHOLD=10000
DATABASE_ID=test_tenant
# configuring a second database for multi-tenancy integration testing
OTHER_DATABASE_URL=postgresql://postgres:password@localhost:54322/electric?sslmode=disable
3 changes: 3 additions & 0 deletions packages/sync-service/.env.test
@@ -0,0 +1,3 @@
LOG_LEVEL=info
DATABASE_URL=postgresql://postgres:password@localhost:54321/postgres?sslmode=disable
DATABASE_ID=test_tenant
44 changes: 24 additions & 20 deletions packages/sync-service/config/runtime.exs
@@ -29,8 +29,7 @@ config :logger,
handle_sasl_reports: sasl?

if config_env() == :test do
config(:logger, level: :info)
config(:electric, pg_version_for_tests: env!("POSTGRES_VERSION", :integer, 150_001))
config :electric, pg_version_for_tests: env!("POSTGRES_VERSION", :integer, 150_001)
end

electric_instance_id = :default
@@ -85,28 +84,32 @@ otel_simple_processor =
config :opentelemetry,
processors: [otel_batch_processor, otel_simple_processor] |> Enum.reject(&is_nil/1)

connection_opts =
if Config.config_env() == :test do
[
hostname: "localhost",
port: 54321,
username: "postgres",
password: "password",
database: "postgres",
sslmode: :disable
]
else
{:ok, database_url_config} =
env!("DATABASE_URL", :string)
|> Electric.ConfigParser.parse_postgresql_uri()
database_url = env!("DATABASE_URL", :string, nil)
default_tenant = env!("DATABASE_ID", :string, nil)

case {database_url, default_tenant} do
{nil, nil} ->
# No default tenant provided
:ok

{nil, _} ->
raise "DATABASE_URL must be provided when DATABASE_ID is set"

{_, _} ->
# A default tenant is provided
{:ok, database_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_url)

database_ipv6_config =
env!("DATABASE_USE_IPV6", :boolean, false)

database_url_config ++ [ipv6: database_ipv6_config]
end
connection_opts = database_url_config ++ [ipv6: database_ipv6_config]

config :electric, default_connection_opts: Electric.Utils.obfuscate_password(connection_opts)

config :electric, connection_opts: Electric.Utils.obfuscate_password(connection_opts)
# if `default_tenant` is nil, generate a random UUID for it
tenant_id = default_tenant || Electric.Utils.uuid4()
config :electric, default_tenant: tenant_id
end

enable_integration_testing = env!("ENABLE_INTEGRATION_TESTING", :boolean, false)
cache_max_age = env!("CACHE_MAX_AGE", :integer, 60)
@@ -205,4 +208,5 @@ config :electric,
prometheus_port: prometheus_port,
storage: storage,
persistent_kv: persistent_kv,
listen_on_ipv6?: env!("LISTEN_ON_IPV6", :boolean, false)
listen_on_ipv6?: env!("LISTEN_ON_IPV6", :boolean, false),
tenant_tables_name: :tenant_tables
18 changes: 18 additions & 0 deletions packages/sync-service/dev/docker-compose.yml
@@ -20,6 +20,24 @@ services:
- docker-entrypoint.sh
- -c
- config_file=/etc/postgresql.conf
postgres2:
image: postgres:16-alpine
environment:
POSTGRES_DB: electric
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- "54322:5433"
volumes:
- ./postgres2.conf:/etc/postgresql.conf:ro
- ./init.sql:/docker-entrypoint-initdb.d/00_shared_init.sql:ro
tmpfs:
- /var/lib/postgresql/data
- /tmp
entrypoint:
- docker-entrypoint.sh
- -c
- config_file=/etc/postgresql.conf
nginx:
image: nginx:latest
ports:
3 changes: 3 additions & 0 deletions packages/sync-service/dev/postgres2.conf
@@ -0,0 +1,3 @@
listen_addresses = '*'
wal_level = logical # minimal, replica, or logical
port = 5433
107 changes: 33 additions & 74 deletions packages/sync-service/lib/electric/application.ex
@@ -1,17 +1,18 @@
defmodule Electric.Application do
use Application
require Config

@process_registry_name Electric.Registry.Processes
def process_registry, do: @process_registry_name

@spec process_name(atom(), atom()) :: {:via, atom(), atom()}
def process_name(electric_instance_id, module) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id}}}
@spec process_name(atom(), String.t(), atom()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, tenant_id, module) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, tenant_id}}}
end

@spec process_name(atom(), atom(), term()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, module, id) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, id}}}
@spec process_name(atom(), String.t(), atom(), term()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, tenant_id, module, id) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, tenant_id, id}}}
end

@impl true
@@ -20,27 +21,14 @@ defmodule Electric.Application do

config = configure()

shape_log_collector = Electric.Replication.ShapeLogCollector.name(config.electric_instance_id)
tenant_id = Application.get_env(:electric, :default_tenant)
tenant_opts = [electric_instance_id: config.electric_instance_id]

connection_manager_opts = [
router_opts = [
electric_instance_id: config.electric_instance_id,
connection_opts: config.connection_opts,
replication_opts: [
publication_name: config.replication_opts.publication_name,
try_creating_publication?: true,
slot_name: config.replication_opts.slot_name,
slot_temporary?: config.replication_opts.slot_temporary?,
transaction_received:
{Electric.Replication.ShapeLogCollector, :store_transaction, [shape_log_collector]},
relation_received:
{Electric.Replication.ShapeLogCollector, :handle_relation_msg, [shape_log_collector]}
],
pool_opts: [
name: Electric.DbPool,
pool_size: config.pool_opts.size,
types: PgInterop.Postgrex.Types
],
persistent_kv: config.persistent_kv
tenant_manager: Electric.TenantManager.name(tenant_opts),
allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false),
registry: Registry.ShapeChanges
]

# The root application supervisor starts the core global processes, including the HTTP
@@ -61,41 +49,38 @@
name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()},
{Registry,
name: Registry.ShapeChanges, keys: :duplicate, partitions: System.schedulers_online()},
{Electric.Postgres.Inspector.EtsInspector, pool: Electric.DbPool},
Electric.TenantSupervisor,
{Electric.TenantManager, router_opts},
{Bandit,
plug:
{Electric.Plug.Router,
storage: config.storage,
registry: Registry.ShapeChanges,
shape_cache: {Electric.ShapeCache, config.shape_cache_opts},
get_service_status: &Electric.ServiceStatus.check/0,
inspector: config.inspector,
long_poll_timeout: 20_000,
max_age: Application.fetch_env!(:electric, :cache_max_age),
stale_age: Application.fetch_env!(:electric, :cache_stale_age),
allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)},
plug: {Electric.Plug.Router, router_opts},
port: Application.fetch_env!(:electric, :service_port),
thousand_island_options: http_listener_options()}
],
prometheus_endpoint(Application.fetch_env!(:electric, :prometheus_port)),
[{Electric.Connection.Supervisor, connection_manager_opts}]
prometheus_endpoint(Application.fetch_env!(:electric, :prometheus_port))
])

Supervisor.start_link(children,
strategy: :one_for_one,
name: Electric.Supervisor
)
{:ok, sup_pid} =
Supervisor.start_link(children,
strategy: :one_for_one,
name: Electric.Supervisor
)

if tenant_id do
connection_opts = Application.fetch_env!(:electric, :default_connection_opts)
Electric.TenantManager.create_tenant(tenant_id, connection_opts, tenant_opts)
end

{:ok, sup_pid}
end

# This function is called once in the application's start() callback. It reads configuration
# from the OTP application env, runs some pre-processing functions and stores the processed
# configuration as a single map using `:persistent_term`.
defp configure do
electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id)
tenant_tables_name = Application.fetch_env!(:electric, :tenant_tables_name)
:ets.new(tenant_tables_name, [:public, :named_table, :set, {:read_concurrency, true}])

{storage_module, storage_in_opts} = Application.fetch_env!(:electric, :storage)
storage_opts = storage_module.shared_opts(storage_in_opts)
storage = {storage_module, storage_opts}
electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id)

{kv_module, kv_fun, kv_params} = Application.fetch_env!(:electric, :persistent_kv)
persistent_kv = apply(kv_module, kv_fun, [kv_params])
@@ -105,33 +90,9 @@ defmodule Electric.Application do
slot_name = "electric_slot_#{replication_stream_id}"
slot_temporary? = Application.get_env(:electric, :replication_slot_temporary?, false)

get_pg_version_fn = fn ->
Electric.Connection.Manager.get_pg_version(Electric.Connection.Manager)
end

prepare_tables_mfa =
{Electric.Postgres.Configuration, :configure_tables_for_replication!,
[get_pg_version_fn, publication_name]}

inspector =
{Electric.Postgres.Inspector.EtsInspector, server: Electric.Postgres.Inspector.EtsInspector}

shape_cache_opts = [
electric_instance_id: electric_instance_id,
storage: storage,
inspector: inspector,
prepare_tables_fn: prepare_tables_mfa,
chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold),
log_producer: Electric.Replication.ShapeLogCollector.name(electric_instance_id),
consumer_supervisor: Electric.Shapes.ConsumerSupervisor.name(electric_instance_id),
registry: Registry.ShapeChanges
]

config = %Electric.Application.Configuration{
electric_instance_id: electric_instance_id,
storage: storage,
persistent_kv: persistent_kv,
connection_opts: Application.fetch_env!(:electric, :connection_opts),
replication_opts: %{
stream_id: replication_stream_id,
publication_name: publication_name,
Expand All @@ -140,9 +101,7 @@ defmodule Electric.Application do
},
pool_opts: %{
size: Application.fetch_env!(:electric, :db_pool_size)
},
inspector: inspector,
shape_cache_opts: shape_cache_opts
}
}

Electric.Application.Configuration.save(config)
@@ -6,13 +6,9 @@ defmodule Electric.Application.Configuration do

defstruct ~w[
electric_instance_id
storage
persistent_kv
connection_opts
replication_opts
pool_opts
inspector
shape_cache_opts
]a

@type t :: %__MODULE__{}