Skip to content

Commit

Permalink
chore: removed SatPingReq/Resp from the protocol (#408)
Browse files Browse the repository at this point in the history
  • Loading branch information
icehaunter authored Sep 7, 2023
1 parent 39081c9 commit f991283
Show file tree
Hide file tree
Showing 16 changed files with 12 additions and 660 deletions.
105 changes: 0 additions & 105 deletions clients/typescript/src/_generated/protocol/satellite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,6 @@ export enum SatAuthHeader {
UNRECOGNIZED = -1,
}

/** Ping request. Can be sent by any party. */
export interface SatPingReq {
$type: "Electric.Satellite.SatPingReq";
}

/** Ping response. */
export interface SatPingResp {
$type: "Electric.Satellite.SatPingResp";
/**
* If LSN is present, it conveys to producer the latest LSN position that
* was applied on the consumer side. If there is no active replication
* ongoing the field should be left empty.
*/
lsn?: Uint8Array | undefined;
}

export interface SatAuthHeaderPair {
$type: "Electric.Satellite.SatAuthHeaderPair";
key: SatAuthHeader;
Expand Down Expand Up @@ -585,95 +569,6 @@ export interface SatShapeDataEnd {
$type: "Electric.Satellite.SatShapeDataEnd";
}

function createBaseSatPingReq(): SatPingReq {
return { $type: "Electric.Satellite.SatPingReq" };
}

export const SatPingReq = {
$type: "Electric.Satellite.SatPingReq" as const,

encode(_: SatPingReq, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): SatPingReq {
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseSatPingReq();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},

create<I extends Exact<DeepPartial<SatPingReq>, I>>(base?: I): SatPingReq {
return SatPingReq.fromPartial(base ?? {});
},

fromPartial<I extends Exact<DeepPartial<SatPingReq>, I>>(_: I): SatPingReq {
const message = createBaseSatPingReq();
return message;
},
};

messageTypeRegistry.set(SatPingReq.$type, SatPingReq);

function createBaseSatPingResp(): SatPingResp {
return { $type: "Electric.Satellite.SatPingResp", lsn: undefined };
}

export const SatPingResp = {
$type: "Electric.Satellite.SatPingResp" as const,

encode(message: SatPingResp, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.lsn !== undefined) {
writer.uint32(10).bytes(message.lsn);
}
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): SatPingResp {
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseSatPingResp();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag !== 10) {
break;
}

message.lsn = reader.bytes();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},

create<I extends Exact<DeepPartial<SatPingResp>, I>>(base?: I): SatPingResp {
return SatPingResp.fromPartial(base ?? {});
},

fromPartial<I extends Exact<DeepPartial<SatPingResp>, I>>(object: I): SatPingResp {
const message = createBaseSatPingResp();
message.lsn = object.lsn ?? undefined;
return message;
},
};

messageTypeRegistry.set(SatPingResp.$type, SatPingResp);

function createBaseSatAuthHeaderPair(): SatAuthHeaderPair {
return { $type: "Electric.Satellite.SatAuthHeaderPair", key: 0, value: "" };
}
Expand Down
23 changes: 0 additions & 23 deletions clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
SatTransOp,
SatOpRow,
SatOpLog,
SatPingResp,
SatRelation,
SatRelationColumn,
SatSubsResp,
Expand Down Expand Up @@ -144,11 +143,6 @@ export class SatelliteClient extends EventEmitter implements Client {
handle: () => this.handleStopResp(),
isRpc: true,
},
SatPingReq: { handle: () => this.handlePingReq(), isRpc: true },
SatPingResp: {
handle: (req: any) => this.handlePingResp(req),
isRpc: false,
},
SatRelation: {
handle: (req: any) => this.handleRelation(req),
isRpc: false,
Expand Down Expand Up @@ -730,23 +724,6 @@ export class SatelliteClient extends EventEmitter implements Client {
}
}

private handlePingReq() {
Log.info(
`respond to ping with last ack ${
this.inbound.last_lsn ? base64.fromBytes(this.inbound.last_lsn) : 'NULL'
}`
)
const pong = SatPingResp.fromPartial({ lsn: this.inbound.last_lsn })
this.sendMessage(pong)
}

private handlePingResp(_message: SatPingResp) {
// TODO: This message is not used in any way right now.
// We might be dropping client-initiated pings completely.
// However, the server sends these messages without any prompting,
// so this handler cannot just throw an error
}

