Skip to content

Commit

Permalink
Add manipulator based coalescher to coalesce multiple simultaneous th…
Browse files Browse the repository at this point in the history
…at could be cached
  • Loading branch information
ajuvercr committed Aug 4, 2021
1 parent 1f5d779 commit a23d7fc
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 27 deletions.
62 changes: 49 additions & 13 deletions lib/cache/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,22 @@ defmodule Cache.Registry do
end
end

def get_or_create_coalescer(key) do
GenServer.call(__MODULE__, {:get_or_create_coalescer, key})
end

def get_coalesce_pid(key) do
case GenServer.call(__MODULE__, {:get_coalesce, key}) do
{:ok, response} -> response
{:not_found} -> nil
end
end

def remove_coalesce_key(key) do
GenServer.call(__MODULE__, {:remove_coalesce, key})
end

def store({_method, _full_path, _get_params, _allowed_groups} = key, response) do
# IO.puts "Going to store new content"
# IO.inspect( key, label: "Key to store under" )
# IO.inspect( response, label: "Response to save" )
GenServer.call(__MODULE__, {:store, key, response})
end

Expand All @@ -32,11 +44,42 @@ defmodule Cache.Registry do
# GenServer API
###
def start_link(_) do
GenServer.start_link(__MODULE__, [%{cache: %{}, caches_by_key: %{}}], name: __MODULE__)
GenServer.start_link(__MODULE__, [%{cache: %{}, caches_by_key: %{}, coalesce_handlers: %{}}],
name: __MODULE__
)
end

def init(_) do
{:ok, %{cache: %{}, caches_by_key: %{}}}
{:ok, %{cache: %{}, caches_by_key: %{}, coalesce_handlers: %{}}}
end

def handle_call({:get_or_create_coalescer, key}, _from, state) do
case Map.get(state.coalesce_handlers, key, nil) do
nil ->
{:ok, pid} = Coalesce.Registry.start(%{})
new_state = put_in(state[:coalesce_handlers][key], pid)
{:reply, {:created, pid}, new_state}

pid ->
{:reply, {:attach, pid}, state}
end
end

def handle_call({:get_coalesce, key}, _from, state) do
if Map.has_key?(state.coalesce_handlers, key) do
{:reply, {:ok, Map.get(state.coalesce_handlers, key)}, state}
else
{:reply, {:not_found}, state}
end
end

def handle_call({:remove_coalesce, key}, _from, state) do
if Map.has_key?(state.coalesce_handlers, key) do
{_, new_state} = pop_in(state[:coalesce_handlers][key])
{:reply, {:ok}, new_state}
else
{:reply, {:not_found}, state}
end
end

def handle_call({:find_cache, key}, _from, state) do
Expand All @@ -48,21 +91,13 @@ defmodule Cache.Registry do
end

def handle_call({:store, request_key, response}, _from, state) do
# IO.inspect( request_key, label: "Request key" )
# IO.inspect( response, label: "Response" )

%{cache_keys: cache_keys, clear_keys: clear_keys} = response

# IO.inspect { :cache_keys, cache_keys }
# IO.inspect { :clear_keys, clear_keys }

state =
state
# update state for clear_keys
|> clear_keys!(clear_keys)

# IO.puts "Executed clear keys"

if cache_keys == [] do
{:reply, :ok, state}
else
Expand Down Expand Up @@ -96,6 +131,7 @@ defmodule Cache.Registry do
cache =
Enum.reduce(clear_keys, cache, fn clear_key, cache ->
keys_to_remove = Map.get(caches_by_key, clear_key, [])

cache = Map.drop(cache, keys_to_remove)
cache
end)
Expand Down
109 changes: 109 additions & 0 deletions lib/cache/coalesce.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
defmodule Coalesce.Registry do
@moduledoc """
Maintains coalescing requests for specific request, dependant on key.
"""

use GenServer

def add_connection(pid, connection) do
{:ok, conn} = GenServer.call(pid, {:add_conn, connection})
conn
end

def assure_status_sent(state, status) do
if not is_nil(status) and is_nil(state.status) and not is_nil(state.headers) do
# First time there was a status
conns =
state.connections
|> Enum.map(fn {conn, from} ->
conn =
Plug.Conn.merge_resp_headers(conn, state.headers)
|> Plug.Conn.send_chunked(status)

{conn, from}
end)
|> Enum.flat_map(fn {conn, from} ->
case push_body_parts(conn, state.body) do
nil -> []
conn -> [{conn, from}]
end
end)

%{state | connections: conns, status: status}
else
state
end
end

def push_body_parts(conn, body_parts) do
Enum.reduce_while(body_parts, conn, fn ch, conn ->
case Plug.Conn.chunk(conn, ch) do
{:ok, conn} -> {:cont, conn}
{:error, :closed} -> {:halt, nil}
end
end)
end

###
# GenServer API
###
def start(_) do
GenServer.start(__MODULE__, [%{}])
end

@impl true
def init(_) do
{:ok, %{connections: [], headers: nil, body: [], status: nil}}
end

