From 1150b1ab45ccba26fb9e96622fbf13b0d419a1ad Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Mon, 23 Sep 2024 17:44:33 +0200 Subject: [PATCH 01/10] Add QueryData class --- .../org/aksw/iguana/cc/query/QueryData.java | 50 +++++++++++++++++++ .../cc/query/list/FileBasedQueryList.java | 11 ++++ .../aksw/iguana/cc/query/list/QueryList.java | 3 ++ .../query/list/impl/FileCachingQueryList.java | 2 + .../query/list/impl/FileReadingQueryList.java | 6 +++ .../query/list/impl/StringListQueryList.java | 8 +++ 6 files changed, 80 insertions(+) create mode 100644 src/main/java/org/aksw/iguana/cc/query/QueryData.java diff --git a/src/main/java/org/aksw/iguana/cc/query/QueryData.java b/src/main/java/org/aksw/iguana/cc/query/QueryData.java new file mode 100644 index 000000000..22c870ce1 --- /dev/null +++ b/src/main/java/org/aksw/iguana/cc/query/QueryData.java @@ -0,0 +1,50 @@ +package org.aksw.iguana.cc.query; + +import org.aksw.iguana.cc.query.source.QuerySource; +import org.apache.jena.update.UpdateFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * This class stores extra information about a query. + * At the moment, it only stores if the query is an update query or not. + * + * @param queryId The id of the query + * @param update If the query is an update query + */ +public record QueryData(int queryId, boolean update) { + public static List generate(Collection queries) { + final var queryData = new ArrayList(); + int i = 0; + for (InputStream query : queries) { + boolean update = true; + try { + UpdateFactory.read(query); // Throws an exception if the query is not an update query + } catch (Exception e) { + update = false; + } + queryData.add(new QueryData(i++, update)); + } + return queryData; + } + + public static List generate(QuerySource queries) throws IOException { + final var streams = new ArrayList(); + int bound = queries.size(); + for (int i = 0; i < bound; i++) { + InputStream queryStream = queries.getQueryStream(i); + streams.add(queryStream); + } + return generate(streams); + } + + public static List generate(List queries) { + final var streams = queries.stream().map(s -> (InputStream) new ByteArrayInputStream(s.getBytes())).toList(); + return generate(streams); + } +} diff --git a/src/main/java/org/aksw/iguana/cc/query/list/FileBasedQueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/FileBasedQueryList.java index 0256fee53..f4412e85f 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/FileBasedQueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/FileBasedQueryList.java @@ -1,9 +1,11 @@ package org.aksw.iguana.cc.query.list; +import org.aksw.iguana.cc.query.QueryData; import org.aksw.iguana.cc.query.source.QuerySource; import java.io.IOException; import java.io.InputStream; +import java.util.List; /** * The abstract class for a QueryList. A query list provides the queries to the QueryHandler. @@ -16,6 +18,7 @@ public abstract class FileBasedQueryList implements QueryList{ * This is the QuerySource from which the queries should be retrieved. */ final protected QuerySource querySource; + protected List queryData; public FileBasedQueryList(QuerySource querySource) { if (querySource == null) { @@ -24,6 +27,10 @@ public FileBasedQueryList(QuerySource querySource) { this.querySource = querySource; } + protected void setQueryData(List queryData) { + this.queryData = queryData; + } + /** * This method returns the amount of queries in the query list. * @@ -52,4 +59,8 @@ public int hashCode() { public abstract String getQuery(int index) throws IOException; public abstract InputStream getQueryStream(int index) throws IOException; + + public QueryData getQueryData(int index) { + return queryData.get(index); + } } diff --git a/src/main/java/org/aksw/iguana/cc/query/list/QueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/QueryList.java index 623a8c67f..877a03f9a 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/QueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/QueryList.java @@ -1,5 +1,6 @@ package org.aksw.iguana.cc.query.list; +import org.aksw.iguana.cc.query.QueryData; import org.aksw.iguana.cc.query.source.QuerySource; import java.io.IOException; @@ -35,4 +36,6 @@ public interface QueryList { String getQuery(int index) throws IOException; InputStream getQueryStream(int index) throws IOException; + + QueryData getQueryData(int index); } diff --git a/src/main/java/org/aksw/iguana/cc/query/list/impl/FileCachingQueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/impl/FileCachingQueryList.java index 8f6c3a38d..78dbb349c 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/impl/FileCachingQueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/impl/FileCachingQueryList.java @@ -1,5 +1,6 @@ package org.aksw.iguana.cc.query.list.impl; +import org.aksw.iguana.cc.query.QueryData; import org.aksw.iguana.cc.query.list.FileBasedQueryList; import org.aksw.iguana.cc.query.source.QuerySource; import org.aksw.iguana.commons.io.ByteArrayListInputStream; @@ -40,6 +41,7 @@ public FileCachingQueryList(QuerySource querySource) throws IOException { queries.add(balos); } } + setQueryData(QueryData.generate(queries.stream().map(ByteArrayListOutputStream::toInputStream).toList())); } @Override diff --git a/src/main/java/org/aksw/iguana/cc/query/list/impl/FileReadingQueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/impl/FileReadingQueryList.java index e3074be04..045a326df 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/impl/FileReadingQueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/impl/FileReadingQueryList.java @@ -1,5 +1,6 @@ package org.aksw.iguana.cc.query.list.impl; +import org.aksw.iguana.cc.query.QueryData; import org.aksw.iguana.cc.query.list.FileBasedQueryList; import org.aksw.iguana.cc.query.source.QuerySource; @@ -15,6 +16,11 @@ public class FileReadingQueryList extends FileBasedQueryList { public FileReadingQueryList(QuerySource querySource) { super(querySource); + try { + setQueryData(QueryData.generate(querySource)); + } catch (IOException e) { + throw new RuntimeException("Could not read queries from the source.", e); + } } @Override diff --git a/src/main/java/org/aksw/iguana/cc/query/list/impl/StringListQueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/impl/StringListQueryList.java index 1d448940e..8c3dcfc64 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/impl/StringListQueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/impl/StringListQueryList.java @@ -1,5 +1,6 @@ package org.aksw.iguana.cc.query.list.impl; +import org.aksw.iguana.cc.query.QueryData; import org.aksw.iguana.cc.query.list.QueryList; import java.io.ByteArrayInputStream; @@ -10,9 +11,11 @@ public class StringListQueryList implements QueryList { private final List queries; + private final List queryData; public StringListQueryList(List queries) { this.queries = queries; + this.queryData = QueryData.generate(queries); } @Override @@ -34,4 +37,9 @@ public int size() { public int hashCode() { return queries.hashCode(); } + + @Override + public QueryData getQueryData(int index) { + return queryData.get(index); + } } From 5bdf321dc7064132bba6addcd65f7e42dfd02c5b Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Mon, 23 Sep 2024 17:53:47 +0200 Subject: [PATCH 02/10] Add test --- .../aksw/iguana/cc/query/QueryDataTest.java | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java diff --git a/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java b/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java new file mode 100644 index 000000000..19d6a7481 --- /dev/null +++ b/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java @@ -0,0 +1,68 @@ +package org.aksw.iguana.cc.query; + +import org.aksw.iguana.cc.query.source.QuerySource; +import org.aksw.iguana.cc.query.source.impl.FileSeparatorQuerySource; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +class QueryDataTest { + + private static Path tempFile = null; + + @BeforeAll + public static void setup() throws IOException { + tempFile = Files.createTempFile("test", "txt"); + Files.writeString(tempFile, """ + SELECT ?s ?p ?o WHERE { + ?s ?p ?o + } + + INSERT DATA { + + } + + DELETE DATA { + + } + + SELECT ?s ?p ?o WHERE { + ?s ?p ?o + } + """); + } + + @AfterAll + public static void teardown() throws IOException { + Files.deleteIfExists(tempFile); + } + + @Test + void testGeneration() throws IOException { + final QuerySource querySource = new FileSeparatorQuerySource(tempFile, ""); + final var testStrings = querySource.getAllQueries(); + + List> generations = List.of( + QueryData.generate(testStrings), + QueryData.generate(testStrings.stream().map(s -> (InputStream) new ByteArrayInputStream(s.getBytes())).toList()), + QueryData.generate(querySource) + ); + for (List generation : generations) { + assertEquals(4, generation.size()); + assertFalse(generation.get(0).update()); + assertTrue(generation.get(1).update()); + assertTrue(generation.get(2).update()); + assertFalse(generation.get(3).update()); + } + } +} \ No newline at end of file From bdef045e8b4de1c9f8cc22be96cbcf27207ea5b7 Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Mon, 23 Sep 2024 18:14:32 +0200 Subject: [PATCH 03/10] Check for update queries --- .../iguana/cc/query/handler/QueryHandler.java | 9 +++++---- .../iguana/cc/utils/http/RequestFactory.java | 16 +++++++++++++--- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java b/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java index 6930d3f18..03a1f656b 100644 --- a/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java +++ b/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java @@ -145,8 +145,9 @@ public Template(URI endpoint, Long limit, Boolean save) { } } - public record QueryStringWrapper(int index, String query) {} - public record QueryStreamWrapper(int index, boolean cached, Supplier queryInputStreamSupplier) {} + public record QueryStringWrapper(int index, String query, boolean update) {} + + public record QueryStreamWrapper(int index, boolean cached, Supplier queryInputStreamSupplier, boolean update) {} protected static final Logger LOGGER = LoggerFactory.getLogger(QueryHandler.class); @@ -247,7 +248,7 @@ public QuerySelector getQuerySelectorInstance() { public QueryStringWrapper getNextQuery(QuerySelector querySelector) throws IOException { final var queryIndex = querySelector.getNextIndex(); - return new QueryStringWrapper(queryIndex, queryList.getQuery(queryIndex)); + return new QueryStringWrapper(queryIndex, queryList.getQuery(queryIndex), queryList.getQueryData(queryIndex).update()); } public QueryStreamWrapper getNextQueryStream(QuerySelector querySelector) { @@ -258,7 +259,7 @@ public QueryStreamWrapper getNextQueryStream(QuerySelector querySelector) { } catch (IOException e) { throw new RuntimeException(e); } - }); + }, queryList.getQueryData(queryIndex).update()); } @Override diff --git a/src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java b/src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java index e29fc5331..3392c31b7 100644 --- a/src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java +++ b/src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.io.InputStream; +import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; @@ -113,9 +114,18 @@ public AsyncRequestProducer buildHttpRequest(QueryHandler.QueryStreamWrapper que throw new IOException(e); } + // check if the query is an update query, if yes, change the request type to similar update request type + RequestType actualRequestType = requestType; + if (requestType == RequestType.GET_QUERY || requestType == RequestType.POST_QUERY) + actualRequestType = queryHandle.update() ? RequestType.POST_UPDATE : requestType; + if (requestType == RequestType.POST_URL_ENC_QUERY) + actualRequestType = queryHandle.update() ? RequestType.POST_URL_ENC_UPDATE : requestType; + // if only one endpoint is set, use it for both queries and updates + URI updateEndpoint = connectionConfig.updateEndpoint() != null ? connectionConfig.updateEndpoint() : connectionConfig.endpoint(); + // If the query is bigger than 2^31 bytes (2GB) and the request type is set to GET_QUERY, POST_URL_ENC_QUERY or // POST_URL_ENC_UPDATE, the following code will throw an exception. - switch (requestType) { + switch (actualRequestType) { case GET_QUERY -> asyncRequestBuilder = AsyncRequestBuilder.get(new URIBuilder(connectionConfig.endpoint()) .addParameter("query", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8)) .build() @@ -127,10 +137,10 @@ public AsyncRequestProducer buildHttpRequest(QueryHandler.QueryStreamWrapper que .setEntity(new BasicAsyncEntityProducer(urlEncode("query", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8)), null, false)); case POST_QUERY -> asyncRequestBuilder = AsyncRequestBuilder.post(connectionConfig.endpoint()) .setEntity(new StreamEntityProducer(queryStreamSupplier, !caching, "application/sparql-query")); - case POST_URL_ENC_UPDATE -> asyncRequestBuilder = AsyncRequestBuilder.post(connectionConfig.endpoint()) + case POST_URL_ENC_UPDATE -> asyncRequestBuilder = AsyncRequestBuilder.post(updateEndpoint) .setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded") .setEntity(new BasicAsyncEntityProducer(urlEncode("update", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8)), null, false)); - case POST_UPDATE -> asyncRequestBuilder = AsyncRequestBuilder.post(connectionConfig.endpoint()) + case POST_UPDATE -> asyncRequestBuilder = AsyncRequestBuilder.post(updateEndpoint) .setEntity(new StreamEntityProducer(queryStreamSupplier, !caching, "application/sparql-update")); default -> throw new IllegalStateException("Unexpected value: " + requestType); } From 984cd18591e58e39c1e67a80dc5a9bb1e198f1c7 Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Tue, 24 Sep 2024 13:50:19 +0200 Subject: [PATCH 04/10] Move responsibility of QueryData to QueryHandler --- .../aksw/iguana/cc/query/handler/QueryHandler.java | 14 ++++++++++++-- .../iguana/cc/query/list/FileBasedQueryList.java | 9 --------- .../org/aksw/iguana/cc/query/list/QueryList.java | 2 -- .../cc/query/list/impl/FileCachingQueryList.java | 1 - .../cc/query/list/impl/FileReadingQueryList.java | 5 ----- .../cc/query/list/impl/StringListQueryList.java | 7 ------- 6 files changed, 12 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java b/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java index 03a1f656b..be30268fa 100644 --- a/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java +++ b/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.aksw.iguana.cc.query.QueryData; import org.aksw.iguana.cc.query.list.impl.StringListQueryList; import org.aksw.iguana.cc.query.selector.QuerySelector; import org.aksw.iguana.cc.query.selector.impl.LinearQuerySelector; @@ -156,6 +157,7 @@ public record QueryStreamWrapper(int index, boolean cached, Supplier queryData; private int workerCount = 0; // give every worker inside the same worker config an offset seed @@ -169,6 +171,7 @@ protected QueryHandler() { config = null; queryList = null; hashCode = 0; + queryData = null; } @JsonCreator @@ -185,6 +188,13 @@ public QueryHandler(Config config) throws IOException { new FileReadingQueryList(querySource); } this.hashCode = queryList.hashCode(); + this.queryData = QueryData.generate(IntStream.range(0, queryList.size()).mapToObj(i -> { + try { + return queryList.getQueryStream(i); + } catch (IOException e) { + throw new RuntimeException("Couldn't read query stream", e); + } + }).collect(Collectors.toList())); } private QueryList initializeTemplateQueryHandler(QuerySource templateSource) throws IOException { @@ -248,7 +258,7 @@ public QuerySelector getQuerySelectorInstance() { public QueryStringWrapper getNextQuery(QuerySelector querySelector) throws IOException { final var queryIndex = querySelector.getNextIndex(); - return new QueryStringWrapper(queryIndex, queryList.getQuery(queryIndex), queryList.getQueryData(queryIndex).update()); + return new QueryStringWrapper(queryIndex, queryList.getQuery(queryIndex), queryData.get(queryIndex).update()); } public QueryStreamWrapper getNextQueryStream(QuerySelector querySelector) { @@ -259,7 +269,7 @@ public QueryStreamWrapper getNextQueryStream(QuerySelector querySelector) { } catch (IOException e) { throw new RuntimeException(e); } - }, queryList.getQueryData(queryIndex).update()); + }, queryData.get(queryIndex).update()); } @Override diff --git a/src/main/java/org/aksw/iguana/cc/query/list/FileBasedQueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/FileBasedQueryList.java index f4412e85f..013093fe7 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/FileBasedQueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/FileBasedQueryList.java @@ -18,7 +18,6 @@ public abstract class FileBasedQueryList implements QueryList{ * This is the QuerySource from which the queries should be retrieved. */ final protected QuerySource querySource; - protected List queryData; public FileBasedQueryList(QuerySource querySource) { if (querySource == null) { @@ -27,10 +26,6 @@ public FileBasedQueryList(QuerySource querySource) { this.querySource = querySource; } - protected void setQueryData(List queryData) { - this.queryData = queryData; - } - /** * This method returns the amount of queries in the query list. * @@ -59,8 +54,4 @@ public int hashCode() { public abstract String getQuery(int index) throws IOException; public abstract InputStream getQueryStream(int index) throws IOException; - - public QueryData getQueryData(int index) { - return queryData.get(index); - } } diff --git a/src/main/java/org/aksw/iguana/cc/query/list/QueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/QueryList.java index 877a03f9a..7df4dd332 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/QueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/QueryList.java @@ -36,6 +36,4 @@ public interface QueryList { String getQuery(int index) throws IOException; InputStream getQueryStream(int index) throws IOException; - - QueryData getQueryData(int index); } diff --git a/src/main/java/org/aksw/iguana/cc/query/list/impl/FileCachingQueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/impl/FileCachingQueryList.java index 78dbb349c..a804702b4 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/impl/FileCachingQueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/impl/FileCachingQueryList.java @@ -41,7 +41,6 @@ public FileCachingQueryList(QuerySource querySource) throws IOException { queries.add(balos); } } - setQueryData(QueryData.generate(queries.stream().map(ByteArrayListOutputStream::toInputStream).toList())); } @Override diff --git a/src/main/java/org/aksw/iguana/cc/query/list/impl/FileReadingQueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/impl/FileReadingQueryList.java index 045a326df..0999deba5 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/impl/FileReadingQueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/impl/FileReadingQueryList.java @@ -16,11 +16,6 @@ public class FileReadingQueryList extends FileBasedQueryList { public FileReadingQueryList(QuerySource querySource) { super(querySource); - try { - setQueryData(QueryData.generate(querySource)); - } catch (IOException e) { - throw new RuntimeException("Could not read queries from the source.", e); - } } @Override diff --git a/src/main/java/org/aksw/iguana/cc/query/list/impl/StringListQueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/impl/StringListQueryList.java index 8c3dcfc64..71f93d740 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/impl/StringListQueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/impl/StringListQueryList.java @@ -11,11 +11,9 @@ public class StringListQueryList implements QueryList { private final List queries; - private final List queryData; public StringListQueryList(List queries) { this.queries = queries; - this.queryData = QueryData.generate(queries); } @Override @@ -37,9 +35,4 @@ public int size() { public int hashCode() { return queries.hashCode(); } - - @Override - public QueryData getQueryData(int index) { - return queryData.get(index); - } } From d63e4fb8016ab4edf4805d0c1b27da07b3ef9268 Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Tue, 24 Sep 2024 13:51:25 +0200 Subject: [PATCH 05/10] Remove unused methods --- .../org/aksw/iguana/cc/query/QueryData.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/src/main/java/org/aksw/iguana/cc/query/QueryData.java b/src/main/java/org/aksw/iguana/cc/query/QueryData.java index 22c870ce1..02858cb8f 100644 --- a/src/main/java/org/aksw/iguana/cc/query/QueryData.java +++ b/src/main/java/org/aksw/iguana/cc/query/QueryData.java @@ -1,10 +1,7 @@ package org.aksw.iguana.cc.query; -import org.aksw.iguana.cc.query.source.QuerySource; import org.apache.jena.update.UpdateFactory; -import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; @@ -32,19 +29,4 @@ public static List generate(Collection queries) { } return queryData; } - - public static List generate(QuerySource queries) throws IOException { - final var streams = new ArrayList(); - int bound = queries.size(); - for (int i = 0; i < bound; i++) { - InputStream queryStream = queries.getQueryStream(i); - streams.add(queryStream); - } - return generate(streams); - } - - public static List generate(List queries) { - final var streams = queries.stream().map(s -> (InputStream) new ByteArrayInputStream(s.getBytes())).toList(); - return generate(streams); - } } From a382d3e51b191c7985cbeaa3827a816b7db38e5d Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Tue, 24 Sep 2024 18:05:40 +0200 Subject: [PATCH 06/10] Add tests --- .../aksw/iguana/cc/query/QueryDataTest.java | 4 +- .../worker/impl/SPARQLProtocolWorkerTest.java | 83 +++++++++++++++++++ 2 files changed, 84 insertions(+), 3 deletions(-) diff --git a/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java b/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java index 19d6a7481..259dd9aac 100644 --- a/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java +++ b/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java @@ -53,9 +53,7 @@ void testGeneration() throws IOException { final var testStrings = querySource.getAllQueries(); List> generations = List.of( - QueryData.generate(testStrings), - QueryData.generate(testStrings.stream().map(s -> (InputStream) new ByteArrayInputStream(s.getBytes())).toList()), - QueryData.generate(querySource) + QueryData.generate(testStrings.stream().map(s -> (InputStream) new ByteArrayInputStream(s.getBytes())).toList()) ); for (List generation : generations) { assertEquals(4, generation.size()); diff --git a/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java b/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java index b7d4daf71..d6c5911f8 100644 --- a/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java +++ b/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java @@ -57,15 +57,19 @@ public class SPARQLProtocolWorkerTest { .build(); private final static String QUERY = "SELECT * WHERE { ?s ?p ?o }"; + private final static String UPDATE_QUERY = "INSERT DATA { }"; private final static int QUERY_MIXES = 1; private static Path queryFile; + private static Path updateFile; private static final Logger LOGGER = LoggerFactory.getLogger(SPARQLProtocolWorker.class); @BeforeAll public static void setup() throws IOException { queryFile = Files.createTempFile("iguana-test-queries", ".tmp"); + updateFile = Files.createTempFile("iguana-test-updates", ".tmp"); Files.writeString(queryFile, QUERY, StandardCharsets.UTF_8); + Files.writeString(updateFile, QUERY + "\n\n" + UPDATE_QUERY, StandardCharsets.UTF_8); } @BeforeEach @@ -77,6 +81,7 @@ public void reset() { @AfterAll public static void cleanup() throws IOException { Files.deleteIfExists(queryFile); + Files.deleteIfExists(updateFile); SPARQLProtocolWorker.closeHttpClient(); } @@ -120,6 +125,31 @@ public static Stream requestFactoryData() throws URISyntaxException { return workers.stream(); } + public static Stream updateWorkerData() throws IOException { + final var normalEndpoint = URI.create("http://localhost:" + wm.getPort() + "/ds/query"); + final var updateEndpoint = URI.create("http://localhost:" + wm.getPort() + "/ds/update"); + final var processor = new ResponseBodyProcessor("application/sparql-results+json"); + final var format = QueryHandler.Config.Format.SEPARATOR; + final var queryHandler = new QueryHandler(new QueryHandler.Config(updateFile.toAbsolutePath().toString(), format, null, true, QueryHandler.Config.Order.LINEAR, 0L, QueryHandler.Config.Language.SPARQL)); + final var datasetConfig = new DatasetConfig("TestDS", null); + final var connection = new ConnectionConfig("TestConn", "1", datasetConfig, normalEndpoint, new ConnectionConfig.Authentication("testUser", "password"), updateEndpoint, new ConnectionConfig.Authentication("updateUser", "password")); + final var workers = new ArrayDeque(); + for (var requestType : List.of(RequestFactory.RequestType.GET_QUERY, RequestFactory.RequestType.POST_URL_ENC_QUERY, RequestFactory.RequestType.POST_QUERY)) { + final var config = new SPARQLProtocolWorker.Config( + 1, + queryHandler, + new HttpWorker.QueryMixes(QUERY_MIXES), + connection, + Duration.parse("PT6S"), + "application/sparql-results+json", + requestType, + true + ); + workers.add(Arguments.of(Named.of(requestType.name(), new SPARQLProtocolWorker(0, processor, config)))); + } + return workers.stream(); + } + public static List completionTargets() { final var out = new ArrayList(); final var queryMixesAmount = List.of(1, 2, 5, 10, 100, 200); @@ -204,6 +234,59 @@ public void testRequestFactory(SPARQLProtocolWorker worker, boolean cached) { assertNotEquals(Duration.ZERO, result.executionStats().get(0).duration(), "Worker returned zero duration"); } + @ParameterizedTest + @MethodSource("updateWorkerData") + public void testSeparateUpdateEndpoint(SPARQLProtocolWorker worker) { + final var workerConfig = worker.config(); + switch (workerConfig.requestType()) { + case GET_QUERY -> { + wm.stubFor(get(urlPathEqualTo("/ds/query")) + .withQueryParam("query", equalTo(QUERY)) + .withBasicAuth("testUser", "password") + .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body"))); + wm.stubFor(post(urlPathEqualTo("/ds/update")) + .withHeader("Content-Type", equalTo("application/sparql-update")) + .withBasicAuth("updateUser", "password") + .withRequestBody(equalTo(UPDATE_QUERY)) + .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body"))); + } + case POST_URL_ENC_QUERY -> { + wm.stubFor(post(urlPathEqualTo("/ds/query")) + .withHeader("Content-Type", equalTo("application/x-www-form-urlencoded")) + .withBasicAuth("testUser", "password") + .withRequestBody(equalTo("query=" + URLEncoder.encode(QUERY, StandardCharsets.UTF_8))) + .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body"))); + wm.stubFor(post(urlPathEqualTo("/ds/update")) + .withHeader("Content-Type", equalTo("application/x-www-form-urlencoded")) + .withBasicAuth("updateUser", "password") + .withRequestBody(equalTo("update=" + URLEncoder.encode(UPDATE_QUERY, StandardCharsets.UTF_8))) + .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body"))); + } + case POST_QUERY -> { + wm.stubFor(post(urlPathEqualTo("/ds/query")) + .withHeader("Content-Type", equalTo("application/sparql-query")) + .withBasicAuth("testUser", "password") + .withRequestBody(equalTo(QUERY)) + .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body"))); + wm.stubFor(post(urlPathEqualTo("/ds/update")) + .withHeader("Content-Type", equalTo("application/sparql-update")) + .withBasicAuth("updateUser", "password") + .withRequestBody(equalTo(UPDATE_QUERY)) + .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body"))); + } + } + final HttpWorker.Result result = worker.start().join(); + assertEquals(result.executionStats().size(), QUERY_MIXES * 2, "Worker should have executed only 1 query"); + for (var res : result.executionStats()) { + assertNull(res.error().orElse(null), "Worker threw an exception, during execution"); + assertEquals(200, res.httpStatusCode().get(), "Worker returned wrong status code"); + assertNotEquals(0, res.responseBodyHash().getAsLong(), "Worker didn't return a response body hash"); + assertEquals("Non-Empty-Body".getBytes(StandardCharsets.UTF_8).length, res.contentLength().getAsLong(), "Worker returned wrong content length"); + assertNotEquals(Duration.ZERO, res.duration(), "Worker returned zero duration"); + } + + } + @DisplayName("Test Malformed Response Processing") @ParameterizedTest(name = "[{index}] fault = {0}") @EnumSource(Fault.class) From 21cc30410a208322329fc913151534cb4f22de94 Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Tue, 24 Sep 2024 18:05:46 +0200 Subject: [PATCH 07/10] Fix authentication --- .../org/aksw/iguana/cc/utils/http/RequestFactory.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java b/src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java index 3392c31b7..e0853166e 100644 --- a/src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java +++ b/src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java @@ -148,10 +148,15 @@ public AsyncRequestProducer buildHttpRequest(QueryHandler.QueryStreamWrapper que // set additional headers if (acceptHeader != null) asyncRequestBuilder.addHeader("Accept", acceptHeader); - if (connectionConfig.authentication() != null && connectionConfig.authentication().user() != null) + if (queryHandle.update() && connectionConfig.updateAuthentication() != null && connectionConfig.updateAuthentication().user() != null) { asyncRequestBuilder.addHeader("Authorization", - HttpWorker.basicAuth(connectionConfig.authentication().user(), - Optional.ofNullable(connectionConfig.authentication().password()).orElse(""))); + HttpWorker.basicAuth(connectionConfig.updateAuthentication().user(), + Optional.ofNullable(connectionConfig.updateAuthentication().password()).orElse(""))); + } else if (connectionConfig.authentication() != null && connectionConfig.authentication().user() != null) { + asyncRequestBuilder.addHeader("Authorization", + HttpWorker.basicAuth(connectionConfig.authentication().user(), + Optional.ofNullable(connectionConfig.authentication().password()).orElse(""))); + } // cache request if (caching) From 870d9d96a73aace5a1eb4470ff6c633c96643f9a Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Tue, 24 Sep 2024 18:06:20 +0200 Subject: [PATCH 08/10] Cleanup --- src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java | 1 - .../aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java b/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java index 259dd9aac..3db404fcf 100644 --- a/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java +++ b/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java @@ -11,7 +11,6 @@ import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.List; import static org.junit.jupiter.api.Assertions.*; diff --git a/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java b/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java index d6c5911f8..6d9842fa7 100644 --- a/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java +++ b/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java @@ -290,7 +290,7 @@ public void testSeparateUpdateEndpoint(SPARQLProtocolWorker worker) { @DisplayName("Test Malformed Response Processing") @ParameterizedTest(name = "[{index}] fault = {0}") @EnumSource(Fault.class) - public void testMalformedResponseProcessing(Fault fault) throws IOException, URISyntaxException { + public void testMalformedResponseProcessing(Fault fault) throws URISyntaxException { SPARQLProtocolWorker worker = (SPARQLProtocolWorker) ((Named)requestFactoryData().toList().get(0).get()[0]).getPayload(); wm.stubFor(get(urlPathEqualTo("/ds/query")) .willReturn(aResponse().withFault(fault))); @@ -300,7 +300,7 @@ public void testMalformedResponseProcessing(Fault fault) throws IOException, URI } @Test - public void testBadHttpCodeResponse() throws IOException, URISyntaxException { + public void testBadHttpCodeResponse() throws URISyntaxException { SPARQLProtocolWorker worker = (SPARQLProtocolWorker) ((Named)requestFactoryData().toList().get(0).get()[0]).getPayload(); wm.stubFor(get(urlPathEqualTo("/ds/query")) .willReturn(aResponse().withStatus(404))); From fc12b2317bd0b55d8d987adf22166224ecb794bd Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Wed, 25 Sep 2024 18:07:28 +0200 Subject: [PATCH 09/10] Remove unused import statements --- .../java/org/aksw/iguana/cc/query/list/FileBasedQueryList.java | 2 -- src/main/java/org/aksw/iguana/cc/query/list/QueryList.java | 3 --- .../aksw/iguana/cc/query/list/impl/FileCachingQueryList.java | 1 - .../aksw/iguana/cc/query/list/impl/FileReadingQueryList.java | 1 - .../aksw/iguana/cc/query/list/impl/StringListQueryList.java | 1 - 5 files changed, 8 deletions(-) diff --git a/src/main/java/org/aksw/iguana/cc/query/list/FileBasedQueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/FileBasedQueryList.java index 013093fe7..0256fee53 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/FileBasedQueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/FileBasedQueryList.java @@ -1,11 +1,9 @@ package org.aksw.iguana.cc.query.list; -import org.aksw.iguana.cc.query.QueryData; import org.aksw.iguana.cc.query.source.QuerySource; import java.io.IOException; import java.io.InputStream; -import java.util.List; /** * The abstract class for a QueryList. A query list provides the queries to the QueryHandler. diff --git a/src/main/java/org/aksw/iguana/cc/query/list/QueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/QueryList.java index 7df4dd332..3f9f2a788 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/QueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/QueryList.java @@ -1,8 +1,5 @@ package org.aksw.iguana.cc.query.list; -import org.aksw.iguana.cc.query.QueryData; -import org.aksw.iguana.cc.query.source.QuerySource; - import java.io.IOException; import java.io.InputStream; diff --git a/src/main/java/org/aksw/iguana/cc/query/list/impl/FileCachingQueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/impl/FileCachingQueryList.java index a804702b4..8f6c3a38d 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/impl/FileCachingQueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/impl/FileCachingQueryList.java @@ -1,6 +1,5 @@ package org.aksw.iguana.cc.query.list.impl; -import org.aksw.iguana.cc.query.QueryData; import org.aksw.iguana.cc.query.list.FileBasedQueryList; import org.aksw.iguana.cc.query.source.QuerySource; import org.aksw.iguana.commons.io.ByteArrayListInputStream; diff --git a/src/main/java/org/aksw/iguana/cc/query/list/impl/FileReadingQueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/impl/FileReadingQueryList.java index 0999deba5..e3074be04 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/impl/FileReadingQueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/impl/FileReadingQueryList.java @@ -1,6 +1,5 @@ package org.aksw.iguana.cc.query.list.impl; -import org.aksw.iguana.cc.query.QueryData; import org.aksw.iguana.cc.query.list.FileBasedQueryList; import org.aksw.iguana.cc.query.source.QuerySource; diff --git a/src/main/java/org/aksw/iguana/cc/query/list/impl/StringListQueryList.java b/src/main/java/org/aksw/iguana/cc/query/list/impl/StringListQueryList.java index 71f93d740..1d448940e 100644 --- a/src/main/java/org/aksw/iguana/cc/query/list/impl/StringListQueryList.java +++ b/src/main/java/org/aksw/iguana/cc/query/list/impl/StringListQueryList.java @@ -1,6 +1,5 @@ package org.aksw.iguana.cc.query.list.impl; -import org.aksw.iguana.cc.query.QueryData; import org.aksw.iguana.cc.query.list.QueryList; import java.io.ByteArrayInputStream; From ba1b51b172b83990b15392244132748b16014ef2 Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Fri, 11 Oct 2024 12:18:01 +0200 Subject: [PATCH 10/10] Add chained update request as test case for Update queries distinction --- .../java/org/aksw/iguana/cc/query/QueryDataTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java b/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java index 3db404fcf..63b56d249 100644 --- a/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java +++ b/src/test/java/org/aksw/iguana/cc/query/QueryDataTest.java @@ -38,6 +38,12 @@ public static void setup() throws IOException { SELECT ?s ?p ?o WHERE { ?s ?p ?o } + + INSERT DATA { + + }; INSERT DATA { + + } """); } @@ -55,11 +61,12 @@ void testGeneration() throws IOException { QueryData.generate(testStrings.stream().map(s -> (InputStream) new ByteArrayInputStream(s.getBytes())).toList()) ); for (List generation : generations) { - assertEquals(4, generation.size()); + assertEquals(5, generation.size()); assertFalse(generation.get(0).update()); assertTrue(generation.get(1).update()); assertTrue(generation.get(2).update()); assertFalse(generation.get(3).update()); + assertTrue(generation.get(4).update()); } } } \ No newline at end of file