Skip to content

Commit

Permalink
fix(electric): Support Prisma shadow database in electric proxy (#769)
Browse files Browse the repository at this point in the history
Fixes #668
  • Loading branch information
magnetised authored Dec 21, 2023
1 parent ffcb8a9 commit c8b06cc
Show file tree
Hide file tree
Showing 12 changed files with 414 additions and 24 deletions.
10 changes: 8 additions & 2 deletions components/electric/lib/electric/postgres/proxy/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ defmodule Electric.Postgres.Proxy.Handler do

{:ok, loader_conn} = loader_module.connect(conn_config, loader_opts)

{:ok, injector} =
{:ok, {stack, _state} = injector} =
state.injector_opts
|> Keyword.merge(loader: {loader_module, loader_conn})
|> Injector.new(
Expand All @@ -241,11 +241,17 @@ defmodule Electric.Postgres.Proxy.Handler do
database: state.database
)

# allow the injector to configure the upstream connection. required in order for prisma's
# connections to the shadow db to ignore the default upstream database and actually connect
# to this ephemeral db
upstream_conn_config =
Injector.Operation.upstream_connection(stack, state.conn_config)

{:ok, pid} =
UpstreamConnection.start_link(
parent: self(),
session_id: state.session_id,
conn_config: state.conn_config
conn_config: upstream_conn_config
)

:ok = downstream([%M.AuthenticationOk{}], socket, state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ defmodule Electric.Postgres.Proxy.Handler.Tracing do
if tracing_enabled?(config) do
{label, colour} =
case {action, side} do
{:send, :client} -> {"[#{session_id}] 🠞 #{side} ", :green}
{:recv, :client} -> {"[#{session_id}] 🠜 #{side} ", :green}
{:send, :server} -> {"[#{session_id}] #{side} 🠜 ", :yellow}
{:recv, :server} -> {"[#{session_id}] #{side} 🠞 ", :yellow}
{:send, :client} -> {"[#{session_id}] -▶ #{side} ", :green}
{:recv, :client} -> {"[#{session_id}] ◀- #{side} ", :green}
{:send, :server} -> {"[#{session_id}] #{side} ◀- ", :yellow}
{:recv, :server} -> {"[#{session_id}] #{side} -▶ ", :yellow}
end

IO.puts(
Expand Down
29 changes: 21 additions & 8 deletions components/electric/lib/electric/postgres/proxy/injector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,18 @@ defmodule Electric.Postgres.Proxy.Injector do

capture_mode_opts = Keyword.get(opts, :capture_mode, [])

default = Keyword.get(capture_mode_opts, :default, @default_mode)
per_user = Keyword.get(capture_mode_opts, :per_user, %{})
default_injector = Keyword.get(capture_mode_opts, :default, @default_mode)

session_id = Keyword.get(connection, :session_id, 0)

mode = Map.get(per_user, connection[:username], default)
mode =
per_database_injector(connection) ||
per_user_injector(capture_mode_opts, connection) ||
default_injector

capture =
mode
|> default_capture_mode()
|> configure_capture_mode()
|> initialise_capture_mode(connection)

Logger.info("Initialising injector in capture mode #{inspect(capture || "default")}")
Expand All @@ -52,15 +54,26 @@ defmodule Electric.Postgres.Proxy.Injector do
end
end

defp default_capture_mode(nil) do
@default_mode
defp per_database_injector(connection) do
case Keyword.get(connection, :database) do
"prisma_migrate_shadow_db" <> _ = database ->
Logger.debug("Connection to prisma shadow db: using Shadow injector")
{Injector.Shadow, [database: database]}

_ ->
nil
end
end

defp per_user_injector(opts, connection) do
get_in(opts, [:per_user, connection[:username]])
end

defp default_capture_mode(module) when is_atom(module) do
defp configure_capture_mode(module) when is_atom(module) do
{module, []}
end

defp default_capture_mode({module, params})
defp configure_capture_mode({module, params})
when is_atom(module) and (is_list(params) or is_map(params)) do
{module, params}
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,17 @@ defmodule Electric.Postgres.Proxy.Injector.Electric do
def group_messages(msgs) do
{current, final} =
Enum.reduce(msgs, {[], []}, fn
%M.Query{} = msg, {[], f} -> {[], [{:simple, [msg]} | f]}
%M.Query{} = msg, {c, f} -> {[], [{:simple, [msg]}, {:extended, Enum.reverse(c)} | f]}
%M.Sync{} = msg, {c, f} -> {[], [{:extended, Enum.reverse([msg | c])} | f]}
m, {c, f} -> {[m | c], f}
%M.Query{} = msg, {[], f} ->
{[], [{:simple, [msg]} | f]}

%M.Query{} = msg, {c, f} ->
{[], [{:simple, [msg]}, {:extended, Enum.reverse(c)} | f]}

%type{} = msg, {c, f} when type in [M.Sync, M.Flush] ->
{[], [{:extended, Enum.reverse([msg | c])} | f]}

m, {c, f} ->
{[m | c], f}
end)

case {current, final} do
Expand All @@ -135,6 +142,10 @@ defmodule Electric.Postgres.Proxy.Injector.Electric do
end

defimpl Operation do
def upstream_connection(_electric, conn_config) do
conn_config
end

def activate(electric, state, send) do
{electric, state, send}
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,15 @@ defprotocol Electric.Postgres.Proxy.Injector.Operation do
"""

alias Electric.Postgres.Proxy.Injector.{Send, State}
alias Electric.Replication.Connectors

@type op_stack() :: nil | t() | [t()]
@type op() :: nil | t()
@type result() :: {op(), State.t(), Send.t()}

@spec upstream_connection(t(), Connectors.config()) :: Connectors.config()
def upstream_connection(op, conn_config)

@doc """
Given a set of messages from the client returns an updated operation stack.
"""
Expand Down Expand Up @@ -129,6 +133,10 @@ defmodule Operation.Impl do
quote do
import Injector.Operation.Impl

def upstream_connection(_op, conn_config) do
conn_config
end

# no-op
def recv_client(op, msgs, state) do
{op, state}
Expand Down Expand Up @@ -168,7 +176,8 @@ defmodule Operation.Impl do
{nil, state, send}
end

defoverridable recv_client: 3,
defoverridable upstream_connection: 2,
recv_client: 3,
activate: 3,
recv_server: 4,
send_client: 3,
Expand Down Expand Up @@ -265,6 +274,10 @@ end
defimpl Operation, for: List do
use Operation.Impl

def upstream_connection([op | _rest], conn_config) do
Operation.upstream_connection(op, conn_config)
end

def recv_client([], _msgs, _state) do
raise "empty command stack!"
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ defmodule Electric.Postgres.Proxy.Injector.Prisma do
defimpl Operation do
alias Electric.Postgres.Extension.SchemaLoader

def upstream_connection(_prisma, conn_config) do
conn_config
end

def activate(prisma, state, send) do
{prisma, state, send}
end
Expand Down
Loading

0 comments on commit c8b06cc

Please sign in to comment.