diff --git a/.changeset/cold-ads-cough.md b/.changeset/cold-ads-cough.md new file mode 100644 index 0000000000..52da3b80a2 --- /dev/null +++ b/.changeset/cold-ads-cough.md @@ -0,0 +1,6 @@ +--- +"@core/electric": minor +"electric-sql": minor +--- + +Add protocol version negotiation to websocket connection step diff --git a/clients/typescript/package.json b/clients/typescript/package.json index 53680f95c6..6c012060f6 100644 --- a/clients/typescript/package.json +++ b/clients/typescript/package.json @@ -108,6 +108,7 @@ } }, "scripts": { + "prebuild": "node -p \"'export const LIB_VERSION = \\'' + require('./package.json').version + '\\''\" > src/version.ts", "dev": "tsmodule dev", "build": "rm -rf ./dist && tsc && tsmodule build", "build-dev": "rm -rf ./dist && tsc && tsmodule build --dev", diff --git a/clients/typescript/src/drivers/better-sqlite3/index.ts b/clients/typescript/src/drivers/better-sqlite3/index.ts index e9f3477029..d8e79ab4d0 100644 --- a/clients/typescript/src/drivers/better-sqlite3/index.ts +++ b/clients/typescript/src/drivers/better-sqlite3/index.ts @@ -14,7 +14,7 @@ import { DatabaseAdapter } from './adapter' import { Database } from './database' import { ElectricClient } from '../../client/model/client' import { DbSchema } from '../../client/model/schema' -import { WebSocketNodeFactory } from '../../sockets/node' +import { WebSocketNode } from '../../sockets/node' export { DatabaseAdapter } export type { Database } @@ -27,7 +27,7 @@ export const electrify = async , T extends Database>( ): Promise> => { const dbName: DbName = db.name const adapter = opts?.adapter || new DatabaseAdapter(db) - const socketFactory = opts?.socketFactory || new WebSocketNodeFactory() + const socketFactory = opts?.socketFactory || WebSocketNode const namespace = await baseElectrify( dbName, diff --git a/clients/typescript/src/drivers/cordova-sqlite-storage/index.ts b/clients/typescript/src/drivers/cordova-sqlite-storage/index.ts index 3b7439fef8..6601760641 100644 --- a/clients/typescript/src/drivers/cordova-sqlite-storage/index.ts +++ b/clients/typescript/src/drivers/cordova-sqlite-storage/index.ts @@ -11,7 +11,7 @@ import { import { DatabaseAdapter } from './adapter' import { ElectricConfig } from '../../config' import { Database } from './database' -import { MockSocketFactory } from '../../sockets/mock' +import { MockSocket } from '../../sockets/mock' import { ElectricClient } from '../../client/model/client' import { DbSchema } from '../../client/model/schema' @@ -26,7 +26,7 @@ export const electrify = async >( ): Promise> => { const dbName: DbName = db.dbname! const adapter = opts?.adapter || new DatabaseAdapter(db) - const socketFactory = opts?.socketFactory || new MockSocketFactory() + const socketFactory = opts?.socketFactory || MockSocket const namespace = await baseElectrify( dbName, diff --git a/clients/typescript/src/drivers/cordova-sqlite-storage/test.ts b/clients/typescript/src/drivers/cordova-sqlite-storage/test.ts index 3e7425e0df..313fc95c9f 100644 --- a/clients/typescript/src/drivers/cordova-sqlite-storage/test.ts +++ b/clients/typescript/src/drivers/cordova-sqlite-storage/test.ts @@ -12,7 +12,7 @@ import { MockRegistry } from '../../satellite/mock' import { DatabaseAdapter } from './adapter' import { Database } from './database' import { MockDatabase } from './mock' -import { MockSocketFactory } from '../../sockets/mock' +import { MockSocket } from '../../sockets/mock' import { ElectricClient } from '../../client/model/client' import { ElectricConfig } from '../../config' import { DbSchema } from '../../client/model' @@ -41,7 +41,7 @@ export const initTestable = async < const adapter = opts?.adapter || new DatabaseAdapter(db) const notifier = (opts?.notifier as N) || new MockNotifier(dbName) const migrator = opts?.migrator || new MockMigrator() - const socketFactory = opts?.socketFactory || new MockSocketFactory() + const socketFactory = opts?.socketFactory || MockSocket const registry = opts?.registry || new MockRegistry() const dal = await electrify( diff --git a/clients/typescript/src/drivers/expo-sqlite/index.ts b/clients/typescript/src/drivers/expo-sqlite/index.ts index 2c9d25f568..b653bdf69f 100644 --- a/clients/typescript/src/drivers/expo-sqlite/index.ts +++ b/clients/typescript/src/drivers/expo-sqlite/index.ts @@ -11,7 +11,6 @@ import { import { DatabaseAdapter } from './adapter' import { ElectricConfig } from '../../config' import { Database } from './database' -import { WebSocketReactNativeFactory } from '../../sockets/react-native' // Provide implementation for TextEncoder/TextDecoder import 'fastestsmallesttextencoderdecoder' @@ -30,6 +29,7 @@ import uuid from 'react-native-uuid' import { ElectricClient } from '../../client/model/client' import { DbSchema } from '../../client/model/schema' +import { WebSocketReactNative } from '../../sockets/react-native' export { DatabaseAdapter } export type { Database } @@ -42,7 +42,7 @@ export const electrify = async >( ): Promise> => { const dbName: DbName = db._name! const adapter = opts?.adapter || new DatabaseAdapter(db) - const socketFactory = opts?.socketFactory || new WebSocketReactNativeFactory() + const socketFactory = opts?.socketFactory || WebSocketReactNative const namespace = await baseElectrify( dbName, diff --git a/clients/typescript/src/drivers/expo-sqlite/test.ts b/clients/typescript/src/drivers/expo-sqlite/test.ts index ae260aba5b..ec787645e1 100644 --- a/clients/typescript/src/drivers/expo-sqlite/test.ts +++ b/clients/typescript/src/drivers/expo-sqlite/test.ts @@ -12,7 +12,7 @@ import { MockRegistry } from '../../satellite/mock' import { DatabaseAdapter } from './adapter' import { Database } from './database' import { MockDatabase, MockWebSQLDatabase } from './mock' -import { MockSocketFactory } from '../../sockets/mock' +import { MockSocket } from '../../sockets/mock' import { ElectricConfig } from '../../config' import { ElectricClient } from '../../client/model/client' import { DbSchema } from '../../client/model' @@ -70,7 +70,7 @@ export async function initTestable< const adapter = opts?.adapter || new DatabaseAdapter(db) const migrator = opts?.migrator || new MockMigrator() const notifier = (opts?.notifier as N) || new MockNotifier(dbName) - const socketFactory = opts?.socketFactory || new MockSocketFactory() + const socketFactory = opts?.socketFactory || MockSocket const registry = opts?.registry || new MockRegistry() const dal = await electrify( diff --git a/clients/typescript/src/drivers/react-native-sqlite-storage/index.ts b/clients/typescript/src/drivers/react-native-sqlite-storage/index.ts index 7e03fb25be..41a9a16852 100644 --- a/clients/typescript/src/drivers/react-native-sqlite-storage/index.ts +++ b/clients/typescript/src/drivers/react-native-sqlite-storage/index.ts @@ -10,7 +10,7 @@ import { import { DatabaseAdapter } from './adapter' import { ElectricConfig } from '../../config' -import { WebSocketReactNativeFactory } from '../../sockets/react-native' +import { WebSocketReactNative } from '../../sockets/react-native' import { Database } from './database' import { ElectricClient } from '../../client/model/client' @@ -42,7 +42,7 @@ export const electrify = async >( ): Promise> => { const dbName: DbName = db.dbName const adapter = opts?.adapter || new DatabaseAdapter(db, promisesEnabled) - const socketFactory = opts?.socketFactory || new WebSocketReactNativeFactory() + const socketFactory = opts?.socketFactory || WebSocketReactNative const namespace = await baseElectrify( dbName, diff --git a/clients/typescript/src/drivers/react-native-sqlite-storage/test.ts b/clients/typescript/src/drivers/react-native-sqlite-storage/test.ts index b9eb19191f..c3f52583c9 100644 --- a/clients/typescript/src/drivers/react-native-sqlite-storage/test.ts +++ b/clients/typescript/src/drivers/react-native-sqlite-storage/test.ts @@ -15,7 +15,7 @@ import { MockRegistry } from '../../satellite/mock' import { DatabaseAdapter } from './adapter' import { Database } from './index' import { enablePromiseRuntime, MockDatabase } from './mock' -import { MockSocketFactory } from '../../sockets/mock' +import { MockSocket } from '../../sockets/mock' import { ElectricClient } from '../../client/model/client' import { ElectricConfig } from '../../config' import { DbSchema } from '../../client/model' @@ -46,7 +46,7 @@ export const initTestable = async < const adapter = opts?.adapter || new DatabaseAdapter(db, promisesEnabled) const migrator = opts?.migrator || new MockMigrator() const notifier = (opts?.notifier as N) || new MockNotifier(dbName) - const socketFactory = opts?.socketFactory || new MockSocketFactory() + const socketFactory = opts?.socketFactory || MockSocket const registry = opts?.registry || new MockRegistry() const dal = await baseElectrify( diff --git a/clients/typescript/src/drivers/wa-sqlite/index.ts b/clients/typescript/src/drivers/wa-sqlite/index.ts index d99e5ddde4..08395afcaf 100644 --- a/clients/typescript/src/drivers/wa-sqlite/index.ts +++ b/clients/typescript/src/drivers/wa-sqlite/index.ts @@ -2,7 +2,7 @@ import { DatabaseAdapter } from './adapter' import { ElectricDatabase } from './database' import { ElectricConfig } from '../../config' import { electrify as baseElectrify, ElectrifyOptions } from '../../electric' -import { WebSocketWebFactory } from '../../sockets/web' +import { WebSocketWeb } from '../../sockets/web' import { ElectricClient, DbSchema } from '../../client/model' import { Database } from './database' @@ -17,7 +17,7 @@ export const electrify = async >( ): Promise> => { const dbName = db.name const adapter = opts?.adapter || new DatabaseAdapter(db) - const socketFactory = opts?.socketFactory || new WebSocketWebFactory() + const socketFactory = opts?.socketFactory || WebSocketWeb const client = await baseElectrify( dbName, diff --git a/clients/typescript/src/satellite/client.ts b/clients/typescript/src/satellite/client.ts index 20c7fbdfd1..5062d1d08d 100644 --- a/clients/typescript/src/satellite/client.ts +++ b/clients/typescript/src/satellite/client.ts @@ -40,7 +40,7 @@ import { msgToString, serverErrorToSatelliteError, } from '../util/proto' -import { Socket, SocketFactory } from '../sockets/index' +import { PROTOCOL_VSN, Socket, SocketFactory } from '../sockets/index' import _m0 from 'protobufjs/minimal.js' import { EventEmitter } from 'events' import { @@ -232,7 +232,7 @@ export class SatelliteClient extends EventEmitter implements Client { } return new Promise((resolve, reject) => { - this.socket = this.socketFactory.create() + this.socket = new this.socketFactory(PROTOCOL_VSN) const onceError = (error: Error) => { this.close() diff --git a/clients/typescript/src/sockets/index.ts b/clients/typescript/src/sockets/index.ts index 399f69e780..303ab79678 100644 --- a/clients/typescript/src/sockets/index.ts +++ b/clients/typescript/src/sockets/index.ts @@ -1,4 +1,5 @@ import { SatelliteError } from '../util' +import { LIB_VERSION } from '../version' export type Data = string | Uint8Array @@ -6,6 +7,10 @@ export interface ConnectionOptions { url: string } +// Take major & minor version of the library +export const PROTOCOL_VSN = + 'electric.' + LIB_VERSION.split('.').slice(0, 2).join('.') + export interface Socket { open(opts: ConnectionOptions): this write(data: Data): this @@ -21,6 +26,4 @@ export interface Socket { removeErrorListener(cb: (error: SatelliteError) => void): void } -export interface SocketFactory { - create(): Socket -} +export type SocketFactory = new (protocolVersion: string) => Socket diff --git a/clients/typescript/src/sockets/mock.ts b/clients/typescript/src/sockets/mock.ts index 5a1595b5aa..08075dfd0d 100644 --- a/clients/typescript/src/sockets/mock.ts +++ b/clients/typescript/src/sockets/mock.ts @@ -1,14 +1,12 @@ import { EventEmitter } from 'events' -import { ConnectionOptions, Data, Socket, SocketFactory } from './index' +import { ConnectionOptions, Data, Socket } from './index' import { SatelliteError } from '../util' -export class MockSocketFactory implements SocketFactory { - create(): MockSocket { - return new MockSocket() +export class MockSocket extends EventEmitter implements Socket { + constructor() { + super() } -} -export class MockSocket extends EventEmitter implements Socket { open(_opts: ConnectionOptions): this { return this } diff --git a/clients/typescript/src/sockets/node.ts b/clients/typescript/src/sockets/node.ts index 689b5b20a5..9de0a2d7fa 100644 --- a/clients/typescript/src/sockets/node.ts +++ b/clients/typescript/src/sockets/node.ts @@ -1,18 +1,12 @@ import EventEmitter from 'events' -import { ConnectionOptions, Data, Socket, SocketFactory } from './index' +import { ConnectionOptions, Data, Socket } from './index' import { WebSocket } from 'ws' import { SatelliteError, SatelliteErrorCode } from '../util' -export class WebSocketNodeFactory implements SocketFactory { - create(): WebSocketNode { - return new WebSocketNode() - } -} - export class WebSocketNode extends EventEmitter implements Socket { private socket?: WebSocket - constructor() { + constructor(private protocolVsn: string) { super() } @@ -24,7 +18,7 @@ export class WebSocketNode extends EventEmitter implements Socket { ) } - this.socket = new WebSocket(opts.url) + this.socket = new WebSocket(opts.url, [this.protocolVsn]) this.socket.binaryType = 'nodebuffer' this.socket.on('open', () => this.emit('open')) diff --git a/clients/typescript/src/sockets/react-native.ts b/clients/typescript/src/sockets/react-native.ts index 8adf0e28f6..51d61ca109 100644 --- a/clients/typescript/src/sockets/react-native.ts +++ b/clients/typescript/src/sockets/react-native.ts @@ -1,13 +1,6 @@ -import { ConnectionOptions, Data, Socket, SocketFactory } from '.' +import { ConnectionOptions, Data, Socket } from '.' import { SatelliteError, SatelliteErrorCode } from '../util' -// FIXME: This implementation is a bit contrived because it is not using EventEmitter -export class WebSocketReactNativeFactory implements SocketFactory { - create(): WebSocketReactNative { - return new WebSocketReactNative() - } -} - export class WebSocketReactNative implements Socket { private socket?: WebSocket @@ -16,6 +9,8 @@ export class WebSocketReactNative implements Socket { private onceErrorCallbacks: ((error: SatelliteError) => void)[] = [] private messageCallbacks: ((data: any) => void)[] = [] + constructor(private protocolVsn: string) {} + open(opts: ConnectionOptions): this { if (this.socket) { throw new SatelliteError( @@ -24,7 +19,7 @@ export class WebSocketReactNative implements Socket { ) } - this.socket = new WebSocket(opts.url) + this.socket = new WebSocket(opts.url, [this.protocolVsn]) this.socket.binaryType = 'arraybuffer' this.socket.onopen = () => { diff --git a/clients/typescript/src/sockets/web.ts b/clients/typescript/src/sockets/web.ts index 6b52d4aabf..6df854fc4c 100644 --- a/clients/typescript/src/sockets/web.ts +++ b/clients/typescript/src/sockets/web.ts @@ -1,12 +1,6 @@ -import { ConnectionOptions, Data, Socket, SocketFactory } from '.' +import { ConnectionOptions, Data, Socket } from '.' import { SatelliteError, SatelliteErrorCode } from '../util' -export class WebSocketWebFactory implements SocketFactory { - create(): WebSocketWeb { - return new WebSocketWeb() - } -} - // FIXME: This implementation is a bit contrived because it is not using EventEmitter export class WebSocketWeb implements Socket { private socket?: WebSocket @@ -40,6 +34,8 @@ export class WebSocketWeb implements Socket { private messageListener?: (event: MessageEvent) => void private closeListener?: () => void + constructor(private protocolVsn: string) {} + open(opts: ConnectionOptions): this { if (this.socket) { throw new SatelliteError( @@ -48,7 +44,7 @@ export class WebSocketWeb implements Socket { ) } - this.socket = new WebSocket(opts.url) + this.socket = new WebSocket(opts.url, [this.protocolVsn]) this.socket.binaryType = 'arraybuffer' this.socket.addEventListener('open', this.connectListener) diff --git a/clients/typescript/src/version.ts b/clients/typescript/src/version.ts new file mode 100644 index 0000000000..3729cfa1da --- /dev/null +++ b/clients/typescript/src/version.ts @@ -0,0 +1 @@ +export const LIB_VERSION = '0.5.2' diff --git a/clients/typescript/test/migrators/builder.test.ts b/clients/typescript/test/migrators/builder.test.ts index d2e95cb79a..804ce3b56c 100644 --- a/clients/typescript/test/migrators/builder.test.ts +++ b/clients/typescript/test/migrators/builder.test.ts @@ -14,7 +14,7 @@ import Database from 'better-sqlite3' import { electrify } from '../../src/drivers/better-sqlite3' import path from 'path' import { DbSchema } from '../../src/client/model' -import { MockSocketFactory } from '../../src/sockets/mock' +import { MockSocket } from '../../src/sockets/mock' function encodeSatOpMigrateMsg(request: SatOpMigrate) { return ( @@ -132,7 +132,7 @@ test('load migration from meta data', async (t) => { token: 'test-token', }, }, - { socketFactory: new MockSocketFactory() } + { socketFactory: MockSocket } ) // Check that the DB is initialized with the stars table diff --git a/clients/typescript/test/satellite/client.test.ts b/clients/typescript/test/satellite/client.test.ts index 5e0d5c140f..9d520b0550 100644 --- a/clients/typescript/test/satellite/client.test.ts +++ b/clients/typescript/test/satellite/client.test.ts @@ -8,7 +8,7 @@ import { serializeRow, } from '../../src/satellite/client' import { OplogEntry, toTransactions } from '../../src/satellite/oplog' -import { WebSocketNodeFactory } from '../../src/sockets/node' +import { WebSocketNode } from '../../src/sockets/node' import { base64, bytesToNumber } from '../../src/util/common' import { getObjFromString, @@ -42,7 +42,7 @@ test.beforeEach((t) => { const client = new SatelliteClient( dbName, - new WebSocketNodeFactory(), + WebSocketNode, new MockNotifier(dbName), { host: '127.0.0.1', diff --git a/components/electric/.formatter.exs b/components/electric/.formatter.exs index 36c96ce09f..48e471a4c0 100644 --- a/components/electric/.formatter.exs +++ b/components/electric/.formatter.exs @@ -1,5 +1,6 @@ # Used by "mix format" [ inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"], - locals_without_parens: [test_tx: 2] + locals_without_parens: [test_tx: 2], + import_deps: [:plug] ] diff --git a/components/electric/config/runtime.exs b/components/electric/config/runtime.exs index e9bb04be69..7a934cf8c7 100644 --- a/components/electric/config/runtime.exs +++ b/components/electric/config/runtime.exs @@ -30,12 +30,13 @@ config :logger, :console, :pg_client, :pg_producer, :pg_slot, - :sq_client, + :remote_ip, :component, :instance_id, :client_id, :user_id, - :metadata + :metadata, + :request_id ] config :electric, diff --git a/components/electric/lib/electric.ex b/components/electric/lib/electric.ex index 7d48b895a2..b18fb2dedb 100644 --- a/components/electric/lib/electric.ex +++ b/components/electric/lib/electric.ex @@ -94,4 +94,9 @@ defmodule Electric do def instance_id do Application.fetch_env!(:electric, :instance_id) end + + @current_vsn Mix.Project.config()[:version] |> Version.parse!() + def vsn do + @current_vsn + end end diff --git a/components/electric/lib/electric/plug/router.ex b/components/electric/lib/electric/plug/router.ex index b35f9480d2..72b7b5bcb7 100644 --- a/components/electric/lib/electric/plug/router.ex +++ b/components/electric/lib/electric/plug/router.ex @@ -1,17 +1,15 @@ defmodule Electric.Plug.Router do - use Plug.Router, init_mode: :runtime + use Plug.Router import Plug.Conn require Logger - plug(:match) - plug(:dispatch) + plug :match + plug Plug.Logger + plug :dispatch - forward("/api/migrations", to: Electric.Plug.Migrations) - forward("/api/status", to: Electric.Plug.Status) - - match "/ws" do - Electric.Plug.SatelliteWebsocketPlug.call(conn, Electric.Plug.SatelliteWebsocketPlug.init([])) - end + forward "/api/migrations", to: Electric.Plug.Migrations + forward "/api/status", to: Electric.Plug.Status + forward "/ws", to: Electric.Plug.SatelliteWebsocketPlug match _ do send_resp(conn, 404, "Not found") diff --git a/components/electric/lib/electric/plug/satellite_websocket_plug.ex b/components/electric/lib/electric/plug/satellite_websocket_plug.ex index 8245ef5eb0..5e1b1fc896 100644 --- a/components/electric/lib/electric/plug/satellite_websocket_plug.ex +++ b/components/electric/lib/electric/plug/satellite_websocket_plug.ex @@ -1,9 +1,14 @@ defmodule Electric.Plug.SatelliteWebsocketPlug do - use Plug.Builder, init_mode: :runtime + require Logger + use Plug.Builder - def init(handler_opts), + @protocol_prefix "electric." + + def init(handler_opts), do: handler_opts + + defp build_websocket_opts(base_opts), do: - handler_opts + base_opts |> Keyword.put_new_lazy(:auth_provider, fn -> Electric.Satellite.Auth.provider() end) |> Keyword.put_new_lazy(:pg_connector_opts, fn -> Electric.Application.pg_connection_opts() @@ -13,21 +18,63 @@ defmodule Electric.Plug.SatelliteWebsocketPlug do end) def call(conn, handler_opts) do - if Bandit.WebSocket.Handshake.valid_upgrade?(conn) do - ws_opts = if List.first(conn.path_info) == "compress", do: [compress: true], else: [] - + with {:ok, conn} <- check_if_valid_upgrade(conn), + {:ok, conn} <- check_if_subprotocol_present(conn), + {:ok, conn} <- check_if_vsn_compatible(conn, with: "<= #{Electric.vsn()}") do Logger.metadata( remote_ip: conn.remote_ip |> :inet.ntoa() |> to_string(), instance_id: Electric.instance_id() ) - upgrade_adapter( - conn, + client_vsn = conn.assigns.satellite_vsn + protocol_vsn = "#{client_vsn.major}.#{client_vsn.minor}" + Logger.debug("Upgrading connection for client with protocol version #{protocol_vsn}") + + conn + |> put_resp_header("sec-websocket-protocol", @protocol_prefix <> protocol_vsn) + |> upgrade_adapter( :websocket, - {Electric.Satellite.WebsocketServer, handler_opts, ws_opts} + {Electric.Satellite.WebsocketServer, build_websocket_opts(handler_opts), []} ) else - send_resp(conn, 401, "Bad request") + {:error, code, body} -> + send_resp(conn, code, body) + end + end + + defp check_if_valid_upgrade(%Plug.Conn{} = conn) do + if Bandit.WebSocket.Handshake.valid_upgrade?(conn) do + {:ok, conn} + else + {:error, 400, "Bad request"} + end + end + + defp check_if_subprotocol_present(%Plug.Conn{} = conn) do + case get_satellite_subprotocol(conn) do + {:ok, vsn} -> {:ok, assign(conn, :satellite_vsn, vsn)} + :error -> {:error, 400, "Missing satellite websocket subprotocol"} + end + end + + defp check_if_vsn_compatible(%Plug.Conn{} = conn, with: requirements) do + if Version.match?(conn.assigns.satellite_vsn, requirements) do + {:ok, conn} + else + {:error, 400, + "Cannot connect satellite version #{conn.assigns.satellite_vsn}: this server requires #{requirements}"} + end + end + + defp get_satellite_subprotocol(%Plug.Conn{} = conn) do + get_req_header(conn, "sec-websocket-protocol") + |> Enum.filter(&String.starts_with?(&1, @protocol_prefix)) + |> case do + [@protocol_prefix <> version] when byte_size(version) < 20 -> + Version.parse(version <> ".0") + + _ -> + :error end end end diff --git a/components/electric/lib/electric/satellite/protobuf.ex b/components/electric/lib/electric/satellite/protobuf.ex index 39fd061cda..a5d7e2ddbb 100644 --- a/components/electric/lib/electric/satellite/protobuf.ex +++ b/components/electric/lib/electric/satellite/protobuf.ex @@ -144,6 +144,11 @@ defmodule Electric.Satellite.Protobuf do {:error, :unknown_msg_type} end + def decode!(tag, binary) do + {:ok, msg} = decode(tag, binary) + msg + end + @spec json_decode(byte(), binary(), list()) :: {:ok, sq_pb_msg()} | {:error, any()} for {module, tag} <- @mapping do def json_decode(unquote(tag), binary, opts) do diff --git a/components/electric/lib/mint/web_socket_client.ex b/components/electric/lib/mint/web_socket_client.ex new file mode 100644 index 0000000000..6b9687f4b5 --- /dev/null +++ b/components/electric/lib/mint/web_socket_client.ex @@ -0,0 +1,351 @@ +defmodule Mint.WebSocketClient do + @moduledoc """ + A GenServer wrapper over a Mint.WebSocket connection. + + `c:Genserver.init/1` callback tries to establish an HTTP connection + and then upgrade it to websocket. + + Any GenServer callbacks will have a state as a 2-tuple: a conn struct + to be used with functions from this module, and user-provided state. + + Any callbacks from this module (`c:handle_frame/2`, `c:do_handle_info/2`) get only + user-provided state as the second argument. + + `c:GenServer.handle_info/2` callback cannot be used due to how Mint is set up, so all messages + are forwarded to a `c:do_handle_info` callback instead. + """ + + alias Mint.WebSocketClient + require Logger + + @type conn :: %{ + conn: Mint.HTTP.t(), + websocket: Mint.WebSocket.t(), + ref: reference(), + subprotocol: String.t() | nil + } + + @type handler_return :: + {:noreply, term()} + | {:reply, [{:text, String.t()} | {:binary, binary()}], term()} + | {:stop, term(), term(), term()} + | {:stop, term(), term(), [{:text, String.t()} | {:binary, binary()}], term()} + + @callback handle_connection(subprotocol :: String.t(), conn(), init_arg :: term()) :: + {:ok, term(), [Mint.WebSocket.frame()]} | {:error, term()} + @callback handle_frame({:text, String.t()} | {:binary, binary()}, term()) :: handler_return() + + @callback do_handle_info(term(), term()) :: handler_return() + + defmacro __using__(_opts) do + quote do + use GenServer, restart: :temporary + require Logger + + alias Mint.WebSocketClient + + @behaviour Mint.WebSocketClient + + @impl GenServer + def init(init_arg) do + protocol = Keyword.get(init_arg, :protocol, :ws) + host = Keyword.fetch!(init_arg, :host) + port = Keyword.get(init_arg, :port, if(protocol == :wss, do: 443, else: 80)) + path = Keyword.get(init_arg, :path, "/") + subprotocol = Keyword.get(init_arg, :subprotocol) + + ws_headers = + [] ++ if(subprotocol, do: [{"sec-websocket-protocol", subprotocol}], else: []) + + with {:ok, state} <- + Mint.WebSocketClient.setup_ws_connection( + protocol, + host, + port, + path, + ws_headers + ) do + case handle_connection(state.subprotocol, state, Keyword.get(init_arg, :init_arg)) do + {:ok, user_state, extra_frames} -> + state = {Map.merge(state, %{status: :open}), user_state} + + Enum.reduce_while(extra_frames, {:ok, state}, fn frame, + {:ok, {conn_state, user_state}} -> + frame + |> do_handle_frame(user_state) + |> Mint.WebSocketClient.process_callback_response(conn_state) + |> case do + {:noreply, state} -> {:cont, {:ok, state}} + {:stop, reason, _} -> {:halt, {:stop, reason}} + end + end) + + {:error, reason} -> + Mint.HTTP.close(state.conn) + {:stop, {:error, reason}} + end + else + {:error, reason} -> + Logger.error("HTTP connection to the server failed: #{inspect(reason)}") + {:stop, {:error, reason}} + + {:error, conn, reason} -> + Logger.error("Failed to upgrade websocket connection: #{inspect(reason)}") + + Mint.HTTP.close(conn) + {:stop, {:error, reason}} + end + end + + @impl GenServer + def handle_info(message, {%{conn: conn, ref: ref} = conn_state, user_state} = state) do + case Mint.WebSocket.stream(conn, message) do + :unknown -> + do_handle_info(message, user_state) + |> WebSocketClient.process_callback_response(conn_state) + |> WebSocketClient.maybe_close_socket() + + {:ok, conn, [{:data, ^ref, data}]} -> + decode_and_handle(data, WebSocketClient.update_state(state, %{conn: conn})) + + {:error, conn, error, response} -> + Mint.HTTP.close(conn) + {:stop, {:error, error, response}, state} + end + end + + @spec do_handle_frame(Mint.WebSocket.frame(), {WebSocketClient.conn(), term()}) :: + WebSocketClient.handler_return() + defp do_handle_frame({:ping, text}, state), do: {:reply, {:pong, text}, state} + defp do_handle_frame({:pong, _}, state), do: {:noreply, state} + + defp do_handle_frame({:close, 1000, _}, state) do + Logger.info("Server closed the websocket connection") + + {:stop, :normal, :close, state} + end + + defp do_handle_frame({:close, code, reason}, state) do + Logger.warning( + "Server closed the websocket connection with code #{code} and reason #{reason}" + ) + + {:stop, :normal, :close, state} + end + + defp do_handle_frame(frame, state) do + handle_frame(frame, state) + end + + @spec decode_and_handle(binary(), {WebSocketClient.conn(), term()}) :: + {:noreply, term()} | {:stop, term(), term()} + defp decode_and_handle(data, {conn_state, _} = state) do + case Mint.WebSocket.decode(conn_state.websocket, data) do + {:ok, websocket, frames} -> + result_tuple = {:noreply, WebSocketClient.update_state(state, websocket: websocket)} + + Enum.reduce_while(frames, result_tuple, fn frame, {:noreply, {conn, user_state}} -> + result = + do_handle_frame(frame, user_state) + |> WebSocketClient.process_callback_response(conn) + |> WebSocketClient.maybe_close_socket() + + if elem(result, 0) == :stop do + {:halt, result} + else + {:cont, result} + end + end) + + {:error, _websocket, error} -> + Logger.debug("Invalid frame received: #{inspect(error)}") + Mint.HTTP.close(conn_state.conn) + {:stop, {:error, error}, state} + end + end + + def handle_connection(_, _, state), do: {:ok, state, []} + def handle_frame(_, state), do: {:noreply, state} + + def do_handle_info(msg, state) do + proc = + case Process.info(self(), :registered_name) do + {_, []} -> self() + {_, name} -> name + end + + Logger.warning( + "#{inspect(__MODULE__)} #{inspect(proc)} received unexpected message in do_handle_info/2: #{inspect(msg)}" + ) + + {:noreply, state} + end + + defoverridable Mint.WebSocketClient + end + end + + @doc """ + Send websocket frames over an established connection + """ + @spec send_frames(conn(), [Mint.WebSocket.frame()]) :: {:ok, conn()} | {:error, any()} + def send_frames(conn_state, frames) do + with {:ok, websocket, data} <- encode_all(conn_state.websocket, frames), + {:ok, conn} <- + Mint.WebSocket.stream_request_body(conn_state.conn, conn_state.ref, data) do + {:ok, %{conn_state | websocket: websocket, conn: conn}} + else + {:error, _, error} -> + {:error, error} + end + end + + @doc """ + Try to receive next frames over an established connection + """ + @spec receive_next_frames!(conn()) :: {:ok, conn(), [Mint.WebSocket.frame()]} + def receive_next_frames!(%{conn: conn, websocket: websocket, ref: ref} = conn_state) do + msg = receive do: ({proto, _, _} = message when proto in [:tcp, :ssl] -> message) + {:ok, conn, [{:data, ^ref, data}]} = Mint.WebSocket.stream(conn, msg) + {:ok, websocket, frames} = Mint.WebSocket.decode(websocket, data) + + {:ok, %{conn_state | websocket: websocket, conn: conn}, frames} + end + + # Internal functions used by the functions created in the `__using__` macro + @doc false + @spec process_callback_response(handler_return(), conn()) :: + {:noreply, {conn(), term()}} | {:stop, term(), {conn(), term()}} + def process_callback_response({:noreply, user_state}, conn), do: {:noreply, {conn, user_state}} + + def process_callback_response({:reply, frames, user_state}, conn) do + case send_frames(conn, List.wrap(frames)) do + {:ok, conn} -> {:noreply, {conn, user_state}} + {:error, reason} -> {:stop, {:error, reason}, {conn, user_state}} + end + end + + def process_callback_response({:stop, reason, close_reason, user_state}, conn) + when close_reason == :close + when is_tuple(close_reason) and tuple_size(close_reason) == 3 and + elem(close_reason, 0) == :close do + case send_frames(conn, List.wrap(close_reason)) do + {:ok, conn} -> {:stop, reason, {conn, user_state}} + {:error, %Mint.TransportError{reason: :closed}} -> {:stop, :normal, {conn, user_state}} + {:error, reason} -> {:stop, {:error, reason}, {conn, user_state}} + end + end + + def process_callback_response({:stop, reason, close_reason, frames, user_state}, conn) + when close_reason == :close + when is_tuple(close_reason) and tuple_size(close_reason) == 3 and + elem(close_reason, 0) == :close do + case send_frames(conn, List.wrap(frames) ++ List.wrap(close_reason)) do + {:ok, conn} -> {:stop, reason, {conn, user_state}} + {:error, %Mint.TransportError{reason: :closed}} -> {:stop, :normal, {conn, user_state}} + {:error, reason} -> {:stop, {:error, reason}, {conn, user_state}} + end + end + + @doc false + def maybe_close_socket({:stop, reason, {%{conn: conn}, _} = state}) do + {:ok, conn} = Mint.HTTP.close(conn) + {:stop, reason, update_state(state, %{conn: conn})} + end + + def maybe_close_socket({:noreply, state}), do: {:noreply, state} + + @spec encode_all(Mint.WebSocket.t(), [Mint.WebSocket.frame()]) :: + {:ok, Mint.WebSocket.t(), iolist()} | {:error, Mint.WebSocket.t(), any()} + defp encode_all(websocket, frames) do + Enum.reduce_while(List.wrap(frames), {:ok, websocket, []}, fn frame, {:ok, ws, acc} -> + case Mint.WebSocket.encode(ws, frame) do + {:ok, ws, data} -> {:cont, {:ok, ws, [data | acc]}} + {:error, ws, error} -> {:halt, {:error, ws, error}} + end + end) + |> case do + {:ok, ws, data} -> {:ok, ws, Enum.reverse(data)} + {:error, _, _} = error -> error + end + end + + @doc false + @spec update_state({conn(), term()}, map() | keyword()) :: {conn(), term()} + def update_state({internal, user}, updates) do + {Enum.into(updates, internal), user} + end + + @doc false + @spec setup_ws_connection( + :ws | :wss, + Mint.Types.address(), + :inet.port_number(), + String.t(), + Mint.Types.headers() + ) :: {:ok, conn()} | {:error, term()} | {:error, Mint.HTTP.t(), term()} + def setup_ws_connection(protocol, host, port, path \\ "/", ws_headers \\ []) + when protocol in [:ws, :wss] do + http_protocol = if protocol == :ws, do: :http, else: :https + + Logger.debug( + "Trying to establish websocket connection to #{protocol}://#{host}:#{port}#{path}" + ) + + with {:ok, conn} <- Mint.HTTP.connect(http_protocol, host, port, log: true), + {:ok, conn, ref} <- Mint.WebSocket.upgrade(protocol, conn, path, ws_headers), + http_reply_message = receive(do: (message -> message)), + {:ok, conn, [{:status, ^ref, status}, {:headers, ^ref, resp_headers}, {:done, ^ref}]} <- + Mint.WebSocket.stream(conn, http_reply_message), + {:ok, subprotocol} <- validate_subprotocol(conn, ws_headers, resp_headers), + {:ok, conn, websocket} <- + Mint.WebSocket.new(conn, ref, status, resp_headers) do + {:ok, %{conn: conn, websocket: websocket, ref: ref, subprotocol: subprotocol}} + end + end + + defp validate_subprotocol(conn, ws_headers, resp_headers) do + protocols = { + get_header(ws_headers, "sec-websocket-protocol"), + get_header(resp_headers, "sec-websocket-protocol") + } + + case protocols do + # Subprotocol wasn't requested + {nil, nil} -> + {:ok, nil} + + # Subprotocol wasn't requested, but was sent by the server + {nil, got} -> + Logger.error("No subprotocol was requested, but the server returned #{inspect(got)}") + {:error, conn, :invalid_subprotocol} + + # Subprotocol was requested, but the server ignored it + {requested, nil} -> + Logger.error("Subprotocols #{requested} were requested, but the server didn't send one") + {:error, conn, :invalid_subprotocol} + + {requested, got} -> + requested = + String.split(requested, ",", trim: true) + |> Enum.map(&String.trim/1) + + if got in requested do + {:ok, got} + else + Logger.error( + "Subprotocols #{requested} were requested, but the server returned #{inspect(got)}" + ) + + {:error, conn, :invalid_subprotocol} + end + end + end + + defp get_header(headers, key) do + Enum.find_value(headers, fn + {^key, value} -> value + _ -> nil + end) + end +end diff --git a/components/electric/lib/satellite/protocol_helpers.ex b/components/electric/lib/satellite/protocol_helpers.ex new file mode 100644 index 0000000000..0783da6aca --- /dev/null +++ b/components/electric/lib/satellite/protocol_helpers.ex @@ -0,0 +1,108 @@ +defmodule Satellite.ProtocolHelpers do + @moduledoc """ + Helper module for building protobuf objects to send via the test WS client. + Used mainly in E2E tests, which is why this is not in the test folder. + """ + use Electric.Satellite.Protobuf + alias Electric.Satellite.Serialization + + def subscription_request(id \\ nil, shape_requests) do + shape_requests + |> Enum.map(fn + {id, tables: tables} -> + %SatShapeReq{ + request_id: to_string(id), + shape_definition: %SatShapeDef{ + selects: tables |> Enum.map(&%SatShapeDef.Select{tablename: &1}) + } + } + end) + |> then( + &%SatSubsReq{ + subscription_id: id || Electric.Utils.uuid4(), + shape_requests: &1 + } + ) + end + + def schema("public.entries") do + %{ + schema: "public", + name: "entries", + oid: 11111, + primary_keys: ["id"], + columns: [ + %{name: "id", type: :uuid}, + %{name: "content", type: :varchar}, + %{name: "content_b", type: :varchar} + ] + } + end + + def relation("public.entries") do + %SatRelation{ + columns: [ + %SatRelationColumn{name: "id", type: "uuid", is_nullable: false}, + %SatRelationColumn{name: "content", type: "varchar", is_nullable: false}, + %SatRelationColumn{name: "content_b", type: "varchar", is_nullable: true} + ], + relation_id: 11111, + schema_name: "public", + table_name: "entries", + table_type: :TABLE + } + end + + def insert(table, data) when is_map(data) do + schema = schema(table) + columns = Enum.map(schema.columns, & &1.name) + + %SatOpInsert{relation_id: schema.oid, row_data: Serialization.map_to_row(data, columns)} + end + + def update(table, pk, old_data, new_data, tags \\ []) + when is_list(tags) and is_map(pk) and is_map(old_data) and is_map(new_data) do + schema = schema(table) + columns = Enum.map(schema.columns, & &1.name) + + %SatOpUpdate{ + relation_id: schema.oid, + old_row_data: Serialization.map_to_row(Map.merge(old_data, pk), columns), + row_data: Serialization.map_to_row(Map.merge(new_data, pk), columns), + tags: tags + } + end + + def delete(table, old_data, tags \\ []) when is_list(tags) and is_map(old_data) do + schema = schema(table) + columns = Enum.map(schema.columns, & &1.name) + + %SatOpDelete{ + relation_id: schema.oid, + old_row_data: Serialization.map_to_row(old_data, columns), + tags: tags + } + end + + def transaction(lsn, commit_time, op_or_ops) + when is_binary(lsn) and (is_integer(commit_time) or is_struct(commit_time, DateTime)) do + commit_time = + if is_integer(commit_time), + do: commit_time, + else: DateTime.to_unix(commit_time, :millisecond) + + begin = {:begin, %SatOpBegin{commit_timestamp: commit_time, lsn: lsn, trans_id: ""}} + commit = {:commit, %SatOpCommit{commit_timestamp: commit_time, lsn: lsn, trans_id: ""}} + ops = [begin] ++ List.wrap(op_or_ops) ++ [commit] + + ops = + Enum.map(ops, fn + {type, op} -> %SatTransOp{op: {type, op}} + %SatOpInsert{} = op -> %SatTransOp{op: {:insert, op}} + %SatOpUpdate{} = op -> %SatTransOp{op: {:update, op}} + %SatOpDelete{} = op -> %SatTransOp{op: {:delete, op}} + end) + + %SatOpLog{ops: ops} + end +end diff --git a/components/electric/lib/satellite/satellite_ws_client.ex b/components/electric/lib/satellite/satellite_ws_client.ex deleted file mode 100644 index af7e6c8c5e..0000000000 --- a/components/electric/lib/satellite/satellite_ws_client.ex +++ /dev/null @@ -1,656 +0,0 @@ -defmodule Electric.Test.SatelliteWsClient do - @moduledoc """ - - """ - alias Electric.Satellite.Serialization - alias Electric.Replication.Changes.Transaction - - use Electric.Satellite.Protobuf - - require Logger - - defmodule State do - defstruct auto_in_sub: false, - auto_ping: false, - debug: false, - filter_reply: nil, - format: :term, - history: nil, - num: 0, - parent: nil, - stream_ref: nil, - conn: nil, - last_lsn: nil - end - - def connect() do - host = {127, 0, 0, 1} - port = 5133 - connect(host, port) - end - - def connect(host, port) do - host = - case host do - h when is_binary(h) -> :erlang.binary_to_list(host) - _ -> host - end - - {:ok, conn} = :gun.open(host, port, %{:transport => :tcp}) - {:ok, _} = :gun.await_up(conn) - stream_ref = :gun.ws_upgrade(conn, "/ws", []) - - {:upgrade, [<<"websocket">>], _} = :gun.await(conn, stream_ref) - {:ok, {conn, stream_ref}} - end - - # Automatically send auth - @type opt() :: - {:auth, boolean()} - | {:host, String.t()} - | {:port, pos_integer()} - | {:debug, boolean()} - # Logging format - | {:format, :term | :json | :compact} - # Automatically respond to pings - | {:auto_ping, boolean()} - # Automatically acknowledge subscription from Electric - | {:auto_in_sub, boolean()} - # Client identification - | {:id, term()} - # Automatically subscribe to Electric starting from lsn - | {:sub, String.t()} - # Request to continue following subscriptions - | {:subscription_ids, [String.t()]} - | {:auto_register, boolean()} - - @type conn() :: atom() | pid() - - @spec connect_and_spawn([opt()]) :: {:ok, pid()} - def connect_and_spawn(opts \\ []) do - self = self() - :application.ensure_all_started(:gun) - :proc_lib.start(__MODULE__, :loop_init, [self, opts]) - end - - def with_connect(opts, fun) do - {:ok, pid} = connect_and_spawn(opts) - - try do - fun.(pid) - after - :ok = disconnect(pid) - end - end - - def is_alive(conn \\ __MODULE__) do - conn = - cond do - is_pid(conn) -> - conn - - is_atom(conn) -> - :erlang.whereis(conn) - end - - Process.alive?(conn) - end - - def gen_schema() do - %{ - schema: "public", - name: "entries", - oid: 11111, - primary_keys: ["id"], - columns: [ - %{name: "id", type: :uuid}, - %{name: "content", type: :varchar}, - %{name: "content_b", type: :varchar} - ] - } - end - - def gen_owned_schema() do - %{ - schema_name: "public", - table_name: "entries", - oid: 22222, - columns: [ - %{name: "id", type: :uuid}, - %{name: "electric_user_id", type: :varchar}, - %{name: "content", type: :varchar} - ] - } - end - - def build_subscription_request(id \\ nil, shape_requests) do - shape_requests - |> Enum.map(fn - {id, tables: tables} -> - %SatShapeReq{ - request_id: to_string(id), - shape_definition: %SatShapeDef{ - selects: tables |> Enum.map(&%SatShapeDef.Select{tablename: &1}) - } - } - end) - |> then( - &%SatSubsReq{ - subscription_id: id || Electric.Utils.uuid4(), - shape_requests: &1 - } - ) - end - - def send_test_relation(conn \\ __MODULE__) do - relation = %SatRelation{ - columns: [ - %SatRelationColumn{name: "id", type: "uuid", is_nullable: false}, - %SatRelationColumn{name: "content", type: "varchar", is_nullable: false}, - %SatRelationColumn{name: "content_b", type: "varchar", is_nullable: true} - ], - relation_id: 11111, - schema_name: "public", - table_name: "entries", - table_type: :TABLE - } - - send_data(conn, relation) - :ok - end - - def send_test_relation_owned(conn \\ __MODULE__) do - relation = %SatRelation{ - columns: [ - %SatRelationColumn{name: "id", type: "uuid", is_nullable: false}, - %SatRelationColumn{name: "electric_user_id", type: "varchar", is_nullable: false}, - %SatRelationColumn{name: "content", type: "varchar", is_nullable: false} - ], - relation_id: 22222, - schema_name: "public", - table_name: "owned_entries", - table_type: :TABLE - } - - send_data(conn, relation) - :ok - end - - def send_new_data(conn \\ __MODULE__, lsn, commit_time, id, value) do - send_tx_data( - conn, - lsn, - commit_time, - {:insert, %SatOpInsert{relation_id: 11111, row_data: map_to_row([id, value, ""])}} - ) - end - - def send_new_owned_data(conn \\ __MODULE__, lsn, commit_time, id, user_id, value) do - send_tx_data( - conn, - lsn, - commit_time, - {:insert, %SatOpInsert{relation_id: 22222, row_data: map_to_row([id, user_id, value])}} - ) - end - - def entries_table_send_insert(conn \\ __MODULE__, lsn, commit_time, data) do - columns = Enum.map(gen_schema().columns, & &1.name) - - send_tx_data( - conn, - lsn, - commit_time, - {:insert, - %SatOpInsert{ - relation_id: gen_schema().oid, - row_data: Serialization.map_to_row(data, columns) - }} - ) - end - - def entries_table_send_update( - conn \\ __MODULE__, - lsn, - commit_time, - id, - old_data, - new_data, - tags \\ [] - ) do - columns = Enum.map(gen_schema().columns, & &1.name) - - send_tx_data( - conn, - lsn, - commit_time, - {:update, - %SatOpUpdate{ - relation_id: gen_schema().oid, - old_row_data: Serialization.map_to_row(Map.put(old_data, "id", id), columns), - row_data: Serialization.map_to_row(Map.put(new_data, "id", id), columns), - tags: tags - }} - ) - end - - def entries_table_send_delete( - conn \\ __MODULE__, - lsn, - commit_time, - old_data, - tags \\ [] - ) do - columns = Enum.map(gen_schema().columns, & &1.name) - - send_tx_data( - conn, - lsn, - commit_time, - {:delete, - %SatOpDelete{ - relation_id: gen_schema().oid, - old_row_data: Serialization.map_to_row(old_data, columns), - tags: tags - }} - ) - end - - def send_update_data(conn \\ __MODULE__, lsn, commit_time, id, value, old_value) do - send_tx_data( - conn, - lsn, - commit_time, - {:update, - %SatOpUpdate{ - relation_id: 11111, - old_row_data: map_to_row([id, old_value, ""]), - row_data: map_to_row([id, value, ""]) - }} - ) - end - - def send_update_owned_data(conn \\ __MODULE__, lsn, commit_time, id, user_id, value, old_value) do - send_tx_data( - conn, - lsn, - commit_time, - {:update, - %SatOpUpdate{ - relation_id: 22222, - old_row_data: map_to_row([id, user_id, old_value]), - row_data: map_to_row([id, user_id, value]) - }} - ) - end - - def send_delete_data(conn \\ __MODULE__, lsn, commit_time, id, value) do - send_tx_data( - conn, - lsn, - commit_time, - {:delete, %SatOpDelete{relation_id: 11111, old_row_data: map_to_row([id, value, ""])}} - ) - end - - def send_delete_owned_data(conn \\ __MODULE__, lsn, commit_time, id, user_id, value) do - send_tx_data( - conn, - lsn, - commit_time, - {:delete, %SatOpDelete{relation_id: 22222, old_row_data: map_to_row([id, user_id, value])}} - ) - end - - def send_tx_data(conn, lsn, commit_time, op_or_ops) do - begin = {:begin, %SatOpBegin{commit_timestamp: commit_time, lsn: lsn, trans_id: ""}} - commit = {:commit, %SatOpCommit{commit_timestamp: commit_time, lsn: lsn, trans_id: ""}} - ops = [begin] ++ List.wrap(op_or_ops) ++ [commit] - - ops = - Enum.map(ops, fn - {type, op} -> %SatTransOp{op: {type, op}} - %SatOpInsert{} = op -> %SatTransOp{op: {:insert, op}} - %SatOpUpdate{} = op -> %SatTransOp{op: {:update, op}} - %SatOpDelete{} = op -> %SatTransOp{op: {:delete, op}} - end) - - tx = %SatOpLog{ops: ops} - - send_data(conn, tx) - :ok - end - - @doc """ - Serialize transaction that is represented in internal Electric format - """ - def send_tx_internal(conn, %Transaction{} = tx, lsn, relations) do - {sat_oplog, [], _} = Serialization.serialize_trans(tx, lsn, relations) - - for op <- sat_oplog do - send_data(conn, op) - end - end - - @doc """ - Serialize relation that is represented in internal Electric format - """ - # def send_relation_internal(conn, schema, name, oid, columns) do - def send_relation_internal(conn, table_info) do - sat_rel = Serialization.serialize_relation(table_info) - send_data(conn, sat_rel) - end - - @spec send_data(conn(), PB.sq_pb_msg(), fun() | :default) :: term() - def send_data(conn, data, filter \\ :default) do - filter = - case filter do - :default -> fn _, _ -> true end - etc -> etc - end - - send(conn, {:ctrl_stream, data, filter}) - end - - @spec send_bin_data(conn(), binary(), fun() | :default) :: term() - def send_bin_data(conn, data, filter \\ :default) do - filter = - case filter do - :default -> fn _, _ -> true end - etc -> etc - end - - send(conn, {:ctrl_bin, data, filter}) - end - - def disconnect(conn \\ __MODULE__) do - conn = - case conn do - conn when is_atom(conn) -> - :erlang.whereis(conn) - - conn when is_pid(conn) -> - conn - end - - with true <- :erlang.is_pid(conn) do - ref = :erlang.monitor(:process, conn) - send(conn, {:gun_error, :none, :none, :none}) - - receive do - {:DOWN, ^ref, :process, _, _} -> - :ok - after - 5000 -> - :erlang.exit(conn, :kill) - end - else - _ -> :ok - end - end - - def get_ets() do - __MODULE__ - end - - @spec loop_init(pid(), [opt()]) :: any - def loop_init(parent, opts) do - host = Keyword.get(opts, :host, "localhost") - port = Keyword.get(opts, :port, 5133) - {:ok, {conn, stream_ref}} = connect(host, port) - - self = self() - - t = - case Keyword.get(opts, :auto_register, true) do - true -> - Process.register(self(), __MODULE__) - :ets.new(__MODULE__, [:named_table, :ordered_set]) - - false -> - :ets.new(__MODULE__, [:ordered_set]) - end - - try do - Logger.info("started #{inspect(self)}") - - maybe_auth(conn, stream_ref, opts) - maybe_subscribe(conn, stream_ref, opts) - - :proc_lib.init_ack(parent, {:ok, self()}) - - loop(%State{ - conn: conn, - stream_ref: stream_ref, - parent: parent, - history: t, - num: 0, - filter_reply: fn _, _ -> true end, - debug: Keyword.get(opts, :debug, false), - format: Keyword.get(opts, :format, :term), - auto_ping: Keyword.get(opts, :auto_ping, false), - auto_in_sub: Keyword.get(opts, :auto_in_sub, false) - }) - rescue - e -> - Logger.error(Exception.format(:error, e, __STACKTRACE__)) - reraise e, __STACKTRACE__ - end - end - - def loop(%State{conn: conn, stream_ref: stream_ref, history: table, num: num} = state) do - receive do - {:ctrl_stream, data, filter} -> - {:ok, type, _iodata} = PB.encode(data) - maybe_debug("send data #{type}: #{inspect(data)}", state) - - :gun.ws_send(conn, stream_ref, {:binary, serialize(data)}) - loop(%State{state | filter_reply: filter}) - - {:ctrl_bin, data, filter} -> - :gun.ws_send(conn, stream_ref, {:binary, data}) - maybe_debug("send bin data: #{inspect(data)}", state) - loop(%State{state | filter_reply: filter}) - - {:gun_response, ^conn, _, _, status, headers} -> - :gun.close(conn) - Logger.error("gun error: #{inspect(status)} #{inspect(headers)}") - - {:gun_error, _, _, :none} -> - :gun.close(conn) - Logger.info("instructed to close connection") - - {:gun_error, _, _, reason} -> - :gun.close(conn) - Logger.error("gun error: #{inspect(reason)}") - - {:gun_ws, ^conn, ^stream_ref, :close} -> - :gun.close(conn) - Logger.info("gun_ws: close by the server") - - {:gun_ws, ^conn, ^stream_ref, {:binary, <> = bin}} -> - maybe_debug("received bin: #{type} #{inspect(data)}", state) - data = deserialize(bin, state.format) - - case data do - %SatPingReq{} when state.auto_ping == true -> - Process.send( - self(), - {:ctrl_stream, %SatPingResp{lsn: state.last_lsn}, state.filter_reply}, - [] - ) - - %SatInStartReplicationReq{} when state.auto_in_sub == true -> - Process.send( - self(), - {:ctrl_stream, %SatInStartReplicationResp{}, state.filter_reply}, - [] - ) - - _ -> - :ok - end - - :ets.insert(table, {num, data}) - - case state.filter_reply do - nil -> - :ok - - fun -> - case fun.(num, data) do - true -> - msg = {self(), data} - maybe_debug("sending to: #{inspect(state.parent)} #{inspect(msg)}", state) - send(state.parent, msg) - - false -> - :ok - end - end - - case data do - %SatPingReq{} -> - Logger.info("rec: #{inspect(data)}") - loop(%State{state | num: num}) - - _ -> - Logger.info("rec [#{num}]: #{inspect(data)}") - loop(%State{state | num: num + 1}) - end - - msg -> - Logger.warning("Unhandled: #{inspect(msg)}") - end - end - - def maybe_auth(conn, stream_ref, opts) do - case auth_token!(opts) do - {:ok, token} -> - id = Keyword.get(opts, :id, "id") - - headers = [%SatAuthHeaderPair{key: :PROTO_VERSION, value: PB.get_long_proto_vsn()}] - auth_req = serialize(%SatAuthReq{id: id, token: token, headers: headers}) - - :gun.ws_send(conn, stream_ref, {:binary, auth_req}) - {:ws, {:binary, auth_frame}} = :gun.await(conn, stream_ref) - %SatAuthResp{} = deserialize(auth_frame) - :ok = :gun.update_flow(conn, stream_ref, 1) - - Logger.debug("Auth passed") - - :no_auth -> - :ok - end - end - - defp auth_token!(opts) do - case Keyword.get(opts, :auth, false) do - false -> - :no_auth - - %{token: token} -> - {:ok, token} - - %{auth_config: config, user_id: user_id} -> - {:ok, Electric.Satellite.Auth.Secure.create_token(user_id, config: config)} - - %{user_id: user_id} -> - {:ok, Electric.Satellite.Auth.Secure.create_token(user_id)} - - invalid -> - raise ArgumentError, - message: - "use connect_and_spawn(auth: %{auth_provider: \"...\", user_id: \"...\"} | %{token: \"...\"}), got: #{inspect(invalid)}" - end - end - - def maybe_subscribe(conn, stream_ref, opts) do - case Keyword.get(opts, :sub, nil) do - nil -> - :ok - - "" -> - sub_req = serialize(%SatInStartReplicationReq{}) - :gun.ws_send(conn, stream_ref, {:binary, sub_req}) - Logger.debug("Subscribed at LSN=0") - - lsn -> - sub_req = - serialize(%SatInStartReplicationReq{ - lsn: lsn, - subscription_ids: Keyword.get(opts, :subscription_ids, []) - }) - - :gun.ws_send(conn, stream_ref, {:binary, sub_req}) - Logger.debug("Subscribed at LSN=#{inspect(lsn)}") - end - end - - def maybe_debug(format, %{debug: true}) do - Logger.debug(format) - end - - def maybe_debug(_, _) do - :ok - end - - def serialize(data) do - {:ok, type, iodata} = PB.encode(data) - [<>, iodata] - end - - def deserialize(binary, format \\ :term) do - <> = binary - - case format do - :compact -> - {:ok, data} = PB.decode(type, data) - compact(data) - - :term -> - {:ok, data} = PB.decode(type, data) - data - - :json when data !== <<"">> -> - {:ok, data} = PB.json_decode(type, data, [:return_maps, :use_nil]) - data - - _ -> - {:ok, data} = PB.decode(type, data) - data - end - end - - def compact(%SatOpLog{ops: ops}) do - Enum.reduce( - ops, - nil, - fn - %SatTransOp{op: {:begin, %SatOpBegin{commit_timestamp: tmp, lsn: lsn, trans_id: id}}}, - _acc -> - %{commit_timestamp: tmp, lsn: lsn, trans_id: id} - - %SatTransOp{op: {:commit, _}}, acc -> - acc - - %SatTransOp{op: {key, %{tags: _} = op}}, acc -> - acc1 = Map.update(acc, key, 1, fn n -> n + 1 end) - Map.update(acc1, :tags, op.tags, fn tags -> op.tags ++ tags end) - - %SatTransOp{op: {key, _op}}, acc -> - Map.update(acc, key, 1, fn n -> n + 1 end) - end - ) - end - - def compact(other) do - other - end - - defp map_to_row([a, b, c]) do - map = %{"id" => a, "content" => b, "content_b" => c} - columns = ["id", "content", "content_b"] - Serialization.map_to_row(map, columns) - end -end diff --git a/components/electric/lib/satellite/test_ws_client.ex b/components/electric/lib/satellite/test_ws_client.ex new file mode 100644 index 0000000000..11efcbbe29 --- /dev/null +++ b/components/electric/lib/satellite/test_ws_client.ex @@ -0,0 +1,222 @@ +defmodule Satellite.TestWsClient do + use Mint.WebSocketClient + alias Electric.Satellite.Auth + + use Electric.Satellite.Protobuf + + # Public API + + @protocol_prefix "electric." + @satellite_vsn @protocol_prefix <> "#{Electric.vsn().major}.#{Electric.vsn().minor}" + + def connect(opts) do + connection_opts = + [ + host: "127.0.0.1", + port: 5133, + protocol: :ws, + path: "/ws", + subprotocol: @satellite_vsn, + init_arg: Keyword.drop(opts, [:host, :port]) ++ [parent: self()] + ] + |> Keyword.merge(Keyword.take(opts, [:host, :port])) + + GenServer.start(__MODULE__, connection_opts, Keyword.take(opts, [:name])) + end + + def with_connect(opts, fun) when is_function(fun, 1) do + {:ok, pid} = connect(opts) + + try do + fun.(pid) + after + disconnect(pid) + end + end + + def disconnect(pid) do + GenServer.stop(pid, :shutdown) + catch + :exit, _ -> :ok + end + + @doc """ + Send given WebSocket frames to the server. + """ + @spec send_frames(GenServer.server(), Mint.WebSocket.frame() | [Mint.WebSocket.frame()]) :: + :ok | {:error, term()} + def send_frames(pid, data), do: GenServer.call(pid, {:send_frames, List.wrap(data)}) + + @doc """ + Send Satellite protocol messages to the server. + """ + @spec send_data(GenServer.server(), PB.sq_pb_msg() | [PB.sq_pb_msg()]) :: :ok | {:error, term()} + def send_data(pid, messages), do: GenServer.call(pid, {:send_data, List.wrap(messages)}) + + # Implementation + + @impl WebSocketClient + def handle_connection(@protocol_prefix <> vsn, conn, opts) do + Logger.info("Connection established with protocol vsn #{vsn}") + + opts = Map.new(opts) + + with {:ok, conn, unprocessed} <- maybe_auth(conn, opts), + {:ok, _conn} <- maybe_subscribe(conn, opts) do + table = :ets.new(:ws_client_received_messages, [:ordered_set]) + {:ok, %{opts: opts, count: 0, table: table}, unprocessed} + end + end + + def handle_connection(_, _, _) do + {:error, :wrong_subprotocol} + end + + defp maybe_auth(conn, opts) do + case auth_token!(opts[:auth]) do + {:ok, token} -> + id = Map.get(opts, :id, "id") + + headers = [%SatAuthHeaderPair{key: :PROTO_VERSION, value: PB.get_long_proto_vsn()}] + auth_req = serialize(%SatAuthReq{id: id, token: token, headers: headers}) + + {:ok, conn} = WebSocketClient.send_frames(conn, [auth_req]) + {:ok, conn, frames} = WebSocketClient.receive_next_frames!(conn) + + decoded = + Enum.map(frames, fn {:binary, <>} -> PB.decode!(type, data) end) + + {[%SatAuthResp{}], rest} = Enum.split_with(decoded, &is_struct(&1, SatAuthResp)) + + Logger.debug("Auth passed") + {:ok, conn, Enum.map(rest, &serialize/1)} + + :no_auth -> + {:ok, conn, []} + end + end + + defp maybe_subscribe(conn, opts) do + case opts[:sub] do + nil -> + {:ok, conn} + + "" -> + sub_req = serialize(%SatInStartReplicationReq{}) + {:ok, conn} = WebSocketClient.send_frames(conn, [sub_req]) + Logger.debug("Subscribed at LSN=0") + {:ok, conn} + + lsn -> + sub_req = + serialize(%SatInStartReplicationReq{ + lsn: lsn, + subscription_ids: Map.get(opts, :subscription_ids, []) + }) + + {:ok, conn} = WebSocketClient.send_frames(conn, [sub_req]) + Logger.debug("Subscribed at LSN=#{inspect(lsn)}") + {:ok, conn} + end + end + + @impl WebSocketClient + def handle_frame({:text, _}, state) do + Logger.error("Text frames are not supported for Electric protocol") + {:stop, :normal, {:close, 1003, ""}, state} + end + + def handle_frame({:binary, <>}, state) do + Logger.debug("Received type #{type} and binary: #{inspect(data, limit: :infinity)}") + + case PB.decode(type, data) do + {:ok, msg} -> + state + |> tap(&log(&1, msg)) + |> store(msg) + |> tap(&send(&1.opts.parent, {self(), msg})) + |> maybe_autorespond(msg) + + {:error, reason} -> + Logger.error("Couldn't decode message from the server: #{inspect(reason)}") + {:stop, {:error, reason}, {:close, 1007, ""}, state} + end + end + + @impl GenServer + def terminate(:shutdown, {conn, _}) do + WebSocketClient.send_frames(conn, [{:close, 1001, ""}]) + end + + def terminate(_, _), do: nil + + @impl GenServer + def handle_call({:send_frames, frames}, _from, {conn, state}) do + case WebSocketClient.send_frames(conn, frames) do + {:ok, conn} -> + {:reply, :ok, {conn, state}} + + {:error, %Mint.TransportError{reason: :closed}} = error -> + {:stop, :normal, error, {conn, state}} + + error -> + {:reply, error, {conn, state}} + end + end + + def handle_call({:send_data, messages}, _from, {conn, state}) do + frames = Enum.map(messages, &serialize/1) + + case WebSocketClient.send_frames(conn, frames) do + {:ok, conn} -> + {:reply, :ok, {conn, state}} + + {:error, %Mint.TransportError{reason: :closed}} = error -> + {:stop, :normal, error, {conn, state}} + + error -> + {:reply, error, {conn, state}} + end + end + + defp store(state, %SatPingReq{}), do: state + + defp store(%{count: count} = state, msg) do + :ets.insert(state.table, {count, msg}) + %{state | count: count + 1} + end + + defp log(_, %SatPingReq{} = msg), do: Logger.info("rec: #{inspect(msg)}") + defp log(state, msg), do: Logger.info("rec [#{state.count}]: #{inspect(msg)}") + + defp maybe_autorespond(%{opts: %{auto_ping: true}} = state, %SatPingReq{}) do + {:reply, serialize(%SatPingResp{lsn: nil}), state} + end + + defp maybe_autorespond(%{opts: %{auto_in_sub: true}} = state, %SatInStartReplicationReq{}) do + {:reply, serialize(%SatInStartReplicationResp{}), state} + end + + defp maybe_autorespond(state, _), do: {:noreply, state} + + defp auth_token!(nil), do: :no_auth + defp auth_token!(%{token: token}), do: {:ok, token} + + defp auth_token!(%{auth_config: config, user_id: user_id}), + do: {:ok, Auth.Secure.create_token(user_id, config: config)} + + defp auth_token!(%{user_id: user_id}), do: {:ok, Auth.Secure.create_token(user_id)} + + defp auth_token!(invalid), + do: + raise(ArgumentError, + message: + ~s'expected %{auth_provider: "...", user_id: "..."} | %{token: "..."}, got: #{inspect(invalid)}' + ) + + @spec serialize(struct) :: {:binary, binary()} + def serialize(data) do + {:ok, type, iodata} = PB.encode(data) + {:binary, IO.iodata_to_binary([<>, iodata])} + end +end diff --git a/components/electric/mix.exs b/components/electric/mix.exs index a2e37168a4..099e04ac7d 100644 --- a/components/electric/mix.exs +++ b/components/electric/mix.exs @@ -18,7 +18,7 @@ defmodule Electric.MixProject do releases: [ electric: [applications: [electric: :permanent], include_executables_for: [:unix]], ws_client: [ - applications: [electric: :load, gun: :permanent], + applications: [electric: :load], include_executables_for: [:unix], runtime_config_path: false ] @@ -49,7 +49,6 @@ defmodule Electric.MixProject do {:excoveralls, "~> 0.14", only: :test, runtime: false}, {:gproc, "~> 0.9.0"}, {:protox, "~> 1.7"}, - {:gun, "~> 2.0"}, {:gen_stage, "~> 1.2"}, {:telemetry, "~> 1.1", override: true}, {:telemetry_poller, "~> 1.0"}, @@ -64,7 +63,9 @@ defmodule Electric.MixProject do {:pg_query_ex, github: "electric-sql/pg_query_ex"}, {:nimble_pool, "~> 1.0"}, {:bandit, "~> 1.0-pre"}, - {:thousand_island, "~> 1.0-pre"} + {:thousand_island, "~> 1.0-pre"}, + {:mint_web_socket, "~> 1.0"}, + {:mint, "~> 1.5"} ] end diff --git a/components/electric/mix.lock b/components/electric/mix.lock index b7356c0cc0..b50f723f8b 100644 --- a/components/electric/mix.lock +++ b/components/electric/mix.lock @@ -3,7 +3,6 @@ "bandit": {:hex, :bandit, "1.0.0-pre.14", "3ac3f41cf4fdf2bb234bc6a27aad2b63c4d55a1e003f0271dc9118ff113392b6", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0-pre.5", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "550f79ad03ccfd85c86a392b033a075637bcb06ca0a2abad05bbafd1e9989e70"}, "cc_precompiler": {:hex, :cc_precompiler, "0.1.7", "77de20ac77f0e53f20ca82c563520af0237c301a1ec3ab3bc598e8a96c7ee5d9", [:mix], [{:elixir_make, "~> 0.7.3", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "2768b28bf3c2b4f788c995576b39b8cb5d47eb788526d93bd52206c1d8bf4b75"}, "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, - "cowlib": {:hex, :cowlib, "2.12.1", "a9fa9a625f1d2025fe6b462cb865881329b5caff8f1854d1cbc9f9533f00e1e1", [:make, :rebar3], [], "hexpm", "163b73f6367a7341b33c794c4e88e7dbfe6498ac42dcd69ef44c5bc5507c8db0"}, "db_connection": {:hex, :db_connection, "2.5.0", "bb6d4f30d35ded97b29fe80d8bd6f928a1912ca1ff110831edcd238a1973652c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c92d5ba26cd69ead1ff7582dbb860adeedfff39774105a4f1c92cbb654b55aa2"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "dialyxir": {:hex, :dialyxir, "1.3.0", "fd1672f0922b7648ff9ce7b1b26fcf0ef56dda964a459892ad15f6b4410b5284", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "00b2a4bcd6aa8db9dcb0b38c1225b7277dca9bc370b6438715667071a304696f"}, @@ -15,7 +14,6 @@ "exqlite": {:hex, :exqlite, "0.13.14", "acd8b58c2245c6aa611262a887509c6aa862a05bfeb174faf348375bd9fc7edb", [:make, :mix], [{:cc_precompiler, "~> 0.1", [hex: :cc_precompiler, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "e81cd9b811e70a43b8d2d4ee76d3ce57ff349890ec4182f8f5223ead38ac4996"}, "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, "gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"}, - "gun": {:hex, :gun, "2.0.1", "160a9a5394800fcba41bc7e6d421295cf9a7894c2252c0678244948e3336ad73", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "a10bc8d6096b9502205022334f719cc9a08d9adcfbfc0dbee9ef31b56274a20b"}, "hackney": {:hex, :hackney, "1.18.2", "d7ff544ddae5e1cb49e9cf7fa4e356d7f41b283989a1c304bfc47a8cc1cf966f", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "af94d5c9f97857db257090a4a10e5426ecb6f4918aa5cc666798566ae14b65fd"}, "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, @@ -27,6 +25,8 @@ "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"}, + "mint_web_socket": {:hex, :mint_web_socket, "1.0.3", "aab42fff792a74649916236d0b01f560a0b3f03ca5dea693c230d1c44736b50e", [:mix], [{:mint, ">= 1.4.1 and < 2.0.0-0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "ca3810ca44cc8532e3dce499cc17f958596695d226bb578b2fbb88c09b5954b0"}, "mock": {:hex, :mock, "0.3.8", "7046a306b71db2488ef54395eeb74df0a7f335a7caca4a3d3875d1fc81c884dd", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "7fa82364c97617d79bb7d15571193fc0c4fe5afd0c932cef09426b3ee6fe2022"}, "mox": {:hex, :mox, "1.0.2", "dc2057289ac478b35760ba74165b4b3f402f68803dd5aecd3bfd19c183815d64", [:mix], [], "hexpm", "f9864921b3aaf763c8741b5b8e6f908f44566f1e427b2630e89e9a73b981fef2"}, "nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"}, diff --git a/components/electric/test/electric/satellite/subscriptions_test.exs b/components/electric/test/electric/satellite/subscriptions_test.exs index b9f7ccd1aa..fb5bc2a78c 100644 --- a/components/electric/test/electric/satellite/subscriptions_test.exs +++ b/components/electric/test/electric/satellite/subscriptions_test.exs @@ -5,7 +5,7 @@ defmodule Electric.Satellite.SubscriptionsTest do import Electric.Postgres.TestConnection import Electric.Utils, only: [uuid4: 0] import ElectricTest.SetupHelpers - alias Electric.Test.SatelliteWsClient, as: MockClient + alias Satellite.TestWsClient, as: MockClient alias Electric.Postgres.CachedWal describe "Handling of real subscriptions" do diff --git a/components/electric/test/electric/satellite/ws_pg_to_satellite_test.exs b/components/electric/test/electric/satellite/ws_pg_to_satellite_test.exs index cf7db5b100..bca8e68c50 100644 --- a/components/electric/test/electric/satellite/ws_pg_to_satellite_test.exs +++ b/components/electric/test/electric/satellite/ws_pg_to_satellite_test.exs @@ -6,7 +6,7 @@ defmodule Electric.Satellite.WsPgToSatelliteTest do import Electric.Postgres.TestConnection import ElectricTest.SatelliteHelpers - alias Electric.Test.SatelliteWsClient, as: MockClient + alias Satellite.TestWsClient, as: MockClient alias Electric.Satellite.Auth setup :setup_replicated_db diff --git a/components/electric/test/electric/satellite/ws_server_test.exs b/components/electric/test/electric/satellite/ws_server_test.exs index 28a76f7a12..9038d8fdb4 100644 --- a/components/electric/test/electric/satellite/ws_server_test.exs +++ b/components/electric/test/electric/satellite/ws_server_test.exs @@ -9,7 +9,7 @@ defmodule Electric.Satellite.WebsocketServerTest do alias Electric.Replication.SatelliteConnector alias Electric.Postgres.CachedWal.Producer - alias Electric.Test.SatelliteWsClient, as: MockClient + alias Satellite.TestWsClient, as: MockClient alias Electric.Replication.Changes @@ -118,7 +118,7 @@ defmodule Electric.Satellite.WebsocketServerTest do test "sanity check", ctx do with_connect([port: ctx.port], fn conn -> Process.sleep(1000) - assert true == MockClient.is_alive(conn) + assert Process.alive?(conn) end) end @@ -168,14 +168,14 @@ defmodule Electric.Satellite.WebsocketServerTest do test "Server will handle bad requests", ctx do with_connect([port: ctx.port], fn conn -> - MockClient.send_bin_data(conn, <<"rubbish">>) + MockClient.send_frames(conn, {:binary, "rubbish"}) assert_receive {^conn, %SatErrorResp{}}, @default_wait end) end test "Server will handle bad requests after auth", ctx do with_connect([port: ctx.port, auth: ctx], fn conn -> - MockClient.send_bin_data(conn, <<"rubbish">>) + MockClient.send_frames(conn, {:binary, "rubbish"}) assert_receive {^conn, %SatErrorResp{}}, @default_wait end) end @@ -268,7 +268,7 @@ defmodule Electric.Satellite.WebsocketServerTest do test "Server will forbid two connections that use same id", ctx do with_connect([auth: ctx, id: ctx.client_id, port: ctx.port], fn _conn -> - {:ok, pid} = MockClient.connect_and_spawn(auto_register: false, port: ctx.port) + {:ok, pid} = MockClient.connect(port: ctx.port) MockClient.send_data(pid, %SatAuthReq{ id: ctx.client_id, @@ -671,8 +671,6 @@ defmodule Electric.Satellite.WebsocketServerTest do end def clean_connections() do - MockClient.disconnect() - :ok = drain_pids(active_clients()) :ok = drain_active_resources(connectors()) end diff --git a/components/electric/test/electric/satellite/ws_validations_test.exs b/components/electric/test/electric/satellite/ws_validations_test.exs index d11a06e4de..31fd139050 100644 --- a/components/electric/test/electric/satellite/ws_validations_test.exs +++ b/components/electric/test/electric/satellite/ws_validations_test.exs @@ -6,7 +6,7 @@ defmodule Electric.Satellite.WsValidationsTest do import Electric.Postgres.TestConnection import ElectricTest.SatelliteHelpers - alias Electric.Test.SatelliteWsClient, as: MockClient + alias Satellite.TestWsClient, as: MockClient alias Electric.Satellite.Auth alias Electric.Satellite.Serialization diff --git a/components/electric/test/support/satellite_helpers.ex b/components/electric/test/support/satellite_helpers.ex index 9403986ed2..4191255945 100644 --- a/components/electric/test/support/satellite_helpers.ex +++ b/components/electric/test/support/satellite_helpers.ex @@ -3,7 +3,7 @@ defmodule ElectricTest.SatelliteHelpers do import ExUnit.Assertions - alias Electric.Test.SatelliteWsClient, as: MockClient + alias Satellite.TestWsClient, as: MockClient # Send a ping to WebsocketServer. Useful to make sure it is done with initial sync. def ping_server(conn) do @@ -27,7 +27,7 @@ defmodule ElectricTest.SatelliteHelpers do assert lsn > 0 end - def with_connect(opts, fun), do: Electric.Test.SatelliteWsClient.with_connect(opts, fun) + def with_connect(opts, fun), do: MockClient.with_connect(opts, fun) def migrate(conn, version, table \\ nil, sql) do results = diff --git a/components/electric/test/support/setup_helpers.ex b/components/electric/test/support/setup_helpers.ex index d5d2aabfa5..e2875ca52a 100644 --- a/components/electric/test/support/setup_helpers.ex +++ b/components/electric/test/support/setup_helpers.ex @@ -21,7 +21,7 @@ defmodule ElectricTest.SetupHelpers do end @doc """ - Asserts server sends all messages that it should to `Electric.Test.SatelliteWsClient` after + Asserts server sends all messages that it should to `Satellite.TestWsClient` after replication request has been sent. Assumes that the database has been migrated before the replication started, and that @@ -49,7 +49,7 @@ defmodule ElectricTest.SetupHelpers do end @doc """ - Wait for and receives subscription data response as sent back to the test process by `Electric.Test.SatelliteWsClient`. + Wait for and receives subscription data response as sent back to the test process by `Satellite.TestWsClient`. Waits for the `SatSubsDataBegin` message, then for each shape data, then for the end message, and verifies their order. Returns a map, where the shape request ids are keys, and the `SatOpInsert` operations are values. diff --git a/e2e/common.luxinc b/e2e/common.luxinc index 9b3273c113..8d7ce064bd 100644 --- a/e2e/common.luxinc +++ b/e2e/common.luxinc @@ -72,6 +72,7 @@ ?$eprompt !Logger.configure(level: :debug) !Application.put_env(:elixir, :ansi_enabled, false) + !alias Satellite.{TestWsClient, ProtocolHelpers} ?$eprompt [endmacro] diff --git a/e2e/tests/02.04_satellite_write_gets_propagated_to_postgres.lux b/e2e/tests/02.04_satellite_write_gets_propagated_to_postgres.lux index 0ba195a778..34bf74ad2a 100644 --- a/e2e/tests/02.04_satellite_write_gets_propagated_to_postgres.lux +++ b/e2e/tests/02.04_satellite_write_gets_propagated_to_postgres.lux @@ -11,19 +11,15 @@ [invoke client_session $user_id_1 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt """! - Electric.Test.SatelliteWsClient.entries_table_send_insert( - conn, - "1", # lsn - 1686009600000, # Unix timestamp of ~U[2023-06-06 00:00:00Z] |> DateTime.to_unix(:millisecond) - %{ - "id" => "00000000-0000-0000-0000-000000000000", - "content" => "sentinel value" - } - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", ~U[2023-06-06 00:00:00Z], [ + ProtocolHelpers.insert("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000", "content" => "sentinel value"} + ) + ])) """ ?:ok ?$eprompt diff --git a/e2e/tests/02.05_delete_gets_registered_correctly.lux b/e2e/tests/02.05_delete_gets_registered_correctly.lux index ac33a7fb4a..3c5670cd8c 100644 --- a/e2e/tests/02.05_delete_gets_registered_correctly.lux +++ b/e2e/tests/02.05_delete_gets_registered_correctly.lux @@ -9,7 +9,7 @@ [invoke start_elixir_test 1] [invoke client_session 1 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt @@ -25,13 +25,12 @@ [my seen_tag=$1] # We do an update having "seen" only the insert """! - Electric.Test.SatelliteWsClient.entries_table_send_delete( - conn, - "1", # lsn - DateTime.utc_now() |> DateTime.to_unix(:millisecond), - %{"id" => "00000000-0000-0000-0000-000000000000", "content" => "original value", "content_b" => nil}, - ["$seen_tag"] - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.utc_now(), [ + ProtocolHelpers.delete("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000", "content" => "original value", "content_b" => nil}, + ["$seen_tag"] + ) + ])) """ ?:ok ?$eprompt diff --git a/e2e/tests/02.06_subscriptions_can_be_resumed_on_reconnect.lux b/e2e/tests/02.06_subscriptions_can_be_resumed_on_reconnect.lux index 067887c658..5d8f21fca3 100644 --- a/e2e/tests/02.06_subscriptions_can_be_resumed_on_reconnect.lux +++ b/e2e/tests/02.06_subscriptions_can_be_resumed_on_reconnect.lux @@ -28,7 +28,7 @@ ?\} ?tags: \["postgres_1@\d{13,16}"\] ?\} - !:proc_lib.stop(conn) + !TestWsClient.disconnect(conn) ?$eprompt # New row inserted while client is "offline" diff --git a/e2e/tests/05.01_confict_resolution_updates_in_the_past_discarded.lux b/e2e/tests/05.01_confict_resolution_updates_in_the_past_discarded.lux index 60d3a68e3f..bed7ef0536 100644 --- a/e2e/tests/05.01_confict_resolution_updates_in_the_past_discarded.lux +++ b/e2e/tests/05.01_confict_resolution_updates_in_the_past_discarded.lux @@ -9,7 +9,7 @@ [invoke start_elixir_test 1] [invoke client_session 1 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt @@ -22,14 +22,13 @@ [shell user_1_ws1] # Send one update touching a single column """! - Electric.Test.SatelliteWsClient.entries_table_send_update( - conn, - "1", # lsn - 1686009600000, # Unix timestamp of ~U[2023-06-06 00:00:00Z] |> DateTime.to_unix(:millisecond) - "00000000-0000-0000-0000-000000000000", # id - %{"content" => "original value", "content_b" => nil}, # old - %{"content" => "updated on client 1", "content_b" => nil} # new - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", ~U[2023-06-06 00:00:00Z], [ + ProtocolHelpers.update("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000"}, # pk + %{"content" => "original value", "content_b" => nil}, # old + %{"content" => "updated on client 1", "content_b" => nil} # new + ) + ])) """ ?:ok ?$eprompt diff --git a/e2e/tests/05.02_confict_resolution_per_column.lux b/e2e/tests/05.02_confict_resolution_per_column.lux index bb4d980ed1..86bd2d2610 100644 --- a/e2e/tests/05.02_confict_resolution_per_column.lux +++ b/e2e/tests/05.02_confict_resolution_per_column.lux @@ -9,7 +9,7 @@ [invoke start_elixir_test 1] [invoke client_session 1 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt @@ -18,7 +18,7 @@ [invoke start_elixir_test 2] [invoke client_session 2 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt @@ -32,15 +32,14 @@ [my seen_tag=$1] # Send one update touching a single column """! - Electric.Test.SatelliteWsClient.entries_table_send_update( - conn, - "1", # lsn - DateTime.utc_now() |> DateTime.to_unix(:millisecond), - "00000000-0000-0000-0000-000000000000", # id - %{"content" => "original value", "content_b" => nil}, # old - %{"content" => "updated on client 1", "content_b" => nil}, # new - ["$seen_tag"] - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.utc_now(), [ + ProtocolHelpers.update("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000"}, # pk + %{"content" => "original value", "content_b" => nil}, # old + %{"content" => "updated on client 1", "content_b" => nil}, # new + ["$seen_tag"] + ) + ])) """ ?:ok ?$eprompt @@ -50,15 +49,14 @@ [my seen_tag=$1] # Send one update touching a single column """! - Electric.Test.SatelliteWsClient.entries_table_send_update( - conn, - "1", # lsn - DateTime.utc_now() |> DateTime.to_unix(:millisecond), - "00000000-0000-0000-0000-000000000000", # id - %{"content" => "original value", "content_b" => nil}, # old - %{"content" => "original value", "content_b" => "updated on client 2"}, # new - ["$seen_tag"] - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.utc_now(), [ + ProtocolHelpers.update("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000"}, # pk + %{"content" => "original value", "content_b" => nil}, # old + %{"content" => "original value", "content_b" => "updated on client 2"}, # new + ["$seen_tag"] + ) + ])) """ ?:ok ?$eprompt diff --git a/e2e/tests/05.03_postgres_wins_against_concurrent_transactions.lux b/e2e/tests/05.03_postgres_wins_against_concurrent_transactions.lux index 5cf5f18584..0eacd14d00 100644 --- a/e2e/tests/05.03_postgres_wins_against_concurrent_transactions.lux +++ b/e2e/tests/05.03_postgres_wins_against_concurrent_transactions.lux @@ -9,7 +9,7 @@ [invoke start_elixir_test 1] [invoke client_session 1 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt @@ -27,15 +27,14 @@ [my seen_tag=$1] # We do an update while transaction on PG is open """! - Electric.Test.SatelliteWsClient.entries_table_send_update( - conn, - "1", # lsn - DateTime.utc_now() |> DateTime.add(86400, :second) |> DateTime.to_unix(:millisecond), # + 1 day - "00000000-0000-0000-0000-000000000000", # id - %{"content" => "original value", "content_b" => nil}, # old - %{"content" => "updated on client 1", "content_b" => nil}, # new - ["$seen_tag"] - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.add(DateTime.utc_now(), 86400, :second), [ + ProtocolHelpers.update("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000"}, # pk + %{"content" => "original value", "content_b" => nil}, # old + %{"content" => "updated on client 1", "content_b" => nil}, # new + ["$seen_tag"] + ) + ])) """ ?:ok ?$eprompt diff --git a/e2e/tests/05.04_update_that_didnt_see_delete_resurrects.lux b/e2e/tests/05.04_update_that_didnt_see_delete_resurrects.lux index 02eb827285..565a5c538e 100644 --- a/e2e/tests/05.04_update_that_didnt_see_delete_resurrects.lux +++ b/e2e/tests/05.04_update_that_didnt_see_delete_resurrects.lux @@ -9,7 +9,7 @@ [invoke start_elixir_test 1] [invoke client_session 1 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt @@ -30,15 +30,14 @@ [my seen_tag=$1] # We do an update having "seen" only the insert """! - Electric.Test.SatelliteWsClient.entries_table_send_update( - conn, - "1", # lsn - DateTime.utc_now() |> DateTime.add(86400, :second) |> DateTime.to_unix(:millisecond), # + 1 day - "00000000-0000-0000-0000-000000000000", # id - %{"content" => "original value", "content_b" => nil}, # old - %{"content" => "original value", "content_b" => "new value"}, # new - ["$seen_tag"] - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.add(DateTime.utc_now(), 86400, :second), [ + ProtocolHelpers.update("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000"}, # pk + %{"content" => "original value", "content_b" => nil}, # old + %{"content" => "original value", "content_b" => "new value"}, # new + ["$seen_tag"] + ) + ])) """ ?:ok ?$eprompt diff --git a/e2e/tests/05.05_compensations_within_same_tx_are_fine.lux b/e2e/tests/05.05_compensations_within_same_tx_are_fine.lux index 3eb5bfad5b..f916a13f3d 100644 --- a/e2e/tests/05.05_compensations_within_same_tx_are_fine.lux +++ b/e2e/tests/05.05_compensations_within_same_tx_are_fine.lux @@ -29,7 +29,7 @@ !alias Electric.Satellite.V14.{SatRelation, SatRelationColumn, SatOpInsert, SatOpUpdate, SatOpRow} """! - Electric.Test.SatelliteWsClient.send_data(conn, %SatRelation{ + Satellite.TestWsClient.send_data(conn, %SatRelation{ columns: [ %SatRelationColumn{name: "id", type: "text", is_nullable: false}, %SatRelationColumn{name: "content", type: "text", is_nullable: false}, @@ -42,7 +42,7 @@ """ ?$eprompt """! - Electric.Test.SatelliteWsClient.send_data(conn, %SatRelation{ + Satellite.TestWsClient.send_data(conn, %SatRelation{ columns: [ %SatRelationColumn{name: "id", type: "text", is_nullable: false}, %SatRelationColumn{name: "content", type: "text", is_nullable: false}, @@ -56,11 +56,11 @@ """ ?$eprompt """! - Electric.Test.SatelliteWsClient.send_tx_data(conn, "1", DateTime.to_unix(DateTime.utc_now(), :millisecond), [ - %SatOpInsert{relation_id: 1, row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["00000000-0000-0000-0000-000000000000", "test_content"]}}, - %SatOpUpdate{relation_id: 1, row_data: %SatOpRow{nulls_bitmask: <<0::1, 1::1, 0::6>>, values: ["00000000-0000-0000-0000-000000000000", ""]}}, - %SatOpInsert{relation_id: 2, row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["00000000-0000-0000-0000-000000000001", "child content", "00000000-0000-0000-0000-000000000000"]}}, - ]) + Satellite.TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.to_unix(DateTime.utc_now(), :millisecond), [ + %SatOpInsert{relation_id: 1, row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["00000000-0000-0000-0000-000000000000", "test_content"]}}, + %SatOpUpdate{relation_id: 1, row_data: %SatOpRow{nulls_bitmask: <<0::1, 1::1, 0::6>>, values: ["00000000-0000-0000-0000-000000000000", ""]}}, + %SatOpInsert{relation_id: 2, row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["00000000-0000-0000-0000-000000000001", "child content", "00000000-0000-0000-0000-000000000000"]}}, + ])) """ ?$eprompt diff --git a/e2e/tests/_shared.luxinc b/e2e/tests/_shared.luxinc index b43c801376..7e1b650516 100644 --- a/e2e/tests/_shared.luxinc +++ b/e2e/tests/_shared.luxinc @@ -38,18 +38,16 @@ alg: "HS256", key: "integration-tests-signing-key-example" \ ) ?$eprompt - """!{:ok, conn} = Electric.Test.SatelliteWsClient.connect_and_spawn( + """!{:ok, conn} = TestWsClient.connect( auth: %{auth_config: auth_config, user_id: "$user_id"}, id: "$client_id", - debug: true, sub: "", auto_in_sub: true, - format: :term, host: "electric_1", auto_ping: true) """ ?+$eprompt - ?+sending to: (.*)%Electric.Satellite.V\d+.SatInStartReplicationReq\{(.*)lsn: "", ([^\}]*)\} + ?+rec \[\d\]: %Electric.Satellite.V\d+.SatInStartReplicationReq\{ ?rec \[\d\]: %Electric.Satellite.V\d+.SatInStartReplicationResp\{ [endmacro] @@ -60,27 +58,25 @@ alg: "HS256", key: "integration-tests-signing-key-example" \ ) ?$eprompt - """!{:ok, conn} = Electric.Test.SatelliteWsClient.connect_and_spawn( + """!{:ok, conn} = TestWsClient.connect( auth: %{auth_config: auth_config, user_id: "$user_id"}, id: "$client_id", - debug: true, sub: "$position", subscription_ids: ~w|$subscription_ids|, auto_in_sub: true, - format: :term, host: "electric_1", auto_ping: true) """ - ?started #PID + ?Connection established with protocol vsn ?Auth passed ?Subscribed - ?sending to: (.*)%Electric.Satellite.V\d+.SatInStartReplicationReq\{(.*)lsn: "", ([^\}]*)\} + ?+rec \[\d\]: %Electric.Satellite.V\d+.SatInStartReplicationReq\{ ?rec \[\d\]: %Electric.Satellite.V\d+.SatInStartReplicationResp\{ [endmacro] [macro elixir_client_subscribe tables] """! - Electric.Test.SatelliteWsClient.send_data(conn, Electric.Test.SatelliteWsClient.build_subscription_request(request_1: [tables: ~w|$tables|])) + TestWsClient.send_data(conn, ProtocolHelpers.subscription_request(request_1: [tables: ~w|$tables|])) """ ?+$eprompt ?rec \[\d\]: %Electric.Satellite.V\d+.SatSubsResp\{[^\}]+err: nil @@ -88,7 +84,7 @@ [macro elixir_client_subscribe_with_id id tables] """! - Electric.Test.SatelliteWsClient.send_data(conn, Electric.Test.SatelliteWsClient.build_subscription_request("$id", request_1: [tables: ~w|$tables|])) + TestWsClient.send_data(conn, ProtocolHelpers.subscription_request("$id", request_1: [tables: ~w|$tables|])) """ ?+$eprompt ?rec \[\d\]: %Electric.Satellite.V\d+.SatSubsResp\{[^\}]+err: nil