diff --git a/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java b/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java index d9cbdb335..3bf41a02e 100644 --- a/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java +++ b/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java @@ -164,6 +164,7 @@ public record QueryStreamWrapper(int index, boolean cached, Supplier queryData; private int workerCount = 0; // give every worker inside the same worker config an offset seed + private int totalWorkerCount = 0; final protected int hashCode; @@ -201,6 +202,10 @@ public QueryHandler(Config config) throws IOException { }).collect(Collectors.toList())); } + public void setTotalWorkerCount(int workers) { + this.totalWorkerCount = workers; + } + private QueryList initializeTemplateQueryHandler(QuerySource templateSource) throws IOException { QuerySource querySource = templateSource; final var originalPath = templateSource.getPath(); @@ -253,7 +258,7 @@ private QuerySource createQuerySource(Path path) throws IOException { public QuerySelector getQuerySelectorInstance() { switch (config.order()) { - case LINEAR -> { return new LinearQuerySelector(queryList.size()); } + case LINEAR -> { return new LinearQuerySelector(queryList.size(), totalWorkerCount != 0 ? (queryList.size() * workerCount++) / totalWorkerCount : 0); } case RANDOM -> { return new RandomQuerySelector(queryList.size(), config.seed() + workerCount++); } } diff --git a/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java b/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java index 3d3faad32..60c0ae0ed 100644 --- a/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java +++ b/src/main/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelector.java @@ -14,25 +14,30 @@ public class LinearQuerySelector extends QuerySelector { public LinearQuerySelector(int size) { super(size); - index = -1; + index = 0; + } + + public LinearQuerySelector(int size, int startIndex) { + super(size); + index = startIndex; } @Override public int getNextIndex() { - index++; if (index >= this.size) { index = 0; } - return index; + return index++; } /** - * Return the current index. This is the index of the last returned query. If no query was returned yet, it returns - * -1. - * @return + * Return the current index. This is the index of the last returned query. + * If no query was returned yet, the method will return -1. + * + * @return the current index */ @Override public int getCurrentIndex() { - return index; + return index - 1; } } diff --git a/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java b/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java index 1e93882e1..ad36af220 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java +++ b/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.aksw.iguana.cc.metrics.Metric; +import org.aksw.iguana.cc.query.handler.QueryHandler; import org.aksw.iguana.cc.storage.Storage; import org.aksw.iguana.cc.tasks.Task; import org.aksw.iguana.cc.worker.HttpWorker; @@ -44,6 +45,15 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody // initialize workers if (config.warmupWorkers() != null) { + // initialize query handlers + // count the number of workers for each query handler + final var queryHandlers = config.warmupWorkers.stream().map(HttpWorker.Config::queries).distinct().toList(); + queryHandlers.stream().map(qh1 -> + List.of(qh1, config.warmupWorkers.stream() + .map(HttpWorker.Config::queries) + .filter(qh1::equals) + .count())) + .forEach(list -> ((QueryHandler) list.get(0)).setTotalWorkerCount((int) (long) list.get(1))); long workerId = 0; for (HttpWorker.Config workerConfig : config.warmupWorkers()) { for (int i = 0; i < workerConfig.number(); i++) { @@ -54,6 +64,15 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody } for (HttpWorker.Config workerConfig : config.workers()) { + // initialize query handlers + // count the number of workers for each query handler + final var queryHandlers = config.workers.stream().map(HttpWorker.Config::queries).distinct().toList(); + queryHandlers.stream().map(qh1 -> + List.of(qh1, config.workers.stream() + .filter(w -> w.queries().equals(qh1)) + .mapToInt(HttpWorker.Config::number) + .sum())) + .forEach(list -> ((QueryHandler) list.get(0)).setTotalWorkerCount((int) list.get(1))); long workerId = 0; for (int i = 0; i < workerConfig.number(); i++) { var responseBodyProcessor = (workerConfig.parseResults()) ? responseBodyProcessorInstances.getProcessor(workerConfig.acceptHeader()) : null; @@ -83,10 +102,9 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody public void run() { if (!warmupWorkers.isEmpty()) { SPARQLProtocolWorker.initHttpClient(warmupWorkers.size()); - var warmupResults = executeWorkers(warmupWorkers); // warmup results will be dismissed + executeWorkers(warmupWorkers); // warmup results will be dismissed SPARQLProtocolWorker.closeHttpClient(); } - SPARQLProtocolWorker.initHttpClient(workers.size()); var results = executeWorkers(workers); SPARQLProtocolWorker.closeHttpClient(); diff --git a/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java b/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java index 565763f68..a4e841039 100644 --- a/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java +++ b/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java @@ -270,7 +270,7 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) { try { request = requestFactory.buildHttpRequest(queryHandle); } catch (IOException | URISyntaxException e) { - return createFailedResultBeforeRequest(config.queries().getQuerySelectorInstance().getCurrentIndex(), e); + return createFailedResultBeforeRequest(querySelector.getCurrentIndex(), e); } // execute the request diff --git a/src/test/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelectorTest.java b/src/test/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelectorTest.java index ca508685b..c52331c99 100644 --- a/src/test/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelectorTest.java +++ b/src/test/java/org/aksw/iguana/cc/query/selector/impl/LinearQuerySelectorTest.java @@ -26,4 +26,18 @@ public void ThrowOnLinearQuerySelectorSizeZero() { final var size = 0; assertThrows(IllegalArgumentException.class, () -> new LinearQuerySelector(size)); } + + @Test + public void testStartingIndex() { + final var size = 5; + final var startIndex = 3; + final var linearQuerySelector = new LinearQuerySelector(size, startIndex); + // -1, because the next index hasn't been requested yet + assertEquals(startIndex - 1, linearQuerySelector.getCurrentIndex()); + for (int i = 0; i < 10; i++) { + int currentIndex = linearQuerySelector.getNextIndex(); + assertEquals((i + startIndex) % size, currentIndex); + assertEquals(currentIndex, linearQuerySelector.getCurrentIndex()); + } + } } \ No newline at end of file