private handleError(error: SatErrorResp) {
this.emit('error', serverErrorToSatelliteError(error))
}
Expand Down
10 changes: 0 additions & 10 deletions clients/typescript/src/util/proto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ const msgtypetuples: MappingTuples = {
SatErrorResp: [0, Pb.SatErrorResp],
SatAuthReq: [1, Pb.SatAuthReq],
SatAuthResp: [2, Pb.SatAuthResp],
SatPingReq: [3, Pb.SatPingReq],
SatPingResp: [4, Pb.SatPingResp],
SatInStartReplicationReq: [5, Pb.SatInStartReplicationReq],
SatInStartReplicationResp: [6, Pb.SatInStartReplicationResp],
SatInStopReplicationReq: [7, Pb.SatInStopReplicationReq],
Expand Down Expand Up @@ -143,8 +141,6 @@ export type SatPbMsg =
| Pb.SatErrorResp
| Pb.SatAuthReq
| Pb.SatAuthResp
| Pb.SatPingReq
| Pb.SatPingResp
| Pb.SatInStartReplicationReq
| Pb.SatInStartReplicationResp
| Pb.SatInStopReplicationReq
Expand Down Expand Up @@ -329,12 +325,6 @@ export function msgToString(message: SatPbMsg): string {
return `#SatInStopReplicationResp{}`
case 'Electric.Satellite.SatMigrationNotification':
return `#SatMigrationNotification{to: ${message.newSchemaVersion}, from: ${message.newSchemaVersion}}`
case 'Electric.Satellite.SatPingReq':
return `#SatPingReq{}`
case 'Electric.Satellite.SatPingResp':
return `#SatPingResp{lsn: ${
message.lsn ? base64.fromBytes(message.lsn) : 'NULL'
}}`
case 'Electric.Satellite.SatRelation': {
const cols = message.columns
.map((x) => `${x.name}: ${x.type}${x.primaryKey ? ' PK' : ''}`)
Expand Down
23 changes: 0 additions & 23 deletions clients/typescript/test/satellite/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,29 +232,6 @@ test.serial('replication stop failure', async (t) => {
}
})

test.serial('server pings client', async (t) => {
await connectAndAuth(t.context)
const { client, server } = t.context

const start = Proto.SatInStartReplicationResp.fromPartial({})
const ping = Proto.SatPingReq.fromPartial({})
const stop = Proto.SatInStopReplicationResp.fromPartial({})

return new Promise(async (resolve) => {
server.nextResponses([start, ping])
server.nextResponses([
() => {
t.pass()
resolve()
},
])
server.nextResponses([stop])

await client.startReplication()
await client.stopReplication()
})
})

test.serial('receive transaction over multiple messages', async (t) => {
await connectAndAuth(t.context)
const { client, server } = t.context
Expand Down
20 changes: 0 additions & 20 deletions clients/typescript/test/satellite/server_ws_stub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import {
SatInStartReplicationResp,
SatInStopReplicationResp,
SatOpLog,
SatPingReq,
SatPingResp,
SatRelation,
SatShapeDataBegin,
SatShapeDataEnd,
Expand Down Expand Up @@ -118,24 +116,6 @@ export class SatelliteWSServerStub {
)
}

if (msgType == getTypeFromString(SatPingReq.$type)) {
socket.send(
Buffer.concat([
getSizeBuf(msg),
SatPingReq.encode(msg as SatPingReq).finish(),
])
)
}

if (msgType == getTypeFromString(SatPingResp.$type)) {
socket.send(
Buffer.concat([
getSizeBuf(msg),
SatPingResp.encode(msg as SatPingResp).finish(),
])
)
}

if (msgType == getTypeFromString(SatSubsResp.$type)) {
socket.send(
Buffer.concat([
Expand Down
14 changes: 6 additions & 8 deletions components/electric/lib/electric/satellite/protobuf.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ defmodule Electric.Satellite.Protobuf do
SatErrorResp,
SatAuthReq,
SatAuthResp,
SatPingReq,
SatPingResp,
SatInStartReplicationReq,
SatInStartReplicationResp,
SatInStopReplicationReq,
Expand All @@ -28,14 +26,14 @@ defmodule Electric.Satellite.Protobuf do

require Logger

@reserved [3, 4]

# This mapping should be kept in sync with Satellite repo. Message is present
# in the mapping ONLY if it could be send as an individual message.
@mapping %{
SatErrorResp => 0,
SatAuthReq => 1,
SatAuthResp => 2,
SatPingReq => 3,
SatPingResp => 4,
SatInStartReplicationReq => 5,
SatInStartReplicationResp => 6,
SatInStopReplicationReq => 7,
Expand All @@ -54,13 +52,15 @@ defmodule Electric.Satellite.Protobuf do
SatUnsubsResp => 20
}

if Enum.any?(Map.values(@mapping), &(&1 in @reserved)) do
raise "Cannot use a reserved value as the message tag"
end

@type relation_id() :: non_neg_integer()
@type sq_pb_msg() ::
%SatErrorResp{}
| %SatAuthReq{}
| %SatAuthResp{}
| %SatPingReq{}
| %SatPingResp{}
| %SatInStartReplicationReq{}
| %SatInStartReplicationResp{}
| %SatInStopReplicationReq{}
Expand All @@ -87,8 +87,6 @@ defmodule Electric.Satellite.Protobuf do
SatAuthReq,
SatAuthHeaderPair,
SatAuthResp,
SatPingReq,
SatPingResp,
SatInStartReplicationReq,
SatInStartReplicationResp,
SatInStopReplicationReq,
Expand Down
Loading

0 comments on commit f991283

Please sign in to comment.