Skip to content

Commit

Permalink
fix: Use UNION for batch get rather than OR. (#86)
Browse files Browse the repository at this point in the history
UNION ALL can be much more performant than OR, and the mysql team at LI has indicated this query is causing performance issues. Switch to UNION ALL on their guidance.
  • Loading branch information
John Plaisted authored Mar 16, 2021
1 parent dff90cd commit 5592384
Show file tree
Hide file tree
Showing 2 changed files with 333 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import io.ebean.ExpressionList;
import io.ebean.PagedList;
import io.ebean.Query;
import io.ebean.RawSql;
import io.ebean.RawSqlBuilder;
import io.ebean.Transaction;
import io.ebean.config.ServerConfig;
import io.ebean.datasource.DataSourceConfig;
Expand All @@ -51,6 +53,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.persistence.RollbackException;
import javax.persistence.Table;
import lombok.Value;

import static com.linkedin.metadata.dao.EbeanMetadataAspect.*;
Expand All @@ -71,6 +74,9 @@ public class EbeanLocalDAO<ASPECT_UNION extends UnionTemplate, URN extends Urn>
private UrnPathExtractor<URN> _urnPathExtractor;
private int _queryKeysCount = 0; // 0 means no pagination on keys

// TODO feature flag, remove when vetted.
private boolean _useUnionForBatch = false;

@Value
static class GMAIndexPair {
public String valueType;
Expand Down Expand Up @@ -154,6 +160,18 @@ public EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull Serve
this(producer, createServer(serverConfig), storageConfig, urnClass, new EmptyPathExtractor<>());
}

/**
* Determines whether we should use UNION ALL statements for batch gets, rather than a large series of OR statements.
*
* <p>DO NOT USE THIS FLAG! This is for LinkedIn use to help us test this feature without a rollback. Once we've
* vetted this in production we will be removing this flag and making the the default behavior. So if you set this
* to true by calling this method, your code will break when we remove this method. Just wait a bit for us to turn
* it on by default!
*/
public void setUseUnionForBatch(boolean useUnionForBatch) {
_useUnionForBatch = useUnionForBatch;
}

@Nonnull
private static EbeanServer createServer(@Nonnull ServerConfig serverConfig) {
// Make sure that the serverConfig includes the package that contains DAO's Ebean model.
Expand Down Expand Up @@ -246,7 +264,8 @@ protected <T> T runInTransactionWithRetry(@Nonnull Supplier<T> block, int maxTra

@Override
protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nullable ASPECT oldValue, @Nullable AuditStamp oldAuditStamp, @Nonnull ASPECT newValue, @Nonnull AuditStamp newAuditStamp) {
@Nullable ASPECT oldValue, @Nullable AuditStamp oldAuditStamp, @Nonnull ASPECT newValue,
@Nonnull AuditStamp newAuditStamp) {
// Save oldValue as the largest version + 1
long largestVersion = 0;
if (oldValue != null && oldAuditStamp != null) {
Expand All @@ -260,8 +279,8 @@ protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Non
}

@Override
protected <ASPECT extends RecordTemplate> void updateLocalIndex(@Nonnull URN urn,
@Nonnull ASPECT newValue, long version) {
protected <ASPECT extends RecordTemplate> void updateLocalIndex(@Nonnull URN urn, @Nonnull ASPECT newValue,
long version) {
if (!isLocalSecondaryIndexEnabled()) {
throw new UnsupportedOperationException("Local secondary index isn't supported");
}
Expand Down Expand Up @@ -311,13 +330,10 @@ protected void save(@Nonnull URN urn, @Nonnull RecordTemplate value, @Nonnull Au
}
}

protected long saveSingleRecordToLocalIndex(@Nonnull URN urn, @Nonnull String aspect,
@Nonnull String path, @Nonnull Object value) {
protected long saveSingleRecordToLocalIndex(@Nonnull URN urn, @Nonnull String aspect, @Nonnull String path,
@Nonnull Object value) {

final EbeanMetadataIndex record = new EbeanMetadataIndex()
.setUrn(urn.toString())
.setAspect(aspect)
.setPath(path);
final EbeanMetadataIndex record = new EbeanMetadataIndex().setUrn(urn.toString()).setAspect(aspect).setPath(path);
if (value instanceof Integer || value instanceof Long) {
record.setLongVal(Long.valueOf(value.toString()));
} else if (value instanceof Float || value instanceof Double) {
Expand All @@ -342,15 +358,13 @@ private void updateUrnInLocalIndex(@Nonnull URN urn) {

final Map<String, Object> pathValueMap = _urnPathExtractor.extractPaths(urn);
pathValueMap.forEach(
(path, value) -> saveSingleRecordToLocalIndex(urn, urn.getClass().getCanonicalName(), path, value)
);
(path, value) -> saveSingleRecordToLocalIndex(urn, urn.getClass().getCanonicalName(), path, value));
}

private <ASPECT extends RecordTemplate> void updateAspectInLocalIndex(@Nonnull URN urn, @Nonnull ASPECT newValue) {

if (!_storageConfig.getAspectStorageConfigMap().containsKey(newValue.getClass())
|| _storageConfig.getAspectStorageConfigMap().get(newValue.getClass()) == null
) {
|| _storageConfig.getAspectStorageConfigMap().get(newValue.getClass()) == null) {
return;
}
// step1: remove all rows from the index table corresponding to <urn, aspect> pair
Expand Down Expand Up @@ -422,7 +436,7 @@ protected <ASPECT extends RecordTemplate> void applyTimeBasedRetention(@Nonnull
final List<EbeanMetadataAspect> records;

if (_queryKeysCount == 0) {
records = batchGet(keys);
records = batchGet(keys, keys.size());
} else {
records = batchGet(keys, _queryKeysCount);
}
Expand All @@ -443,7 +457,7 @@ protected <ASPECT extends RecordTemplate> void applyTimeBasedRetention(@Nonnull
return Collections.emptyMap();
}

final List<EbeanMetadataAspect> records = batchGet(keys);
final List<EbeanMetadataAspect> records = batchGet(keys, keys.size());

final Map<AspectKey<URN, ? extends RecordTemplate>, AspectWithExtraInfo<? extends RecordTemplate>> result =
new HashMap<>();
Expand All @@ -454,31 +468,8 @@ protected <ASPECT extends RecordTemplate> void applyTimeBasedRetention(@Nonnull
return result;
}


public boolean existsInLocalIndex(@Nonnull URN urn) {
return _server.find(EbeanMetadataIndex.class)
.where().eq(URN_COLUMN, urn.toString())
.exists();
}

// Will be migrated to use {@link #batchGet(Set<AspectKey<URN, ? extends RecordTemplate>>, int)}
@Nonnull
private List<EbeanMetadataAspect> batchGet(@Nonnull Set<AspectKey<URN, ? extends RecordTemplate>> keys) {

ExpressionList<EbeanMetadataAspect> query = _server.find(EbeanMetadataAspect.class).select(ALL_COLUMNS).where();
if (keys.size() > 1) {
query = query.or();
}

for (AspectKey<URN, ? extends RecordTemplate> key : keys) {
query = query.and()
.eq(URN_COLUMN, key.getUrn().toString())
.eq(ASPECT_COLUMN, ModelUtils.getAspectName(key.getAspectClass()))
.eq(VERSION_COLUMN, key.getVersion())
.endAnd();
}

return query.findList();
return _server.find(EbeanMetadataIndex.class).where().eq(URN_COLUMN, urn.toString()).exists();
}

/**
Expand All @@ -499,7 +490,8 @@ public void setQueryKeysCount(int keysCount) {
* @param keysCount the max number of keys for each sub query
*/
@Nonnull
private List<EbeanMetadataAspect> batchGet(@Nonnull Set<AspectKey<URN, ? extends RecordTemplate>> keys, int keysCount) {
private List<EbeanMetadataAspect> batchGet(@Nonnull Set<AspectKey<URN, ? extends RecordTemplate>> keys,
int keysCount) {

int position = 0;
final int totalPageCount = QueryUtils.getTotalPageCount(keys.size(), keysCount);
Expand All @@ -513,8 +505,68 @@ private List<EbeanMetadataAspect> batchGet(@Nonnull Set<AspectKey<URN, ? extends
return finalResult;
}

/**
* Builds a single SELECT statement for batch get, which selects one entity, and then can be UNION'd with other SELECT
* statements.
*/
private String batchGetSelect(int selectId, @Nonnull String urn, @Nonnull String aspect, long version,
@Nonnull Map<String, Object> outputParamsToValues) {
final String urnArg = "urn" + selectId;
final String aspectArg = "aspect" + selectId;
final String versionArg = "version" + selectId;

outputParamsToValues.put(urnArg, urn);
outputParamsToValues.put(aspectArg, aspect);
outputParamsToValues.put(versionArg, version);

return String.format("SELECT urn, aspect, version, metadata, createdOn, createdBy, createdFor "
+ "FROM %s WHERE urn = :%s AND aspect = :%s AND version = :%s",
EbeanMetadataAspect.class.getAnnotation(Table.class).name(), urnArg, aspectArg, versionArg);
}

@Nonnull
private List<EbeanMetadataAspect> batchGetHelper(@Nonnull List<AspectKey<URN, ? extends RecordTemplate>> keys, int keysCount, int position) {
private List<EbeanMetadataAspect> batchGetUnion(@Nonnull List<AspectKey<URN, ? extends RecordTemplate>> keys,
int keysCount, int position) {

// Build one SELECT per key and then UNION ALL the results. This can be much more performant than OR'ing the
// conditions together. Our query will look like:
// SELECT * FROM metadata_aspect WHERE urn = 'urn0' AND aspect = 'aspect0' AND version = 0
// UNION ALL
// SELECT * FROM metadata_aspect WHERE urn = 'urn0' AND aspect = 'aspect1' AND version = 0
// ...
// Note: UNION ALL should be safe and more performant than UNION. We're selecting the entire entity key (as well
// as data), so each result should be unique. No need to deduplicate.
// Another note: ebean doesn't support UNION ALL, so we need to manually build the SQL statement ourselves.
final StringBuilder sb = new StringBuilder();
final int end = Math.min(keys.size(), position + keysCount);
final Map<String, Object> params = new HashMap<>();
for (int index = position; index < end; index++) {
sb.append(batchGetSelect(index - position, keys.get(index).getUrn().toString(),
ModelUtils.getAspectName(keys.get(index).getAspectClass()), keys.get(index).getVersion(), params));

if (index != end - 1) {
sb.append(" UNION ALL ");
}
}

final RawSql rawSql = RawSqlBuilder.parse(sb.toString())
.columnMapping(URN_COLUMN, "key.urn")
.columnMapping(ASPECT_COLUMN, "key.aspect")
.columnMapping(VERSION_COLUMN, "key.version")
.create();

final Query<EbeanMetadataAspect> query = _server.find(EbeanMetadataAspect.class).setRawSql(rawSql);

for (Map.Entry<String, Object> param : params.entrySet()) {
query.setParameter(param.getKey(), param.getValue());
}

return query.findList();
}

@Nonnull
private List<EbeanMetadataAspect> batchGetOr(@Nonnull List<AspectKey<URN, ? extends RecordTemplate>> keys,
int keysCount, int position) {
ExpressionList<EbeanMetadataAspect> query = _server.find(EbeanMetadataAspect.class).select(ALL_COLUMNS).where();

// add or if it is not the last element
Expand All @@ -533,6 +585,17 @@ private List<EbeanMetadataAspect> batchGetHelper(@Nonnull List<AspectKey<URN, ?
return query.findList();
}

@Nonnull
private List<EbeanMetadataAspect> batchGetHelper(@Nonnull List<AspectKey<URN, ? extends RecordTemplate>> keys,
int keysCount, int position) {
// TODO remove batchGetOr, make batchGetUnion the only implementation.
if (_useUnionForBatch) {
return batchGetUnion(keys, keysCount, position);
} else {
return batchGetOr(keys, keysCount, position);
}
}

/**
* Checks if an {@link AspectKey} and a {@link PrimaryKey} for Ebean are equivalent.
*
Expand Down Expand Up @@ -584,7 +647,8 @@ public <ASPECT extends RecordTemplate> ListResult<URN> listUrns(@Nonnull Class<A
.asc(URN_COLUMN)
.findPagedList();

final List<URN> urns = pagedList.getList().stream().map(entry -> getUrn(entry.getKey().getUrn())).collect(Collectors.toList());
final List<URN> urns =
pagedList.getList().stream().map(entry -> getUrn(entry.getKey().getUrn())).collect(Collectors.toList());
return toListResult(urns, null, pagedList, start);
}

Expand All @@ -608,8 +672,8 @@ public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<AS

final List<ASPECT> aspects =
pagedList.getList().stream().map(a -> toRecordTemplate(aspectClass, a)).collect(Collectors.toList());
final ListResultMetadata listResultMetadata =
makeListResultMetadata(pagedList.getList().stream().map(EbeanLocalDAO::toExtraInfo).collect(Collectors.toList()));
final ListResultMetadata listResultMetadata = makeListResultMetadata(
pagedList.getList().stream().map(EbeanLocalDAO::toExtraInfo).collect(Collectors.toList()));
return toListResult(aspects, listResultMetadata, pagedList, start);
}

Expand All @@ -633,8 +697,8 @@ public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<AS

final List<ASPECT> aspects =
pagedList.getList().stream().map(a -> toRecordTemplate(aspectClass, a)).collect(Collectors.toList());
final ListResultMetadata listResultMetadata =
makeListResultMetadata(pagedList.getList().stream().map(EbeanLocalDAO::toExtraInfo).collect(Collectors.toList()));
final ListResultMetadata listResultMetadata = makeListResultMetadata(
pagedList.getList().stream().map(EbeanLocalDAO::toExtraInfo).collect(Collectors.toList()));
return toListResult(aspects, listResultMetadata, pagedList, start);
}

Expand Down Expand Up @@ -671,7 +735,8 @@ private static <ASPECT extends RecordTemplate> AspectWithExtraInfo<ASPECT> toRec
@Nonnull
private <T> ListResult<T> toListResult(@Nonnull List<T> values, @Nullable ListResultMetadata listResultMetadata,
@Nonnull PagedList<?> pagedList, @Nullable Integer start) {
final int nextStart = (start != null && pagedList.hasNext()) ? start + pagedList.getList().size() : ListResult.INVALID_NEXT_START;
final int nextStart =
(start != null && pagedList.hasNext()) ? start + pagedList.getList().size() : ListResult.INVALID_NEXT_START;
return ListResult.<T>builder()
// Format
.values(values)
Expand Down Expand Up @@ -745,7 +810,7 @@ static GMAIndexPair getGMAIndexPair(@Nonnull IndexValue indexValue) {
if (indexValue.isBoolean()) {
object = indexValue.getBoolean().toString();
return new GMAIndexPair(EbeanMetadataIndex.STRING_COLUMN, object);
} else if (indexValue.isDouble()) {
} else if (indexValue.isDouble()) {
object = indexValue.getDouble();
return new GMAIndexPair(EbeanMetadataIndex.DOUBLE_COLUMN, object);
} else if (indexValue.isFloat()) {
Expand Down Expand Up @@ -775,8 +840,8 @@ static GMAIndexPair getGMAIndexPair(@Nonnull IndexValue indexValue) {
* @param lastUrn string representation of the urn whose value is used to set the last urn parameter in index query
* @param pageSize maximum number of distinct urns to return which is essentially the LIMIT clause of SQL query
*/
private static void setParameters(@Nonnull IndexCriterionArray indexCriterionArray, @Nonnull Query<EbeanMetadataIndex> indexQuery,
@Nonnull String lastUrn, int pageSize) {
private static void setParameters(@Nonnull IndexCriterionArray indexCriterionArray,
@Nonnull Query<EbeanMetadataIndex> indexQuery, @Nonnull String lastUrn, int pageSize) {
indexQuery.setParameter(1, lastUrn);
int pos = 2;
for (IndexCriterion criterion : indexCriterionArray) {
Expand All @@ -792,7 +857,8 @@ private static void setParameters(@Nonnull IndexCriterionArray indexCriterionArr
@Nonnull
private static String getStringForOperator(@Nonnull Condition condition) {
if (!CONDITION_STRING_MAP.containsKey(condition)) {
throw new UnsupportedOperationException(condition.toString() + " condition is not supported in local secondary index");
throw new UnsupportedOperationException(
condition.toString() + " condition is not supported in local secondary index");
}
return CONDITION_STRING_MAP.get(condition);
}
Expand All @@ -807,8 +873,9 @@ private static String getStringForOperator(@Nonnull Condition condition) {
@Nonnull
private static String constructSQLQuery(@Nonnull IndexCriterionArray indexCriterionArray) {
String selectClause = "SELECT DISTINCT(t0.urn) FROM metadata_index t0";
selectClause += IntStream.range(1, indexCriterionArray.size()).mapToObj(i -> " INNER JOIN metadata_index " + "t"
+ i + " ON t0.urn = " + "t" + i + ".urn").collect(Collectors.joining(""));
selectClause += IntStream.range(1, indexCriterionArray.size())
.mapToObj(i -> " INNER JOIN metadata_index " + "t" + i + " ON t0.urn = " + "t" + i + ".urn")
.collect(Collectors.joining(""));
final StringBuilder whereClause = new StringBuilder("WHERE t0.urn > ?");
IntStream.range(0, indexCriterionArray.size()).forEach(i -> {
final IndexCriterion criterion = indexCriterionArray.get(i);
Expand Down Expand Up @@ -860,7 +927,8 @@ public List<URN> listUrns(@Nonnull IndexFilter indexFilter, @Nullable URN lastUr
throw new UnsupportedOperationException("Empty Index Filter is not supported by EbeanLocalDAO");
}
if (indexCriterionArray.size() > 10) {
throw new UnsupportedOperationException("Currently more than 10 filter conditions is not supported by EbeanLocalDAO");
throw new UnsupportedOperationException(
"Currently more than 10 filter conditions is not supported by EbeanLocalDAO");
}

addEntityTypeFilter(indexFilter);
Expand All @@ -872,8 +940,6 @@ public List<URN> listUrns(@Nonnull IndexFilter indexFilter, @Nullable URN lastUr

final List<EbeanMetadataIndex> pagedList = query.findList();

return pagedList.stream()
.map(entry -> getUrn(entry.getUrn()))
.collect(Collectors.toList());
return pagedList.stream().map(entry -> getUrn(entry.getUrn())).collect(Collectors.toList());
}
}
Loading

0 comments on commit 5592384

Please sign in to comment.