
feat: Add columns API query parameter to filter table columns #1829

Merged

merged 19 commits into from Oct 14, 2024

Changes from all commits
6 changes: 6 additions & 0 deletions .changeset/large-days-attend.md
@@ -0,0 +1,6 @@
---
"@electric-sql/client": patch
"@core/sync-service": patch
---

Implement the `columns` query parameter for the `GET v1/shapes` API to allow filtering rows down to a subset of table columns.
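
As a rough illustration of how a client might call this once merged (the endpoint path, port, and table name below are assumptions for illustration only, not taken from this PR):

    # Hypothetical request against a locally running sync service, using the Req
    # HTTP client; only the `columns` parameter itself comes from this PR.
    Req.get!(
      "http://localhost:3000/v1/shape/users",
      params: [offset: "-1", columns: "id,name"]
    )
    # Each row in the response should then contain only the "id" and "name" columns.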
26 changes: 22 additions & 4 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
@@ -53,6 +53,7 @@ defmodule Electric.Plug.ServeShapePlug do
field(:shape_id, :string)
field(:live, :boolean, default: false)
field(:where, :string)
field(:columns, :string)
field(:shape_definition, :string)
end

@@ -63,6 +64,7 @@ defmodule Electric.Plug.ServeShapePlug do
)
|> validate_required([:root_table, :offset])
|> cast_offset()
|> cast_columns()
|> validate_shape_id_with_offset()
|> validate_live_with_offset()
|> cast_root_table(opts)
@@ -95,6 +97,21 @@ defmodule Electric.Plug.ServeShapePlug do
end
end

def cast_columns(%Ecto.Changeset{valid?: false} = changeset), do: changeset

def cast_columns(%Ecto.Changeset{} = changeset) do
case fetch_field!(changeset, :columns) do
nil ->
changeset

columns ->
case Electric.Plug.Utils.parse_columns_param(columns) do
{:ok, parsed_cols} -> put_change(changeset, :columns, parsed_cols)
{:error, reason} -> add_error(changeset, :columns, reason)
end
end
end

def validate_shape_id_with_offset(%Ecto.Changeset{valid?: false} = changeset), do: changeset

def validate_shape_id_with_offset(%Ecto.Changeset{} = changeset) do
@@ -122,18 +139,19 @@ defmodule Electric.Plug.ServeShapePlug do
def cast_root_table(%Ecto.Changeset{} = changeset, opts) do
table = fetch_change!(changeset, :root_table)
where = fetch_field!(changeset, :where)
columns = get_change(changeset, :columns, nil)

case Shapes.Shape.new(table, opts ++ [where: where]) do
case Shapes.Shape.new(table, opts ++ [where: where, columns: columns]) do
{:ok, result} ->
put_change(changeset, :shape_definition, result)

{:error, reasons} ->
{:error, {field, reasons}} ->
Enum.reduce(List.wrap(reasons), changeset, fn
{message, keys}, changeset ->
add_error(changeset, :root_table, message, keys)
add_error(changeset, field, message, keys)

message, changeset when is_binary(message) ->
add_error(changeset, :root_table, message)
add_error(changeset, field, message)
end)
end
end
49 changes: 49 additions & 0 deletions packages/sync-service/lib/electric/plug/utils.ex
@@ -0,0 +1,49 @@
defmodule Electric.Plug.Utils do
Contributor

Let's perhaps not introduce several utility files but keep everything in the outer utils.ex file?

Contributor Author

I feel like this is not a general utility like the other ones and is more meant to be part of the Plug code. We already have a very long serve shape plug module which defines submodules; the idea is that parsing logic could move to this.

@moduledoc """
Utility functions for Electric endpoints, e.g. for parsing and validating
path and query parameters.
"""

@doc """
Parse the `columns` query parameter from a string consisting of a comma-separated
list of potentially quoted column names into a list of strings, preserving their order.

## Examples
iex> Electric.Plug.Utils.parse_columns_param("")
{:error, "Invalid zero-length delimited identifier"}
iex> Electric.Plug.Utils.parse_columns_param("foo,")
{:error, "Invalid zero-length delimited identifier"}
iex> Electric.Plug.Utils.parse_columns_param("id")
{:ok, ["id"]}
iex> Electric.Plug.Utils.parse_columns_param("id,name")
{:ok, ["id", "name"]}
iex> Electric.Plug.Utils.parse_columns_param(~S|"PoT@To",PoTaTo|)
{:ok, ["PoT@To", "potato"]}
iex> Electric.Plug.Utils.parse_columns_param(~S|"PoTaTo,sunday",foo|)
{:ok, ["PoTaTo,sunday", "foo"]}
iex> Electric.Plug.Utils.parse_columns_param(~S|"fo""o",bar|)
{:ok, [~S|fo"o|, "bar"]}
iex> Electric.Plug.Utils.parse_columns_param(~S|"id,"name"|)
{:error, ~S|Invalid unquoted identifier contains special characters: "id|}
Contributor

Why is this saying "unquoted identifier"? The sigil does contain quotes.
The problem should be that the quote inside the quote is not escaped (with a double quote).

Contributor Author (@msfstef, Oct 14, 2024)

The provided column was "id, so I'm treating this as an unquoted identifier that includes a special character (").

If the strategy for splitting on commas were different, it would provide id,"name as a quoted identifier and produce a different error.

"""
@spec parse_columns_param(binary()) :: {:ok, [String.t(), ...]} | {:error, term()}

def parse_columns_param(columns) when is_binary(columns) do
columns
# Split by commas that are not inside quotes
|> String.split(~r/,(?=(?:[^"]*"[^"]*")*[^"]*$)/)
|> Enum.reduce_while([], fn column, acc ->
case Electric.Postgres.Identifiers.parse(column) do
{:ok, casted_column} -> {:cont, [casted_column | acc]}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
|> then(fn result ->
case result do
# TODO: convert output to MapSet?
parsed_cols when is_list(parsed_cols) -> {:ok, Enum.reverse(parsed_cols)}
{:error, reason} -> {:error, reason}
end
end)
end
end
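
To make the review discussion above concrete, here is a rough sketch (not part of this diff) of how the comma-splitting regex behaves before each piece is handed to Electric.Postgres.Identifiers.parse/1; the bound variable is illustrative only:

    # Same regex as in parse_columns_param/1: split on commas that are followed
    # by an even number of double quotes, i.e. commas not inside a quoted name.
    split = &String.split(&1, ~r/,(?=(?:[^"]*"[^"]*")*[^"]*$)/)

    split.(~S|"PoTaTo,sunday",foo|)
    # => [~S|"PoTaTo,sunday"|, "foo"] - the comma inside the quotes is preserved

    split.(~S|"id,"name"|)
    # => [~S|"id|, ~S|"name"|] - the first piece looks like an unquoted
    #    identifier containing a quote, which is why the doctest above reports
    #    "Invalid unquoted identifier contains special characters"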
126 changes: 126 additions & 0 deletions packages/sync-service/lib/electric/postgres/identifiers.ex
@@ -0,0 +1,126 @@
defmodule Electric.Postgres.Identifiers do
@namedatalen 63
@ascii_downcase ?a - ?A

@doc """
Parse a PostgreSQL identifier, removing quotes and unescaping internal ones if the
identifier is quoted, or downcasing the identifier otherwise.

## Examples

iex> Electric.Postgres.Identifiers.parse("FooBar")
{:ok, "foobar"}
iex> Electric.Postgres.Identifiers.parse(~S|"FooBar"|)
{:ok, "FooBar"}
iex> Electric.Postgres.Identifiers.parse(~S|Foo"Bar"|)
{:error, ~S|Invalid unquoted identifier contains special characters: Foo"Bar"|}
iex> Electric.Postgres.Identifiers.parse(~S| |)
{:error, ~S|Invalid unquoted identifier contains special characters: |}
iex> Electric.Postgres.Identifiers.parse("foob@r")
{:error, ~S|Invalid unquoted identifier contains special characters: foob@r|}
iex> Electric.Postgres.Identifiers.parse(~S|"Foo"Bar"|)
{:error, ~S|Invalid identifier with unescaped quote: Foo"Bar|}
iex> Electric.Postgres.Identifiers.parse(~S|""|)
{:error, "Invalid zero-length delimited identifier"}
iex> Electric.Postgres.Identifiers.parse("")
{:error, "Invalid zero-length delimited identifier"}
iex> Electric.Postgres.Identifiers.parse(~S|" "|)
{:ok, " "}
iex> Electric.Postgres.Identifiers.parse(~S|"Foo""Bar"|)
{:ok, ~S|Foo"Bar|}
"""
@spec parse(binary(), boolean(), boolean()) :: {:ok, binary()} | {:error, term()}
def parse(ident, truncate \\ false, single_byte_encoding \\ false) when is_binary(ident) do
if String.starts_with?(ident, ~S|"|) and String.ends_with?(ident, ~S|"|) do
ident_unquoted = String.slice(ident, 1..-2//1)
parse_quoted_identifier(ident_unquoted)
else
parse_unquoted_identifier(ident, truncate, single_byte_encoding)
end
end

defp parse_quoted_identifier(""), do: {:error, "Invalid zero-length delimited identifier"}

defp parse_quoted_identifier(ident) do
if contains_unescaped_quote?(ident),
do: {:error, "Invalid identifier with unescaped quote: #{ident}"},
else: {:ok, unescape_quotes(ident)}
end

defp parse_unquoted_identifier("", _, _), do: parse_quoted_identifier("")

defp parse_unquoted_identifier(ident, truncate, single_byte_encoding) do
unless valid_unquoted_identifier?(ident),
do: {:error, "Invalid unquoted identifier contains special characters: #{ident}"},
else: {:ok, downcase(ident, truncate, single_byte_encoding)}
end

defp contains_unescaped_quote?(string) do
Regex.match?(~r/(?<!")"(?!")/, string)
end

defp unescape_quotes(string) do
string
|> String.replace(~r/""/, ~S|"|)
end

defp valid_unquoted_identifier?(identifier) do
Regex.match?(~r/^[a-zA-Z_][a-zA-Z0-9_]*$/, identifier)
end

@doc """
Downcase the identifier and truncate if necessary, using
PostgreSQL's algorithm for downcasing.

Setting `truncate` to `true` will truncate the identifier to 63 characters.

Setting `single_byte_encoding` to `true` will downcase the identifier
using single-byte encoding.

See:
https://github.com/postgres/postgres/blob/259a0a99fe3d45dcf624788c1724d9989f3382dc/src/backend/parser/scansup.c#L46-L80

## Examples

iex> Electric.Postgres.Identifiers.downcase("FooBar")
"foobar"
iex> Electric.Postgres.Identifiers.downcase(String.duplicate("a", 100), true)
String.duplicate("a", 63)
"""
def downcase(ident, truncate \\ false, single_byte_encoding \\ false)

def downcase(ident, false, single_byte_encoding) do
downcased_ident =
ident
|> String.to_charlist()
|> Enum.map(&downcase_char(&1, single_byte_encoding))
|> List.to_string()

downcased_ident
end

def downcase(ident, true, single_byte_encoding) do
downcased_ident = downcase(ident, false, single_byte_encoding)

truncated_ident =
if String.length(ident) >= @namedatalen do
String.slice(downcased_ident, 0, @namedatalen)
else
downcased_ident
end

truncated_ident
end

# Helper function to downcase a character
defp downcase_char(ch, _) when ch in ?A..?Z, do: ch + @ascii_downcase

defp downcase_char(ch, true) when ch > 127,
do:
if(ch == Enum.at(:unicode_util.uppercase(ch), 0),
do: Enum.at(:unicode_util.lowercase(ch), 0),
else: ch
)

defp downcase_char(ch, _), do: ch
end
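
A quick sketch of the folding rules implemented above, mirroring the doctests (calls are illustrative only):

    # Unquoted identifiers are case-folded the way PostgreSQL does it; quoted
    # ones keep their exact spelling, with doubled quotes unescaped.
    Electric.Postgres.Identifiers.parse("UserName")      # => {:ok, "username"}
    Electric.Postgres.Identifiers.parse(~S|"UserName"|)  # => {:ok, "UserName"}
    Electric.Postgres.Identifiers.parse(~S|"Foo""Bar"|)  # => {:ok, ~S|Foo"Bar|}

    # With truncate: true, downcase/3 also applies the 63-character limit
    # (PostgreSQL's NAMEDATALEN - 1).
    Electric.Postgres.Identifiers.downcase(String.duplicate("A", 100), true)
    # => String.duplicate("a", 63)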
45 changes: 45 additions & 0 deletions packages/sync-service/lib/electric/replication/changes.ex
@@ -285,4 +285,49 @@ defmodule Electric.Replication.Changes do
end

def convert_update(%UpdatedRecord{} = change, to: :updated_record), do: change

@doc """
Filter the columns of a change to include only those provided in `columns_to_keep`.

## Examples

iex> filter_columns(%NewRecord{record: %{"a" => "b", "c" => "d"}}, ["a"])
%NewRecord{record: %{"a" => "b"}}

iex> filter_columns(UpdatedRecord.new(
...> record: %{"a" => "b", "c" => "d"},
...> old_record: %{"a" => "d", "c" => "f"}
...> ), ["a"])
UpdatedRecord.new(record: %{"a" => "b"}, old_record: %{"a" => "d"})

iex> filter_columns(%DeletedRecord{old_record: %{"a" => "b", "c" => "d"}}, ["c"])
%DeletedRecord{old_record: %{"c" => "d"}}
"""
@spec filter_columns(change(), [String.t()]) :: change()
def filter_columns(%NewRecord{} = change, columns_to_keep) do
%NewRecord{
change
| record: change.record |> Map.take(columns_to_keep)
}
end

def filter_columns(%UpdatedRecord{} = change, columns_to_keep) do
%UpdatedRecord{
change
| old_record: change.old_record |> Map.take(columns_to_keep),
record: change.record |> Map.take(columns_to_keep),
changed_columns:
change.changed_columns
|> MapSet.reject(fn col -> col not in columns_to_keep end)
}
end

def filter_columns(%DeletedRecord{} = change, columns_to_keep) do
%DeletedRecord{
change
| old_record: change.old_record |> Map.take(columns_to_keep)
}
end

def filter_columns(change, _), do: change
end
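
As a sketch of how this composes with column filtering, an update that only touched a filtered-out column also loses that column from changed_columns (assuming, as the doctests above suggest, that UpdatedRecord.new/1 derives changed_columns from the record diff):

    # Hypothetical change where only the "email" column was modified.
    change =
      UpdatedRecord.new(
        record: %{"id" => "1", "email" => "new@example.com"},
        old_record: %{"id" => "1", "email" => "old@example.com"}
      )

    filter_columns(change, ["id"])
    # => record and old_record keep only "id", and "email" is dropped from
    #    changed_columns, leaving an update with no visible column changes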
21 changes: 16 additions & 5 deletions packages/sync-service/lib/electric/shapes/querying.ex
@@ -12,14 +12,14 @@ defmodule Electric.Shapes.Querying do
@type json_result_stream :: Enumerable.t(json_iodata())

@spec stream_initial_data(DBConnection.t(), Shape.t()) :: json_result_stream()
def stream_initial_data(conn, %Shape{root_table: root_table, table_info: table_info} = shape) do
def stream_initial_data(conn, %Shape{root_table: root_table} = shape) do
OpenTelemetry.with_span("shape_read.stream_initial_data", [], fn ->
table = Utils.relation_to_sql(root_table)

where =
if not is_nil(shape.where), do: " WHERE " <> shape.where.query, else: ""

{json_like_select, params} = json_like_select(table_info, root_table, Shape.pk(shape))
{json_like_select, params} = json_like_select(shape)

query =
Postgrex.prepare!(conn, table, ~s|SELECT #{json_like_select} FROM #{table} #{where}|)
@@ -30,8 +30,15 @@ defmodule Electric.Shapes.Querying do
end)
end

defp json_like_select(table_info, root_table, pk_cols) do
columns = get_column_names(table_info, root_table)
defp json_like_select(
%Shape{
table_info: table_info,
root_table: root_table,
selected_columns: selected_columns
} = shape
) do
pk_cols = Shape.pk(shape)
columns = get_column_names(table_info, root_table, selected_columns)

key_part = build_key_part(root_table, pk_cols)
value_part = build_value_part(columns)
@@ -57,13 +64,17 @@ defmodule Electric.Shapes.Querying do
{query, []}
end

defp get_column_names(table_info, root_table) do
defp get_column_names(table_info, root_table, nil) do
table_info
|> Map.fetch!(root_table)
|> Map.fetch!(:columns)
|> Enum.map(& &1.name)
end

defp get_column_names(_table_info, _root_table, selected_columns) do
selected_columns
end
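
A rough sketch (not part of this diff) of how the two clauses above dispatch; the exact table_info layout here is an assumption based on the calls in this hunk:

    # With no ?columns= parameter, selected_columns is nil and every column of
    # the root table is used; otherwise the already-parsed list is used as-is.
    table_info = %{
      {"public", "users"} => %{columns: [%{name: "id"}, %{name: "name"}, %{name: "email"}]}
    }

    get_column_names(table_info, {"public", "users"}, nil)
    # => ["id", "name", "email"]

    get_column_names(table_info, {"public", "users"}, ["id", "name"])
    # => ["id", "name"]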

defp build_headers_part(root_table) do
~s['"headers":{"operation":"insert","relation":#{build_relation_header(root_table)}}']
end