Skip to content

Commit

Permalink
MVP implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ajuvercr committed Aug 4, 2021
1 parent 156ee43 commit a81f6d3
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 62 deletions.
38 changes: 19 additions & 19 deletions lib/cache/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,25 @@ defmodule Cache.Registry do
end
end

def create_coalescer(key) do
GenServer.call(__MODULE__, {:start_coalesce, key})
def get_or_create_coalescer(key) do
GenServer.call(__MODULE__, {:get_or_create_coalescer, key})
end

def get_coalesce_pid(key) do
GenServer.call(__MODULE__, {:get_coalesce, key})
IO.inspect(key, label: "Getting coalesce")

case GenServer.call(__MODULE__, {:get_coalesce, key})
|> IO.inspect(label: "got return value") do
{:ok, response} -> response
{:not_found} -> nil
end
end

def remove_coalesce_key(key) do
GenServer.call(__MODULE__, {:remove_coalesce, key})
end

def store({_method, _full_path, _get_params, _allowed_groups} = key, response) do
# IO.puts "Going to store new content"
# IO.inspect( key, label: "Key to store under" )
# IO.inspect( response, label: "Response to save" )
GenServer.call(__MODULE__, {:store, key, response})
end

Expand All @@ -53,18 +56,18 @@ defmodule Cache.Registry do
{:ok, %{cache: %{}, caches_by_key: %{}, coalesce_handlers: %{}}}
end

def handle_call({:get_or_create_coalescer, key}, _from, state) do
case Map.get(state.coalesce_handlers, key, nil) do
nil ->
{:ok, pid} = Coalesce.Registry.start(%{})
new_state = put_in(state[:coalesce_handlers][key], pid)
{:reply, {:created, pid}, new_state}

def handle_call({:start_coalesce, key}, _from, state) do
if Map.has_key?(state.coalesce_handlers, key) do
{:reply, {:alread_started}, state}
else
{:ok, pid} = Coalesce.Registry.start(%{})
new_state = put_in(state[:coalesce_handlers][key], pid)
{:reply, {:ok, pid}, new_state}
pid ->
{:reply, {:attach, pid}, state}
end
end


def handle_call({:get_coalesce, key}, _from, state) do
if Map.has_key?(state.coalesce_handlers, key) do
{:reply, {:ok, Map.get(state.coalesce_handlers, key)}, state}
Expand All @@ -73,10 +76,8 @@ defmodule Cache.Registry do
end
end


def handle_call({:remove_coalesce, key}, _from, state) do
if Map.has_key?(state.coalesce_handlers, key) do
handlers = Map.pop(state.coalesce_handlers, key)
{_, new_state} = pop_in(state[:coalesce_handlers][key])
{:reply, {:ok}, new_state}
else
Expand All @@ -85,23 +86,21 @@ defmodule Cache.Registry do
end

def handle_call({:find_cache, key}, _from, state) do
if Map.has_key?(state.cache, key) do
if has_key?(state, key) do
{:reply, {:ok, Map.get(state.cache, key)}, state}
else
{:reply, {:not_found}, state}
end
end

def handle_call({:store, request_key, response}, _from, state) do

%{cache_keys: cache_keys, clear_keys: clear_keys} = response

state =
state
# update state for clear_keys
|> clear_keys!(clear_keys)


if cache_keys == [] do
{:reply, :ok, state}
else
Expand Down Expand Up @@ -135,6 +134,7 @@ defmodule Cache.Registry do
cache =
Enum.reduce(clear_keys, cache, fn clear_key, cache ->
keys_to_remove = Map.get(caches_by_key, clear_key, [])

cache = Map.drop(cache, keys_to_remove)
cache
end)
Expand Down
98 changes: 73 additions & 25 deletions lib/cache/coalesce.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ defmodule Coalesce.Registry do

# TODO define coalesce status
# With headers, body (in chunks), status code something else?


def add_connection(pid, connection) do
{:ok, conn} = GenServer.call(pid, {:add_conn, connection})
conn
end

use GenServer
###
Expand All @@ -16,51 +18,97 @@ defmodule Coalesce.Registry do
GenServer.start(__MODULE__, [%{}])
end

def assure_status_sent(state, status) do
if not is_nil(status) and is_nil(state.status) and not is_nil(state.headers) do
# First time there was a status
conns =
state.connections
|> Enum.map(fn {conn, from} ->
conn =
Plug.Conn.merge_resp_headers(conn, state.headers)
|> Plug.Conn.send_chunked(status)

{conn, from}
end)
|> Enum.flat_map(fn {conn, from} ->
case push_body_parts(conn, state.body) do
nil -> []
conn -> [{conn, from}]
end
end)

%{state | connections: conns, status: status}
else
state
end
end

def push_body_parts(conn, body_parts) do
Enum.reduce_while(body_parts, conn, fn ch, conn ->
case Plug.Conn.chunk(conn, ch) do
{:ok, conn} -> {:cont, conn}
{:error, :closed} -> {:halt, nil}
end
end)
end