@impl true
def handle_call({:add_conn, conn}, from, state) do
conn = if not is_nil(state.status) and not is_nil(state.headers) do
Plug.Conn.merge_resp_headers(conn, state.headers)
|> Plug.Conn.send_chunked(state.status)
|> push_body_parts(state.body)
else
conn
end

conns = [{conn, from} | state.connections]

new_state = %{state | connections: conns}

{:noreply, new_state}
end

@impl true
def handle_cast({:headers, headers, status}, state) do
state_with_headers = %{state | headers: headers}
new_state = assure_status_sent(state_with_headers, status)

{:noreply, new_state}
end

@impl true
def handle_cast({:chunk, data, status}, state) do
new_state = assure_status_sent(state, status)

conns =
Enum.flat_map(new_state.connections, fn {conn, from} ->
case Plug.Conn.chunk(conn, data) do
{:ok, conn} -> [{conn, from}]
{:error, :closed} -> []
end
end)

new_state = %{new_state | connections: conns}

{:noreply, new_state}
end

@impl true
def handle_cast({:finished, status}, state) do
new_state = assure_status_sent(state, status)

Enum.each(new_state.connections, fn {conn, from} -> GenServer.reply(from, {:ok, conn}) end)

{:stop, :normal, new_state}
end
end
4 changes: 3 additions & 1 deletion lib/manipulators/cache_key_logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ defmodule Manipulators.CacheKeyLogger do
end

defp header_value(headers, header_name) do
header = Enum.find(headers, header_name)
header =
headers
|> Enum.find(&match?({^header_name, _}, &1))

if header do
elem(header, 1)
Expand Down
50 changes: 50 additions & 0 deletions lib/manipulators/coalesce_response.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
defmodule Manipulators.CoalesceResponse do
@moduledoc """
Manipulates the response, notifying the Coalesce.Registry for this request.
"""

alias Cache.Registry, as: Cache

@behaviour ProxyManipulator

@impl true
def headers(headers, {conn_in, conn_out}) do
all_response_headers = Mint.HTTP.get_private(conn_out, :mu_cache_original_headers)

allowed_groups =
all_response_headers
|> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1))
|> elem(1)
|> Poison.decode!()

key = {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}

pid = Cache.get_coalesce_pid(key)

conn_out =
conn_out
|> Mint.HTTP.put_private(:coalesce_pid, pid)
|> Mint.HTTP.put_private(:coalesce_key, key)

GenServer.cast(pid, {:headers, headers, conn_in.status})

{headers, {conn_in, conn_out}}
end

@impl true
def chunk(chunk, {conn_in, conn_out}) do
pid = Mint.HTTP.get_private(conn_out, :coalesce_pid)
GenServer.cast(pid, {:chunk, chunk, conn_in.status})

:skip
end

@impl true
def finish(_, {conn_in, conn_out}) do
pid = Mint.HTTP.get_private(conn_out, :coalesce_pid)
key = Mint.HTTP.get_private(conn_out, :coalesce_key)
Cache.remove_coalesce_key(key)
GenServer.cast(pid, {:finished, conn_in.status})
:skip
end
end
4 changes: 2 additions & 2 deletions lib/manipulators/remove_cache_related_keys.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ defmodule Manipulators.RemoveCacheRelatedKeys do
@behaviour ProxyManipulator

@impl true
def headers(headers, connection) do
def headers(headers_inp, connection) do
new_headers =
headers
headers_inp
|> Enum.reject(fn
{"cache-keys", _} -> true
{"clear-keys", _} -> true
Expand Down
5 changes: 1 addition & 4 deletions lib/manipulators/store_response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ defmodule Manipulators.StoreResponse do
all_response_headers
|> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1))
|> elem(1)
|> Poison.decode!()

cache_keys =
all_response_headers
Expand All @@ -79,8 +80,6 @@ defmodule Manipulators.StoreResponse do
|> elem(1)
|> Poison.decode!()

# IO.inspect( {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}, label: "Signature to store" )

Cache.store(
{conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups},
%{
Expand Down Expand Up @@ -109,8 +108,6 @@ defmodule Manipulators.StoreResponse do
|> elem(1)
|> Poison.decode!()

IO.inspect(clear_keys, label: "Clear keys")

Cache.clear_keys(clear_keys)

true ->
Expand Down
17 changes: 10 additions & 7 deletions lib/mu_cache_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ defmodule MuCachePlug do
@response_manipulators [
Manipulators.CacheKeyLogger,
Manipulators.StoreResponse,
Manipulators.RemoveCacheRelatedKeys
Manipulators.RemoveCacheRelatedKeys,

# Make sure this is the last one, coalescing responses as generated at this point
Manipulators.CoalesceResponse
]
@manipulators ProxyManipulatorSettings.make_settings(
@request_manipulators,
Expand Down Expand Up @@ -49,16 +52,16 @@ defmodule MuCachePlug do
ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)

cached_value =
Cache.find_cache({conn.method, full_path, conn.query_string, known_allowed_groups}) ->
Cache.find_cache({conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)}) ->
# with allowed groups and a cache, we should use the cache
respond_with_cache(conn, cached_value)

true ->
# without a cache, we should consult the backend
# IO.inspect(
# {conn.method, full_path, conn.query_string, known_allowed_groups}, label: "Cache miss for signature")

ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)
key = {conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)}
case Cache.get_or_create_coalescer(key) do
{:created, _pid} -> ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)
{:attach, pid} -> Coalesce.Registry.add_connection(pid, conn)
end
end
end

