diff --git a/README.md b/README.md index 22fdad0..8016273 100644 --- a/README.md +++ b/README.md @@ -374,6 +374,8 @@ Flags which can be either on or off translate the environment variable string to - `INSPECT_OUTGOING_SPARQL_QUERY_RESPONSES` : Inspects the responses coming back from the backing triplestore - `LOG_OUTGOING_SPARQL_QUERY_ROUNDTRIP` : Logs both the request and the response to/from the backing triplestore closely together in the logs - `LOG_WORKLOAD_INFO_REQUESTS` : Logs workload information to the console when it is requested through an http call +- `QUAD_CHANGE_CACHE_TIMEOUT` : Specifies how long (in milliseconds) quad changes are cached before they are applied to the triplestore. Defaults to 500. +- `QUAD_CHANGE_CACHE_SESSION` : When on, every new quad change restarts the timeout, so cached changes are applied only once no new change has arrived for the full timeout. Defaults to on. ### Query timeout configuration Complex SPARQL queries can take a long time to process and execute. The time mu-authorization is allowed to spend on this processing and execution before timing out can be configured through the following environment variables: diff --git a/config/config.exs b/config/config.exs index 77dd232..49b7f2e 100644 --- a/config/config.exs +++ b/config/config.exs @@ -76,7 +76,9 @@ config :"mu-authorization", log_workload_info_requests: CH.system_boolean("LOG_WORKLOAD_INFO_REQUESTS"), testing_auth_query_error_rate: CH.system_float("TESTING_AUTH_QUERY_ERROR_RATE"), error_on_unwritten_data: CH.system_boolean("ERROR_ON_UNWRITTEN_DATA"), - errors: CH.system_boolean("LOG_ERRORS", true) + errors: CH.system_boolean("LOG_ERRORS", true), + quad_change_cache_timeout: CH.system_number("QUAD_CHANGE_CACHE_TIMEOUT", 500), + quad_change_cache_session: CH.system_boolean("QUAD_CHANGE_CACHE_SESSION", true) # and access this configuration in your application as: # diff --git a/lib/cache/cache_types.ex b/lib/cache/cache_types.ex new file mode 100644 index 0000000..02c4e9f --- /dev/null +++ b/lib/cache/cache_types.ex @@ -0,0 +1,244 @@ +defmodule CacheType do + alias 
Updates.QueryAnalyzer + alias Updates.QueryAnalyzer.P, as: QueryAnalyzerProtocol + alias Updates.QueryAnalyzer.Types.Quad, as: Quad + + require Logger + require ALog + + defmodule ConstructAndAsk do + @enforce_keys [:triple_counts, :triples_in_store] + defstruct [:triple_counts, :triples_in_store] + end + + defmodule OnlyAsk do + defstruct [] + end + + defmodule MultipleConstructs do + @enforce_keys [:non_overlapping_quads] + defstruct [:non_overlapping_quads] + end + + defmodule Select do + @enforce_keys [:quads_in_store] + defstruct [:quads_in_store] + end + + defp quad_in_store_with_ask?(quad) do + (QueryAnalyzer.construct_ask_query(quad) + |> SparqlClient.execute_parsed(query_type: :read))["boolean"] + end + + ## Create tuple from literal {type, value} + defp get_result_tuple(x) do + out = QueryAnalyzerProtocol.to_sparql_result_value(x) + {out.type, out.value} + end + + defp tuple_from_bindings(%{"o" => object, "s" => subject, "p" => predicate, "g" => graph}) do + { + {graph["type"], graph["value"]}, + {subject["type"], subject["value"]}, + {predicate["type"], predicate["value"]}, + {object["type"], object["value"]} + } + end + + defp tuple_from_bindings(%{"o" => object, "s" => subject, "p" => predicate}) do + { + {subject["type"], subject["value"]}, + {predicate["type"], predicate["value"]}, + {object["type"], object["value"]} + } + end + + defp query_to_results(query) do + query + |> SparqlClient.execute_parsed(query_type: :read) + |> Map.get("results") + |> Map.get("bindings") + |> Enum.map(&tuple_from_bindings/1) + |> MapSet.new() + end + + # From current quads, analyse what quads are already present + defp triples_in_store_with_construct(quads) do + quads + |> QueryAnalyzer.construct_asks_query() + |> query_to_results() + end + + def quads_in_store_with_select(quads) do + quads + |> QueryAnalyzer.construct_select_distinct_matching_quads() + |> query_to_results() + end + + # From current quads, calculate frequency of _triple_ + # Equal quads have no 
influence, but same triples from different graphs + # cannot be queried with the same CONSTRUCT query + # (because CONSTRUCT only returns triples) + defp triple_counts_with_graph_differences(quads) do + quads + |> Enum.uniq() + |> Enum.map(fn %Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: _graph + } -> + {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} + end) + |> Enum.frequencies() + end + + defp quad_equal_without_graph( + %Quad{ + subject: s1, + predicate: p1, + object: o1 + }, + %Quad{ + subject: s2, + predicate: p2, + object: o2 + } + ) do + s1 == s2 and p1 == p2 and o1 == o2 + end + + defp split_into_nonoverlapping(cum, []) do + cum + end + + defp split_into_nonoverlapping(cum, xs) do + # if el can merge into acc, return {[], acc ++ el} + # else {[el], acc} + el_can_merge = fn el, acc -> + not Enum.any?(el, fn x -> Enum.any?(acc, &quad_equal_without_graph(x, &1)) end) + end + + {xs, cum} = + Enum.flat_map_reduce(xs, cum, fn el, acc -> + # TODO check syntax! 
+ (el_can_merge.(el, acc) && {[], acc ++ el}) || {[el], acc} + end) + + [cum | split_into_nonoverlapping([], xs)] + end + + defp merge_quads_in_non_overlapping_quads(quads) do + # Filter per graph + # Merge separate graphs + # return quads + per_graph = + Enum.group_by(quads, fn x -> x.graph end) + |> Map.values() + + split_into_nonoverlapping([], per_graph) + end + + defp quad_list_to_constructed_graphs(quads) do + graphs = + Enum.map(quads, fn x -> get_result_tuple(x.graph) end) + |> MapSet.new() + + triples_in_store = triples_in_store_with_construct(quads) + {graphs, triples_in_store} + end + + # How the :construct cache type tests if a quad is in the store: + # If the calculated frequency is one, the existence of the triple in the CONSTRUCT query + # uniquely represents the existence of the quad in the triplestore + # If the calculated frequency is more, the triple might exist in more graphs + # so the CONSTRUCT query does not uniquely represent the quad in the triplestore + # so an ASK query is executed (this shouldn't happen too often) + @spec create_cache_type(:construct | :multiple_constructs | :only_asks | :select, any) :: %{ + :__struct__ => + CacheType.ConstructAndAsk + | CacheType.MultipleConstructs + | CacheType.OnlyAsk + | CacheType.Select, + optional(:non_overlapping_quads) => list, + optional(:quads_in_store) => MapSet.t(any), + optional(:triple_counts) => map, + optional(:triples_in_store) => MapSet.t(any) + } + def create_cache_type(:construct, quads) do + triple_counts = triple_counts_with_graph_differences(quads) + triples_in_store = triples_in_store_with_construct(quads) + + %ConstructAndAsk{triple_counts: triple_counts, triples_in_store: triples_in_store} + end + + def create_cache_type(:select, quads) do + quads_in_store = quads_in_store_with_select(quads) + + %Select{quads_in_store: quads_in_store} + end + + def create_cache_type(:multiple_constructs, quads) do + non_overlapping_quads = + merge_quads_in_non_overlapping_quads(quads) + |> 
Enum.map(&quad_list_to_constructed_graphs/1) + + %MultipleConstructs{non_overlapping_quads: non_overlapping_quads} + end + + def create_cache_type(:only_asks, _quads) do + %OnlyAsk{} + end + + def quad_in_store?(%MultipleConstructs{non_overlapping_quads: non_overlapping_quads}, %Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: graph + }) do + g = get_result_tuple(graph) + value = {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} + + {_, quads_in_this_store} = Enum.find(non_overlapping_quads, fn {gs, _} -> g in gs end) + + value in quads_in_this_store + end + + def quad_in_store?(%Select{quads_in_store: quads_in_store}, %Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: graph + }) do + value = + {get_result_tuple(graph), get_result_tuple(subject), get_result_tuple(predicate), + get_result_tuple(object)} + + value in quads_in_store + end + + def quad_in_store?(%OnlyAsk{}, quad) do + quad_in_store_with_ask?(quad) + end + + def quad_in_store?( + %ConstructAndAsk{ + triple_counts: triple_counts, + triples_in_store: triples_in_store + }, + %Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: _graph + } = quad + ) do + value = {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} + + if Map.get(triple_counts, value, 0) > 1 do + quad_in_store_with_ask?(quad) + else + value in triples_in_store + end + end +end diff --git a/lib/cache/deltas.ex b/lib/cache/deltas.ex new file mode 100644 index 0000000..c187022 --- /dev/null +++ b/lib/cache/deltas.ex @@ -0,0 +1,308 @@ +defmodule Cache.Deltas do + alias SparqlServer.Router.AccessGroupSupport, as: AccessGroupSupport + alias Updates.QueryAnalyzer + alias Updates.QueryAnalyzer.P, as: QueryAnalyzerProtocol + alias Updates.QueryAnalyzer.Types.Quad, as: Quad + + require Logger + require ALog + use GenServer + + @type cache_logic_key :: :precache | :construct | :ask + + # {effective inserts, 
effective deletions, all inserts, all deletions} + defp new_cache, do: {%{}, %{}, %{}, %{}} + + ### GenServer API + @doc """ + GenServer.init/1 callback + """ + def init(state) do + state = state || %{metas: [], cache: new_cache(), index: :os.system_time(:millisecond)} + {:ok, state} + end + + def start_link(_) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + @doc """ + Flush the current state, actually applying the delta's to the triplestore. + """ + def flush(options) do + GenServer.call(__MODULE__, {:flush, options}) + end + + # @spec add_deltas(QueryAnalyzer.quad_changes(), cache_logic_key()) :: :ok + def add_deltas(quad_changes, options, logic, delta_meta \\ []) do + # TODO + # :precache -> + # GenServer.cast(__MODULE__, {:cache_w_cache, quad_changes}) + + GenServer.cast( + __MODULE__, + {:cache, logic, quad_changes, Map.new(delta_meta), options} + ) + + # TODO + # :ask -> + # GenServer.cast(__MODULE__, {:cache_w_ask, quad_changes}) + end + + # Reduce :insert and :delete delta's into true and all delta's + # All delta's have a list of indices. Only one insert can be an actual insert, + # but multiple delta's can insert the same quad + # + # An insert is a true delta if the quad is not yet present in the triplestore + # If an insert would insert a triple that is marked as true deletion, + # this deletion and insertion are false. 
+ defp add_delta_to_state({:insert, quad}, state) do + cache_type = List.first(state.metas).cache_type + {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache + + new_cache = + if CacheType.quad_in_store?(cache_type, quad) do + if Map.has_key?(true_deletions, quad) do + # Element in store, but would be deleted + # Remove true_deletion + all_inserts = Map.update(all_inserts, quad, [state.index], &[state.index | &1]) + true_deletions = Map.delete(true_deletions, quad) + + {true_inserts, true_deletions, all_inserts, all_deletions} + else + all_inserts = Map.update(all_inserts, quad, [state.index], &[state.index | &1]) + {true_inserts, true_deletions, all_inserts, all_deletions} + end + else + # This is important, the quad might be inserted and deleted with a previous delta + # But that index is more correct than the new index + index = Enum.min(Map.get(all_inserts, quad, [state.index])) + true_inserts = Map.put_new(true_inserts, quad, index) + all_inserts = Map.update(all_inserts, quad, [state.index], &[state.index | &1]) + + {true_inserts, true_deletions, all_inserts, all_deletions} + end + + %{state | cache: new_cache} + end + + # A deletion is a true deletion if the quad is present in the triplestore + # If a true insertion would insert this quad, the insert is actually a false insert + defp add_delta_to_state({:delete, quad}, state) do + cache_type = List.first(state.metas).cache_type + {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache + + new_cache = + if CacheType.quad_in_store?(cache_type, quad) do + index = Enum.min(Map.get(all_deletions, quad, [state.index])) + true_deletions = Map.put_new(true_deletions, quad, index) + all_deletions = Map.update(all_deletions, quad, [state.index], &[state.index | &1]) + + {true_inserts, true_deletions, all_inserts, all_deletions} + else + if Map.has_key?(true_inserts, quad) do + # Element not in store, but would be inserted and deleted + # So both false insert and false deletion + 
true_inserts = Map.delete(true_inserts, quad) + all_deletions = Map.update(all_deletions, quad, [state.index], &[state.index | &1]) + + {true_inserts, true_deletions, all_inserts, all_deletions} + else + all_deletions = Map.update(all_deletions, quad, [state.index], &[state.index | &1]) + + {true_inserts, true_deletions, all_inserts, all_deletions} + end + end + + %{state | cache: new_cache} + end + + defp convert_quad(%Quad{graph: graph, subject: subject, predicate: predicate, object: object}) do + [g, s, p, o] = + Enum.map( + [graph, subject, predicate, object], + &QueryAnalyzerProtocol.to_sparql_result_value/1 + ) + + %{"graph" => g, "subject" => s, "predicate" => p, "object" => o} + end + + defp delta_update(state) do + {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache + + merge_f = fn _, one, two -> one ++ two end + + # Merge on index + inserts = + Enum.group_by(true_inserts, &elem(&1, 1), &{:effective_insert, convert_quad(elem(&1, 0))}) + + deletions = + Enum.group_by(true_deletions, &elem(&1, 1), &{:effective_delete, convert_quad(elem(&1, 0))}) + + all_inserts = + Enum.flat_map(all_inserts, fn {quad, ids} -> Enum.map(ids, &{quad, &1}) end) + |> Enum.group_by(&elem(&1, 1), &{:insert, convert_quad(elem(&1, 0))}) + + all_deletions = + Enum.flat_map(all_deletions, fn {quad, ids} -> Enum.map(ids, &{quad, &1}) end) + |> Enum.group_by(&elem(&1, 1), &{:delete, convert_quad(elem(&1, 0))}) + + # Combine all things + total = + Map.merge(inserts, deletions, merge_f) + |> Map.merge(all_inserts, merge_f) + |> Map.merge(all_deletions, merge_f) + + messages = + Enum.map(state.metas, fn meta -> + index = meta.index + + other_meta = + Map.new() + |> add_index(index) + |> add_allowed_groups(meta.delta_meta) + |> add_origin(meta.delta_meta) + |> add_trail(meta.delta_meta) + + Map.get(total, index, []) + |> Enum.group_by(&elem(&1, 0), &elem(&1, 1)) + |> Enum.reduce(other_meta, &add_delta/2) + end) + + %{ + "changeSets" => messages + } + |> Poison.encode!() 
+ |> Delta.Messenger.inform_clients() + end + + # These might be better suited in a separate file + defp add_delta({:effective_insert, items}, map) do + Map.put(map, "effectiveInserts", items) + end + + defp add_delta({:effective_delete, items}, map) do + Map.put(map, "effectiveDeletes", items) + end + + defp add_delta({:insert, items}, map) do + Map.put(map, "insert", items) + end + + defp add_delta({:delete, items}, map) do + Map.put(map, "delete", items) + end + + defp add_allowed_groups(map, %{authorization_groups: :sudo}) do + Map.put(map, "allowedGroups", "sudo") + end + + defp add_allowed_groups(map, %{authorization_groups: groups}) do + json_access_groups = AccessGroupSupport.encode_json_access_groups(groups) + Map.put(map, "allowedGroups", json_access_groups) + end + + defp add_allowed_groups(map, _), do: map + + defp add_trail(map, %{mu_call_id_trail: trail}), do: Map.put(map, "muCallIdTrail", trail) + defp add_trail(map, _), do: map + + defp add_origin(map, %{origin: origin}), do: Map.put(map, "origin", origin) + defp add_origin(map, _), do: map + + defp add_index(map, index), do: Map.put(map, "index", index) + + defp do_flush(state, options) do + {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache + + inserts = Map.keys(true_inserts) + + unless Enum.empty?(inserts) do + QueryAnalyzer.construct_insert_query_from_quads(inserts, options) + |> Regen.result() + + QueryAnalyzer.construct_insert_query_from_quads(inserts, options) + |> SparqlClient.execute_parsed(query_type: :write) + end + + deletions = Map.keys(true_deletions) + + unless Enum.empty?(deletions) do + QueryAnalyzer.construct_delete_query_from_quads(deletions, options) + |> Regen.result() + + QueryAnalyzer.construct_delete_query_from_quads(deletions, options) + |> SparqlClient.execute_parsed(query_type: :write) + end + + if not (Enum.empty?(all_inserts) and Enum.empty?(all_deletions)) do + delta_update(state) + end + + %{state | cache: new_cache(), metas: []} + end + + @doc """ 
+ GenServer.handle_call/3 callback + """ + def handle_call({:flush, options}, _from, state) do + new_state = do_flush(state, options) + + {:reply, :ok, new_state} + end + + # delta_meta: mu_call_id_trail, authorization_groups, origin + def handle_cast({:cache, type, quads, delta_meta, options}, state) do + timeout_sessions = Application.get_env(:"mu-authorization", :quad_change_cache_session) + + current_timeout = + Map.get(state, :ref, nil) + |> IO.inspect(label: "current timeout") + + state = + if is_nil(current_timeout) or timeout_sessions do + if not is_nil(current_timeout) do + Process.cancel_timer(current_timeout) + end + + timeout_duration = + Application.get_env(:"mu-authorization", :quad_change_cache_timeout) + |> IO.inspect(label: "timeout duration") + + ref = Process.send_after(self(), {:timeout, options}, timeout_duration) + + Map.put(state, :ref, ref) + else + state + end + + deltas = Enum.flat_map(quads, fn {type, qs} -> Enum.map(qs, &{type, &1}) end) + quads = Enum.map(deltas, &elem(&1, 1)) + + # Calculate meta data + cache_type = CacheType.create_cache_type(type, quads) + + # Add metadata to state + meta = %{ + cache_type: cache_type, + delta_meta: delta_meta, + index: state.index + 1 + } + + state_with_meta = %{state | metas: [meta | state.metas], index: state.index + 1} + + # Reduce with add_delta_to_state + new_state = Enum.reduce(deltas, state_with_meta, &add_delta_to_state/2) + + {:noreply, new_state} + end + + def handle_info({:timeout, options}, state) do + IO.puts("Timeout timeout!") + new_state = do_flush(state, options) + # The timer has fired: drop its stale ref, otherwise handle_cast never schedules a new timer when sessions are off + {:noreply, Map.delete(new_state, :ref)} + end +end + +# You like kinda want an 'instant' struct but that changes more than `quad_in_store` diff --git a/lib/delta/delta.ex b/lib/delta/delta.ex deleted file mode 100644 index ef37f83..0000000 --- a/lib/delta/delta.ex +++ /dev/null @@ -1,53 +0,0 @@ -defmodule Delta do - alias Updates.QueryAnalyzer, as: QueryAnalyzer - - require Logger - require ALog - - @type delta :: 
QueryAnalyzer.quad_changes() - - @moduledoc """ - This service consumes altered triples and sends them to interested - clients. It runs in a separate thread and will always run *after* - the response has been supplied to the client. - """ - - @doc """ - Publishes the updated quads. The array is expected to contain - tuples of form {insert_type, quads} in which insert_type is one of - :insert or :delete. - """ - @spec publish_updates(QueryAnalyzer.quad_changes(), [any], Plug.Conn.t()) :: - QueryAnalyzer.quad_changes() - def publish_updates(delta, authorization_groups, conn) do - origin = - conn - |> Map.get(:remote_ip) - |> Tuple.to_list() - |> Enum.join(".") - - get_json_array_header = fn conn, header -> - case Plug.Conn.get_req_header(conn, header) do - [] -> - [] - - [value | _] -> - # ignore extra values for now, they should not happen, but - # if they do we don't want to crash either - Poison.decode!(value) - end - end - - mu_call_id_trail = - get_json_array_header.(conn, "mu-call-id-trail") - |> Kernel.++(Plug.Conn.get_req_header(conn, "mu-call-id")) - |> Poison.encode!() - - delta - |> Delta.Message.construct(authorization_groups, origin) - |> Logging.EnvLog.inspect(:log_delta_messages, label: "Constructed body for clients") - |> Delta.Messenger.inform_clients(mu_call_id_trail: mu_call_id_trail) - - delta - end -end diff --git a/lib/delta/message.ex b/lib/delta/message.ex deleted file mode 100644 index a453b61..0000000 --- a/lib/delta/message.ex +++ /dev/null @@ -1,74 +0,0 @@ -defmodule Delta.Message do - alias Updates.QueryAnalyzer.Types.Quad, as: Quad - alias SparqlServer.Router.AccessGroupSupport, as: AccessGroupSupport - - @moduledoc """ - Contains code to construct the correct messenges for informing - clients. - """ - - @typedoc """ - Type of the messages which can be sent to a client. Currently, this - is a binary string. - """ - @type t :: String.t() - - @doc """ - Constructs a new message which can be sent to the clients based on a - quad delta. 
- """ - @spec construct(Delta.delta(), AccessGroupSupport.decoded_json_access_groups(), String.t()) :: - Delta.Message.t() - def construct(delta, access_groups, origin) do - # TODO we should include the current access rigths and an - # identifier for the originating service. This would help - # services ignore content which came from their end and would - # allow services to perform updates in the name of a specific - # user. - - json_model = %{ - "changeSets" => - Enum.map(delta, fn delta_item -> - delta_item - |> convert_delta_item - |> add_allowed_groups(access_groups) - |> add_origin(origin) - end) - } - - Poison.encode!(json_model) - end - - defp convert_delta_item({:insert, quads}) do - %{"insert" => Enum.map(quads, &convert_quad/1)} - end - - defp convert_delta_item({:delete, quads}) do - %{"delete" => Enum.map(quads, &convert_quad/1)} - end - - @spec add_allowed_groups(Poison.Decoder.t(), AccessGroupSupport.decoded_json_access_groups()) :: - Poison.Decoder.t() - defp add_allowed_groups(map, :sudo) do - Map.put(map, "allowedGroups", "sudo") - end - - defp add_allowed_groups(map, access_groups) do - json_access_groups = AccessGroupSupport.encode_json_access_groups(access_groups) - Map.put(map, "allowedGroups", json_access_groups) - end - - defp add_origin(map, origin) do - Map.put(map, "origin", origin) - end - - defp convert_quad(%Quad{graph: graph, subject: subject, predicate: predicate, object: object}) do - [g, s, p, o] = - Enum.map( - [graph, subject, predicate, object], - &Updates.QueryAnalyzer.P.to_sparql_result_value/1 - ) - - %{"graph" => g, "subject" => s, "predicate" => p, "object" => o} - end -end diff --git a/lib/delta/messenger.ex b/lib/delta/messenger.ex index 1a36efc..9f17362 100644 --- a/lib/delta/messenger.ex +++ b/lib/delta/messenger.ex @@ -6,8 +6,11 @@ defmodule Delta.Messenger do @moduledoc """ Sends constructed messages to all interested clients. + + Type of the messages which can be sent to a client. 
Currently, this + is a binary string. """ - @spec inform_clients(Delta.Message.t(), call_options) :: :ok + @spec inform_clients(String.t(), call_options) :: :ok def inform_clients(message, options \\ []) do # TODO we should create one thread per callee and push messages on # there. As long as the client hasn't shown a sign of life, we @@ -18,7 +21,7 @@ defmodule Delta.Messenger do # connections. Delta.Config.targets() |> ALog.di("Targets to inform") - |> Enum.map(&spawn(Delta.Messenger, :send_message_to_client, [message, &1, options])) + |> Enum.each(&spawn(Delta.Messenger, :send_message_to_client, [message, &1, options])) :ok end diff --git a/lib/graph_reasoner/support/term_selectors.ex b/lib/graph_reasoner/support/term_selectors.ex index 3bc6283..cb860d6 100644 --- a/lib/graph_reasoner/support/term_selectors.ex +++ b/lib/graph_reasoner/support/term_selectors.ex @@ -1,5 +1,4 @@ -defmodule TermSelectors do - alias GraphReasoner.Support.TermSelectors +defmodule GraphReasoner.Support.TermSelectors do alias InterpreterTerms.SymbolMatch, as: Sym alias InterpreterTerms.WordMatch, as: Word diff --git a/lib/graph_reasoner/type_reasoner/type_reasoner.ex b/lib/graph_reasoner/type_reasoner/type_reasoner.ex index 539fcf8..638909f 100644 --- a/lib/graph_reasoner/type_reasoner/type_reasoner.ex +++ b/lib/graph_reasoner/type_reasoner/type_reasoner.ex @@ -1,6 +1,6 @@ -defmodule TypeReasoner do +defmodule GraphReasoner.TypeReasoner do + alias GraphReasoner.{ModelInfo, QueryInfo} alias Updates.QueryAnalyzer.Iri - alias GraphReasoner.{TypeReasoner, ModelInfo, QueryInfo} @moduledoc """ The TypeReasoner derives types for known entities and pushes the diff --git a/lib/sparql_server/router/handler_support.ex b/lib/sparql_server/router/handler_support.ex index fe9c9ab..03c12e6 100644 --- a/lib/sparql_server/router/handler_support.ex +++ b/lib/sparql_server/router/handler_support.ex @@ -110,36 +110,89 @@ defmodule SparqlServer.Router.HandlerSupport do |> wrap_query_in_toplevel |> 
ALog.di("Wrapped parsed query") - query_manipulator = - if is_select_query(parsed_form) do - &manipulate_select_query/2 - else - &manipulate_update_query/2 - end + # TODO: Check where the default_graph is used where these options are passed and verify whether this is a sensible name. + options = %{ + default_graph: Iri.from_iri_string("", %{}), + prefixes: %{ + "xsd" => Iri.from_iri_string(""), + "foaf" => Iri.from_iri_string("") + } + } - case query_manipulator.(parsed_form, conn) do - {conn, new_parsed_forms, post_processing} -> - query_type = - if Enum.any?(new_parsed_forms, fn q -> !is_select_query(q) end) do - :read - else - :write - end + if is_select_query(parsed_form) do + Cache.Deltas.flush(options) - encoded_response = - new_parsed_forms - |> ALog.di("New parsed forms") - |> Enum.map(&SparqlClient.execute_parsed(&1, request: conn, query_type: query_type)) - |> List.first() - |> Poison.encode!() + {conn, new_parsed_forms} = manipulate_select_query(parsed_form, conn) - post_processing.() + query_type = + if Enum.any?(new_parsed_forms, fn q -> !is_select_query(q) end) do + :read + else + :write + end - {conn, {200, encoded_response}, new_template_local_store} + encoded_response = + new_parsed_forms + |> Enum.map(&SparqlClient.execute_parsed(&1, request: conn, query_type: query_type)) + |> List.first() + |> Poison.encode!() - {:fail, reason} -> - encoded_response_string = Poison.encode!(%{errors: [%{status: "403", title: reason}]}) - {conn, {403, encoded_response_string}, new_template_local_store} + {conn, {200, encoded_response}, new_template_local_store} + else + {conn, authorization_groups} = AccessGroupSupport.calculate_access_groups(conn) + + origin = + conn + |> Map.get(:remote_ip) + |> Tuple.to_list() + |> Enum.join(".") + + mu_call_id_trail = + case Plug.Conn.get_req_header(conn, "mu-call-id-trail") do + # ignore extra values for now, they should not happen, but + # if they do we don't want to crash either + [value | _] -> Poison.decode!(value) + _ -> 
[] + end + |> Kernel.++(Plug.Conn.get_req_header(conn, "mu-call-id")) + |> Poison.encode!() + + analyzed_quads = + parsed_form + |> ALog.di("Parsed query") + |> QueryAnalyzer.quad_changes(%{ + default_graph: Iri.from_iri_string("", %{}), + authorization_groups: authorization_groups + }) + |> Enum.reject(&match?({_, []}, &1)) + |> ALog.di("Non-empty operations") + |> enrich_manipulations_with_access_rights(authorization_groups) + |> maybe_verify_all_triples_written() + + cache_type = Plug.Conn.get_req_header(conn, "cache_type") + |> List.first() || "select" + + case analyzed_quads do + {:fail, reason} -> + encoded_response_string = Poison.encode!(%{errors: [%{status: "403", title: reason}]}) + {conn, {403, encoded_response_string}, new_template_local_store} + + _ -> + analyzed_quads + |> Enum.map(fn {manipulation, _requested_quads, effective_quads} -> + {manipulation, effective_quads} + end) + |> Cache.Deltas.add_deltas( + options, + Map.get(%{"select" => :select, "construct" => :construct, "multiple_constructs" => :multiple_constructs, "only_asks" => :only_asks}, cache_type, :select), + origin: origin, + mu_call_id_trail: mu_call_id_trail, + authorization_groups: authorization_groups + ) + + succesful = Poison.encode!(%{sucessful: true}) + {conn, {200, succesful}, new_template_local_store} + end end end @@ -187,7 +240,7 @@ defmodule SparqlServer.Router.HandlerSupport do {conn, query} end - {conn, [query], fn -> :ok end} + {conn, [query]} end @doc """ @@ -209,70 +262,6 @@ defmodule SparqlServer.Router.HandlerSupport do end end - ### Manipulates the update query yielding back the valid set of - ### queries which should be executed on the database. - defp manipulate_update_query(query, conn) do - Logger.debug("This is an update query") - - {conn, authorization_groups} = AccessGroupSupport.calculate_access_groups(conn) - - # TODO: DRY into/from QueryAnalyzer.insert_quads - - # TODO: Check where the default_graph is used where these options are passed and verify whether this is a sensible name. 
- options = %{ - default_graph: Iri.from_iri_string("", %{}), - prefixes: %{ - "xsd" => Iri.from_iri_string(""), - "foaf" => Iri.from_iri_string("") - } - } - - analyzed_quads = - query - |> ALog.di("Parsed query") - |> QueryAnalyzer.quad_changes(%{ - default_graph: Iri.from_iri_string("", %{}), - authorization_groups: authorization_groups - }) - |> Enum.reject(&match?({_, []}, &1)) - |> ALog.di("Non-empty operations") - |> enrich_manipulations_with_access_rights(authorization_groups) - |> maybe_verify_all_triples_written() - - case analyzed_quads do - {:fail, reason} -> - {:fail, reason} - - _ -> - processed_manipulations = - analyzed_quads - |> Enum.map(fn {manipulation, _requested_quads, effective_quads} -> - {manipulation, effective_quads} - end) - - executable_queries = - processed_manipulations - |> join_quad_updates - |> Enum.map(fn {statement, processed_quads} -> - case statement do - :insert -> - QueryAnalyzer.construct_insert_query_from_quads(processed_quads, options) - - :delete -> - QueryAnalyzer.construct_delete_query_from_quads(processed_quads, options) - end - end) - - delta_updater = fn -> - Delta.publish_updates(processed_manipulations, authorization_groups, conn) - end - - # TODO: should we set the access groups on update queries too? - # see AccessGroupSupport.put_access_groups/2 ( conn, authorization_groups ) - {conn, executable_queries, delta_updater} - end - end - defp enrich_manipulations_with_access_rights(manipulations, authorization_groups) do manipulations |> Enum.map(fn {kind, quads} -> @@ -298,11 +287,11 @@ defmodule SparqlServer.Router.HandlerSupport do |> Enum.map(&{&1.subject, &1.predicate, &1.object}) |> MapSet.new() - all_triples_written? = Set.equal?(requested_triples, effective_triples) + all_triples_written? = MapSet.equal?(requested_triples, effective_triples) unless all_triples_written? 
do Logging.EnvLog.inspect( - Set.difference(requested_triples, effective_triples), + MapSet.difference(requested_triples, effective_triples), :error, label: "These triples would not be written to the triplestore" ) @@ -321,46 +310,6 @@ defmodule SparqlServer.Router.HandlerSupport do end end - @spec join_quad_updates(QueryAnalyzer.quad_changes()) :: - QueryAnalyzer.quad_changes() - defp join_quad_updates(elts) do - elts - |> Enum.map(fn {op, quads} -> {op, MapSet.new(quads)} end) - |> join_quad_map_updates([]) - |> Enum.map(fn {op, quads} -> {op, MapSet.to_list(quads)} end) - |> Enum.reject(&match?({_, []}, &1)) - end - - @type map_quad :: {QueryAnalyzer.quad_change_key(), MapSet.t(Quad.t())} - - @spec join_quad_map_updates([map_quad], [map_quad]) :: [map_quad] - defp join_quad_map_updates([], res), do: res - defp join_quad_map_updates([elt | rest], []), do: join_quad_map_updates(rest, [elt]) - - defp join_quad_map_updates([{type, quads} | rest], [{type, other_quads}]), - do: join_quad_map_updates(rest, [{type, MapSet.union(quads, other_quads)}]) - - defp join_quad_map_updates([quads | rest], [other_quads]) do - new_other_quads = fold_quad_map_updates(other_quads, quads) - join_quad_map_updates(rest, [new_other_quads, quads]) - end - - defp join_quad_map_updates([quad_updates | rest], [left, right]) do - new_left = fold_quad_map_updates(left, quad_updates) - new_right = fold_quad_map_updates(right, quad_updates) - - join_quad_map_updates(rest, [new_left, new_right]) - end - - @spec fold_quad_map_updates(map_quad, map_quad) :: map_quad - defp fold_quad_map_updates({key, left_quads}, {key, right_quads}), - # :inserts, :inserts or :deletes, :deletes - do: {key, MapSet.union(left_quads, right_quads)} - - defp fold_quad_map_updates({left_type, left_quads}, {_right_type, right_quads}), - # :inserts, :deletes or :deletes, :inserts - do: {left_type, MapSet.difference(left_quads, right_quads)} - @spec enforce_write_rights([Quad.t()], Acl.UserGroups.Config.t()) :: [Quad.t()] 
defp enforce_write_rights(quads, authorization_groups) do Logger.info("Enforcing write rights") diff --git a/lib/sparql_server/sparql_server.ex b/lib/sparql_server/sparql_server.ex index f3865fc..ba4cd11 100644 --- a/lib/sparql_server/sparql_server.ex +++ b/lib/sparql_server/sparql_server.ex @@ -56,15 +56,16 @@ defmodule SparqlServer do label: "server setup, log database recovery mode tick" ) - Logging.EnvLog.inspect( Application.get_env(:"mu-authorization", :testing_auth_query_error_rate), :testing_auth_query_error_rate, - label: "server setup, testing auth query error rate amount of queries authorization makes fail" + label: + "server setup, testing auth query error rate amount of queries authorization makes fail" ) children = [ {Cache.Types, %{}}, + {Cache.Deltas, nil}, {Support.Id, nil}, {SparqlClient.InfoEndpoint, nil}, {SparqlClient.WorkloadInfo, nil}, @@ -72,8 +73,7 @@ defmodule SparqlServer do {Interpreter.CachedInterpreter, nil}, {Interpreter.Diff.Store.Storage, nil}, {Interpreter.Diff.Store.Manipulator, nil}, - {Plug.Adapters.Cowboy2, - scheme: :http, plug: SparqlServer.Router, options: [port: port]}, + {Plug.Cowboy, scheme: :http, plug: SparqlServer.Router, options: [port: port]}, :poolboy.child_spec(:worker, [ {:name, {:local, :query_worker}}, {:worker_module, SparqlServer.Router.Handler.Worker}, diff --git a/lib/updates/query_analyzer.ex b/lib/updates/query_analyzer.ex index 1033d61..9ebbc0a 100644 --- a/lib/updates/query_analyzer.ex +++ b/lib/updates/query_analyzer.ex @@ -846,6 +846,7 @@ defmodule Updates.QueryAnalyzer do |> Enum.uniq() end + @spec update_options_for_with(Sym.t(), options) :: options def update_options_for_with(%Sym{symbol: :iri} = sym, options) do # TODO double_check the use of :default_graph. 
The may be used @@ -856,6 +857,24 @@ defmodule Updates.QueryAnalyzer do |> Map.put(:default_graph, iri) end + def construct_select_distinct_matching_quads(quads) do + graph = QueryConstructors.make_var_symbol("?g") + subject = QueryConstructors.make_var_symbol("?s") + predicate = QueryConstructors.make_var_symbol("?p") + object = QueryConstructors.make_var_symbol("?o") + where_clause = QueryConstructors.make_quad_match_values(graph, subject, predicate, object, quads) + + QueryConstructors.make_select_distinct_query([graph, subject, predicate, object], where_clause) + end + + def construct_asks_query(quads) do + QueryConstructors.make_asks_query(quads) + end + + def construct_ask_query(quad) do + QueryConstructors.make_ask_query(quad) + end + @spec construct_select_query([Var.t()], Sym.t(), options) :: {Parser.query(), Acl.allowed_groups()} def construct_select_query(variables, group_graph_pattern_sym, options) do @@ -895,6 +914,7 @@ defmodule Updates.QueryAnalyzer do |> Enum.map(&QueryConstructors.make_quad_match_from_quad/1) |> QueryConstructors.make_insert_query() + # |> TODO add prefixes end @@ -902,7 +922,6 @@ defmodule Updates.QueryAnalyzer do def construct_delete_query_from_quads(quads, options) do # TODO: this should be clearing when the query is executed clear_cache_for_typed_quads(quads, options) - quads |> Enum.map(&QueryConstructors.make_quad_match_from_quad/1) |> QueryConstructors.make_delete_query() diff --git a/lib/updates/query_constructors.ex b/lib/updates/query_constructors.ex index 2dfe875..c7de4f6 100644 --- a/lib/updates/query_constructors.ex +++ b/lib/updates/query_constructors.ex @@ -4,6 +4,254 @@ defmodule Updates.QueryConstructors do alias InterpreterTerms.WordMatch, as: Word alias Updates.QueryAnalyzer.Types.Quad, as: Quad + defp leaf_to_data_block_value(leaf) do + %Sym{ + symbol: :DataBlockValue, + submatches: [ + QueryAnalyzerProtocol.to_solution_sym(leaf) + ] + } + end + + defp map_quad_to_bracet_data_block_values(%Quad{ + subject: 
subject, + predicate: predicate, + object: object, + graph: graph + }) do + [ + %Word{word: "("}, + leaf_to_data_block_value(graph), + leaf_to_data_block_value(subject), + leaf_to_data_block_value(predicate), + leaf_to_data_block_value(object), + %Word{word: ")"} + ] + end + + @doc """ + Creates a valid CONSTRUCT query, determining the existence of ~~life~~ quads + Can be used as multiple ASK queries with only a little bit of suffering + """ + def make_asks_query(quads) do + graph = make_var_symbol("?g") + subject = make_var_symbol("?s") + predicate = make_var_symbol("?p") + object = make_var_symbol("?o") + + where_clause = make_quad_match_values(graph, subject, predicate, object, quads) + + %Sym{ + symbol: :Sparql, + submatches: [ + %Sym{ + symbol: :QueryUnit, + submatches: [ + %Sym{ + symbol: :Query, + submatches: [ + %Sym{ + symbol: :Prologue, + submatches: [] + }, + %Sym{ + symbol: :ConstructQuery, + submatches: [ + %Word{word: "CONSTRUCT"}, + %Sym{ + symbol: :ConstructTemplate, + submatches: [ + %Word{word: "{"}, + %Sym{ + symbol: :ConstructTriples, + submatches: [ + make_simple_triples_same_subject( + subject, + predicate, + object + ) + ] + }, + %Word{word: "}"} + ] + }, + %Sym{ + symbol: :WhereClause, + submatches: [ + %Word{word: "WHERE"}, + where_clause + ] + }, + %Sym{ + symbol: :SolutionModifier, + submatches: [] + } + ] + }, + %Sym{ + symbol: :ValuesClause, + submatches: [] + } + ] + } + ] + } + ] + } + end + + def make_quad_match_values(graph, subject, predicate, object, quads) do + matches = quads |> Enum.flat_map(&map_quad_to_bracet_data_block_values/1) + + %Sym{ + symbol: :GroupGraphPattern, + submatches: [ + %Word{word: "{"}, + %Sym{ + symbol: :GroupGraphPatternSub, + submatches: [ + %Sym{ + symbol: :GraphPatternNotTriples, + submatches: [ + %Sym{ + symbol: :InlineData, + submatches: [ + %Word{word: "VALUES"}, + %Sym{ + symbol: :DataBlock, + submatches: [ + %Sym{ + symbol: :InlineDataFull, + submatches: + [ + %Word{word: "("}, + graph, + subject, 
+ predicate, + object, + %Word{word: ")"}, + %Word{word: "{"} + ] ++ + matches ++ + [ + %Word{word: "}"} + ] + } + ] + } + ] + } + ] + }, + %Sym{ + symbol: :GraphPatternNotTriples, + submatches: [ + %Sym{ + symbol: :GraphGraphPattern, + submatches: [ + %Word{word: "GRAPH"}, + %Sym{ + symbol: :VarOrIri, + submatches: [ + graph + ] + }, + %Sym{ + symbol: :GroupGraphPattern, + submatches: [ + %Word{word: "{"}, + %Sym{ + symbol: :GroupGraphPatternSub, + submatches: [ + %Sym{ + symbol: :TriplesBlock, + submatches: [ + make_simple_triples_same_subject_path( + subject, + predicate, + object + ), + %Word{word: "."} + ] + } + ] + }, + %Word{word: "}"} + ] + } + ] + } + ] + } + ] + }, + %Word{word: "}"} + ] + } + end + + @doc """ + Creates a valid ASK query, for a single quad + """ + def make_ask_query(quad) do + quad_pattern = %Sym{ + symbol: :QuadData, + submatches: [ + %Word{external: %{}, whitespace: "", word: "{"}, + %Sym{ + symbol: :Quads, + submatches: [ + make_quad_match_from_quad(quad) + ] + }, + %Word{external: %{}, whitespace: "", word: "}"} + ] + } + + group_graph_pattern = Manipulator.Transform.quad_data_to_group_graph_pattern(quad_pattern) + + %Sym{ + symbol: :Sparql, + submatches: [ + %Sym{ + symbol: :QueryUnit, + submatches: [ + %Sym{ + symbol: :Query, + submatches: [ + %Sym{ + symbol: :Prologue, + submatches: [] + }, + %Sym{ + symbol: :AskQuery, + submatches: [ + %Word{word: "ASK"}, + %Sym{ + symbol: :WhereClause, + submatches: [ + %Word{word: "WHERE"}, + group_graph_pattern + ] + }, + %Sym{ + symbol: :SolutionModifier, + submatches: [] + } + ] + }, + %Sym{ + symbol: :ValuesClause, + submatches: [] + } + ] + } + ] + } + ] + } + end + def make_select_query(variable_syms, group_graph_pattern_sym) do %Sym{ symbol: :Sparql, @@ -329,7 +577,7 @@ defmodule Updates.QueryConstructors do make_select_distinct_query(variables, group_graph_pattern_sym) end - defp make_var_symbol(str) do + def make_var_symbol(str) do %Sym{ symbol: :Var, submatches: [%Sym{symbol: :VAR1, 
string: str, submatches: :none}] @@ -376,4 +624,52 @@ defmodule Updates.QueryConstructors do ] } end + + defp make_simple_triples_same_subject( + subject_var_or_term, + predicate_var_or_term, + object_var_or_term + ) do + %Sym{ + symbol: :TriplesSameSubject, + submatches: [ + %Sym{symbol: :VarOrTerm, submatches: [subject_var_or_term]}, + %Sym{ + symbol: :PropertyListNotEmpty, + submatches: [ + %Sym{ + symbol: :Verb, + submatches: [ + %Sym{ + symbol: :VarOrIri, + submatches: [ + predicate_var_or_term + ] + } + ] + }, + %Sym{ + symbol: :ObjectList, + submatches: [ + %Sym{ + symbol: :Object, + submatches: [ + %Sym{ + symbol: :GraphNode, + submatches: [ + %Sym{ + symbol: :VarOrTerm, + submatches: [object_var_or_term] + } + ] + } + ] + } + ] + } + ] + } + ] + } + end end