Skip to content

Commit

Permalink
Use new header and param names
Browse files Browse the repository at this point in the history
  • Loading branch information
magnetised committed Nov 5, 2024
1 parent 4937e2e commit 139cfe1
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 115 deletions.
2 changes: 1 addition & 1 deletion packages/elixir-client/lib/electric/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ defmodule Electric.Client do
]
)

@type shape_id :: String.t()
@type shape_handle :: String.t()
@type cursor :: integer()
@type update_mode :: :modified | :full
@type column :: %{
Expand Down
18 changes: 9 additions & 9 deletions packages/elixir-client/lib/electric/client/fetch/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Electric.Client.Fetch.Request do

defstruct [
:base_url,
:shape_id,
:shape_handle,
:live,
:shape,
:next_cursor,
Expand All @@ -30,7 +30,7 @@ defmodule Electric.Client.Fetch.Request do
method: quote(do: :get | :head | :delete),
base_url: quote(do: URI.t()),
offset: quote(do: Electric.Client.Offset.t()),
shape_id: quote(do: Electric.Client.shape_id() | nil),
shape_handle: quote(do: Electric.Client.shape_handle() | nil),
update_mode: quote(do: Electric.Client.update_mode()),
live: quote(do: boolean()),
next_cursor: quote(do: Electric.Client.cursor()),
Expand Down Expand Up @@ -61,23 +61,23 @@ defmodule Electric.Client.Fetch.Request do
{:via, Registry, {Electric.Client.Registry, {__MODULE__, request_id}}}
end

defp request_id(%Client{fetch: {fetch_impl, _}}, %__MODULE__{shape_id: nil} = request) do
defp request_id(%Client{fetch: {fetch_impl, _}}, %__MODULE__{shape_handle: nil} = request) do
%{base_url: base_url, shape: shape_definition} = request
{fetch_impl, base_url, shape_definition}
end

defp request_id(%Client{fetch: {fetch_impl, _}}, %__MODULE__{} = request) do
%{base_url: base_url, offset: offset, live: live, shape_id: shape_id} = request
{fetch_impl, base_url, shape_id, Offset.to_tuple(offset), live}
%{base_url: base_url, offset: offset, live: live, shape_handle: shape_handle} = request
{fetch_impl, base_url, shape_handle, Offset.to_tuple(offset), live}
end

@doc """
Returns the URL for the Request.
"""
@spec url(t()) :: binary()
def url(%__MODULE__{} = request, opts \\ []) do
%{base_url: base_url, shape: shape} = request
path = "/v1/shape/#{ShapeDefinition.url_table_name(shape)}"
%{base_url: base_url} = request
path = "/v1/shape"
uri = URI.append_path(base_url, path)

if Keyword.get(opts, :query, true) do
Expand All @@ -96,7 +96,7 @@ defmodule Electric.Client.Fetch.Request do
shape: shape,
update_mode: update_mode,
live: live?,
shape_id: shape_id,
shape_handle: shape_handle,
offset: %Offset{} = offset,
next_cursor: cursor,
params: params
Expand All @@ -106,7 +106,7 @@ defmodule Electric.Client.Fetch.Request do
|> Map.merge(ShapeDefinition.params(shape))
|> Map.merge(%{"offset" => Offset.to_string(offset)})
|> Util.map_put_if("update_mode", to_string(update_mode), update_mode != :modified)
|> Util.map_put_if("shape_id", shape_id, is_binary(shape_id))
|> Util.map_put_if("handle", shape_handle, is_binary(shape_handle))
|> Util.map_put_if("live", "true", live?)
|> Util.map_put_if("cursor", cursor, !is_nil(cursor))
end
Expand Down
16 changes: 8 additions & 8 deletions packages/elixir-client/lib/electric/client/fetch/response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Electric.Client.Fetch.Response do
defstruct [
:status,
:last_offset,
:shape_id,
:shape_handle,
:schema,
:next_cursor,
body: [],
Expand All @@ -16,7 +16,7 @@ defmodule Electric.Client.Fetch.Response do
body: [map()],
headers: %{String.t() => [String.t()]},
last_offset: nil | Client.Offset.t(),
shape_id: nil | Client.shape_id(),
shape_handle: nil | Client.shape_handle(),
schema: nil | Client.schema(),
next_cursor: nil | Client.cursor()
}
Expand All @@ -27,7 +27,7 @@ defmodule Electric.Client.Fetch.Response do
status: status,
headers: decode_headers(headers),
body: body,
shape_id: decode_shape_id(headers),
shape_handle: decode_shape_handle(headers),
last_offset: decode_offset(headers),
schema: decode_schema(headers),
next_cursor: decode_next_cursor(headers)
Expand All @@ -38,13 +38,13 @@ defmodule Electric.Client.Fetch.Response do
Map.new(headers, fn {k, v} -> {k, List.wrap(v)} end)
end

defp decode_shape_id(%{"electric-shape-id" => shape_id}) do
unlist(shape_id)
defp decode_shape_handle(%{"electric-handle" => shape_handle}) do
unlist(shape_handle)
end

defp decode_shape_id(_headers), do: nil
defp decode_shape_handle(_headers), do: nil

defp decode_offset(%{"electric-chunk-last-offset" => offset}) do
defp decode_offset(%{"electric-offset" => offset}) do
offset |> unlist() |> Client.Offset.from_string!()
end

Expand All @@ -60,7 +60,7 @@ defmodule Electric.Client.Fetch.Response do
defp unlist([]), do: nil
defp unlist(value), do: value

defp decode_next_cursor(%{"electric-next-cursor" => cursor}) do
defp decode_next_cursor(%{"electric-cursor" => cursor}) do
cursor |> unlist() |> String.to_integer()
end

Expand Down
6 changes: 3 additions & 3 deletions packages/elixir-client/lib/electric/client/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ defmodule Electric.Client.Message do
```
"""

@enforce_keys [:shape_id, :offset, :schema]
@enforce_keys [:shape_handle, :offset, :schema]

defstruct [:shape_id, :offset, :schema]
defstruct [:shape_handle, :offset, :schema]

@type t :: %__MODULE__{
shape_id: Client.shape_id(),
shape_handle: Client.shape_handle(),
offset: Offset.t(),
schema: Client.schema()
}
Expand Down
12 changes: 6 additions & 6 deletions packages/elixir-client/lib/electric/client/mock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule Electric.Client.Mock do
status: 200,
schema: %{id: %{type: "int8"}, name: %{type: "text"}},
last_offset: Client.Offset.first(),
shape_id: "users-1",
shape_handle: "users-1",
body: Electric.Client.Mock.transaction(users, operation: :insert)
)
Expand Down Expand Up @@ -82,7 +82,7 @@ defmodule Electric.Client.Mock do
| {:headers, %{String.t() => String.t() | [String.t(), ...]}}
| {:body, [map()]}
| {:schema, Client.schema()}
| {:shape_id, Client.shape_id()}
| {:shape_handle, Client.shape_handle()}
| {:last_offset, Client.Offset.t()}
@type response_opts :: [response_opt()]

Expand Down Expand Up @@ -197,21 +197,21 @@ defmodule Electric.Client.Mock do
headers: headers(opts[:headers] || []),
body: jsonify(opts[:body] || []),
schema: Keyword.get(opts, :schema, nil),
shape_id: Keyword.get(opts, :shape_id, nil),
shape_handle: Keyword.get(opts, :shape_handle, nil),
last_offset: Keyword.get(opts, :last_offset, nil)
}
end

@spec headers([
{:shape_id, Client.shape_id()}
{:shape_handle, Client.shape_handle()}
| {:last_offset, Client.Offset.t()}
| {:schema, Client.schema()}
]) :: %{String.t() => [String.t()]}
def headers(args) do
%{}
|> put_optional_header("electric-shape-id", args[:shape_id])
|> put_optional_header("electric-handle", args[:shape_handle])
|> put_optional_header(
"electric-chunk-last-offset",
"electric-offset",
args[:last_offset],
&Client.Offset.to_string/1
)
Expand Down
13 changes: 6 additions & 7 deletions packages/elixir-client/lib/electric/client/shape_definition.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ defmodule Electric.Client.ShapeDefinition do
@type option :: unquote(NimbleOptions.option_typespec(@schema))
@type options :: [option()]

@quot "%22"

@spec new(String.t(), options()) :: {:ok, t()} | {:error, term()}
@doc """
Create a `ShapeDefinition` for the given `table_name`.
Expand Down Expand Up @@ -94,7 +92,7 @@ defmodule Electric.Client.ShapeDefinition do
"my_app.my_table"
iex> ShapeDefinition.url_table_name(ShapeDefinition.new!("my table", namespace: "my app"))
"%22my app%22.%22my table%22"
~s["my app"."my table"]
"""
@spec url_table_name(t()) :: String.t()
Expand All @@ -116,18 +114,19 @@ defmodule Electric.Client.ShapeDefinition do

defp quote_table_name(name) do
IO.iodata_to_binary([
@quot,
:binary.replace(name, ~s["], ~s[#{@quot}#{@quot}], [:global]),
@quot
?",
:binary.replace(name, ~s["], ~s[""], [:global]),
?"
])
end

@doc false
@spec params(t()) :: Electric.Client.Fetch.Request.params()
def params(%__MODULE__{} = shape) do
%{where: where, columns: columns} = shape
table_name = url_table_name(shape)

%{}
%{table: table_name}
|> Util.map_put_if("where", where, is_binary(where))
|> Util.map_put_if("columns", fn -> Enum.join(columns, ",") end, is_list(columns))
end
Expand Down
37 changes: 19 additions & 18 deletions packages/elixir-client/lib/electric/client/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Electric.Client.Stream do
up_to_date?: false,
update_mode: :modified,
offset: Offset.before_all(),
shape_id: nil,
shape_handle: nil,
next_cursor: nil,
state: :init,
opts: %{}
Expand Down Expand Up @@ -92,7 +92,7 @@ defmodule Electric.Client.Stream do
up_to_date?: boolean(),
offset: Offset.t(),
update_mode: Client.update_mode(),
shape_id: nil | Client.shape_id(),
shape_handle: nil | Client.shape_handle(),
state: :init | :stream | :done,
opts: opts()
}
Expand Down Expand Up @@ -150,12 +150,13 @@ defmodule Electric.Client.Stream do
defp handle_response(%Fetch.Response{status: status} = resp, stream)
when status in 200..299 do
start_offset = stream.offset
shape_id = shape_id!(resp)
shape_handle = shape_handle!(resp)
final_offset = last_offset(resp, stream.offset)
next_cursor = resp.next_cursor

%{value_mapper_fun: value_mapper_fun} =
stream = handle_schema(resp, %{stream | shape_id: shape_id, next_cursor: next_cursor})
stream =
handle_schema(resp, %{stream | shape_handle: shape_handle, next_cursor: next_cursor})

resp.body
|> List.wrap()
Expand All @@ -178,7 +179,7 @@ defmodule Electric.Client.Stream do
offset = last_offset(resp, stream.offset)

stream
|> reset(shape_id(resp))
|> reset(shape_handle(resp))
|> buffer(Enum.flat_map(resp.body, &Message.parse(&1, offset, value_mapper_fun)))
|> dispatch()
end
Expand Down Expand Up @@ -208,7 +209,7 @@ defmodule Electric.Client.Stream do
resume_message = %Message.ResumeMessage{
schema: stream.schema,
offset: offset,
shape_id: stream.shape_id
shape_handle: stream.shape_handle
}

{:halt, {offset, %{stream | buffer: :queue.in(resume_message, stream.buffer), state: :done}}}
Expand All @@ -218,7 +219,7 @@ defmodule Electric.Client.Stream do
resume_message = %Message.ResumeMessage{
schema: stream.schema,
offset: stream.offset,
shape_id: stream.shape_id
shape_handle: stream.shape_handle
}

{msgs, %{stream | buffer: :queue.in(resume_message, stream.buffer), state: :done}}
Expand All @@ -241,14 +242,14 @@ defmodule Electric.Client.Stream do
shape: shape,
up_to_date?: up_to_date?,
update_mode: update_mode,
shape_id: shape_id,
shape_handle: shape_handle,
offset: offset,
next_cursor: cursor
} = stream

Client.request(client,
offset: offset,
shape_id: shape_id,
shape_handle: shape_handle,
update_mode: update_mode,
live: up_to_date?,
next_cursor: cursor,
Expand All @@ -266,11 +267,11 @@ defmodule Electric.Client.Stream do
Fetch.Request.request(stream.client, request)
end

defp reset(stream, shape_id) do
defp reset(stream, shape_handle) do
%{
stream
| offset: Offset.before_all(),
shape_id: shape_id,
shape_handle: shape_handle,
up_to_date?: false,
buffer: :queue.new(),
schema: nil,
Expand All @@ -282,13 +283,13 @@ defmodule Electric.Client.Stream do
%{stream | buffer: Enum.reduce(msgs, stream.buffer, &:queue.in/2)}
end

defp shape_id!(resp) do
shape_id(resp) ||
raise Client.Error, message: "Missing electric-shape-id header", resp: resp
defp shape_handle!(resp) do
shape_handle(resp) ||
raise Client.Error, message: "Missing electric-handle header", resp: resp
end

defp shape_id(%Fetch.Response{shape_id: shape_id}) do
shape_id
defp shape_handle(%Fetch.Response{shape_handle: shape_handle}) do
shape_handle
end

defp last_offset(%Fetch.Response{last_offset: %Offset{} = offset}, _offset) do
Expand Down Expand Up @@ -323,9 +324,9 @@ defmodule Electric.Client.Stream do
end

defp resume(%{opts: %{resume: %Message.ResumeMessage{} = resume}} = stream) do
%{shape_id: shape_id, offset: offset, schema: schema} = resume
%{shape_handle: shape_handle, offset: offset, schema: schema} = resume

generate_value_mapper(schema, %{stream | shape_id: shape_id, offset: offset})
generate_value_mapper(schema, %{stream | shape_handle: shape_handle, offset: offset})
end

defp resume(stream) do
Expand Down
Loading

0 comments on commit 139cfe1

Please sign in to comment.