Skip to content

Commit

Permalink
chore(electric): Clean up Logger calls in server-side auth logic (#402)
Browse files Browse the repository at this point in the history
This addresses the issue reported in Discord on 4 Aug 2023. The
`metadata` key was a bad idea because the default console formatter for
Logger expects metadata values to be implement the `String.Chars`
protocol which lists and maps do not.

Regardless, I now think that including the error in the logged message
is better DX than tagging it on as metadata.
  • Loading branch information
alco authored Sep 11, 2023
1 parent 74e99d1 commit 553a817
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 79 deletions.
52 changes: 35 additions & 17 deletions components/electric/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,56 @@ config :logger,
config :logger, :console,
format: "$time $metadata[$level] $message\n",
metadata: [
# :pid is intentionally put as the first list item below. Logger prints metadata in the same order as it is configured
# here, so having :pid sorted in the list alphabetically would make it get in the away of log output matching that we
# do in many of our E2E tests.
:pid,
:client_id,
:component,
:connection,
:instance_id,
:origin,
:pid,
:pg_client,
:pg_producer,
:pg_slot,
:remote_ip,
:component,
:instance_id,
:client_id,
:user_id,
:metadata,
:request_id
# :remote_ip is intentionally commented out below.
#
# IP addresses are user-identifiable information protected under GDPR. Our
# customers might not like it when they use client IP addresses in the
# logs of their on-premises installation of Electric.
#
# Although it appears the consensus is thta logging IP addresses is fine
# (see https://law.stackexchange.com/a/28609), there are caveats.
#
# I think that adding IP addresses to logs should be made as part of the
# same decision that determines the log retention policy. Since we're not
# tying the logged IP addresses to users' personal information managed by
# customer apps, we cannot clean them up as part of the "delete all user
# data" procedure that app developers have in place to conform to GDPR
# requirements. Therefore, logging IP addresses by default is better
# avoided in production builds of Electric.
#
# We may introduce it as a configurable option for better DX at some point.
# :remote_ip,
:request_id,
:sq_client,
:user_id
]

pg_server_port =
System.get_env("LOGICAL_PUBLISHER_PORT", default_pg_server_port) |> String.to_integer()

config :electric,
# Used only to send server identification upon connection,
# can stay default while we're not working on multi-instance setups
instance_id: System.get_env("ELECTRIC_INSTANCE_ID", default_instance_id)
instance_id: System.get_env("ELECTRIC_INSTANCE_ID", default_instance_id),
http_port: System.get_env("HTTP_PORT", default_http_server_port) |> String.to_integer(),
pg_server_port: pg_server_port

config :electric, Electric.Replication.Postgres,
pg_client: Electric.Replication.Postgres.Client,
producer: Electric.Replication.Postgres.LogicalReplicationProducer

config :electric, Electric.Satellite.WebsocketServer,
port: System.get_env("HTTP_PORT", default_http_server_port) |> String.to_integer()

pg_server_port =
System.get_env("LOGICAL_PUBLISHER_PORT", default_pg_server_port) |> String.to_integer()

config :electric, Electric.PostgresServer, port: pg_server_port

# The :prod environment is inlined here because by default Mix won't copy any config/runtime.*.exs files when assembling
# a release, and we want a single configuration file in our release.
if config_env() == :prod do
Expand Down
11 changes: 3 additions & 8 deletions components/electric/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule Electric.Application do
unless Application.get_env(:electric, :disable_listeners, false) do
[
# Satellite websocket connections are served from this router
{Bandit, plug: Electric.Plug.Router, port: ws_server_port()}
{Bandit, plug: Electric.Plug.Router, port: http_port()}
]
else
[]
Expand Down Expand Up @@ -53,11 +53,6 @@ defmodule Electric.Application do
end
end

defp ws_server_port,
do:
Application.fetch_env!(:electric, Electric.Satellite.WebsocketServer)
|> Keyword.fetch!(:port)

defp pg_server_port,
do: Application.fetch_env!(:electric, Electric.PostgresServer) |> Keyword.fetch!(:port)
defp http_port, do: Application.fetch_env!(:electric, :http_port)
defp pg_server_port, do: Application.fetch_env!(:electric, :pg_server_port)
end
89 changes: 40 additions & 49 deletions components/electric/lib/electric/satellite/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -194,62 +194,53 @@ defmodule Electric.Satellite.Protocol do
{nil | :stop | PB.sq_pb_msg() | [PB.sq_pb_msg()], State.t()}
| {:force_unpause, PB.sq_pb_msg() | [PB.sq_pb_msg()], State.t()}
| {:error, PB.sq_pb_msg()}
def process_message(msg, %State{} = state) when not auth_passed?(state) do
case msg do
%SatAuthReq{id: client_id, token: token}
when client_id !== "" and token !== "" ->
Logger.debug("Received auth request for #{inspect(client_id)}")

# NOTE: We treat successful registration with Electric.safe_reg as an
# indication that at least the previously connected WS client is down.
# However satellite_client_manager may not necessarily have reacted to that
# yet. So as long as safe_reg succeeded call to ClientManager should
# succeed as well
reg_name = Electric.Satellite.WebsocketServer.reg_name(client_id)

with {:ok, auth} <- Electric.Satellite.Auth.validate_token(token, state.auth_provider),
true <- Electric.safe_reg(reg_name, 1000),
:ok <- ClientManager.register_client(client_id, reg_name) do
Logger.metadata(client_id: client_id, user_id: auth.user_id)
Logger.info("authenticated client #{client_id} as user #{auth.user_id}")
Metrics.satellite_connection_event(%{authorized_connection: 1})

{%SatAuthResp{id: Electric.instance_id()},
%State{state | auth: auth, auth_passed: true, client_id: client_id}}
else
{:error, %SatErrorResp{}} = error ->
error

{:error, %Electric.Satellite.Auth.TokenError{message: message}} ->
Logger.warning("client authorization failed",
metadata: [client_id: client_id, error: message]
)

{:error, %SatErrorResp{error_type: :AUTH_REQUIRED}}

{:error, :already_registered} ->
Logger.info(
"attempted multiple connections from the client with same id: #{inspect(client_id)}"
)

{:error, %SatErrorResp{error_type: :INVALID_REQUEST}}

{:error, reason} ->
Logger.error(
"authorization failed for client: #{client_id} with reason: #{inspect(reason)}"
)

{:error, %SatErrorResp{error_type: :AUTH_REQUIRED}}
end
def process_message(%SatAuthReq{id: client_id, token: token}, state)
when not auth_passed?(state) and client_id != "" and token != "" do
Logger.metadata(client_id: client_id)
Logger.debug("Received auth request")

# NOTE: We treat successful registration with Electric.safe_reg as an
# indication that at least the previously connected WS client is down.
# However satellite_client_manager may not necessarily have reacted to that
# yet. So as long as safe_reg succeeded call to ClientManager should
# succeed as well
reg_name = Electric.Satellite.WebsocketServer.reg_name(client_id)

with {:ok, auth} <- Electric.Satellite.Auth.validate_token(token, state.auth_provider),
true <- Electric.safe_reg(reg_name, 1000),
:ok <- ClientManager.register_client(client_id, reg_name) do
Logger.metadata(user_id: auth.user_id)
Logger.info("Successfully authenticated the client")
Metrics.satellite_connection_event(%{authorized_connection: 1})

{
%SatAuthResp{id: Electric.instance_id()},
%State{state | auth: auth, auth_passed: true, client_id: client_id}
}
else
{:error, %SatErrorResp{}} = error ->
error

%SatAuthReq{} ->
{:error, :already_registered} ->
Logger.info("attempted multiple connections from the same client")
{:error, %SatErrorResp{error_type: :INVALID_REQUEST}}

_ ->
{:error, %Electric.Satellite.Auth.TokenError{message: message}} ->
Logger.warning("Client authentication failed: #{message}")
{:error, %SatErrorResp{error_type: :AUTH_REQUIRED}}

{:error, reason} ->
Logger.error("Client authentication failed: #{inspect(reason)}")
{:error, %SatErrorResp{error_type: :AUTH_REQUIRED}}
end
end

def process_message(%SatAuthReq{}, state) when not auth_passed?(state),
do: {:error, %SatErrorResp{error_type: :INVALID_REQUEST}}

def process_message(_, state) when not auth_passed?(state),
do: {:error, %SatErrorResp{error_type: :AUTH_REQUIRED}}

# Satellite client request replication
def process_message(
%SatInStartReplicationReq{lsn: client_lsn, options: opts} = msg,
Expand Down
4 changes: 2 additions & 2 deletions e2e/tests/02.01_postgres_data_streams_to_satellite.lux
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

[shell electric]
# We expect to send the transaction to Satellite
?client_id=client_1_1 user_id=1 \[debug\] trans:(.*)%Electric.Replication.Changes.NewRecord\{(.*)record: %\{
?client_id=client_1_1 .+ user_id=1 \[debug\] trans:(.*)%Electric.Replication.Changes.NewRecord\{(.*)record: %\{
?+"content" => "sentinel value"
?+"content_b" => nil
?"id" => "00000000-0000-0000-0000-000000000000"
Expand All @@ -37,4 +37,4 @@
?\}

[cleanup]
[invoke teardown]
[invoke teardown]
6 changes: 3 additions & 3 deletions e2e/tests/02.03_partial_replication_based_on_user_id.lux
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

[shell electric]
# We expect to send the transaction to both satellites
?client_id=client_1_1 user_id=1 \[debug\] trans:(.*)%Electric.Replication.Changes.NewRecord
?client_id=client_2_1 user_id=2 \[debug\] trans:(.*)%Electric.Replication.Changes.NewRecord
?client_id=client_1_1 .+ user_id=1 \[debug\] trans:(.*)%Electric.Replication.Changes.NewRecord
?client_id=client_2_1 .+ user_id=2 \[debug\] trans:(.*)%Electric.Replication.Changes.NewRecord

[shell user_1_ws1]
# And recieve it on Satellite 1. Assertion for the row missing on Satellite 2 is above
Expand All @@ -40,4 +40,4 @@
?\}

[cleanup]
[invoke teardown]
[invoke teardown]

0 comments on commit 553a817

Please sign in to comment.