feat: Add columns API query parameter to filter table columns (#1829)
Implements #1804


### Overview
- I've picked `columns` as the query parameter name, as `select` might
confuse people and get compared to SQL `SELECT`s, and `filter` is too
close to what the `where` parameter does.
- The columns are passed as comma-separated column names, parsed at the
validation stage (see the sketch after this list)
- If the parameter is specified, it *has to include* the primary key
columns
- We could make it such that the PK is always included, but it feels
like it might be confusing - explicit is better?
- For tables without a primary key, we treat _all_ columns as primary
keys - we might want to revisit that with respect to this feature
- Columns are also validated by ensuring that every specified column name
matches a column in the schema
- The selected columns are stored in the `Shape` definition as an array
of column names
- We could instead have filtered the `table_info` data based on the
selected columns rather than storing them separately, but that might
cause issues with relation change handling, and generally I prefer the
definition of the shape to contain all information related to it
(i.e. the schema at the time of creation + what column filters were
applied, rather than compacting the two)
- I modified the `Shape`-related APIs for converting changes etc. to also
apply the column filtering - this is all we need to ensure it works
correctly for replication stream log entries
- In the `Snapshotter`, I modified the `get_column_names` function to also
apply the filtering when present, which takes care of the snapshot log
entries.

### Other changes
- I've changed `Shapes.new` to return errors along with the field they
are associated with (`:root_table`, `:where`, or `:columns`) in order to
return correct validation errors, since all of the stronger validation
occurs in `cast_root_table` when the shape is created and PG is used,
but it really validates more than just `root_table` (see the sketch
after this list).
- I've updated the client to accept a `columns` argument, which is a list
of strings
- I've updated the OpenAPI spec to include the `columns` parameter
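
Roughly, the plug now consumes the field-tagged errors like this (a sketch adapted from `cast_root_table/2` in the `serve_shape_plug.ex` diff below; the exact error messages come from `Shape.new` and aren't shown here):

```elixir
# Sketch adapted from cast_root_table/2 below; `field` is :root_table,
# :where or :columns, so the 400 response can point at the query parameter
# that actually failed validation instead of always blaming :root_table.
case Shapes.Shape.new(table, opts ++ [where: where, columns: columns]) do
  {:ok, shape} ->
    put_change(changeset, :shape_definition, shape)

  {:error, {field, reasons}} ->
    Enum.reduce(List.wrap(reasons), changeset, fn
      {message, keys}, changeset -> add_error(changeset, field, message, keys)
      message, changeset when is_binary(message) -> add_error(changeset, field, message)
    end)
end
```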

### Things to consider
- How do we want to handle column names with special characters (quoted
identifiers)? In my opinion the client needs to type them exactly as
they appear in Postgres; otherwise they get a 400 validation error back
telling them which columns are invalid, so it should be fairly easy to
fix (see the sketch after this list).
- Replication publication filtering of columns is available, but
[updating it might cause
errors](https://www.postgresql.org/docs/15/logical-replication-col-lists.html#LOGICAL-REPLICATION-COL-LIST-COMBINING)
according to the PG docs, so I'm not sure if that's something we want to
consider
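
For reference, this is how quoted identifiers behave in the parser (the first example is a doctest from `Electric.Plug.Utils`; the second input is hypothetical, but its error message follows `Electric.Postgres.Identifiers.parse/3`):

```elixir
# Quoted identifiers keep their case and special characters; unquoted ones
# are downcased, matching Postgres' identifier rules.
iex> Electric.Plug.Utils.parse_columns_param(~S|"PoT@To",PoTaTo|)
{:ok, ["PoT@To", "potato"]}

# Special characters in an unquoted identifier are rejected, naming the
# offending column so the client can correct it.
iex> Electric.Plug.Utils.parse_columns_param("id,foob@r")
{:error, "Invalid unquoted identifier contains special characters: foob@r"}
```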
msfstef authored Oct 14, 2024
1 parent f68f2d9 commit 25c437f
Showing 16 changed files with 622 additions and 48 deletions.
6 changes: 6 additions & 0 deletions .changeset/large-days-attend.md
@@ -0,0 +1,6 @@
---
"@electric-sql/client": patch
"@core/sync-service": patch
---

Implement `columns` query parameter for `GET v1/shapes` API to allow filtering rows for a subset of table columns.
26 changes: 22 additions & 4 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
@@ -53,6 +53,7 @@ defmodule Electric.Plug.ServeShapePlug do
field(:shape_id, :string)
field(:live, :boolean, default: false)
field(:where, :string)
field(:columns, :string)
field(:shape_definition, :string)
end

@@ -63,6 +64,7 @@ defmodule Electric.Plug.ServeShapePlug do
)
|> validate_required([:root_table, :offset])
|> cast_offset()
|> cast_columns()
|> validate_shape_id_with_offset()
|> validate_live_with_offset()
|> cast_root_table(opts)
@@ -95,6 +97,21 @@
end
end

def cast_columns(%Ecto.Changeset{valid?: false} = changeset), do: changeset

def cast_columns(%Ecto.Changeset{} = changeset) do
case fetch_field!(changeset, :columns) do
nil ->
changeset

columns ->
case Electric.Plug.Utils.parse_columns_param(columns) do
{:ok, parsed_cols} -> put_change(changeset, :columns, parsed_cols)
{:error, reason} -> add_error(changeset, :columns, reason)
end
end
end

def validate_shape_id_with_offset(%Ecto.Changeset{valid?: false} = changeset), do: changeset

def validate_shape_id_with_offset(%Ecto.Changeset{} = changeset) do
@@ -122,18 +139,19 @@
def cast_root_table(%Ecto.Changeset{} = changeset, opts) do
table = fetch_change!(changeset, :root_table)
where = fetch_field!(changeset, :where)
columns = get_change(changeset, :columns, nil)

case Shapes.Shape.new(table, opts ++ [where: where]) do
case Shapes.Shape.new(table, opts ++ [where: where, columns: columns]) do
{:ok, result} ->
put_change(changeset, :shape_definition, result)

{:error, reasons} ->
{:error, {field, reasons}} ->
Enum.reduce(List.wrap(reasons), changeset, fn
{message, keys}, changeset ->
add_error(changeset, :root_table, message, keys)
add_error(changeset, field, message, keys)

message, changeset when is_binary(message) ->
add_error(changeset, :root_table, message)
add_error(changeset, field, message)
end)
end
end
49 changes: 49 additions & 0 deletions packages/sync-service/lib/electric/plug/utils.ex
@@ -0,0 +1,49 @@
defmodule Electric.Plug.Utils do
@moduledoc """
Utility functions for Electric endpoints, e.g. for parsing and validating
path and query parameters.
"""

@doc """
Parse the columns parameter from a string consisting of a comma-separated list
of potentially quoted column names into a list of column name strings.
## Examples
iex> Electric.Plug.Utils.parse_columns_param("")
{:error, "Invalid zero-length delimited identifier"}
iex> Electric.Plug.Utils.parse_columns_param("foo,")
{:error, "Invalid zero-length delimited identifier"}
iex> Electric.Plug.Utils.parse_columns_param("id")
{:ok, ["id"]}
iex> Electric.Plug.Utils.parse_columns_param("id,name")
{:ok, ["id", "name"]}
iex> Electric.Plug.Utils.parse_columns_param(~S|"PoT@To",PoTaTo|)
{:ok, ["PoT@To", "potato"]}
iex> Electric.Plug.Utils.parse_columns_param(~S|"PoTaTo,sunday",foo|)
{:ok, ["PoTaTo,sunday", "foo"]}
iex> Electric.Plug.Utils.parse_columns_param(~S|"fo""o",bar|)
{:ok, [~S|fo"o|, "bar"]}
iex> Electric.Plug.Utils.parse_columns_param(~S|"id,"name"|)
{:error, ~S|Invalid unquoted identifier contains special characters: "id|}
"""
@spec parse_columns_param(binary()) :: {:ok, [String.t(), ...]} | {:error, term()}

def parse_columns_param(columns) when is_binary(columns) do
columns
# Split by commas that are not inside quotes
|> String.split(~r/,(?=(?:[^"]*"[^"]*")*[^"]*$)/)
|> Enum.reduce_while([], fn column, acc ->
case Electric.Postgres.Identifiers.parse(column) do
{:ok, casted_column} -> {:cont, [casted_column | acc]}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
|> then(fn result ->
case result do
# TODO: convert output to MapSet?
parsed_cols when is_list(parsed_cols) -> {:ok, Enum.reverse(parsed_cols)}
{:error, reason} -> {:error, reason}
end
end)
end
end
126 changes: 126 additions & 0 deletions packages/sync-service/lib/electric/postgres/identifiers.ex
@@ -0,0 +1,126 @@
defmodule Electric.Postgres.Identifiers do
@namedatalen 63
@ascii_downcase ?a - ?A

@doc """
Parse a PostgreSQL identifier, removing quotes if present and escaping internal ones
and downcasing the identifier otherwise.
## Examples
iex> Electric.Postgres.Identifiers.parse("FooBar")
{:ok, "foobar"}
iex> Electric.Postgres.Identifiers.parse(~S|"FooBar"|)
{:ok, "FooBar"}
iex> Electric.Postgres.Identifiers.parse(~S|Foo"Bar"|)
{:error, ~S|Invalid unquoted identifier contains special characters: Foo"Bar"|}
iex> Electric.Postgres.Identifiers.parse(~S| |)
{:error, ~S|Invalid unquoted identifier contains special characters: |}
iex> Electric.Postgres.Identifiers.parse("foob@r")
{:error, ~S|Invalid unquoted identifier contains special characters: foob@r|}
iex> Electric.Postgres.Identifiers.parse(~S|"Foo"Bar"|)
{:error, ~S|Invalid identifier with unescaped quote: Foo"Bar|}
iex> Electric.Postgres.Identifiers.parse(~S|""|)
{:error, "Invalid zero-length delimited identifier"}
iex> Electric.Postgres.Identifiers.parse("")
{:error, "Invalid zero-length delimited identifier"}
iex> Electric.Postgres.Identifiers.parse(~S|" "|)
{:ok, " "}
iex> Electric.Postgres.Identifiers.parse(~S|"Foo""Bar"|)
{:ok, ~S|Foo"Bar|}
"""
@spec parse(binary(), boolean(), boolean()) :: {:ok, binary()} | {:error, term()}
def parse(ident, truncate \\ false, single_byte_encoding \\ false) when is_binary(ident) do
if String.starts_with?(ident, ~S|"|) and String.ends_with?(ident, ~S|"|) do
ident_unquoted = String.slice(ident, 1..-2//1)
parse_quoted_identifier(ident_unquoted)
else
parse_unquoted_identifier(ident, truncate, single_byte_encoding)
end
end

defp parse_quoted_identifier(""), do: {:error, "Invalid zero-length delimited identifier"}

defp parse_quoted_identifier(ident) do
if contains_unescaped_quote?(ident),
do: {:error, "Invalid identifier with unescaped quote: #{ident}"},
else: {:ok, unescape_quotes(ident)}
end

defp parse_unquoted_identifier("", _, _), do: parse_quoted_identifier("")

defp parse_unquoted_identifier(ident, truncate, single_byte_encoding) do
unless valid_unquoted_identifier?(ident),
do: {:error, "Invalid unquoted identifier contains special characters: #{ident}"},
else: {:ok, downcase(ident, truncate, single_byte_encoding)}
end

defp contains_unescaped_quote?(string) do
Regex.match?(~r/(?<!")"(?!")/, string)
end

defp unescape_quotes(string) do
string
|> String.replace(~r/""/, ~S|"|)
end

defp valid_unquoted_identifier?(identifier) do
Regex.match?(~r/^[a-zA-Z_][a-zA-Z0-9_]*$/, identifier)
end

@doc """
Downcase the identifier and truncate if necessary, using
PostgreSQL's algorithm for downcasing.
Setting `truncate` to `true` will truncate the identifier to 63 characters
Setting `single_byte_encoding` to `true` will downcase the identifier
using single byte encoding
See:
https://github.com/postgres/postgres/blob/259a0a99fe3d45dcf624788c1724d9989f3382dc/src/backend/parser/scansup.c#L46-L80
## Examples
iex> Electric.Postgres.Identifiers.downcase("FooBar")
"foobar"
iex> Electric.Postgres.Identifiers.downcase(String.duplicate("a", 100), true)
String.duplicate("a", 63)
"""
def downcase(ident, truncate \\ false, single_byte_encoding \\ false)

def downcase(ident, false, single_byte_encoding) do
downcased_ident =
ident
|> String.to_charlist()
|> Enum.map(&downcase_char(&1, single_byte_encoding))
|> List.to_string()

downcased_ident
end

def downcase(ident, true, single_byte_encoding) do
downcased_ident = downcase(ident, false, single_byte_encoding)

truncated_ident =
if String.length(ident) >= @namedatalen do
String.slice(downcased_ident, 0, @namedatalen)
else
downcased_ident
end

truncated_ident
end

# Helper function to downcase a character
defp downcase_char(ch, _) when ch in ?A..?Z, do: ch + @ascii_downcase

defp downcase_char(ch, true) when ch > 127,
do:
if(ch == Enum.at(:unicode_util.uppercase(ch), 0),
do: Enum.at(:unicode_util.lowercase(ch), 0),
else: ch
)

defp downcase_char(ch, _), do: ch
end
45 changes: 45 additions & 0 deletions packages/sync-service/lib/electric/replication/changes.ex
@@ -285,4 +285,49 @@ defmodule Electric.Replication.Changes do
end

def convert_update(%UpdatedRecord{} = change, to: :updated_record), do: change

@doc """
Filter the columns of a change to include only those provided in `columns_to_keep`.
## Examples
iex> filter_columns(%NewRecord{record: %{"a" => "b", "c" => "d"}}, ["a"])
%NewRecord{record: %{"a" => "b"}}
iex> filter_columns(UpdatedRecord.new(
...> record: %{"a" => "b", "c" => "d"},
...> old_record: %{"a" => "d", "c" => "f"}
...> ), ["a"])
UpdatedRecord.new(record: %{"a" => "b"}, old_record: %{"a" => "d"})
iex> filter_columns(%DeletedRecord{old_record: %{"a" => "b", "c" => "d"}}, ["c"])
%DeletedRecord{old_record: %{"c" => "d"}}
"""
@spec filter_columns(change(), [String.t()]) :: change()
def filter_columns(%NewRecord{} = change, columns_to_keep) do
%NewRecord{
change
| record: change.record |> Map.take(columns_to_keep)
}
end

def filter_columns(%UpdatedRecord{} = change, columns_to_keep) do
%UpdatedRecord{
change
| old_record: change.old_record |> Map.take(columns_to_keep),
record: change.record |> Map.take(columns_to_keep),
changed_columns:
change.changed_columns
|> MapSet.reject(fn col -> col not in columns_to_keep end)
}
end

def filter_columns(%DeletedRecord{} = change, columns_to_keep) do
%DeletedRecord{
change
| old_record: change.old_record |> Map.take(columns_to_keep)
}
end

def filter_columns(change, _), do: change
end
21 changes: 16 additions & 5 deletions packages/sync-service/lib/electric/shapes/querying.ex
@@ -12,14 +12,14 @@ defmodule Electric.Shapes.Querying do
@type json_result_stream :: Enumerable.t(json_iodata())

@spec stream_initial_data(DBConnection.t(), Shape.t()) :: json_result_stream()
def stream_initial_data(conn, %Shape{root_table: root_table, table_info: table_info} = shape) do
def stream_initial_data(conn, %Shape{root_table: root_table} = shape) do
OpenTelemetry.with_span("shape_read.stream_initial_data", [], fn ->
table = Utils.relation_to_sql(root_table)

where =
if not is_nil(shape.where), do: " WHERE " <> shape.where.query, else: ""

{json_like_select, params} = json_like_select(table_info, root_table, Shape.pk(shape))
{json_like_select, params} = json_like_select(shape)

query =
Postgrex.prepare!(conn, table, ~s|SELECT #{json_like_select} FROM #{table} #{where}|)
@@ -30,8 +30,15 @@ defmodule Electric.Shapes.Querying do
end)
end

defp json_like_select(table_info, root_table, pk_cols) do
columns = get_column_names(table_info, root_table)
defp json_like_select(
%Shape{
table_info: table_info,
root_table: root_table,
selected_columns: selected_columns
} = shape
) do
pk_cols = Shape.pk(shape)
columns = get_column_names(table_info, root_table, selected_columns)

key_part = build_key_part(root_table, pk_cols)
value_part = build_value_part(columns)
@@ -57,13 +64,17 @@ defmodule Electric.Shapes.Querying do
{query, []}
end

defp get_column_names(table_info, root_table) do
defp get_column_names(table_info, root_table, nil) do
table_info
|> Map.fetch!(root_table)
|> Map.fetch!(:columns)
|> Enum.map(& &1.name)
end

defp get_column_names(_table_info, _root_table, selected_columns) do
selected_columns
end

defp build_headers_part(root_table) do
~s['"headers":{"operation":"insert","relation":#{build_relation_header(root_table)}}']
end
