From 25c437f9bd1861d24c8dbca0c2ad4b70e36a51f4 Mon Sep 17 00:00:00 2001
From: Stefanos Mousafeiris
Date: Mon, 14 Oct 2024 17:47:56 +0300
Subject: [PATCH] feat: Add `columns` API query parameter to filter table columns (#1829)

Implements https://github.com/electric-sql/electric/issues/1804

### Overview

- I've picked `columns` as the query parameter name, as `select` might confuse people and get compared to SQL `SELECT`s, and `filter` is too close to what the `where` parameter does.
- The columns are comma-separated column names, parsed at the validation stage.
- If the parameter is specified, it *has to include* the primary key columns.
  - We could make it such that the PK is always included, but it feels like it might be confusing - explicit is better?
  - For tables without a primary key, we treat _all_ columns as primary keys - we might want to revisit that w.r.t. this feature.
- Columns are also validated by ensuring that all specified column names match the ones in the schema.
- The selected columns are stored in the `Shape` definition as an array of column names.
  - We had the choice of potentially filtering the `table_info` data based on the selection of columns instead of storing them separately, but this might cause issues with relation change handling, and generally I prefer the definition of the shape to contain all information related to it (i.e. the schema at the time of creation + what column filters were applied, rather than compacting the two).
- I modified `Shape`-related APIs for converting changes etc. to also apply the column filtering - this is all we need to ensure it works correctly for replication stream log entries.
- In the `Snapshotter`, I modified the `get_column_names` function to also apply the filtering if present, which takes care of the snapshot log entries.

### Other changes

- I've changed `Shapes.new` to return errors along with the field they are associated with (`:root_table`, `:where`, or `:columns`) in order to return correct validation errors, since all of the stronger validation occurs at `cast_root_table` when the shape is created and PG is used, but it really validates more than just `root_table`.
- I've updated the client to accept a `columns` argument, which is a list of strings (see the usage sketch after the changeset diff below).
- I've updated the OpenAPI spec to include the `columns` parameter.

### Things to consider

- How do we want to handle column names with special characters (quoted identifiers)? In my opinion the client needs to type them exactly as they are in Postgres, otherwise they get a 400 validation error back telling them which columns are invalid, so it should be fairly easy to fix.
- Replication publication filtering of columns is available, but [updating it might cause errors](https://www.postgresql.org/docs/15/logical-replication-col-lists.html#LOGICAL-REPLICATION-COL-LIST-COMBINING) according to the PG docs, so I'm not sure if that's something we want to consider.
---
 .changeset/large-days-attend.md               |   6 +
 .../lib/electric/plug/serve_shape_plug.ex     |  26 +++-
 .../sync-service/lib/electric/plug/utils.ex   |  49 +++++++
 .../lib/electric/postgres/identifiers.ex      | 126 ++++++++++++++++++
 .../lib/electric/replication/changes.ex       |  45 +++++++
 .../lib/electric/shapes/querying.ex           |  21 ++-
 .../sync-service/lib/electric/shapes/shape.ex | 117 ++++++++++++----
 .../test/electric/plug/router_test.exs        |  46 +++++++
 .../electric/plug/serve_shape_plug_test.exs   |  37 ++++-
 .../test/electric/plug/utils_test.exs         |   5 +
 .../electric/postgres/identifiers_test.exs    |   5 +
 .../test/electric/shapes/shape_test.exs       | 107 +++++++++++++--
 packages/typescript-client/src/client.ts      |  14 +-
 packages/typescript-client/src/constants.ts   |   1 +
 .../test/integration.test.ts                  |  45 +++++++
 website/electric-api.yaml                     |  20 +++
 16 files changed, 622 insertions(+), 48 deletions(-)
 create mode 100644 .changeset/large-days-attend.md
 create mode 100644 packages/sync-service/lib/electric/plug/utils.ex
 create mode 100644 packages/sync-service/lib/electric/postgres/identifiers.ex
 create mode 100644 packages/sync-service/test/electric/plug/utils_test.exs
 create mode 100644 packages/sync-service/test/electric/postgres/identifiers_test.exs

diff --git a/.changeset/large-days-attend.md b/.changeset/large-days-attend.md
new file mode 100644
index 0000000000..8bb3652a78
--- /dev/null
+++ b/.changeset/large-days-attend.md
@@ -0,0 +1,6 @@
+---
+"@electric-sql/client": patch
+"@core/sync-service": patch
+---
+
+Implement `columns` query parameter for `GET v1/shapes` API to allow filtering rows for a subset of table columns.
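Before the implementation diffs, a minimal usage sketch, assuming a hypothetical `users` table whose primary key is `id`. The module, function, and error shapes are the ones added by this patch:

```elixir
# Request shape over HTTP (offset=-1 asks for the initial snapshot):
#
#   GET /v1/shape/users?offset=-1&columns=id,name
#
# The plug first parses the comma-separated list; unquoted names are
# downcased, quoted ones are taken verbatim:
{:ok, ["id", "name"]} = Electric.Plug.Utils.parse_columns_param("id,NAME")
{:ok, ["id", "Status-Check"]} = Electric.Plug.Utils.parse_columns_param(~S|id,"Status-Check"|)

# Shapes.Shape.new/2 then validates the parsed list against the inspected
# schema; omitting a primary key column or naming an unknown column yields a
# field-tagged error that the plug renders as a 400 response:
#
#   {:error, {:columns, ["Must include all primary key columns, missing: id"]}}
#   {:error, {:columns, ["The following columns could not be found: invalid"]}}
```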
diff --git a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex
index e998193e63..f5bf8db90a 100644
--- a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex
+++ b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex
@@ -53,6 +53,7 @@ defmodule Electric.Plug.ServeShapePlug do
     field(:shape_id, :string)
     field(:live, :boolean, default: false)
     field(:where, :string)
+    field(:columns, :string)
     field(:shape_definition, :string)
   end

@@ -63,6 +64,7 @@ defmodule Electric.Plug.ServeShapePlug do
     )
     |> validate_required([:root_table, :offset])
     |> cast_offset()
+    |> cast_columns()
     |> validate_shape_id_with_offset()
     |> validate_live_with_offset()
     |> cast_root_table(opts)
@@ -95,6 +97,21 @@ defmodule Electric.Plug.ServeShapePlug do
     end
   end

+  def cast_columns(%Ecto.Changeset{valid?: false} = changeset), do: changeset
+
+  def cast_columns(%Ecto.Changeset{} = changeset) do
+    case fetch_field!(changeset, :columns) do
+      nil ->
+        changeset
+
+      columns ->
+        case Electric.Plug.Utils.parse_columns_param(columns) do
+          {:ok, parsed_cols} -> put_change(changeset, :columns, parsed_cols)
+          {:error, reason} -> add_error(changeset, :columns, reason)
+        end
+    end
+  end
+
   def validate_shape_id_with_offset(%Ecto.Changeset{valid?: false} = changeset), do: changeset

   def validate_shape_id_with_offset(%Ecto.Changeset{} = changeset) do
@@ -122,18 +139,19 @@ defmodule Electric.Plug.ServeShapePlug do
   def cast_root_table(%Ecto.Changeset{} = changeset, opts) do
     table = fetch_change!(changeset, :root_table)
     where = fetch_field!(changeset, :where)
+    columns = get_change(changeset, :columns, nil)

-    case Shapes.Shape.new(table, opts ++ [where: where]) do
+    case Shapes.Shape.new(table, opts ++ [where: where, columns: columns]) do
       {:ok, result} ->
         put_change(changeset, :shape_definition, result)

-      {:error, reasons} ->
+      {:error, {field, reasons}} ->
         Enum.reduce(List.wrap(reasons), changeset, fn
           {message, keys}, changeset ->
-            add_error(changeset, :root_table, message, keys)
+            add_error(changeset, field, message, keys)

           message, changeset when is_binary(message) ->
-            add_error(changeset, :root_table, message)
+            add_error(changeset, field, message)
         end)
     end
   end
diff --git a/packages/sync-service/lib/electric/plug/utils.ex b/packages/sync-service/lib/electric/plug/utils.ex
new file mode 100644
index 0000000000..d97ce47147
--- /dev/null
+++ b/packages/sync-service/lib/electric/plug/utils.ex
@@ -0,0 +1,49 @@
+defmodule Electric.Plug.Utils do
+  @moduledoc """
+  Utility functions for Electric endpoints, e.g. for parsing and validating
+  path and query parameters.
+  """
+
+  @doc """
+  Parse the columns parameter from a string consisting of a comma-separated
+  list of potentially quoted column names into a list of strings, preserving
+  the order in which the columns were given.
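+
+  Splitting happens only on commas that sit outside double quotes, so a
+  quoted identifier may itself contain commas or escaped quotes; each
+  resulting token is then validated and normalised by
+  `Electric.Postgres.Identifiers.parse/3`.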
+
+  ## Examples
+
+      iex> Electric.Plug.Utils.parse_columns_param("")
+      {:error, "Invalid zero-length delimited identifier"}
+      iex> Electric.Plug.Utils.parse_columns_param("foo,")
+      {:error, "Invalid zero-length delimited identifier"}
+      iex> Electric.Plug.Utils.parse_columns_param("id")
+      {:ok, ["id"]}
+      iex> Electric.Plug.Utils.parse_columns_param("id,name")
+      {:ok, ["id", "name"]}
+      iex> Electric.Plug.Utils.parse_columns_param(~S|"PoT@To",PoTaTo|)
+      {:ok, ["PoT@To", "potato"]}
+      iex> Electric.Plug.Utils.parse_columns_param(~S|"PoTaTo,sunday",foo|)
+      {:ok, ["PoTaTo,sunday", "foo"]}
+      iex> Electric.Plug.Utils.parse_columns_param(~S|"fo""o",bar|)
+      {:ok, [~S|fo"o|, "bar"]}
+      iex> Electric.Plug.Utils.parse_columns_param(~S|"id,"name"|)
+      {:error, ~S|Invalid unquoted identifier contains special characters: "id|}
+  """
+  @spec parse_columns_param(binary()) :: {:ok, [String.t(), ...]} | {:error, term()}
+
+  def parse_columns_param(columns) when is_binary(columns) do
+    columns
+    # Split by commas that are not inside quotes
+    |> String.split(~r/,(?=(?:[^"]*"[^"]*")*[^"]*$)/)
+    |> Enum.reduce_while([], fn column, acc ->
+      case Electric.Postgres.Identifiers.parse(column) do
+        {:ok, casted_column} -> {:cont, [casted_column | acc]}
+        {:error, reason} -> {:halt, {:error, reason}}
+      end
+    end)
+    |> then(fn result ->
+      case result do
+        # TODO: convert output to MapSet?
+        parsed_cols when is_list(parsed_cols) -> {:ok, Enum.reverse(parsed_cols)}
+        {:error, reason} -> {:error, reason}
+      end
+    end)
+  end
+end
diff --git a/packages/sync-service/lib/electric/postgres/identifiers.ex b/packages/sync-service/lib/electric/postgres/identifiers.ex
new file mode 100644
index 0000000000..75c590c20d
--- /dev/null
+++ b/packages/sync-service/lib/electric/postgres/identifiers.ex
@@ -0,0 +1,126 @@
+defmodule Electric.Postgres.Identifiers do
+  @namedatalen 63
+  @ascii_downcase ?a - ?A
+
+  @doc """
+  Parse a PostgreSQL identifier: if it is quoted, remove the surrounding
+  quotes and unescape any internal ones; otherwise downcase the identifier.
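+
+  Unquoted identifiers must match `[a-zA-Z_][a-zA-Z0-9_]*` and are folded to
+  lower case, mirroring PostgreSQL's handling of unquoted names; quoted
+  identifiers preserve case and may contain arbitrary characters, with
+  embedded double quotes escaped by doubling them (`""`).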
+
+  ## Examples
+
+      iex> Electric.Postgres.Identifiers.parse("FooBar")
+      {:ok, "foobar"}
+      iex> Electric.Postgres.Identifiers.parse(~S|"FooBar"|)
+      {:ok, "FooBar"}
+      iex> Electric.Postgres.Identifiers.parse(~S|Foo"Bar"|)
+      {:error, ~S|Invalid unquoted identifier contains special characters: Foo"Bar"|}
+      iex> Electric.Postgres.Identifiers.parse(~S| |)
+      {:error, ~S|Invalid unquoted identifier contains special characters: |}
+      iex> Electric.Postgres.Identifiers.parse("foob@r")
+      {:error, ~S|Invalid unquoted identifier contains special characters: foob@r|}
+      iex> Electric.Postgres.Identifiers.parse(~S|"Foo"Bar"|)
+      {:error, ~S|Invalid identifier with unescaped quote: Foo"Bar|}
+      iex> Electric.Postgres.Identifiers.parse(~S|""|)
+      {:error, "Invalid zero-length delimited identifier"}
+      iex> Electric.Postgres.Identifiers.parse("")
+      {:error, "Invalid zero-length delimited identifier"}
+      iex> Electric.Postgres.Identifiers.parse(~S|" "|)
+      {:ok, " "}
+      iex> Electric.Postgres.Identifiers.parse(~S|"Foo""Bar"|)
+      {:ok, ~S|Foo"Bar|}
+  """
+  @spec parse(binary(), boolean(), boolean()) :: {:ok, binary()} | {:error, term()}
+  def parse(ident, truncate \\ false, single_byte_encoding \\ false) when is_binary(ident) do
+    if String.starts_with?(ident, ~S|"|) and String.ends_with?(ident, ~S|"|) do
+      ident_unquoted = String.slice(ident, 1..-2//1)
+      parse_quoted_identifier(ident_unquoted)
+    else
+      parse_unquoted_identifier(ident, truncate, single_byte_encoding)
+    end
+  end
+
+  defp parse_quoted_identifier(""), do: {:error, "Invalid zero-length delimited identifier"}
+
+  defp parse_quoted_identifier(ident) do
+    if contains_unescaped_quote?(ident),
+      do: {:error, "Invalid identifier with unescaped quote: #{ident}"},
+      else: {:ok, unescape_quotes(ident)}
+  end
+
+  defp parse_unquoted_identifier("", _, _), do: parse_quoted_identifier("")
+
+  defp parse_unquoted_identifier(ident, truncate, single_byte_encoding) do
+    unless valid_unquoted_identifier?(ident),
+      do: {:error, "Invalid unquoted identifier contains special characters: #{ident}"},
+      else: {:ok, downcase(ident, truncate, single_byte_encoding)}
+  end
+
+  defp contains_unescaped_quote?(string) do
+    # A quote that is not part of an escaped pair `""`
+    Regex.match?(~r/(?<!")"(?!")/, string)
+  end
+
+  defp unescape_quotes(string) do
+    string
+    |> String.replace(~r/""/, ~S|"|)
+  end
+
+  defp valid_unquoted_identifier?(identifier) do
+    Regex.match?(~r/^[a-zA-Z_][a-zA-Z0-9_]*$/, identifier)
+  end
+
+  @doc """
+  Downcase the identifier and truncate if necessary, using
+  PostgreSQL's algorithm for downcasing.
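+
+  The 63-character limit matches PostgreSQL's `NAMEDATALEN - 1`: names are
+  stored in a fixed 64-byte buffer that includes the terminating NUL byte.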
+ + Setting `truncate` to `true` will truncate the identifier to 63 characters + + Setting `single_byte_encoding` to `true` will downcase the identifier + using single byte encoding + + See: + https://github.com/postgres/postgres/blob/259a0a99fe3d45dcf624788c1724d9989f3382dc/src/backend/parser/scansup.c#L46-L80 + + ## Examples + + iex> Electric.Postgres.Identifiers.downcase("FooBar") + "foobar" + iex> Electric.Postgres.Identifiers.downcase(String.duplicate("a", 100), true) + String.duplicate("a", 63) + """ + def downcase(ident, truncate \\ false, single_byte_encoding \\ false) + + def downcase(ident, false, single_byte_encoding) do + downcased_ident = + ident + |> String.to_charlist() + |> Enum.map(&downcase_char(&1, single_byte_encoding)) + |> List.to_string() + + downcased_ident + end + + def downcase(ident, true, single_byte_encoding) do + downcased_ident = downcase(ident, false, single_byte_encoding) + + truncated_ident = + if String.length(ident) >= @namedatalen do + String.slice(downcased_ident, 0, @namedatalen) + else + downcased_ident + end + + truncated_ident + end + + # Helper function to downcase a character + defp downcase_char(ch, _) when ch in ?A..?Z, do: ch + @ascii_downcase + + defp downcase_char(ch, true) when ch > 127, + do: + if(ch == Enum.at(:unicode_util.uppercase(ch), 0), + do: Enum.at(:unicode_util.lowercase(ch), 0), + else: ch + ) + + defp downcase_char(ch, _), do: ch +end diff --git a/packages/sync-service/lib/electric/replication/changes.ex b/packages/sync-service/lib/electric/replication/changes.ex index c403699126..7fef2340da 100644 --- a/packages/sync-service/lib/electric/replication/changes.ex +++ b/packages/sync-service/lib/electric/replication/changes.ex @@ -285,4 +285,49 @@ defmodule Electric.Replication.Changes do end def convert_update(%UpdatedRecord{} = change, to: :updated_record), do: change + + @doc """ + Filter the columns of a change to include only those provided in `columns_to_keep`. 
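+
+  For `UpdatedRecord` changes the new record, the old record, and the
+  `changed_columns` set are all narrowed to the kept columns;
+  `Electric.Shapes.Shape.convert_change/2` relies on this to drop updates
+  that only touched columns outside the shape's selection.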
+ + ## Examples + + iex> filter_columns(%NewRecord{record: %{"a" => "b", "c" => "d"}}, ["a"]) + %NewRecord{record: %{"a" => "b"}} + + iex> filter_columns(UpdatedRecord.new( + ...> record: %{"a" => "b", "c" => "d"}, + ...> old_record: %{"a" => "d", "c" => "f"} + ...> ), ["a"]) + UpdatedRecord.new(record: %{"a" => "b"}, old_record: %{"a" => "d"}) + + iex> filter_columns(%DeletedRecord{old_record: %{"a" => "b", "c" => "d"}}, ["c"]) + %DeletedRecord{old_record: %{"c" => "d"}} + """ + @spec filter_columns(change(), [String.t()]) :: change() + def filter_columns(%NewRecord{} = change, columns_to_keep) do + %NewRecord{ + change + | record: change.record |> Map.take(columns_to_keep) + } + end + + def filter_columns(%UpdatedRecord{} = change, columns_to_keep) do + %UpdatedRecord{ + change + | old_record: change.old_record |> Map.take(columns_to_keep), + record: change.record |> Map.take(columns_to_keep), + changed_columns: + change.changed_columns + |> MapSet.reject(fn col -> col not in columns_to_keep end) + } + end + + def filter_columns(%DeletedRecord{} = change, columns_to_keep) do + %DeletedRecord{ + change + | old_record: change.old_record |> Map.take(columns_to_keep) + } + end + + def filter_columns(change, _), do: change end diff --git a/packages/sync-service/lib/electric/shapes/querying.ex b/packages/sync-service/lib/electric/shapes/querying.ex index 2f2e894047..6a961eea3e 100644 --- a/packages/sync-service/lib/electric/shapes/querying.ex +++ b/packages/sync-service/lib/electric/shapes/querying.ex @@ -12,14 +12,14 @@ defmodule Electric.Shapes.Querying do @type json_result_stream :: Enumerable.t(json_iodata()) @spec stream_initial_data(DBConnection.t(), Shape.t()) :: json_result_stream() - def stream_initial_data(conn, %Shape{root_table: root_table, table_info: table_info} = shape) do + def stream_initial_data(conn, %Shape{root_table: root_table} = shape) do OpenTelemetry.with_span("shape_read.stream_initial_data", [], fn -> table = Utils.relation_to_sql(root_table) where = if not is_nil(shape.where), do: " WHERE " <> shape.where.query, else: "" - {json_like_select, params} = json_like_select(table_info, root_table, Shape.pk(shape)) + {json_like_select, params} = json_like_select(shape) query = Postgrex.prepare!(conn, table, ~s|SELECT #{json_like_select} FROM #{table} #{where}|) @@ -30,8 +30,15 @@ defmodule Electric.Shapes.Querying do end) end - defp json_like_select(table_info, root_table, pk_cols) do - columns = get_column_names(table_info, root_table) + defp json_like_select( + %Shape{ + table_info: table_info, + root_table: root_table, + selected_columns: selected_columns + } = shape + ) do + pk_cols = Shape.pk(shape) + columns = get_column_names(table_info, root_table, selected_columns) key_part = build_key_part(root_table, pk_cols) value_part = build_value_part(columns) @@ -57,13 +64,17 @@ defmodule Electric.Shapes.Querying do {query, []} end - defp get_column_names(table_info, root_table) do + defp get_column_names(table_info, root_table, nil) do table_info |> Map.fetch!(root_table) |> Map.fetch!(:columns) |> Enum.map(& &1.name) end + defp get_column_names(_table_info, _root_table, selected_columns) do + selected_columns + end + defp build_headers_part(root_table) do ~s['"headers":{"operation":"insert","relation":#{build_relation_header(root_table)}}'] end diff --git a/packages/sync-service/lib/electric/shapes/shape.ex b/packages/sync-service/lib/electric/shapes/shape.ex index 0d31bf6d81..f79cf27dbe 100644 --- a/packages/sync-service/lib/electric/shapes/shape.ex +++ 
b/packages/sync-service/lib/electric/shapes/shape.ex @@ -9,7 +9,7 @@ defmodule Electric.Shapes.Shape do alias Electric.Replication.Changes @enforce_keys [:root_table, :root_table_id] - defstruct [:root_table, :root_table_id, :table_info, :where] + defstruct [:root_table, :root_table_id, :table_info, :where, :selected_columns] @type table_info() :: %{ columns: [Inspector.column_info(), ...], @@ -21,7 +21,8 @@ defmodule Electric.Shapes.Shape do table_info: %{ Electric.relation() => table_info() }, - where: Electric.Replication.Eval.Expr.t() | nil + where: Electric.Replication.Eval.Expr.t() | nil, + selected_columns: [String.t(), ...] | nil } @type table_with_where_clause() :: {Electric.relation(), String.t() | nil} @@ -33,6 +34,7 @@ defmodule Electric.Shapes.Shape do root_table: json_relation(), root_table_id: non_neg_integer(), where: String.t(), + selected_columns: [String.t(), ...] | nil, table_info: [json_table_list(), ...] } @@ -46,8 +48,8 @@ defmodule Electric.Shapes.Shape do def new!(table, opts \\ []) do case new(table, opts) do {:ok, shape} -> shape - {:error, [message | _]} -> raise message - {:error, message} when is_binary(message) -> raise message + {:error, {_field, [message | _]}} -> raise message + {:error, {_field, message}} when is_binary(message) -> raise message end end @@ -57,6 +59,7 @@ defmodule Electric.Shapes.Shape do @shape_schema NimbleOptions.new!( where: [type: {:or, [:string, nil]}], + columns: [type: {:or, [{:list, :string}, nil]}], inspector: [ type: :mod_arg, default: {Electric.Postgres.Inspector, Electric.DbPool} @@ -68,6 +71,8 @@ defmodule Electric.Shapes.Shape do with inspector <- Access.fetch!(opts, :inspector), {:ok, %{relation: table, relation_id: relation_id}} <- validate_table(table, inspector), {:ok, column_info, pk_cols} <- load_column_info(table, inspector), + {:ok, selected_columns} <- + validate_selected_columns(column_info, pk_cols, Access.get(opts, :columns)), refs = Inspector.columns_to_expr(column_info), {:ok, where} <- maybe_parse_where_clause(Access.get(opts, :where), refs) do {:ok, @@ -75,20 +80,59 @@ defmodule Electric.Shapes.Shape do root_table: table, root_table_id: relation_id, table_info: %{table => %{pk: pk_cols, columns: column_info}}, - where: where + where: where, + selected_columns: selected_columns }} end end defp maybe_parse_where_clause(nil, _), do: {:ok, nil} - defp maybe_parse_where_clause(where, info), - do: Parser.parse_and_validate_expression(where, info) + defp maybe_parse_where_clause(where, info) do + case Parser.parse_and_validate_expression(where, info) do + {:ok, expr} -> {:ok, expr} + {:error, reason} -> {:error, {:where, reason}} + end + end + + @spec validate_selected_columns( + [Inspector.column_info()], + [String.t()], + [String.t(), ...] | nil + ) :: + {:ok, [String.t(), ...] 
| nil} | {:error, {:columns, [String.t()]}} + defp validate_selected_columns(_column_info, _pk_cols, nil) do + {:ok, nil} + end + + defp validate_selected_columns(column_info, pk_cols, columns_to_select) do + missing_pk_cols = pk_cols -- columns_to_select + invalid_cols = columns_to_select -- Enum.map(column_info, & &1.name) + + cond do + missing_pk_cols != [] -> + {:error, + {:columns, + [ + "Must include all primary key columns, missing: #{missing_pk_cols |> Enum.join(", ")}" + ]}} + + invalid_cols != [] -> + {:error, + {:columns, + [ + "The following columns could not be found: #{invalid_cols |> Enum.join(", ")}" + ]}} + + true -> + {:ok, Enum.sort(columns_to_select)} + end + end defp load_column_info(table, inspector) do case Inspector.load_column_info(table, inspector) do :table_not_found -> - {:error, ["table not found"]} + {:error, {:root_table, ["table not found"]}} {:ok, column_info} -> # %{["column_name"] => :type} @@ -106,12 +150,13 @@ defmodule Electric.Shapes.Shape do case Regex.run(~r/.+ relation "(?.+)" does not exist/, err, capture: :all_names) do [table_name] -> {:error, - [ - ~s|Table "#{table_name}" does not exist. If the table name contains capitals or special characters you must quote it.| - ]} + {:root_table, + [ + ~s|Table "#{table_name}" does not exist. If the table name contains capitals or special characters you must quote it.| + ]}} _ -> - {:error, [err]} + {:error, {:root_table, [err]}} end {:ok, rel} -> @@ -142,31 +187,53 @@ defmodule Electric.Shapes.Shape do when table != relation, do: [] - def convert_change(%__MODULE__{where: nil}, change), do: [change] - def convert_change(%__MODULE__{where: _}, %Changes.TruncatedRelation{} = change), do: [change] + def convert_change(%__MODULE__{where: nil, selected_columns: nil}, change), do: [change] - def convert_change(%__MODULE__{where: where}, change) + def convert_change(%__MODULE__{}, %Changes.TruncatedRelation{} = change), do: [change] + + def convert_change(%__MODULE__{where: where, selected_columns: selected_columns}, change) when is_struct(change, Changes.NewRecord) when is_struct(change, Changes.DeletedRecord) do record = if is_struct(change, Changes.NewRecord), do: change.record, else: change.old_record - if record_in_shape?(where, record), do: [change], else: [] + + if record_in_shape?(where, record), + do: [filter_change_columns(selected_columns, change)], + else: [] end def convert_change( - %__MODULE__{where: where}, + %__MODULE__{where: where, selected_columns: selected_columns}, %Changes.UpdatedRecord{old_record: old_record, record: record} = change ) do old_record_in_shape = record_in_shape?(where, old_record) new_record_in_shape = record_in_shape?(where, record) - case {old_record_in_shape, new_record_in_shape} do - {true, true} -> [change] - {true, false} -> [Changes.convert_update(change, to: :deleted_record)] - {false, true} -> [Changes.convert_update(change, to: :new_record)] - {false, false} -> [] - end + converted_changes = + case {old_record_in_shape, new_record_in_shape} do + {true, true} -> [change] + {true, false} -> [Changes.convert_update(change, to: :deleted_record)] + {false, true} -> [Changes.convert_update(change, to: :new_record)] + {false, false} -> [] + end + + converted_changes + |> Enum.map(&filter_change_columns(selected_columns, &1)) + |> Enum.filter(&filtered_columns_changed/1) end + defp filter_change_columns(nil, change), do: change + + defp filter_change_columns(selected_columns, change) do + Changes.filter_columns(change, selected_columns) + end + + defp 
filtered_columns_changed(%Changes.UpdatedRecord{old_record: record, record: record}), + do: false + + defp filtered_columns_changed(_), do: true + + defp record_in_shape?(nil, _record), do: true + defp record_in_shape?(where, record) do with {:ok, refs} <- Runner.record_to_ref_values(where.used_refs, record), {:ok, evaluated} <- Runner.execute(where, refs) do @@ -224,6 +291,7 @@ defmodule Electric.Shapes.Shape do root_table: {schema, name}, root_table_id: root_table_id, where: where, + selected_columns: selected_columns, table_info: table_info } = shape @@ -237,6 +305,7 @@ defmodule Electric.Shapes.Shape do root_table: [schema, name], root_table_id: root_table_id, where: query, + selected_columns: selected_columns, table_info: if(table_info, do: @@ -265,6 +334,7 @@ defmodule Electric.Shapes.Shape do "root_table" => [schema, name], "root_table_id" => root_table_id, "where" => where, + "selected_columns" => selected_columns, "table_info" => info } = map @@ -286,6 +356,7 @@ defmodule Electric.Shapes.Shape do root_table: {schema, name}, root_table_id: root_table_id, where: where, + selected_columns: selected_columns, table_info: table_info } end diff --git a/packages/sync-service/test/electric/plug/router_test.exs b/packages/sync-service/test/electric/plug/router_test.exs index ef8fd91000..4a68c8ef38 100644 --- a/packages/sync-service/test/electric/plug/router_test.exs +++ b/packages/sync-service/test/electric/plug/router_test.exs @@ -389,6 +389,52 @@ defmodule Electric.Plug.RouterTest do assert key3 != key end + @tag with_sql: [ + "CREATE TABLE wide_table (id BIGINT PRIMARY KEY, value1 TEXT NOT NULL, value2 TEXT NOT NULL, value3 TEXT NOT NULL)", + "INSERT INTO wide_table VALUES (1, 'test value 1', 'test value 1', 'test value 1')" + ] + test "GET receives only specified columns out of wide table", %{opts: opts, db_conn: db_conn} do + conn = conn("GET", "/v1/shape/wide_table?offset=-1&columns=id,value1") |> Router.call(opts) + assert %{status: 200} = conn + shape_id = get_resp_shape_id(conn) + + assert [ + %{ + "value" => %{"id" => "1", "value1" => "test value 1"}, + "key" => key, + "offset" => next_offset + }, + @up_to_date + ] = Jason.decode!(conn.resp_body) + + test_pid = self() + + task = + Task.async(fn -> + conn( + "GET", + "/v1/shape/wide_table?offset=#{next_offset}&columns=id,value1&shape_id=#{shape_id}&live" + ) + |> Router.call(opts) + |> then(fn conn -> + send(test_pid, :got_response) + conn + end) + end) + + # Ensure updates to not-selected columns do not trigger responses + Postgrex.query!(db_conn, "UPDATE wide_table SET value2 = 'test value 2' WHERE id = 1", []) + refute_receive :got_response, 1000 + + Postgrex.query!(db_conn, "UPDATE wide_table SET value1 = 'test value 3' WHERE id = 1", []) + + assert_receive :got_response + assert %{status: 200} = conn = Task.await(task) + + value = %{"id" => "1", "value1" => "test value 3"} + assert [%{"key" => ^key, "value" => ^value}, _] = Jason.decode!(conn.resp_body) + end + test "GET works when there are changes not related to the shape in the same txn", %{ opts: opts, db_conn: db_conn diff --git a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs index 177c37ab4e..aecba35c15 100644 --- a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs +++ b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs @@ -23,7 +23,8 @@ defmodule Electric.Plug.ServeShapePlugTest do table_info: %{ {"public", "users"} => %{ columns: [ - %{name: "id", 
type: "int8", type_id: {20, 1}, pk_position: 0, array_dimensions: 0} + %{name: "id", type: "int8", type_id: {20, 1}, pk_position: 0, array_dimensions: 0}, + %{name: "value", type: "text", type_id: {28, 1}, pk_position: nil, array_dimensions: 0} ], pk: ["id"] } @@ -266,7 +267,7 @@ defmodule Electric.Plug.ServeShapePlugTest do |> ServeShapePlug.call([]) assert Plug.Conn.get_resp_header(conn, "electric-schema") == [ - ~s|{"id":{"type":"int8","pk_index":0}}| + ~s|{"id":{"type":"int8","pk_index":0},"value":{"type":"text"}}| ] end @@ -577,6 +578,38 @@ defmodule Electric.Plug.ServeShapePlugTest do assert conn.status == 400 assert Jason.decode!(conn.resp_body) == [%{"headers" => %{"control" => "must-refetch"}}] end + + test "sends 400 when omitting primary key columns in selection" do + conn = + conn( + :get, + %{"root_table" => "public.users", "columns" => "value"}, + "?offset=-1" + ) + |> ServeShapePlug.call([]) + + assert conn.status == 400 + + assert Jason.decode!(conn.resp_body) == %{ + "columns" => ["Must include all primary key columns, missing: id"] + } + end + + test "sends 400 when selecting invalid columns" do + conn = + conn( + :get, + %{"root_table" => "public.users", "columns" => "id,invalid"}, + "?offset=-1" + ) + |> ServeShapePlug.call([]) + + assert conn.status == 400 + + assert Jason.decode!(conn.resp_body) == %{ + "columns" => ["The following columns could not be found: invalid"] + } + end end defp put_in_config(%Plug.Conn{assigns: assigns} = conn, key, value), diff --git a/packages/sync-service/test/electric/plug/utils_test.exs b/packages/sync-service/test/electric/plug/utils_test.exs new file mode 100644 index 0000000000..e79d347485 --- /dev/null +++ b/packages/sync-service/test/electric/plug/utils_test.exs @@ -0,0 +1,5 @@ +defmodule Electric.Plug.UtilsTest do + alias Electric.Plug.Utils + use ExUnit.Case, async: true + doctest Utils, import: true +end diff --git a/packages/sync-service/test/electric/postgres/identifiers_test.exs b/packages/sync-service/test/electric/postgres/identifiers_test.exs new file mode 100644 index 0000000000..d68edfe518 --- /dev/null +++ b/packages/sync-service/test/electric/postgres/identifiers_test.exs @@ -0,0 +1,5 @@ +defmodule Electric.Postgres.IdentifiersTest do + alias Electric.Postgres.Identifiers + use ExUnit.Case, async: true + doctest Identifiers, import: true +end diff --git a/packages/sync-service/test/electric/shapes/shape_test.exs b/packages/sync-service/test/electric/shapes/shape_test.exs index 9617368718..95ea413853 100644 --- a/packages/sync-service/test/electric/shapes/shape_test.exs +++ b/packages/sync-service/test/electric/shapes/shape_test.exs @@ -1,7 +1,7 @@ defmodule Electric.Shapes.ShapeTest do use ExUnit.Case, async: true - alias Electric.Replication.Changes.NewRecord + alias Electric.Replication.Changes.{NewRecord, DeletedRecord, UpdatedRecord} alias Electric.Replication.Eval.Parser alias Electric.Replication.Changes alias Electric.Shapes.Shape @@ -64,12 +64,12 @@ defmodule Electric.Shapes.ShapeTest do test "lets DELETEs through only if the row matches the where filter" do shape = %Shape{root_table: {"public", "table"}, root_table_id: @relation_id, where: @where} - matching_delete = %Changes.DeletedRecord{ + matching_delete = %DeletedRecord{ relation: {"public", "table"}, old_record: %{"id" => 1, "value" => "matches filter"} } - non_matching_delete = %Changes.DeletedRecord{ + non_matching_delete = %DeletedRecord{ relation: {"public", "table"}, old_record: %{"id" => 2, "value" => "doesn't match filter"} } @@ -81,7 +81,7 @@ 
defmodule Electric.Shapes.ShapeTest do test "lets UPDATEs through as-is only if both old and new versions match the where filter" do shape = %Shape{root_table: {"public", "table"}, root_table_id: @relation_id, where: @where} - matching_update = %Changes.UpdatedRecord{ + matching_update = %UpdatedRecord{ relation: {"public", "table"}, old_record: %{"id" => 1, "value" => "old matches"}, record: %{"id" => 1, "value" => "new matches"} @@ -93,7 +93,7 @@ defmodule Electric.Shapes.ShapeTest do test "converts UPDATE to INSERT if only new version matches the where filter" do shape = %Shape{root_table: {"public", "table"}, root_table_id: @relation_id, where: @where} - update_to_insert = %Changes.UpdatedRecord{ + update_to_insert = %UpdatedRecord{ relation: {"public", "table"}, old_record: %{"id" => 1, "value" => "old doesn't match"}, record: %{"id" => 1, "value" => "new matches"} @@ -106,7 +106,7 @@ defmodule Electric.Shapes.ShapeTest do test "converts UPDATE to DELETE if only old version matches the where filter" do shape = %Shape{root_table: {"public", "table"}, root_table_id: @relation_id, where: @where} - update_to_delete = %Changes.UpdatedRecord{ + update_to_delete = %UpdatedRecord{ relation: {"public", "table"}, old_record: %{"id" => 1, "value" => "old matches"}, record: %{"id" => 1, "value" => "new doesn't match"} @@ -119,7 +119,7 @@ defmodule Electric.Shapes.ShapeTest do test "doesn't let the update through if no version of the row matches the where filter" do shape = %Shape{root_table: {"public", "table"}, root_table_id: @relation_id, where: @where} - non_matching_update = %Changes.UpdatedRecord{ + non_matching_update = %UpdatedRecord{ relation: {"public", "table"}, old_record: %{"id" => 1, "value" => "old doesn't match"}, record: %{"id" => 1, "value" => "new doesn't match either"} @@ -127,6 +127,62 @@ defmodule Electric.Shapes.ShapeTest do assert Shape.convert_change(shape, non_matching_update) == [] end + + test "filters INSERTs to allow only selected columns" do + shape = %Shape{ + root_table: {"public", "table"}, + root_table_id: @relation_id, + selected_columns: ["id", "value"] + } + + insert = %NewRecord{ + relation: {"public", "table"}, + record: %{"id" => 1, "value" => "foo", "other_value" => "bar"} + } + + assert Shape.convert_change(shape, insert) == [ + %NewRecord{ + relation: {"public", "table"}, + record: %{"id" => 1, "value" => "foo"} + } + ] + end + + test "filters DELETEs to allow only selected columns" do + shape = %Shape{ + root_table: {"public", "table"}, + root_table_id: @relation_id, + selected_columns: ["id", "value"] + } + + delete = %DeletedRecord{ + relation: {"public", "table"}, + old_record: %{"id" => 1, "value" => "foo", "other_value" => "bar"} + } + + assert Shape.convert_change(shape, delete) == [ + %DeletedRecord{ + relation: {"public", "table"}, + old_record: %{"id" => 1, "value" => "foo"} + } + ] + end + + test "doesn't let the update through if filtered columns have not changed" do + shape = %Shape{ + root_table: {"public", "table"}, + root_table_id: @relation_id, + selected_columns: ["id", "value"] + } + + non_matching_update = %UpdatedRecord{ + relation: {"public", "table"}, + old_record: %{"id" => 1, "value" => "same", "other_value" => "old"}, + record: %{"id" => 1, "value" => "same", "other_value" => "new"} + } + + assert Shape.convert_change(shape, non_matching_update) == [] + end end describe "new/2" do @@ -205,15 +261,18 @@ defmodule Electric.Shapes.ShapeTest do end test "errors on empty table name", %{inspector: inspector} do - {:error, ["ERROR 
42602 (invalid_name) invalid name syntax"]} =
+      {:error, {:root_table, ["ERROR 42602 (invalid_name) invalid name syntax"]}} =
         Shape.new("", inspector: inspector)
     end

     test "errors when the table doesn't exist", %{inspector: inspector} do
       {:error,
-       [
-         ~S|Table "nonexistent" does not exist. If the table name contains capitals or special characters you must quote it.|
-       ]} =
+       {
+         :root_table,
+         [
+           ~S|Table "nonexistent" does not exist. If the table name contains capitals or special characters you must quote it.|
+         ]
+       }} =
         Shape.new("nonexistent", inspector: inspector)
     end
@@ -229,9 +288,33 @@ defmodule Electric.Shapes.ShapeTest do
       "CREATE TABLE IF NOT EXISTS other_table (value TEXT PRIMARY KEY)"
     ]
     test "validates a where clause based on inspected columns", %{inspector: inspector} do
-      assert {:error, "At location 6" <> _} =
+      assert {:error, {:where, "At location 6" <> _}} =
               Shape.new("other_table", inspector: inspector, where: "value + 1 > 10")
     end
+
+    @tag with_sql: [
+           "CREATE TABLE IF NOT EXISTS col_table (id INT PRIMARY KEY, value1 TEXT, value2 TEXT)"
+         ]
+    test "builds a shape with selected columns", %{inspector: inspector} do
+      assert {:ok, %Shape{selected_columns: ["id", "value2"]}} =
+               Shape.new("col_table", inspector: inspector, columns: ["id", "value2"])
+    end
+
+    @tag with_sql: [
+           "CREATE TABLE IF NOT EXISTS col_table (id INT PRIMARY KEY, value1 TEXT, value2 TEXT)"
+         ]
+    test "validates selected columns for invalid columns", %{inspector: inspector} do
+      assert {:error, {:columns, ["The following columns could not be found: invalid"]}} =
+               Shape.new("col_table", inspector: inspector, columns: ["id", "invalid"])
+    end
+
+    @tag with_sql: [
+           "CREATE TABLE IF NOT EXISTS col_table (id INT PRIMARY KEY, value1 TEXT, value2 TEXT)"
+         ]
+    test "validates selected columns for missing PK columns", %{inspector: inspector} do
+      assert {:error, {:columns, ["Must include all primary key columns, missing: id"]}} =
+               Shape.new("col_table", inspector: inspector, columns: ["value1"])
+    end
   end

   describe "new!/2" do
diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts
index 7a1a4e50da..1434d07346 100644
--- a/packages/typescript-client/src/client.ts
+++ b/packages/typescript-client/src/client.ts
@@ -19,6 +19,7 @@ import {
   CHUNK_LAST_OFFSET_HEADER,
   LIVE_CACHE_BUSTER_HEADER,
   LIVE_CACHE_BUSTER_QUERY_PARAM,
+  COLUMNS_QUERY_PARAM,
   LIVE_QUERY_PARAM,
   OFFSET_QUERY_PARAM,
   SHAPE_ID_HEADER,
@@ -37,9 +38,16 @@ export interface ShapeStreamOptions {
    */
   url: string
   /**
-   * where clauses for the shape.
+   * The where clauses for the shape.
    */
   where?: string
+
+  /**
+   * The columns to include in the shape.
+   * Must include primary keys, and can only include valid columns.
+   */
+  columns?: string[]
+
   /**
    * The "offset" on the shape log. This is typically not set as the ShapeStream
   * will handle this automatically.
A common scenario where you might pass an offset
@@ -188,7 +196,7 @@ export class ShapeStream<T extends Row = Row>
   async start() {
     this.#isUpToDate = false

-    const { url, where, signal } = this.options
+    const { url, where, columns, signal } = this.options

     try {
       while (
@@ -197,6 +205,8 @@ export class ShapeStream<T extends Row = Row>
       ) {
         const fetchUrl = new URL(url)
         if (where) fetchUrl.searchParams.set(WHERE_QUERY_PARAM, where)
+        if (columns && columns.length > 0)
+          fetchUrl.searchParams.set(COLUMNS_QUERY_PARAM, columns.join(`,`))
         fetchUrl.searchParams.set(OFFSET_QUERY_PARAM, this.#lastOffset)

         if (this.#isUpToDate) {
diff --git a/packages/typescript-client/src/constants.ts b/packages/typescript-client/src/constants.ts
index bfa2676ee0..c2ba435eab 100644
--- a/packages/typescript-client/src/constants.ts
+++ b/packages/typescript-client/src/constants.ts
@@ -7,4 +7,5 @@ export const SHAPE_SCHEMA_HEADER = `electric-schema`
 export const SHAPE_ID_QUERY_PARAM = `shape_id`
 export const OFFSET_QUERY_PARAM = `offset`
 export const WHERE_QUERY_PARAM = `where`
+export const COLUMNS_QUERY_PARAM = `columns`
 export const LIVE_QUERY_PARAM = `live`
diff --git a/packages/typescript-client/test/integration.test.ts b/packages/typescript-client/test/integration.test.ts
index c61e242d11..c847ba7901 100644
--- a/packages/typescript-client/test/integration.test.ts
+++ b/packages/typescript-client/test/integration.test.ts
@@ -662,6 +662,51 @@ describe(`HTTP Sync`, () => {
     )
   })

+  mit(
+    `should correctly select columns for initial sync and updates`,
+    async ({ dbClient, aborter, tableSql, tableUrl }) => {
+      await dbClient.query(
+        `INSERT INTO ${tableSql} (txt, i2, i4, i8) VALUES ($1, $2, $3, $4)`,
+        [`test1`, 1, 10, 100]
+      )
+
+      // Get initial data
+      const shapeData = new Map()
+      const issueStream = new ShapeStream({
+        url: `${BASE_URL}/v1/shape/${tableUrl}`,
+        columns: [`txt`, `i2`, `i4`],
+        signal: aborter.signal,
+      })
+
+      await h.forEachMessage(issueStream, aborter, async (res, msg, nth) => {
+        if (!isChangeMessage(msg)) return
+        shapeData.set(msg.key, msg.value)
+
+        if (nth === 0) {
+          expect(msg.value).toStrictEqual({
+            txt: `test1`,
+            i2: 1,
+            i4: 10,
+          })
+          await dbClient.query(
+            `UPDATE ${tableSql} SET txt = $1, i4 = $2, i8 = $3 WHERE i2 = $4`,
+            [`test2`, 20, 200, 1]
+          )
+        } else if (nth === 1) {
+          res()
+        }
+      })
+
+      expect([...shapeData.values()]).toStrictEqual([
+        {
+          txt: `test2`,
+          i2: 1,
+          i4: 20,
+        },
+      ])
+    }
+  )
+
   it(`should chunk a large log with reasonably sized chunks`, async ({
     insertIssues,
     issuesTableUrl,
diff --git a/website/electric-api.yaml b/website/electric-api.yaml
index 35a8a585b1..755bed46e0 100644
--- a/website/electric-api.yaml
+++ b/website/electric-api.yaml
@@ -120,6 +120,26 @@ paths:
             status_filter:
               value: '"status IN (''backlog'', ''todo'')"'
               summary: Only include rows whose status is either 'backlog' or 'todo'.
+
+        - name: columns
+          in: query
+          schema:
+            type: string
+          description: |-
+            Optional list of columns to include in the rows from the `root_table`.
+
+            They should always include the primary key columns, and should be formed
+            as a comma-separated list of column names exactly as they are in the database schema.
+
+            If the identifier was defined as case sensitive and/or with special characters, then
+            you must quote it in the `columns` parameter as well.
+          examples:
+            select_columns:
+              value: "id,title,status"
+              summary: Only include the id, title, and status columns.
+ select_columns_special: + value: 'id,"Status-Check"' + summary: Only include id and Status-Check columns, quoting the identifiers where necessary. # Headers - name: If-None-Match in: header