Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat (sync service): multi tenancy #1886

Merged
merged 55 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
6f776ba
Add TenantManager that spawns the necessary processes per tenant.
kevin-dp Oct 3, 2024
dc0352d
Initialize separate storage backend for each tenant
alco Oct 17, 2024
ebbb2aa
Fix tests
alco Oct 17, 2024
ed497e1
mix format
alco Oct 17, 2024
8d55fee
Fix missing :storage and :tenant_id options where needed
alco Oct 17, 2024
e53a144
Support configuring a default tenant when running in any environment,…
alco Oct 17, 2024
7227b24
Use default tenant for TS integration tests on CI
alco Oct 17, 2024
e091d1c
Adjust the health check endpoint for CI
alco Oct 17, 2024
bebbcdc
fixup! Support configuring a default tenant when running in any envir…
alco Oct 17, 2024
1b9a3ba
(tmp) Load requested or default tenant in DeleteShapePlug
alco Oct 17, 2024
76d9512
Add tenant id to .env.dev
msfstef Oct 21, 2024
badc5e1
Fix typo in options passed to plug.
kevin-dp Oct 21, 2024
566626b
Remove commented tests because they are either obsolete or included i…
kevin-dp Oct 22, 2024
3213d22
Fix unit tests.
kevin-dp Oct 23, 2024
5769825
Make default tenant optional
kevin-dp Oct 24, 2024
f879573
rename TENANT_ID env var to DATABASE_ID
kevin-dp Oct 24, 2024
904589e
Modify health check endpoint to take a database_id
kevin-dp Oct 24, 2024
5698fce
Restore allow_shape_deletion config option needed for delete shape plug.
kevin-dp Oct 24, 2024
7404df8
Fix react hooks test
kevin-dp Oct 24, 2024
e647177
Fix rolling deploy integration test
kevin-dp Oct 24, 2024
a4556f7
Unit tests for add database plug.
kevin-dp Oct 28, 2024
f72bdb7
Add optional databaseId parameter the ShapeStream for selecting a DB …
kevin-dp Oct 28, 2024
86a4060
Integration test for multi tenancy
kevin-dp Oct 28, 2024
0e47e49
Fix error message in delete shape plug
kevin-dp Oct 28, 2024
33205ba
Plug for deleting a tenant
kevin-dp Oct 28, 2024
6d122af
Fix clearShape in test setup
kevin-dp Oct 28, 2024
3dab5d5
Do not take the tenant ID of the 2nd tenant for tge integration test …
kevin-dp Oct 28, 2024
224d892
Rename delete DB plug to remove DB plug.
kevin-dp Oct 28, 2024
9ef0dcf
Unit tests for remove DB plug
kevin-dp Oct 28, 2024
4a02174
Remove obsolete comment
kevin-dp Oct 28, 2024
3c23385
Extract duplicated functions for loading tenant to utility module.
kevin-dp Oct 29, 2024
350ea8c
Return 404 if tenant is not found
kevin-dp Oct 29, 2024
551e56d
Update OpenAPI spec
kevin-dp Oct 29, 2024
859ac13
Rename id parameter to database_id in add DB plug
kevin-dp Oct 29, 2024
e2be9e6
Disable file parallelism in vitest to avoid flaky tests due to some u…
kevin-dp Oct 29, 2024
918389d
Handle failure to parse connection string
kevin-dp Oct 29, 2024
aafb9a5
Store references to per-tenant ETS tables in a global ETS table.
kevin-dp Oct 30, 2024
4300bfd
WIP shutting down
kevin-dp Oct 30, 2024
ad1cb56
Reverse error messages
msfstef Oct 30, 2024
23909e7
Shutdown tenant processes and clean up ETS table when tenant is deleted.
kevin-dp Oct 30, 2024
4659b15
Introduce a with_supervised_tenant in component_setup.
kevin-dp Oct 31, 2024
73649fb
Rebase on top of main
kevin-dp Oct 31, 2024
c92ea4d
Pass tenant_id option to stop_tenant
kevin-dp Oct 31, 2024
0915711
Persist tenants
kevin-dp Oct 31, 2024
53a1629
Unit test tenant persistence module
kevin-dp Nov 4, 2024
e57db87
Remove obsolete tenant deletion in component setup
kevin-dp Nov 4, 2024
71a528f
Remove tenant from disk when the tenant is deleted.
kevin-dp Nov 4, 2024
30ae9b4
Unit test that tenant manager loads tenants from storage
kevin-dp Nov 4, 2024
92cb589
Formatting
kevin-dp Nov 5, 2024
fc77fc7
Generate unique tenant ID if DB URL is provided but no tenant ID is p…
kevin-dp Nov 5, 2024
c92e74b
Log message when tenant is reloaded from storage
kevin-dp Nov 5, 2024
8537861
fixes
icehaunter Nov 5, 2024
673d80d
Fixed compilation warnings
icehaunter Nov 5, 2024
9733e96
fixed tests
icehaunter Nov 5, 2024
77750ac
Fixed API spec
icehaunter Nov 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ts_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ jobs:
defaults:
run:
working-directory: ${{ matrix.package_dir }}
env:
DATABASE_ID: ci_test_tenant
steps:
- uses: actions/checkout@v4
- uses: erlef/setup-beam@v1
Expand Down Expand Up @@ -111,7 +113,7 @@ jobs:
mix run --no-halt &