Expand Down
11 changes: 11 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
%{
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"castore": {:hex, :castore, "0.1.9", "eb08a94c12ebff92a92d844c6ccd90728dc7662aab9bdc8b3b785ba653c499d5", [:mix], [], "hexpm", "99c3a38ad9c0bab03fee1418c98390da1a31f3b85e317db5840d51a1443d26c8"},
"certifi": {:hex, :certifi, "2.6.1", "dbab8e5e155a0763eea978c913ca280a6b544bfa115633fa20249c3d396d9493", [:rebar3], [], "hexpm", "524c97b4991b3849dd5c17a631223896272c6b0af446778ba4675a1dff53bb7e"},
"cowboy": {:hex, :cowboy, "2.8.0", "f3dc62e35797ecd9ac1b50db74611193c29815401e53bac9a5c0577bd7bc667d", [:rebar3], [{:cowlib, "~> 2.9.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "4643e4fba74ac96d4d152c75803de6fad0b3fa5df354c71afdd6cbeeb15fac8a"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.3.1", "ebd1a1d7aff97f27c66654e78ece187abdc646992714164380d8a041eda16754", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3a6efd3366130eab84ca372cbd4a7d3c3a97bdfcfb4911233b035d117063f0af"},
"cowlib": {:hex, :cowlib, "2.9.1", "61a6c7c50cf07fdd24b2f45b89500bb93b6686579b069a89f88cb211e1125c78", [:rebar3], [], "hexpm", "e4175dc240a70d996156160891e1c62238ede1729e45740bdd38064dad476170"},
"credo": {:hex, :credo, "1.5.5", "e8f422026f553bc3bebb81c8e8bf1932f498ca03339856c7fec63d3faac8424b", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dd8623ab7091956a855dc9f3062486add9c52d310dfd62748779c4315d8247de"},
"dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"exsync": {:hex, :exsync, "0.2.4", "5cdc824553e0f4c4bf60018a9a6bbd5d3b51f93ef8401a0d8545f93127281d03", [:mix], [{:file_system, "~> 0.2", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f7622d8bb98abbe473aa066ae46f91afdf7a5346b8b89728404f7189d2e80896"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"hackney": {:hex, :hackney, "1.17.4", "99da4674592504d3fb0cfef0db84c3ba02b4508bae2dff8c0108baa0d6e0977c", [:rebar3], [{:certifi, "~>2.6.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "de16ff4996556c8548d512f4dbe22dd58a587bf3332e7fd362430a7ef3986b16"},
"httpoison": {:hex, :httpoison, "1.8.0", "6b85dea15820b7804ef607ff78406ab449dd78bed923a49c7160e1886e987a3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "28089eaa98cf90c66265b6b5ad87c59a3729bea2e74e9d08f9b51eb9729b3c3a"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mime": {:hex, :mime, "1.6.0", "dabde576a497cef4bbdd60aceee8160e02a6c89250d6c0b29e56c0dfb00db3d2", [:mix], [], "hexpm", "31a1a8613f8321143dde1dafc36006a17d28d02bdfecb9e95a880fa7aabd19a7"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mint": {:hex, :mint, "1.2.1", "369cc8fecc54afd170e11740aa7efd066709e5ef3b5a2c63f0a47d1542cbd56a", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "053fe2f48c965f31878a16272478d9299fa412bc4df86dee2678986f2e40e018"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"plug": {:hex, :plug, "1.11.1", "f2992bac66fdae679453c9e86134a4201f6f43a687d8ff1cd1b2862d53c80259", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "23524e4fefbb587c11f0833b3910bfb414bf2e2534d61928e920f54e3a1b881f"},
"plug_cowboy": {:hex, :plug_cowboy, "2.4.1", "779ba386c0915027f22e14a48919a9545714f849505fa15af2631a0d298abf0f", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d72113b6dff7b37a7d9b2a5b68892808e3a9a752f2bf7e503240945385b70507"},
"plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"},
"plug_mint_proxy": {:git, "https://github.com/madnificent/plug-mint-proxy.git", "cb52954d260117a0b0e65baa8d3f313561bc2cf7", [branch: "feature/separate-example-runner"]},
"poison": {:hex, :poison, "2.2.0", "4763b69a8a77bd77d26f477d196428b741261a761257ff1cf92753a0d4d24a63", [:mix], [], "hexpm", "519bc209e4433961284174c497c8524c001e285b79bdf80212b47a1f898084cc"},
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
"uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"},
}

0 comments on commit a23d7fc

Please sign in to comment.