From 26622519ef0c62f9aa80833f6c4467d53c7205c3 Mon Sep 17 00:00:00 2001 From: Ilia Borovitinov Date: Thu, 7 Sep 2023 13:03:41 +0300 Subject: [PATCH] chore: add protocol version negotiation on websocket connect (#400) 1. Figures out compatibility between the client and the server: 1. Protocol version for client is taken from major & minor versions of the package 2. Protocol version for the server is it's package version verbatim 3. Client cannot be ahead of the server - meaning that minor package version of the client cannot run ahead of the server 4. We can just add a lower bound of supported client if/when we do more major protocol logic breaks, that way the server will always be able to tell the clients if they are compatible or not 2. Small refactoring of sockets on the client to remove an unnecessary level of abstraction 3. Replace `:gun` ## Replacing `:gun` Reasoning for this change is twofold: 1. `:gun` requires an explicit separate process that implements a subprotocol IF the server replies with a subprotocol and immeditaly closes the connection otherwise 2. Current `SatelliteWsClient` module was written using a bare `:proc_lib` without any GenServer niceties or anything, making most of that file actually irrelevant to what it was doing. I extracted the generic websocket logic, and also extracted "builders" - we had a bunch of now-unused functions in the `SatelliteWsClient`, and other functions that built protocol messages were a bit hard to find and reason about. All of those were used only in e2e test, so I extracted all that. Now I have 3 Elixir modules: generic websocket client over Mint, particular implmentation that is relevant to use, and a `ProtocolHelpers` module that's used by e2e code to prepare objects --- .changeset/cold-ads-cough.md | 6 + clients/typescript/package.json | 1 + .../src/drivers/better-sqlite3/index.ts | 4 +- .../drivers/cordova-sqlite-storage/index.ts | 4 +- .../drivers/cordova-sqlite-storage/test.ts | 4 +- .../src/drivers/expo-sqlite/index.ts | 4 +- .../src/drivers/expo-sqlite/test.ts | 4 +- .../react-native-sqlite-storage/index.ts | 4 +- .../react-native-sqlite-storage/test.ts | 4 +- .../typescript/src/drivers/wa-sqlite/index.ts | 4 +- clients/typescript/src/satellite/client.ts | 4 +- clients/typescript/src/sockets/index.ts | 9 +- clients/typescript/src/sockets/mock.ts | 10 +- clients/typescript/src/sockets/node.ts | 12 +- .../typescript/src/sockets/react-native.ts | 13 +- clients/typescript/src/sockets/web.ts | 12 +- clients/typescript/src/version.ts | 1 + .../typescript/test/migrators/builder.test.ts | 4 +- .../typescript/test/satellite/client.test.ts | 4 +- components/electric/.formatter.exs | 3 +- components/electric/config/runtime.exs | 5 +- components/electric/lib/electric.ex | 5 + .../electric/lib/electric/plug/router.ex | 16 +- .../electric/plug/satellite_websocket_plug.ex | 67 +- .../lib/electric/satellite/protobuf.ex | 5 + .../electric/lib/mint/web_socket_client.ex | 351 ++++++++++ .../lib/satellite/protocol_helpers.ex | 108 +++ .../lib/satellite/satellite_ws_client.ex | 656 ------------------ .../electric/lib/satellite/test_ws_client.ex | 222 ++++++ components/electric/mix.exs | 7 +- components/electric/mix.lock | 4 +- .../electric/satellite/subscriptions_test.exs | 2 +- .../satellite/ws_pg_to_satellite_test.exs | 2 +- .../electric/satellite/ws_server_test.exs | 12 +- .../satellite/ws_validations_test.exs | 2 +- .../test/support/satellite_helpers.ex | 4 +- .../electric/test/support/setup_helpers.ex | 4 +- e2e/common.luxinc | 1 + ...lite_write_gets_propagated_to_postgres.lux | 16 +- ...02.05_delete_gets_registered_correctly.lux | 15 +- ...scriptions_can_be_resumed_on_reconnect.lux | 2 +- ...solution_updates_in_the_past_discarded.lux | 17 +- .../05.02_confict_resolution_per_column.lux | 38 +- ...s_wins_against_concurrent_transactions.lux | 19 +- ...pdate_that_didnt_see_delete_resurrects.lux | 19 +- ..._compensations_within_same_tx_are_fine.lux | 14 +- e2e/tests/_shared.luxinc | 18 +- 47 files changed, 902 insertions(+), 840 deletions(-) create mode 100644 .changeset/cold-ads-cough.md create mode 100644 clients/typescript/src/version.ts create mode 100644 components/electric/lib/mint/web_socket_client.ex create mode 100644 components/electric/lib/satellite/protocol_helpers.ex delete mode 100644 components/electric/lib/satellite/satellite_ws_client.ex create mode 100644 components/electric/lib/satellite/test_ws_client.ex diff --git a/.changeset/cold-ads-cough.md b/.changeset/cold-ads-cough.md new file mode 100644 index 0000000000..52da3b80a2 --- /dev/null +++ b/.changeset/cold-ads-cough.md @@ -0,0 +1,6 @@ +--- +"@core/electric": minor +"electric-sql": minor +--- + +Add protocol version negotiation to websocket connection step diff --git a/clients/typescript/package.json b/clients/typescript/package.json index 53680f95c6..6c012060f6 100644 --- a/clients/typescript/package.json +++ b/clients/typescript/package.json @@ -108,6 +108,7 @@ } }, "scripts": { + "prebuild": "node -p \"'export const LIB_VERSION = \\'' + require('./package.json').version + '\\''\" > src/version.ts", "dev": "tsmodule dev", "build": "rm -rf ./dist && tsc && tsmodule build", "build-dev": "rm -rf ./dist && tsc && tsmodule build --dev", diff --git a/clients/typescript/src/drivers/better-sqlite3/index.ts b/clients/typescript/src/drivers/better-sqlite3/index.ts index e9f3477029..d8e79ab4d0 100644 --- a/clients/typescript/src/drivers/better-sqlite3/index.ts +++ b/clients/typescript/src/drivers/better-sqlite3/index.ts @@ -14,7 +14,7 @@ import { DatabaseAdapter } from './adapter' import { Database } from './database' import { ElectricClient } from '../../client/model/client' import { DbSchema } from '../../client/model/schema' -import { WebSocketNodeFactory } from '../../sockets/node' +import { WebSocketNode } from '../../sockets/node' export { DatabaseAdapter } export type { Database } @@ -27,7 +27,7 @@ export const electrify = async , T extends Database>( ): Promise> => { const dbName: DbName = db.name const adapter = opts?.adapter || new DatabaseAdapter(db) - const socketFactory = opts?.socketFactory || new WebSocketNodeFactory() + const socketFactory = opts?.socketFactory || WebSocketNode const namespace = await baseElectrify( dbName, diff --git a/clients/typescript/src/drivers/cordova-sqlite-storage/index.ts b/clients/typescript/src/drivers/cordova-sqlite-storage/index.ts index 3b7439fef8..6601760641 100644 --- a/clients/typescript/src/drivers/cordova-sqlite-storage/index.ts +++ b/clients/typescript/src/drivers/cordova-sqlite-storage/index.ts @@ -11,7 +11,7 @@ import { import { DatabaseAdapter } from './adapter' import { ElectricConfig } from '../../config' import { Database } from './database' -import { MockSocketFactory } from '../../sockets/mock' +import { MockSocket } from '../../sockets/mock' import { ElectricClient } from '../../client/model/client' import { DbSchema } from '../../client/model/schema' @@ -26,7 +26,7 @@ export const electrify = async >( ): Promise> => { const dbName: DbName = db.dbname! const adapter = opts?.adapter || new DatabaseAdapter(db) - const socketFactory = opts?.socketFactory || new MockSocketFactory() + const socketFactory = opts?.socketFactory || MockSocket const namespace = await baseElectrify( dbName, diff --git a/clients/typescript/src/drivers/cordova-sqlite-storage/test.ts b/clients/typescript/src/drivers/cordova-sqlite-storage/test.ts index 3e7425e0df..313fc95c9f 100644 --- a/clients/typescript/src/drivers/cordova-sqlite-storage/test.ts +++ b/clients/typescript/src/drivers/cordova-sqlite-storage/test.ts @@ -12,7 +12,7 @@ import { MockRegistry } from '../../satellite/mock' import { DatabaseAdapter } from './adapter' import { Database } from './database' import { MockDatabase } from './mock' -import { MockSocketFactory } from '../../sockets/mock' +import { MockSocket } from '../../sockets/mock' import { ElectricClient } from '../../client/model/client' import { ElectricConfig } from '../../config' import { DbSchema } from '../../client/model' @@ -41,7 +41,7 @@ export const initTestable = async < const adapter = opts?.adapter || new DatabaseAdapter(db) const notifier = (opts?.notifier as N) || new MockNotifier(dbName) const migrator = opts?.migrator || new MockMigrator() - const socketFactory = opts?.socketFactory || new MockSocketFactory() + const socketFactory = opts?.socketFactory || MockSocket const registry = opts?.registry || new MockRegistry() const dal = await electrify( diff --git a/clients/typescript/src/drivers/expo-sqlite/index.ts b/clients/typescript/src/drivers/expo-sqlite/index.ts index 2c9d25f568..b653bdf69f 100644 --- a/clients/typescript/src/drivers/expo-sqlite/index.ts +++ b/clients/typescript/src/drivers/expo-sqlite/index.ts @@ -11,7 +11,6 @@ import { import { DatabaseAdapter } from './adapter' import { ElectricConfig } from '../../config' import { Database } from './database' -import { WebSocketReactNativeFactory } from '../../sockets/react-native' // Provide implementation for TextEncoder/TextDecoder import 'fastestsmallesttextencoderdecoder' @@ -30,6 +29,7 @@ import uuid from 'react-native-uuid' import { ElectricClient } from '../../client/model/client' import { DbSchema } from '../../client/model/schema' +import { WebSocketReactNative } from '../../sockets/react-native' export { DatabaseAdapter } export type { Database } @@ -42,7 +42,7 @@ export const electrify = async >( ): Promise> => { const dbName: DbName = db._name! const adapter = opts?.adapter || new DatabaseAdapter(db) - const socketFactory = opts?.socketFactory || new WebSocketReactNativeFactory() + const socketFactory = opts?.socketFactory || WebSocketReactNative const namespace = await baseElectrify( dbName, diff --git a/clients/typescript/src/drivers/expo-sqlite/test.ts b/clients/typescript/src/drivers/expo-sqlite/test.ts index ae260aba5b..ec787645e1 100644 --- a/clients/typescript/src/drivers/expo-sqlite/test.ts +++ b/clients/typescript/src/drivers/expo-sqlite/test.ts @@ -12,7 +12,7 @@ import { MockRegistry } from '../../satellite/mock' import { DatabaseAdapter } from './adapter' import { Database } from './database' import { MockDatabase, MockWebSQLDatabase } from './mock' -import { MockSocketFactory } from '../../sockets/mock' +import { MockSocket } from '../../sockets/mock' import { ElectricConfig } from '../../config' import { ElectricClient } from '../../client/model/client' import { DbSchema } from '../../client/model' @@ -70,7 +70,7 @@ export async function initTestable< const adapter = opts?.adapter || new DatabaseAdapter(db) const migrator = opts?.migrator || new MockMigrator() const notifier = (opts?.notifier as N) || new MockNotifier(dbName) - const socketFactory = opts?.socketFactory || new MockSocketFactory() + const socketFactory = opts?.socketFactory || MockSocket const registry = opts?.registry || new MockRegistry() const dal = await electrify( diff --git a/clients/typescript/src/drivers/react-native-sqlite-storage/index.ts b/clients/typescript/src/drivers/react-native-sqlite-storage/index.ts index 7e03fb25be..41a9a16852 100644 --- a/clients/typescript/src/drivers/react-native-sqlite-storage/index.ts +++ b/clients/typescript/src/drivers/react-native-sqlite-storage/index.ts @@ -10,7 +10,7 @@ import { import { DatabaseAdapter } from './adapter' import { ElectricConfig } from '../../config' -import { WebSocketReactNativeFactory } from '../../sockets/react-native' +import { WebSocketReactNative } from '../../sockets/react-native' import { Database } from './database' import { ElectricClient } from '../../client/model/client' @@ -42,7 +42,7 @@ export const electrify = async >( ): Promise> => { const dbName: DbName = db.dbName const adapter = opts?.adapter || new DatabaseAdapter(db, promisesEnabled) - const socketFactory = opts?.socketFactory || new WebSocketReactNativeFactory() + const socketFactory = opts?.socketFactory || WebSocketReactNative const namespace = await baseElectrify( dbName, diff --git a/clients/typescript/src/drivers/react-native-sqlite-storage/test.ts b/clients/typescript/src/drivers/react-native-sqlite-storage/test.ts index b9eb19191f..c3f52583c9 100644 --- a/clients/typescript/src/drivers/react-native-sqlite-storage/test.ts +++ b/clients/typescript/src/drivers/react-native-sqlite-storage/test.ts @@ -15,7 +15,7 @@ import { MockRegistry } from '../../satellite/mock' import { DatabaseAdapter } from './adapter' import { Database } from './index' import { enablePromiseRuntime, MockDatabase } from './mock' -import { MockSocketFactory } from '../../sockets/mock' +import { MockSocket } from '../../sockets/mock' import { ElectricClient } from '../../client/model/client' import { ElectricConfig } from '../../config' import { DbSchema } from '../../client/model' @@ -46,7 +46,7 @@ export const initTestable = async < const adapter = opts?.adapter || new DatabaseAdapter(db, promisesEnabled) const migrator = opts?.migrator || new MockMigrator() const notifier = (opts?.notifier as N) || new MockNotifier(dbName) - const socketFactory = opts?.socketFactory || new MockSocketFactory() + const socketFactory = opts?.socketFactory || MockSocket const registry = opts?.registry || new MockRegistry() const dal = await baseElectrify( diff --git a/clients/typescript/src/drivers/wa-sqlite/index.ts b/clients/typescript/src/drivers/wa-sqlite/index.ts index d99e5ddde4..08395afcaf 100644 --- a/clients/typescript/src/drivers/wa-sqlite/index.ts +++ b/clients/typescript/src/drivers/wa-sqlite/index.ts @@ -2,7 +2,7 @@ import { DatabaseAdapter } from './adapter' import { ElectricDatabase } from './database' import { ElectricConfig } from '../../config' import { electrify as baseElectrify, ElectrifyOptions } from '../../electric' -import { WebSocketWebFactory } from '../../sockets/web' +import { WebSocketWeb } from '../../sockets/web' import { ElectricClient, DbSchema } from '../../client/model' import { Database } from './database' @@ -17,7 +17,7 @@ export const electrify = async >( ): Promise> => { const dbName = db.name const adapter = opts?.adapter || new DatabaseAdapter(db) - const socketFactory = opts?.socketFactory || new WebSocketWebFactory() + const socketFactory = opts?.socketFactory || WebSocketWeb const client = await baseElectrify( dbName, diff --git a/clients/typescript/src/satellite/client.ts b/clients/typescript/src/satellite/client.ts index 20c7fbdfd1..5062d1d08d 100644 --- a/clients/typescript/src/satellite/client.ts +++ b/clients/typescript/src/satellite/client.ts @@ -40,7 +40,7 @@ import { msgToString, serverErrorToSatelliteError, } from '../util/proto' -import { Socket, SocketFactory } from '../sockets/index' +import { PROTOCOL_VSN, Socket, SocketFactory } from '../sockets/index' import _m0 from 'protobufjs/minimal.js' import { EventEmitter } from 'events' import { @@ -232,7 +232,7 @@ export class SatelliteClient extends EventEmitter implements Client { } return new Promise((resolve, reject) => { - this.socket = this.socketFactory.create() + this.socket = new this.socketFactory(PROTOCOL_VSN) const onceError = (error: Error) => { this.close() diff --git a/clients/typescript/src/sockets/index.ts b/clients/typescript/src/sockets/index.ts index 399f69e780..303ab79678 100644 --- a/clients/typescript/src/sockets/index.ts +++ b/clients/typescript/src/sockets/index.ts @@ -1,4 +1,5 @@ import { SatelliteError } from '../util' +import { LIB_VERSION } from '../version' export type Data = string | Uint8Array @@ -6,6 +7,10 @@ export interface ConnectionOptions { url: string } +// Take major & minor version of the library +export const PROTOCOL_VSN = + 'electric.' + LIB_VERSION.split('.').slice(0, 2).join('.') + export interface Socket { open(opts: ConnectionOptions): this write(data: Data): this @@ -21,6 +26,4 @@ export interface Socket { removeErrorListener(cb: (error: SatelliteError) => void): void } -export interface SocketFactory { - create(): Socket -} +export type SocketFactory = new (protocolVersion: string) => Socket diff --git a/clients/typescript/src/sockets/mock.ts b/clients/typescript/src/sockets/mock.ts index 5a1595b5aa..08075dfd0d 100644 --- a/clients/typescript/src/sockets/mock.ts +++ b/clients/typescript/src/sockets/mock.ts @@ -1,14 +1,12 @@ import { EventEmitter } from 'events' -import { ConnectionOptions, Data, Socket, SocketFactory } from './index' +import { ConnectionOptions, Data, Socket } from './index' import { SatelliteError } from '../util' -export class MockSocketFactory implements SocketFactory { - create(): MockSocket { - return new MockSocket() +export class MockSocket extends EventEmitter implements Socket { + constructor() { + super() } -} -export class MockSocket extends EventEmitter implements Socket { open(_opts: ConnectionOptions): this { return this } diff --git a/clients/typescript/src/sockets/node.ts b/clients/typescript/src/sockets/node.ts index 689b5b20a5..9de0a2d7fa 100644 --- a/clients/typescript/src/sockets/node.ts +++ b/clients/typescript/src/sockets/node.ts @@ -1,18 +1,12 @@ import EventEmitter from 'events' -import { ConnectionOptions, Data, Socket, SocketFactory } from './index' +import { ConnectionOptions, Data, Socket } from './index' import { WebSocket } from 'ws' import { SatelliteError, SatelliteErrorCode } from '../util' -export class WebSocketNodeFactory implements SocketFactory { - create(): WebSocketNode { - return new WebSocketNode() - } -} - export class WebSocketNode extends EventEmitter implements Socket { private socket?: WebSocket - constructor() { + constructor(private protocolVsn: string) { super() } @@ -24,7 +18,7 @@ export class WebSocketNode extends EventEmitter implements Socket { ) } - this.socket = new WebSocket(opts.url) + this.socket = new WebSocket(opts.url, [this.protocolVsn]) this.socket.binaryType = 'nodebuffer' this.socket.on('open', () => this.emit('open')) diff --git a/clients/typescript/src/sockets/react-native.ts b/clients/typescript/src/sockets/react-native.ts index 8adf0e28f6..51d61ca109 100644 --- a/clients/typescript/src/sockets/react-native.ts +++ b/clients/typescript/src/sockets/react-native.ts @@ -1,13 +1,6 @@ -import { ConnectionOptions, Data, Socket, SocketFactory } from '.' +import { ConnectionOptions, Data, Socket } from '.' import { SatelliteError, SatelliteErrorCode } from '../util' -// FIXME: This implementation is a bit contrived because it is not using EventEmitter -export class WebSocketReactNativeFactory implements SocketFactory { - create(): WebSocketReactNative { - return new WebSocketReactNative() - } -} - export class WebSocketReactNative implements Socket { private socket?: WebSocket @@ -16,6 +9,8 @@ export class WebSocketReactNative implements Socket { private onceErrorCallbacks: ((error: SatelliteError) => void)[] = [] private messageCallbacks: ((data: any) => void)[] = [] + constructor(private protocolVsn: string) {} + open(opts: ConnectionOptions): this { if (this.socket) { throw new SatelliteError( @@ -24,7 +19,7 @@ export class WebSocketReactNative implements Socket { ) } - this.socket = new WebSocket(opts.url) + this.socket = new WebSocket(opts.url, [this.protocolVsn]) this.socket.binaryType = 'arraybuffer' this.socket.onopen = () => { diff --git a/clients/typescript/src/sockets/web.ts b/clients/typescript/src/sockets/web.ts index 6b52d4aabf..6df854fc4c 100644 --- a/clients/typescript/src/sockets/web.ts +++ b/clients/typescript/src/sockets/web.ts @@ -1,12 +1,6 @@ -import { ConnectionOptions, Data, Socket, SocketFactory } from '.' +import { ConnectionOptions, Data, Socket } from '.' import { SatelliteError, SatelliteErrorCode } from '../util' -export class WebSocketWebFactory implements SocketFactory { - create(): WebSocketWeb { - return new WebSocketWeb() - } -} - // FIXME: This implementation is a bit contrived because it is not using EventEmitter export class WebSocketWeb implements Socket { private socket?: WebSocket @@ -40,6 +34,8 @@ export class WebSocketWeb implements Socket { private messageListener?: (event: MessageEvent) => void private closeListener?: () => void + constructor(private protocolVsn: string) {} + open(opts: ConnectionOptions): this { if (this.socket) { throw new SatelliteError( @@ -48,7 +44,7 @@ export class WebSocketWeb implements Socket { ) } - this.socket = new WebSocket(opts.url) + this.socket = new WebSocket(opts.url, [this.protocolVsn]) this.socket.binaryType = 'arraybuffer' this.socket.addEventListener('open', this.connectListener) diff --git a/clients/typescript/src/version.ts b/clients/typescript/src/version.ts new file mode 100644 index 0000000000..3729cfa1da --- /dev/null +++ b/clients/typescript/src/version.ts @@ -0,0 +1 @@ +export const LIB_VERSION = '0.5.2' diff --git a/clients/typescript/test/migrators/builder.test.ts b/clients/typescript/test/migrators/builder.test.ts index d2e95cb79a..804ce3b56c 100644 --- a/clients/typescript/test/migrators/builder.test.ts +++ b/clients/typescript/test/migrators/builder.test.ts @@ -14,7 +14,7 @@ import Database from 'better-sqlite3' import { electrify } from '../../src/drivers/better-sqlite3' import path from 'path' import { DbSchema } from '../../src/client/model' -import { MockSocketFactory } from '../../src/sockets/mock' +import { MockSocket } from '../../src/sockets/mock' function encodeSatOpMigrateMsg(request: SatOpMigrate) { return ( @@ -132,7 +132,7 @@ test('load migration from meta data', async (t) => { token: 'test-token', }, }, - { socketFactory: new MockSocketFactory() } + { socketFactory: MockSocket } ) // Check that the DB is initialized with the stars table diff --git a/clients/typescript/test/satellite/client.test.ts b/clients/typescript/test/satellite/client.test.ts index 5e0d5c140f..9d520b0550 100644 --- a/clients/typescript/test/satellite/client.test.ts +++ b/clients/typescript/test/satellite/client.test.ts @@ -8,7 +8,7 @@ import { serializeRow, } from '../../src/satellite/client' import { OplogEntry, toTransactions } from '../../src/satellite/oplog' -import { WebSocketNodeFactory } from '../../src/sockets/node' +import { WebSocketNode } from '../../src/sockets/node' import { base64, bytesToNumber } from '../../src/util/common' import { getObjFromString, @@ -42,7 +42,7 @@ test.beforeEach((t) => { const client = new SatelliteClient( dbName, - new WebSocketNodeFactory(), + WebSocketNode, new MockNotifier(dbName), { host: '127.0.0.1', diff --git a/components/electric/.formatter.exs b/components/electric/.formatter.exs index 36c96ce09f..48e471a4c0 100644 --- a/components/electric/.formatter.exs +++ b/components/electric/.formatter.exs @@ -1,5 +1,6 @@ # Used by "mix format" [ inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"], - locals_without_parens: [test_tx: 2] + locals_without_parens: [test_tx: 2], + import_deps: [:plug] ] diff --git a/components/electric/config/runtime.exs b/components/electric/config/runtime.exs index e9bb04be69..7a934cf8c7 100644 --- a/components/electric/config/runtime.exs +++ b/components/electric/config/runtime.exs @@ -30,12 +30,13 @@ config :logger, :console, :pg_client, :pg_producer, :pg_slot, - :sq_client, + :remote_ip, :component, :instance_id, :client_id, :user_id, - :metadata + :metadata, + :request_id ] config :electric, diff --git a/components/electric/lib/electric.ex b/components/electric/lib/electric.ex index 7d48b895a2..b18fb2dedb 100644 --- a/components/electric/lib/electric.ex +++ b/components/electric/lib/electric.ex @@ -94,4 +94,9 @@ defmodule Electric do def instance_id do Application.fetch_env!(:electric, :instance_id) end + + @current_vsn Mix.Project.config()[:version] |> Version.parse!() + def vsn do + @current_vsn + end end diff --git a/components/electric/lib/electric/plug/router.ex b/components/electric/lib/electric/plug/router.ex index b35f9480d2..72b7b5bcb7 100644 --- a/components/electric/lib/electric/plug/router.ex +++ b/components/electric/lib/electric/plug/router.ex @@ -1,17 +1,15 @@ defmodule Electric.Plug.Router do - use Plug.Router, init_mode: :runtime + use Plug.Router import Plug.Conn require Logger - plug(:match) - plug(:dispatch) + plug :match + plug Plug.Logger + plug :dispatch - forward("/api/migrations", to: Electric.Plug.Migrations) - forward("/api/status", to: Electric.Plug.Status) - - match "/ws" do - Electric.Plug.SatelliteWebsocketPlug.call(conn, Electric.Plug.SatelliteWebsocketPlug.init([])) - end + forward "/api/migrations", to: Electric.Plug.Migrations + forward "/api/status", to: Electric.Plug.Status + forward "/ws", to: Electric.Plug.SatelliteWebsocketPlug match _ do send_resp(conn, 404, "Not found") diff --git a/components/electric/lib/electric/plug/satellite_websocket_plug.ex b/components/electric/lib/electric/plug/satellite_websocket_plug.ex index 8245ef5eb0..5e1b1fc896 100644 --- a/components/electric/lib/electric/plug/satellite_websocket_plug.ex +++ b/components/electric/lib/electric/plug/satellite_websocket_plug.ex @@ -1,9 +1,14 @@ defmodule Electric.Plug.SatelliteWebsocketPlug do - use Plug.Builder, init_mode: :runtime + require Logger + use Plug.Builder - def init(handler_opts), + @protocol_prefix "electric." + + def init(handler_opts), do: handler_opts + + defp build_websocket_opts(base_opts), do: - handler_opts + base_opts |> Keyword.put_new_lazy(:auth_provider, fn -> Electric.Satellite.Auth.provider() end) |> Keyword.put_new_lazy(:pg_connector_opts, fn -> Electric.Application.pg_connection_opts() @@ -13,21 +18,63 @@ defmodule Electric.Plug.SatelliteWebsocketPlug do end) def call(conn, handler_opts) do - if Bandit.WebSocket.Handshake.valid_upgrade?(conn) do - ws_opts = if List.first(conn.path_info) == "compress", do: [compress: true], else: [] - + with {:ok, conn} <- check_if_valid_upgrade(conn), + {:ok, conn} <- check_if_subprotocol_present(conn), + {:ok, conn} <- check_if_vsn_compatible(conn, with: "<= #{Electric.vsn()}") do Logger.metadata( remote_ip: conn.remote_ip |> :inet.ntoa() |> to_string(), instance_id: Electric.instance_id() ) - upgrade_adapter( - conn, + client_vsn = conn.assigns.satellite_vsn + protocol_vsn = "#{client_vsn.major}.#{client_vsn.minor}" + Logger.debug("Upgrading connection for client with protocol version #{protocol_vsn}") + + conn + |> put_resp_header("sec-websocket-protocol", @protocol_prefix <> protocol_vsn) + |> upgrade_adapter( :websocket, - {Electric.Satellite.WebsocketServer, handler_opts, ws_opts} + {Electric.Satellite.WebsocketServer, build_websocket_opts(handler_opts), []} ) else - send_resp(conn, 401, "Bad request") + {:error, code, body} -> + send_resp(conn, code, body) + end + end + + defp check_if_valid_upgrade(%Plug.Conn{} = conn) do + if Bandit.WebSocket.Handshake.valid_upgrade?(conn) do + {:ok, conn} + else + {:error, 400, "Bad request"} + end + end + + defp check_if_subprotocol_present(%Plug.Conn{} = conn) do + case get_satellite_subprotocol(conn) do + {:ok, vsn} -> {:ok, assign(conn, :satellite_vsn, vsn)} + :error -> {:error, 400, "Missing satellite websocket subprotocol"} + end + end + + defp check_if_vsn_compatible(%Plug.Conn{} = conn, with: requirements) do + if Version.match?(conn.assigns.satellite_vsn, requirements) do + {:ok, conn} + else + {:error, 400, + "Cannot connect satellite version #{conn.assigns.satellite_vsn}: this server requires #{requirements}"} + end + end + + defp get_satellite_subprotocol(%Plug.Conn{} = conn) do + get_req_header(conn, "sec-websocket-protocol") + |> Enum.filter(&String.starts_with?(&1, @protocol_prefix)) + |> case do + [@protocol_prefix <> version] when byte_size(version) < 20 -> + Version.parse(version <> ".0") + + _ -> + :error end end end diff --git a/components/electric/lib/electric/satellite/protobuf.ex b/components/electric/lib/electric/satellite/protobuf.ex index 39fd061cda..a5d7e2ddbb 100644 --- a/components/electric/lib/electric/satellite/protobuf.ex +++ b/components/electric/lib/electric/satellite/protobuf.ex @@ -144,6 +144,11 @@ defmodule Electric.Satellite.Protobuf do {:error, :unknown_msg_type} end + def decode!(tag, binary) do + {:ok, msg} = decode(tag, binary) + msg + end + @spec json_decode(byte(), binary(), list()) :: {:ok, sq_pb_msg()} | {:error, any()} for {module, tag} <- @mapping do def json_decode(unquote(tag), binary, opts) do diff --git a/components/electric/lib/mint/web_socket_client.ex b/components/electric/lib/mint/web_socket_client.ex new file mode 100644 index 0000000000..6b9687f4b5 --- /dev/null +++ b/components/electric/lib/mint/web_socket_client.ex @@ -0,0 +1,351 @@ +defmodule Mint.WebSocketClient do + @moduledoc """ + A GenServer wrapper over a Mint.WebSocket connection. + + `c:Genserver.init/1` callback tries to establish an HTTP connection + and then upgrade it to websocket. + + Any GenServer callbacks will have a state as a 2-tuple: a conn struct + to be used with functions from this module, and user-provided state. + + Any callbacks from this module (`c:handle_frame/2`, `c:do_handle_info/2`) get only + user-provided state as the second argument. + + `c:GenServer.handle_info/2` callback cannot be used due to how Mint is set up, so all messages + are forwarded to a `c:do_handle_info` callback instead. + """ + + alias Mint.WebSocketClient + require Logger + + @type conn :: %{ + conn: Mint.HTTP.t(), + websocket: Mint.WebSocket.t(), + ref: reference(), + subprotocol: String.t() | nil + } + + @type handler_return :: + {:noreply, term()} + | {:reply, [{:text, String.t()} | {:binary, binary()}], term()} + | {:stop, term(), term(), term()} + | {:stop, term(), term(), [{:text, String.t()} | {:binary, binary()}], term()} + + @callback handle_connection(subprotocol :: String.t(), conn(), init_arg :: term()) :: + {:ok, term(), [Mint.WebSocket.frame()]} | {:error, term()} + @callback handle_frame({:text, String.t()} | {:binary, binary()}, term()) :: handler_return() + + @callback do_handle_info(term(), term()) :: handler_return() + + defmacro __using__(_opts) do + quote do + use GenServer, restart: :temporary + require Logger + + alias Mint.WebSocketClient + + @behaviour Mint.WebSocketClient + + @impl GenServer + def init(init_arg) do + protocol = Keyword.get(init_arg, :protocol, :ws) + host = Keyword.fetch!(init_arg, :host) + port = Keyword.get(init_arg, :port, if(protocol == :wss, do: 443, else: 80)) + path = Keyword.get(init_arg, :path, "/") + subprotocol = Keyword.get(init_arg, :subprotocol) + + ws_headers = + [] ++ if(subprotocol, do: [{"sec-websocket-protocol", subprotocol}], else: []) + + with {:ok, state} <- + Mint.WebSocketClient.setup_ws_connection( + protocol, + host, + port, + path, + ws_headers + ) do + case handle_connection(state.subprotocol, state, Keyword.get(init_arg, :init_arg)) do + {:ok, user_state, extra_frames} -> + state = {Map.merge(state, %{status: :open}), user_state} + + Enum.reduce_while(extra_frames, {:ok, state}, fn frame, + {:ok, {conn_state, user_state}} -> + frame + |> do_handle_frame(user_state) + |> Mint.WebSocketClient.process_callback_response(conn_state) + |> case do + {:noreply, state} -> {:cont, {:ok, state}} + {:stop, reason, _} -> {:halt, {:stop, reason}} + end + end) + + {:error, reason} -> + Mint.HTTP.close(state.conn) + {:stop, {:error, reason}} + end + else + {:error, reason} -> + Logger.error("HTTP connection to the server failed: #{inspect(reason)}") + {:stop, {:error, reason}} + + {:error, conn, reason} -> + Logger.error("Failed to upgrade websocket connection: #{inspect(reason)}") + + Mint.HTTP.close(conn) + {:stop, {:error, reason}} + end + end + + @impl GenServer + def handle_info(message, {%{conn: conn, ref: ref} = conn_state, user_state} = state) do + case Mint.WebSocket.stream(conn, message) do + :unknown -> + do_handle_info(message, user_state) + |> WebSocketClient.process_callback_response(conn_state) + |> WebSocketClient.maybe_close_socket() + + {:ok, conn, [{:data, ^ref, data}]} -> + decode_and_handle(data, WebSocketClient.update_state(state, %{conn: conn})) + + {:error, conn, error, response} -> + Mint.HTTP.close(conn) + {:stop, {:error, error, response}, state} + end + end + + @spec do_handle_frame(Mint.WebSocket.frame(), {WebSocketClient.conn(), term()}) :: + WebSocketClient.handler_return() + defp do_handle_frame({:ping, text}, state), do: {:reply, {:pong, text}, state} + defp do_handle_frame({:pong, _}, state), do: {:noreply, state} + + defp do_handle_frame({:close, 1000, _}, state) do + Logger.info("Server closed the websocket connection") + + {:stop, :normal, :close, state} + end + + defp do_handle_frame({:close, code, reason}, state) do + Logger.warning( + "Server closed the websocket connection with code #{code} and reason #{reason}" + ) + + {:stop, :normal, :close, state} + end + + defp do_handle_frame(frame, state) do + handle_frame(frame, state) + end + + @spec decode_and_handle(binary(), {WebSocketClient.conn(), term()}) :: + {:noreply, term()} | {:stop, term(), term()} + defp decode_and_handle(data, {conn_state, _} = state) do + case Mint.WebSocket.decode(conn_state.websocket, data) do + {:ok, websocket, frames} -> + result_tuple = {:noreply, WebSocketClient.update_state(state, websocket: websocket)} + + Enum.reduce_while(frames, result_tuple, fn frame, {:noreply, {conn, user_state}} -> + result = + do_handle_frame(frame, user_state) + |> WebSocketClient.process_callback_response(conn) + |> WebSocketClient.maybe_close_socket() + + if elem(result, 0) == :stop do + {:halt, result} + else + {:cont, result} + end + end) + + {:error, _websocket, error} -> + Logger.debug("Invalid frame received: #{inspect(error)}") + Mint.HTTP.close(conn_state.conn) + {:stop, {:error, error}, state} + end + end + + def handle_connection(_, _, state), do: {:ok, state, []} + def handle_frame(_, state), do: {:noreply, state} + + def do_handle_info(msg, state) do + proc = + case Process.info(self(), :registered_name) do + {_, []} -> self() + {_, name} -> name + end + + Logger.warning( + "#{inspect(__MODULE__)} #{inspect(proc)} received unexpected message in do_handle_info/2: #{inspect(msg)}" + ) + + {:noreply, state} + end + + defoverridable Mint.WebSocketClient + end + end + + @doc """ + Send websocket frames over an established connection + """ + @spec send_frames(conn(), [Mint.WebSocket.frame()]) :: {:ok, conn()} | {:error, any()} + def send_frames(conn_state, frames) do + with {:ok, websocket, data} <- encode_all(conn_state.websocket, frames), + {:ok, conn} <- + Mint.WebSocket.stream_request_body(conn_state.conn, conn_state.ref, data) do + {:ok, %{conn_state | websocket: websocket, conn: conn}} + else + {:error, _, error} -> + {:error, error} + end + end + + @doc """ + Try to receive next frames over an established connection + """ + @spec receive_next_frames!(conn()) :: {:ok, conn(), [Mint.WebSocket.frame()]} + def receive_next_frames!(%{conn: conn, websocket: websocket, ref: ref} = conn_state) do + msg = receive do: ({proto, _, _} = message when proto in [:tcp, :ssl] -> message) + {:ok, conn, [{:data, ^ref, data}]} = Mint.WebSocket.stream(conn, msg) + {:ok, websocket, frames} = Mint.WebSocket.decode(websocket, data) + + {:ok, %{conn_state | websocket: websocket, conn: conn}, frames} + end + + # Internal functions used by the functions created in the `__using__` macro + @doc false + @spec process_callback_response(handler_return(), conn()) :: + {:noreply, {conn(), term()}} | {:stop, term(), {conn(), term()}} + def process_callback_response({:noreply, user_state}, conn), do: {:noreply, {conn, user_state}} + + def process_callback_response({:reply, frames, user_state}, conn) do + case send_frames(conn, List.wrap(frames)) do + {:ok, conn} -> {:noreply, {conn, user_state}} + {:error, reason} -> {:stop, {:error, reason}, {conn, user_state}} + end + end + + def process_callback_response({:stop, reason, close_reason, user_state}, conn) + when close_reason == :close + when is_tuple(close_reason) and tuple_size(close_reason) == 3 and + elem(close_reason, 0) == :close do + case send_frames(conn, List.wrap(close_reason)) do + {:ok, conn} -> {:stop, reason, {conn, user_state}} + {:error, %Mint.TransportError{reason: :closed}} -> {:stop, :normal, {conn, user_state}} + {:error, reason} -> {:stop, {:error, reason}, {conn, user_state}} + end + end + + def process_callback_response({:stop, reason, close_reason, frames, user_state}, conn) + when close_reason == :close + when is_tuple(close_reason) and tuple_size(close_reason) == 3 and + elem(close_reason, 0) == :close do + case send_frames(conn, List.wrap(frames) ++ List.wrap(close_reason)) do + {:ok, conn} -> {:stop, reason, {conn, user_state}} + {:error, %Mint.TransportError{reason: :closed}} -> {:stop, :normal, {conn, user_state}} + {:error, reason} -> {:stop, {:error, reason}, {conn, user_state}} + end + end + + @doc false + def maybe_close_socket({:stop, reason, {%{conn: conn}, _} = state}) do + {:ok, conn} = Mint.HTTP.close(conn) + {:stop, reason, update_state(state, %{conn: conn})} + end + + def maybe_close_socket({:noreply, state}), do: {:noreply, state} + + @spec encode_all(Mint.WebSocket.t(), [Mint.WebSocket.frame()]) :: + {:ok, Mint.WebSocket.t(), iolist()} | {:error, Mint.WebSocket.t(), any()} + defp encode_all(websocket, frames) do + Enum.reduce_while(List.wrap(frames), {:ok, websocket, []}, fn frame, {:ok, ws, acc} -> + case Mint.WebSocket.encode(ws, frame) do + {:ok, ws, data} -> {:cont, {:ok, ws, [data | acc]}} + {:error, ws, error} -> {:halt, {:error, ws, error}} + end + end) + |> case do + {:ok, ws, data} -> {:ok, ws, Enum.reverse(data)} + {:error, _, _} = error -> error + end + end + + @doc false + @spec update_state({conn(), term()}, map() | keyword()) :: {conn(), term()} + def update_state({internal, user}, updates) do + {Enum.into(updates, internal), user} + end + + @doc false + @spec setup_ws_connection( + :ws | :wss, + Mint.Types.address(), + :inet.port_number(), + String.t(), + Mint.Types.headers() + ) :: {:ok, conn()} | {:error, term()} | {:error, Mint.HTTP.t(), term()} + def setup_ws_connection(protocol, host, port, path \\ "/", ws_headers \\ []) + when protocol in [:ws, :wss] do + http_protocol = if protocol == :ws, do: :http, else: :https + + Logger.debug( + "Trying to establish websocket connection to #{protocol}://#{host}:#{port}#{path}" + ) + + with {:ok, conn} <- Mint.HTTP.connect(http_protocol, host, port, log: true), + {:ok, conn, ref} <- Mint.WebSocket.upgrade(protocol, conn, path, ws_headers), + http_reply_message = receive(do: (message -> message)), + {:ok, conn, [{:status, ^ref, status}, {:headers, ^ref, resp_headers}, {:done, ^ref}]} <- + Mint.WebSocket.stream(conn, http_reply_message), + {:ok, subprotocol} <- validate_subprotocol(conn, ws_headers, resp_headers), + {:ok, conn, websocket} <- + Mint.WebSocket.new(conn, ref, status, resp_headers) do + {:ok, %{conn: conn, websocket: websocket, ref: ref, subprotocol: subprotocol}} + end + end + + defp validate_subprotocol(conn, ws_headers, resp_headers) do + protocols = { + get_header(ws_headers, "sec-websocket-protocol"), + get_header(resp_headers, "sec-websocket-protocol") + } + + case protocols do + # Subprotocol wasn't requested + {nil, nil} -> + {:ok, nil} + + # Subprotocol wasn't requested, but was sent by the server + {nil, got} -> + Logger.error("No subprotocol was requested, but the server returned #{inspect(got)}") + {:error, conn, :invalid_subprotocol} + + # Subprotocol was requested, but the server ignored it + {requested, nil} -> + Logger.error("Subprotocols #{requested} were requested, but the server didn't send one") + {:error, conn, :invalid_subprotocol} + + {requested, got} -> + requested = + String.split(requested, ",", trim: true) + |> Enum.map(&String.trim/1) + + if got in requested do + {:ok, got} + else + Logger.error( + "Subprotocols #{requested} were requested, but the server returned #{inspect(got)}" + ) + + {:error, conn, :invalid_subprotocol} + end + end + end + + defp get_header(headers, key) do + Enum.find_value(headers, fn + {^key, value} -> value + _ -> nil + end) + end +end diff --git a/components/electric/lib/satellite/protocol_helpers.ex b/components/electric/lib/satellite/protocol_helpers.ex new file mode 100644 index 0000000000..0783da6aca --- /dev/null +++ b/components/electric/lib/satellite/protocol_helpers.ex @@ -0,0 +1,108 @@ +defmodule Satellite.ProtocolHelpers do + @moduledoc """ + Helper module for building protobuf objects to send via the test WS client. + Used mainly in E2E tests, which is why this is not in the test folder. + """ + use Electric.Satellite.Protobuf + alias Electric.Satellite.Serialization + + def subscription_request(id \\ nil, shape_requests) do + shape_requests + |> Enum.map(fn + {id, tables: tables} -> + %SatShapeReq{ + request_id: to_string(id), + shape_definition: %SatShapeDef{ + selects: tables |> Enum.map(&%SatShapeDef.Select{tablename: &1}) + } + } + end) + |> then( + &%SatSubsReq{ + subscription_id: id || Electric.Utils.uuid4(), + shape_requests: &1 + } + ) + end + + def schema("public.entries") do + %{ + schema: "public", + name: "entries", + oid: 11111, + primary_keys: ["id"], + columns: [ + %{name: "id", type: :uuid}, + %{name: "content", type: :varchar}, + %{name: "content_b", type: :varchar} + ] + } + end + + def relation("public.entries") do + %SatRelation{ + columns: [ + %SatRelationColumn{name: "id", type: "uuid", is_nullable: false}, + %SatRelationColumn{name: "content", type: "varchar", is_nullable: false}, + %SatRelationColumn{name: "content_b", type: "varchar", is_nullable: true} + ], + relation_id: 11111, + schema_name: "public", + table_name: "entries", + table_type: :TABLE + } + end + + def insert(table, data) when is_map(data) do + schema = schema(table) + columns = Enum.map(schema.columns, & &1.name) + + %SatOpInsert{relation_id: schema.oid, row_data: Serialization.map_to_row(data, columns)} + end + + def update(table, pk, old_data, new_data, tags \\ []) + when is_list(tags) and is_map(pk) and is_map(old_data) and is_map(new_data) do + schema = schema(table) + columns = Enum.map(schema.columns, & &1.name) + + %SatOpUpdate{ + relation_id: schema.oid, + old_row_data: Serialization.map_to_row(Map.merge(old_data, pk), columns), + row_data: Serialization.map_to_row(Map.merge(new_data, pk), columns), + tags: tags + } + end + + def delete(table, old_data, tags \\ []) when is_list(tags) and is_map(old_data) do + schema = schema(table) + columns = Enum.map(schema.columns, & &1.name) + + %SatOpDelete{ + relation_id: schema.oid, + old_row_data: Serialization.map_to_row(old_data, columns), + tags: tags + } + end + + def transaction(lsn, commit_time, op_or_ops) + when is_binary(lsn) and (is_integer(commit_time) or is_struct(commit_time, DateTime)) do + commit_time = + if is_integer(commit_time), + do: commit_time, + else: DateTime.to_unix(commit_time, :millisecond) + + begin = {:begin, %SatOpBegin{commit_timestamp: commit_time, lsn: lsn, trans_id: ""}} + commit = {:commit, %SatOpCommit{commit_timestamp: commit_time, lsn: lsn, trans_id: ""}} + ops = [begin] ++ List.wrap(op_or_ops) ++ [commit] + + ops = + Enum.map(ops, fn + {type, op} -> %SatTransOp{op: {type, op}} + %SatOpInsert{} = op -> %SatTransOp{op: {:insert, op}} + %SatOpUpdate{} = op -> %SatTransOp{op: {:update, op}} + %SatOpDelete{} = op -> %SatTransOp{op: {:delete, op}} + end) + + %SatOpLog{ops: ops} + end +end diff --git a/components/electric/lib/satellite/satellite_ws_client.ex b/components/electric/lib/satellite/satellite_ws_client.ex deleted file mode 100644 index af7e6c8c5e..0000000000 --- a/components/electric/lib/satellite/satellite_ws_client.ex +++ /dev/null @@ -1,656 +0,0 @@ -defmodule Electric.Test.SatelliteWsClient do - @moduledoc """ - - """ - alias Electric.Satellite.Serialization - alias Electric.Replication.Changes.Transaction - - use Electric.Satellite.Protobuf - - require Logger - - defmodule State do - defstruct auto_in_sub: false, - auto_ping: false, - debug: false, - filter_reply: nil, - format: :term, - history: nil, - num: 0, - parent: nil, - stream_ref: nil, - conn: nil, - last_lsn: nil - end - - def connect() do - host = {127, 0, 0, 1} - port = 5133 - connect(host, port) - end - - def connect(host, port) do - host = - case host do - h when is_binary(h) -> :erlang.binary_to_list(host) - _ -> host - end - - {:ok, conn} = :gun.open(host, port, %{:transport => :tcp}) - {:ok, _} = :gun.await_up(conn) - stream_ref = :gun.ws_upgrade(conn, "/ws", []) - - {:upgrade, [<<"websocket">>], _} = :gun.await(conn, stream_ref) - {:ok, {conn, stream_ref}} - end - - # Automatically send auth - @type opt() :: - {:auth, boolean()} - | {:host, String.t()} - | {:port, pos_integer()} - | {:debug, boolean()} - # Logging format - | {:format, :term | :json | :compact} - # Automatically respond to pings - | {:auto_ping, boolean()} - # Automatically acknowledge subscription from Electric - | {:auto_in_sub, boolean()} - # Client identification - | {:id, term()} - # Automatically subscribe to Electric starting from lsn - | {:sub, String.t()} - # Request to continue following subscriptions - | {:subscription_ids, [String.t()]} - | {:auto_register, boolean()} - - @type conn() :: atom() | pid() - - @spec connect_and_spawn([opt()]) :: {:ok, pid()} - def connect_and_spawn(opts \\ []) do - self = self() - :application.ensure_all_started(:gun) - :proc_lib.start(__MODULE__, :loop_init, [self, opts]) - end - - def with_connect(opts, fun) do - {:ok, pid} = connect_and_spawn(opts) - - try do - fun.(pid) - after - :ok = disconnect(pid) - end - end - - def is_alive(conn \\ __MODULE__) do - conn = - cond do - is_pid(conn) -> - conn - - is_atom(conn) -> - :erlang.whereis(conn) - end - - Process.alive?(conn) - end - - def gen_schema() do - %{ - schema: "public", - name: "entries", - oid: 11111, - primary_keys: ["id"], - columns: [ - %{name: "id", type: :uuid}, - %{name: "content", type: :varchar}, - %{name: "content_b", type: :varchar} - ] - } - end - - def gen_owned_schema() do - %{ - schema_name: "public", - table_name: "entries", - oid: 22222, - columns: [ - %{name: "id", type: :uuid}, - %{name: "electric_user_id", type: :varchar}, - %{name: "content", type: :varchar} - ] - } - end - - def build_subscription_request(id \\ nil, shape_requests) do - shape_requests - |> Enum.map(fn - {id, tables: tables} -> - %SatShapeReq{ - request_id: to_string(id), - shape_definition: %SatShapeDef{ - selects: tables |> Enum.map(&%SatShapeDef.Select{tablename: &1}) - } - } - end) - |> then( - &%SatSubsReq{ - subscription_id: id || Electric.Utils.uuid4(), - shape_requests: &1 - } - ) - end - - def send_test_relation(conn \\ __MODULE__) do - relation = %SatRelation{ - columns: [ - %SatRelationColumn{name: "id", type: "uuid", is_nullable: false}, - %SatRelationColumn{name: "content", type: "varchar", is_nullable: false}, - %SatRelationColumn{name: "content_b", type: "varchar", is_nullable: true} - ], - relation_id: 11111, - schema_name: "public", - table_name: "entries", - table_type: :TABLE - } - - send_data(conn, relation) - :ok - end - - def send_test_relation_owned(conn \\ __MODULE__) do - relation = %SatRelation{ - columns: [ - %SatRelationColumn{name: "id", type: "uuid", is_nullable: false}, - %SatRelationColumn{name: "electric_user_id", type: "varchar", is_nullable: false}, - %SatRelationColumn{name: "content", type: "varchar", is_nullable: false} - ], - relation_id: 22222, - schema_name: "public", - table_name: "owned_entries", - table_type: :TABLE - } - - send_data(conn, relation) - :ok - end - - def send_new_data(conn \\ __MODULE__, lsn, commit_time, id, value) do - send_tx_data( - conn, - lsn, - commit_time, - {:insert, %SatOpInsert{relation_id: 11111, row_data: map_to_row([id, value, ""])}} - ) - end - - def send_new_owned_data(conn \\ __MODULE__, lsn, commit_time, id, user_id, value) do - send_tx_data( - conn, - lsn, - commit_time, - {:insert, %SatOpInsert{relation_id: 22222, row_data: map_to_row([id, user_id, value])}} - ) - end - - def entries_table_send_insert(conn \\ __MODULE__, lsn, commit_time, data) do - columns = Enum.map(gen_schema().columns, & &1.name) - - send_tx_data( - conn, - lsn, - commit_time, - {:insert, - %SatOpInsert{ - relation_id: gen_schema().oid, - row_data: Serialization.map_to_row(data, columns) - }} - ) - end - - def entries_table_send_update( - conn \\ __MODULE__, - lsn, - commit_time, - id, - old_data, - new_data, - tags \\ [] - ) do - columns = Enum.map(gen_schema().columns, & &1.name) - - send_tx_data( - conn, - lsn, - commit_time, - {:update, - %SatOpUpdate{ - relation_id: gen_schema().oid, - old_row_data: Serialization.map_to_row(Map.put(old_data, "id", id), columns), - row_data: Serialization.map_to_row(Map.put(new_data, "id", id), columns), - tags: tags - }} - ) - end - - def entries_table_send_delete( - conn \\ __MODULE__, - lsn, - commit_time, - old_data, - tags \\ [] - ) do - columns = Enum.map(gen_schema().columns, & &1.name) - - send_tx_data( - conn, - lsn, - commit_time, - {:delete, - %SatOpDelete{ - relation_id: gen_schema().oid, - old_row_data: Serialization.map_to_row(old_data, columns), - tags: tags - }} - ) - end - - def send_update_data(conn \\ __MODULE__, lsn, commit_time, id, value, old_value) do - send_tx_data( - conn, - lsn, - commit_time, - {:update, - %SatOpUpdate{ - relation_id: 11111, - old_row_data: map_to_row([id, old_value, ""]), - row_data: map_to_row([id, value, ""]) - }} - ) - end - - def send_update_owned_data(conn \\ __MODULE__, lsn, commit_time, id, user_id, value, old_value) do - send_tx_data( - conn, - lsn, - commit_time, - {:update, - %SatOpUpdate{ - relation_id: 22222, - old_row_data: map_to_row([id, user_id, old_value]), - row_data: map_to_row([id, user_id, value]) - }} - ) - end - - def send_delete_data(conn \\ __MODULE__, lsn, commit_time, id, value) do - send_tx_data( - conn, - lsn, - commit_time, - {:delete, %SatOpDelete{relation_id: 11111, old_row_data: map_to_row([id, value, ""])}} - ) - end - - def send_delete_owned_data(conn \\ __MODULE__, lsn, commit_time, id, user_id, value) do - send_tx_data( - conn, - lsn, - commit_time, - {:delete, %SatOpDelete{relation_id: 22222, old_row_data: map_to_row([id, user_id, value])}} - ) - end - - def send_tx_data(conn, lsn, commit_time, op_or_ops) do - begin = {:begin, %SatOpBegin{commit_timestamp: commit_time, lsn: lsn, trans_id: ""}} - commit = {:commit, %SatOpCommit{commit_timestamp: commit_time, lsn: lsn, trans_id: ""}} - ops = [begin] ++ List.wrap(op_or_ops) ++ [commit] - - ops = - Enum.map(ops, fn - {type, op} -> %SatTransOp{op: {type, op}} - %SatOpInsert{} = op -> %SatTransOp{op: {:insert, op}} - %SatOpUpdate{} = op -> %SatTransOp{op: {:update, op}} - %SatOpDelete{} = op -> %SatTransOp{op: {:delete, op}} - end) - - tx = %SatOpLog{ops: ops} - - send_data(conn, tx) - :ok - end - - @doc """ - Serialize transaction that is represented in internal Electric format - """ - def send_tx_internal(conn, %Transaction{} = tx, lsn, relations) do - {sat_oplog, [], _} = Serialization.serialize_trans(tx, lsn, relations) - - for op <- sat_oplog do - send_data(conn, op) - end - end - - @doc """ - Serialize relation that is represented in internal Electric format - """ - # def send_relation_internal(conn, schema, name, oid, columns) do - def send_relation_internal(conn, table_info) do - sat_rel = Serialization.serialize_relation(table_info) - send_data(conn, sat_rel) - end - - @spec send_data(conn(), PB.sq_pb_msg(), fun() | :default) :: term() - def send_data(conn, data, filter \\ :default) do - filter = - case filter do - :default -> fn _, _ -> true end - etc -> etc - end - - send(conn, {:ctrl_stream, data, filter}) - end - - @spec send_bin_data(conn(), binary(), fun() | :default) :: term() - def send_bin_data(conn, data, filter \\ :default) do - filter = - case filter do - :default -> fn _, _ -> true end - etc -> etc - end - - send(conn, {:ctrl_bin, data, filter}) - end - - def disconnect(conn \\ __MODULE__) do - conn = - case conn do - conn when is_atom(conn) -> - :erlang.whereis(conn) - - conn when is_pid(conn) -> - conn - end - - with true <- :erlang.is_pid(conn) do - ref = :erlang.monitor(:process, conn) - send(conn, {:gun_error, :none, :none, :none}) - - receive do - {:DOWN, ^ref, :process, _, _} -> - :ok - after - 5000 -> - :erlang.exit(conn, :kill) - end - else - _ -> :ok - end - end - - def get_ets() do - __MODULE__ - end - - @spec loop_init(pid(), [opt()]) :: any - def loop_init(parent, opts) do - host = Keyword.get(opts, :host, "localhost") - port = Keyword.get(opts, :port, 5133) - {:ok, {conn, stream_ref}} = connect(host, port) - - self = self() - - t = - case Keyword.get(opts, :auto_register, true) do - true -> - Process.register(self(), __MODULE__) - :ets.new(__MODULE__, [:named_table, :ordered_set]) - - false -> - :ets.new(__MODULE__, [:ordered_set]) - end - - try do - Logger.info("started #{inspect(self)}") - - maybe_auth(conn, stream_ref, opts) - maybe_subscribe(conn, stream_ref, opts) - - :proc_lib.init_ack(parent, {:ok, self()}) - - loop(%State{ - conn: conn, - stream_ref: stream_ref, - parent: parent, - history: t, - num: 0, - filter_reply: fn _, _ -> true end, - debug: Keyword.get(opts, :debug, false), - format: Keyword.get(opts, :format, :term), - auto_ping: Keyword.get(opts, :auto_ping, false), - auto_in_sub: Keyword.get(opts, :auto_in_sub, false) - }) - rescue - e -> - Logger.error(Exception.format(:error, e, __STACKTRACE__)) - reraise e, __STACKTRACE__ - end - end - - def loop(%State{conn: conn, stream_ref: stream_ref, history: table, num: num} = state) do - receive do - {:ctrl_stream, data, filter} -> - {:ok, type, _iodata} = PB.encode(data) - maybe_debug("send data #{type}: #{inspect(data)}", state) - - :gun.ws_send(conn, stream_ref, {:binary, serialize(data)}) - loop(%State{state | filter_reply: filter}) - - {:ctrl_bin, data, filter} -> - :gun.ws_send(conn, stream_ref, {:binary, data}) - maybe_debug("send bin data: #{inspect(data)}", state) - loop(%State{state | filter_reply: filter}) - - {:gun_response, ^conn, _, _, status, headers} -> - :gun.close(conn) - Logger.error("gun error: #{inspect(status)} #{inspect(headers)}") - - {:gun_error, _, _, :none} -> - :gun.close(conn) - Logger.info("instructed to close connection") - - {:gun_error, _, _, reason} -> - :gun.close(conn) - Logger.error("gun error: #{inspect(reason)}") - - {:gun_ws, ^conn, ^stream_ref, :close} -> - :gun.close(conn) - Logger.info("gun_ws: close by the server") - - {:gun_ws, ^conn, ^stream_ref, {:binary, <> = bin}} -> - maybe_debug("received bin: #{type} #{inspect(data)}", state) - data = deserialize(bin, state.format) - - case data do - %SatPingReq{} when state.auto_ping == true -> - Process.send( - self(), - {:ctrl_stream, %SatPingResp{lsn: state.last_lsn}, state.filter_reply}, - [] - ) - - %SatInStartReplicationReq{} when state.auto_in_sub == true -> - Process.send( - self(), - {:ctrl_stream, %SatInStartReplicationResp{}, state.filter_reply}, - [] - ) - - _ -> - :ok - end - - :ets.insert(table, {num, data}) - - case state.filter_reply do - nil -> - :ok - - fun -> - case fun.(num, data) do - true -> - msg = {self(), data} - maybe_debug("sending to: #{inspect(state.parent)} #{inspect(msg)}", state) - send(state.parent, msg) - - false -> - :ok - end - end - - case data do - %SatPingReq{} -> - Logger.info("rec: #{inspect(data)}") - loop(%State{state | num: num}) - - _ -> - Logger.info("rec [#{num}]: #{inspect(data)}") - loop(%State{state | num: num + 1}) - end - - msg -> - Logger.warning("Unhandled: #{inspect(msg)}") - end - end - - def maybe_auth(conn, stream_ref, opts) do - case auth_token!(opts) do - {:ok, token} -> - id = Keyword.get(opts, :id, "id") - - headers = [%SatAuthHeaderPair{key: :PROTO_VERSION, value: PB.get_long_proto_vsn()}] - auth_req = serialize(%SatAuthReq{id: id, token: token, headers: headers}) - - :gun.ws_send(conn, stream_ref, {:binary, auth_req}) - {:ws, {:binary, auth_frame}} = :gun.await(conn, stream_ref) - %SatAuthResp{} = deserialize(auth_frame) - :ok = :gun.update_flow(conn, stream_ref, 1) - - Logger.debug("Auth passed") - - :no_auth -> - :ok - end - end - - defp auth_token!(opts) do - case Keyword.get(opts, :auth, false) do - false -> - :no_auth - - %{token: token} -> - {:ok, token} - - %{auth_config: config, user_id: user_id} -> - {:ok, Electric.Satellite.Auth.Secure.create_token(user_id, config: config)} - - %{user_id: user_id} -> - {:ok, Electric.Satellite.Auth.Secure.create_token(user_id)} - - invalid -> - raise ArgumentError, - message: - "use connect_and_spawn(auth: %{auth_provider: \"...\", user_id: \"...\"} | %{token: \"...\"}), got: #{inspect(invalid)}" - end - end - - def maybe_subscribe(conn, stream_ref, opts) do - case Keyword.get(opts, :sub, nil) do - nil -> - :ok - - "" -> - sub_req = serialize(%SatInStartReplicationReq{}) - :gun.ws_send(conn, stream_ref, {:binary, sub_req}) - Logger.debug("Subscribed at LSN=0") - - lsn -> - sub_req = - serialize(%SatInStartReplicationReq{ - lsn: lsn, - subscription_ids: Keyword.get(opts, :subscription_ids, []) - }) - - :gun.ws_send(conn, stream_ref, {:binary, sub_req}) - Logger.debug("Subscribed at LSN=#{inspect(lsn)}") - end - end - - def maybe_debug(format, %{debug: true}) do - Logger.debug(format) - end - - def maybe_debug(_, _) do - :ok - end - - def serialize(data) do - {:ok, type, iodata} = PB.encode(data) - [<>, iodata] - end - - def deserialize(binary, format \\ :term) do - <> = binary - - case format do - :compact -> - {:ok, data} = PB.decode(type, data) - compact(data) - - :term -> - {:ok, data} = PB.decode(type, data) - data - - :json when data !== <<"">> -> - {:ok, data} = PB.json_decode(type, data, [:return_maps, :use_nil]) - data - - _ -> - {:ok, data} = PB.decode(type, data) - data - end - end - - def compact(%SatOpLog{ops: ops}) do - Enum.reduce( - ops, - nil, - fn - %SatTransOp{op: {:begin, %SatOpBegin{commit_timestamp: tmp, lsn: lsn, trans_id: id}}}, - _acc -> - %{commit_timestamp: tmp, lsn: lsn, trans_id: id} - - %SatTransOp{op: {:commit, _}}, acc -> - acc - - %SatTransOp{op: {key, %{tags: _} = op}}, acc -> - acc1 = Map.update(acc, key, 1, fn n -> n + 1 end) - Map.update(acc1, :tags, op.tags, fn tags -> op.tags ++ tags end) - - %SatTransOp{op: {key, _op}}, acc -> - Map.update(acc, key, 1, fn n -> n + 1 end) - end - ) - end - - def compact(other) do - other - end - - defp map_to_row([a, b, c]) do - map = %{"id" => a, "content" => b, "content_b" => c} - columns = ["id", "content", "content_b"] - Serialization.map_to_row(map, columns) - end -end diff --git a/components/electric/lib/satellite/test_ws_client.ex b/components/electric/lib/satellite/test_ws_client.ex new file mode 100644 index 0000000000..11efcbbe29 --- /dev/null +++ b/components/electric/lib/satellite/test_ws_client.ex @@ -0,0 +1,222 @@ +defmodule Satellite.TestWsClient do + use Mint.WebSocketClient + alias Electric.Satellite.Auth + + use Electric.Satellite.Protobuf + + # Public API + + @protocol_prefix "electric." + @satellite_vsn @protocol_prefix <> "#{Electric.vsn().major}.#{Electric.vsn().minor}" + + def connect(opts) do + connection_opts = + [ + host: "127.0.0.1", + port: 5133, + protocol: :ws, + path: "/ws", + subprotocol: @satellite_vsn, + init_arg: Keyword.drop(opts, [:host, :port]) ++ [parent: self()] + ] + |> Keyword.merge(Keyword.take(opts, [:host, :port])) + + GenServer.start(__MODULE__, connection_opts, Keyword.take(opts, [:name])) + end + + def with_connect(opts, fun) when is_function(fun, 1) do + {:ok, pid} = connect(opts) + + try do + fun.(pid) + after + disconnect(pid) + end + end + + def disconnect(pid) do + GenServer.stop(pid, :shutdown) + catch + :exit, _ -> :ok + end + + @doc """ + Send given WebSocket frames to the server. + """ + @spec send_frames(GenServer.server(), Mint.WebSocket.frame() | [Mint.WebSocket.frame()]) :: + :ok | {:error, term()} + def send_frames(pid, data), do: GenServer.call(pid, {:send_frames, List.wrap(data)}) + + @doc """ + Send Satellite protocol messages to the server. + """ + @spec send_data(GenServer.server(), PB.sq_pb_msg() | [PB.sq_pb_msg()]) :: :ok | {:error, term()} + def send_data(pid, messages), do: GenServer.call(pid, {:send_data, List.wrap(messages)}) + + # Implementation + + @impl WebSocketClient + def handle_connection(@protocol_prefix <> vsn, conn, opts) do + Logger.info("Connection established with protocol vsn #{vsn}") + + opts = Map.new(opts) + + with {:ok, conn, unprocessed} <- maybe_auth(conn, opts), + {:ok, _conn} <- maybe_subscribe(conn, opts) do + table = :ets.new(:ws_client_received_messages, [:ordered_set]) + {:ok, %{opts: opts, count: 0, table: table}, unprocessed} + end + end + + def handle_connection(_, _, _) do + {:error, :wrong_subprotocol} + end + + defp maybe_auth(conn, opts) do + case auth_token!(opts[:auth]) do + {:ok, token} -> + id = Map.get(opts, :id, "id") + + headers = [%SatAuthHeaderPair{key: :PROTO_VERSION, value: PB.get_long_proto_vsn()}] + auth_req = serialize(%SatAuthReq{id: id, token: token, headers: headers}) + + {:ok, conn} = WebSocketClient.send_frames(conn, [auth_req]) + {:ok, conn, frames} = WebSocketClient.receive_next_frames!(conn) + + decoded = + Enum.map(frames, fn {:binary, <>} -> PB.decode!(type, data) end) + + {[%SatAuthResp{}], rest} = Enum.split_with(decoded, &is_struct(&1, SatAuthResp)) + + Logger.debug("Auth passed") + {:ok, conn, Enum.map(rest, &serialize/1)} + + :no_auth -> + {:ok, conn, []} + end + end + + defp maybe_subscribe(conn, opts) do + case opts[:sub] do + nil -> + {:ok, conn} + + "" -> + sub_req = serialize(%SatInStartReplicationReq{}) + {:ok, conn} = WebSocketClient.send_frames(conn, [sub_req]) + Logger.debug("Subscribed at LSN=0") + {:ok, conn} + + lsn -> + sub_req = + serialize(%SatInStartReplicationReq{ + lsn: lsn, + subscription_ids: Map.get(opts, :subscription_ids, []) + }) + + {:ok, conn} = WebSocketClient.send_frames(conn, [sub_req]) + Logger.debug("Subscribed at LSN=#{inspect(lsn)}") + {:ok, conn} + end + end + + @impl WebSocketClient + def handle_frame({:text, _}, state) do + Logger.error("Text frames are not supported for Electric protocol") + {:stop, :normal, {:close, 1003, ""}, state} + end + + def handle_frame({:binary, <>}, state) do + Logger.debug("Received type #{type} and binary: #{inspect(data, limit: :infinity)}") + + case PB.decode(type, data) do + {:ok, msg} -> + state + |> tap(&log(&1, msg)) + |> store(msg) + |> tap(&send(&1.opts.parent, {self(), msg})) + |> maybe_autorespond(msg) + + {:error, reason} -> + Logger.error("Couldn't decode message from the server: #{inspect(reason)}") + {:stop, {:error, reason}, {:close, 1007, ""}, state} + end + end + + @impl GenServer + def terminate(:shutdown, {conn, _}) do + WebSocketClient.send_frames(conn, [{:close, 1001, ""}]) + end + + def terminate(_, _), do: nil + + @impl GenServer + def handle_call({:send_frames, frames}, _from, {conn, state}) do + case WebSocketClient.send_frames(conn, frames) do + {:ok, conn} -> + {:reply, :ok, {conn, state}} + + {:error, %Mint.TransportError{reason: :closed}} = error -> + {:stop, :normal, error, {conn, state}} + + error -> + {:reply, error, {conn, state}} + end + end + + def handle_call({:send_data, messages}, _from, {conn, state}) do + frames = Enum.map(messages, &serialize/1) + + case WebSocketClient.send_frames(conn, frames) do + {:ok, conn} -> + {:reply, :ok, {conn, state}} + + {:error, %Mint.TransportError{reason: :closed}} = error -> + {:stop, :normal, error, {conn, state}} + + error -> + {:reply, error, {conn, state}} + end + end + + defp store(state, %SatPingReq{}), do: state + + defp store(%{count: count} = state, msg) do + :ets.insert(state.table, {count, msg}) + %{state | count: count + 1} + end + + defp log(_, %SatPingReq{} = msg), do: Logger.info("rec: #{inspect(msg)}") + defp log(state, msg), do: Logger.info("rec [#{state.count}]: #{inspect(msg)}") + + defp maybe_autorespond(%{opts: %{auto_ping: true}} = state, %SatPingReq{}) do + {:reply, serialize(%SatPingResp{lsn: nil}), state} + end + + defp maybe_autorespond(%{opts: %{auto_in_sub: true}} = state, %SatInStartReplicationReq{}) do + {:reply, serialize(%SatInStartReplicationResp{}), state} + end + + defp maybe_autorespond(state, _), do: {:noreply, state} + + defp auth_token!(nil), do: :no_auth + defp auth_token!(%{token: token}), do: {:ok, token} + + defp auth_token!(%{auth_config: config, user_id: user_id}), + do: {:ok, Auth.Secure.create_token(user_id, config: config)} + + defp auth_token!(%{user_id: user_id}), do: {:ok, Auth.Secure.create_token(user_id)} + + defp auth_token!(invalid), + do: + raise(ArgumentError, + message: + ~s'expected %{auth_provider: "...", user_id: "..."} | %{token: "..."}, got: #{inspect(invalid)}' + ) + + @spec serialize(struct) :: {:binary, binary()} + def serialize(data) do + {:ok, type, iodata} = PB.encode(data) + {:binary, IO.iodata_to_binary([<>, iodata])} + end +end diff --git a/components/electric/mix.exs b/components/electric/mix.exs index a2e37168a4..099e04ac7d 100644 --- a/components/electric/mix.exs +++ b/components/electric/mix.exs @@ -18,7 +18,7 @@ defmodule Electric.MixProject do releases: [ electric: [applications: [electric: :permanent], include_executables_for: [:unix]], ws_client: [ - applications: [electric: :load, gun: :permanent], + applications: [electric: :load], include_executables_for: [:unix], runtime_config_path: false ] @@ -49,7 +49,6 @@ defmodule Electric.MixProject do {:excoveralls, "~> 0.14", only: :test, runtime: false}, {:gproc, "~> 0.9.0"}, {:protox, "~> 1.7"}, - {:gun, "~> 2.0"}, {:gen_stage, "~> 1.2"}, {:telemetry, "~> 1.1", override: true}, {:telemetry_poller, "~> 1.0"}, @@ -64,7 +63,9 @@ defmodule Electric.MixProject do {:pg_query_ex, github: "electric-sql/pg_query_ex"}, {:nimble_pool, "~> 1.0"}, {:bandit, "~> 1.0-pre"}, - {:thousand_island, "~> 1.0-pre"} + {:thousand_island, "~> 1.0-pre"}, + {:mint_web_socket, "~> 1.0"}, + {:mint, "~> 1.5"} ] end diff --git a/components/electric/mix.lock b/components/electric/mix.lock index b7356c0cc0..b50f723f8b 100644 --- a/components/electric/mix.lock +++ b/components/electric/mix.lock @@ -3,7 +3,6 @@ "bandit": {:hex, :bandit, "1.0.0-pre.14", "3ac3f41cf4fdf2bb234bc6a27aad2b63c4d55a1e003f0271dc9118ff113392b6", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0-pre.5", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "550f79ad03ccfd85c86a392b033a075637bcb06ca0a2abad05bbafd1e9989e70"}, "cc_precompiler": {:hex, :cc_precompiler, "0.1.7", "77de20ac77f0e53f20ca82c563520af0237c301a1ec3ab3bc598e8a96c7ee5d9", [:mix], [{:elixir_make, "~> 0.7.3", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "2768b28bf3c2b4f788c995576b39b8cb5d47eb788526d93bd52206c1d8bf4b75"}, "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, - "cowlib": {:hex, :cowlib, "2.12.1", "a9fa9a625f1d2025fe6b462cb865881329b5caff8f1854d1cbc9f9533f00e1e1", [:make, :rebar3], [], "hexpm", "163b73f6367a7341b33c794c4e88e7dbfe6498ac42dcd69ef44c5bc5507c8db0"}, "db_connection": {:hex, :db_connection, "2.5.0", "bb6d4f30d35ded97b29fe80d8bd6f928a1912ca1ff110831edcd238a1973652c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c92d5ba26cd69ead1ff7582dbb860adeedfff39774105a4f1c92cbb654b55aa2"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "dialyxir": {:hex, :dialyxir, "1.3.0", "fd1672f0922b7648ff9ce7b1b26fcf0ef56dda964a459892ad15f6b4410b5284", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "00b2a4bcd6aa8db9dcb0b38c1225b7277dca9bc370b6438715667071a304696f"}, @@ -15,7 +14,6 @@ "exqlite": {:hex, :exqlite, "0.13.14", "acd8b58c2245c6aa611262a887509c6aa862a05bfeb174faf348375bd9fc7edb", [:make, :mix], [{:cc_precompiler, "~> 0.1", [hex: :cc_precompiler, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "e81cd9b811e70a43b8d2d4ee76d3ce57ff349890ec4182f8f5223ead38ac4996"}, "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, "gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"}, - "gun": {:hex, :gun, "2.0.1", "160a9a5394800fcba41bc7e6d421295cf9a7894c2252c0678244948e3336ad73", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "a10bc8d6096b9502205022334f719cc9a08d9adcfbfc0dbee9ef31b56274a20b"}, "hackney": {:hex, :hackney, "1.18.2", "d7ff544ddae5e1cb49e9cf7fa4e356d7f41b283989a1c304bfc47a8cc1cf966f", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "af94d5c9f97857db257090a4a10e5426ecb6f4918aa5cc666798566ae14b65fd"}, "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, @@ -27,6 +25,8 @@ "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"}, + "mint_web_socket": {:hex, :mint_web_socket, "1.0.3", "aab42fff792a74649916236d0b01f560a0b3f03ca5dea693c230d1c44736b50e", [:mix], [{:mint, ">= 1.4.1 and < 2.0.0-0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "ca3810ca44cc8532e3dce499cc17f958596695d226bb578b2fbb88c09b5954b0"}, "mock": {:hex, :mock, "0.3.8", "7046a306b71db2488ef54395eeb74df0a7f335a7caca4a3d3875d1fc81c884dd", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "7fa82364c97617d79bb7d15571193fc0c4fe5afd0c932cef09426b3ee6fe2022"}, "mox": {:hex, :mox, "1.0.2", "dc2057289ac478b35760ba74165b4b3f402f68803dd5aecd3bfd19c183815d64", [:mix], [], "hexpm", "f9864921b3aaf763c8741b5b8e6f908f44566f1e427b2630e89e9a73b981fef2"}, "nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"}, diff --git a/components/electric/test/electric/satellite/subscriptions_test.exs b/components/electric/test/electric/satellite/subscriptions_test.exs index b9f7ccd1aa..fb5bc2a78c 100644 --- a/components/electric/test/electric/satellite/subscriptions_test.exs +++ b/components/electric/test/electric/satellite/subscriptions_test.exs @@ -5,7 +5,7 @@ defmodule Electric.Satellite.SubscriptionsTest do import Electric.Postgres.TestConnection import Electric.Utils, only: [uuid4: 0] import ElectricTest.SetupHelpers - alias Electric.Test.SatelliteWsClient, as: MockClient + alias Satellite.TestWsClient, as: MockClient alias Electric.Postgres.CachedWal describe "Handling of real subscriptions" do diff --git a/components/electric/test/electric/satellite/ws_pg_to_satellite_test.exs b/components/electric/test/electric/satellite/ws_pg_to_satellite_test.exs index cf7db5b100..bca8e68c50 100644 --- a/components/electric/test/electric/satellite/ws_pg_to_satellite_test.exs +++ b/components/electric/test/electric/satellite/ws_pg_to_satellite_test.exs @@ -6,7 +6,7 @@ defmodule Electric.Satellite.WsPgToSatelliteTest do import Electric.Postgres.TestConnection import ElectricTest.SatelliteHelpers - alias Electric.Test.SatelliteWsClient, as: MockClient + alias Satellite.TestWsClient, as: MockClient alias Electric.Satellite.Auth setup :setup_replicated_db diff --git a/components/electric/test/electric/satellite/ws_server_test.exs b/components/electric/test/electric/satellite/ws_server_test.exs index 28a76f7a12..9038d8fdb4 100644 --- a/components/electric/test/electric/satellite/ws_server_test.exs +++ b/components/electric/test/electric/satellite/ws_server_test.exs @@ -9,7 +9,7 @@ defmodule Electric.Satellite.WebsocketServerTest do alias Electric.Replication.SatelliteConnector alias Electric.Postgres.CachedWal.Producer - alias Electric.Test.SatelliteWsClient, as: MockClient + alias Satellite.TestWsClient, as: MockClient alias Electric.Replication.Changes @@ -118,7 +118,7 @@ defmodule Electric.Satellite.WebsocketServerTest do test "sanity check", ctx do with_connect([port: ctx.port], fn conn -> Process.sleep(1000) - assert true == MockClient.is_alive(conn) + assert Process.alive?(conn) end) end @@ -168,14 +168,14 @@ defmodule Electric.Satellite.WebsocketServerTest do test "Server will handle bad requests", ctx do with_connect([port: ctx.port], fn conn -> - MockClient.send_bin_data(conn, <<"rubbish">>) + MockClient.send_frames(conn, {:binary, "rubbish"}) assert_receive {^conn, %SatErrorResp{}}, @default_wait end) end test "Server will handle bad requests after auth", ctx do with_connect([port: ctx.port, auth: ctx], fn conn -> - MockClient.send_bin_data(conn, <<"rubbish">>) + MockClient.send_frames(conn, {:binary, "rubbish"}) assert_receive {^conn, %SatErrorResp{}}, @default_wait end) end @@ -268,7 +268,7 @@ defmodule Electric.Satellite.WebsocketServerTest do test "Server will forbid two connections that use same id", ctx do with_connect([auth: ctx, id: ctx.client_id, port: ctx.port], fn _conn -> - {:ok, pid} = MockClient.connect_and_spawn(auto_register: false, port: ctx.port) + {:ok, pid} = MockClient.connect(port: ctx.port) MockClient.send_data(pid, %SatAuthReq{ id: ctx.client_id, @@ -671,8 +671,6 @@ defmodule Electric.Satellite.WebsocketServerTest do end def clean_connections() do - MockClient.disconnect() - :ok = drain_pids(active_clients()) :ok = drain_active_resources(connectors()) end diff --git a/components/electric/test/electric/satellite/ws_validations_test.exs b/components/electric/test/electric/satellite/ws_validations_test.exs index d11a06e4de..31fd139050 100644 --- a/components/electric/test/electric/satellite/ws_validations_test.exs +++ b/components/electric/test/electric/satellite/ws_validations_test.exs @@ -6,7 +6,7 @@ defmodule Electric.Satellite.WsValidationsTest do import Electric.Postgres.TestConnection import ElectricTest.SatelliteHelpers - alias Electric.Test.SatelliteWsClient, as: MockClient + alias Satellite.TestWsClient, as: MockClient alias Electric.Satellite.Auth alias Electric.Satellite.Serialization diff --git a/components/electric/test/support/satellite_helpers.ex b/components/electric/test/support/satellite_helpers.ex index 9403986ed2..4191255945 100644 --- a/components/electric/test/support/satellite_helpers.ex +++ b/components/electric/test/support/satellite_helpers.ex @@ -3,7 +3,7 @@ defmodule ElectricTest.SatelliteHelpers do import ExUnit.Assertions - alias Electric.Test.SatelliteWsClient, as: MockClient + alias Satellite.TestWsClient, as: MockClient # Send a ping to WebsocketServer. Useful to make sure it is done with initial sync. def ping_server(conn) do @@ -27,7 +27,7 @@ defmodule ElectricTest.SatelliteHelpers do assert lsn > 0 end - def with_connect(opts, fun), do: Electric.Test.SatelliteWsClient.with_connect(opts, fun) + def with_connect(opts, fun), do: MockClient.with_connect(opts, fun) def migrate(conn, version, table \\ nil, sql) do results = diff --git a/components/electric/test/support/setup_helpers.ex b/components/electric/test/support/setup_helpers.ex index d5d2aabfa5..e2875ca52a 100644 --- a/components/electric/test/support/setup_helpers.ex +++ b/components/electric/test/support/setup_helpers.ex @@ -21,7 +21,7 @@ defmodule ElectricTest.SetupHelpers do end @doc """ - Asserts server sends all messages that it should to `Electric.Test.SatelliteWsClient` after + Asserts server sends all messages that it should to `Satellite.TestWsClient` after replication request has been sent. Assumes that the database has been migrated before the replication started, and that @@ -49,7 +49,7 @@ defmodule ElectricTest.SetupHelpers do end @doc """ - Wait for and receives subscription data response as sent back to the test process by `Electric.Test.SatelliteWsClient`. + Wait for and receives subscription data response as sent back to the test process by `Satellite.TestWsClient`. Waits for the `SatSubsDataBegin` message, then for each shape data, then for the end message, and verifies their order. Returns a map, where the shape request ids are keys, and the `SatOpInsert` operations are values. diff --git a/e2e/common.luxinc b/e2e/common.luxinc index 9b3273c113..8d7ce064bd 100644 --- a/e2e/common.luxinc +++ b/e2e/common.luxinc @@ -72,6 +72,7 @@ ?$eprompt !Logger.configure(level: :debug) !Application.put_env(:elixir, :ansi_enabled, false) + !alias Satellite.{TestWsClient, ProtocolHelpers} ?$eprompt [endmacro] diff --git a/e2e/tests/02.04_satellite_write_gets_propagated_to_postgres.lux b/e2e/tests/02.04_satellite_write_gets_propagated_to_postgres.lux index 0ba195a778..34bf74ad2a 100644 --- a/e2e/tests/02.04_satellite_write_gets_propagated_to_postgres.lux +++ b/e2e/tests/02.04_satellite_write_gets_propagated_to_postgres.lux @@ -11,19 +11,15 @@ [invoke client_session $user_id_1 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt """! - Electric.Test.SatelliteWsClient.entries_table_send_insert( - conn, - "1", # lsn - 1686009600000, # Unix timestamp of ~U[2023-06-06 00:00:00Z] |> DateTime.to_unix(:millisecond) - %{ - "id" => "00000000-0000-0000-0000-000000000000", - "content" => "sentinel value" - } - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", ~U[2023-06-06 00:00:00Z], [ + ProtocolHelpers.insert("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000", "content" => "sentinel value"} + ) + ])) """ ?:ok ?$eprompt diff --git a/e2e/tests/02.05_delete_gets_registered_correctly.lux b/e2e/tests/02.05_delete_gets_registered_correctly.lux index ac33a7fb4a..3c5670cd8c 100644 --- a/e2e/tests/02.05_delete_gets_registered_correctly.lux +++ b/e2e/tests/02.05_delete_gets_registered_correctly.lux @@ -9,7 +9,7 @@ [invoke start_elixir_test 1] [invoke client_session 1 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt @@ -25,13 +25,12 @@ [my seen_tag=$1] # We do an update having "seen" only the insert """! - Electric.Test.SatelliteWsClient.entries_table_send_delete( - conn, - "1", # lsn - DateTime.utc_now() |> DateTime.to_unix(:millisecond), - %{"id" => "00000000-0000-0000-0000-000000000000", "content" => "original value", "content_b" => nil}, - ["$seen_tag"] - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.utc_now(), [ + ProtocolHelpers.delete("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000", "content" => "original value", "content_b" => nil}, + ["$seen_tag"] + ) + ])) """ ?:ok ?$eprompt diff --git a/e2e/tests/02.06_subscriptions_can_be_resumed_on_reconnect.lux b/e2e/tests/02.06_subscriptions_can_be_resumed_on_reconnect.lux index 067887c658..5d8f21fca3 100644 --- a/e2e/tests/02.06_subscriptions_can_be_resumed_on_reconnect.lux +++ b/e2e/tests/02.06_subscriptions_can_be_resumed_on_reconnect.lux @@ -28,7 +28,7 @@ ?\} ?tags: \["postgres_1@\d{13,16}"\] ?\} - !:proc_lib.stop(conn) + !TestWsClient.disconnect(conn) ?$eprompt # New row inserted while client is "offline" diff --git a/e2e/tests/05.01_confict_resolution_updates_in_the_past_discarded.lux b/e2e/tests/05.01_confict_resolution_updates_in_the_past_discarded.lux index 60d3a68e3f..bed7ef0536 100644 --- a/e2e/tests/05.01_confict_resolution_updates_in_the_past_discarded.lux +++ b/e2e/tests/05.01_confict_resolution_updates_in_the_past_discarded.lux @@ -9,7 +9,7 @@ [invoke start_elixir_test 1] [invoke client_session 1 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt @@ -22,14 +22,13 @@ [shell user_1_ws1] # Send one update touching a single column """! - Electric.Test.SatelliteWsClient.entries_table_send_update( - conn, - "1", # lsn - 1686009600000, # Unix timestamp of ~U[2023-06-06 00:00:00Z] |> DateTime.to_unix(:millisecond) - "00000000-0000-0000-0000-000000000000", # id - %{"content" => "original value", "content_b" => nil}, # old - %{"content" => "updated on client 1", "content_b" => nil} # new - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", ~U[2023-06-06 00:00:00Z], [ + ProtocolHelpers.update("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000"}, # pk + %{"content" => "original value", "content_b" => nil}, # old + %{"content" => "updated on client 1", "content_b" => nil} # new + ) + ])) """ ?:ok ?$eprompt diff --git a/e2e/tests/05.02_confict_resolution_per_column.lux b/e2e/tests/05.02_confict_resolution_per_column.lux index bb4d980ed1..86bd2d2610 100644 --- a/e2e/tests/05.02_confict_resolution_per_column.lux +++ b/e2e/tests/05.02_confict_resolution_per_column.lux @@ -9,7 +9,7 @@ [invoke start_elixir_test 1] [invoke client_session 1 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt @@ -18,7 +18,7 @@ [invoke start_elixir_test 2] [invoke client_session 2 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt @@ -32,15 +32,14 @@ [my seen_tag=$1] # Send one update touching a single column """! - Electric.Test.SatelliteWsClient.entries_table_send_update( - conn, - "1", # lsn - DateTime.utc_now() |> DateTime.to_unix(:millisecond), - "00000000-0000-0000-0000-000000000000", # id - %{"content" => "original value", "content_b" => nil}, # old - %{"content" => "updated on client 1", "content_b" => nil}, # new - ["$seen_tag"] - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.utc_now(), [ + ProtocolHelpers.update("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000"}, # pk + %{"content" => "original value", "content_b" => nil}, # old + %{"content" => "updated on client 1", "content_b" => nil}, # new + ["$seen_tag"] + ) + ])) """ ?:ok ?$eprompt @@ -50,15 +49,14 @@ [my seen_tag=$1] # Send one update touching a single column """! - Electric.Test.SatelliteWsClient.entries_table_send_update( - conn, - "1", # lsn - DateTime.utc_now() |> DateTime.to_unix(:millisecond), - "00000000-0000-0000-0000-000000000000", # id - %{"content" => "original value", "content_b" => nil}, # old - %{"content" => "original value", "content_b" => "updated on client 2"}, # new - ["$seen_tag"] - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.utc_now(), [ + ProtocolHelpers.update("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000"}, # pk + %{"content" => "original value", "content_b" => nil}, # old + %{"content" => "original value", "content_b" => "updated on client 2"}, # new + ["$seen_tag"] + ) + ])) """ ?:ok ?$eprompt diff --git a/e2e/tests/05.03_postgres_wins_against_concurrent_transactions.lux b/e2e/tests/05.03_postgres_wins_against_concurrent_transactions.lux index 5cf5f18584..0eacd14d00 100644 --- a/e2e/tests/05.03_postgres_wins_against_concurrent_transactions.lux +++ b/e2e/tests/05.03_postgres_wins_against_concurrent_transactions.lux @@ -9,7 +9,7 @@ [invoke start_elixir_test 1] [invoke client_session 1 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt @@ -27,15 +27,14 @@ [my seen_tag=$1] # We do an update while transaction on PG is open """! - Electric.Test.SatelliteWsClient.entries_table_send_update( - conn, - "1", # lsn - DateTime.utc_now() |> DateTime.add(86400, :second) |> DateTime.to_unix(:millisecond), # + 1 day - "00000000-0000-0000-0000-000000000000", # id - %{"content" => "original value", "content_b" => nil}, # old - %{"content" => "updated on client 1", "content_b" => nil}, # new - ["$seen_tag"] - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.add(DateTime.utc_now(), 86400, :second), [ + ProtocolHelpers.update("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000"}, # pk + %{"content" => "original value", "content_b" => nil}, # old + %{"content" => "updated on client 1", "content_b" => nil}, # new + ["$seen_tag"] + ) + ])) """ ?:ok ?$eprompt diff --git a/e2e/tests/05.04_update_that_didnt_see_delete_resurrects.lux b/e2e/tests/05.04_update_that_didnt_see_delete_resurrects.lux index 02eb827285..565a5c538e 100644 --- a/e2e/tests/05.04_update_that_didnt_see_delete_resurrects.lux +++ b/e2e/tests/05.04_update_that_didnt_see_delete_resurrects.lux @@ -9,7 +9,7 @@ [invoke start_elixir_test 1] [invoke client_session 1 1] [invoke elixir_client_subscribe "entries"] - !Electric.Test.SatelliteWsClient.send_test_relation(conn) + !TestWsClient.send_data(conn, ProtocolHelpers.relation("public.entries")) ?:ok ?$eprompt @@ -30,15 +30,14 @@ [my seen_tag=$1] # We do an update having "seen" only the insert """! - Electric.Test.SatelliteWsClient.entries_table_send_update( - conn, - "1", # lsn - DateTime.utc_now() |> DateTime.add(86400, :second) |> DateTime.to_unix(:millisecond), # + 1 day - "00000000-0000-0000-0000-000000000000", # id - %{"content" => "original value", "content_b" => nil}, # old - %{"content" => "original value", "content_b" => "new value"}, # new - ["$seen_tag"] - ) + TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.add(DateTime.utc_now(), 86400, :second), [ + ProtocolHelpers.update("public.entries", + %{"id" => "00000000-0000-0000-0000-000000000000"}, # pk + %{"content" => "original value", "content_b" => nil}, # old + %{"content" => "original value", "content_b" => "new value"}, # new + ["$seen_tag"] + ) + ])) """ ?:ok ?$eprompt diff --git a/e2e/tests/05.05_compensations_within_same_tx_are_fine.lux b/e2e/tests/05.05_compensations_within_same_tx_are_fine.lux index 3eb5bfad5b..f916a13f3d 100644 --- a/e2e/tests/05.05_compensations_within_same_tx_are_fine.lux +++ b/e2e/tests/05.05_compensations_within_same_tx_are_fine.lux @@ -29,7 +29,7 @@ !alias Electric.Satellite.V14.{SatRelation, SatRelationColumn, SatOpInsert, SatOpUpdate, SatOpRow} """! - Electric.Test.SatelliteWsClient.send_data(conn, %SatRelation{ + Satellite.TestWsClient.send_data(conn, %SatRelation{ columns: [ %SatRelationColumn{name: "id", type: "text", is_nullable: false}, %SatRelationColumn{name: "content", type: "text", is_nullable: false}, @@ -42,7 +42,7 @@ """ ?$eprompt """! - Electric.Test.SatelliteWsClient.send_data(conn, %SatRelation{ + Satellite.TestWsClient.send_data(conn, %SatRelation{ columns: [ %SatRelationColumn{name: "id", type: "text", is_nullable: false}, %SatRelationColumn{name: "content", type: "text", is_nullable: false}, @@ -56,11 +56,11 @@ """ ?$eprompt """! - Electric.Test.SatelliteWsClient.send_tx_data(conn, "1", DateTime.to_unix(DateTime.utc_now(), :millisecond), [ - %SatOpInsert{relation_id: 1, row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["00000000-0000-0000-0000-000000000000", "test_content"]}}, - %SatOpUpdate{relation_id: 1, row_data: %SatOpRow{nulls_bitmask: <<0::1, 1::1, 0::6>>, values: ["00000000-0000-0000-0000-000000000000", ""]}}, - %SatOpInsert{relation_id: 2, row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["00000000-0000-0000-0000-000000000001", "child content", "00000000-0000-0000-0000-000000000000"]}}, - ]) + Satellite.TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.to_unix(DateTime.utc_now(), :millisecond), [ + %SatOpInsert{relation_id: 1, row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["00000000-0000-0000-0000-000000000000", "test_content"]}}, + %SatOpUpdate{relation_id: 1, row_data: %SatOpRow{nulls_bitmask: <<0::1, 1::1, 0::6>>, values: ["00000000-0000-0000-0000-000000000000", ""]}}, + %SatOpInsert{relation_id: 2, row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["00000000-0000-0000-0000-000000000001", "child content", "00000000-0000-0000-0000-000000000000"]}}, + ])) """ ?$eprompt diff --git a/e2e/tests/_shared.luxinc b/e2e/tests/_shared.luxinc index b43c801376..7e1b650516 100644 --- a/e2e/tests/_shared.luxinc +++ b/e2e/tests/_shared.luxinc @@ -38,18 +38,16 @@ alg: "HS256", key: "integration-tests-signing-key-example" \ ) ?$eprompt - """!{:ok, conn} = Electric.Test.SatelliteWsClient.connect_and_spawn( + """!{:ok, conn} = TestWsClient.connect( auth: %{auth_config: auth_config, user_id: "$user_id"}, id: "$client_id", - debug: true, sub: "", auto_in_sub: true, - format: :term, host: "electric_1", auto_ping: true) """ ?+$eprompt - ?+sending to: (.*)%Electric.Satellite.V\d+.SatInStartReplicationReq\{(.*)lsn: "", ([^\}]*)\} + ?+rec \[\d\]: %Electric.Satellite.V\d+.SatInStartReplicationReq\{ ?rec \[\d\]: %Electric.Satellite.V\d+.SatInStartReplicationResp\{ [endmacro] @@ -60,27 +58,25 @@ alg: "HS256", key: "integration-tests-signing-key-example" \ ) ?$eprompt - """!{:ok, conn} = Electric.Test.SatelliteWsClient.connect_and_spawn( + """!{:ok, conn} = TestWsClient.connect( auth: %{auth_config: auth_config, user_id: "$user_id"}, id: "$client_id", - debug: true, sub: "$position", subscription_ids: ~w|$subscription_ids|, auto_in_sub: true, - format: :term, host: "electric_1", auto_ping: true) """ - ?started #PID + ?Connection established with protocol vsn ?Auth passed ?Subscribed - ?sending to: (.*)%Electric.Satellite.V\d+.SatInStartReplicationReq\{(.*)lsn: "", ([^\}]*)\} + ?+rec \[\d\]: %Electric.Satellite.V\d+.SatInStartReplicationReq\{ ?rec \[\d\]: %Electric.Satellite.V\d+.SatInStartReplicationResp\{ [endmacro] [macro elixir_client_subscribe tables] """! - Electric.Test.SatelliteWsClient.send_data(conn, Electric.Test.SatelliteWsClient.build_subscription_request(request_1: [tables: ~w|$tables|])) + TestWsClient.send_data(conn, ProtocolHelpers.subscription_request(request_1: [tables: ~w|$tables|])) """ ?+$eprompt ?rec \[\d\]: %Electric.Satellite.V\d+.SatSubsResp\{[^\}]+err: nil @@ -88,7 +84,7 @@ [macro elixir_client_subscribe_with_id id tables] """! - Electric.Test.SatelliteWsClient.send_data(conn, Electric.Test.SatelliteWsClient.build_subscription_request("$id", request_1: [tables: ~w|$tables|])) + TestWsClient.send_data(conn, ProtocolHelpers.subscription_request("$id", request_1: [tables: ~w|$tables|])) """ ?+$eprompt ?rec \[\d\]: %Electric.Satellite.V\d+.SatSubsResp\{[^\}]+err: nil