From a81f6d3a8da13366c8802d6a25a1c3b8d387d794 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Wed, 4 Aug 2021 14:07:39 +0200 Subject: [PATCH] MVP implementation --- lib/cache/cache.ex | 38 +++++------ lib/cache/coalesce.ex | 98 ++++++++++++++++++++------- lib/manipulators/coalesce_response.ex | 18 ++--- lib/manipulators/store_response.ex | 5 +- lib/mu_cache_plug.ex | 11 +-- 5 files changed, 108 insertions(+), 62 deletions(-) diff --git a/lib/cache/cache.ex b/lib/cache/cache.ex index 13cdb2e..7addb4f 100644 --- a/lib/cache/cache.ex +++ b/lib/cache/cache.ex @@ -17,12 +17,18 @@ defmodule Cache.Registry do end end - def create_coalescer(key) do - GenServer.call(__MODULE__, {:start_coalesce, key}) + def get_or_create_coalescer(key) do + GenServer.call(__MODULE__, {:get_or_create_coalescer, key}) end def get_coalesce_pid(key) do - GenServer.call(__MODULE__, {:get_coalesce, key}) + IO.inspect(key, label: "Getting coalesce") + + case GenServer.call(__MODULE__, {:get_coalesce, key}) + |> IO.inspect(label: "got return value") do + {:ok, response} -> response + {:not_found} -> nil + end end def remove_coalesce_key(key) do @@ -30,9 +36,6 @@ defmodule Cache.Registry do end def store({_method, _full_path, _get_params, _allowed_groups} = key, response) do - # IO.puts "Going to store new content" - # IO.inspect( key, label: "Key to store under" ) - # IO.inspect( response, label: "Response to save" ) GenServer.call(__MODULE__, {:store, key, response}) end @@ -53,18 +56,18 @@ defmodule Cache.Registry do {:ok, %{cache: %{}, caches_by_key: %{}, coalesce_handlers: %{}}} end + def handle_call({:get_or_create_coalescer, key}, _from, state) do + case Map.get(state.coalesce_handlers, key, nil) do + nil -> + {:ok, pid} = Coalesce.Registry.start(%{}) + new_state = put_in(state[:coalesce_handlers][key], pid) + {:reply, {:created, pid}, new_state} - def handle_call({:start_coalesce, key}, _from, state) do - if Map.has_key?(state.coalesce_handlers, key) do - {:reply, {:alread_started}, state} - else - {:ok, pid} = Coalesce.Registry.start(%{}) - new_state = put_in(state[:coalesce_handlers][key], pid) - {:reply, {:ok, pid}, new_state} + pid -> + {:reply, {:attach, pid}, state} end end - def handle_call({:get_coalesce, key}, _from, state) do if Map.has_key?(state.coalesce_handlers, key) do {:reply, {:ok, Map.get(state.coalesce_handlers, key)}, state} @@ -73,10 +76,8 @@ defmodule Cache.Registry do end end - def handle_call({:remove_coalesce, key}, _from, state) do if Map.has_key?(state.coalesce_handlers, key) do - handlers = Map.pop(state.coalesce_handlers, key) {_, new_state} = pop_in(state[:coalesce_handlers][key]) {:reply, {:ok}, new_state} else @@ -85,7 +86,7 @@ defmodule Cache.Registry do end def handle_call({:find_cache, key}, _from, state) do - if Map.has_key?(state.cache, key) do + if has_key?(state, key) do {:reply, {:ok, Map.get(state.cache, key)}, state} else {:reply, {:not_found}, state} @@ -93,7 +94,6 @@ defmodule Cache.Registry do end def handle_call({:store, request_key, response}, _from, state) do - %{cache_keys: cache_keys, clear_keys: clear_keys} = response state = @@ -101,7 +101,6 @@ defmodule Cache.Registry do # update state for clear_keys |> clear_keys!(clear_keys) - if cache_keys == [] do {:reply, :ok, state} else @@ -135,6 +134,7 @@ defmodule Cache.Registry do cache = Enum.reduce(clear_keys, cache, fn clear_key, cache -> keys_to_remove = Map.get(caches_by_key, clear_key, []) + cache = Map.drop(cache, keys_to_remove) cache end) diff --git a/lib/cache/coalesce.ex b/lib/cache/coalesce.ex index 6824295..cda7280 100644 --- a/lib/cache/coalesce.ex +++ b/lib/cache/coalesce.ex @@ -5,8 +5,10 @@ defmodule Coalesce.Registry do # TODO define coalesce status # With headers, body (in chunks), status code something else? - - + def add_connection(pid, connection) do + {:ok, conn} = GenServer.call(pid, {:add_conn, connection}) + conn + end use GenServer ### @@ -16,51 +18,97 @@ defmodule Coalesce.Registry do GenServer.start(__MODULE__, [%{}]) end + def assure_status_sent(state, status) do + if not is_nil(status) and is_nil(state.status) and not is_nil(state.headers) do + # First time there was a status + conns = + state.connections + |> Enum.map(fn {conn, from} -> + conn = + Plug.Conn.merge_resp_headers(conn, state.headers) + |> Plug.Conn.send_chunked(status) + + {conn, from} + end) + |> Enum.flat_map(fn {conn, from} -> + case push_body_parts(conn, state.body) do + nil -> [] + conn -> [{conn, from}] + end + end) + + %{state | connections: conns, status: status} + else + state + end + end + + def push_body_parts(conn, body_parts) do + Enum.reduce_while(body_parts, conn, fn ch, conn -> + case Plug.Conn.chunk(conn, ch) do + {:ok, conn} -> {:cont, conn} + {:error, :closed} -> {:halt, nil} + end + end) + end + @impl true def init(s) do IO.inspect(s, label: "init config") - {:ok, %{connections: [], state: %{headers: nil, body: []}}} + {:ok, %{connections: [], headers: nil, body: [], status: nil}} end # TODO @impl true - def handle_call({:add_conn, _connection}, _from, state) do - # Put connection in state - # Make connection up to date + def handle_call({:add_conn, conn}, from, state) do + IO.inspect("adding connection to coalescer") + + if not is_nil(state.status) and not is_nil(state.headers) do + conn = + Plug.Conn.merge_resp_headers(conn, state.headers) + |> Plug.Conn.send_chunked(state.status) + |> push_body_parts(state.body) + end - {:reply, :ok, state} + conns = [{conn, from} | state.connections] + + new_state = %{state | connections: conns} + + {:noreply, new_state} end @impl true - def handle_cast({:headers, headers}, state) do - IO.inspect(headers, label: "coalescing cast headers") + def handle_cast({:headers, headers, status}, state) do + state_with_headers = %{state | headers: headers} + new_state = assure_status_sent(state_with_headers, status) - {:noreply, put_in(state.state.headers, headers)} + {:noreply, new_state} end - # TODO @impl true - def handle_cast({:chunk, data}, state) do - IO.inspect(data, label: "coalescing cast chunk") + def handle_cast({:chunk, data, status}, state) do + new_state = assure_status_sent(state, status) - # foreach connection - # frontend_conn - # |> Plug.Conn.chunk(chunk) + conns = + Enum.flat_map(new_state.connections, fn {conn, from} -> + case Plug.Conn.chunk(conn, data) do + {:ok, conn} -> [{conn, from}] + {:error, :closed} -> [] + end + end) - {:noreply, state} - end + new_state = %{new_state | connections: conns} + {:noreply, new_state} + end - # TODO @impl true def handle_cast({:finished, status}, state) do - IO.inspect({state, status}, label: "coalescing cast finish") - # foreach connection - # frontend_conn - # |> Plug.Conn.send_resp(return_status, "") + new_state = assure_status_sent(state, status) + + Enum.each(state.connections, fn {conn, from} -> GenServer.reply(from, {:ok, conn}) end) - {:noreply, state} - # {:stop, :normal, state} + {:stop, :normal, state} end end diff --git a/lib/manipulators/coalesce_response.ex b/lib/manipulators/coalesce_response.ex index 2dfad6f..d912d5b 100644 --- a/lib/manipulators/coalesce_response.ex +++ b/lib/manipulators/coalesce_response.ex @@ -11,26 +11,26 @@ defmodule Manipulators.CoalesceResponse do all_response_headers |> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1)) |> elem(1) + |> Poison.decode!() key = {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups} - {:ok, pid} = Cache.create_coalescer(key) |> IO.inspect(label: "did it make the journey?") + pid = Cache.get_coalesce_pid(key) |> IO.inspect(label: "did it make the journey?") - conn_out = conn_out - |> Mint.HTTP.put_private(:coalesce_pid, pid) - |> Mint.HTTP.put_private(:coalesce_key, key) + conn_out = + conn_out + |> Mint.HTTP.put_private(:coalesce_pid, pid) + |> Mint.HTTP.put_private(:coalesce_key, key) - GenServer.cast(pid, {:headers, headers}) + GenServer.cast(pid, {:headers, headers, conn_in.status}) {headers, {conn_in, conn_out}} end @impl true - def chunk(chunk, {_conn_in, conn_out}) do - IO.inspect({chunk, nil}, label: "chunk") - + def chunk(chunk, {conn_in, conn_out}) do pid = Mint.HTTP.get_private(conn_out, :coalesce_pid) - GenServer.cast(pid, {:chunk, chunk}) + GenServer.cast(pid, {:chunk, chunk, conn_in.status}) :skip end diff --git a/lib/manipulators/store_response.ex b/lib/manipulators/store_response.ex index 3d462e4..3dac912 100644 --- a/lib/manipulators/store_response.ex +++ b/lib/manipulators/store_response.ex @@ -66,6 +66,7 @@ defmodule Manipulators.StoreResponse do all_response_headers |> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1)) |> elem(1) + |> Poison.decode!() cache_keys = all_response_headers @@ -79,8 +80,6 @@ defmodule Manipulators.StoreResponse do |> elem(1) |> Poison.decode!() - # IO.inspect( {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}, label: "Signature to store" ) - Cache.store( {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}, %{ @@ -109,8 +108,6 @@ defmodule Manipulators.StoreResponse do |> elem(1) |> Poison.decode!() - IO.inspect(clear_keys, label: "Clear keys") - Cache.clear_keys(clear_keys) true -> diff --git a/lib/mu_cache_plug.ex b/lib/mu_cache_plug.ex index 85a9cf8..1b60fc2 100644 --- a/lib/mu_cache_plug.ex +++ b/lib/mu_cache_plug.ex @@ -52,15 +52,16 @@ defmodule MuCachePlug do ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators) cached_value = - Cache.find_cache({conn.method, full_path, conn.query_string, known_allowed_groups}) -> + Cache.find_cache({conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)}) -> # with allowed groups and a cache, we should use the cache respond_with_cache(conn, cached_value) true -> - # without a cache, we should consult the backend - # IO.inspect( - # {conn.method, full_path, conn.query_string, known_allowed_groups}, label: "Cache miss for signature") - ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators) + key = {conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)} + case Cache.get_or_create_coalescer(key) do + {:created, _pid} -> ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators) + {:attach, pid} -> Coalesce.Registry.add_connection(pid, conn) + end end end