From 3708cbc269bdc2e5b27814da0f33131f150d4f6d Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Mon, 9 Aug 2021 13:03:59 +0200 Subject: [PATCH 01/19] remove linter warnings --- config/config.exs | 10 +++++----- lib/graph_reasoner/support/term_selectors.ex | 3 +-- lib/graph_reasoner/type_reasoner/type_reasoner.ex | 4 ++-- lib/sparql_server/sparql_server.ex | 2 +- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/config/config.exs b/config/config.exs index 77dd232..5dbe510 100644 --- a/config/config.exs +++ b/config/config.exs @@ -87,14 +87,14 @@ config :"mu-authorization", # config :logger, level: :info # -# config :logger, -# compile_time_purge_level: :debug, -# level: :info - config :logger, - compile_time_purge_level: :debug, + compile_time_purge_matching: :debug, level: :warn +# config :logger, +# compile_time_purge_level: :debug, +# level: :warn + if Mix.env() == :test do config :junit_formatter, report_dir: "/tmp/repo-example-test-results/exunit" diff --git a/lib/graph_reasoner/support/term_selectors.ex b/lib/graph_reasoner/support/term_selectors.ex index 3bc6283..cb860d6 100644 --- a/lib/graph_reasoner/support/term_selectors.ex +++ b/lib/graph_reasoner/support/term_selectors.ex @@ -1,5 +1,4 @@ -defmodule TermSelectors do - alias GraphReasoner.Support.TermSelectors +defmodule GraphReasoner.Support.TermSelectors do alias InterpreterTerms.SymbolMatch, as: Sym alias InterpreterTerms.WordMatch, as: Word diff --git a/lib/graph_reasoner/type_reasoner/type_reasoner.ex b/lib/graph_reasoner/type_reasoner/type_reasoner.ex index 539fcf8..638909f 100644 --- a/lib/graph_reasoner/type_reasoner/type_reasoner.ex +++ b/lib/graph_reasoner/type_reasoner/type_reasoner.ex @@ -1,6 +1,6 @@ -defmodule TypeReasoner do +defmodule GraphReasoner.TypeReasoner do + alias GraphReasoner.{ModelInfo, QueryInfo} alias Updates.QueryAnalyzer.Iri - alias GraphReasoner.{TypeReasoner, ModelInfo, QueryInfo} @moduledoc """ The TypeReasoner derives types for known entities and pushes the diff --git a/lib/sparql_server/sparql_server.ex b/lib/sparql_server/sparql_server.ex index f3865fc..e612dba 100644 --- a/lib/sparql_server/sparql_server.ex +++ b/lib/sparql_server/sparql_server.ex @@ -72,7 +72,7 @@ defmodule SparqlServer do {Interpreter.CachedInterpreter, nil}, {Interpreter.Diff.Store.Storage, nil}, {Interpreter.Diff.Store.Manipulator, nil}, - {Plug.Adapters.Cowboy2, + {Plug.Cowboy, scheme: :http, plug: SparqlServer.Router, options: [port: port]}, :poolboy.child_spec(:worker, [ {:name, {:local, :query_worker}}, From 8ea859354039b8071500b9915a366166bcad6d2d Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Mon, 9 Aug 2021 14:30:30 +0200 Subject: [PATCH 02/19] add indexed field in deltanotifier body --- lib/delta/message.ex | 42 +++++++++++++++++++++++------- lib/sparql_server/sparql_server.ex | 1 + 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/lib/delta/message.ex b/lib/delta/message.ex index a453b61..9819a32 100644 --- a/lib/delta/message.ex +++ b/lib/delta/message.ex @@ -2,6 +2,36 @@ defmodule Delta.Message do alias Updates.QueryAnalyzer.Types.Quad, as: Quad alias SparqlServer.Router.AccessGroupSupport, as: AccessGroupSupport + use GenServer + + def start_link(_) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + @impl true + def init(_) do + {:ok, %{index: :os.system_time(:millisecond)}} + end + + @impl true + def handle_call({:construct, delta, access_groups, origin}, _from, state ) do + index = state.index + new_state = %{state | index: index + 1} + + json_model = %{ + "changeSets" => + Enum.map(delta, fn delta_item -> + delta_item + |> convert_delta_item + |> add_allowed_groups(access_groups) + |> add_origin(origin) + end), + "index" => index + } + + {:reply, json_model, new_state} + end + @moduledoc """ Contains code to construct the correct messenges for informing clients. @@ -25,20 +55,12 @@ defmodule Delta.Message do # services ignore content which came from their end and would # allow services to perform updates in the name of a specific # user. - - json_model = %{ - "changeSets" => - Enum.map(delta, fn delta_item -> - delta_item - |> convert_delta_item - |> add_allowed_groups(access_groups) - |> add_origin(origin) - end) - } + json_model = GenServer.call( __MODULE__, {:construct, delta, access_groups, origin}) Poison.encode!(json_model) end + defp convert_delta_item({:insert, quads}) do %{"insert" => Enum.map(quads, &convert_quad/1)} end diff --git a/lib/sparql_server/sparql_server.ex b/lib/sparql_server/sparql_server.ex index e612dba..4be598c 100644 --- a/lib/sparql_server/sparql_server.ex +++ b/lib/sparql_server/sparql_server.ex @@ -65,6 +65,7 @@ defmodule SparqlServer do children = [ {Cache.Types, %{}}, + {Delta.Message, nil}, {Support.Id, nil}, {SparqlClient.InfoEndpoint, nil}, {SparqlClient.WorkloadInfo, nil}, From 72721ee53e397fe3e1755a43cb48ce7e6b435f4f Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Tue, 10 Aug 2021 11:28:07 +0200 Subject: [PATCH 03/19] coalesce delta messages try 1: with timeout, but just array of messages per mu_call_id_trail --- lib/delta/cache.ex | 80 ++++++++++++++++++++++++++++++ lib/delta/delta.ex | 3 +- lib/delta/message.ex | 38 +++++++------- lib/sparql_server/sparql_server.ex | 1 + 4 files changed, 103 insertions(+), 19 deletions(-) create mode 100644 lib/delta/cache.ex diff --git a/lib/delta/cache.ex b/lib/delta/cache.ex new file mode 100644 index 0000000..2f3de69 --- /dev/null +++ b/lib/delta/cache.ex @@ -0,0 +1,80 @@ +defmodule Delta.Cache do + use GenServer + + @coalesce_time 2 * 1000 + + def inform(delta, mu_call_id_trail) do + GenServer.call( __MODULE__, {:inform, delta, mu_call_id_trail}) + end + + + def flush(mu_call_id_trail) do + GenServer.cast( __MODULE__, {:flush, mu_call_id_trail}) + end + + + def start_link(_) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + + @impl true + def init(_) do + {:ok, %{}} + end + + + defp touch_timeout(state, mu_call_id_trail) do + ref = Process.send_after(self(), {:timeout, mu_call_id_trail}, @coalesce_time) + new_state = Map.update(state, mu_call_id_trail, %{buffer: [], ref: ref}, fn x -> + Process.cancel_timer(x.ref) + # Keep buffer intact + %{x | ref: ref} + end) + + new_state + end + + + defp do_flush(state, mu_call_id_trail) do + # Remove possible timeout things + {cache, new_state} = Map.pop(state, mu_call_id_trail) + + Process.cancel_timer(cache.ref) + + json_model = %{ + "changeSets" => cache.buffer + } + + json_model + |> IO.inspect(label: "hallooooo") + |> Poison.encode!() + |> Delta.Messenger.inform_clients(mu_call_id_trail: mu_call_id_trail) + + new_state + end + + + # TODO: this message might be incorrect, cause it was already in queue, after a touch message + @impl true + def handle_info({:timeout, trail}, state) do + new_state = do_flush(state, trail) + + {:noreply, new_state} + end + + + @impl true + def handle_cast({:flush, trail}, state) do + {:noreply, do_flush(state, trail)} + end + + + @impl true + def handle_call({:inform, delta, trail}, _from, state) do + new_state = touch_timeout(state, trail) + |> update_in([trail, :buffer], &(&1 ++ delta)) + + {:reply, :ok, new_state} + end +end diff --git a/lib/delta/delta.ex b/lib/delta/delta.ex index ef37f83..f7c9893 100644 --- a/lib/delta/delta.ex +++ b/lib/delta/delta.ex @@ -46,7 +46,8 @@ defmodule Delta do delta |> Delta.Message.construct(authorization_groups, origin) |> Logging.EnvLog.inspect(:log_delta_messages, label: "Constructed body for clients") - |> Delta.Messenger.inform_clients(mu_call_id_trail: mu_call_id_trail) + |> Delta.Cache.inform(mu_call_id_trail) + # |> Delta.Messenger.inform_clients(mu_call_id_trail: mu_call_id_trail) delta end diff --git a/lib/delta/message.ex b/lib/delta/message.ex index 9819a32..3cf30c5 100644 --- a/lib/delta/message.ex +++ b/lib/delta/message.ex @@ -15,21 +15,18 @@ defmodule Delta.Message do @impl true def handle_call({:construct, delta, access_groups, origin}, _from, state ) do - index = state.index - new_state = %{state | index: index + 1} - - json_model = %{ - "changeSets" => - Enum.map(delta, fn delta_item -> - delta_item - |> convert_delta_item - |> add_allowed_groups(access_groups) - |> add_origin(origin) - end), - "index" => index - } - - {:reply, json_model, new_state} + + {model, new_index} = Enum.map_reduce(delta, state.index, fn delta_item, index -> + delta_item + |> convert_delta_item + |> add_allowed_groups(access_groups) + |> add_origin(origin) + |> add_index(index) + end) + + new_state = %{state | index: new_index} + + {:reply, model, new_state} end @moduledoc """ @@ -55,11 +52,12 @@ defmodule Delta.Message do # services ignore content which came from their end and would # allow services to perform updates in the name of a specific # user. - json_model = GenServer.call( __MODULE__, {:construct, delta, access_groups, origin}) + # json_model = - Poison.encode!(json_model) - end + # Poison.encode!(json_model) + GenServer.call( __MODULE__, {:construct, delta, access_groups, origin}) + end defp convert_delta_item({:insert, quads}) do %{"insert" => Enum.map(quads, &convert_quad/1)} @@ -84,6 +82,10 @@ defmodule Delta.Message do Map.put(map, "origin", origin) end + defp add_index(map, index) do + {Map.put(map, "index", index), index + 1} + end + defp convert_quad(%Quad{graph: graph, subject: subject, predicate: predicate, object: object}) do [g, s, p, o] = Enum.map( diff --git a/lib/sparql_server/sparql_server.ex b/lib/sparql_server/sparql_server.ex index 4be598c..45b1e68 100644 --- a/lib/sparql_server/sparql_server.ex +++ b/lib/sparql_server/sparql_server.ex @@ -65,6 +65,7 @@ defmodule SparqlServer do children = [ {Cache.Types, %{}}, + {Delta.Cache, nil}, {Delta.Message, nil}, {Support.Id, nil}, {SparqlClient.InfoEndpoint, nil}, From feb34cf21650090c531d855635c21b393bf62aed Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Tue, 10 Aug 2021 15:51:01 +0200 Subject: [PATCH 04/19] add configurable timeouts --- README.md | 1 + config/config.exs | 3 ++- lib/delta/cache.ex | 7 +++---- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 22fdad0..e255e13 100644 --- a/README.md +++ b/README.md @@ -374,6 +374,7 @@ Flags which can be either on or off translate the environment variable string to - `INSPECT_OUTGOING_SPARQL_QUERY_RESPONSES` : Inspects the responses coming back from the backing triplestore - `LOG_OUTGOING_SPARQL_QUERY_ROUNDTRIP` : Logs both the request and the response to/from the backing triplestore closely together in the logs - `LOG_WORKLOAD_INFO_REQUESTS` : Logs workload information to the console when it is requested through an http call +- `DELTA_CACHE_TIMEOUT` : Specifies how long the delta messages should be coalesced ### Query timeout configuration Complex SPARQL queries can take a long time to process and execute. The time mu-authorization is allowed to spend on this processing and execution before timing out can be configured through the following environment variables: diff --git a/config/config.exs b/config/config.exs index 5dbe510..5ccd279 100644 --- a/config/config.exs +++ b/config/config.exs @@ -53,6 +53,7 @@ end # config :sparqlex, key: :value config :"mu-authorization", author: :"mu-semtech", + delta_cache_timeout: CH.system_number("DELTA_CACHE_TIMEOUT", 500), log_server_configuration: CH.system_boolean("LOG_SERVER_CONFIGURATION"), log_outgoing_sparql_queries: CH.system_boolean("LOG_OUTGOING_SPARQL_QUERIES"), log_incoming_sparql_queries: CH.system_boolean("LOG_INCOMING_SPARQL_QUERIES"), @@ -88,7 +89,7 @@ config :"mu-authorization", # config :logger, - compile_time_purge_matching: :debug, + compile_time_purge_level: :debug, level: :warn # config :logger, diff --git a/lib/delta/cache.ex b/lib/delta/cache.ex index 2f3de69..3ae8b94 100644 --- a/lib/delta/cache.ex +++ b/lib/delta/cache.ex @@ -1,8 +1,6 @@ defmodule Delta.Cache do use GenServer - @coalesce_time 2 * 1000 - def inform(delta, mu_call_id_trail) do GenServer.call( __MODULE__, {:inform, delta, mu_call_id_trail}) end @@ -25,7 +23,9 @@ defmodule Delta.Cache do defp touch_timeout(state, mu_call_id_trail) do - ref = Process.send_after(self(), {:timeout, mu_call_id_trail}, @coalesce_time) + timeout_duration = Application.get_env(:"mu-authorization", :delta_cache_timeout) + + ref = Process.send_after(self(), {:timeout, mu_call_id_trail}, timeout_duration) new_state = Map.update(state, mu_call_id_trail, %{buffer: [], ref: ref}, fn x -> Process.cancel_timer(x.ref) # Keep buffer intact @@ -47,7 +47,6 @@ defmodule Delta.Cache do } json_model - |> IO.inspect(label: "hallooooo") |> Poison.encode!() |> Delta.Messenger.inform_clients(mu_call_id_trail: mu_call_id_trail) From 8a69f6210fd4887062a7db3790b1cd8a91522433 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Fri, 13 Aug 2021 13:41:12 +0200 Subject: [PATCH 05/19] ASK if updates are necessary before executing --- lib/sparql_server/router/handler_support.ex | 72 +++++++++++++++++++-- lib/updates/query_analyzer.ex | 8 ++- lib/updates/query_constructors.ex | 62 ++++++++++++++++++ 3 files changed, 136 insertions(+), 6 deletions(-) diff --git a/lib/sparql_server/router/handler_support.ex b/lib/sparql_server/router/handler_support.ex index fe9c9ab..dc4437e 100644 --- a/lib/sparql_server/router/handler_support.ex +++ b/lib/sparql_server/router/handler_support.ex @@ -227,6 +227,14 @@ defmodule SparqlServer.Router.HandlerSupport do } } + quad_in_store = fn thing -> + returned = + QueryAnalyzer.construct_ask_query(thing) + |> SparqlClient.execute_parsed(request: conn, query_type: :read) + + returned["boolean"] + end + analyzed_quads = query |> ALog.di("Parsed query") @@ -244,14 +252,18 @@ defmodule SparqlServer.Router.HandlerSupport do {:fail, reason} _ -> - processed_manipulations = + {true_inserts, true_deletions, false_inserts, false_deletions} = analyzed_quads |> Enum.map(fn {manipulation, _requested_quads, effective_quads} -> {manipulation, effective_quads} end) + |> reduce_actual_fake(quad_in_store, {MapSet.new(), MapSet.new(), MapSet.new(), MapSet.new()}) + + + actual_processed_manipulations = [{:delete, true_deletions}, {:insert, true_inserts}] executable_queries = - processed_manipulations + actual_processed_manipulations |> join_quad_updates |> Enum.map(fn {statement, processed_quads} -> case statement do @@ -264,7 +276,7 @@ defmodule SparqlServer.Router.HandlerSupport do end) delta_updater = fn -> - Delta.publish_updates(processed_manipulations, authorization_groups, conn) + Delta.publish_updates(actual_processed_manipulations, authorization_groups, conn) end # TODO: should we set the access groups on update queries too? @@ -273,6 +285,56 @@ defmodule SparqlServer.Router.HandlerSupport do end end + defp reduce_actual_fake([], _, x), do: x + + defp reduce_actual_fake([{:insert, quads} | xs], quad_in_store, state) do + new_state = + Enum.reduce( + quads, + state, + fn quad, {true_inserts, true_deletions, false_inserts, false_deletions} -> + if not quad_in_store.(quad) do + {MapSet.put(true_inserts, quad), true_deletions, false_inserts, false_deletions} + else + if MapSet.member?(true_deletions, quad) do + # Element not in store, but would be deleted + # So both false insert and false deletion + {true_inserts, MapSet.delete(true_deletions, quad), MapSet.put(false_inserts, quad), + MapSet.put(false_deletions, quad)} + else + {true_inserts, true_deletions, MapSet.put(false_inserts, quad), false_deletions} + end + end + end + ) + + reduce_actual_fake(xs, quad_in_store, new_state) + end + + defp reduce_actual_fake([{:delete, quads} | xs], quad_in_store, state) do + new_state = + Enum.reduce( + quads, + state, + fn quad, {true_inserts, true_deletions, false_inserts, false_deletions} -> + if quad_in_store.(quad) do + {true_inserts, MapSet.put(true_deletions, quad), false_inserts, false_deletions} + else + if MapSet.member?(true_inserts, quad) do + # Element not in store, but would be deleted + # So both false insert and false deletion + {MapSet.delete(true_inserts, quad), true_deletions, MapSet.put(false_inserts, quad), + MapSet.put(false_deletions, quad)} + else + {true_inserts, true_deletions, false_inserts, MapSet.put(false_deletions, quad)} + end + end + end + ) + + reduce_actual_fake(xs, quad_in_store, new_state) + end + defp enrich_manipulations_with_access_rights(manipulations, authorization_groups) do manipulations |> Enum.map(fn {kind, quads} -> @@ -298,11 +360,11 @@ defmodule SparqlServer.Router.HandlerSupport do |> Enum.map(&{&1.subject, &1.predicate, &1.object}) |> MapSet.new() - all_triples_written? = Set.equal?(requested_triples, effective_triples) + all_triples_written? = MapSet.equal?(requested_triples, effective_triples) unless all_triples_written? do Logging.EnvLog.inspect( - Set.difference(requested_triples, effective_triples), + MapSet.difference(requested_triples, effective_triples), :error, label: "These triples would not be written to the triplestore" ) diff --git a/lib/updates/query_analyzer.ex b/lib/updates/query_analyzer.ex index 1033d61..df70fda 100644 --- a/lib/updates/query_analyzer.ex +++ b/lib/updates/query_analyzer.ex @@ -846,6 +846,7 @@ defmodule Updates.QueryAnalyzer do |> Enum.uniq() end + @spec update_options_for_with(Sym.t(), options) :: options def update_options_for_with(%Sym{symbol: :iri} = sym, options) do # TODO double_check the use of :default_graph. The may be used @@ -856,6 +857,11 @@ defmodule Updates.QueryAnalyzer do |> Map.put(:default_graph, iri) end + + def construct_ask_query(quad) do + QueryConstructors.make_ask_query(quad) + end + @spec construct_select_query([Var.t()], Sym.t(), options) :: {Parser.query(), Acl.allowed_groups()} def construct_select_query(variables, group_graph_pattern_sym, options) do @@ -895,6 +901,7 @@ defmodule Updates.QueryAnalyzer do |> Enum.map(&QueryConstructors.make_quad_match_from_quad/1) |> QueryConstructors.make_insert_query() + # |> TODO add prefixes end @@ -902,7 +909,6 @@ defmodule Updates.QueryAnalyzer do def construct_delete_query_from_quads(quads, options) do # TODO: this should be clearing when the query is executed clear_cache_for_typed_quads(quads, options) - quads |> Enum.map(&QueryConstructors.make_quad_match_from_quad/1) |> QueryConstructors.make_delete_query() diff --git a/lib/updates/query_constructors.ex b/lib/updates/query_constructors.ex index 2dfe875..8ea9f14 100644 --- a/lib/updates/query_constructors.ex +++ b/lib/updates/query_constructors.ex @@ -4,6 +4,68 @@ defmodule Updates.QueryConstructors do alias InterpreterTerms.WordMatch, as: Word alias Updates.QueryAnalyzer.Types.Quad, as: Quad + @doc """ + Creates a valid ASK query, for a single quad + """ + def make_ask_query(quad) do + quad_pattern = %Sym { + symbol: :QuadData, + submatches: [ + %Word{external: %{}, whitespace: "", word: "{"}, + %Sym{ + symbol: :Quads, + submatches: [ + make_quad_match_from_quad(quad), + ] + }, + %Word{external: %{}, whitespace: "", word: "}"}, + ] + } + + group_graph_pattern = Manipulator.Transform.quad_data_to_group_graph_pattern(quad_pattern) + + %Sym{ + symbol: :Sparql, + submatches: [ + %Sym{ + symbol: :QueryUnit, + submatches: [ + %Sym{ + symbol: :Query, + submatches: [ + %Sym{ + symbol: :Prologue, + submatches: [] + }, + %Sym{ + symbol: :AskQuery, + submatches: [ + %Word{word: "ASK"}, + %Sym{ + symbol: :WhereClause, + submatches: [ + %Word{word: "WHERE"}, + group_graph_pattern, + ] + }, + %Sym{ + symbol: :SolutionModifier, + submatches: [] + } + ] + }, + %Sym{ + symbol: :ValuesClause, + submatches: [] + } + ] + } + ] + } + ] + } + end + def make_select_query(variable_syms, group_graph_pattern_sym) do %Sym{ symbol: :Sparql, From 593c48e014c192cf33b451ccadbff36cb5c0889d Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Tue, 17 Aug 2021 11:18:26 +0200 Subject: [PATCH 06/19] move multiple ASK queries to a single CONSTRUCT query (where possible) --- lib/sparql_server/router/handler_support.ex | 86 ++++++- lib/updates/query_analyzer.ex | 3 + lib/updates/query_constructors.ex | 236 +++++++++++++++++++- 3 files changed, 312 insertions(+), 13 deletions(-) diff --git a/lib/sparql_server/router/handler_support.ex b/lib/sparql_server/router/handler_support.ex index dc4437e..8a8793b 100644 --- a/lib/sparql_server/router/handler_support.ex +++ b/lib/sparql_server/router/handler_support.ex @@ -5,6 +5,7 @@ defmodule SparqlServer.Router.HandlerSupport do alias SparqlServer.Router.AccessGroupSupport, as: AccessGroupSupport alias QueryAnalyzer.Types.Quad, as: Quad alias Updates.QueryAnalyzer, as: QueryAnalyzer + alias Updates.QueryAnalyzer.P, as: QueryAnalyzerProtocol require Logger require ALog @@ -209,6 +210,12 @@ defmodule SparqlServer.Router.HandlerSupport do end end + ## Create tuple from literal {type, value} + defp get_result_tuple(x) do + out = QueryAnalyzerProtocol.to_sparql_result_value(x) + {out.type, out.value} + end + ### Manipulates the update query yielding back the valid set of ### queries which should be executed on the database. defp manipulate_update_query(query, conn) do @@ -227,13 +234,9 @@ defmodule SparqlServer.Router.HandlerSupport do } } - quad_in_store = fn thing -> - returned = - QueryAnalyzer.construct_ask_query(thing) - |> SparqlClient.execute_parsed(request: conn, query_type: :read) - - returned["boolean"] - end + quad_in_store_with_ask = + &(QueryAnalyzer.construct_ask_query(&1) + |> SparqlClient.execute_parsed(request: conn, query_type: :read))["boolean"] analyzed_quads = query @@ -252,13 +255,72 @@ defmodule SparqlServer.Router.HandlerSupport do {:fail, reason} _ -> + # From current quads, analyse what quads are already present + triple_store_content = + analyzed_quads + |> Enum.flat_map(&elem(&1, 2)) + |> QueryAnalyzer.construct_asks_query() + |> SparqlClient.execute_parsed(request: conn, query_type: :read) + |> Map.get("results") + |> Map.get("bindings") + |> Enum.map(fn %{"o" => object, "s" => subject, "p" => predicate} -> + { + {subject["type"], subject["value"]}, + {predicate["type"], predicate["value"]}, + {object["type"], object["value"]} + } + end) + + # From current quads, calculate frequency of _triple_ + # Equal quads have no influence, but same triples from different graphs + # cannot be queried with the same CONSTRUCT query + # (because CONSTRUCT only returns triples) + tested_content_frequencies = + analyzed_quads + |> Enum.flat_map(&elem(&1, 2)) + # Same things in same graph are ignored + |> Enum.uniq() + |> Enum.map(fn %Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: _graph + } -> + {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} + end) + |> Enum.frequencies() + + + # Test if a quad is inn the store + # If the calculated frequency is one, the existence of the triple in the CONSTRUCT query + # uniquely represents the existence of the quad in the triplestore + # If the calculated frequency is more, the triple might exist in more graphs + # so the CONSTRUCT query does not uniquely represent the quad in the triplestore + # so an ASK query is executed (this shouldn't happen too often) + quad_in_store = fn %Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: _graph + } = quad -> + value = {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} + + if Map.get(tested_content_frequencies, value, 0) > 1 do + quad_in_store_with_ask.(quad) + else + value in triple_store_content + end + end + {true_inserts, true_deletions, false_inserts, false_deletions} = analyzed_quads |> Enum.map(fn {manipulation, _requested_quads, effective_quads} -> {manipulation, effective_quads} end) - |> reduce_actual_fake(quad_in_store, {MapSet.new(), MapSet.new(), MapSet.new(), MapSet.new()}) - + |> reduce_actual_fake( + quad_in_store, + {MapSet.new(), MapSet.new(), MapSet.new(), MapSet.new()} + ) actual_processed_manipulations = [{:delete, true_deletions}, {:insert, true_inserts}] @@ -285,8 +347,11 @@ defmodule SparqlServer.Router.HandlerSupport do end end + # Reduce :insert and :delete delta's into true and false delta's defp reduce_actual_fake([], _, x), do: x + # An insert is a true delta if the quad is not yet present in the triplestore + # If a true deletion would delete this quad, the deletion is actually a false deletion defp reduce_actual_fake([{:insert, quads} | xs], quad_in_store, state) do new_state = Enum.reduce( @@ -311,6 +376,8 @@ defmodule SparqlServer.Router.HandlerSupport do reduce_actual_fake(xs, quad_in_store, new_state) end + # A deletion is a true deletion if the quad is present in the triplestore + # If a true insertion would insert this quad, the insert is actually a false insert defp reduce_actual_fake([{:delete, quads} | xs], quad_in_store, state) do new_state = Enum.reduce( @@ -335,6 +402,7 @@ defmodule SparqlServer.Router.HandlerSupport do reduce_actual_fake(xs, quad_in_store, new_state) end + defp enrich_manipulations_with_access_rights(manipulations, authorization_groups) do manipulations |> Enum.map(fn {kind, quads} -> diff --git a/lib/updates/query_analyzer.ex b/lib/updates/query_analyzer.ex index df70fda..29e623c 100644 --- a/lib/updates/query_analyzer.ex +++ b/lib/updates/query_analyzer.ex @@ -857,6 +857,9 @@ defmodule Updates.QueryAnalyzer do |> Map.put(:default_graph, iri) end + def construct_asks_query(quads) do + QueryConstructors.make_asks_query(quads) + end def construct_ask_query(quad) do QueryConstructors.make_ask_query(quad) diff --git a/lib/updates/query_constructors.ex b/lib/updates/query_constructors.ex index 8ea9f14..e6ea6e1 100644 --- a/lib/updates/query_constructors.ex +++ b/lib/updates/query_constructors.ex @@ -4,21 +4,201 @@ defmodule Updates.QueryConstructors do alias InterpreterTerms.WordMatch, as: Word alias Updates.QueryAnalyzer.Types.Quad, as: Quad + defp leaf_to_data_block_value(leaf) do + %Sym{ + symbol: :DataBlockValue, + submatches: [ + QueryAnalyzerProtocol.to_solution_sym(leaf) + ] + } + end + + defp map_quad_to_bracet_data_block_values(%Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: graph + }) do + [ + %Word{word: "("}, + leaf_to_data_block_value(graph), + leaf_to_data_block_value(subject), + leaf_to_data_block_value(predicate), + leaf_to_data_block_value(object), + %Word{word: ")"} + ] + end + + @doc """ + Creates a valid CONSTRUCT query, determining the existence of ~~life~~ quads + Can be used as multiple ASK queries with only a little bit of suffering + """ + def make_asks_query(quads) do + matches = quads |> Enum.flat_map(&map_quad_to_bracet_data_block_values/1) + + graph = make_var_symbol("?g") + subject = make_var_symbol("?s") + predicate = make_var_symbol("?p") + object = make_var_symbol("?o") + + %Sym{ + symbol: :Sparql, + submatches: [ + %Sym{ + symbol: :QueryUnit, + submatches: [ + %Sym{ + symbol: :Query, + submatches: [ + %Sym{ + symbol: :Prologue, + submatches: [] + }, + %Sym{ + symbol: :ConstructQuery, + submatches: [ + %Word{word: "CONSTRUCT"}, + %Sym{ + symbol: :ConstructTemplate, + submatches: [ + %Word{word: "{"}, + %Sym{ + symbol: :ConstructTriples, + submatches: [ + make_simple_triples_same_subject( + subject, + predicate, + object + ) + ] + }, + %Word{word: "}"} + ] + }, + %Sym{ + symbol: :WhereClause, + submatches: [ + %Word{word: "WHERE"}, + %Sym{ + symbol: :GroupGraphPattern, + submatches: [ + %Word{word: "{"}, + %Sym{ + symbol: :GroupGraphPatternSub, + submatches: [ + %Sym{ + symbol: :GraphPatternNotTriples, + submatches: [ + %Sym{ + symbol: :InlineData, + submatches: [ + %Word{word: "VALUES"}, + %Sym{ + symbol: :DataBlock, + submatches: [ + %Sym{ + symbol: :InlineDataFull, + submatches: + [ + %Word{word: "("}, + graph, + subject, + predicate, + object, + %Word{word: ")"}, + %Word{word: "{"} + ] ++ + matches ++ + [ + %Word{word: "}"} + ] + } + ] + } + ] + } + ] + }, + %Sym{ + symbol: :GraphPatternNotTriples, + submatches: [ + %Sym{ + symbol: :GraphGraphPattern, + submatches: [ + %Word{word: "GRAPH"}, + %Sym{ + symbol: :VarOrIri, + submatches: [ + graph + ] + }, + %Sym{ + symbol: :GroupGraphPattern, + submatches: [ + %Word{word: "{"}, + %Sym{ + symbol: :GroupGraphPatternSub, + submatches: [ + %Sym{ + symbol: :TriplesBlock, + submatches: [ + make_simple_triples_same_subject_path( + subject, + predicate, + object + ), + %Word{word: "."} + ] + } + ] + }, + %Word{word: "}"} + ] + } + ] + } + ] + } + ] + }, + %Word{word: "}"} + ] + } + ] + }, + %Sym{ + symbol: :SolutionModifier, + submatches: [] + } + ] + }, + %Sym{ + symbol: :ValuesClause, + submatches: [] + } + ] + } + ] + } + ] + } + end + @doc """ Creates a valid ASK query, for a single quad """ def make_ask_query(quad) do - quad_pattern = %Sym { + quad_pattern = %Sym{ symbol: :QuadData, submatches: [ %Word{external: %{}, whitespace: "", word: "{"}, %Sym{ symbol: :Quads, submatches: [ - make_quad_match_from_quad(quad), + make_quad_match_from_quad(quad) ] }, - %Word{external: %{}, whitespace: "", word: "}"}, + %Word{external: %{}, whitespace: "", word: "}"} ] } @@ -45,7 +225,7 @@ defmodule Updates.QueryConstructors do symbol: :WhereClause, submatches: [ %Word{word: "WHERE"}, - group_graph_pattern, + group_graph_pattern ] }, %Sym{ @@ -438,4 +618,52 @@ defmodule Updates.QueryConstructors do ] } end + + defp make_simple_triples_same_subject( + subject_var_or_term, + predicate_var_or_term, + object_var_or_term + ) do + %Sym{ + symbol: :TriplesSameSubject, + submatches: [ + %Sym{symbol: :VarOrTerm, submatches: [subject_var_or_term]}, + %Sym{ + symbol: :PropertyListNotEmpty, + submatches: [ + %Sym{ + symbol: :Verb, + submatches: [ + %Sym{ + symbol: :VarOrIri, + submatches: [ + predicate_var_or_term + ] + } + ] + }, + %Sym{ + symbol: :ObjectList, + submatches: [ + %Sym{ + symbol: :Object, + submatches: [ + %Sym{ + symbol: :GraphNode, + submatches: [ + %Sym{ + symbol: :VarOrTerm, + submatches: [object_var_or_term] + } + ] + } + ] + } + ] + } + ] + } + ] + } + end end From acfd1074cb53a06585dc67ac94b0f5717225c4d5 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Tue, 17 Aug 2021 17:52:58 +0200 Subject: [PATCH 07/19] create delta's cache with actual inserts and deletions (flush at read query + broken delta messages) --- lib/cache/deltas.ex | 265 ++++++++++++++++ lib/sparql_server/router/handler_support.ex | 335 +++++--------------- lib/sparql_server/sparql_server.ex | 10 +- 3 files changed, 346 insertions(+), 264 deletions(-) create mode 100644 lib/cache/deltas.ex diff --git a/lib/cache/deltas.ex b/lib/cache/deltas.ex new file mode 100644 index 0000000..4a734a9 --- /dev/null +++ b/lib/cache/deltas.ex @@ -0,0 +1,265 @@ +defmodule Cache.Deltas do + alias Updates.QueryAnalyzer + alias Updates.QueryAnalyzer.P, as: QueryAnalyzerProtocol + alias Updates.QueryAnalyzer.Types.Quad, as: Quad + + require Logger + require ALog + use GenServer + + @type cache_logic_key :: :precache | :construct | :ask + + defp new_cache, do: {%{}, %{}, %{}, %{}} + + ### GenServer API + @doc """ + GenServer.init/1 callback + """ + def init(state) do + state = state || %{metas: [], cache: new_cache(), index: :os.system_time(:millisecond)} + {:ok, state} + end + + def start_link(_) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + @doc """ + Flush the current state, actually applying the delta's to the triplestore. + """ + def flush(options) do + GenServer.call(__MODULE__, {:flush, options}) + end + + @spec add_deltas(QueryAnalyzer.quad_changes(), cache_logic_key()) :: :ok + def add_deltas(quad_changes, logic, delta_meta \\ []) do + case logic do + :precache -> GenServer.cast(__MODULE__, {:cache_w_cache, quad_changes}) + :construct -> GenServer.cast(__MODULE__, {:cache_w_construct, quad_changes, delta_meta}) + :ask -> GenServer.cast(__MODULE__, {:cache_w_ask, quad_changes}) + end + end + + ## Create tuple from literal {type, value} + defp get_result_tuple(x) do + out = QueryAnalyzerProtocol.to_sparql_result_value(x) + {out.type, out.value} + end + + defp quad_in_store_with_ask?(quad) do + (QueryAnalyzer.construct_ask_query(quad) + |> SparqlClient.execute_parsed(query_type: :read))["boolean"] + end + + # From current quads, analyse what quads are already present + defp quads_in_store_with_construct(quads) do + quads + |> QueryAnalyzer.construct_asks_query() + |> SparqlClient.execute_parsed(query_type: :read) + |> Map.get("results") + |> Map.get("bindings") + |> Enum.map(fn %{"o" => object, "s" => subject, "p" => predicate} -> + { + {subject["type"], subject["value"]}, + {predicate["type"], predicate["value"]}, + {object["type"], object["value"]} + } + end) + |> MapSet.new() + end + + # From current quads, calculate frequency of _triple_ + # Equal quads have no influence, but same triples from different graphs + # cannot be queried with the same CONSTRUCT query + # (because CONSTRUCT only returns triples) + defp triple_counts_with_graph_differences(quads) do + quads + |> Enum.uniq() + |> Enum.map(fn %Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: _graph + } -> + {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} + end) + |> Enum.frequencies() + end + + # Test if a quad is inn the store + # If the calculated frequency is one, the existence of the triple in the CONSTRUCT query + # uniquely represents the existence of the quad in the triplestore + # If the calculated frequency is more, the triple might exist in more graphs + # so the CONSTRUCT query does not uniquely represent the quad in the triplestore + # so an ASK query is executed (this shouldn't happen too often) + defp quad_in_store?( + %{triple_counts: triple_counts, triples_in_store: triples_in_store}, + %Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: _graph + } = quad + ) do + value = {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} + + if Map.get(triple_counts, value, 0) > 1 do + quad_in_store_with_ask?(quad) + else + value in triples_in_store + end + end + + # Reduce :insert and :delete delta's into true and false delta's + # + # An insert is a true delta if the quad is not yet present in the triplestore + # If a true deletion would delete this quad, the deletion is actually a false deletion + defp add_delta_to_state({:insert, quad}, state) do + meta = List.first(state.metas) + {true_inserts, true_deletions, false_inserts, false_deletions} = state.cache + + new_cache = + if quad_in_store?(meta, quad) do + if Map.has_key?(true_deletions, quad) do + # Element not in store, but would be deleted + # So both false insert and false deletion + {original_index, true_deletions} = Map.pop!(true_deletions, quad) + + {true_inserts, Map.delete(true_deletions, quad), + Map.put(false_inserts, quad, state.index), + Map.put(false_deletions, quad, original_index)} + else + {true_inserts, true_deletions, Map.put(false_inserts, quad, state.index), + false_deletions} + end + else + {Map.put(true_inserts, quad, state.index), true_deletions, false_inserts, false_deletions} + end + + %{state | cache: new_cache} + end + + # A deletion is a true deletion if the quad is present in the triplestore + # If a true insertion would insert this quad, the insert is actually a false insert + defp add_delta_to_state({:delete, quad}, state) do + meta = List.first(state.metas) + {true_inserts, true_deletions, false_inserts, false_deletions} = state.cache + + new_cache = + if quad_in_store?(meta, quad) do + {true_inserts, Map.put(true_deletions, quad, state.index), false_inserts, false_deletions} + else + if Map.has_key?(true_inserts, quad) do + # Element not in store, but would be deleted + # So both false insert and false deletion + {original_index, true_inserts} = Map.pop!(true_inserts, quad) + + {true_inserts, true_deletions, Map.put(false_inserts, quad, original_index), + Map.put(false_deletions, quad, state.index)} + else + {true_inserts, true_deletions, false_inserts, + Map.put(false_deletions, quad, state.index)} + end + end + + %{state | cache: new_cache} + end + + defp convert_quad(%Quad{graph: graph, subject: subject, predicate: predicate, object: object}) do + [g, s, p, o] = + Enum.map( + [graph, subject, predicate, object], + &QueryAnalyzerProtocol.to_sparql_result_value/1 + ) + + %{"graph" => g, "subject" => s, "predicate" => p, "object" => o} + end + + defp delta_update(state) do + {true_inserts, true_deletions, _false_inserts, _false_deletions} = state.cache + + # content = Enum.map(true_inserts, &({:insert, &1})) ++ Enum.map(true_deletions, &({:delete, &1})) + + inserts = Enum.group_by(true_inserts, &elem(&1, 1), &{:insert, convert_quad(elem(&1, 0))}) + deletions = Enum.group_by(true_deletions, &elem(&1, 1), &{:delete, convert_quad(elem(&1, 0))}) + total = Map.merge(inserts, deletions, fn _, one, two -> one ++ two end) + + messages = + Enum.map(state.metas, fn meta -> + index = meta.index + + other_meta = + meta.delta_meta + |> Map.new() + |> Map.put(:index, index) + + Map.get(total, index, []) + |> Enum.group_by(&elem(&1, 0), &elem(&1, 1)) + |> Map.merge(other_meta) + end) + + %{ + "changeSets" => messages + } + |> Poison.encode!() + |> Delta.Messenger.inform_clients() + end + + @doc """ + GenServer.handle_call/3 callback + """ + def handle_call({:flush, options}, _from, state) do + {true_inserts, true_deletions, _false_inserts, _false_deletions} = state.cache + + inserts = Map.keys(true_inserts) + + unless Enum.empty?(inserts) do + QueryAnalyzer.construct_insert_query_from_quads(inserts, options) + |> SparqlClient.execute_parsed(query_type: :write) + |> ALog.di("Results from SparqlClient after write") + end + + deletions = Map.keys(true_deletions) + + unless Enum.empty?(deletions) do + QueryAnalyzer.construct_delete_query_from_quads(deletions, options) + |> SparqlClient.execute_parsed(query_type: :write) + |> ALog.di("Results from SparqlClient after write") + end + + delta_update(state) + + new_state = %{state | cache: new_cache(), metas: []} + + {:reply, :ok, new_state} + end + + # delta_meta: mu_call_id_trail, authorization_groups, origin + def handle_cast({:cache_w_construct, quads, delta_meta}, state) do + deltas = Enum.flat_map(quads, fn {type, qs} -> Enum.map(qs, &{type, &1}) end) + quads = Enum.map(deltas, &elem(&1, 1)) + + # Calculate meta data + triple_counts = triple_counts_with_graph_differences(quads) + triples_in_store = quads_in_store_with_construct(quads) + + # Add metadata to state + meta = %{ + triple_counts: triple_counts, + triples_in_store: triples_in_store, + delta_meta: delta_meta, + index: state.index + 1 + } + + state_with_meta = %{state | metas: [meta | state.metas], index: state.index + 1} + + # Reduce with add_delta_to_state + new_state = Enum.reduce(deltas, state_with_meta, &add_delta_to_state/2) + + {:noreply, new_state} + end + + def handle_cast({:cache_w_ask, _quads}, state) do + {:noreply, state} + end +end diff --git a/lib/sparql_server/router/handler_support.ex b/lib/sparql_server/router/handler_support.ex index 8a8793b..41fc478 100644 --- a/lib/sparql_server/router/handler_support.ex +++ b/lib/sparql_server/router/handler_support.ex @@ -5,7 +5,6 @@ defmodule SparqlServer.Router.HandlerSupport do alias SparqlServer.Router.AccessGroupSupport, as: AccessGroupSupport alias QueryAnalyzer.Types.Quad, as: Quad alias Updates.QueryAnalyzer, as: QueryAnalyzer - alias Updates.QueryAnalyzer.P, as: QueryAnalyzerProtocol require Logger require ALog @@ -111,36 +110,87 @@ defmodule SparqlServer.Router.HandlerSupport do |> wrap_query_in_toplevel |> ALog.di("Wrapped parsed query") - query_manipulator = - if is_select_query(parsed_form) do - &manipulate_select_query/2 - else - &manipulate_update_query/2 - end + if is_select_query(parsed_form) do + # TODO: Check where the default_graph is used where these options are passed and verify whether this is a sensible name. + options = %{ + default_graph: Iri.from_iri_string("", %{}), + prefixes: %{ + "xsd" => Iri.from_iri_string(""), + "foaf" => Iri.from_iri_string("") + } + } - case query_manipulator.(parsed_form, conn) do - {conn, new_parsed_forms, post_processing} -> - query_type = - if Enum.any?(new_parsed_forms, fn q -> !is_select_query(q) end) do - :read - else - :write - end + Cache.Deltas.flush(options) + + {conn, new_parsed_forms} = manipulate_select_query(parsed_form, conn) + + query_type = + if Enum.any?(new_parsed_forms, fn q -> !is_select_query(q) end) do + :read + else + :write + end + + encoded_response = + new_parsed_forms + |> ALog.di("New parsed forms") + |> Enum.map(&SparqlClient.execute_parsed(&1, request: conn, query_type: query_type)) + |> List.first() + |> Poison.encode!() + + {conn, {200, encoded_response}, new_template_local_store} + else + {conn, authorization_groups} = AccessGroupSupport.calculate_access_groups(conn) - encoded_response = - new_parsed_forms - |> ALog.di("New parsed forms") - |> Enum.map(&SparqlClient.execute_parsed(&1, request: conn, query_type: query_type)) - |> List.first() - |> Poison.encode!() + origin = + conn + |> Map.get(:remote_ip) + |> Tuple.to_list() + |> Enum.join(".") - post_processing.() - {conn, {200, encoded_response}, new_template_local_store} + mu_call_id_trail = + (case Plug.Conn.get_req_header(conn, "mu-call-id-trail") do + # ignore extra values for now, they should not happen, but + # if they do we don't want to crash either + [value | _] -> Poison.decode!(value) + _ -> [] + end) + |> Kernel.++(Plug.Conn.get_req_header(conn, "mu-call-id")) + |> Poison.encode!() + + analyzed_quads = + parsed_form + |> ALog.di("Parsed query") + |> QueryAnalyzer.quad_changes(%{ + default_graph: Iri.from_iri_string("", %{}), + authorization_groups: authorization_groups + }) + |> Enum.reject(&match?({_, []}, &1)) + |> ALog.di("Non-empty operations") + |> enrich_manipulations_with_access_rights(authorization_groups) + |> maybe_verify_all_triples_written() + + case analyzed_quads do + {:fail, reason} -> + encoded_response_string = Poison.encode!(%{errors: [%{status: "403", title: reason}]}) + {conn, {403, encoded_response_string}, new_template_local_store} + + _ -> + analyzed_quads + |> Enum.map(fn {manipulation, _requested_quads, effective_quads} -> + {manipulation, effective_quads} + end) + |> Cache.Deltas.add_deltas( + :construct, + origin: origin, + mu_call_id_trail: mu_call_id_trail, + authorization_groups: AccessGroupSupport.encode_json_access_groups(authorization_groups) + ) - {:fail, reason} -> - encoded_response_string = Poison.encode!(%{errors: [%{status: "403", title: reason}]}) - {conn, {403, encoded_response_string}, new_template_local_store} + succesful = Poison.encode!(%{sucessful: true}) + {conn, {200, succesful}, new_template_local_store} + end end end @@ -188,7 +238,7 @@ defmodule SparqlServer.Router.HandlerSupport do {conn, query} end - {conn, [query], fn -> :ok end} + {conn, [query]} end @doc """ @@ -210,198 +260,6 @@ defmodule SparqlServer.Router.HandlerSupport do end end - ## Create tuple from literal {type, value} - defp get_result_tuple(x) do - out = QueryAnalyzerProtocol.to_sparql_result_value(x) - {out.type, out.value} - end - - ### Manipulates the update query yielding back the valid set of - ### queries which should be executed on the database. - defp manipulate_update_query(query, conn) do - Logger.debug("This is an update query") - - {conn, authorization_groups} = AccessGroupSupport.calculate_access_groups(conn) - - # TODO: DRY into/from QueryAnalyzer.insert_quads - - # TODO: Check where the default_graph is used where these options are passed and verify whether this is a sensible name. - options = %{ - default_graph: Iri.from_iri_string("", %{}), - prefixes: %{ - "xsd" => Iri.from_iri_string(""), - "foaf" => Iri.from_iri_string("") - } - } - - quad_in_store_with_ask = - &(QueryAnalyzer.construct_ask_query(&1) - |> SparqlClient.execute_parsed(request: conn, query_type: :read))["boolean"] - - analyzed_quads = - query - |> ALog.di("Parsed query") - |> QueryAnalyzer.quad_changes(%{ - default_graph: Iri.from_iri_string("", %{}), - authorization_groups: authorization_groups - }) - |> Enum.reject(&match?({_, []}, &1)) - |> ALog.di("Non-empty operations") - |> enrich_manipulations_with_access_rights(authorization_groups) - |> maybe_verify_all_triples_written() - - case analyzed_quads do - {:fail, reason} -> - {:fail, reason} - - _ -> - # From current quads, analyse what quads are already present - triple_store_content = - analyzed_quads - |> Enum.flat_map(&elem(&1, 2)) - |> QueryAnalyzer.construct_asks_query() - |> SparqlClient.execute_parsed(request: conn, query_type: :read) - |> Map.get("results") - |> Map.get("bindings") - |> Enum.map(fn %{"o" => object, "s" => subject, "p" => predicate} -> - { - {subject["type"], subject["value"]}, - {predicate["type"], predicate["value"]}, - {object["type"], object["value"]} - } - end) - - # From current quads, calculate frequency of _triple_ - # Equal quads have no influence, but same triples from different graphs - # cannot be queried with the same CONSTRUCT query - # (because CONSTRUCT only returns triples) - tested_content_frequencies = - analyzed_quads - |> Enum.flat_map(&elem(&1, 2)) - # Same things in same graph are ignored - |> Enum.uniq() - |> Enum.map(fn %Quad{ - subject: subject, - predicate: predicate, - object: object, - graph: _graph - } -> - {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} - end) - |> Enum.frequencies() - - - # Test if a quad is inn the store - # If the calculated frequency is one, the existence of the triple in the CONSTRUCT query - # uniquely represents the existence of the quad in the triplestore - # If the calculated frequency is more, the triple might exist in more graphs - # so the CONSTRUCT query does not uniquely represent the quad in the triplestore - # so an ASK query is executed (this shouldn't happen too often) - quad_in_store = fn %Quad{ - subject: subject, - predicate: predicate, - object: object, - graph: _graph - } = quad -> - value = {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} - - if Map.get(tested_content_frequencies, value, 0) > 1 do - quad_in_store_with_ask.(quad) - else - value in triple_store_content - end - end - - {true_inserts, true_deletions, false_inserts, false_deletions} = - analyzed_quads - |> Enum.map(fn {manipulation, _requested_quads, effective_quads} -> - {manipulation, effective_quads} - end) - |> reduce_actual_fake( - quad_in_store, - {MapSet.new(), MapSet.new(), MapSet.new(), MapSet.new()} - ) - - actual_processed_manipulations = [{:delete, true_deletions}, {:insert, true_inserts}] - - executable_queries = - actual_processed_manipulations - |> join_quad_updates - |> Enum.map(fn {statement, processed_quads} -> - case statement do - :insert -> - QueryAnalyzer.construct_insert_query_from_quads(processed_quads, options) - - :delete -> - QueryAnalyzer.construct_delete_query_from_quads(processed_quads, options) - end - end) - - delta_updater = fn -> - Delta.publish_updates(actual_processed_manipulations, authorization_groups, conn) - end - - # TODO: should we set the access groups on update queries too? - # see AccessGroupSupport.put_access_groups/2 ( conn, authorization_groups ) - {conn, executable_queries, delta_updater} - end - end - - # Reduce :insert and :delete delta's into true and false delta's - defp reduce_actual_fake([], _, x), do: x - - # An insert is a true delta if the quad is not yet present in the triplestore - # If a true deletion would delete this quad, the deletion is actually a false deletion - defp reduce_actual_fake([{:insert, quads} | xs], quad_in_store, state) do - new_state = - Enum.reduce( - quads, - state, - fn quad, {true_inserts, true_deletions, false_inserts, false_deletions} -> - if not quad_in_store.(quad) do - {MapSet.put(true_inserts, quad), true_deletions, false_inserts, false_deletions} - else - if MapSet.member?(true_deletions, quad) do - # Element not in store, but would be deleted - # So both false insert and false deletion - {true_inserts, MapSet.delete(true_deletions, quad), MapSet.put(false_inserts, quad), - MapSet.put(false_deletions, quad)} - else - {true_inserts, true_deletions, MapSet.put(false_inserts, quad), false_deletions} - end - end - end - ) - - reduce_actual_fake(xs, quad_in_store, new_state) - end - - # A deletion is a true deletion if the quad is present in the triplestore - # If a true insertion would insert this quad, the insert is actually a false insert - defp reduce_actual_fake([{:delete, quads} | xs], quad_in_store, state) do - new_state = - Enum.reduce( - quads, - state, - fn quad, {true_inserts, true_deletions, false_inserts, false_deletions} -> - if quad_in_store.(quad) do - {true_inserts, MapSet.put(true_deletions, quad), false_inserts, false_deletions} - else - if MapSet.member?(true_inserts, quad) do - # Element not in store, but would be deleted - # So both false insert and false deletion - {MapSet.delete(true_inserts, quad), true_deletions, MapSet.put(false_inserts, quad), - MapSet.put(false_deletions, quad)} - else - {true_inserts, true_deletions, false_inserts, MapSet.put(false_deletions, quad)} - end - end - end - ) - - reduce_actual_fake(xs, quad_in_store, new_state) - end - defp enrich_manipulations_with_access_rights(manipulations, authorization_groups) do manipulations @@ -451,45 +309,6 @@ defmodule SparqlServer.Router.HandlerSupport do end end - @spec join_quad_updates(QueryAnalyzer.quad_changes()) :: - QueryAnalyzer.quad_changes() - defp join_quad_updates(elts) do - elts - |> Enum.map(fn {op, quads} -> {op, MapSet.new(quads)} end) - |> join_quad_map_updates([]) - |> Enum.map(fn {op, quads} -> {op, MapSet.to_list(quads)} end) - |> Enum.reject(&match?({_, []}, &1)) - end - - @type map_quad :: {QueryAnalyzer.quad_change_key(), MapSet.t(Quad.t())} - - @spec join_quad_map_updates([map_quad], [map_quad]) :: [map_quad] - defp join_quad_map_updates([], res), do: res - defp join_quad_map_updates([elt | rest], []), do: join_quad_map_updates(rest, [elt]) - - defp join_quad_map_updates([{type, quads} | rest], [{type, other_quads}]), - do: join_quad_map_updates(rest, [{type, MapSet.union(quads, other_quads)}]) - - defp join_quad_map_updates([quads | rest], [other_quads]) do - new_other_quads = fold_quad_map_updates(other_quads, quads) - join_quad_map_updates(rest, [new_other_quads, quads]) - end - - defp join_quad_map_updates([quad_updates | rest], [left, right]) do - new_left = fold_quad_map_updates(left, quad_updates) - new_right = fold_quad_map_updates(right, quad_updates) - - join_quad_map_updates(rest, [new_left, new_right]) - end - - @spec fold_quad_map_updates(map_quad, map_quad) :: map_quad - defp fold_quad_map_updates({key, left_quads}, {key, right_quads}), - # :inserts, :inserts or :deletes, :deletes - do: {key, MapSet.union(left_quads, right_quads)} - - defp fold_quad_map_updates({left_type, left_quads}, {_right_type, right_quads}), - # :inserts, :deletes or :deletes, :inserts - do: {left_type, MapSet.difference(left_quads, right_quads)} @spec enforce_write_rights([Quad.t()], Acl.UserGroups.Config.t()) :: [Quad.t()] defp enforce_write_rights(quads, authorization_groups) do diff --git a/lib/sparql_server/sparql_server.ex b/lib/sparql_server/sparql_server.ex index 45b1e68..ba4cd11 100644 --- a/lib/sparql_server/sparql_server.ex +++ b/lib/sparql_server/sparql_server.ex @@ -56,17 +56,16 @@ defmodule SparqlServer do label: "server setup, log database recovery mode tick" ) - Logging.EnvLog.inspect( Application.get_env(:"mu-authorization", :testing_auth_query_error_rate), :testing_auth_query_error_rate, - label: "server setup, testing auth query error rate amount of queries authorization makes fail" + label: + "server setup, testing auth query error rate amount of queries authorization makes fail" ) children = [ {Cache.Types, %{}}, - {Delta.Cache, nil}, - {Delta.Message, nil}, + {Cache.Deltas, nil}, {Support.Id, nil}, {SparqlClient.InfoEndpoint, nil}, {SparqlClient.WorkloadInfo, nil}, @@ -74,8 +73,7 @@ defmodule SparqlServer do {Interpreter.CachedInterpreter, nil}, {Interpreter.Diff.Store.Storage, nil}, {Interpreter.Diff.Store.Manipulator, nil}, - {Plug.Cowboy, - scheme: :http, plug: SparqlServer.Router, options: [port: port]}, + {Plug.Cowboy, scheme: :http, plug: SparqlServer.Router, options: [port: port]}, :poolboy.child_spec(:worker, [ {:name, {:local, :query_worker}}, {:worker_module, SparqlServer.Router.Handler.Worker}, From 0e51a56fb1ac7053beb5d3281549f2b78a292a96 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Wed, 18 Aug 2021 15:31:35 +0200 Subject: [PATCH 08/19] cleanup + fix some issues --- Dockerfile | 2 +- config/config.exs | 2 + config/dev.exs | 7 +- lib/alog.ex | 54 +++++--- lib/cache/deltas.ex | 134 +++++++++++++++----- lib/delta/cache.ex | 79 ------------ lib/delta/delta.ex | 54 -------- lib/delta/message.ex | 98 -------------- lib/delta/messenger.ex | 5 +- lib/sparql_client/sparql_client.ex | 2 - lib/sparql_server/router.ex | 2 - lib/sparql_server/router/handler_support.ex | 14 +- 12 files changed, 159 insertions(+), 294 deletions(-) delete mode 100644 lib/delta/cache.ex delete mode 100644 lib/delta/delta.ex delete mode 100644 lib/delta/message.ex diff --git a/Dockerfile b/Dockerfile index a968702..c51916a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM madnificent/elixir-server:1.10.0 +FROM mine/elixir-server:latest ENV MU_SPARQL_ENDPOINT 'http://database:8890/sparql' ENV LOG_ELIXIR_STARTUP_COMMAND 'true' diff --git a/config/config.exs b/config/config.exs index 5ccd279..38a3835 100644 --- a/config/config.exs +++ b/config/config.exs @@ -92,6 +92,8 @@ config :logger, compile_time_purge_level: :debug, level: :warn + # level: :debug + # config :logger, # compile_time_purge_level: :debug, # level: :warn diff --git a/config/dev.exs b/config/dev.exs index ccbeaf9..eb76290 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -3,6 +3,7 @@ use Mix.Config config :"mu-authorization", sparql_port: 9980 -# config :logger, -# compile_time_purge_level: :debug, -# level: :info +config :logger, + compile_time_purge_level: :debug, + level: :info + # level: :debug diff --git a/lib/alog.ex b/lib/alog.ex index 60fd747..2fdf546 100644 --- a/lib/alog.ex +++ b/lib/alog.ex @@ -18,22 +18,38 @@ defmodule ALog do supplied item when debugging. It just passes the item through. Note that the form may be evaluated twice. """ - defmacro di(item, name) do - quote do - result = unquote(item) - Logger.debug(fn -> unquote(name) <> ": " <> inspect(result) end) - result + defmacro di(item, name, inspect_result \\ true) do + if inspect_result do + quote do + result = unquote(item) + Logger.debug(fn -> unquote(name) <> ": " <> inspect(result) end) + result + end + else + quote do + result = unquote(item) + Logger.debug(fn -> unquote(name) <> ": " <> result end) + result + end end end @doc """ Equivalent to ALog.di, but for warnings. """ - defmacro wi(item, name) do - quote do - result = unquote(item) - Logger.warn(fn -> unquote(name) <> ": " <> inspect(result) end) - result + defmacro wi(item, name, inspect_result \\ true) do + if inspect_result do + quote do + result = unquote(item) + Logger.warn(fn -> unquote(name) <> ": " <> inspect(result) end) + result + end + else + quote do + result = unquote(item) + Logger.warn(fn -> unquote(name) <> ": " <> result end) + result + end end end @@ -42,11 +58,19 @@ defmodule ALog do supplied item when going through info. It just passes the item through. Note that the form may be evaluated twice. """ - defmacro ii(item, name) do - quote do - result = unquote(item) - Logger.info(fn -> unquote(name) <> ": " <> inspect(result) end) - result + defmacro ii(item, name, inspect_result \\ true) do + if inspect_result do + quote do + result = unquote(item) + Logger.info(fn -> unquote(name) <> ": " <> inspect(result) end) + result + end + else + quote do + result = unquote(item) + Logger.info(fn -> unquote(name) <> ": " <> result end) + result + end end end end diff --git a/lib/cache/deltas.ex b/lib/cache/deltas.ex index 4a734a9..08a73d8 100644 --- a/lib/cache/deltas.ex +++ b/lib/cache/deltas.ex @@ -2,6 +2,7 @@ defmodule Cache.Deltas do alias Updates.QueryAnalyzer alias Updates.QueryAnalyzer.P, as: QueryAnalyzerProtocol alias Updates.QueryAnalyzer.Types.Quad, as: Quad + alias SparqlServer.Router.AccessGroupSupport, as: AccessGroupSupport require Logger require ALog @@ -34,9 +35,14 @@ defmodule Cache.Deltas do @spec add_deltas(QueryAnalyzer.quad_changes(), cache_logic_key()) :: :ok def add_deltas(quad_changes, logic, delta_meta \\ []) do case logic do - :precache -> GenServer.cast(__MODULE__, {:cache_w_cache, quad_changes}) - :construct -> GenServer.cast(__MODULE__, {:cache_w_construct, quad_changes, delta_meta}) - :ask -> GenServer.cast(__MODULE__, {:cache_w_ask, quad_changes}) + :precache -> + GenServer.cast(__MODULE__, {:cache_w_cache, quad_changes}) + + :construct -> + GenServer.cast(__MODULE__, {:cache_w_construct, quad_changes, Map.new(delta_meta)}) + + :ask -> + GenServer.cast(__MODULE__, {:cache_w_ask, quad_changes}) end end @@ -116,24 +122,26 @@ defmodule Cache.Deltas do # If a true deletion would delete this quad, the deletion is actually a false deletion defp add_delta_to_state({:insert, quad}, state) do meta = List.first(state.metas) - {true_inserts, true_deletions, false_inserts, false_deletions} = state.cache + {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache new_cache = if quad_in_store?(meta, quad) do if Map.has_key?(true_deletions, quad) do # Element not in store, but would be deleted - # So both false insert and false deletion - {original_index, true_deletions} = Map.pop!(true_deletions, quad) + # Remove true_deletion + all_inserts = Map.put_new(all_inserts, quad, state.index) + true_deletions = Map.delete(true_deletions, quad) - {true_inserts, Map.delete(true_deletions, quad), - Map.put(false_inserts, quad, state.index), - Map.put(false_deletions, quad, original_index)} + {true_inserts, true_deletions, all_inserts, all_deletions} else - {true_inserts, true_deletions, Map.put(false_inserts, quad, state.index), - false_deletions} + all_inserts = Map.put_new(all_inserts, quad, state.index) + {true_inserts, true_deletions, all_inserts, all_deletions} end else - {Map.put(true_inserts, quad, state.index), true_deletions, false_inserts, false_deletions} + true_inserts = Map.put_new(true_inserts, quad, state.index) + all_inserts = Map.put_new(all_inserts, quad, state.index) + + {true_inserts, true_deletions, all_inserts, all_deletions} end %{state | cache: new_cache} @@ -143,22 +151,26 @@ defmodule Cache.Deltas do # If a true insertion would insert this quad, the insert is actually a false insert defp add_delta_to_state({:delete, quad}, state) do meta = List.first(state.metas) - {true_inserts, true_deletions, false_inserts, false_deletions} = state.cache + {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache new_cache = if quad_in_store?(meta, quad) do - {true_inserts, Map.put(true_deletions, quad, state.index), false_inserts, false_deletions} + true_deletions = Map.put(true_deletions, quad, state.index) + all_deletions = Map.put(all_deletions, quad, state.index) + + {true_inserts, true_deletions, all_inserts, all_deletions} else if Map.has_key?(true_inserts, quad) do # Element not in store, but would be deleted # So both false insert and false deletion - {original_index, true_inserts} = Map.pop!(true_inserts, quad) + true_inserts = Map.delete(true_inserts, quad) + all_deletions = Map.put_new(all_deletions, quad, state.index) - {true_inserts, true_deletions, Map.put(false_inserts, quad, original_index), - Map.put(false_deletions, quad, state.index)} + {true_inserts, true_deletions, all_inserts, all_deletions} else - {true_inserts, true_deletions, false_inserts, - Map.put(false_deletions, quad, state.index)} + all_deletions = Map.put(all_deletions, quad, state.index) + + {true_inserts, true_deletions, all_inserts, all_deletions} end end @@ -176,26 +188,42 @@ defmodule Cache.Deltas do end defp delta_update(state) do - {true_inserts, true_deletions, _false_inserts, _false_deletions} = state.cache + {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache + + merge_f = fn _, one, two -> one ++ two end + + inserts = + Enum.group_by(true_inserts, &elem(&1, 1), &{:effective_insert, convert_quad(elem(&1, 0))}) - # content = Enum.map(true_inserts, &({:insert, &1})) ++ Enum.map(true_deletions, &({:delete, &1})) + deletions = + Enum.group_by(true_deletions, &elem(&1, 1), &{:effective_delete, convert_quad(elem(&1, 0))}) - inserts = Enum.group_by(true_inserts, &elem(&1, 1), &{:insert, convert_quad(elem(&1, 0))}) - deletions = Enum.group_by(true_deletions, &elem(&1, 1), &{:delete, convert_quad(elem(&1, 0))}) - total = Map.merge(inserts, deletions, fn _, one, two -> one ++ two end) + all_inserts = Enum.group_by(all_inserts, &elem(&1, 1), &{:insert, convert_quad(elem(&1, 0))}) + + all_deletions = + Enum.group_by(all_deletions, &elem(&1, 1), &{:delete, convert_quad(elem(&1, 0))}) + + total = + Map.merge(inserts, deletions, merge_f) + |> Map.merge(all_inserts, merge_f) + |> Map.merge(all_deletions, merge_f) messages = Enum.map(state.metas, fn meta -> index = meta.index other_meta = - meta.delta_meta - |> Map.new() - |> Map.put(:index, index) + Map.new() + |> add_index(index) + |> add_allowed_groups(meta.delta_meta) + |> add_origin(meta.delta_meta) + |> add_trail(meta.delta_meta) Map.get(total, index, []) |> Enum.group_by(&elem(&1, 0), &elem(&1, 1)) - |> Map.merge(other_meta) + |> Enum.reduce(other_meta, &add_delta/2) + + # |> Map.merge(other_meta) end) %{ @@ -205,29 +233,71 @@ defmodule Cache.Deltas do |> Delta.Messenger.inform_clients() end + # These might be better suited in seperate file + defp add_delta({:effective_insert, items}, map) do + Map.put(map, "effectiveInserts", items) + end + + defp add_delta({:effective_delete, items}, map) do + Map.put(map, "effectiveDeletes", items) + end + + defp add_delta({:insert, items}, map) do + Map.put(map, "inserts", items) + end + + defp add_delta({:delete, items}, map) do + Map.put(map, "deletes", items) + end + + defp add_allowed_groups(map, %{authorization_groups: :sudo}) do + Map.put(map, "allowedGroups", "sudo") + end + + defp add_allowed_groups(map, %{authorization_groups: groups}) do + json_access_groups = AccessGroupSupport.encode_json_access_groups(groups) + Map.put(map, "allowedGroups", json_access_groups) + end + + defp add_allowed_groups(map, _), do: map + + defp add_trail(map, %{mu_call_id_trail: trail}), do: Map.put(map, "muCallIdTrail", trail) + defp add_trail(map, _), do: map + + defp add_origin(map, %{origin: origin}), do: Map.put(map, "origin", origin) + defp add_origin(map, _), do: map + + defp add_index(map, index), do: Map.put(map, "index", index) + @doc """ GenServer.handle_call/3 callback """ def handle_call({:flush, options}, _from, state) do - {true_inserts, true_deletions, _false_inserts, _false_deletions} = state.cache + {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache inserts = Map.keys(true_inserts) unless Enum.empty?(inserts) do + QueryAnalyzer.construct_insert_query_from_quads(inserts, options) + |> Regen.result() + QueryAnalyzer.construct_insert_query_from_quads(inserts, options) |> SparqlClient.execute_parsed(query_type: :write) - |> ALog.di("Results from SparqlClient after write") end deletions = Map.keys(true_deletions) unless Enum.empty?(deletions) do + QueryAnalyzer.construct_delete_query_from_quads(deletions, options) + |> Regen.result() + QueryAnalyzer.construct_delete_query_from_quads(deletions, options) |> SparqlClient.execute_parsed(query_type: :write) - |> ALog.di("Results from SparqlClient after write") end - delta_update(state) + if not (Enum.empty?(all_inserts) and Enum.empty?(all_deletions)) do + delta_update(state) + end new_state = %{state | cache: new_cache(), metas: []} diff --git a/lib/delta/cache.ex b/lib/delta/cache.ex deleted file mode 100644 index 3ae8b94..0000000 --- a/lib/delta/cache.ex +++ /dev/null @@ -1,79 +0,0 @@ -defmodule Delta.Cache do - use GenServer - - def inform(delta, mu_call_id_trail) do - GenServer.call( __MODULE__, {:inform, delta, mu_call_id_trail}) - end - - - def flush(mu_call_id_trail) do - GenServer.cast( __MODULE__, {:flush, mu_call_id_trail}) - end - - - def start_link(_) do - GenServer.start_link(__MODULE__, nil, name: __MODULE__) - end - - - @impl true - def init(_) do - {:ok, %{}} - end - - - defp touch_timeout(state, mu_call_id_trail) do - timeout_duration = Application.get_env(:"mu-authorization", :delta_cache_timeout) - - ref = Process.send_after(self(), {:timeout, mu_call_id_trail}, timeout_duration) - new_state = Map.update(state, mu_call_id_trail, %{buffer: [], ref: ref}, fn x -> - Process.cancel_timer(x.ref) - # Keep buffer intact - %{x | ref: ref} - end) - - new_state - end - - - defp do_flush(state, mu_call_id_trail) do - # Remove possible timeout things - {cache, new_state} = Map.pop(state, mu_call_id_trail) - - Process.cancel_timer(cache.ref) - - json_model = %{ - "changeSets" => cache.buffer - } - - json_model - |> Poison.encode!() - |> Delta.Messenger.inform_clients(mu_call_id_trail: mu_call_id_trail) - - new_state - end - - - # TODO: this message might be incorrect, cause it was already in queue, after a touch message - @impl true - def handle_info({:timeout, trail}, state) do - new_state = do_flush(state, trail) - - {:noreply, new_state} - end - - - @impl true - def handle_cast({:flush, trail}, state) do - {:noreply, do_flush(state, trail)} - end - - - @impl true - def handle_call({:inform, delta, trail}, _from, state) do - new_state = touch_timeout(state, trail) - |> update_in([trail, :buffer], &(&1 ++ delta)) - - {:reply, :ok, new_state} - end -end diff --git a/lib/delta/delta.ex b/lib/delta/delta.ex deleted file mode 100644 index f7c9893..0000000 --- a/lib/delta/delta.ex +++ /dev/null @@ -1,54 +0,0 @@ -defmodule Delta do - alias Updates.QueryAnalyzer, as: QueryAnalyzer - - require Logger - require ALog - - @type delta :: QueryAnalyzer.quad_changes() - - @moduledoc """ - This service consumes altered triples and sends them to interested - clients. It runs in a separate thread and will always run *after* - the response has been supplied to the client. - """ - - @doc """ - Publishes the updated quads. The array is expected to contain - tuples of form {insert_type, quads} in which insert_type is one of - :insert or :delete. - """ - @spec publish_updates(QueryAnalyzer.quad_changes(), [any], Plug.Conn.t()) :: - QueryAnalyzer.quad_changes() - def publish_updates(delta, authorization_groups, conn) do - origin = - conn - |> Map.get(:remote_ip) - |> Tuple.to_list() - |> Enum.join(".") - - get_json_array_header = fn conn, header -> - case Plug.Conn.get_req_header(conn, header) do - [] -> - [] - - [value | _] -> - # ignore extra values for now, they should not happen, but - # if they do we don't want to crash either - Poison.decode!(value) - end - end - - mu_call_id_trail = - get_json_array_header.(conn, "mu-call-id-trail") - |> Kernel.++(Plug.Conn.get_req_header(conn, "mu-call-id")) - |> Poison.encode!() - - delta - |> Delta.Message.construct(authorization_groups, origin) - |> Logging.EnvLog.inspect(:log_delta_messages, label: "Constructed body for clients") - |> Delta.Cache.inform(mu_call_id_trail) - # |> Delta.Messenger.inform_clients(mu_call_id_trail: mu_call_id_trail) - - delta - end -end diff --git a/lib/delta/message.ex b/lib/delta/message.ex deleted file mode 100644 index 3cf30c5..0000000 --- a/lib/delta/message.ex +++ /dev/null @@ -1,98 +0,0 @@ -defmodule Delta.Message do - alias Updates.QueryAnalyzer.Types.Quad, as: Quad - alias SparqlServer.Router.AccessGroupSupport, as: AccessGroupSupport - - use GenServer - - def start_link(_) do - GenServer.start_link(__MODULE__, nil, name: __MODULE__) - end - - @impl true - def init(_) do - {:ok, %{index: :os.system_time(:millisecond)}} - end - - @impl true - def handle_call({:construct, delta, access_groups, origin}, _from, state ) do - - {model, new_index} = Enum.map_reduce(delta, state.index, fn delta_item, index -> - delta_item - |> convert_delta_item - |> add_allowed_groups(access_groups) - |> add_origin(origin) - |> add_index(index) - end) - - new_state = %{state | index: new_index} - - {:reply, model, new_state} - end - - @moduledoc """ - Contains code to construct the correct messenges for informing - clients. - """ - - @typedoc """ - Type of the messages which can be sent to a client. Currently, this - is a binary string. - """ - @type t :: String.t() - - @doc """ - Constructs a new message which can be sent to the clients based on a - quad delta. - """ - @spec construct(Delta.delta(), AccessGroupSupport.decoded_json_access_groups(), String.t()) :: - Delta.Message.t() - def construct(delta, access_groups, origin) do - # TODO we should include the current access rigths and an - # identifier for the originating service. This would help - # services ignore content which came from their end and would - # allow services to perform updates in the name of a specific - # user. - # json_model = - - # Poison.encode!(json_model) - - GenServer.call( __MODULE__, {:construct, delta, access_groups, origin}) - end - - defp convert_delta_item({:insert, quads}) do - %{"insert" => Enum.map(quads, &convert_quad/1)} - end - - defp convert_delta_item({:delete, quads}) do - %{"delete" => Enum.map(quads, &convert_quad/1)} - end - - @spec add_allowed_groups(Poison.Decoder.t(), AccessGroupSupport.decoded_json_access_groups()) :: - Poison.Decoder.t() - defp add_allowed_groups(map, :sudo) do - Map.put(map, "allowedGroups", "sudo") - end - - defp add_allowed_groups(map, access_groups) do - json_access_groups = AccessGroupSupport.encode_json_access_groups(access_groups) - Map.put(map, "allowedGroups", json_access_groups) - end - - defp add_origin(map, origin) do - Map.put(map, "origin", origin) - end - - defp add_index(map, index) do - {Map.put(map, "index", index), index + 1} - end - - defp convert_quad(%Quad{graph: graph, subject: subject, predicate: predicate, object: object}) do - [g, s, p, o] = - Enum.map( - [graph, subject, predicate, object], - &Updates.QueryAnalyzer.P.to_sparql_result_value/1 - ) - - %{"graph" => g, "subject" => s, "predicate" => p, "object" => o} - end -end diff --git a/lib/delta/messenger.ex b/lib/delta/messenger.ex index 1a36efc..de7b9f9 100644 --- a/lib/delta/messenger.ex +++ b/lib/delta/messenger.ex @@ -6,8 +6,11 @@ defmodule Delta.Messenger do @moduledoc """ Sends constructed messages to all interested clients. + + Type of the messages which can be sent to a client. Currently, this + is a binary string. """ - @spec inform_clients(Delta.Message.t(), call_options) :: :ok + @spec inform_clients(String.t(), call_options) :: :ok def inform_clients(message, options \\ []) do # TODO we should create one thread per callee and push messages on # there. As long as the client hasn't shown a sign of life, we diff --git a/lib/sparql_client/sparql_client.ex b/lib/sparql_client/sparql_client.ex index 5996018..f781e9d 100644 --- a/lib/sparql_client/sparql_client.ex +++ b/lib/sparql_client/sparql_client.ex @@ -34,8 +34,6 @@ defmodule SparqlClient do # poison_options = [recv_timeout: options[:timeout]] - ALog.ii(query, "Sending sparql query to backend") - Logging.EnvLog.log(:log_outgoing_sparql_queries, "Outgoing SPARQL query: #{query}") Logging.EnvLog.inspect(query, :inspect_outgoing_sparql_queries, label: "Outgoing SPARQL query") diff --git a/lib/sparql_server/router.ex b/lib/sparql_server/router.ex index f13d956..6e7970e 100644 --- a/lib/sparql_server/router.ex +++ b/lib/sparql_server/router.ex @@ -24,7 +24,6 @@ defmodule SparqlServer.Router do post "/sparql" do {:ok, body, _} = read_body(conn) - ALog.di(conn, "Received POST connection") conn = downcase_request_headers(conn) debug_log_request_id(conn) @@ -40,7 +39,6 @@ defmodule SparqlServer.Router do query_string -> params = URI.decode_query(query_string) - ALog.di(conn, "Received GET connection") conn = downcase_request_headers(conn) debug_log_request_id(conn) diff --git a/lib/sparql_server/router/handler_support.ex b/lib/sparql_server/router/handler_support.ex index 41fc478..a1f810e 100644 --- a/lib/sparql_server/router/handler_support.ex +++ b/lib/sparql_server/router/handler_support.ex @@ -99,7 +99,7 @@ defmodule SparqlServer.Router.HandlerSupport do {parsed_form, new_template_local_store} = query - |> ALog.di("Raw received query") + # |> ALog.di("Raw received query") |> String.trim() # TODO: check if this is valid and/or ensure parser skips \r between words. |> String.replace("\r", "") @@ -108,7 +108,7 @@ defmodule SparqlServer.Router.HandlerSupport do parsed_form = parsed_form |> wrap_query_in_toplevel - |> ALog.di("Wrapped parsed query") + # |> ALog.di("Wrapped parsed query") if is_select_query(parsed_form) do # TODO: Check where the default_graph is used where these options are passed and verify whether this is a sensible name. @@ -133,7 +133,7 @@ defmodule SparqlServer.Router.HandlerSupport do encoded_response = new_parsed_forms - |> ALog.di("New parsed forms") + # |> ALog.di("New parsed forms") |> Enum.map(&SparqlClient.execute_parsed(&1, request: conn, query_type: query_type)) |> List.first() |> Poison.encode!() @@ -161,13 +161,13 @@ defmodule SparqlServer.Router.HandlerSupport do analyzed_quads = parsed_form - |> ALog.di("Parsed query") + # |> ALog.di("Parsed query") |> QueryAnalyzer.quad_changes(%{ default_graph: Iri.from_iri_string("", %{}), authorization_groups: authorization_groups }) |> Enum.reject(&match?({_, []}, &1)) - |> ALog.di("Non-empty operations") + # |> ALog.di("Non-empty operations") |> enrich_manipulations_with_access_rights(authorization_groups) |> maybe_verify_all_triples_written() @@ -185,7 +185,7 @@ defmodule SparqlServer.Router.HandlerSupport do :construct, origin: origin, mu_call_id_trail: mu_call_id_trail, - authorization_groups: AccessGroupSupport.encode_json_access_groups(authorization_groups) + authorization_groups: authorization_groups ) succesful = Poison.encode!(%{sucessful: true}) @@ -319,7 +319,7 @@ defmodule SparqlServer.Router.HandlerSupport do quads |> Acl.process_quads_for_update(user_groups_for_update, authorization_groups) |> elem(1) - |> ALog.di("processed quads") + # |> ALog.di("processed quads") processed_quads end From de466da56b427351b7210998f8ff4d111b176771 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Thu, 19 Aug 2021 13:03:42 +0200 Subject: [PATCH 09/19] make backwards compatible with deltanotifier --- lib/cache/deltas.ex | 9 ++++----- lib/delta/messenger.ex | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/lib/cache/deltas.ex b/lib/cache/deltas.ex index 08a73d8..51efd8b 100644 --- a/lib/cache/deltas.ex +++ b/lib/cache/deltas.ex @@ -192,17 +192,16 @@ defmodule Cache.Deltas do merge_f = fn _, one, two -> one ++ two end + # Merge on index inserts = Enum.group_by(true_inserts, &elem(&1, 1), &{:effective_insert, convert_quad(elem(&1, 0))}) - deletions = Enum.group_by(true_deletions, &elem(&1, 1), &{:effective_delete, convert_quad(elem(&1, 0))}) - all_inserts = Enum.group_by(all_inserts, &elem(&1, 1), &{:insert, convert_quad(elem(&1, 0))}) - all_deletions = Enum.group_by(all_deletions, &elem(&1, 1), &{:delete, convert_quad(elem(&1, 0))}) + # Combine all things total = Map.merge(inserts, deletions, merge_f) |> Map.merge(all_inserts, merge_f) @@ -243,11 +242,11 @@ defmodule Cache.Deltas do end defp add_delta({:insert, items}, map) do - Map.put(map, "inserts", items) + Map.put(map, "insert", items) end defp add_delta({:delete, items}, map) do - Map.put(map, "deletes", items) + Map.put(map, "delete", items) end defp add_allowed_groups(map, %{authorization_groups: :sudo}) do diff --git a/lib/delta/messenger.ex b/lib/delta/messenger.ex index de7b9f9..9f17362 100644 --- a/lib/delta/messenger.ex +++ b/lib/delta/messenger.ex @@ -21,7 +21,7 @@ defmodule Delta.Messenger do # connections. Delta.Config.targets() |> ALog.di("Targets to inform") - |> Enum.map(&spawn(Delta.Messenger, :send_message_to_client, [message, &1, options])) + |> Enum.each(&spawn(Delta.Messenger, :send_message_to_client, [message, &1, options])) :ok end From bc27fa64a2ae2a1c607991f32e0a255dbb7cd01d Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Thu, 19 Aug 2021 14:16:19 +0200 Subject: [PATCH 10/19] revert unwanted changes --- Dockerfile | 2 +- config/config.exs | 11 ++--- config/dev.exs | 7 ++- lib/alog.ex | 54 ++++++--------------- lib/sparql_client/sparql_client.ex | 2 + lib/sparql_server/router.ex | 2 + lib/sparql_server/router/handler_support.ex | 11 ++--- 7 files changed, 32 insertions(+), 57 deletions(-) diff --git a/Dockerfile b/Dockerfile index c51916a..a968702 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM mine/elixir-server:latest +FROM madnificent/elixir-server:1.10.0 ENV MU_SPARQL_ENDPOINT 'http://database:8890/sparql' ENV LOG_ELIXIR_STARTUP_COMMAND 'true' diff --git a/config/config.exs b/config/config.exs index 38a3835..77dd232 100644 --- a/config/config.exs +++ b/config/config.exs @@ -53,7 +53,6 @@ end # config :sparqlex, key: :value config :"mu-authorization", author: :"mu-semtech", - delta_cache_timeout: CH.system_number("DELTA_CACHE_TIMEOUT", 500), log_server_configuration: CH.system_boolean("LOG_SERVER_CONFIGURATION"), log_outgoing_sparql_queries: CH.system_boolean("LOG_OUTGOING_SPARQL_QUERIES"), log_incoming_sparql_queries: CH.system_boolean("LOG_INCOMING_SPARQL_QUERIES"), @@ -88,16 +87,14 @@ config :"mu-authorization", # config :logger, level: :info # +# config :logger, +# compile_time_purge_level: :debug, +# level: :info + config :logger, compile_time_purge_level: :debug, level: :warn - # level: :debug - -# config :logger, -# compile_time_purge_level: :debug, -# level: :warn - if Mix.env() == :test do config :junit_formatter, report_dir: "/tmp/repo-example-test-results/exunit" diff --git a/config/dev.exs b/config/dev.exs index eb76290..ccbeaf9 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -3,7 +3,6 @@ use Mix.Config config :"mu-authorization", sparql_port: 9980 -config :logger, - compile_time_purge_level: :debug, - level: :info - # level: :debug +# config :logger, +# compile_time_purge_level: :debug, +# level: :info diff --git a/lib/alog.ex b/lib/alog.ex index 2fdf546..60fd747 100644 --- a/lib/alog.ex +++ b/lib/alog.ex @@ -18,38 +18,22 @@ defmodule ALog do supplied item when debugging. It just passes the item through. Note that the form may be evaluated twice. """ - defmacro di(item, name, inspect_result \\ true) do - if inspect_result do - quote do - result = unquote(item) - Logger.debug(fn -> unquote(name) <> ": " <> inspect(result) end) - result - end - else - quote do - result = unquote(item) - Logger.debug(fn -> unquote(name) <> ": " <> result end) - result - end + defmacro di(item, name) do + quote do + result = unquote(item) + Logger.debug(fn -> unquote(name) <> ": " <> inspect(result) end) + result end end @doc """ Equivalent to ALog.di, but for warnings. """ - defmacro wi(item, name, inspect_result \\ true) do - if inspect_result do - quote do - result = unquote(item) - Logger.warn(fn -> unquote(name) <> ": " <> inspect(result) end) - result - end - else - quote do - result = unquote(item) - Logger.warn(fn -> unquote(name) <> ": " <> result end) - result - end + defmacro wi(item, name) do + quote do + result = unquote(item) + Logger.warn(fn -> unquote(name) <> ": " <> inspect(result) end) + result end end @@ -58,19 +42,11 @@ defmodule ALog do supplied item when going through info. It just passes the item through. Note that the form may be evaluated twice. """ - defmacro ii(item, name, inspect_result \\ true) do - if inspect_result do - quote do - result = unquote(item) - Logger.info(fn -> unquote(name) <> ": " <> inspect(result) end) - result - end - else - quote do - result = unquote(item) - Logger.info(fn -> unquote(name) <> ": " <> result end) - result - end + defmacro ii(item, name) do + quote do + result = unquote(item) + Logger.info(fn -> unquote(name) <> ": " <> inspect(result) end) + result end end end diff --git a/lib/sparql_client/sparql_client.ex b/lib/sparql_client/sparql_client.ex index f781e9d..5996018 100644 --- a/lib/sparql_client/sparql_client.ex +++ b/lib/sparql_client/sparql_client.ex @@ -34,6 +34,8 @@ defmodule SparqlClient do # poison_options = [recv_timeout: options[:timeout]] + ALog.ii(query, "Sending sparql query to backend") + Logging.EnvLog.log(:log_outgoing_sparql_queries, "Outgoing SPARQL query: #{query}") Logging.EnvLog.inspect(query, :inspect_outgoing_sparql_queries, label: "Outgoing SPARQL query") diff --git a/lib/sparql_server/router.ex b/lib/sparql_server/router.ex index 6e7970e..f13d956 100644 --- a/lib/sparql_server/router.ex +++ b/lib/sparql_server/router.ex @@ -24,6 +24,7 @@ defmodule SparqlServer.Router do post "/sparql" do {:ok, body, _} = read_body(conn) + ALog.di(conn, "Received POST connection") conn = downcase_request_headers(conn) debug_log_request_id(conn) @@ -39,6 +40,7 @@ defmodule SparqlServer.Router do query_string -> params = URI.decode_query(query_string) + ALog.di(conn, "Received GET connection") conn = downcase_request_headers(conn) debug_log_request_id(conn) diff --git a/lib/sparql_server/router/handler_support.ex b/lib/sparql_server/router/handler_support.ex index a1f810e..9a0aef1 100644 --- a/lib/sparql_server/router/handler_support.ex +++ b/lib/sparql_server/router/handler_support.ex @@ -99,7 +99,7 @@ defmodule SparqlServer.Router.HandlerSupport do {parsed_form, new_template_local_store} = query - # |> ALog.di("Raw received query") + |> ALog.di("Raw received query") |> String.trim() # TODO: check if this is valid and/or ensure parser skips \r between words. |> String.replace("\r", "") @@ -108,7 +108,7 @@ defmodule SparqlServer.Router.HandlerSupport do parsed_form = parsed_form |> wrap_query_in_toplevel - # |> ALog.di("Wrapped parsed query") + |> ALog.di("Wrapped parsed query") if is_select_query(parsed_form) do # TODO: Check where the default_graph is used where these options are passed and verify whether this is a sensible name. @@ -133,7 +133,6 @@ defmodule SparqlServer.Router.HandlerSupport do encoded_response = new_parsed_forms - # |> ALog.di("New parsed forms") |> Enum.map(&SparqlClient.execute_parsed(&1, request: conn, query_type: query_type)) |> List.first() |> Poison.encode!() @@ -161,13 +160,13 @@ defmodule SparqlServer.Router.HandlerSupport do analyzed_quads = parsed_form - # |> ALog.di("Parsed query") + |> ALog.di("Parsed query") |> QueryAnalyzer.quad_changes(%{ default_graph: Iri.from_iri_string("", %{}), authorization_groups: authorization_groups }) |> Enum.reject(&match?({_, []}, &1)) - # |> ALog.di("Non-empty operations") + |> ALog.di("Non-empty operations") |> enrich_manipulations_with_access_rights(authorization_groups) |> maybe_verify_all_triples_written() @@ -319,7 +318,7 @@ defmodule SparqlServer.Router.HandlerSupport do quads |> Acl.process_quads_for_update(user_groups_for_update, authorization_groups) |> elem(1) - # |> ALog.di("processed quads") + |> ALog.di("processed quads") processed_quads end From 7a605cc67c66de6c139c1c14d1a076eb452ff090 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Thu, 19 Aug 2021 17:01:37 +0200 Subject: [PATCH 11/19] add timeout support to deltas --- Dockerfile | 2 +- config/config.exs | 4 +- lib/cache/deltas.ex | 63 +++++++++++++++------ lib/sparql_server/router/handler_support.ex | 26 ++++----- 4 files changed, 63 insertions(+), 32 deletions(-) diff --git a/Dockerfile b/Dockerfile index a968702..c51916a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM madnificent/elixir-server:1.10.0 +FROM mine/elixir-server:latest ENV MU_SPARQL_ENDPOINT 'http://database:8890/sparql' ENV LOG_ELIXIR_STARTUP_COMMAND 'true' diff --git a/config/config.exs b/config/config.exs index 77dd232..49b7f2e 100644 --- a/config/config.exs +++ b/config/config.exs @@ -76,7 +76,9 @@ config :"mu-authorization", log_workload_info_requests: CH.system_boolean("LOG_WORKLOAD_INFO_REQUESTS"), testing_auth_query_error_rate: CH.system_float("TESTING_AUTH_QUERY_ERROR_RATE"), error_on_unwritten_data: CH.system_boolean("ERROR_ON_UNWRITTEN_DATA"), - errors: CH.system_boolean("LOG_ERRORS", true) + errors: CH.system_boolean("LOG_ERRORS", true), + quad_change_cache_timeout: CH.system_number("QUAD_CHANGE_CACHE_TIMEOUT", 500), + quad_change_cache_session: CH.system_boolean("QUAD_CHANGE_CACHE_SESSION", true) # and access this configuration in your application as: # diff --git a/lib/cache/deltas.ex b/lib/cache/deltas.ex index 51efd8b..180e976 100644 --- a/lib/cache/deltas.ex +++ b/lib/cache/deltas.ex @@ -10,6 +10,7 @@ defmodule Cache.Deltas do @type cache_logic_key :: :precache | :construct | :ask + # {effective inserts, effective deletions, all inserts, all deletions} defp new_cache, do: {%{}, %{}, %{}, %{}} ### GenServer API @@ -32,17 +33,22 @@ defmodule Cache.Deltas do GenServer.call(__MODULE__, {:flush, options}) end - @spec add_deltas(QueryAnalyzer.quad_changes(), cache_logic_key()) :: :ok - def add_deltas(quad_changes, logic, delta_meta \\ []) do + # @spec add_deltas(QueryAnalyzer.quad_changes(), cache_logic_key()) :: :ok + def add_deltas(quad_changes, options, logic, delta_meta \\ []) do case logic do - :precache -> - GenServer.cast(__MODULE__, {:cache_w_cache, quad_changes}) + # TODO + # :precache -> + # GenServer.cast(__MODULE__, {:cache_w_cache, quad_changes}) :construct -> - GenServer.cast(__MODULE__, {:cache_w_construct, quad_changes, Map.new(delta_meta)}) - - :ask -> - GenServer.cast(__MODULE__, {:cache_w_ask, quad_changes}) + GenServer.cast( + __MODULE__, + {:cache_w_construct, quad_changes, Map.new(delta_meta), options} + ) + + # TODO + # :ask -> + # GenServer.cast(__MODULE__, {:cache_w_ask, quad_changes}) end end @@ -195,9 +201,12 @@ defmodule Cache.Deltas do # Merge on index inserts = Enum.group_by(true_inserts, &elem(&1, 1), &{:effective_insert, convert_quad(elem(&1, 0))}) + deletions = Enum.group_by(true_deletions, &elem(&1, 1), &{:effective_delete, convert_quad(elem(&1, 0))}) + all_inserts = Enum.group_by(all_inserts, &elem(&1, 1), &{:insert, convert_quad(elem(&1, 0))}) + all_deletions = Enum.group_by(all_deletions, &elem(&1, 1), &{:delete, convert_quad(elem(&1, 0))}) @@ -268,10 +277,7 @@ defmodule Cache.Deltas do defp add_index(map, index), do: Map.put(map, "index", index) - @doc """ - GenServer.handle_call/3 callback - """ - def handle_call({:flush, options}, _from, state) do + defp do_flush(state, options) do {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache inserts = Map.keys(true_inserts) @@ -298,13 +304,36 @@ defmodule Cache.Deltas do delta_update(state) end - new_state = %{state | cache: new_cache(), metas: []} + %{state | cache: new_cache(), metas: []} + end + + @doc """ + GenServer.handle_call/3 callback + """ + def handle_call({:flush, options}, _from, state) do + new_state = do_flush(state, options) {:reply, :ok, new_state} end # delta_meta: mu_call_id_trail, authorization_groups, origin - def handle_cast({:cache_w_construct, quads, delta_meta}, state) do + def handle_cast({:cache_w_construct, quads, delta_meta, options}, state) do + timeout_sessions = Application.get_env(:"mu-authorization", :quad_change_cache_session) + current_timeout = Map.get(state, :ref, nil) + + state = if is_nil(current_timeout) or timeout_sessions do + if not is_nil(current_timeout) do + Process.cancel_timer(current_timeout) + end + + timeout_duration = Application.get_env(:"mu-authorization", :quad_change_cache_timeout) + ref = Process.send_after(self(), {:timeout, options}, timeout_duration) + + Map.put(state, :ref, ref) + else + state + end + deltas = Enum.flat_map(quads, fn {type, qs} -> Enum.map(qs, &{type, &1}) end) quads = Enum.map(deltas, &elem(&1, 1)) @@ -328,7 +357,9 @@ defmodule Cache.Deltas do {:noreply, new_state} end - def handle_cast({:cache_w_ask, _quads}, state) do - {:noreply, state} + def handle_info({:timeout, options}, state) do + new_state = do_flush(state, options) + + {:noreply, new_state} end end diff --git a/lib/sparql_server/router/handler_support.ex b/lib/sparql_server/router/handler_support.ex index 9a0aef1..c7f7263 100644 --- a/lib/sparql_server/router/handler_support.ex +++ b/lib/sparql_server/router/handler_support.ex @@ -110,16 +110,16 @@ defmodule SparqlServer.Router.HandlerSupport do |> wrap_query_in_toplevel |> ALog.di("Wrapped parsed query") - if is_select_query(parsed_form) do - # TODO: Check where the default_graph is used where these options are passed and verify whether this is a sensible name. - options = %{ - default_graph: Iri.from_iri_string("", %{}), - prefixes: %{ - "xsd" => Iri.from_iri_string(""), - "foaf" => Iri.from_iri_string("") - } + # TODO: Check where the default_graph is used where these options are passed and verify whether this is a sensible name. + options = %{ + default_graph: Iri.from_iri_string("", %{}), + prefixes: %{ + "xsd" => Iri.from_iri_string(""), + "foaf" => Iri.from_iri_string("") } + } + if is_select_query(parsed_form) do Cache.Deltas.flush(options) {conn, new_parsed_forms} = manipulate_select_query(parsed_form, conn) @@ -147,14 +147,13 @@ defmodule SparqlServer.Router.HandlerSupport do |> Tuple.to_list() |> Enum.join(".") - mu_call_id_trail = - (case Plug.Conn.get_req_header(conn, "mu-call-id-trail") do + case Plug.Conn.get_req_header(conn, "mu-call-id-trail") do # ignore extra values for now, they should not happen, but # if they do we don't want to crash either [value | _] -> Poison.decode!(value) - _ -> [] - end) + _ -> [] + end |> Kernel.++(Plug.Conn.get_req_header(conn, "mu-call-id")) |> Poison.encode!() @@ -181,6 +180,7 @@ defmodule SparqlServer.Router.HandlerSupport do {manipulation, effective_quads} end) |> Cache.Deltas.add_deltas( + options, :construct, origin: origin, mu_call_id_trail: mu_call_id_trail, @@ -259,7 +259,6 @@ defmodule SparqlServer.Router.HandlerSupport do end end - defp enrich_manipulations_with_access_rights(manipulations, authorization_groups) do manipulations |> Enum.map(fn {kind, quads} -> @@ -308,7 +307,6 @@ defmodule SparqlServer.Router.HandlerSupport do end end - @spec enforce_write_rights([Quad.t()], Acl.UserGroups.Config.t()) :: [Quad.t()] defp enforce_write_rights(quads, authorization_groups) do Logger.info("Enforcing write rights") From 8c2427dff2412c48d6a410ab29c0e8f427577cbb Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Thu, 19 Aug 2021 17:05:04 +0200 Subject: [PATCH 12/19] update README --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e255e13..8016273 100644 --- a/README.md +++ b/README.md @@ -374,7 +374,8 @@ Flags which can be either on or off translate the environment variable string to - `INSPECT_OUTGOING_SPARQL_QUERY_RESPONSES` : Inspects the responses coming back from the backing triplestore - `LOG_OUTGOING_SPARQL_QUERY_ROUNDTRIP` : Logs both the request and the response to/from the backing triplestore closely together in the logs - `LOG_WORKLOAD_INFO_REQUESTS` : Logs workload information to the console when it is requested through an http call -- `DELTA_CACHE_TIMEOUT` : Specifies how long the delta messages should be coalesced +- `QUAD_CHANGE_CACHE_TIMEOUT` : Specifies how long (in milliseconds) quad changes should be cached before getting applied to the triplestore. +- `QUAD_CHANGE_CACHE_SESSION` : Specifies if the timeout should be restarted with a quad change. ### Query timeout configuration Complex SPARQL queries can take a long time to process and execute. The time mu-authorization is allowed to spend on this processing and execution before timing out can be configured through the following environment variables: From 56911ab12d7911fd415373f44969737c1ac5a867 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Mon, 23 Aug 2021 14:09:24 +0200 Subject: [PATCH 13/19] remember all inserts and deletion indices --- lib/cache/deltas.ex | 70 ++++++++++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 27 deletions(-) diff --git a/lib/cache/deltas.ex b/lib/cache/deltas.ex index 180e976..bcdb208 100644 --- a/lib/cache/deltas.ex +++ b/lib/cache/deltas.ex @@ -122,10 +122,13 @@ defmodule Cache.Deltas do end end - # Reduce :insert and :delete delta's into true and false delta's + # Reduce :insert and :delete delta's into true and all delta's + # All delta's have a list of indices. Only one insert can be an actual insert, + # but multiple delta's can insert the same quad # # An insert is a true delta if the quad is not yet present in the triplestore - # If a true deletion would delete this quad, the deletion is actually a false deletion + # If an insert would insert a triple that is marked as true deletion, + # this deletion and insertion are false. defp add_delta_to_state({:insert, quad}, state) do meta = List.first(state.metas) {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache @@ -133,19 +136,22 @@ defmodule Cache.Deltas do new_cache = if quad_in_store?(meta, quad) do if Map.has_key?(true_deletions, quad) do - # Element not in store, but would be deleted + # Element in store, but would be deleted # Remove true_deletion - all_inserts = Map.put_new(all_inserts, quad, state.index) + all_inserts = Map.update(all_inserts, quad, [state.index], &[state.index | &1]) true_deletions = Map.delete(true_deletions, quad) {true_inserts, true_deletions, all_inserts, all_deletions} else - all_inserts = Map.put_new(all_inserts, quad, state.index) + all_inserts = Map.update(all_inserts, quad, [state.index], &[state.index | &1]) {true_inserts, true_deletions, all_inserts, all_deletions} end else - true_inserts = Map.put_new(true_inserts, quad, state.index) - all_inserts = Map.put_new(all_inserts, quad, state.index) + # This is important, the quad might be inserted and deleted with a previous delta + # But that index is more correct than the new index + index = Enum.min(Map.get(all_inserts, quad, [state.index])) + true_inserts = Map.put_new(true_inserts, quad, index) + all_inserts = Map.update(all_inserts, quad, [state.index], &[state.index | &1]) {true_inserts, true_deletions, all_inserts, all_deletions} end @@ -161,20 +167,21 @@ defmodule Cache.Deltas do new_cache = if quad_in_store?(meta, quad) do - true_deletions = Map.put(true_deletions, quad, state.index) - all_deletions = Map.put(all_deletions, quad, state.index) + index = Enum.min(Map.get(all_deletions, quad, [state.index])) + true_deletions = Map.put_new(true_deletions, quad, index) + all_deletions = Map.update(all_deletions, quad, [state.index], &[state.index | &1]) {true_inserts, true_deletions, all_inserts, all_deletions} else if Map.has_key?(true_inserts, quad) do - # Element not in store, but would be deleted + # Element not in store, but would be inserted and deleted # So both false insert and false deletion true_inserts = Map.delete(true_inserts, quad) - all_deletions = Map.put_new(all_deletions, quad, state.index) + all_deletions = Map.update(all_deletions, quad, [state.index], &[state.index | &1]) {true_inserts, true_deletions, all_inserts, all_deletions} else - all_deletions = Map.put(all_deletions, quad, state.index) + all_deletions = Map.update(all_deletions, quad, [state.index], &[state.index | &1]) {true_inserts, true_deletions, all_inserts, all_deletions} end @@ -205,10 +212,13 @@ defmodule Cache.Deltas do deletions = Enum.group_by(true_deletions, &elem(&1, 1), &{:effective_delete, convert_quad(elem(&1, 0))}) - all_inserts = Enum.group_by(all_inserts, &elem(&1, 1), &{:insert, convert_quad(elem(&1, 0))}) + all_inserts = + Enum.flat_map(all_inserts, fn {quad, ids} -> Enum.map(ids, &{quad, &1}) end) + |> Enum.group_by(&elem(&1, 1), &{:insert, convert_quad(elem(&1, 0))}) all_deletions = - Enum.group_by(all_deletions, &elem(&1, 1), &{:delete, convert_quad(elem(&1, 0))}) + Enum.flat_map(all_deletions, fn {quad, ids} -> Enum.map(ids, &{quad, &1}) end) + |> Enum.group_by(&elem(&1, 1), &{:delete, convert_quad(elem(&1, 0))}) # Combine all things total = @@ -230,8 +240,6 @@ defmodule Cache.Deltas do Map.get(total, index, []) |> Enum.group_by(&elem(&1, 0), &elem(&1, 1)) |> Enum.reduce(other_meta, &add_delta/2) - - # |> Map.merge(other_meta) end) %{ @@ -319,20 +327,27 @@ defmodule Cache.Deltas do # delta_meta: mu_call_id_trail, authorization_groups, origin def handle_cast({:cache_w_construct, quads, delta_meta, options}, state) do timeout_sessions = Application.get_env(:"mu-authorization", :quad_change_cache_session) - current_timeout = Map.get(state, :ref, nil) - state = if is_nil(current_timeout) or timeout_sessions do - if not is_nil(current_timeout) do - Process.cancel_timer(current_timeout) - end + current_timeout = + Map.get(state, :ref, nil) + |> IO.inspect(label: "current timeout") - timeout_duration = Application.get_env(:"mu-authorization", :quad_change_cache_timeout) - ref = Process.send_after(self(), {:timeout, options}, timeout_duration) + state = + if is_nil(current_timeout) or timeout_sessions do + if not is_nil(current_timeout) do + Process.cancel_timer(current_timeout) + end - Map.put(state, :ref, ref) - else - state - end + timeout_duration = + Application.get_env(:"mu-authorization", :quad_change_cache_timeout) + |> IO.inspect(label: "timeout duration") + + ref = Process.send_after(self(), {:timeout, options}, timeout_duration) + + Map.put(state, :ref, ref) + else + state + end deltas = Enum.flat_map(quads, fn {type, qs} -> Enum.map(qs, &{type, &1}) end) quads = Enum.map(deltas, &elem(&1, 1)) @@ -358,6 +373,7 @@ defmodule Cache.Deltas do end def handle_info({:timeout, options}, state) do + IO.puts("Timeout timeout!") new_state = do_flush(state, options) {:noreply, new_state} From 39640b489ed39c182632faa710762d8c5c0e737b Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Tue, 24 Aug 2021 10:49:43 +0200 Subject: [PATCH 14/19] bump --- lib/cache/deltas.ex | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/cache/deltas.ex b/lib/cache/deltas.ex index bcdb208..0c15d7a 100644 --- a/lib/cache/deltas.ex +++ b/lib/cache/deltas.ex @@ -80,6 +80,17 @@ defmodule Cache.Deltas do |> MapSet.new() end + defp merge_quads_in_non_overlapping_quads(quads) do + # Filter per graph + # Merge seperate graphs + # return quads + end + + # SELECT DISTINCT ?g ?s ?p ?o WHERE { VALUES (?g ?s ?p ?o) { ... } ?g ?s ?p ?o } + defp quads_in_store_with_select(quads) do + nil + end + # From current quads, calculate frequency of _triple_ # Equal quads have no influence, but same triples from different graphs # cannot be queried with the same CONSTRUCT query From 408f468af77ad8386887052afbd9d60b1ad54dc6 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Wed, 25 Aug 2021 10:05:26 +0200 Subject: [PATCH 15/19] bump2 --- lib/cache/deltas.ex | 156 ++++++++++++++++++++----- lib/updates/query_analyzer.ex | 10 ++ lib/updates/query_constructors.ex | 182 +++++++++++++++--------------- 3 files changed, 234 insertions(+), 114 deletions(-) diff --git a/lib/cache/deltas.ex b/lib/cache/deltas.ex index 0c15d7a..5de610d 100644 --- a/lib/cache/deltas.ex +++ b/lib/cache/deltas.ex @@ -35,21 +35,18 @@ defmodule Cache.Deltas do # @spec add_deltas(QueryAnalyzer.quad_changes(), cache_logic_key()) :: :ok def add_deltas(quad_changes, options, logic, delta_meta \\ []) do - case logic do - # TODO - # :precache -> - # GenServer.cast(__MODULE__, {:cache_w_cache, quad_changes}) - - :construct -> - GenServer.cast( - __MODULE__, - {:cache_w_construct, quad_changes, Map.new(delta_meta), options} - ) - - # TODO - # :ask -> - # GenServer.cast(__MODULE__, {:cache_w_ask, quad_changes}) - end + # TODO + # :precache -> + # GenServer.cast(__MODULE__, {:cache_w_cache, quad_changes}) + + GenServer.cast( + __MODULE__, + {:cache, logic, quad_changes, Map.new(delta_meta), options} + ) + + # TODO + # :ask -> + # GenServer.cast(__MODULE__, {:cache_w_ask, quad_changes}) end ## Create tuple from literal {type, value} @@ -80,15 +77,59 @@ defmodule Cache.Deltas do |> MapSet.new() end + defp quad_equal_without_graph( + %Quad{ + subject: s1, + predicate: p1, + object: o1, + graph: _graph + }, + %Quad{ + subject: s2, + predicate: p2, + object: o2, + graph: _graph + } + ) do + s1 == s2 and p1 == p1 and o1 == o2 + end + + defp split_into_nonoverlapping(cum, []) do + cum + end + + defp split_into_nonoverlapping(cum, xs) do + # if el can merge into acc, return {[], acc ++ el} + # else {[el], acc} + el_can_merge = fn el, acc -> Enum.any?(el, &Enum.member?(acc, &1)) end + + {xs, cum} = + Enum.flat_map_reduce(xs, cum, fn el, acc -> + # TODO check syntax! + (el_can_merge(el, acc) && {[], acc ++ el}) || {[el], acc} + end) + + [cum | split_into_nonoverlapping([], xs)] + end + defp merge_quads_in_non_overlapping_quads(quads) do # Filter per graph # Merge seperate graphs # return quads + per_graph = + Enum.group_by(quads, fn x -> x.graph end) + |> Map.values() + + split_into_nonoverlapping([], per_graph) end # SELECT DISTINCT ?g ?s ?p ?o WHERE { VALUES (?g ?s ?p ?o) { ... } ?g ?s ?p ?o } defp quads_in_store_with_select(quads) do - nil + QueryAnalyzer.construct_select_distinct_matching_quads(quads) + |> SparqlClient.execute_parsed(query_type: :read) + |> IO.inspect(label: "HALLO HERE BRO") + + quads end # From current quads, calculate frequency of _triple_ @@ -116,7 +157,10 @@ defmodule Cache.Deltas do # so the CONSTRUCT query does not uniquely represent the quad in the triplestore # so an ASK query is executed (this shouldn't happen too often) defp quad_in_store?( - %{triple_counts: triple_counts, triples_in_store: triples_in_store}, + %CacheType.ConstructAndAsk{ + triple_counts: triple_counts, + triples_in_store: triples_in_store + }, %Quad{ subject: subject, predicate: predicate, @@ -133,6 +177,26 @@ defmodule Cache.Deltas do end end + defp quad_in_store?( + %CacheType.OnlyAsk{}, + quad + ) do + quad_in_store_with_ask?(quad) + end + + # TODO: Implement + defp quad_in_store!( + %CacheType.MultipleConstructs{}, + %Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: _graph + } = quad + ) do + false + end + # Reduce :insert and :delete delta's into true and all delta's # All delta's have a list of indices. Only one insert can be an actual insert, # but multiple delta's can insert the same quad @@ -141,11 +205,11 @@ defmodule Cache.Deltas do # If an insert would insert a triple that is marked as true deletion, # this deletion and insertion are false. defp add_delta_to_state({:insert, quad}, state) do - meta = List.first(state.metas) + cache_type = List.first(state.metas).cache_type {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache new_cache = - if quad_in_store?(meta, quad) do + if quad_in_store?(cache_type, quad) do if Map.has_key?(true_deletions, quad) do # Element in store, but would be deleted # Remove true_deletion @@ -173,11 +237,11 @@ defmodule Cache.Deltas do # A deletion is a true deletion if the quad is present in the triplestore # If a true insertion would insert this quad, the insert is actually a false insert defp add_delta_to_state({:delete, quad}, state) do - meta = List.first(state.metas) + cache_type = List.first(state.metas).cache_type {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache new_cache = - if quad_in_store?(meta, quad) do + if quad_in_store?(cache_type, quad) do index = Enum.min(Map.get(all_deletions, quad, [state.index])) true_deletions = Map.put_new(true_deletions, quad, index) all_deletions = Map.update(all_deletions, quad, [state.index], &[state.index | &1]) @@ -336,7 +400,7 @@ defmodule Cache.Deltas do end # delta_meta: mu_call_id_trail, authorization_groups, origin - def handle_cast({:cache_w_construct, quads, delta_meta, options}, state) do + def handle_cast({:cache, type, quads, delta_meta, options}, state) do timeout_sessions = Application.get_env(:"mu-authorization", :quad_change_cache_session) current_timeout = @@ -364,13 +428,13 @@ defmodule Cache.Deltas do quads = Enum.map(deltas, &elem(&1, 1)) # Calculate meta data - triple_counts = triple_counts_with_graph_differences(quads) - triples_in_store = quads_in_store_with_construct(quads) + cache_type = create_cache_type(type, quads) + + quads_in_store_with_select(quads) # Add metadata to state meta = %{ - triple_counts: triple_counts, - triples_in_store: triples_in_store, + cache_type: cache_type, delta_meta: delta_meta, index: state.index + 1 } @@ -383,6 +447,25 @@ defmodule Cache.Deltas do {:noreply, new_state} end + defp create_cache_type(:construct, quads) do + triple_counts = triple_counts_with_graph_differences(quads) + triples_in_store = quads_in_store_with_construct(quads) + + %CacheType.ConstructAndAsk{triple_counts: triple_counts, triples_in_store: triples_in_store} + end + + defp create_cache_type(:onlyask, quads) do + %CacheType.OnlyAsk{} + end + + defp create_cache_type(:multiple_constructs, quads) do + %CacheType.MultipleConstructs{quads_in_store: nil} + end + + defp create_cache_type(:construct_with_select, quads) do + %CacheType.ConstructSelect{quads_in_store: nil} + end + def handle_info({:timeout, options}, state) do IO.puts("Timeout timeout!") new_state = do_flush(state, options) @@ -390,3 +473,24 @@ defmodule Cache.Deltas do {:noreply, new_state} end end + +defmodule CacheType.ConstructAndAsk do + @enforce_keys [:triple_counts, :triples_in_store] + defstruct [:triple_counts, :triples_in_store] +end + +defmodule CacheType.OnlyAsk do + defstruct +end + +defmodule CacheType.MultipleConstructs do + @enforce_keys [:quads_in_store] + defstruct [:quads_in_store] +end + +defmodule CacheType.ConstructSelect do + @enforce_keys [:quads_in_store] + defstruct [:quads_in_store] +end + +# You like kinda want an 'instant' struct but that changes more then `quad_in_store` diff --git a/lib/updates/query_analyzer.ex b/lib/updates/query_analyzer.ex index 29e623c..02795a1 100644 --- a/lib/updates/query_analyzer.ex +++ b/lib/updates/query_analyzer.ex @@ -857,6 +857,16 @@ defmodule Updates.QueryAnalyzer do |> Map.put(:default_graph, iri) end + def construct_select_distinct_matching_quads(quads) do + graph = QueryConstructors/make_var_symbol("?g") + subject = QueryConstructors.make_var_symbol("?s") + predicate = QueryConstructors.make_var_symbol("?p") + object = QueryConstructors.make_var_symbol("?o") + where_clause = QueryConstructors.make_quad_match_values(graph, subject, predicate, object, quads) + + QueryConstructors.make_select_distinct_query([graph, subject, predicate, object], where_clause) + end + def construct_asks_query(quads) do QueryConstructors.make_asks_query(quads) end diff --git a/lib/updates/query_constructors.ex b/lib/updates/query_constructors.ex index e6ea6e1..156567a 100644 --- a/lib/updates/query_constructors.ex +++ b/lib/updates/query_constructors.ex @@ -34,13 +34,13 @@ defmodule Updates.QueryConstructors do Can be used as multiple ASK queries with only a little bit of suffering """ def make_asks_query(quads) do - matches = quads |> Enum.flat_map(&map_quad_to_bracet_data_block_values/1) - graph = make_var_symbol("?g") subject = make_var_symbol("?s") predicate = make_var_symbol("?p") object = make_var_symbol("?o") + where_clause = make_quad_match_values(graph, subject, predicate, object, quads) + %Sym{ symbol: :Sparql, submatches: [ @@ -79,91 +79,7 @@ defmodule Updates.QueryConstructors do symbol: :WhereClause, submatches: [ %Word{word: "WHERE"}, - %Sym{ - symbol: :GroupGraphPattern, - submatches: [ - %Word{word: "{"}, - %Sym{ - symbol: :GroupGraphPatternSub, - submatches: [ - %Sym{ - symbol: :GraphPatternNotTriples, - submatches: [ - %Sym{ - symbol: :InlineData, - submatches: [ - %Word{word: "VALUES"}, - %Sym{ - symbol: :DataBlock, - submatches: [ - %Sym{ - symbol: :InlineDataFull, - submatches: - [ - %Word{word: "("}, - graph, - subject, - predicate, - object, - %Word{word: ")"}, - %Word{word: "{"} - ] ++ - matches ++ - [ - %Word{word: "}"} - ] - } - ] - } - ] - } - ] - }, - %Sym{ - symbol: :GraphPatternNotTriples, - submatches: [ - %Sym{ - symbol: :GraphGraphPattern, - submatches: [ - %Word{word: "GRAPH"}, - %Sym{ - symbol: :VarOrIri, - submatches: [ - graph - ] - }, - %Sym{ - symbol: :GroupGraphPattern, - submatches: [ - %Word{word: "{"}, - %Sym{ - symbol: :GroupGraphPatternSub, - submatches: [ - %Sym{ - symbol: :TriplesBlock, - submatches: [ - make_simple_triples_same_subject_path( - subject, - predicate, - object - ), - %Word{word: "."} - ] - } - ] - }, - %Word{word: "}"} - ] - } - ] - } - ] - } - ] - }, - %Word{word: "}"} - ] - } + ] }, %Sym{ @@ -184,6 +100,96 @@ defmodule Updates.QueryConstructors do } end + def make_quad_match_values(graph, subject, predicate, object, quads) do + matches = quads |> Enum.flat_map(&map_quad_to_bracet_data_block_values/1) + + %Sym{ + symbol: :GroupGraphPattern, + submatches: [ + %Word{word: "{"}, + %Sym{ + symbol: :GroupGraphPatternSub, + submatches: [ + %Sym{ + symbol: :GraphPatternNotTriples, + submatches: [ + %Sym{ + symbol: :InlineData, + submatches: [ + %Word{word: "VALUES"}, + %Sym{ + symbol: :DataBlock, + submatches: [ + %Sym{ + symbol: :InlineDataFull, + submatches: + [ + %Word{word: "("}, + graph, + subject, + predicate, + object, + %Word{word: ")"}, + %Word{word: "{"} + ] ++ + matches ++ + [ + %Word{word: "}"} + ] + } + ] + } + ] + } + ] + }, + %Sym{ + symbol: :GraphPatternNotTriples, + submatches: [ + %Sym{ + symbol: :GraphGraphPattern, + submatches: [ + %Word{word: "GRAPH"}, + %Sym{ + symbol: :VarOrIri, + submatches: [ + graph + ] + }, + %Sym{ + symbol: :GroupGraphPattern, + submatches: [ + %Word{word: "{"}, + %Sym{ + symbol: :GroupGraphPatternSub, + submatches: [ + %Sym{ + symbol: :TriplesBlock, + submatches: [ + make_simple_triples_same_subject_path( + subject, + predicate, + object + ), + %Word{word: "."} + ] + } + ] + }, + %Word{word: "}"} + ] + } + ] + } + ] + } + ] + }, + %Word{word: "}"} + ] + } + end + @doc """ Creates a valid ASK query, for a single quad """ @@ -571,7 +577,7 @@ defmodule Updates.QueryConstructors do make_select_distinct_query(variables, group_graph_pattern_sym) end - defp make_var_symbol(str) do + def make_var_symbol(str) do %Sym{ symbol: :Var, submatches: [%Sym{symbol: :VAR1, string: str, submatches: :none}] From f105c1000613e0a2ae4708a2ef2dca60b889121c Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Wed, 25 Aug 2021 11:58:39 +0200 Subject: [PATCH 16/19] Split CacheType from Deltas --- lib/cache/cache_types.ex | 153 +++++++++++++++++++++++++++++ lib/cache/deltas.ex | 154 ++---------------------------- lib/updates/query_analyzer.ex | 2 +- lib/updates/query_constructors.ex | 2 +- 4 files changed, 161 insertions(+), 150 deletions(-) create mode 100644 lib/cache/cache_types.ex diff --git a/lib/cache/cache_types.ex b/lib/cache/cache_types.ex new file mode 100644 index 0000000..787ece7 --- /dev/null +++ b/lib/cache/cache_types.ex @@ -0,0 +1,153 @@ +defmodule CacheType do + alias SparqlServer.Router.AccessGroupSupport, as: AccessGroupSupport + alias Updates.QueryAnalyzer + alias Updates.QueryAnalyzer.P, as: QueryAnalyzerProtocol + alias Updates.QueryAnalyzer.Types.Quad, as: Quad + + require Logger + require ALog + + defmodule ConstructAndAsk do + @enforce_keys [:triple_counts, :triples_in_store] + defstruct [:triple_counts, :triples_in_store] + end + + defmodule OnlyAsk do + defstruct [] + end + + defmodule MultipleConstructs do + @enforce_keys [:quads_in_store] + defstruct [:quads_in_store] + end + + defmodule Select do + @enforce_keys [:quads_in_store] + defstruct [:quads_in_store] + end + + defp quad_in_store_with_ask?(quad) do + (QueryAnalyzer.construct_ask_query(quad) + |> SparqlClient.execute_parsed(query_type: :read))["boolean"] + end + + ## Create tuple from literal {type, value} + defp get_result_tuple(x) do + out = QueryAnalyzerProtocol.to_sparql_result_value(x) + {out.type, out.value} + end + + defp tuple_from_bindings(%{"o" => object, "s" => subject, "p" => predicate, "g" => graph}) do + { + {graph["type"], graph["value"]}, + {subject["type"], subject["value"]}, + {predicate["type"], predicate["value"]}, + {object["type"], object["value"]} + } + end + + defp tuple_from_bindings(%{"o" => object, "s" => subject, "p" => predicate}) do + { + {subject["type"], subject["value"]}, + {predicate["type"], predicate["value"]}, + {object["type"], object["value"]} + } + end + + defp query_to_results(query) do + query + |> SparqlClient.execute_parsed(query_type: :read) + |> Map.get("results") + |> Map.get("bindings") + |> Enum.map(&tuple_from_bindings/1) + |> MapSet.new() + end + + # From current quads, analyse what quads are already present + defp triples_in_store_with_construct(quads) do + quads + |> QueryAnalyzer.construct_asks_query() + |> query_to_results() + end + + def quads_in_store_with_select(quads) do + quads + |> QueryAnalyzer.construct_select_distinct_matching_quads() + |> query_to_results() + end + + # From current quads, calculate frequency of _triple_ + # Equal quads have no influence, but same triples from different graphs + # cannot be queried with the same CONSTRUCT query + # (because CONSTRUCT only returns triples) + defp triple_counts_with_graph_differences(quads) do + quads + |> Enum.uniq() + |> Enum.map(fn %Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: _graph + } -> + {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} + end) + |> Enum.frequencies() + end + + # Test if a quad is inn the store + # If the calculated frequency is one, the existence of the triple in the CONSTRUCT query + # uniquely represents the existence of the quad in the triplestore + # If the calculated frequency is more, the triple might exist in more graphs + # so the CONSTRUCT query does not uniquely represent the quad in the triplestore + # so an ASK query is executed (this shouldn't happen too often) + def create_cache_type(:construct, quads) do + triple_counts = triple_counts_with_graph_differences(quads) + triples_in_store = triples_in_store_with_construct(quads) + + %ConstructAndAsk{triple_counts: triple_counts, triples_in_store: triples_in_store} + end + + def create_cache_type(:select, quads) do + quads_in_store = quads_in_store_with_select(quads) + + %Select{quads_in_store: quads_in_store} + end + + def quad_in_store?(%Select{quads_in_store: quads_in_store}, %Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: graph + }) do + IO.puts("quad in store with Select") + + value = + {get_result_tuple(graph), get_result_tuple(subject), get_result_tuple(predicate), + get_result_tuple(object)} + + value in quads_in_store + end + + def quad_in_store?( + %ConstructAndAsk{ + triple_counts: triple_counts, + triples_in_store: triples_in_store + }, + %Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: _graph + } = quad + ) do + IO.puts("quad in store with ConstructAndAsk") + + value = {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} + + if Map.get(triple_counts, value, 0) > 1 do + quad_in_store_with_ask?(quad) + else + value in triples_in_store + end + end +end diff --git a/lib/cache/deltas.ex b/lib/cache/deltas.ex index 5de610d..6beafda 100644 --- a/lib/cache/deltas.ex +++ b/lib/cache/deltas.ex @@ -1,8 +1,8 @@ defmodule Cache.Deltas do + alias SparqlServer.Router.AccessGroupSupport, as: AccessGroupSupport alias Updates.QueryAnalyzer alias Updates.QueryAnalyzer.P, as: QueryAnalyzerProtocol alias Updates.QueryAnalyzer.Types.Quad, as: Quad - alias SparqlServer.Router.AccessGroupSupport, as: AccessGroupSupport require Logger require ALog @@ -49,34 +49,6 @@ defmodule Cache.Deltas do # GenServer.cast(__MODULE__, {:cache_w_ask, quad_changes}) end - ## Create tuple from literal {type, value} - defp get_result_tuple(x) do - out = QueryAnalyzerProtocol.to_sparql_result_value(x) - {out.type, out.value} - end - - defp quad_in_store_with_ask?(quad) do - (QueryAnalyzer.construct_ask_query(quad) - |> SparqlClient.execute_parsed(query_type: :read))["boolean"] - end - - # From current quads, analyse what quads are already present - defp quads_in_store_with_construct(quads) do - quads - |> QueryAnalyzer.construct_asks_query() - |> SparqlClient.execute_parsed(query_type: :read) - |> Map.get("results") - |> Map.get("bindings") - |> Enum.map(fn %{"o" => object, "s" => subject, "p" => predicate} -> - { - {subject["type"], subject["value"]}, - {predicate["type"], predicate["value"]}, - {object["type"], object["value"]} - } - end) - |> MapSet.new() - end - defp quad_equal_without_graph( %Quad{ subject: s1, @@ -91,7 +63,7 @@ defmodule Cache.Deltas do graph: _graph } ) do - s1 == s2 and p1 == p1 and o1 == o2 + s1 == s2 and p1 == p2 and o1 == o2 end defp split_into_nonoverlapping(cum, []) do @@ -106,7 +78,7 @@ defmodule Cache.Deltas do {xs, cum} = Enum.flat_map_reduce(xs, cum, fn el, acc -> # TODO check syntax! - (el_can_merge(el, acc) && {[], acc ++ el}) || {[el], acc} + (el_can_merge.(el, acc) && {[], acc ++ el}) || {[el], acc} end) [cum | split_into_nonoverlapping([], xs)] @@ -123,80 +95,6 @@ defmodule Cache.Deltas do split_into_nonoverlapping([], per_graph) end - # SELECT DISTINCT ?g ?s ?p ?o WHERE { VALUES (?g ?s ?p ?o) { ... } ?g ?s ?p ?o } - defp quads_in_store_with_select(quads) do - QueryAnalyzer.construct_select_distinct_matching_quads(quads) - |> SparqlClient.execute_parsed(query_type: :read) - |> IO.inspect(label: "HALLO HERE BRO") - - quads - end - - # From current quads, calculate frequency of _triple_ - # Equal quads have no influence, but same triples from different graphs - # cannot be queried with the same CONSTRUCT query - # (because CONSTRUCT only returns triples) - defp triple_counts_with_graph_differences(quads) do - quads - |> Enum.uniq() - |> Enum.map(fn %Quad{ - subject: subject, - predicate: predicate, - object: object, - graph: _graph - } -> - {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} - end) - |> Enum.frequencies() - end - - # Test if a quad is inn the store - # If the calculated frequency is one, the existence of the triple in the CONSTRUCT query - # uniquely represents the existence of the quad in the triplestore - # If the calculated frequency is more, the triple might exist in more graphs - # so the CONSTRUCT query does not uniquely represent the quad in the triplestore - # so an ASK query is executed (this shouldn't happen too often) - defp quad_in_store?( - %CacheType.ConstructAndAsk{ - triple_counts: triple_counts, - triples_in_store: triples_in_store - }, - %Quad{ - subject: subject, - predicate: predicate, - object: object, - graph: _graph - } = quad - ) do - value = {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} - - if Map.get(triple_counts, value, 0) > 1 do - quad_in_store_with_ask?(quad) - else - value in triples_in_store - end - end - - defp quad_in_store?( - %CacheType.OnlyAsk{}, - quad - ) do - quad_in_store_with_ask?(quad) - end - - # TODO: Implement - defp quad_in_store!( - %CacheType.MultipleConstructs{}, - %Quad{ - subject: subject, - predicate: predicate, - object: object, - graph: _graph - } = quad - ) do - false - end - # Reduce :insert and :delete delta's into true and all delta's # All delta's have a list of indices. Only one insert can be an actual insert, # but multiple delta's can insert the same quad @@ -209,7 +107,7 @@ defmodule Cache.Deltas do {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache new_cache = - if quad_in_store?(cache_type, quad) do + if CacheType.quad_in_store?(cache_type, quad) do if Map.has_key?(true_deletions, quad) do # Element in store, but would be deleted # Remove true_deletion @@ -241,7 +139,7 @@ defmodule Cache.Deltas do {true_inserts, true_deletions, all_inserts, all_deletions} = state.cache new_cache = - if quad_in_store?(cache_type, quad) do + if CacheType.quad_in_store?(cache_type, quad) do index = Enum.min(Map.get(all_deletions, quad, [state.index])) true_deletions = Map.put_new(true_deletions, quad, index) all_deletions = Map.update(all_deletions, quad, [state.index], &[state.index | &1]) @@ -428,9 +326,7 @@ defmodule Cache.Deltas do quads = Enum.map(deltas, &elem(&1, 1)) # Calculate meta data - cache_type = create_cache_type(type, quads) - - quads_in_store_with_select(quads) + cache_type = CacheType.create_cache_type(type, quads) # Add metadata to state meta = %{ @@ -447,25 +343,6 @@ defmodule Cache.Deltas do {:noreply, new_state} end - defp create_cache_type(:construct, quads) do - triple_counts = triple_counts_with_graph_differences(quads) - triples_in_store = quads_in_store_with_construct(quads) - - %CacheType.ConstructAndAsk{triple_counts: triple_counts, triples_in_store: triples_in_store} - end - - defp create_cache_type(:onlyask, quads) do - %CacheType.OnlyAsk{} - end - - defp create_cache_type(:multiple_constructs, quads) do - %CacheType.MultipleConstructs{quads_in_store: nil} - end - - defp create_cache_type(:construct_with_select, quads) do - %CacheType.ConstructSelect{quads_in_store: nil} - end - def handle_info({:timeout, options}, state) do IO.puts("Timeout timeout!") new_state = do_flush(state, options) @@ -474,23 +351,4 @@ defmodule Cache.Deltas do end end -defmodule CacheType.ConstructAndAsk do - @enforce_keys [:triple_counts, :triples_in_store] - defstruct [:triple_counts, :triples_in_store] -end - -defmodule CacheType.OnlyAsk do - defstruct -end - -defmodule CacheType.MultipleConstructs do - @enforce_keys [:quads_in_store] - defstruct [:quads_in_store] -end - -defmodule CacheType.ConstructSelect do - @enforce_keys [:quads_in_store] - defstruct [:quads_in_store] -end - # You like kinda want an 'instant' struct but that changes more then `quad_in_store` diff --git a/lib/updates/query_analyzer.ex b/lib/updates/query_analyzer.ex index 02795a1..9ebbc0a 100644 --- a/lib/updates/query_analyzer.ex +++ b/lib/updates/query_analyzer.ex @@ -858,7 +858,7 @@ defmodule Updates.QueryAnalyzer do end def construct_select_distinct_matching_quads(quads) do - graph = QueryConstructors/make_var_symbol("?g") + graph = QueryConstructors.make_var_symbol("?g") subject = QueryConstructors.make_var_symbol("?s") predicate = QueryConstructors.make_var_symbol("?p") object = QueryConstructors.make_var_symbol("?o") diff --git a/lib/updates/query_constructors.ex b/lib/updates/query_constructors.ex index 156567a..c7de4f6 100644 --- a/lib/updates/query_constructors.ex +++ b/lib/updates/query_constructors.ex @@ -79,7 +79,7 @@ defmodule Updates.QueryConstructors do symbol: :WhereClause, submatches: [ %Word{word: "WHERE"}, - + where_clause ] }, %Sym{ From cdeb05413b984da05f00fadfcda9100701ae3067 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Wed, 25 Aug 2021 15:33:45 +0200 Subject: [PATCH 17/19] add other query methods --- lib/cache/cache_types.ex | 92 ++++++++++++++++++++- lib/cache/deltas.ex | 46 ----------- lib/sparql_server/router/handler_support.ex | 5 +- 3 files changed, 93 insertions(+), 50 deletions(-) diff --git a/lib/cache/cache_types.ex b/lib/cache/cache_types.ex index 787ece7..ee891fd 100644 --- a/lib/cache/cache_types.ex +++ b/lib/cache/cache_types.ex @@ -1,5 +1,4 @@ defmodule CacheType do - alias SparqlServer.Router.AccessGroupSupport, as: AccessGroupSupport alias Updates.QueryAnalyzer alias Updates.QueryAnalyzer.P, as: QueryAnalyzerProtocol alias Updates.QueryAnalyzer.Types.Quad, as: Quad @@ -17,8 +16,8 @@ defmodule CacheType do end defmodule MultipleConstructs do - @enforce_keys [:quads_in_store] - defstruct [:quads_in_store] + @enforce_keys [:non_overlapping_quads] + defstruct [:non_overlapping_quads] end defmodule Select do @@ -94,6 +93,61 @@ defmodule CacheType do |> Enum.frequencies() end + defp quad_equal_without_graph( + %Quad{ + subject: s1, + predicate: p1, + object: o1 + }, + %Quad{ + subject: s2, + predicate: p2, + object: o2 + } + ) do + s1 == s2 and p1 == p2 and o1 == o2 + end + + defp split_into_nonoverlapping(cum, []) do + cum + end + + defp split_into_nonoverlapping(cum, xs) do + # if el can merge into acc, return {[], acc ++ el} + # else {[el], acc} + el_can_merge = fn el, acc -> + not Enum.any?(el, fn x -> Enum.any?(acc, &quad_equal_without_graph(x, &1)) end) + end + + {xs, cum} = + Enum.flat_map_reduce(xs, cum, fn el, acc -> + # TODO check syntax! + (el_can_merge.(el, acc) && {[], acc ++ el}) || {[el], acc} + end) + + [cum | split_into_nonoverlapping([], xs)] + end + + defp merge_quads_in_non_overlapping_quads(quads) do + # Filter per graph + # Merge seperate graphs + # return quads + per_graph = + Enum.group_by(quads, fn x -> x.graph end) + |> Map.values() + + split_into_nonoverlapping([], per_graph) + end + + defp quad_list_to_constructed_graphs(quads) do + graphs = + Enum.map(quads, fn x -> get_result_tuple(x.graph) end) + |> MapSet.new() + + triples_in_store = triples_in_store_with_construct(quads) + {graphs, triples_in_store} + end + # Test if a quad is inn the store # If the calculated frequency is one, the existence of the triple in the CONSTRUCT query # uniquely represents the existence of the quad in the triplestore @@ -113,6 +167,33 @@ defmodule CacheType do %Select{quads_in_store: quads_in_store} end + def create_cache_type(:multiple_constructs, quads) do + non_overlapping_quads = + merge_quads_in_non_overlapping_quads(quads) + |> Enum.map(&quad_list_to_constructed_graphs/1) + + %MultipleConstructs{non_overlapping_quads: non_overlapping_quads} + end + + def create_cache_type(:only_asks, _quads) do + %OnlyAsk{} + end + + def quad_in_store?(%MultipleConstructs{non_overlapping_quads: non_overlapping_quads}, %Quad{ + subject: subject, + predicate: predicate, + object: object, + graph: graph + }) do + IO.puts("quad in store with MultipleConstructs") + g = get_result_tuple(graph) + value = {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} + + {_, quads_in_this_store} = Enum.find(non_overlapping_quads, fn {gs, _} -> g in gs end) + + value in quads_in_this_store + end + def quad_in_store?(%Select{quads_in_store: quads_in_store}, %Quad{ subject: subject, predicate: predicate, @@ -128,6 +209,11 @@ defmodule CacheType do value in quads_in_store end + def quad_in_store?(%OnlyAsk{}, quad) do + IO.puts("quad in store with OnlyAsk") + quad_in_store_with_ask?(quad) + end + def quad_in_store?( %ConstructAndAsk{ triple_counts: triple_counts, diff --git a/lib/cache/deltas.ex b/lib/cache/deltas.ex index 6beafda..c187022 100644 --- a/lib/cache/deltas.ex +++ b/lib/cache/deltas.ex @@ -49,52 +49,6 @@ defmodule Cache.Deltas do # GenServer.cast(__MODULE__, {:cache_w_ask, quad_changes}) end - defp quad_equal_without_graph( - %Quad{ - subject: s1, - predicate: p1, - object: o1, - graph: _graph - }, - %Quad{ - subject: s2, - predicate: p2, - object: o2, - graph: _graph - } - ) do - s1 == s2 and p1 == p2 and o1 == o2 - end - - defp split_into_nonoverlapping(cum, []) do - cum - end - - defp split_into_nonoverlapping(cum, xs) do - # if el can merge into acc, return {[], acc ++ el} - # else {[el], acc} - el_can_merge = fn el, acc -> Enum.any?(el, &Enum.member?(acc, &1)) end - - {xs, cum} = - Enum.flat_map_reduce(xs, cum, fn el, acc -> - # TODO check syntax! - (el_can_merge.(el, acc) && {[], acc ++ el}) || {[el], acc} - end) - - [cum | split_into_nonoverlapping([], xs)] - end - - defp merge_quads_in_non_overlapping_quads(quads) do - # Filter per graph - # Merge seperate graphs - # return quads - per_graph = - Enum.group_by(quads, fn x -> x.graph end) - |> Map.values() - - split_into_nonoverlapping([], per_graph) - end - # Reduce :insert and :delete delta's into true and all delta's # All delta's have a list of indices. Only one insert can be an actual insert, # but multiple delta's can insert the same quad diff --git a/lib/sparql_server/router/handler_support.ex b/lib/sparql_server/router/handler_support.ex index c7f7263..38bb177 100644 --- a/lib/sparql_server/router/handler_support.ex +++ b/lib/sparql_server/router/handler_support.ex @@ -169,6 +169,9 @@ defmodule SparqlServer.Router.HandlerSupport do |> enrich_manipulations_with_access_rights(authorization_groups) |> maybe_verify_all_triples_written() + cache_type = Plug.Conn.get_req_header(conn, "cache_type") + |> List.first() || "construct" + case analyzed_quads do {:fail, reason} -> encoded_response_string = Poison.encode!(%{errors: [%{status: "403", title: reason}]}) @@ -181,7 +184,7 @@ defmodule SparqlServer.Router.HandlerSupport do end) |> Cache.Deltas.add_deltas( options, - :construct, + String.to_atom(cache_type), origin: origin, mu_call_id_trail: mu_call_id_trail, authorization_groups: authorization_groups From 31f163227b0e3b8bb407e60e6aef95e1696e610f Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Wed, 25 Aug 2021 15:34:47 +0200 Subject: [PATCH 18/19] Revert Dockerfile --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index c51916a..a968702 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM mine/elixir-server:latest +FROM madnificent/elixir-server:1.10.0 ENV MU_SPARQL_ENDPOINT 'http://database:8890/sparql' ENV LOG_ELIXIR_STARTUP_COMMAND 'true' From 89464cfc293aa5568f0eb03d090dfeda687605b8 Mon Sep 17 00:00:00 2001 From: ajuvercr Date: Thu, 26 Aug 2021 00:35:36 +0200 Subject: [PATCH 19/19] make select default + remove debug statements --- lib/cache/cache_types.ex | 17 +++++++++++------ lib/sparql_server/router/handler_support.ex | 2 +- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/lib/cache/cache_types.ex b/lib/cache/cache_types.ex index ee891fd..02c4e9f 100644 --- a/lib/cache/cache_types.ex +++ b/lib/cache/cache_types.ex @@ -154,6 +154,17 @@ defmodule CacheType do # If the calculated frequency is more, the triple might exist in more graphs # so the CONSTRUCT query does not uniquely represent the quad in the triplestore # so an ASK query is executed (this shouldn't happen too often) + @spec create_cache_type(:construct | :multiple_constructs | :only_asks | :select, any) :: %{ + :__struct__ => + CacheType.ConstructAndAsk + | CacheType.MultipleConstructs + | CacheType.OnlyAsk + | CacheType.Select, + optional(:non_overlapping_quads) => list, + optional(:quads_in_store) => MapSet.t(any), + optional(:triple_counts) => map, + optional(:triples_in_store) => MapSet.t(any) + } def create_cache_type(:construct, quads) do triple_counts = triple_counts_with_graph_differences(quads) triples_in_store = triples_in_store_with_construct(quads) @@ -185,7 +196,6 @@ defmodule CacheType do object: object, graph: graph }) do - IO.puts("quad in store with MultipleConstructs") g = get_result_tuple(graph) value = {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} @@ -200,8 +210,6 @@ defmodule CacheType do object: object, graph: graph }) do - IO.puts("quad in store with Select") - value = {get_result_tuple(graph), get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} @@ -210,7 +218,6 @@ defmodule CacheType do end def quad_in_store?(%OnlyAsk{}, quad) do - IO.puts("quad in store with OnlyAsk") quad_in_store_with_ask?(quad) end @@ -226,8 +233,6 @@ defmodule CacheType do graph: _graph } = quad ) do - IO.puts("quad in store with ConstructAndAsk") - value = {get_result_tuple(subject), get_result_tuple(predicate), get_result_tuple(object)} if Map.get(triple_counts, value, 0) > 1 do diff --git a/lib/sparql_server/router/handler_support.ex b/lib/sparql_server/router/handler_support.ex index 38bb177..03c12e6 100644 --- a/lib/sparql_server/router/handler_support.ex +++ b/lib/sparql_server/router/handler_support.ex @@ -170,7 +170,7 @@ defmodule SparqlServer.Router.HandlerSupport do |> maybe_verify_all_triples_written() cache_type = Plug.Conn.get_req_header(conn, "cache_type") - |> List.first() || "construct" + |> List.first() || "select" case analyzed_quads do {:fail, reason} ->