diff --git a/components/electric/lib/electric/postgres/proxy/handler.ex b/components/electric/lib/electric/postgres/proxy/handler.ex index ab56f3685b..a53eea2e9e 100644 --- a/components/electric/lib/electric/postgres/proxy/handler.ex +++ b/components/electric/lib/electric/postgres/proxy/handler.ex @@ -232,7 +232,7 @@ defmodule Electric.Postgres.Proxy.Handler do {:ok, loader_conn} = loader_module.connect(conn_config, loader_opts) - {:ok, injector} = + {:ok, {stack, _state} = injector} = state.injector_opts |> Keyword.merge(loader: {loader_module, loader_conn}) |> Injector.new( @@ -241,11 +241,17 @@ defmodule Electric.Postgres.Proxy.Handler do database: state.database ) + # allow the injector to configure the upstream connection. required in order for prisma's + # connections to the shadow db to ignore the default upstream database and actually connect + # to this ephemeral db + upstream_conn_config = + Injector.Operation.upstream_connection(stack, state.conn_config) + {:ok, pid} = UpstreamConnection.start_link( parent: self(), session_id: state.session_id, - conn_config: state.conn_config + conn_config: upstream_conn_config ) :ok = downstream([%M.AuthenticationOk{}], socket, state) diff --git a/components/electric/lib/electric/postgres/proxy/handler/tracing.ex b/components/electric/lib/electric/postgres/proxy/handler/tracing.ex index 95c9c46f10..60e27efd32 100644 --- a/components/electric/lib/electric/postgres/proxy/handler/tracing.ex +++ b/components/electric/lib/electric/postgres/proxy/handler/tracing.ex @@ -9,10 +9,10 @@ defmodule Electric.Postgres.Proxy.Handler.Tracing do if tracing_enabled?(config) do {label, colour} = case {action, side} do - {:send, :client} -> {"[#{session_id}] 🠞 #{side} ", :green} - {:recv, :client} -> {"[#{session_id}] 🠜 #{side} ", :green} - {:send, :server} -> {"[#{session_id}] #{side} 🠜 ", :yellow} - {:recv, :server} -> {"[#{session_id}] #{side} 🠞 ", :yellow} + {:send, :client} -> {"[#{session_id}] -▶ #{side} ", :green} + {:recv, :client} -> {"[#{session_id}] ◀- #{side} ", :green} + {:send, :server} -> {"[#{session_id}] #{side} ◀- ", :yellow} + {:recv, :server} -> {"[#{session_id}] #{side} -▶ ", :yellow} end IO.puts( diff --git a/components/electric/lib/electric/postgres/proxy/injector.ex b/components/electric/lib/electric/postgres/proxy/injector.ex index 1b4f564af1..d7afbc6529 100644 --- a/components/electric/lib/electric/postgres/proxy/injector.ex +++ b/components/electric/lib/electric/postgres/proxy/injector.ex @@ -30,16 +30,18 @@ defmodule Electric.Postgres.Proxy.Injector do capture_mode_opts = Keyword.get(opts, :capture_mode, []) - default = Keyword.get(capture_mode_opts, :default, @default_mode) - per_user = Keyword.get(capture_mode_opts, :per_user, %{}) + default_injector = Keyword.get(capture_mode_opts, :default, @default_mode) session_id = Keyword.get(connection, :session_id, 0) - mode = Map.get(per_user, connection[:username], default) + mode = + per_database_injector(connection) || + per_user_injector(capture_mode_opts, connection) || + default_injector capture = mode - |> default_capture_mode() + |> configure_capture_mode() |> initialise_capture_mode(connection) Logger.info("Initialising injector in capture mode #{inspect(capture || "default")}") @@ -52,15 +54,26 @@ defmodule Electric.Postgres.Proxy.Injector do end end - defp default_capture_mode(nil) do - @default_mode + defp per_database_injector(connection) do + case Keyword.get(connection, :database) do + "prisma_migrate_shadow_db" <> _ = database -> + Logger.debug("Connection to prisma shadow db: using Shadow injector") + {Injector.Shadow, [database: database]} + + _ -> + nil + end + end + + defp per_user_injector(opts, connection) do + get_in(opts, [:per_user, connection[:username]]) end - defp default_capture_mode(module) when is_atom(module) do + defp configure_capture_mode(module) when is_atom(module) do {module, []} end - defp default_capture_mode({module, params}) + defp configure_capture_mode({module, params}) when is_atom(module) and (is_list(params) or is_map(params)) do {module, params} end diff --git a/components/electric/lib/electric/postgres/proxy/injector/electric.ex b/components/electric/lib/electric/postgres/proxy/injector/electric.ex index 5118939edb..6040ffecf5 100644 --- a/components/electric/lib/electric/postgres/proxy/injector/electric.ex +++ b/components/electric/lib/electric/postgres/proxy/injector/electric.ex @@ -111,10 +111,17 @@ defmodule Electric.Postgres.Proxy.Injector.Electric do def group_messages(msgs) do {current, final} = Enum.reduce(msgs, {[], []}, fn - %M.Query{} = msg, {[], f} -> {[], [{:simple, [msg]} | f]} - %M.Query{} = msg, {c, f} -> {[], [{:simple, [msg]}, {:extended, Enum.reverse(c)} | f]} - %M.Sync{} = msg, {c, f} -> {[], [{:extended, Enum.reverse([msg | c])} | f]} - m, {c, f} -> {[m | c], f} + %M.Query{} = msg, {[], f} -> + {[], [{:simple, [msg]} | f]} + + %M.Query{} = msg, {c, f} -> + {[], [{:simple, [msg]}, {:extended, Enum.reverse(c)} | f]} + + %type{} = msg, {c, f} when type in [M.Sync, M.Flush] -> + {[], [{:extended, Enum.reverse([msg | c])} | f]} + + m, {c, f} -> + {[m | c], f} end) case {current, final} do @@ -135,6 +142,10 @@ defmodule Electric.Postgres.Proxy.Injector.Electric do end defimpl Operation do + def upstream_connection(_electric, conn_config) do + conn_config + end + def activate(electric, state, send) do {electric, state, send} end diff --git a/components/electric/lib/electric/postgres/proxy/injector/operation.ex b/components/electric/lib/electric/postgres/proxy/injector/operation.ex index ed8e5152a5..4365d02b16 100644 --- a/components/electric/lib/electric/postgres/proxy/injector/operation.ex +++ b/components/electric/lib/electric/postgres/proxy/injector/operation.ex @@ -60,11 +60,15 @@ defprotocol Electric.Postgres.Proxy.Injector.Operation do """ alias Electric.Postgres.Proxy.Injector.{Send, State} + alias Electric.Replication.Connectors @type op_stack() :: nil | t() | [t()] @type op() :: nil | t() @type result() :: {op(), State.t(), Send.t()} + @spec upstream_connection(t(), Connectors.config()) :: Connectors.config() + def upstream_connection(op, conn_config) + @doc """ Given a set of messages from the client returns an updated operation stack. """ @@ -129,6 +133,10 @@ defmodule Operation.Impl do quote do import Injector.Operation.Impl + def upstream_connection(_op, conn_config) do + conn_config + end + # no-op def recv_client(op, msgs, state) do {op, state} @@ -168,7 +176,8 @@ defmodule Operation.Impl do {nil, state, send} end - defoverridable recv_client: 3, + defoverridable upstream_connection: 2, + recv_client: 3, activate: 3, recv_server: 4, send_client: 3, @@ -265,6 +274,10 @@ end defimpl Operation, for: List do use Operation.Impl + def upstream_connection([op | _rest], conn_config) do + Operation.upstream_connection(op, conn_config) + end + def recv_client([], _msgs, _state) do raise "empty command stack!" end diff --git a/components/electric/lib/electric/postgres/proxy/injector/prisma.ex b/components/electric/lib/electric/postgres/proxy/injector/prisma.ex index af962795b6..51e8f38584 100644 --- a/components/electric/lib/electric/postgres/proxy/injector/prisma.ex +++ b/components/electric/lib/electric/postgres/proxy/injector/prisma.ex @@ -22,6 +22,10 @@ defmodule Electric.Postgres.Proxy.Injector.Prisma do defimpl Operation do alias Electric.Postgres.Extension.SchemaLoader + def upstream_connection(_prisma, conn_config) do + conn_config + end + def activate(prisma, state, send) do {prisma, state, send} end diff --git a/components/electric/lib/electric/postgres/proxy/injector/shadow.ex b/components/electric/lib/electric/postgres/proxy/injector/shadow.ex new file mode 100644 index 0000000000..768ed96cad --- /dev/null +++ b/components/electric/lib/electric/postgres/proxy/injector/shadow.ex @@ -0,0 +1,221 @@ +defmodule Electric.Postgres.Proxy.Injector.Shadow do + defstruct [:database] + + alias Electric.Postgres.Proxy.{ + Injector, + Injector.Operation, + Injector.Send, + Injector.State, + Parser + } + + alias PgProtocol.Message, as: M + + @type t() :: %__MODULE__{} + + def injector do + Injector.new( + [loader: nil, capture_mode: [default: {__MODULE__, []}]], + username: "shadow" + ) + end + + defmodule Simple do + alias Electric.DDLX.Command + + defstruct [:msgs, :resp] + + defimpl Operation do + use Operation.Impl + # activating when the only query was an electric ddlx command so we need to fake the + # complete interaction, including the ReadyForQuery, as nothing will be sent to the server + # at all + def activate(%{msgs: []} = op, state, send) do + {op, send} = send_electric_resps(op, send) + {op, send} = send_ready(op, send, state) + {op, state, send} + end + + def activate(%{msgs: msgs} = op, state, send) do + {%{op | msgs: []}, state, Send.server(send, msgs)} + end + + def recv_server(%{resp: [{:sql, _} | resp]} = op, %M.CommandComplete{} = msg, state, send) do + {op, send} = send_electric_resps(%{op | resp: resp}, Send.client(send, msg)) + {op, state, send} + end + + def recv_server(%{resp: []}, %M.ReadyForQuery{} = msg, state, send) do + {nil, state, Send.client(send, msg)} + end + + def recv_server(op, msg, state, send) do + {op, state, Send.client(send, msg)} + end + + defp send_electric_resps(%{resp: resp} = op, send) do + {cmds, resps} = Enum.split_while(resp, &(elem(&1, 0) == :electric)) + + msgs = + Enum.map(cmds, fn {:electric, cmd} -> %M.CommandComplete{tag: Command.tag(cmd)} end) + + {%{op | resp: resps}, Send.client(send, msgs)} + end + + defp send_ready(%{resp: []}, send, state) do + msg = + if State.tx?(state) do + %M.ReadyForQuery{status: :tx} + else + %M.ReadyForQuery{status: :idle} + end + + {nil, Send.client(send, msg)} + end + + defp send_ready(%{resp: [_ | _]} = op, send, _state) do + {op, send} + end + end + end + + defmodule Extended do + # user has done a parse-describe for a ddlx statement + # this waits for the corresponding bind-execute and fakes + # execution + alias Electric.DDLX.Command + + defstruct [:cmd, :msgs] + + defimpl Operation do + use Operation.Impl + + def activate(op, state, send) do + {op, state, Send.client(send, Enum.map(op.msgs, &response(&1, state)))} + end + + def recv_client(op, msgs, state) do + if Enum.any?(msgs, &is_struct(&1, M.Execute)) do + tag = Command.tag(op.cmd) + pass = Operation.Pass.client(Enum.map(msgs, &response(&1, tag, state))) + {pass, state} + else + {[Operation.Pass.server(msgs), op], state} + end + end + end + end + + # provide a loader impl that just ignores electrification + defmodule NullLoader do + @moduledoc false + def table_electrified?(_, _), do: {:ok, false} + end + + defimpl Operation do + use Operation.Impl + + # For shadow connections, we want to honour the database from the connection params. + # + # Normally we ignore the db param in the startup message and always connect to the + # configured upstream database. + def upstream_connection(shadow, conn_config) do + put_in(conn_config, [:connection, :database], shadow.database) + end + + def recv_client(shadow, msgs, state) do + chunks = Injector.Electric.group_messages(msgs) + + {commands, {shadow, state}} = + Enum.flat_map_reduce(chunks, {shadow, state}, &command_for_msgs/2) + + {Enum.concat(commands, [shadow]), state} + end + + def recv_server(shadow, msg, state, send) do + {shadow, state, Send.client(send, msg)} + end + + def recv_error(shadow, _msgs, state, send) do + {shadow, state, send} + end + + def send_error(shadow, state, send) do + {shadow, state, send} + end + + defp null_loader(state) do + %{state | loader: {NullLoader, []}} + end + + defp monitor_tx(%{action: {:tx, :begin}}, state) do + State.begin(state) + end + + defp monitor_tx(%{action: {:tx, :commit}}, state) do + State.commit(state) + end + + defp monitor_tx(%{action: {:tx, :rollback}}, state) do + State.rollback(state) + end + + defp monitor_tx(_analysis, state) do + state + end + + defp command_for_msgs({:simple, [%M.Query{} = msg]}, {shadow, state}) do + case Parser.parse(msg) do + {:ok, stmts} -> + analysis = Parser.analyse(stmts, null_loader(state)) + state = Enum.reduce(analysis, state, &monitor_tx/2) + types = analysis |> filter_ddlx() + + msgs = + Enum.flat_map(types, fn + {:electric, _} -> [] + {:sql, msg} -> [msg] + end) + + {[%Simple{msgs: msgs, resp: types}], {shadow, state}} + + {:error, error} -> + {[%Operation.SyntaxError{error: error, msg: msg}], {shadow, state}} + end + end + + defp command_for_msgs({:extended, msgs}, {shadow, state}) do + case Enum.find(msgs, &is_struct(&1, M.Parse)) do + %M.Parse{} = msg -> + handle_parse(msg, msgs, shadow, state) + + nil -> + {[Operation.Pass.server(msgs)], {shadow, state}} + end + end + + defp handle_parse(msg, msgs, shadow, state) do + case Parser.parse(msg) do + {:ok, [stmt]} -> + analysis = Parser.analyse(stmt, null_loader(state)) + state = Enum.reduce([analysis], state, &monitor_tx/2) + [type] = [analysis] |> filter_ddlx() + + case type do + {:electric, cmd} -> + {[%Extended{cmd: cmd, msgs: msgs}], {shadow, state}} + + {:sql, _msg} -> + {[Operation.Pass.server(msgs)], {shadow, state}} + end + end + end + + defp filter_ddlx(analysis) do + Enum.map(analysis, fn + %{action: {:electric, command}} -> {:electric, command} + %{source: msg} -> {:sql, msg} + end) + end + end +end diff --git a/components/electric/lib/electric/postgres/proxy/injector/transparent.ex b/components/electric/lib/electric/postgres/proxy/injector/transparent.ex index a55ebb9dbb..eb3b0da0c5 100644 --- a/components/electric/lib/electric/postgres/proxy/injector/transparent.ex +++ b/components/electric/lib/electric/postgres/proxy/injector/transparent.ex @@ -18,6 +18,10 @@ defmodule Electric.Postgres.Proxy.Injector.Transparent do end defimpl Operation do + def upstream_connection(_transparent, conn_config) do + conn_config + end + def activate(transparent, state, send) do {transparent, state, send} end diff --git a/components/electric/lib/electric/postgres/proxy/parser.ex b/components/electric/lib/electric/postgres/proxy/parser.ex index 332250042c..d3c62d7d69 100644 --- a/components/electric/lib/electric/postgres/proxy/parser.ex +++ b/components/electric/lib/electric/postgres/proxy/parser.ex @@ -115,12 +115,11 @@ defmodule Electric.Postgres.Proxy.Parser do %{node: {:range_var, %{schemaname: sname, relname: tname}}} -> {:table, {sname, tname}} - other -> + _other -> # we don't really care about the table name for SELECT statements # unless except perhaps in very specific cases, which are either plain # `select * from table` queries, which are handled above, or would be # specifically dealt with - Logger.debug("unrecognised from_clause object #{inspect(other)}") {nil, nil} end end diff --git a/components/electric/lib/electric/postgres/proxy/upstream_connection.ex b/components/electric/lib/electric/postgres/proxy/upstream_connection.ex index b1a5c7e131..9a856212fe 100644 --- a/components/electric/lib/electric/postgres/proxy/upstream_connection.ex +++ b/components/electric/lib/electric/postgres/proxy/upstream_connection.ex @@ -33,7 +33,7 @@ defmodule Electric.Postgres.Proxy.UpstreamConnection do connection = Connectors.get_connection_opts(conn_config, replication: false) Electric.reg(name(session_id)) - Logger.metadata(session_id: session_id) + Logger.metadata(proxy_session_id: session_id) decoder = PgProtocol.Decoder.backend() @@ -62,15 +62,21 @@ defmodule Electric.Postgres.Proxy.UpstreamConnection do _ -> [] end + Logger.debug("Connecting to upstream server: #{inspect(host)}:#{port}") + {:ok, conn} = :gen_tcp.connect(host, port, [active: true] ++ extra_options, 1000) {:noreply, %{state | conn: conn}, {:continue, {:authenticate, params}}} end def handle_continue({:authenticate, params}, state) do + %{username: user, database: database} = params + + Logger.debug("Authenticating to upstream database: #{user}@#{database}") + msg = %M.StartupMessage{ params: %{ - "user" => Map.fetch!(params, :username), - "database" => Map.fetch!(params, :database), + "user" => user, + "database" => database, "client_encoding" => "UTF-8", "application_name" => "electric" } diff --git a/components/electric/test/electric/postgres/proxy/injector/shadow_test.exs b/components/electric/test/electric/postgres/proxy/injector/shadow_test.exs new file mode 100644 index 0000000000..406184f522 --- /dev/null +++ b/components/electric/test/electric/postgres/proxy/injector/shadow_test.exs @@ -0,0 +1,96 @@ +defmodule Electric.Postgres.Proxy.Injector.ShadowTest do + use ExUnit.Case, async: true + + alias Electric.Postgres.Proxy.Injector.Shadow + + import Electric.Postgres.Proxy.TestScenario + + setup do + {:ok, injector} = Shadow.injector() + {:ok, injector: injector} + end + + describe "extended protocol" do + test "shortcuts DDLX", cxt do + cxt.injector + |> client(begin()) + |> server(complete_ready("BEGIN", :tx)) + |> client(parse_describe("SELECT * FROM something")) + |> server(parse_describe_complete()) + |> client(bind_execute()) + |> server(bind_execute_complete()) + |> client( + parse_describe("ALTER TABLE public.items ENABLE ELECTRIC"), + server: [], + client: parse_describe_complete() + ) + |> client(bind_execute(), client: bind_execute_complete("ELECTRIC ENABLE"), server: []) + |> client(commit()) + |> server(complete_ready("COMMIT", :idle)) + |> idle!(Shadow) + end + end + + describe "simple protocol" do + test "shortcuts DDLX", cxt do + cxt.injector + |> client(begin()) + |> server(complete_ready("BEGIN", :tx)) + |> client(query("SELECT * FROM something")) + |> server(complete_ready("SELECT 1", :tx)) + |> client(query("ALTER TABLE public.items ENABLE ELECTRIC"), + server: [], + client: complete_ready("ELECTRIC ENABLE", :tx) + ) + |> client(commit()) + |> server(complete_ready("COMMIT", :idle)) + |> idle!(Shadow) + end + + test "shortcuts DDLX within multiple commands", cxt do + cxt.injector + |> client(begin()) + |> server(complete_ready("BEGIN", :tx)) + |> client( + query(""" + CREATE TABLE something (id uuid PRIMARY KEY); + CREATE TABLE something_else (id uuid PRIMARY KEY); + + ALTER TABLE something ENABLE ELECTRIC; + ALTER TABLE something_else ENABLE ELECTRIC; + + ELECTRIC GRANT ALL ON something TO 'admin'; + + CREATE TABLE fish (id uuid PRIMARY KEY); + ELECTRIC GRANT ALL ON something_else TO 'admin'; + """), + server: [ + query("CREATE TABLE something (id uuid PRIMARY KEY)"), + query("CREATE TABLE something_else (id uuid PRIMARY KEY)"), + query("CREATE TABLE fish (id uuid PRIMARY KEY)") + ] + ) + |> server( + [ + complete("CREATE TABLE"), + complete("CREATE TABLE"), + complete("CREATE TABLE"), + ready(:tx) + ], + client: [ + complete("CREATE TABLE"), + complete("CREATE TABLE"), + complete("ELECTRIC ENABLE"), + complete("ELECTRIC ENABLE"), + complete("ELECTRIC GRANT"), + complete("CREATE TABLE"), + complete("ELECTRIC GRANT"), + ready(:tx) + ] + ) + |> client(commit()) + |> server(complete_ready("COMMIT", :idle)) + |> idle!(Shadow) + end + end +end diff --git a/components/electric/test/electric/postgres/proxy/injector_test.exs b/components/electric/test/electric/postgres/proxy/injector_test.exs index 71b4e1b8a9..d81dee2a4e 100644 --- a/components/electric/test/electric/postgres/proxy/injector_test.exs +++ b/components/electric/test/electric/postgres/proxy/injector_test.exs @@ -2,6 +2,7 @@ defmodule Electric.Postgres.Proxy.InjectorTest do use ExUnit.Case, async: true alias PgProtocol.Message, as: M + alias Electric.Postgres.Proxy alias Electric.Postgres.Proxy.Injector alias Electric.Postgres.Proxy.TestScenario alias Electric.Postgres.Extension.SchemaLoader @@ -89,6 +90,22 @@ defmodule Electric.Postgres.Proxy.InjectorTest do assert {:ok, {[%FakeCapture{database: "important", version: :fake}], %Injector.State{}}} = Injector.new(opts, username: "fake", database: "important") end + + test "prisma shadow database mode" do + opts = Keyword.put(Proxy.default_handler_config()[:injector], :loader, MockSchemaLoader) + + assert {:ok, + {[ + %Injector.Shadow{ + database: "prisma_migrate_shadow_db_cb1834f9-acad-49ec-965f-97579e3688a8" + } + ], + %Injector.State{}}} = + Injector.new(opts, + username: "fake", + database: "prisma_migrate_shadow_db_cb1834f9-acad-49ec-965f-97579e3688a8" + ) + end end for s <- TestScenario.scenarios() do