Skip to content

Commit

Permalink
add better error handling and logging for the SPARQLProtocolWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
nck-mlcnv committed Sep 4, 2023
1 parent d745dc9 commit 29b84b1
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,13 @@ public record ConnectionConfig(
URI updateEndpoint,
DatasetConfig dataset
) {
private static final Logger LOGGER = LoggerFactory.getLogger(MainController.class);

public static class URIDeserializer extends JsonDeserializer<URI> {

@Override
public URI deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
try {
return URI.create(p.getValueAsString());
} catch (IllegalArgumentException e) {
LOGGER.error("The given URL {} is malformed. ", p, e);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Config(@JsonProperty(required = true) String path, Format format, Boolean
this.format = format == null ? Format.ONE_PER_LINE : format;
this.caching = caching == null || caching;
this.order = order == null ? Order.LINEAR : order;
this.seed = seed;
this.seed = seed == null ? 0 : seed; // TODO: every worker should maybe have different seeds, based on their workerid
this.lang = lang == null ? Language.SPARQL : lang;
}

Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/aksw/iguana/cc/suite/Suite.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import org.aksw.iguana.cc.config.elements.ConnectionConfig;
import org.aksw.iguana.cc.config.elements.DatasetConfig;
import org.aksw.iguana.cc.config.elements.StorageConfig;
import org.aksw.iguana.cc.metrics.Metric;
import org.aksw.iguana.cc.metrics.MetricManager;
import org.aksw.iguana.cc.metrics.impl.*;
import org.aksw.iguana.cc.tasks.impl.Stresstest;
import org.aksw.iguana.cc.tasks.Task;
import org.aksw.iguana.cc.worker.ResponseBodyProcessorInstances;
Expand Down Expand Up @@ -37,6 +40,9 @@ public record Config(
long taskID = 0;
responseBodyProcessorInstances = new ResponseBodyProcessorInstances();

// TODO: maybe add this to the configd
MetricManager.setMetrics(List.of(new QPS(), new AvgQPS(), new NoQPH(), new AggregatedExecutionStatistics(), new EachExecutionStatistic()));

for (Task.Config task : config.tasks()) {
if (task instanceof Stresstest.Config) {
tasks.add(new Stresstest(taskID++, (Stresstest.Config) task, responseBodyProcessorInstances, this.config.storages)); // TODO: look for a better way to add the storages
Expand Down
13 changes: 0 additions & 13 deletions src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import org.aksw.iguana.cc.config.elements.StorageConfig;
import org.aksw.iguana.cc.metrics.MetricManager;
import org.aksw.iguana.cc.metrics.impl.AggregatedExecutionStatistics;
import org.aksw.iguana.cc.metrics.impl.AvgQPS;
import org.aksw.iguana.cc.metrics.impl.NoQPH;
import org.aksw.iguana.cc.metrics.impl.QPS;
import org.aksw.iguana.cc.storage.Storage;
import org.aksw.iguana.cc.storage.impl.CSVStorage;
import org.aksw.iguana.cc.storage.impl.RDFFileStorage;
Expand All @@ -32,11 +27,6 @@ public record Config(
@JsonProperty(required = true) List<HttpWorker.Config> workers
) implements Task.Config {}

public record PhaseExecutionConfig(
String name,
List<HttpWorker.Config> workers
) {}

public record Result(
List<HttpWorker.Result> workerResults,
Calendar startTime,
Expand Down Expand Up @@ -102,9 +92,6 @@ public void run() {

// TODO: language processor

// TODO: maybe add this to the configd
MetricManager.setMetrics(List.of(new QPS(), new AvgQPS(), new NoQPH(), new AggregatedExecutionStatistics()));

// TODO: suiteID
StresstestResultProcessor srp = new StresstestResultProcessor(
0L,
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/org/aksw/iguana/cc/worker/HttpWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import org.aksw.iguana.cc.tasks.impl.Stresstest;
import org.aksw.iguana.cc.worker.impl.SPARQLProtocolWorker;

import java.net.http.HttpTimeoutException;
import java.time.Duration;
import java.time.Instant;
import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* Interface for the Worker Thread used in the {@link Stresstest}
Expand Down Expand Up @@ -102,11 +104,14 @@ public boolean successful() {
}

public boolean timeout() {
boolean timeout = false;
if (!successful() && error().isPresent()) {
return error().get() instanceof java.net.SocketTimeoutException;
} else {
return false;
timeout |= error().get() instanceof java.util.concurrent.TimeoutException;
if (error().get() instanceof ExecutionException exec) {
timeout |= exec.getCause() instanceof HttpTimeoutException;
}
}
return timeout;
}

public boolean httpError() {
Expand Down
102 changes: 67 additions & 35 deletions src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import org.aksw.iguana.cc.worker.HttpWorker;
import org.aksw.iguana.commons.io.BigByteArrayOutputStream;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -27,7 +30,6 @@

public class SPARQLProtocolWorker extends HttpWorker {


public final static class RequestFactory {
public enum RequestType {
GET_QUERY("get query"),
Expand Down Expand Up @@ -112,7 +114,6 @@ public HttpRequest buildHttpRequest(InputStream queryStream,
}


// @JsonTypeName("SPARQLProtocolWorker")
public record Config(
Integer number,
QueryHandler queries,
Expand All @@ -121,7 +122,7 @@ public record Config(
Duration timeout,
String acceptHeader /* e.g. application/sparql-results+json */,
RequestFactory.RequestType requestType,
boolean parseResults // TODO: integrate this
boolean parseResults
) implements HttpWorker.Config {
public Config(Integer number,
@JsonProperty(required = true) QueryHandler queries,
Expand Down Expand Up @@ -178,6 +179,8 @@ public boolean successful() {
// used to read the http response body
private byte[] buffer = new byte[4096];

private final static Logger LOGGER = LoggerFactory.getLogger(SPARQLProtocolWorker.class);

@Override
public Config config() {
return (SPARQLProtocolWorker.Config) config;
Expand All @@ -196,38 +199,38 @@ public SPARQLProtocolWorker(long workerId, ResponseBodyProcessor responseBodyPro
public CompletableFuture<Result> start() {
return CompletableFuture.supplyAsync(() -> {
List<ExecutionStats> executionStats = new ArrayList<>();
try {
if (config().completionTarget() instanceof QueryMixes queryMixes) {
for (int i = 0; i < queryMixes.number(); i++) {
for (int j = 0; j < config().queries().getQueryCount(); j++) {
ExecutionStats execution = executeQuery(config().timeout(), false);
if (execution != null)
executionStats.add(execution);
if (config().completionTarget() instanceof QueryMixes queryMixes) {
for (int i = 0; i < queryMixes.number(); i++) {
for (int j = 0; j < config().queries().getQueryCount(); j++) {
ExecutionStats execution = executeQuery(config().timeout(), false);
if (execution != null) {
logExecution(execution);
executionStats.add(execution);
}

}
} else if (config().completionTarget() instanceof TimeLimit timeLimit) {
final Instant endTime = Instant.now().plus(timeLimit.duration());
Instant now;
while ((now = Instant.now()).isBefore(endTime)) {
final Duration timeToEnd = Duration.between(now, endTime);
final boolean timeoutBeforeEnd = config().timeout().compareTo(timeToEnd) < 0;
final Duration thisQueryTimeOut = (timeoutBeforeEnd) ? config().timeout() : timeToEnd;
// If timeoutBeforeEnd is false, fail shouldn't be counted as timeout
ExecutionStats execution = executeQuery(thisQueryTimeOut, !timeoutBeforeEnd);
if (execution != null)
executionStats.add(execution);
LOGGER.info("{}\t:: Completed {} out of {} querymixes", this, i + 1, queryMixes.number());
}
} else if (config().completionTarget() instanceof TimeLimit timeLimit) {
final Instant endTime = Instant.now().plus(timeLimit.duration());
Instant now;
while ((now = Instant.now()).isBefore(endTime)) {
final Duration timeToEnd = Duration.between(now, endTime);
final boolean timeoutBeforeEnd = config().timeout().compareTo(timeToEnd) < 0;
final Duration thisQueryTimeOut = (timeoutBeforeEnd) ? config().timeout() : timeToEnd;
// If timeoutBeforeEnd is false, fail shouldn't be counted as timeout
ExecutionStats execution = executeQuery(thisQueryTimeOut, !timeoutBeforeEnd);
if (execution != null){
logExecution(execution);
executionStats.add(execution);
}
}
} catch (IOException | URISyntaxException e) {
throw new RuntimeException(e); // TODO: better error handling
LOGGER.info("{}\t:: Reached time limit of {}.", this, timeLimit.duration());
}

return new Result(this.workerID, executionStats);
}, executor);
}

private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure) throws IOException, URISyntaxException {
private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure) {
HttpExecutionResult result = executeHttpRequest(timeout);
Optional<Integer> statuscode = Optional.empty();
if (result.response().isPresent())
Expand All @@ -244,6 +247,7 @@ private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure)
this.responseBodybbaos.reset();

if (!result.completed() && discardOnFailure) {
LOGGER.debug("{}\t:: Discarded execution, because the time limit has been reached: [queryID={}]", this, result.queryID);
return null;
}

Expand All @@ -259,14 +263,29 @@ private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure)
}


private HttpExecutionResult executeHttpRequest(Duration timeout) throws IOException, URISyntaxException {
final QueryHandler.QueryStreamWrapper queryHandle = config().queries().getNextQueryStream();
final HttpRequest request = requestFactory.buildHttpRequest(
queryHandle.queryInputStream(),
timeout,
config().connection(),
config().acceptHeader()
);
private HttpExecutionResult executeHttpRequest(Duration timeout) {
final var queryHandle = config().queries().getNextQueryStreamSupplier();
final HttpRequest request;

try {
request = requestFactory.buildHttpRequest(
queryHandle.queryStreamSupplier(),
timeout,
config().connection(),
config().acceptHeader()
);
} catch (IOException | URISyntaxException e) {
return new HttpExecutionResult(
queryHandle.index(),
Optional.empty(),
Instant.now(),
Duration.ZERO,
Optional.empty(),
OptionalLong.empty(),
OptionalLong.empty(),
Optional.of(e)
);
}

if (((ThreadPoolExecutor) this.httpClient.executor().get()).getActiveCount() != 0) {
((ThreadPoolExecutor) this.httpClient.executor().get()).shutdownNow();
Expand Down Expand Up @@ -322,7 +341,6 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) throws IOExcept
);
}
} else {
bodyStream.close();
return createFailedResult.apply(httpResponse, null);
}
} catch (IOException ex) {
Expand All @@ -341,4 +359,18 @@ private HttpClient buildHttpClient() {
.connectTimeout(config().timeout())
.build();
}

private void logExecution(ExecutionStats execution) {
switch (execution.endState()) {
case SUCCESS -> LOGGER.debug("{}\t:: Successfully executed query: [queryID={}].", this, execution.queryID());
case TIMEOUT -> LOGGER.warn("{}\t:: Timeout during query execution: [queryID={}, duration={}].", this, execution.queryID(), execution.duration()); // TODO: look for a possibility to add the query string for logging
case HTTP_ERROR -> LOGGER.warn("{}\t:: HTTP Error occurred during query execution: [queryID={}, httpError={}].", this, execution.queryID(), execution.httpStatusCode().get());
case MISCELLANEOUS_EXCEPTION -> LOGGER.warn("{}\t:: Miscellaneous exception occurred during query execution: [queryID={}, exception={}].", this, execution.queryID(), execution.error().get());
}
}

@Override
public String toString() {
return MessageFormatter.format("[{}-{}]", SPARQLProtocolWorker.class.getSimpleName(), this.workerID).getMessage();
}
}

0 comments on commit 29b84b1

Please sign in to comment.