wait-on: |
http-get://localhost:3000/v1/health
http-get://localhost:3000/v1/health?database_id=${{ env.DATABASE_ID }}

tail: true
log-output-resume: stderr
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/tests/_macros.luxinc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
[shell $shell_name]
-$fail_pattern

!DATABASE_URL=$database_url PORT=$port $env ../scripts/electric_dev.sh
!DATABASE_ID=integration_test_tenant DATABASE_URL=$database_url PORT=$port $env ../scripts/electric_dev.sh
[endmacro]

[macro teardown]
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/tests/invalidated-replication-slot.lux
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

[my invalidated_slot_error=
"""
[error] GenServer Electric.Connection.Manager terminating
[error] :gen_statem {Electric.Registry.Processes, {Electric.Postgres.ReplicationClient, :default, "integration_test_tenant"}} terminating
** (Postgrex.Error) ERROR 55000 (object_not_in_prerequisite_state) cannot read from logical replication slot "electric_slot_integration"

This slot has been invalidated because it exceeded the maximum reserved size.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@
?Txn received in Shapes.Consumer: %Electric.Replication.Changes.Transaction{xid: $xid

# Both consumers hit their call limit and exit with simulated storage failures.
?\[error\] GenServer {Electric\.Registry\.Processes, {Electric\.Shapes\.Consumer, :default, "[0-9-]+"}} terminating
?\[error\] GenServer {Electric\.Registry\.Processes, {Electric\.Shapes\.Consumer, :default, "integration_test_tenant", "[0-9-]+"}} terminating
??Simulated storage failure
?\[error\] GenServer {Electric\.Registry\.Processes, {Electric\.Shapes\.Consumer, :default, "[0-9-]+"}} terminating
?\[error\] GenServer {Electric\.Registry\.Processes, {Electric\.Shapes\.Consumer, :default, "integration_test_tenant", "[0-9-]+"}} terminating
??Simulated storage failure

# The log collector process and the replication client both exit, as their lifetimes are tied
# together by the supervision tree design.
??[error] GenServer {Electric.Registry.Processes, {Electric.Replication.ShapeLogCollector, :default}} terminating
??[error] :gen_statem {Electric.Registry.Processes, {Electric.Postgres.ReplicationClient, :default}} terminating
??[error] GenServer {Electric.Registry.Processes, {Electric.Replication.ShapeLogCollector, :default, "integration_test_tenant"}} terminating
??[error] :gen_statem {Electric.Registry.Processes, {Electric.Postgres.ReplicationClient, :default, "integration_test_tenant"}} terminating

# Observe that both shape consumers and the replication client have restarted.
??[debug] Found existing replication slot
Expand Down
8 changes: 4 additions & 4 deletions integration-tests/tests/rolling-deploy.lux
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

# First service should be health and active
[shell orchestator]
!curl -X GET http://localhost:3000/v1/health
!curl -X GET http://localhost:3000/v1/health?database_id=integration_test_tenant
??{"status":"active"}

## Start the second sync service.
Expand All @@ -35,9 +35,9 @@

# Second service should be in waiting state, ready to take over
[shell orchestator]
!curl -X GET http://localhost:3000/v1/health
!curl -X GET http://localhost:3000/v1/health?database_id=integration_test_tenant
??{"status":"active"}
!curl -X GET http://localhost:3001/v1/health
!curl -X GET http://localhost:3001/v1/health?database_id=integration_test_tenant
??{"status":"waiting"}

## Terminate first electric
Expand All @@ -55,7 +55,7 @@

# Second service is now healthy and active
[shell orchestator]
!curl -X GET http://localhost:3001/v1/health
!curl -X GET http://localhost:3001/v1/health?database_id=integration_test_tenant
??{"status":"active"}

[cleanup]
Expand Down
3 changes: 2 additions & 1 deletion packages/react-hooks/test/support/global-setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { makePgClient } from './test-helpers'

const url = process.env.ELECTRIC_URL ?? `http://localhost:3000`
const proxyUrl = process.env.ELECTRIC_PROXY_CACHE_URL ?? `http://localhost:3002`
const databaseId = process.env.DATABASE_ID ?? `test_tenant`

// name of proxy cache container to execute commands against,
// see docker-compose.yml that spins it up for details
Expand All @@ -29,7 +30,7 @@ function waitForElectric(url: string): Promise<void> {
)

const tryHealth = async () =>
fetch(`${url}/v1/health`)
fetch(`${url}/v1/health?database_id=${databaseId}`)
.then(async (res): Promise<void> => {
if (!res.ok) return tryHealth()
const { status } = (await res.json()) as { status: string }
Expand Down
3 changes: 3 additions & 0 deletions packages/sync-service/.env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ CACHE_MAX_AGE=1
CACHE_STALE_AGE=3
# using a small chunk size of 10kB for dev to speed up tests
LOG_CHUNK_BYTES_THRESHOLD=10000
DATABASE_ID=test_tenant
# configuring a second database for multi-tenancy integration testing
OTHER_DATABASE_URL=postgresql://postgres:password@localhost:54322/electric?sslmode=disable
3 changes: 3 additions & 0 deletions packages/sync-service/.env.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
LOG_LEVEL=info
DATABASE_URL=postgresql://postgres:password@localhost:54321/postgres?sslmode=disable
DATABASE_ID=test_tenant
44 changes: 24 additions & 20 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ config :logger,
handle_sasl_reports: sasl?

if config_env() == :test do
config(:logger, level: :info)
config(:electric, pg_version_for_tests: env!("POSTGRES_VERSION", :integer, 150_001))
config :electric, pg_version_for_tests: env!("POSTGRES_VERSION", :integer, 150_001)
end

electric_instance_id = :default
Expand Down Expand Up @@ -85,28 +84,32 @@ otel_simple_processor =
config :opentelemetry,
processors: [otel_batch_processor, otel_simple_processor] |> Enum.reject(&is_nil/1)

connection_opts =
if Config.config_env() == :test do
[
hostname: "localhost",
port: 54321,
username: "postgres",
password: "password",
database: "postgres",
sslmode: :disable
]
else
{:ok, database_url_config} =
env!("DATABASE_URL", :string)
|> Electric.ConfigParser.parse_postgresql_uri()
database_url = env!("DATABASE_URL", :string, nil)
default_tenant = env!("DATABASE_ID", :string, nil)

case {database_url, default_tenant} do
{nil, nil} ->
# No default tenant provided
:ok

{nil, _} ->
raise "DATABASE_URL must be provided when DATABASE_ID is set"

{_, _} ->
# A default tenant is provided
{:ok, database_url_config} = Electric.ConfigParser.parse_postgresql_uri(database_url)

database_ipv6_config =
env!("DATABASE_USE_IPV6", :boolean, false)

database_url_config ++ [ipv6: database_ipv6_config]
end
connection_opts = database_url_config ++ [ipv6: database_ipv6_config]

config :electric, default_connection_opts: Electric.Utils.obfuscate_password(connection_opts)

config :electric, connection_opts: Electric.Utils.obfuscate_password(connection_opts)
# if `default_tenant` is nil, generate a random UUID for it
tenant_id = default_tenant || Electric.Utils.uuid4()
config :electric, default_tenant: tenant_id
end

enable_integration_testing = env!("ENABLE_INTEGRATION_TESTING", :boolean, false)
cache_max_age = env!("CACHE_MAX_AGE", :integer, 60)
Expand Down Expand Up @@ -205,4 +208,5 @@ config :electric,
prometheus_port: prometheus_port,
storage: storage,
persistent_kv: persistent_kv,
listen_on_ipv6?: env!("LISTEN_ON_IPV6", :boolean, false)
listen_on_ipv6?: env!("LISTEN_ON_IPV6", :boolean, false),
tenant_tables_name: :tenant_tables
18 changes: 18 additions & 0 deletions packages/sync-service/dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,24 @@ services:
- docker-entrypoint.sh
- -c
- config_file=/etc/postgresql.conf
postgres2:
image: postgres:16-alpine
environment:
POSTGRES_DB: electric
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- "54322:5433"
volumes:
- ./postgres2.conf:/etc/postgresql.conf:ro
- ./init.sql:/docker-entrypoint-initdb.d/00_shared_init.sql:ro
tmpfs:
- /var/lib/postgresql/data
- /tmp
entrypoint:
- docker-entrypoint.sh
- -c
- config_file=/etc/postgresql.conf
nginx:
image: nginx:latest
ports:
Expand Down
3 changes: 3 additions & 0 deletions packages/sync-service/dev/postgres2.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
listen_addresses = '*'
wal_level = logical # minimal, replica, or logical
port = 5433
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: why do you need to change the internal listening port instead of using existing pg conf? The PGs are always different either on mapped port from the POV of host machine, or on hostname from POV of other docker containers in the compose file

107 changes: 33 additions & 74 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
defmodule Electric.Application do
use Application
require Config

@process_registry_name Electric.Registry.Processes
def process_registry, do: @process_registry_name

@spec process_name(atom(), atom()) :: {:via, atom(), atom()}
def process_name(electric_instance_id, module) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id}}}
@spec process_name(atom(), String.t(), atom()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, tenant_id, module) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, tenant_id}}}
end

@spec process_name(atom(), atom(), term()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, module, id) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, id}}}
@spec process_name(atom(), String.t(), atom(), term()) :: {:via, atom(), {atom(), term()}}
def process_name(electric_instance_id, tenant_id, module, id) when is_atom(module) do
{:via, Registry, {@process_registry_name, {module, electric_instance_id, tenant_id, id}}}
end

@impl true
Expand All @@ -20,27 +21,14 @@ defmodule Electric.Application do

config = configure()

shape_log_collector = Electric.Replication.ShapeLogCollector.name(config.electric_instance_id)
tenant_id = Application.get_env(:electric, :default_tenant)
tenant_opts = [electric_instance_id: config.electric_instance_id]

connection_manager_opts = [
router_opts = [
electric_instance_id: config.electric_instance_id,
connection_opts: config.connection_opts,
replication_opts: [
publication_name: config.replication_opts.publication_name,
try_creating_publication?: true,
slot_name: config.replication_opts.slot_name,
slot_temporary?: config.replication_opts.slot_temporary?,
transaction_received:
{Electric.Replication.ShapeLogCollector, :store_transaction, [shape_log_collector]},
relation_received:
{Electric.Replication.ShapeLogCollector, :handle_relation_msg, [shape_log_collector]}
],
pool_opts: [
name: Electric.DbPool,
pool_size: config.pool_opts.size,
types: PgInterop.Postgrex.Types
],
persistent_kv: config.persistent_kv
tenant_manager: Electric.TenantManager.name(tenant_opts),
allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false),
registry: Registry.ShapeChanges
]

# The root application supervisor starts the core global processes, including the HTTP
Expand All @@ -61,41 +49,38 @@ defmodule Electric.Application do
name: @process_registry_name, keys: :unique, partitions: System.schedulers_online()},
{Registry,
name: Registry.ShapeChanges, keys: :duplicate, partitions: System.schedulers_online()},
{Electric.Postgres.Inspector.EtsInspector, pool: Electric.DbPool},
Electric.TenantSupervisor,
{Electric.TenantManager, router_opts},
{Bandit,
plug:
{Electric.Plug.Router,
storage: config.storage,
registry: Registry.ShapeChanges,
shape_cache: {Electric.ShapeCache, config.shape_cache_opts},
get_service_status: &Electric.ServiceStatus.check/0,
inspector: config.inspector,
long_poll_timeout: 20_000,
max_age: Application.fetch_env!(:electric, :cache_max_age),
stale_age: Application.fetch_env!(:electric, :cache_stale_age),
allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)},
plug: {Electric.Plug.Router, router_opts},
port: Application.fetch_env!(:electric, :service_port),
thousand_island_options: http_listener_options()}
],
prometheus_endpoint(Application.fetch_env!(:electric, :prometheus_port)),
[{Electric.Connection.Supervisor, connection_manager_opts}]
prometheus_endpoint(Application.fetch_env!(:electric, :prometheus_port))
])

Supervisor.start_link(children,
strategy: :one_for_one,
name: Electric.Supervisor
)
{:ok, sup_pid} =
Supervisor.start_link(children,
strategy: :one_for_one,
name: Electric.Supervisor
)

if tenant_id do
connection_opts = Application.fetch_env!(:electric, :default_connection_opts)
Electric.TenantManager.create_tenant(tenant_id, connection_opts, tenant_opts)
end

{:ok, sup_pid}
end

# This function is called once in the application's start() callback. It reads configuration
# from the OTP application env, runs some pre-processing functions and stores the processed
# configuration as a single map using `:persistent_term`.
defp configure do
electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id)
tenant_tables_name = Application.fetch_env!(:electric, :tenant_tables_name)
:ets.new(tenant_tables_name, [:public, :named_table, :set, {:read_concurrency, true}])

{storage_module, storage_in_opts} = Application.fetch_env!(:electric, :storage)
storage_opts = storage_module.shared_opts(storage_in_opts)
storage = {storage_module, storage_opts}
electric_instance_id = Application.fetch_env!(:electric, :electric_instance_id)

{kv_module, kv_fun, kv_params} = Application.fetch_env!(:electric, :persistent_kv)
persistent_kv = apply(kv_module, kv_fun, [kv_params])
Expand All @@ -105,33 +90,9 @@ defmodule Electric.Application do
slot_name = "electric_slot_#{replication_stream_id}"
slot_temporary? = Application.get_env(:electric, :replication_slot_temporary?, false)

get_pg_version_fn = fn ->
Electric.Connection.Manager.get_pg_version(Electric.Connection.Manager)
end

prepare_tables_mfa =
{Electric.Postgres.Configuration, :configure_tables_for_replication!,
[get_pg_version_fn, publication_name]}

inspector =
{Electric.Postgres.Inspector.EtsInspector, server: Electric.Postgres.Inspector.EtsInspector}

shape_cache_opts = [
electric_instance_id: electric_instance_id,
storage: storage,
inspector: inspector,
prepare_tables_fn: prepare_tables_mfa,
chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold),
log_producer: Electric.Replication.ShapeLogCollector.name(electric_instance_id),
consumer_supervisor: Electric.Shapes.ConsumerSupervisor.name(electric_instance_id),
registry: Registry.ShapeChanges
]

config = %Electric.Application.Configuration{
electric_instance_id: electric_instance_id,
storage: storage,
persistent_kv: persistent_kv,
connection_opts: Application.fetch_env!(:electric, :connection_opts),
replication_opts: %{
stream_id: replication_stream_id,
publication_name: publication_name,
Expand All @@ -140,9 +101,7 @@ defmodule Electric.Application do
},
pool_opts: %{
size: Application.fetch_env!(:electric, :db_pool_size)
},
inspector: inspector,
shape_cache_opts: shape_cache_opts
}
}

Electric.Application.Configuration.save(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@ defmodule Electric.Application.Configuration do

defstruct ~w[
electric_instance_id
storage
persistent_kv
connection_opts
replication_opts
pool_opts
inspector
shape_cache_opts
]a

@type t :: %__MODULE__{}
Expand Down
Loading
Loading