@impl true
def init(s) do
IO.inspect(s, label: "init config")
{:ok, %{connections: [], state: %{headers: nil, body: []}}}
{:ok, %{connections: [], headers: nil, body: [], status: nil}}
end

# TODO
@impl true
def handle_call({:add_conn, _connection}, _from, state) do
# Put connection in state
# Make connection up to date
def handle_call({:add_conn, conn}, from, state) do
IO.inspect("adding connection to coalescer")

if not is_nil(state.status) and not is_nil(state.headers) do
conn =
Plug.Conn.merge_resp_headers(conn, state.headers)
|> Plug.Conn.send_chunked(state.status)
|> push_body_parts(state.body)
end

{:reply, :ok, state}
conns = [{conn, from} | state.connections]

new_state = %{state | connections: conns}

{:noreply, new_state}
end

@impl true
def handle_cast({:headers, headers}, state) do
IO.inspect(headers, label: "coalescing cast headers")
def handle_cast({:headers, headers, status}, state) do
state_with_headers = %{state | headers: headers}
new_state = assure_status_sent(state_with_headers, status)

{:noreply, put_in(state.state.headers, headers)}
{:noreply, new_state}
end


# TODO
@impl true
def handle_cast({:chunk, data}, state) do
IO.inspect(data, label: "coalescing cast chunk")
def handle_cast({:chunk, data, status}, state) do
new_state = assure_status_sent(state, status)

# foreach connection
# frontend_conn
# |> Plug.Conn.chunk(chunk)
conns =
Enum.flat_map(new_state.connections, fn {conn, from} ->
case Plug.Conn.chunk(conn, data) do
{:ok, conn} -> [{conn, from}]
{:error, :closed} -> []
end
end)

{:noreply, state}
end
new_state = %{new_state | connections: conns}

{:noreply, new_state}
end

# TODO
@impl true
def handle_cast({:finished, status}, state) do
IO.inspect({state, status}, label: "coalescing cast finish")
# foreach connection
# frontend_conn
# |> Plug.Conn.send_resp(return_status, "")
new_state = assure_status_sent(state, status)

Enum.each(state.connections, fn {conn, from} -> GenServer.reply(from, {:ok, conn}) end)

{:noreply, state}
# {:stop, :normal, state}
{:stop, :normal, state}
end
end
18 changes: 9 additions & 9 deletions lib/manipulators/coalesce_response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@ defmodule Manipulators.CoalesceResponse do
all_response_headers
|> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1))
|> elem(1)
|> Poison.decode!()

key = {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}

{:ok, pid} = Cache.create_coalescer(key) |> IO.inspect(label: "did it make the journey?")
pid = Cache.get_coalesce_pid(key) |> IO.inspect(label: "did it make the journey?")

conn_out = conn_out
|> Mint.HTTP.put_private(:coalesce_pid, pid)
|> Mint.HTTP.put_private(:coalesce_key, key)
conn_out =
conn_out
|> Mint.HTTP.put_private(:coalesce_pid, pid)
|> Mint.HTTP.put_private(:coalesce_key, key)

GenServer.cast(pid, {:headers, headers})
GenServer.cast(pid, {:headers, headers, conn_in.status})

{headers, {conn_in, conn_out}}
end

@impl true
def chunk(chunk, {_conn_in, conn_out}) do
IO.inspect({chunk, nil}, label: "chunk")

def chunk(chunk, {conn_in, conn_out}) do
pid = Mint.HTTP.get_private(conn_out, :coalesce_pid)
GenServer.cast(pid, {:chunk, chunk})
GenServer.cast(pid, {:chunk, chunk, conn_in.status})

:skip
end
Expand Down
5 changes: 1 addition & 4 deletions lib/manipulators/store_response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ defmodule Manipulators.StoreResponse do
all_response_headers
|> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1))
|> elem(1)
|> Poison.decode!()

cache_keys =
all_response_headers
Expand All @@ -79,8 +80,6 @@ defmodule Manipulators.StoreResponse do
|> elem(1)
|> Poison.decode!()

# IO.inspect( {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}, label: "Signature to store" )

Cache.store(
{conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups},
%{
Expand Down Expand Up @@ -109,8 +108,6 @@ defmodule Manipulators.StoreResponse do
|> elem(1)
|> Poison.decode!()

IO.inspect(clear_keys, label: "Clear keys")

Cache.clear_keys(clear_keys)

true ->
Expand Down
11 changes: 6 additions & 5 deletions lib/mu_cache_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,16 @@ defmodule MuCachePlug do
ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)

cached_value =
Cache.find_cache({conn.method, full_path, conn.query_string, known_allowed_groups}) ->
Cache.find_cache({conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)}) ->
# with allowed groups and a cache, we should use the cache
respond_with_cache(conn, cached_value)

true ->
# without a cache, we should consult the backend
# IO.inspect(
# {conn.method, full_path, conn.query_string, known_allowed_groups}, label: "Cache miss for signature")
ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)
key = {conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)}
case Cache.get_or_create_coalescer(key) do
{:created, _pid} -> ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)
{:attach, pid} -> Coalesce.Registry.add_connection(pid, conn)
end
end
end

Expand Down

0 comments on commit a81f6d3

Please sign in to comment.