From f991283c91d8573d732010a7b2304efa72dd64f7 Mon Sep 17 00:00:00 2001 From: Ilia Borovitinov Date: Thu, 7 Sep 2023 13:47:43 +0300 Subject: [PATCH] chore: removed SatPingReq/Resp from the protocol (#408) --- .../src/_generated/protocol/satellite.ts | 105 ----- clients/typescript/src/satellite/client.ts | 23 - clients/typescript/src/util/proto.ts | 10 - .../typescript/test/satellite/client.test.ts | 23 - .../test/satellite/server_ws_stub.ts | 20 - .../lib/electric/satellite/protobuf.ex | 14 +- .../electric/satellite/protobuf_messages.ex | 397 ------------------ .../lib/electric/satellite/protocol.ex | 14 +- .../lib/electric/satellite/ws_server.ex | 6 - .../electric/lib/satellite/test_ws_client.ex | 7 - .../test/electric/satellite/protobuf_test.exs | 7 - .../satellite/ws_pg_to_satellite_test.exs | 5 - .../electric/satellite/ws_server_test.exs | 12 +- .../satellite/ws_validations_test.exs | 12 - .../test/support/satellite_helpers.ex | 6 - protocol/satellite.proto | 11 - 16 files changed, 12 insertions(+), 660 deletions(-) diff --git a/clients/typescript/src/_generated/protocol/satellite.ts b/clients/typescript/src/_generated/protocol/satellite.ts index 20f8d8117d..64c53299bb 100644 --- a/clients/typescript/src/_generated/protocol/satellite.ts +++ b/clients/typescript/src/_generated/protocol/satellite.ts @@ -32,22 +32,6 @@ export enum SatAuthHeader { UNRECOGNIZED = -1, } -/** Ping request. Can be sent by any party. */ -export interface SatPingReq { - $type: "Electric.Satellite.SatPingReq"; -} - -/** Ping response. */ -export interface SatPingResp { - $type: "Electric.Satellite.SatPingResp"; - /** - * If LSN is present, it conveys to producer the latest LSN position that - * was applied on the consumer side. If there is no active replication - * ongoing the field should be left empty. - */ - lsn?: Uint8Array | undefined; -} - export interface SatAuthHeaderPair { $type: "Electric.Satellite.SatAuthHeaderPair"; key: SatAuthHeader; @@ -585,95 +569,6 @@ export interface SatShapeDataEnd { $type: "Electric.Satellite.SatShapeDataEnd"; } -function createBaseSatPingReq(): SatPingReq { - return { $type: "Electric.Satellite.SatPingReq" }; -} - -export const SatPingReq = { - $type: "Electric.Satellite.SatPingReq" as const, - - encode(_: SatPingReq, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { - return writer; - }, - - decode(input: _m0.Reader | Uint8Array, length?: number): SatPingReq { - const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); - let end = length === undefined ? reader.len : reader.pos + length; - const message = createBaseSatPingReq(); - while (reader.pos < end) { - const tag = reader.uint32(); - switch (tag >>> 3) { - } - if ((tag & 7) === 4 || tag === 0) { - break; - } - reader.skipType(tag & 7); - } - return message; - }, - - create, I>>(base?: I): SatPingReq { - return SatPingReq.fromPartial(base ?? {}); - }, - - fromPartial, I>>(_: I): SatPingReq { - const message = createBaseSatPingReq(); - return message; - }, -}; - -messageTypeRegistry.set(SatPingReq.$type, SatPingReq); - -function createBaseSatPingResp(): SatPingResp { - return { $type: "Electric.Satellite.SatPingResp", lsn: undefined }; -} - -export const SatPingResp = { - $type: "Electric.Satellite.SatPingResp" as const, - - encode(message: SatPingResp, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { - if (message.lsn !== undefined) { - writer.uint32(10).bytes(message.lsn); - } - return writer; - }, - - decode(input: _m0.Reader | Uint8Array, length?: number): SatPingResp { - const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); - let end = length === undefined ? reader.len : reader.pos + length; - const message = createBaseSatPingResp(); - while (reader.pos < end) { - const tag = reader.uint32(); - switch (tag >>> 3) { - case 1: - if (tag !== 10) { - break; - } - - message.lsn = reader.bytes(); - continue; - } - if ((tag & 7) === 4 || tag === 0) { - break; - } - reader.skipType(tag & 7); - } - return message; - }, - - create, I>>(base?: I): SatPingResp { - return SatPingResp.fromPartial(base ?? {}); - }, - - fromPartial, I>>(object: I): SatPingResp { - const message = createBaseSatPingResp(); - message.lsn = object.lsn ?? undefined; - return message; - }, -}; - -messageTypeRegistry.set(SatPingResp.$type, SatPingResp); - function createBaseSatAuthHeaderPair(): SatAuthHeaderPair { return { $type: "Electric.Satellite.SatAuthHeaderPair", key: 0, value: "" }; } diff --git a/clients/typescript/src/satellite/client.ts b/clients/typescript/src/satellite/client.ts index e540da8e35..9c2186f5da 100644 --- a/clients/typescript/src/satellite/client.ts +++ b/clients/typescript/src/satellite/client.ts @@ -12,7 +12,6 @@ import { SatTransOp, SatOpRow, SatOpLog, - SatPingResp, SatRelation, SatRelationColumn, SatSubsResp, @@ -144,11 +143,6 @@ export class SatelliteClient extends EventEmitter implements Client { handle: () => this.handleStopResp(), isRpc: true, }, - SatPingReq: { handle: () => this.handlePingReq(), isRpc: true }, - SatPingResp: { - handle: (req: any) => this.handlePingResp(req), - isRpc: false, - }, SatRelation: { handle: (req: any) => this.handleRelation(req), isRpc: false, @@ -730,23 +724,6 @@ export class SatelliteClient extends EventEmitter implements Client { } } - private handlePingReq() { - Log.info( - `respond to ping with last ack ${ - this.inbound.last_lsn ? base64.fromBytes(this.inbound.last_lsn) : 'NULL' - }` - ) - const pong = SatPingResp.fromPartial({ lsn: this.inbound.last_lsn }) - this.sendMessage(pong) - } - - private handlePingResp(_message: SatPingResp) { - // TODO: This message is not used in any way right now. - // We might be dropping client-initiated pings completely. - // However, the server sends these messages without any prompting, - // so this handler cannot just throw an error - } - private handleError(error: SatErrorResp) { this.emit('error', serverErrorToSatelliteError(error)) } diff --git a/clients/typescript/src/util/proto.ts b/clients/typescript/src/util/proto.ts index a4ab05ce45..bf0619bd55 100644 --- a/clients/typescript/src/util/proto.ts +++ b/clients/typescript/src/util/proto.ts @@ -111,8 +111,6 @@ const msgtypetuples: MappingTuples = { SatErrorResp: [0, Pb.SatErrorResp], SatAuthReq: [1, Pb.SatAuthReq], SatAuthResp: [2, Pb.SatAuthResp], - SatPingReq: [3, Pb.SatPingReq], - SatPingResp: [4, Pb.SatPingResp], SatInStartReplicationReq: [5, Pb.SatInStartReplicationReq], SatInStartReplicationResp: [6, Pb.SatInStartReplicationResp], SatInStopReplicationReq: [7, Pb.SatInStopReplicationReq], @@ -143,8 +141,6 @@ export type SatPbMsg = | Pb.SatErrorResp | Pb.SatAuthReq | Pb.SatAuthResp - | Pb.SatPingReq - | Pb.SatPingResp | Pb.SatInStartReplicationReq | Pb.SatInStartReplicationResp | Pb.SatInStopReplicationReq @@ -329,12 +325,6 @@ export function msgToString(message: SatPbMsg): string { return `#SatInStopReplicationResp{}` case 'Electric.Satellite.SatMigrationNotification': return `#SatMigrationNotification{to: ${message.newSchemaVersion}, from: ${message.newSchemaVersion}}` - case 'Electric.Satellite.SatPingReq': - return `#SatPingReq{}` - case 'Electric.Satellite.SatPingResp': - return `#SatPingResp{lsn: ${ - message.lsn ? base64.fromBytes(message.lsn) : 'NULL' - }}` case 'Electric.Satellite.SatRelation': { const cols = message.columns .map((x) => `${x.name}: ${x.type}${x.primaryKey ? ' PK' : ''}`) diff --git a/clients/typescript/test/satellite/client.test.ts b/clients/typescript/test/satellite/client.test.ts index e445ac7ad1..6179b1a097 100644 --- a/clients/typescript/test/satellite/client.test.ts +++ b/clients/typescript/test/satellite/client.test.ts @@ -232,29 +232,6 @@ test.serial('replication stop failure', async (t) => { } }) -test.serial('server pings client', async (t) => { - await connectAndAuth(t.context) - const { client, server } = t.context - - const start = Proto.SatInStartReplicationResp.fromPartial({}) - const ping = Proto.SatPingReq.fromPartial({}) - const stop = Proto.SatInStopReplicationResp.fromPartial({}) - - return new Promise(async (resolve) => { - server.nextResponses([start, ping]) - server.nextResponses([ - () => { - t.pass() - resolve() - }, - ]) - server.nextResponses([stop]) - - await client.startReplication() - await client.stopReplication() - }) -}) - test.serial('receive transaction over multiple messages', async (t) => { await connectAndAuth(t.context) const { client, server } = t.context diff --git a/clients/typescript/test/satellite/server_ws_stub.ts b/clients/typescript/test/satellite/server_ws_stub.ts index 3d8af8a0f1..334677617b 100644 --- a/clients/typescript/test/satellite/server_ws_stub.ts +++ b/clients/typescript/test/satellite/server_ws_stub.ts @@ -7,8 +7,6 @@ import { SatInStartReplicationResp, SatInStopReplicationResp, SatOpLog, - SatPingReq, - SatPingResp, SatRelation, SatShapeDataBegin, SatShapeDataEnd, @@ -118,24 +116,6 @@ export class SatelliteWSServerStub { ) } - if (msgType == getTypeFromString(SatPingReq.$type)) { - socket.send( - Buffer.concat([ - getSizeBuf(msg), - SatPingReq.encode(msg as SatPingReq).finish(), - ]) - ) - } - - if (msgType == getTypeFromString(SatPingResp.$type)) { - socket.send( - Buffer.concat([ - getSizeBuf(msg), - SatPingResp.encode(msg as SatPingResp).finish(), - ]) - ) - } - if (msgType == getTypeFromString(SatSubsResp.$type)) { socket.send( Buffer.concat([ diff --git a/components/electric/lib/electric/satellite/protobuf.ex b/components/electric/lib/electric/satellite/protobuf.ex index 0b2ced0fe1..0d1369ce0d 100644 --- a/components/electric/lib/electric/satellite/protobuf.ex +++ b/components/electric/lib/electric/satellite/protobuf.ex @@ -6,8 +6,6 @@ defmodule Electric.Satellite.Protobuf do SatErrorResp, SatAuthReq, SatAuthResp, - SatPingReq, - SatPingResp, SatInStartReplicationReq, SatInStartReplicationResp, SatInStopReplicationReq, @@ -28,14 +26,14 @@ defmodule Electric.Satellite.Protobuf do require Logger + @reserved [3, 4] + # This mapping should be kept in sync with Satellite repo. Message is present # in the mapping ONLY if it could be send as an individual message. @mapping %{ SatErrorResp => 0, SatAuthReq => 1, SatAuthResp => 2, - SatPingReq => 3, - SatPingResp => 4, SatInStartReplicationReq => 5, SatInStartReplicationResp => 6, SatInStopReplicationReq => 7, @@ -54,13 +52,15 @@ defmodule Electric.Satellite.Protobuf do SatUnsubsResp => 20 } + if Enum.any?(Map.values(@mapping), &(&1 in @reserved)) do + raise "Cannot use a reserved value as the message tag" + end + @type relation_id() :: non_neg_integer() @type sq_pb_msg() :: %SatErrorResp{} | %SatAuthReq{} | %SatAuthResp{} - | %SatPingReq{} - | %SatPingResp{} | %SatInStartReplicationReq{} | %SatInStartReplicationResp{} | %SatInStopReplicationReq{} @@ -87,8 +87,6 @@ defmodule Electric.Satellite.Protobuf do SatAuthReq, SatAuthHeaderPair, SatAuthResp, - SatPingReq, - SatPingResp, SatInStartReplicationReq, SatInStartReplicationResp, SatInStopReplicationReq, diff --git a/components/electric/lib/electric/satellite/protobuf_messages.ex b/components/electric/lib/electric/satellite/protobuf_messages.ex index a8795f09bf..4b73fd87fb 100644 --- a/components/electric/lib/electric/satellite/protobuf_messages.ex +++ b/components/electric/lib/electric/satellite/protobuf_messages.ex @@ -12330,237 +12330,6 @@ end ) end, - defmodule Electric.Satellite.SatPingResp do - @moduledoc false - defstruct lsn: nil - - ( - ( - @spec encode(struct) :: {:ok, iodata} | {:error, any} - def encode(msg) do - try do - {:ok, encode!(msg)} - rescue - e in [Protox.EncodingError, Protox.RequiredFieldsError] -> {:error, e} - end - end - - @spec encode!(struct) :: iodata | no_return - def encode!(msg) do - [] |> encode_lsn(msg) - end - ) - - [] - - [ - defp encode_lsn(acc, msg) do - try do - case msg.lsn do - nil -> [acc] - child_field_value -> [acc, "\n", Protox.Encode.encode_bytes(child_field_value)] - end - rescue - ArgumentError -> - reraise Protox.EncodingError.new(:lsn, "invalid field value"), __STACKTRACE__ - end - end - ] - - [] - ) - - ( - ( - @spec decode(binary) :: {:ok, struct} | {:error, any} - def decode(bytes) do - try do - {:ok, decode!(bytes)} - rescue - e in [Protox.DecodingError, Protox.IllegalTagError, Protox.RequiredFieldsError] -> - {:error, e} - end - end - - ( - @spec decode!(binary) :: struct | no_return - def decode!(bytes) do - parse_key_value(bytes, struct(Electric.Satellite.SatPingResp)) - end - ) - ) - - ( - @spec parse_key_value(binary, struct) :: struct - defp parse_key_value(<<>>, msg) do - msg - end - - defp parse_key_value(bytes, msg) do - {field, rest} = - case Protox.Decode.parse_key(bytes) do - {0, _, _} -> - raise %Protox.IllegalTagError{} - - {1, _, bytes} -> - {len, bytes} = Protox.Varint.decode(bytes) - {delimited, rest} = Protox.Decode.parse_delimited(bytes, len) - {[lsn: delimited], rest} - - {tag, wire_type, rest} -> - {_, rest} = Protox.Decode.parse_unknown(tag, wire_type, rest) - {[], rest} - end - - msg_updated = struct(msg, field) - parse_key_value(rest, msg_updated) - end - ) - - [] - ) - - ( - @spec json_decode(iodata(), keyword()) :: {:ok, struct()} | {:error, any()} - def json_decode(input, opts \\ []) do - try do - {:ok, json_decode!(input, opts)} - rescue - e in Protox.JsonDecodingError -> {:error, e} - end - end - - @spec json_decode!(iodata(), keyword()) :: struct() | no_return() - def json_decode!(input, opts \\ []) do - {json_library_wrapper, json_library} = Protox.JsonLibrary.get_library(opts, :decode) - - Protox.JsonDecode.decode!( - input, - Electric.Satellite.SatPingResp, - &json_library_wrapper.decode!(json_library, &1) - ) - end - - @spec json_encode(struct(), keyword()) :: {:ok, iodata()} | {:error, any()} - def json_encode(msg, opts \\ []) do - try do - {:ok, json_encode!(msg, opts)} - rescue - e in Protox.JsonEncodingError -> {:error, e} - end - end - - @spec json_encode!(struct(), keyword()) :: iodata() | no_return() - def json_encode!(msg, opts \\ []) do - {json_library_wrapper, json_library} = Protox.JsonLibrary.get_library(opts, :encode) - Protox.JsonEncode.encode!(msg, &json_library_wrapper.encode!(json_library, &1)) - end - ) - - ( - @deprecated "Use fields_defs()/0 instead" - @spec defs() :: %{ - required(non_neg_integer) => {atom, Protox.Types.kind(), Protox.Types.type()} - } - def defs() do - %{1 => {:lsn, {:oneof, :_lsn}, :bytes}} - end - - @deprecated "Use fields_defs()/0 instead" - @spec defs_by_name() :: %{ - required(atom) => {non_neg_integer, Protox.Types.kind(), Protox.Types.type()} - } - def defs_by_name() do - %{lsn: {1, {:oneof, :_lsn}, :bytes}} - end - ) - - ( - @spec fields_defs() :: list(Protox.Field.t()) - def fields_defs() do - [ - %{ - __struct__: Protox.Field, - json_name: "lsn", - kind: {:oneof, :_lsn}, - label: :proto3_optional, - name: :lsn, - tag: 1, - type: :bytes - } - ] - end - - [ - @spec(field_def(atom) :: {:ok, Protox.Field.t()} | {:error, :no_such_field}), - ( - def field_def(:lsn) do - {:ok, - %{ - __struct__: Protox.Field, - json_name: "lsn", - kind: {:oneof, :_lsn}, - label: :proto3_optional, - name: :lsn, - tag: 1, - type: :bytes - }} - end - - def field_def("lsn") do - {:ok, - %{ - __struct__: Protox.Field, - json_name: "lsn", - kind: {:oneof, :_lsn}, - label: :proto3_optional, - name: :lsn, - tag: 1, - type: :bytes - }} - end - - [] - ), - def field_def(_) do - {:error, :no_such_field} - end - ] - ) - - [] - - ( - @spec required_fields() :: [] - def required_fields() do - [] - end - ) - - ( - @spec syntax() :: atom() - def syntax() do - :proto3 - end - ) - - [ - @spec(default(atom) :: {:ok, boolean | integer | String.t() | float} | {:error, atom}), - def default(:lsn) do - {:error, :no_default_value} - end, - def default(_) do - {:error, :no_such_field} - end - ] - - ( - @spec file_options() :: nil - def file_options() do - nil - end - ) - end, defmodule Electric.Satellite.SatUnsubsResp do @moduledoc false defstruct [] @@ -14724,172 +14493,6 @@ end ) end, - defmodule Electric.Satellite.SatPingReq do - @moduledoc false - defstruct [] - - ( - ( - @spec encode(struct) :: {:ok, iodata} - def encode(msg) do - {:ok, encode!(msg)} - end - - @spec encode!(struct) :: iodata - def encode!(_msg) do - [] - end - ) - - [] - [] - [] - ) - - ( - ( - @spec decode(binary) :: {:ok, struct} | {:error, any} - def decode(bytes) do - try do - {:ok, decode!(bytes)} - rescue - e in [Protox.DecodingError, Protox.IllegalTagError, Protox.RequiredFieldsError] -> - {:error, e} - end - end - - ( - @spec decode!(binary) :: struct | no_return - def decode!(bytes) do - parse_key_value(bytes, struct(Electric.Satellite.SatPingReq)) - end - ) - ) - - ( - @spec parse_key_value(binary, struct) :: struct - defp parse_key_value(<<>>, msg) do - msg - end - - defp parse_key_value(bytes, msg) do - {field, rest} = - case Protox.Decode.parse_key(bytes) do - {0, _, _} -> - raise %Protox.IllegalTagError{} - - {tag, wire_type, rest} -> - {_, rest} = Protox.Decode.parse_unknown(tag, wire_type, rest) - {[], rest} - end - - msg_updated = struct(msg, field) - parse_key_value(rest, msg_updated) - end - ) - - [] - ) - - ( - @spec json_decode(iodata(), keyword()) :: {:ok, struct()} | {:error, any()} - def json_decode(input, opts \\ []) do - try do - {:ok, json_decode!(input, opts)} - rescue - e in Protox.JsonDecodingError -> {:error, e} - end - end - - @spec json_decode!(iodata(), keyword()) :: struct() | no_return() - def json_decode!(input, opts \\ []) do - {json_library_wrapper, json_library} = Protox.JsonLibrary.get_library(opts, :decode) - - Protox.JsonDecode.decode!( - input, - Electric.Satellite.SatPingReq, - &json_library_wrapper.decode!(json_library, &1) - ) - end - - @spec json_encode(struct(), keyword()) :: {:ok, iodata()} | {:error, any()} - def json_encode(msg, opts \\ []) do - try do - {:ok, json_encode!(msg, opts)} - rescue - e in Protox.JsonEncodingError -> {:error, e} - end - end - - @spec json_encode!(struct(), keyword()) :: iodata() | no_return() - def json_encode!(msg, opts \\ []) do - {json_library_wrapper, json_library} = Protox.JsonLibrary.get_library(opts, :encode) - Protox.JsonEncode.encode!(msg, &json_library_wrapper.encode!(json_library, &1)) - end - ) - - ( - @deprecated "Use fields_defs()/0 instead" - @spec defs() :: %{ - required(non_neg_integer) => {atom, Protox.Types.kind(), Protox.Types.type()} - } - def defs() do - %{} - end - - @deprecated "Use fields_defs()/0 instead" - @spec defs_by_name() :: %{ - required(atom) => {non_neg_integer, Protox.Types.kind(), Protox.Types.type()} - } - def defs_by_name() do - %{} - end - ) - - ( - @spec fields_defs() :: list(Protox.Field.t()) - def fields_defs() do - [] - end - - [ - @spec(field_def(atom) :: {:ok, Protox.Field.t()} | {:error, :no_such_field}), - def field_def(_) do - {:error, :no_such_field} - end - ] - ) - - [] - - ( - @spec required_fields() :: [] - def required_fields() do - [] - end - ) - - ( - @spec syntax() :: atom() - def syntax() do - :proto3 - end - ) - - [ - @spec(default(atom) :: {:ok, boolean | integer | String.t() | float} | {:error, atom}), - def default(_) do - {:error, :no_such_field} - end - ] - - ( - @spec file_options() :: nil - def file_options() do - nil - end - ) - end, defmodule Electric.Satellite.SatOpMigrate.Stmt do @moduledoc false defstruct type: :CREATE_TABLE, sql: "" diff --git a/components/electric/lib/electric/satellite/protocol.ex b/components/electric/lib/electric/satellite/protocol.ex index a293f5db5a..682680d879 100644 --- a/components/electric/lib/electric/satellite/protocol.ex +++ b/components/electric/lib/electric/satellite/protocol.ex @@ -250,17 +250,6 @@ defmodule Electric.Satellite.Protocol do end end - def process_message(%SatPingReq{} = _msg, %State{in_rep: in_rep} = state) do - Logger.debug("Received ping request, sending lsn #{inspect(in_rep.lsn)}") - {%SatPingResp{lsn: in_rep.lsn}, state} - end - - def process_message(%SatPingResp{lsn: confirmed_lsn}, %State{out_rep: out_rep} = state) - when confirmed_lsn !== "" do - Logger.debug("Received ping response, with clients lsn: #{inspect(confirmed_lsn)}") - {nil, %{state | out_rep: %OutRep{out_rep | lsn: confirmed_lsn}}} - end - # Satellite client request replication def process_message( %SatInStartReplicationReq{lsn: client_lsn, options: opts} = msg, @@ -553,10 +542,9 @@ defmodule Electric.Satellite.Protocol do end end - defp report_lsn(satellite, pid, lsn) do + defp report_lsn(satellite, _pid, lsn) do Logger.info("report lsn: #{inspect(lsn)} for #{satellite}") OffsetStorage.put_satellite_lsn(satellite, lsn) - Process.send(pid, {__MODULE__, :lsn_report, lsn}, []) end @spec handle_outgoing_txs([{Transaction.t(), term()}], State.t()) :: diff --git a/components/electric/lib/electric/satellite/ws_server.ex b/components/electric/lib/electric/satellite/ws_server.ex index abd2ec5c80..201627b275 100644 --- a/components/electric/lib/electric/satellite/ws_server.ex +++ b/components/electric/lib/electric/satellite/ws_server.ex @@ -141,12 +141,6 @@ defmodule Electric.Satellite.WebsocketServer do end end - # Consumer (SatelliteCollectorConsumer) has reported that this lsn has been stored successfully. - # We need to report to Satellite. - def handle_info({Protocol, :lsn_report, lsn}, %State{} = state) do - push({%SatPingResp{lsn: lsn}, state}) - end - # While processing the SatInStartReplicationReq message, Protocol has determined that a new # client has connected which needs to perform the initial sync of migrations and the current database state before # subscribing to the replication stream. diff --git a/components/electric/lib/satellite/test_ws_client.ex b/components/electric/lib/satellite/test_ws_client.ex index d8acab8adc..a8373d1871 100644 --- a/components/electric/lib/satellite/test_ws_client.ex +++ b/components/electric/lib/satellite/test_ws_client.ex @@ -178,20 +178,13 @@ defmodule Satellite.TestWsClient do end end - defp store(state, %SatPingReq{}), do: state - defp store(%{count: count} = state, msg) do :ets.insert(state.table, {count, msg}) %{state | count: count + 1} end - defp log(_, %SatPingReq{} = msg), do: Logger.info("rec: #{inspect(msg)}") defp log(state, msg), do: Logger.info("rec [#{state.count}]: #{inspect(msg)}") - defp maybe_autorespond(%{opts: %{auto_ping: true}} = state, %SatPingReq{}) do - {:reply, serialize(%SatPingResp{lsn: nil}), state} - end - defp maybe_autorespond(%{opts: %{auto_in_sub: true}} = state, %SatInStartReplicationReq{}) do {:reply, serialize(%SatInStartReplicationResp{}), state} end diff --git a/components/electric/test/electric/satellite/protobuf_test.exs b/components/electric/test/electric/satellite/protobuf_test.exs index 6d81bfbdb1..bd4f59eb75 100644 --- a/components/electric/test/electric/satellite/protobuf_test.exs +++ b/components/electric/test/electric/satellite/protobuf_test.exs @@ -10,13 +10,6 @@ defmodule Electric.Postgres.PBTest do assert original_msg == decoded_msg end - test "message for SatPingReq is encoded and decoded" do - original_msg = %SatPingReq{} - {:ok, type, iodata} = PB.encode(original_msg) - {:ok, decoded_msg} = PB.decode(type, :erlang.iolist_to_binary(iodata)) - assert original_msg == decoded_msg - end - test "message for transaction" do begin = %SatOpBegin{ commit_timestamp: :os.system_time(:millisecond), diff --git a/components/electric/test/electric/satellite/ws_pg_to_satellite_test.exs b/components/electric/test/electric/satellite/ws_pg_to_satellite_test.exs index bca8e68c50..9cb45d5baf 100644 --- a/components/electric/test/electric/satellite/ws_pg_to_satellite_test.exs +++ b/components/electric/test/electric/satellite/ws_pg_to_satellite_test.exs @@ -37,7 +37,6 @@ defmodule Electric.Satellite.WsPgToSatelliteTest do MockClient.send_data(conn, %SatInStartReplicationReq{}) assert_receive {^conn, %SatInStartReplicationResp{}} - ping_server(conn) refute_receive {^conn, _} end) end @@ -65,14 +64,12 @@ defmodule Electric.Satellite.WsPgToSatelliteTest do assert_receive_migration(conn, vsn1, "foo") assert_receive_migration(conn, vsn2, "bar") - ping_server(conn) refute_receive {^conn, _} # Make sure the server keeps streaming migrations to the client after the initial sync is done. :ok = migrate(ctx.db, vsn3, "ALTER TABLE foo ADD COLUMN bar TEXT DEFAULT 'quux'") assert_receive_migration(conn, vsn3, "foo") - ping_server(conn) refute_receive {^conn, _} end) end @@ -94,7 +91,6 @@ defmodule Electric.Satellite.WsPgToSatelliteTest do assert_receive_migration(conn, vsn1, "foo") assert_receive_migration(conn, vsn2, "bar") - ping_server(conn) refute_receive {^conn, _} end) @@ -108,7 +104,6 @@ defmodule Electric.Satellite.WsPgToSatelliteTest do assert_receive_migration(conn, vsn2, "bar") - ping_server(conn) refute_receive {^conn, _} end) end diff --git a/components/electric/test/electric/satellite/ws_server_test.exs b/components/electric/test/electric/satellite/ws_server_test.exs index 1b2d7e56d3..ebefc1ff41 100644 --- a/components/electric/test/electric/satellite/ws_server_test.exs +++ b/components/electric/test/electric/satellite/ws_server_test.exs @@ -152,7 +152,7 @@ defmodule Electric.Satellite.WebsocketServerTest do test "Server will respond with error on attempt to skip auth", ctx do with_connect([port: ctx.port], fn conn -> - MockClient.send_data(conn, %SatPingReq{}) + MockClient.send_data(conn, %SatInStartReplicationReq{}) assert_receive {^conn, %SatErrorResp{error_type: :AUTH_REQUIRED}}, @default_wait end) @@ -164,8 +164,6 @@ defmodule Electric.Satellite.WebsocketServerTest do server_id = ctx.server_id assert_receive {^conn, %SatAuthResp{id: ^server_id}}, @default_wait - - ping_server(conn) end) end @@ -173,7 +171,7 @@ defmodule Electric.Satellite.WebsocketServerTest do server_id = ctx.server_id with_connect([port: ctx.port], fn conn -> - MockClient.send_data(conn, %SatPingReq{}) + MockClient.send_data(conn, %SatInStartReplicationReq{}) assert_receive {^conn, %SatErrorResp{error_type: :AUTH_REQUIRED}}, @default_wait end) @@ -573,11 +571,11 @@ defmodule Electric.Satellite.WebsocketServerTest do assert tx.origin !== "" - assert_receive {^conn, %SatPingResp{lsn: ^lsn}}, @default_wait - - # After restart we still get same lsn + # Wait for everything to be persisted + Process.sleep(200) end) + # After restart we still get same lsn with_connect([auth: ctx, id: ctx.client_id, port: ctx.port], fn conn -> lsn = "some_long_internal_lsn" diff --git a/components/electric/test/electric/satellite/ws_validations_test.exs b/components/electric/test/electric/satellite/ws_validations_test.exs index 31fd139050..11423a5088 100644 --- a/components/electric/test/electric/satellite/ws_validations_test.exs +++ b/components/electric/test/electric/satellite/ws_validations_test.exs @@ -53,9 +53,6 @@ defmodule Electric.Satellite.WsValidationsTest do tx_op_log = serialize_trans(%{"id" => "3", "num" => "-1", "t1" => "", "t2" => "..."}) MockClient.send_data(conn, tx_op_log) - # Wait long enough for the server to process our messages, thus confirming it has been accepted - ping_server(conn) - refute_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}, @receive_timeout end) end @@ -113,9 +110,6 @@ defmodule Electric.Satellite.WsValidationsTest do MockClient.send_data(conn, tx_op_log) end) - # Wait long enough for the server to process our messages, thus confirming it has been accepted - ping_server(conn) - refute_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}} end) @@ -168,9 +162,6 @@ defmodule Electric.Satellite.WsValidationsTest do MockClient.send_data(conn, tx_op_log) end) - # Wait long enough for the server to process our messages, thus confirming it has been accepted - ping_server(conn) - refute_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}} end) @@ -216,9 +207,6 @@ defmodule Electric.Satellite.WsValidationsTest do MockClient.send_data(conn, tx_op_log) end) - # Wait long enough for the server to process our messages, thus confirming it has been accepted - ping_server(conn) - refute_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}} end) diff --git a/components/electric/test/support/satellite_helpers.ex b/components/electric/test/support/satellite_helpers.ex index 4191255945..9613bc18a1 100644 --- a/components/electric/test/support/satellite_helpers.ex +++ b/components/electric/test/support/satellite_helpers.ex @@ -5,12 +5,6 @@ defmodule ElectricTest.SatelliteHelpers do alias Satellite.TestWsClient, as: MockClient - # Send a ping to WebsocketServer. Useful to make sure it is done with initial sync. - def ping_server(conn) do - MockClient.send_data(conn, %SatPingReq{}) - assert_receive {^conn, %SatPingResp{lsn: ""}} - end - def assert_receive_migration(conn, version, table_name) do assert_receive {^conn, %SatRelation{table_name: ^table_name}} diff --git a/protocol/satellite.proto b/protocol/satellite.proto index 27e82ba457..92d276a856 100644 --- a/protocol/satellite.proto +++ b/protocol/satellite.proto @@ -28,17 +28,6 @@ syntax = "proto3"; // package Electric.Satellite; -// Ping request. Can be sent by any party. -message SatPingReq{ -} - -// Ping response. -message SatPingResp{ - // If LSN is present, it conveys to producer the latest LSN position that - // was applied on the consumer side. If there is no active replication - // ongoing the field should be left empty. - optional bytes lsn = 1; -} enum SatAuthHeader { reserved 1;