Introduced query timeout parameter (#211)
* If unspecified, a cluster-level default of 20 minutes is used as the timeout
* If unspecified, a query-level default of 1 minute is used as the timeout
* The Postgres implementation continues to ignore the query options
* The minimum of the cluster-level and query-level values is used (sketched below)
* Custom-metric reporting is always redirected to secondary nodes
* Custom-metric reporting uses a query-level timeout override of 20 minutes
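
Taken together, these bullets imply an effective-timeout rule along the following lines. This is a sketch of the stated semantics only, not code from this commit; the class and method names are illustrative.

import java.time.Duration;

final class EffectiveTimeoutSketch {
  static final Duration CLUSTER_DEFAULT = Duration.ofMinutes(20); // cluster-level fallback
  static final Duration QUERY_DEFAULT = Duration.ofMinutes(1); // query-level fallback

  // The effective timeout is the minimum of the cluster-level and query-level values.
  static Duration resolve(final Duration clusterTimeout, final Duration queryTimeout) {
    final Duration cluster = clusterTimeout != null ? clusterTimeout : CLUSTER_DEFAULT;
    final Duration query = queryTimeout != null ? queryTimeout : QUERY_DEFAULT;
    return cluster.compareTo(query) <= 0 ? cluster : query;
  }
}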
suresh-prakash authored Sep 25, 2024
1 parent 72cc0d2 commit 0dc1bec
Showing 13 changed files with 268 additions and 98 deletions.
@@ -1,5 +1,7 @@
package org.hypertrace.core.documentstore;

import static org.hypertrace.core.documentstore.model.options.QueryOptions.DEFAULT_QUERY_OPTIONS;

import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -102,7 +104,8 @@ public interface Collection {
/**
* Search for documents matching the query.
*
* @deprecated Use {@link #aggregate(org.hypertrace.core.documentstore.query.Query)} instead
* @deprecated Use {@link #query(org.hypertrace.core.documentstore.query.Query, QueryOptions)}
* instead
* @param query filter to query matching documents
* @return {@link CloseableIterator} of matching documents
*/
@@ -115,7 +118,8 @@ public interface Collection {
*
* @param query The query definition to find
* @return {@link CloseableIterator} of matching documents
* @deprecated Use {@link #aggregate(org.hypertrace.core.documentstore.query.Query)} instead
* @deprecated Use {@link #query(org.hypertrace.core.documentstore.query.Query, QueryOptions)}
* instead
*/
@Deprecated(forRemoval = true)
CloseableIterator<Document> find(final org.hypertrace.core.documentstore.query.Query query);
@@ -126,7 +130,10 @@ public interface Collection {
* @param query The aggregate query specification
* @return {@link CloseableIterator} of matching documents
*/
CloseableIterator<Document> aggregate(final org.hypertrace.core.documentstore.query.Query query);
default CloseableIterator<Document> aggregate(
final org.hypertrace.core.documentstore.query.Query query) {
return query(query, DEFAULT_QUERY_OPTIONS);
}

/**
* Query the documents conforming to the query specification.
@@ -197,7 +204,21 @@ CloseableIterator<Document> query(
* @param query The query definition whose result-set size is to be determined
* @return The number of documents conforming to the input query
*/
long count(final org.hypertrace.core.documentstore.query.Query query);
default long count(final org.hypertrace.core.documentstore.query.Query query) {
return count(query, DEFAULT_QUERY_OPTIONS);
}

/**
* Count the result-set size of executing the given query. Note that this method is a generic
* version of {@link #count()}, {@link #count(org.hypertrace.core.documentstore.query.Query)} and
* {@link #total(Query)}
*
* @param query The query definition whose result-set size is to be determined
* @param queryOptions The query options to be used while fetching the results
* @return The number of documents conforming to the input query
*/
long count(
final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions);

/**
* @param documents to be upserted in bulk
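
For callers, the single-argument entry points keep working: aggregate(query) and count(query) are now default methods that delegate to the two-argument forms with DEFAULT_QUERY_OPTIONS. A usage sketch of both call styles, where collection and myQuery are placeholders:

// Old style: still compiles; now delegates to count(query, DEFAULT_QUERY_OPTIONS).
long total = collection.count(myQuery);

// New style: a per-query timeout overriding the 1-minute query-level default.
long boundedTotal =
    collection.count(
        myQuery, QueryOptions.builder().queryTimeout(Duration.ofSeconds(30)).build());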
@@ -5,6 +5,7 @@
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.typesafe.config.Config;
import java.time.Duration;
import org.hypertrace.core.documentstore.model.config.AggregatePipelineMode;
import org.hypertrace.core.documentstore.model.config.DatabaseType;
import org.hypertrace.core.documentstore.model.config.DatastoreConfig;
@@ -32,7 +33,8 @@ public DatastoreConfig convert(final Config config) {
null,
null,
AggregatePipelineMode.DEFAULT_ALWAYS,
DataFreshness.SYSTEM_DEFAULT) {
DataFreshness.SYSTEM_DEFAULT,
Duration.ofMinutes(20)) {
public MongoClientSettings toSettings() {
final MongoClientSettings.Builder settingsBuilder =
MongoClientSettings.builder()
@@ -3,11 +3,13 @@
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toUnmodifiableMap;
import static org.hypertrace.core.documentstore.model.config.CustomMetricConfig.VALUE_KEY;
import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REALTIME_FRESHNESS;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.TextNode;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -25,6 +27,7 @@
import org.hypertrace.core.documentstore.Datastore;
import org.hypertrace.core.documentstore.Document;
import org.hypertrace.core.documentstore.model.config.CustomMetricConfig;
import org.hypertrace.core.documentstore.model.options.QueryOptions;
import org.hypertrace.core.documentstore.query.Query;
import org.hypertrace.core.documentstore.query.SelectionSpec;

@@ -55,63 +58,70 @@ public List<DocStoreMetric> getCustomMetrics(final CustomMetricConfig customMetricConfig) {
});

final Collection collection = dataStore.getCollection(collectionName);
final CloseableIterator<Document> iterator = collection.aggregate(query);

final List<DocStoreMetric> metrics = new ArrayList<>();

while (iterator.hasNext()) {
final Document document = iterator.next();
final JsonNode node;
try (final CloseableIterator<Document> iterator =
collection.query(
query,
QueryOptions.builder()
.dataFreshness(NEAR_REALTIME_FRESHNESS)
.queryTimeout(Duration.ofMinutes(20))
.build())) {

try {
node = mapper.readTree(document.toJson());
} catch (final JsonProcessingException e) {
log.warn(
"Invalid JSON document {} for metric {} with query {}",
document.toJson(),
customMetricConfig.metricName(),
query);
continue;
}
while (iterator.hasNext()) {
final Document document = iterator.next();
final JsonNode node;

final double metricValue;
if (node.has(VALUE_KEY)) {
metricValue = node.get(VALUE_KEY).doubleValue();
} else {
log.warn(
"No value found in JSON document {} for metric {} with query {}",
document.toJson(),
customMetricConfig.metricName(),
query);
continue;
}
try {
node = mapper.readTree(document.toJson());
} catch (final JsonProcessingException e) {
log.warn(
"Invalid JSON document {} for metric {} with query {}",
document.toJson(),
customMetricConfig.metricName(),
query);
continue;
}

Map<String, JsonNode> jsonNodeMap =
query.getSelections().stream()
.collect(
Collectors.toUnmodifiableMap(
SelectionSpec::getAlias,
selectionSpec ->
Optional.ofNullable(node.get(selectionSpec.getAlias()))
.orElse(TextNode.valueOf(NULL_LABEL_VALUE_PLACEHOLDER))));
final double metricValue;
if (node.has(VALUE_KEY)) {
metricValue = node.get(VALUE_KEY).doubleValue();
} else {
log.warn(
"No value found in JSON document {} for metric {} with query {}",
document.toJson(),
customMetricConfig.metricName(),
query);
continue;
}

final Map<String, String> labels =
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
jsonNodeMap.entrySet().iterator(), Spliterator.ORDERED),
false)
.filter(entry -> !VALUE_KEY.equals(entry.getKey()))
.collect(
toUnmodifiableMap(
Entry::getKey, entry -> getStringLabelValue(entry.getValue())));
Map<String, JsonNode> jsonNodeMap =
query.getSelections().stream()
.collect(
Collectors.toUnmodifiableMap(
SelectionSpec::getAlias,
selectionSpec ->
Optional.ofNullable(node.get(selectionSpec.getAlias()))
.orElse(TextNode.valueOf(NULL_LABEL_VALUE_PLACEHOLDER))));

final DocStoreMetric metric =
DocStoreMetric.builder()
.name(customMetricConfig.metricName())
.value(metricValue)
.labels(labels)
.build();
metrics.add(metric);
final Map<String, String> labels =
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
jsonNodeMap.entrySet().iterator(), Spliterator.ORDERED),
false)
.filter(entry -> !VALUE_KEY.equals(entry.getKey()))
.collect(
toUnmodifiableMap(
Entry::getKey, entry -> getStringLabelValue(entry.getValue())));

final DocStoreMetric metric =
DocStoreMetric.builder()
.name(customMetricConfig.metricName())
.value(metricValue)
.labels(labels)
.build();
metrics.add(metric);
}
}

log.debug("Returning metrics: {}", metrics);
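
Besides switching to NEAR_REALTIME_FRESHNESS with the 20-minute override, the exporter now iterates inside try-with-resources, so the CloseableIterator (and any server-side cursor behind it) is released on every exit path. The bare pattern, with placeholder names:

try (CloseableIterator<Document> results = collection.query(metricQuery, metricQueryOptions)) {
  while (results.hasNext()) {
    final Document document = results.next();
    // process the document; the iterator is closed even if this block throws
  }
}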
@@ -3,6 +3,7 @@
import static java.util.Collections.unmodifiableList;

import com.google.common.base.Preconditions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
@@ -32,6 +33,7 @@ public class ConnectionConfig {
@Nullable ConnectionCredentials credentials;
@NonNull AggregatePipelineMode aggregationPipelineMode;
@NonNull DataFreshness dataFreshness;
@NonNull Duration queryTimeout;

public ConnectionConfig(
@NonNull List<@NonNull Endpoint> endpoints,
@@ -42,7 +44,8 @@ public ConnectionConfig(
database,
credentials,
AggregatePipelineMode.DEFAULT_ALWAYS,
DataFreshness.SYSTEM_DEFAULT);
DataFreshness.SYSTEM_DEFAULT,
Duration.ofMinutes(20));
}

public static ConnectionConfigBuilder builder() {
@@ -63,6 +66,7 @@ public static class ConnectionConfigBuilder {
ConnectionPoolConfig connectionPoolConfig;
AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS;
DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT;
Duration queryTimeout = Duration.ofMinutes(20);

public ConnectionConfigBuilder type(final DatabaseType type) {
this.type = type;
@@ -91,7 +95,8 @@ public ConnectionConfig build() {
replicaSet,
connectionPoolConfig,
aggregationPipelineMode,
dataFreshness);
dataFreshness,
queryTimeout);

case POSTGRES:
return new PostgresConnectionConfig(
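
At the connection level, the builder now carries a queryTimeout field defaulting to 20 minutes. A usage sketch; the fluent queryTimeout(...) setter is assumed to exist alongside the type(...) setter shown above:

final ConnectionConfig config =
    ConnectionConfig.builder()
        .type(DatabaseType.MONGO)
        .queryTimeout(Duration.ofMinutes(5)) // assumed setter; the field default is 20 minutes
        .build();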
@@ -10,6 +10,7 @@
import com.mongodb.ServerAddress;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ConnectionPoolSettings;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -51,13 +52,15 @@ public MongoConnectionConfig(
@Nullable final String replicaSetName,
@Nullable final ConnectionPoolConfig connectionPoolConfig,
@NonNull final AggregatePipelineMode aggregationPipelineMode,
@NonNull final DataFreshness dataFreshness) {
@NonNull final DataFreshness dataFreshness,
@NonNull final Duration queryTimeout) {
super(
ensureAtLeastOneEndpoint(endpoints),
getDatabaseOrDefault(database),
getCredentialsOrDefault(credentials, database),
aggregationPipelineMode,
dataFreshness);
dataFreshness,
queryTimeout);
this.applicationName = applicationName;
this.replicaSetName = replicaSetName;
this.connectionPoolConfig = getConnectionPoolConfigOrDefault(connectionPoolConfig);
@@ -1,5 +1,6 @@
package org.hypertrace.core.documentstore.model.options;

import java.time.Duration;
import lombok.Builder;
import lombok.Builder.Default;
import lombok.Value;
@@ -12,4 +13,5 @@ public class QueryOptions {
public static final QueryOptions DEFAULT_QUERY_OPTIONS = QueryOptions.builder().build();

@Default DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT;
@Default Duration queryTimeout = Duration.ofMinutes(1);
}
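
Because both fields are Lombok @Default values, an empty builder produces the same options as DEFAULT_QUERY_OPTIONS, and either field can be overridden independently:

QueryOptions defaults = QueryOptions.builder().build(); // SYSTEM_DEFAULT freshness, 1-minute timeout
QueryOptions tight =
    QueryOptions.builder().queryTimeout(Duration.ofSeconds(15)).build(); // freshness left at default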
@@ -545,12 +545,6 @@ public CloseableIterator<Document> find(
return convertToDocumentIterator(queryExecutor.find(query));
}

@Override
public CloseableIterator<Document> aggregate(
final org.hypertrace.core.documentstore.query.Query query) {
return convertToDocumentIterator(queryExecutor.aggregate(query));
}

@Override
public CloseableIterator<Document> query(
final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) {
@@ -579,8 +573,9 @@ public CloseableIterator<Document> bulkUpdate(
}

@Override
public long count(org.hypertrace.core.documentstore.query.Query query) {
return queryExecutor.count(query);
public long count(
final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) {
return queryExecutor.count(query, queryOptions);
}

@Override
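
The diff doesn't show how the Mongo query executor ultimately enforces the timeout. In the MongoDB Java driver, the usual mechanism is maxTime on the query iterable, so the wiring plausibly resembles the following hypothetical sketch (mongoCollection and filter are placeholders):

// Hypothetical: how a per-query timeout is commonly applied via the Mongo driver.
final FindIterable<org.bson.Document> iterable =
    mongoCollection
        .find(filter)
        .maxTime(
            queryOptions.getQueryTimeout().toMillis(),
            java.util.concurrent.TimeUnit.MILLISECONDS);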