From eb722c9be9749a94e12564197db7c7fab47d56ac Mon Sep 17 00:00:00 2001 From: Garry Hill Date: Thu, 23 Nov 2023 10:18:08 +0000 Subject: [PATCH] chore(electric,client): Create new protocol op to represent a compensation (#639) For permissions etc we need to always have the `OLD` values available, so for clarity don't send out SatOpUpdate with no old_data values --- .changeset/itchy-carrots-invite.md | 6 + .../src/_generated/protocol/satellite.ts | 99 ++++ clients/typescript/src/migrators/triggers.ts | 11 +- clients/typescript/src/satellite/client.ts | 9 + clients/typescript/src/util/types.ts | 1 + .../postgres/shadow_table_transformation.ex | 3 +- .../lib/electric/replication/changes.ex | 17 +- .../replication/postgres/slot_server.ex | 5 +- .../lib/electric/satellite/protobuf.ex | 10 +- .../electric/satellite/protobuf_messages.ex | 468 +++++++++++++++++- .../lib/electric/satellite/serialization.ex | 22 +- .../lib/satellite/protocol_helpers.ex | 1 + ..._compensations_within_same_tx_are_fine.lux | 4 +- protocol/satellite.proto | 8 + 14 files changed, 629 insertions(+), 35 deletions(-) create mode 100644 .changeset/itchy-carrots-invite.md diff --git a/.changeset/itchy-carrots-invite.md b/.changeset/itchy-carrots-invite.md new file mode 100644 index 0000000000..f0bd5f9201 --- /dev/null +++ b/.changeset/itchy-carrots-invite.md @@ -0,0 +1,6 @@ +--- +"@core/electric": minor +"electric-sql": minor +--- + +[VAX-1335] Create new protocol op to represent a compensation diff --git a/clients/typescript/src/_generated/protocol/satellite.ts b/clients/typescript/src/_generated/protocol/satellite.ts index 1d76c76510..13156e7818 100644 --- a/clients/typescript/src/_generated/protocol/satellite.ts +++ b/clients/typescript/src/_generated/protocol/satellite.ts @@ -217,6 +217,7 @@ export interface SatTransOp { insert?: SatOpInsert | undefined; delete?: SatOpDelete | undefined; migrate?: SatOpMigrate | undefined; + compensation?: SatOpCompensation | undefined; } /** @@ -299,6 +300,16 @@ export interface SatOpDelete { tags: string[]; } +export interface SatOpCompensation { + $type: "Electric.Satellite.SatOpCompensation"; + relationId: number; + pkData: + | SatOpRow + | undefined; + /** dependency information */ + tags: string[]; +} + /** Message that corresponds to the single row. */ export interface SatOpRow { $type: "Electric.Satellite.SatOpRow"; @@ -1508,6 +1519,7 @@ function createBaseSatTransOp(): SatTransOp { insert: undefined, delete: undefined, migrate: undefined, + compensation: undefined, }; } @@ -1533,6 +1545,9 @@ export const SatTransOp = { if (message.migrate !== undefined) { SatOpMigrate.encode(message.migrate, writer.uint32(50).fork()).ldelim(); } + if (message.compensation !== undefined) { + SatOpCompensation.encode(message.compensation, writer.uint32(58).fork()).ldelim(); + } return writer; }, @@ -1585,6 +1600,13 @@ export const SatTransOp = { message.migrate = SatOpMigrate.decode(reader, reader.uint32()); continue; + case 7: + if (tag !== 58) { + break; + } + + message.compensation = SatOpCompensation.decode(reader, reader.uint32()); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -1618,6 +1640,9 @@ export const SatTransOp = { message.migrate = (object.migrate !== undefined && object.migrate !== null) ? SatOpMigrate.fromPartial(object.migrate) : undefined; + message.compensation = (object.compensation !== undefined && object.compensation !== null) + ? SatOpCompensation.fromPartial(object.compensation) + : undefined; return message; }, }; @@ -2042,6 +2067,80 @@ export const SatOpDelete = { messageTypeRegistry.set(SatOpDelete.$type, SatOpDelete); +function createBaseSatOpCompensation(): SatOpCompensation { + return { $type: "Electric.Satellite.SatOpCompensation", relationId: 0, pkData: undefined, tags: [] }; +} + +export const SatOpCompensation = { + $type: "Electric.Satellite.SatOpCompensation" as const, + + encode(message: SatOpCompensation, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.relationId !== 0) { + writer.uint32(8).uint32(message.relationId); + } + if (message.pkData !== undefined) { + SatOpRow.encode(message.pkData, writer.uint32(18).fork()).ldelim(); + } + for (const v of message.tags) { + writer.uint32(34).string(v!); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): SatOpCompensation { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseSatOpCompensation(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 8) { + break; + } + + message.relationId = reader.uint32(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.pkData = SatOpRow.decode(reader, reader.uint32()); + continue; + case 4: + if (tag !== 34) { + break; + } + + message.tags.push(reader.string()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + create, I>>(base?: I): SatOpCompensation { + return SatOpCompensation.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): SatOpCompensation { + const message = createBaseSatOpCompensation(); + message.relationId = object.relationId ?? 0; + message.pkData = (object.pkData !== undefined && object.pkData !== null) + ? SatOpRow.fromPartial(object.pkData) + : undefined; + message.tags = object.tags?.map((e) => e) || []; + return message; + }, +}; + +messageTypeRegistry.set(SatOpCompensation.$type, SatOpCompensation); + function createBaseSatOpRow(): SatOpRow { return { $type: "Electric.Satellite.SatOpRow", nullsBitmask: new Uint8Array(), values: [] }; } diff --git a/clients/typescript/src/migrators/triggers.ts b/clients/typescript/src/migrators/triggers.ts index b58b9fffda..610165b8f7 100644 --- a/clients/typescript/src/migrators/triggers.ts +++ b/clients/typescript/src/migrators/triggers.ts @@ -118,10 +118,9 @@ export function generateOplogTriggers( /** * Generates triggers for compensations for all foreign keys in the provided table. * - * Compensation is recorded as a specially-formatted update. It acts as a no-op, with - * previous value set to NULL, and it's on the server to figure out that this is a no-op - * compensation operation (usually `UPDATE` would have previous row state known). The entire - * reason for it existing is to maybe revive the row if it has been deleted, so we need correct tags. + * Compensation is recorded as a SatOpCompensation messaage. The entire reason + * for it existing is to maybe revive the row if it has been deleted, so we need + * correct tags. * * The compensation update contains _just_ the primary keys, no other columns are present. * @@ -155,7 +154,7 @@ function generateCompensationTriggers(table: Table): Statement[] { 1 == (SELECT value from _electric_meta WHERE key == 'compensations') BEGIN INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp) - SELECT '${fkTableNamespace}', '${fkTableName}', 'UPDATE', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL + SELECT '${fkTableNamespace}', '${fkTableName}', 'COMPENSATION', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL FROM "${fkTableNamespace}"."${fkTableName}" WHERE "${foreignKey.parentKey}" = new."${foreignKey.childKey}"; END; `, @@ -167,7 +166,7 @@ function generateCompensationTriggers(table: Table): Statement[] { 1 == (SELECT value from _electric_meta WHERE key == 'compensations') BEGIN INSERT INTO _electric_oplog (namespace, tablename, optype, primaryKey, newRow, oldRow, timestamp) - SELECT '${fkTableNamespace}', '${fkTableName}', 'UPDATE', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL + SELECT '${fkTableNamespace}', '${fkTableName}', 'COMPENSATION', json_object(${joinedFkPKs}), json_object(${joinedFkPKs}), NULL, NULL FROM "${fkTableNamespace}"."${fkTableName}" WHERE "${foreignKey.parentKey}" = new."${foreignKey.childKey}"; END; `, diff --git a/clients/typescript/src/satellite/client.ts b/clients/typescript/src/satellite/client.ts index f9c43ba6ad..11a33297f8 100644 --- a/clients/typescript/src/satellite/client.ts +++ b/clients/typescript/src/satellite/client.ts @@ -592,6 +592,15 @@ export class SatelliteClient implements Client { }, }) break + case DataChangeType.COMPENSATION: + changeOp = SatTransOp.fromPartial({ + compensation: { + pkData: record, + relationId: relation.id, + tags: tags, + }, + }) + break } ops.push(changeOp) }) diff --git a/clients/typescript/src/util/types.ts b/clients/typescript/src/util/types.ts index a8871e0656..3e1a8e267e 100644 --- a/clients/typescript/src/util/types.ts +++ b/clients/typescript/src/util/types.ts @@ -118,6 +118,7 @@ export enum DataChangeType { INSERT = 'INSERT', UPDATE = 'UPDATE', DELETE = 'DELETE', + COMPENSATION = 'COMPENSATION', } export type Change = DataChange | SchemaChange diff --git a/components/electric/lib/electric/postgres/shadow_table_transformation.ex b/components/electric/lib/electric/postgres/shadow_table_transformation.ex index 337b85de70..e552246acd 100644 --- a/components/electric/lib/electric/postgres/shadow_table_transformation.ex +++ b/components/electric/lib/electric/postgres/shadow_table_transformation.ex @@ -96,8 +96,7 @@ defmodule Electric.Postgres.ShadowTableTransformation do defp build_bitmask(%Changes.NewRecord{}, columns), do: Enum.map(columns, fn _ -> "t" end) - defp build_bitmask(%Changes.UpdatedRecord{old_record: nil}, columns), - do: Enum.map(columns, fn _ -> "f" end) + defp build_bitmask(%Changes.Compensation{}, columns), do: Enum.map(columns, fn _ -> "f" end) defp build_bitmask(%Changes.UpdatedRecord{old_record: old, record: new}, columns), do: Enum.map(columns, fn col -> if old[col] != new[col], do: "t", else: "f" end) diff --git a/components/electric/lib/electric/replication/changes.ex b/components/electric/lib/electric/replication/changes.ex index 5fea923272..68efcd77ed 100644 --- a/components/electric/lib/electric/replication/changes.ex +++ b/components/electric/lib/electric/replication/changes.ex @@ -57,7 +57,7 @@ defmodule Electric.Replication.Changes do ] def count_operations(%__MODULE__{changes: changes}) do - base = %{operations: 0, inserts: 0, updates: 0, deletes: 0} + base = %{operations: 0, inserts: 0, updates: 0, deletes: 0, compensations: 0} Enum.reduce(changes, base, fn %module{}, acc -> key = @@ -65,11 +65,10 @@ defmodule Electric.Replication.Changes do Changes.NewRecord -> :inserts Changes.UpdatedRecord -> :updates Changes.DeletedRecord -> :deletes + Changes.Compensation -> :compensations end - %{^key => value, :operations => total} = acc - - %{acc | key => value + 1, :operations => total + 1} + Map.update!(%{acc | operations: acc.operations + 1}, key, &(&1 + 1)) end) end end @@ -105,6 +104,16 @@ defmodule Electric.Replication.Changes do } end + defmodule Compensation do + defstruct [:relation, :record, tags: []] + + @type t() :: %__MODULE__{ + relation: Changes.relation(), + record: Changes.record(), + tags: [Changes.tag()] + } + end + defmodule TruncatedRelation do defstruct [:relation] end diff --git a/components/electric/lib/electric/replication/postgres/slot_server.ex b/components/electric/lib/electric/replication/postgres/slot_server.ex index 0447edd733..d7f7593d72 100644 --- a/components/electric/lib/electric/replication/postgres/slot_server.ex +++ b/components/electric/lib/electric/replication/postgres/slot_server.ex @@ -458,10 +458,7 @@ defmodule Electric.Replication.Postgres.SlotServer do } end - defp changes_to_wal( - %Changes.UpdatedRecord{relation: table, old_record: nil, record: new}, - relations - ) do + defp changes_to_wal(%Changes.Compensation{relation: table, record: new}, relations) do %ReplicationMessages.Update{ relation_id: relations[table].oid, tuple_data: record_to_tuple(new, relations[table].columns) diff --git a/components/electric/lib/electric/satellite/protobuf.ex b/components/electric/lib/electric/satellite/protobuf.ex index bb109bcbba..30c9bea483 100644 --- a/components/electric/lib/electric/satellite/protobuf.ex +++ b/components/electric/lib/electric/satellite/protobuf.ex @@ -107,6 +107,7 @@ defmodule Electric.Satellite.Protobuf do SatOpInsert, SatOpUpdate, SatOpMigrate, + SatOpCompensation, SatTransOp, SatRelation, SatRelationColumn, @@ -125,15 +126,6 @@ defmodule Electric.Satellite.Protobuf do end end - defmodule Version do - defstruct major: nil, minor: nil - - @type t() :: %__MODULE__{ - major: integer, - minor: integer - } - end - @spec decode(byte(), binary()) :: {:ok, sq_pb_msg()} | {:error, any()} for {module, tag} <- @mapping do def decode(unquote(tag), binary) do diff --git a/components/electric/lib/electric/satellite/protobuf_messages.ex b/components/electric/lib/electric/satellite/protobuf_messages.ex index fd59d69189..7867f8a3db 100644 --- a/components/electric/lib/electric/satellite/protobuf_messages.ex +++ b/components/electric/lib/electric/satellite/protobuf_messages.ex @@ -1002,6 +1002,7 @@ {:insert, _field_value} -> encode_insert(acc, msg) {:delete, _field_value} -> encode_delete(acc, msg) {:migrate, _field_value} -> encode_migrate(acc, msg) + {:compensation, _field_value} -> encode_compensation(acc, msg) end end ] @@ -1060,6 +1061,16 @@ ArgumentError -> reraise Protox.EncodingError.new(:migrate, "invalid field value"), __STACKTRACE__ end + end, + defp encode_compensation(acc, msg) do + try do + {_, child_field_value} = msg.op + [acc, ":", Protox.Encode.encode_message(child_field_value)] + rescue + ArgumentError -> + reraise Protox.EncodingError.new(:compensation, "invalid field value"), + __STACKTRACE__ + end end ] @@ -1212,6 +1223,26 @@ end ], rest} + {7, _, bytes} -> + {len, bytes} = Protox.Varint.decode(bytes) + {delimited, rest} = Protox.Decode.parse_delimited(bytes, len) + + {[ + case msg.op do + {:compensation, previous_value} -> + {:op, + {:compensation, + Protox.MergeMessage.merge( + previous_value, + Electric.Satellite.SatOpCompensation.decode!(delimited) + )}} + + _ -> + {:op, + {:compensation, Electric.Satellite.SatOpCompensation.decode!(delimited)}} + end + ], rest} + {tag, wire_type, rest} -> {_, rest} = Protox.Decode.parse_unknown(tag, wire_type, rest) {[], rest} @@ -1274,7 +1305,8 @@ 3 => {:update, {:oneof, :op}, {:message, Electric.Satellite.SatOpUpdate}}, 4 => {:insert, {:oneof, :op}, {:message, Electric.Satellite.SatOpInsert}}, 5 => {:delete, {:oneof, :op}, {:message, Electric.Satellite.SatOpDelete}}, - 6 => {:migrate, {:oneof, :op}, {:message, Electric.Satellite.SatOpMigrate}} + 6 => {:migrate, {:oneof, :op}, {:message, Electric.Satellite.SatOpMigrate}}, + 7 => {:compensation, {:oneof, :op}, {:message, Electric.Satellite.SatOpCompensation}} } end @@ -1286,6 +1318,7 @@ %{ begin: {1, {:oneof, :op}, {:message, Electric.Satellite.SatOpBegin}}, commit: {2, {:oneof, :op}, {:message, Electric.Satellite.SatOpCommit}}, + compensation: {7, {:oneof, :op}, {:message, Electric.Satellite.SatOpCompensation}}, delete: {5, {:oneof, :op}, {:message, Electric.Satellite.SatOpDelete}}, insert: {4, {:oneof, :op}, {:message, Electric.Satellite.SatOpInsert}}, migrate: {6, {:oneof, :op}, {:message, Electric.Satellite.SatOpMigrate}}, @@ -1351,6 +1384,15 @@ name: :migrate, tag: 6, type: {:message, Electric.Satellite.SatOpMigrate} + }, + %{ + __struct__: Protox.Field, + json_name: "compensation", + kind: {:oneof, :op}, + label: :optional, + name: :compensation, + tag: 7, + type: {:message, Electric.Satellite.SatOpCompensation} } ] end @@ -1531,6 +1573,35 @@ [] ), + ( + def field_def(:compensation) do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "compensation", + kind: {:oneof, :op}, + label: :optional, + name: :compensation, + tag: 7, + type: {:message, Electric.Satellite.SatOpCompensation} + }} + end + + def field_def("compensation") do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "compensation", + kind: {:oneof, :op}, + label: :optional, + name: :compensation, + tag: 7, + type: {:message, Electric.Satellite.SatOpCompensation} + }} + end + + [] + ), def field_def(_) do {:error, :no_such_field} end @@ -1573,6 +1644,9 @@ def default(:migrate) do {:error, :no_default_value} end, + def default(:compensation) do + {:error, :no_default_value} + end, def default(_) do {:error, :no_such_field} end @@ -13661,6 +13735,398 @@ end ) end, + defmodule Electric.Satellite.SatOpCompensation do + @moduledoc false + defstruct relation_id: 0, pk_data: nil, tags: [] + + ( + ( + @spec encode(struct) :: {:ok, iodata} | {:error, any} + def encode(msg) do + try do + {:ok, encode!(msg)} + rescue + e in [Protox.EncodingError, Protox.RequiredFieldsError] -> {:error, e} + end + end + + @spec encode!(struct) :: iodata | no_return + def encode!(msg) do + [] |> encode_relation_id(msg) |> encode_pk_data(msg) |> encode_tags(msg) + end + ) + + [] + + [ + defp encode_relation_id(acc, msg) do + try do + if msg.relation_id == 0 do + acc + else + [acc, "\b", Protox.Encode.encode_uint32(msg.relation_id)] + end + rescue + ArgumentError -> + reraise Protox.EncodingError.new(:relation_id, "invalid field value"), + __STACKTRACE__ + end + end, + defp encode_pk_data(acc, msg) do + try do + if msg.pk_data == nil do + acc + else + [acc, "\x12", Protox.Encode.encode_message(msg.pk_data)] + end + rescue + ArgumentError -> + reraise Protox.EncodingError.new(:pk_data, "invalid field value"), __STACKTRACE__ + end + end, + defp encode_tags(acc, msg) do + try do + case msg.tags do + [] -> + acc + + values -> + [ + acc, + Enum.reduce(values, [], fn value, acc -> + [acc, "\"", Protox.Encode.encode_string(value)] + end) + ] + end + rescue + ArgumentError -> + reraise Protox.EncodingError.new(:tags, "invalid field value"), __STACKTRACE__ + end + end + ] + + [] + ) + + ( + ( + @spec decode(binary) :: {:ok, struct} | {:error, any} + def decode(bytes) do + try do + {:ok, decode!(bytes)} + rescue + e in [Protox.DecodingError, Protox.IllegalTagError, Protox.RequiredFieldsError] -> + {:error, e} + end + end + + ( + @spec decode!(binary) :: struct | no_return + def decode!(bytes) do + parse_key_value(bytes, struct(Electric.Satellite.SatOpCompensation)) + end + ) + ) + + ( + @spec parse_key_value(binary, struct) :: struct + defp parse_key_value(<<>>, msg) do + msg + end + + defp parse_key_value(bytes, msg) do + {field, rest} = + case Protox.Decode.parse_key(bytes) do + {0, _, _} -> + raise %Protox.IllegalTagError{} + + {1, _, bytes} -> + {value, rest} = Protox.Decode.parse_uint32(bytes) + {[relation_id: value], rest} + + {2, _, bytes} -> + {len, bytes} = Protox.Varint.decode(bytes) + {delimited, rest} = Protox.Decode.parse_delimited(bytes, len) + + {[ + pk_data: + Protox.MergeMessage.merge( + msg.pk_data, + Electric.Satellite.SatOpRow.decode!(delimited) + ) + ], rest} + + {4, _, bytes} -> + {len, bytes} = Protox.Varint.decode(bytes) + {delimited, rest} = Protox.Decode.parse_delimited(bytes, len) + {[tags: msg.tags ++ [delimited]], rest} + + {tag, wire_type, rest} -> + {_, rest} = Protox.Decode.parse_unknown(tag, wire_type, rest) + {[], rest} + end + + msg_updated = struct(msg, field) + parse_key_value(rest, msg_updated) + end + ) + + [] + ) + + ( + @spec json_decode(iodata(), keyword()) :: {:ok, struct()} | {:error, any()} + def json_decode(input, opts \\ []) do + try do + {:ok, json_decode!(input, opts)} + rescue + e in Protox.JsonDecodingError -> {:error, e} + end + end + + @spec json_decode!(iodata(), keyword()) :: struct() | no_return() + def json_decode!(input, opts \\ []) do + {json_library_wrapper, json_library} = Protox.JsonLibrary.get_library(opts, :decode) + + Protox.JsonDecode.decode!( + input, + Electric.Satellite.SatOpCompensation, + &json_library_wrapper.decode!(json_library, &1) + ) + end + + @spec json_encode(struct(), keyword()) :: {:ok, iodata()} | {:error, any()} + def json_encode(msg, opts \\ []) do + try do + {:ok, json_encode!(msg, opts)} + rescue + e in Protox.JsonEncodingError -> {:error, e} + end + end + + @spec json_encode!(struct(), keyword()) :: iodata() | no_return() + def json_encode!(msg, opts \\ []) do + {json_library_wrapper, json_library} = Protox.JsonLibrary.get_library(opts, :encode) + Protox.JsonEncode.encode!(msg, &json_library_wrapper.encode!(json_library, &1)) + end + ) + + ( + @deprecated "Use fields_defs()/0 instead" + @spec defs() :: %{ + required(non_neg_integer) => {atom, Protox.Types.kind(), Protox.Types.type()} + } + def defs() do + %{ + 1 => {:relation_id, {:scalar, 0}, :uint32}, + 2 => {:pk_data, {:scalar, nil}, {:message, Electric.Satellite.SatOpRow}}, + 4 => {:tags, :unpacked, :string} + } + end + + @deprecated "Use fields_defs()/0 instead" + @spec defs_by_name() :: %{ + required(atom) => {non_neg_integer, Protox.Types.kind(), Protox.Types.type()} + } + def defs_by_name() do + %{ + pk_data: {2, {:scalar, nil}, {:message, Electric.Satellite.SatOpRow}}, + relation_id: {1, {:scalar, 0}, :uint32}, + tags: {4, :unpacked, :string} + } + end + ) + + ( + @spec fields_defs() :: list(Protox.Field.t()) + def fields_defs() do + [ + %{ + __struct__: Protox.Field, + json_name: "relationId", + kind: {:scalar, 0}, + label: :optional, + name: :relation_id, + tag: 1, + type: :uint32 + }, + %{ + __struct__: Protox.Field, + json_name: "pkData", + kind: {:scalar, nil}, + label: :optional, + name: :pk_data, + tag: 2, + type: {:message, Electric.Satellite.SatOpRow} + }, + %{ + __struct__: Protox.Field, + json_name: "tags", + kind: :unpacked, + label: :repeated, + name: :tags, + tag: 4, + type: :string + } + ] + end + + [ + @spec(field_def(atom) :: {:ok, Protox.Field.t()} | {:error, :no_such_field}), + ( + def field_def(:relation_id) do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "relationId", + kind: {:scalar, 0}, + label: :optional, + name: :relation_id, + tag: 1, + type: :uint32 + }} + end + + def field_def("relationId") do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "relationId", + kind: {:scalar, 0}, + label: :optional, + name: :relation_id, + tag: 1, + type: :uint32 + }} + end + + def field_def("relation_id") do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "relationId", + kind: {:scalar, 0}, + label: :optional, + name: :relation_id, + tag: 1, + type: :uint32 + }} + end + ), + ( + def field_def(:pk_data) do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "pkData", + kind: {:scalar, nil}, + label: :optional, + name: :pk_data, + tag: 2, + type: {:message, Electric.Satellite.SatOpRow} + }} + end + + def field_def("pkData") do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "pkData", + kind: {:scalar, nil}, + label: :optional, + name: :pk_data, + tag: 2, + type: {:message, Electric.Satellite.SatOpRow} + }} + end + + def field_def("pk_data") do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "pkData", + kind: {:scalar, nil}, + label: :optional, + name: :pk_data, + tag: 2, + type: {:message, Electric.Satellite.SatOpRow} + }} + end + ), + ( + def field_def(:tags) do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "tags", + kind: :unpacked, + label: :repeated, + name: :tags, + tag: 4, + type: :string + }} + end + + def field_def("tags") do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "tags", + kind: :unpacked, + label: :repeated, + name: :tags, + tag: 4, + type: :string + }} + end + + [] + ), + def field_def(_) do + {:error, :no_such_field} + end + ] + ) + + [] + + ( + @spec required_fields() :: [] + def required_fields() do + [] + end + ) + + ( + @spec syntax() :: atom() + def syntax() do + :proto3 + end + ) + + [ + @spec(default(atom) :: {:ok, boolean | integer | String.t() | float} | {:error, atom}), + def default(:relation_id) do + {:ok, 0} + end, + def default(:pk_data) do + {:ok, nil} + end, + def default(:tags) do + {:error, :no_default_value} + end, + def default(_) do + {:error, :no_such_field} + end + ] + + ( + @spec file_options() :: nil + def file_options() do + nil + end + ) + end, defmodule Electric.Satellite.SatInStartReplicationResp do @moduledoc false defstruct err: nil diff --git a/components/electric/lib/electric/satellite/serialization.ex b/components/electric/lib/electric/satellite/serialization.ex index 2d9f6ba2f3..2cff59d607 100644 --- a/components/electric/lib/electric/satellite/serialization.ex +++ b/components/electric/lib/electric/satellite/serialization.ex @@ -7,7 +7,8 @@ defmodule Electric.Satellite.Serialization do Transaction, NewRecord, UpdatedRecord, - DeletedRecord + DeletedRecord, + Compensation } use Electric.Satellite.Protobuf @@ -393,13 +394,20 @@ defmodule Electric.Satellite.Serialization do %NewRecord{record: decode_record!(row_data, columns), tags: tags} end - defp op_to_change( - %SatOpUpdate{row_data: row_data, old_row_data: nil, tags: tags}, - columns - ) do - %UpdatedRecord{ + defp op_to_change(%SatOpCompensation{pk_data: pk_data, tags: tags}, columns) do + %Compensation{ + record: decode_record!(pk_data, columns, :allow_nulls), + tags: tags + } + end + + # TODO: Kept for compatibility with old clients that send a special update for compensation + # messages. remove once we're sure all clients have been updated. + defp op_to_change(%SatOpUpdate{row_data: row_data, old_row_data: nil, tags: tags} = op, columns) do + Logger.warning("Received old-style compensation update #{inspect(op)}") + + %Compensation{ record: decode_record!(row_data, columns, :allow_nulls), - old_record: nil, tags: tags } end diff --git a/components/electric/lib/satellite/protocol_helpers.ex b/components/electric/lib/satellite/protocol_helpers.ex index 0df89d3508..6ef74cfe97 100644 --- a/components/electric/lib/satellite/protocol_helpers.ex +++ b/components/electric/lib/satellite/protocol_helpers.ex @@ -129,6 +129,7 @@ defmodule Satellite.ProtocolHelpers do %SatOpInsert{} = op -> %SatTransOp{op: {:insert, op}} %SatOpUpdate{} = op -> %SatTransOp{op: {:update, op}} %SatOpDelete{} = op -> %SatTransOp{op: {:delete, op}} + %SatOpCompensation{} = op -> %SatTransOp{op: {:compensation, op}} end) %SatOpLog{ops: ops} diff --git a/e2e/tests/05.05_compensations_within_same_tx_are_fine.lux b/e2e/tests/05.05_compensations_within_same_tx_are_fine.lux index 95277fcb8b..5f2fc12f60 100644 --- a/e2e/tests/05.05_compensations_within_same_tx_are_fine.lux +++ b/e2e/tests/05.05_compensations_within_same_tx_are_fine.lux @@ -26,7 +26,7 @@ [invoke start_elixir_test 1] [invoke client_session 1 1] - !alias Electric.Satellite.{SatRelation, SatRelationColumn, SatOpInsert, SatOpUpdate, SatOpRow} + !alias Electric.Satellite.{SatRelation, SatRelationColumn, SatOpInsert, SatOpUpdate, SatOpCompensation, SatOpRow} """! Satellite.TestWsClient.send_data(conn, %SatRelation{ @@ -58,7 +58,7 @@ """! Satellite.TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.to_unix(DateTime.utc_now(), :millisecond), [ %SatOpInsert{relation_id: 1, row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["00000000-0000-0000-0000-000000000000", "test_content"]}}, - %SatOpUpdate{relation_id: 1, row_data: %SatOpRow{nulls_bitmask: <<0::1, 1::1, 0::6>>, values: ["00000000-0000-0000-0000-000000000000", ""]}}, + %SatOpCompensation{relation_id: 1, pk_data: %SatOpRow{nulls_bitmask: <<0::1, 1::1, 0::6>>, values: ["00000000-0000-0000-0000-000000000000", ""]}}, %SatOpInsert{relation_id: 2, row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["00000000-0000-0000-0000-000000000001", "child content", "00000000-0000-0000-0000-000000000000"]}}, ])) """ diff --git a/protocol/satellite.proto b/protocol/satellite.proto index 820ccd1ba4..0c9b50d6af 100644 --- a/protocol/satellite.proto +++ b/protocol/satellite.proto @@ -225,6 +225,7 @@ message SatTransOp { SatOpInsert insert = 4; SatOpDelete delete = 5; SatOpMigrate migrate = 6; + SatOpCompensation compensation = 7; } } @@ -283,6 +284,13 @@ message SatOpDelete { } +message SatOpCompensation { + uint32 relation_id = 1; + SatOpRow pk_data = 2; + // dependency information + repeated string tags = 4; +} + // Dependency information for row data. // ------------------------------------ //