From b38cf9db8f9b5716382bc4f824c49e0d34018dd4 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Wed, 25 Oct 2023 11:03:31 -0700 Subject: [PATCH] disable change history during persistence (#305) * disable change history during persistency * addressed review comments. * fix comments and adding an unit test --- .../metadata/dao/EbeanLocalAccess.java | 94 +++- .../linkedin/metadata/dao/EbeanLocalDAO.java | 311 +++++++----- .../metadata/dao/IEbeanLocalAccess.java | 34 +- .../metadata/dao/utils/EBeanDAOUtils.java | 69 +++ .../metadata/dao/utils/SQLStatementUtils.java | 75 ++- .../metadata/dao/EbeanLocalAccessTest.java | 28 +- .../metadata/dao/EbeanLocalDAOTest.java | 458 ++++++++++++------ .../metadata/dao/utils/EBeanDAOUtilsTest.java | 19 +- .../dao/utils/SQLStatementUtilsTest.java | 31 +- 9 files changed, 841 insertions(+), 278 deletions(-) diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java index cd77c8f66..817c550c2 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java @@ -14,9 +14,12 @@ import com.linkedin.metadata.dao.utils.RecordUtils; import com.linkedin.metadata.dao.utils.SQLSchemaUtils; import com.linkedin.metadata.dao.utils.SQLStatementUtils; +import com.linkedin.metadata.query.ExtraInfo; +import com.linkedin.metadata.query.ExtraInfoArray; import com.linkedin.metadata.query.IndexFilter; import com.linkedin.metadata.query.IndexGroupByCriterion; import com.linkedin.metadata.query.IndexSortCriterion; +import com.linkedin.metadata.query.ListResultMetadata; import io.ebean.EbeanServer; import io.ebean.SqlQuery; import io.ebean.SqlRow; @@ -33,15 +36,18 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import javax.persistence.PersistenceException; import lombok.extern.slf4j.Slf4j; import org.json.simple.JSONObject; +import static com.linkedin.metadata.dao.EbeanLocalDAO.*; import static com.linkedin.metadata.dao.utils.EBeanDAOUtils.*; import static com.linkedin.metadata.dao.utils.SQLIndexFilterUtils.*; import static com.linkedin.metadata.dao.utils.SQLSchemaUtils.*; @@ -159,10 +165,12 @@ public int add(@Nonnull URN urn, @Nullable ASPEC * @param aspectKeys a List of keys (urn, aspect pairings) to query for * @param keysCount number of keys to query * @param position position of the key to start from + * @param includeSoftDeleted whether to include soft deleted aspect in the query */ @Override public List batchGetUnion( - @Nonnull List> aspectKeys, int keysCount, int position) { + @Nonnull List> aspectKeys, int keysCount, int position, + boolean includeSoftDeleted) { final int end = Math.min(aspectKeys.size(), position + keysCount); final Map, Set> keysToQueryMap = new HashMap<>(); @@ -178,12 +186,19 @@ public List batchGetUnion( } // each statement is for a single aspect class - List selectStatements = keysToQueryMap.entrySet().stream() - .map(entry -> SQLStatementUtils.createAspectReadSql(entry.getKey(), entry.getValue())) - .collect(Collectors.toList()); + Map> selectStatements = keysToQueryMap.entrySet() + .stream() + .collect(Collectors.toMap( + entry -> SQLStatementUtils.createAspectReadSql(entry.getKey(), entry.getValue(), includeSoftDeleted), + entry -> entry.getKey())); // consolidate/join the results - List sqlRows = selectStatements.stream().flatMap(sql -> _server.createSqlQuery(sql).findList().stream()).collect(Collectors.toList()); + final Map> sqlRows = new LinkedHashMap<>(); + for (Map.Entry> entry : selectStatements.entrySet()) { + for (SqlRow sqlRow : _server.createSqlQuery(entry.getKey()).findList()) { + sqlRows.put(sqlRow, entry.getValue()); + } + } return EBeanDAOUtils.readSqlRows(sqlRows); } @@ -206,7 +221,7 @@ public ListResult listUrns(@Nonnull IndexFilter indexFilter, @Nullable Inde return toListResult(actualTotalCount, start, pageSize); } final List values = sqlRows.stream().map(sqlRow -> getUrn(sqlRow.getString("urn"), _urnClass)).collect(Collectors.toList()); - return toListResult(values, sqlRows, start, pageSize); + return toListResult(values, sqlRows, null, start, pageSize); } @Override @@ -233,9 +248,69 @@ public ListResult listUrns(@Nonnull Class values = sqlRows.stream() .map(sqlRow -> getUrn(sqlRow.getString("urn"), _urnClass)) .collect(Collectors.toList()); - return toListResult(values, sqlRows, start, pageSize); + return toListResult(values, sqlRows, null, start, pageSize); + } + + @Nonnull + @Override + public ListResult list(@Nonnull Class aspectClass, @Nonnull URN urn, + int start, int pageSize) { + // start / pageSize will be ignored since there will be at most one record returned from entity table. + final String listAspectByUrnSql = SQLStatementUtils.createListAspectByUrnSql(aspectClass, urn, false); + final SqlQuery sqlQuery = _server.createSqlQuery(listAspectByUrnSql); + + try { + final SqlRow sqlRow = sqlQuery.findOne(); + if (sqlRow == null) { + return toListResult(0, start, pageSize); + } else { + sqlRow.set("_total_count", 1); + final ASPECT aspect = RecordUtils.toRecordTemplate(aspectClass, + extractAspectJsonString(sqlRow.getString(getAspectColumnName(aspectClass)))); + final ListResultMetadata listResultMetadata = new ListResultMetadata().setExtraInfos(new ExtraInfoArray()); + final ExtraInfo extraInfo = new ExtraInfo().setUrn(urn) + .setVersion(LATEST_VERSION) + .setAudit(makeAuditStamp(sqlRow.getTimestamp("lastmodifiedon"), sqlRow.getString("lastmodifiedby"), + sqlRow.getString("createdfor"))); + listResultMetadata.getExtraInfos().add(extraInfo); + return toListResult(Collections.singletonList(aspect), Collections.singletonList(sqlRow), listResultMetadata, + start, pageSize); + } + } catch (PersistenceException pe) { + throw new RuntimeException( + String.format("Expect at most 1 aspect value per entity per aspect type . Sql: %s", listAspectByUrnSql)); + } } + + + + @Nonnull + @Override + public ListResult list(@Nonnull Class aspectClass, int start, + int pageSize) { + + final String tableName = SQLSchemaUtils.getTableName(_entityType); + final String listAspectSql = SQLStatementUtils.createListAspectWithPaginationSql(aspectClass, tableName, false, start, pageSize); + final SqlQuery sqlQuery = _server.createSqlQuery(listAspectSql); + final List sqlRows = sqlQuery.findList(); + if (sqlRows.isEmpty()) { + return toListResult(0, start, pageSize); + } + final ListResultMetadata listResultMetadata = new ListResultMetadata().setExtraInfos(new ExtraInfoArray()); + final List aspectList = sqlRows.stream().map(sqlRow -> { + final ExtraInfo extraInfo = new ExtraInfo().setUrn(getUrn(sqlRow.getString("urn"), _urnClass)) + .setVersion(LATEST_VERSION).setAudit( + makeAuditStamp(sqlRow.getTimestamp("lastmodifiedon"), sqlRow.getString("lastmodifiedby"), + sqlRow.getString("createdfor"))); + listResultMetadata.getExtraInfos().add(extraInfo); + return RecordUtils.toRecordTemplate(aspectClass, + extractAspectJsonString(sqlRow.getString(getAspectColumnName(aspectClass)))); + }).collect(Collectors.toList()); + return toListResult(aspectList, sqlRows, listResultMetadata, start, pageSize); + } + + @Nonnull @Override public Map countAggregate(@Nonnull IndexFilter indexFilter, @@ -354,13 +429,14 @@ protected ListResult toListResult(int totalCount, int start, int pageSize * Convert sqlRows into {@link ListResult}. * @param values a list of query response result * @param sqlRows list of {@link SqlRow} from ebean query execution + * @param listResultMetadata {@link ListResultMetadata} with {@link com.linkedin.metadata.query.ExtraInfo} * @param start starting position * @param pageSize number of rows in a page * @param type of query response * @return {@link ListResult} which contains paging metadata information */ @Nonnull - protected ListResult toListResult(@Nonnull List values, @Nonnull List sqlRows, + protected ListResult toListResult(@Nonnull List values, @Nonnull List sqlRows, @Nullable ListResultMetadata listResultMetadata, int start, int pageSize) { if (pageSize == 0) { pageSize = DEFAULT_PAGE_SIZE; @@ -382,7 +458,7 @@ protected ListResult toListResult(@Nonnull List values, @Nonnull List< } return ListResult.builder() .values(values) - .metadata(null) + .metadata(listResultMetadata) .nextStart(nextStart) .havingMore(hasNext) .totalCount(totalCount) diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index ef801d559..019d7b5ca 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -38,7 +38,6 @@ import com.linkedin.metadata.query.SortOrder; import io.ebean.DuplicateKeyException; import io.ebean.EbeanServer; -import io.ebean.ExpressionList; import io.ebean.PagedList; import io.ebean.Query; import io.ebean.SqlUpdate; @@ -101,6 +100,22 @@ public enum SchemaConfig { // See GCN-38382 private FindMethodology _findMethodology = FindMethodology.UNIQUE_ID; + // true if metadata change will be persisted into the change log table (metadata_aspect) + private boolean _changeLogEnabled = true; + + public void setChangeLogEnabled(boolean changeLogEnabled) { + if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) { + _changeLogEnabled = changeLogEnabled; + } else { + // For non-new schema, _changeLog will be enforced to be true + _changeLogEnabled = true; + } + } + + public boolean isChangeLogEnabled() { + return _changeLogEnabled; + } + public enum FindMethodology { UNIQUE_ID, // (legacy) https://javadoc.io/static/io.ebean/ebean/11.19.2/io/ebean/EbeanServer.html#find-java.lang.Class-java.lang.Object- DIRECT_SQL, // https://javadoc.io/static/io.ebean/ebean/11.19.2/io/ebean/EbeanServer.html#findNative-java.lang.Class-java.lang.String- @@ -470,6 +485,14 @@ public SchemaConfig getSchemaConfig() { return _schemaConfig; } + /** + * Overwride schema config, unit-test only. + * @param schemaConfig schema config + */ + void setSchemaConfig(SchemaConfig schemaConfig) { + _schemaConfig = schemaConfig; + } + /** * Ensure table schemas is up-to-date with db evolution scripts. */ @@ -512,22 +535,29 @@ protected long saveLatest(@Nonnull URN urn, @Non @Nonnull AuditStamp newAuditStamp, boolean isSoftDeleted, @Nullable IngestionTrackingContext trackingContext) { // Save oldValue as the largest version + 1 long largestVersion = 0; - if ((isSoftDeleted || oldValue != null) && oldAuditStamp != null) { - largestVersion = getNextVersion(urn, aspectClass); + if ((isSoftDeleted || oldValue != null) && oldAuditStamp != null && _changeLogEnabled) { + // When saving on entity which has history version (including being soft deleted), and changeLog is enabled, + // the saveLatest will process the following steps: + // 1. get the next version from the metadata_aspect table + // 2. write value of latest version (version = 0) as a new version + // 3. update the latest version (version = 0) with the new value. If the value of latest version has been + // changed during this process, then rollback by throwing OptimisticLockException + + largestVersion = getNextVersion(urn, aspectClass); // TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards if (log.isDebugEnabled()) { if ("AzkabanFlowInfo".equals(aspectClass.getSimpleName())) { log.debug("Insert: {} => oldValue = {}, latest version = {}", urn, oldValue, largestVersion); } } - // Move latest version to historical version by insert a new record. insert(urn, oldValue, aspectClass, oldAuditStamp, largestVersion, trackingContext); // update latest version updateWithOptimisticLocking(urn, newValue, aspectClass, newAuditStamp, LATEST_VERSION, new Timestamp(oldAuditStamp.getTime()), trackingContext); } else { + // When for fresh ingestion or with changeLog disabled // TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards if (log.isDebugEnabled()) { @@ -586,7 +616,7 @@ public List backfillLo } AspectKey key = new AspectKey<>(aspectClass, urn, LATEST_VERSION); return runInTransactionWithRetry(() -> { - List results = _localAccess.batchGetUnion(Collections.singletonList(key), 1, 0); + List results = _localAccess.batchGetUnion(Collections.singletonList(key), 1, 0, false); if (results.size() == 0) { return new ArrayList<>(); } @@ -595,22 +625,37 @@ public List backfillLo }, 1); } - private EbeanMetadataAspect queryLatest(@Nonnull URN urn, + /** + * Get latest metadata aspect record by urn and aspect. + * @param urn entity urn + * @param aspectClass aspect class + * @param aspect type + * @return metadata aspect ebean model {@link EbeanMetadataAspect} + */ + private @Nullable EbeanMetadataAspect queryLatest(@Nonnull URN urn, @Nonnull Class aspectClass) { - final String aspectName = ModelUtils.getAspectName(aspectClass); - final PrimaryKey key = new PrimaryKey(urn.toString(), aspectName, 0L); + EbeanMetadataAspect result; - if (_findMethodology == FindMethodology.DIRECT_SQL) { - result = findLatestMetadataAspect(_server, urn, aspectClass); - if (result == null) { - // Attempt 1: retry - result = _server.find(EbeanMetadataAspect.class, key); - if (log.isDebugEnabled()) { - log.debug("Attempt 1: Retried on {}, {}", urn, result); + if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) { + final String aspectName = ModelUtils.getAspectName(aspectClass); + final PrimaryKey key = new PrimaryKey(urn.toString(), aspectName, LATEST_VERSION); + if (_findMethodology == FindMethodology.DIRECT_SQL) { + result = findLatestMetadataAspect(_server, urn, aspectClass); + if (result == null) { + // Attempt 1: retry + result = _server.find(EbeanMetadataAspect.class, key); + if (log.isDebugEnabled()) { + log.debug("Attempt 1: Retried on {}, {}", urn, result); + } } + } else { + result = _server.find(EbeanMetadataAspect.class, key); } } else { - result = _server.find(EbeanMetadataAspect.class, key); + // for new schema or dual-schema, get latest data from new schema. (Resolving the read de-coupling issue) + final List results = _localAccess.batchGetUnion( + Collections.singletonList(new AspectKey<>(aspectClass, urn, LATEST_VERSION)), 1, 0, true); + result = results.isEmpty() ? null : results.get(0); } return result; } @@ -713,7 +758,11 @@ protected void insert(@Nonnull URN urn, @Nullabl // the metadata entity tables shouldn't been updated. _localAccess.add(urn, (ASPECT) value, aspectClass, auditStamp); } - _server.insert(aspect); + + if (_changeLogEnabled) { + // skip appending change log table (metadata_aspect) if not enabled + _server.insert(aspect); + } } protected void saveRecordsToLocalIndex(@Nonnull URN urn, @Nonnull String aspect, @Nonnull String path, @@ -784,42 +833,49 @@ private void updateAspectInLocalIndex(@Nonnull U @Override protected long getNextVersion(@Nonnull URN urn, @Nonnull Class aspectClass) { + if (!_changeLogEnabled) { + throw new UnsupportedOperationException("getNextVersion shouldn't be called when changeLog is disabled"); + } else { + final List result = _server.find(EbeanMetadataAspect.class) + .where() + .eq(URN_COLUMN, urn.toString()) + .eq(ASPECT_COLUMN, ModelUtils.getAspectName(aspectClass)) + .orderBy() + .desc(VERSION_COLUMN) + .setMaxRows(1) + .findIds(); - final List result = _server.find(EbeanMetadataAspect.class) - .where() - .eq(URN_COLUMN, urn.toString()) - .eq(ASPECT_COLUMN, ModelUtils.getAspectName(aspectClass)) - .orderBy() - .desc(VERSION_COLUMN) - .setMaxRows(1) - .findIds(); - - return result.isEmpty() ? 0 : result.get(0).getVersion() + 1L; + return result.isEmpty() ? 0 : result.get(0).getVersion() + 1L; + } } @Override protected void applyVersionBasedRetention(@Nonnull Class aspectClass, @Nonnull URN urn, @Nonnull VersionBasedRetention retention, long largestVersion) { - - _server.find(EbeanMetadataAspect.class) - .where() - .eq(URN_COLUMN, urn.toString()) - .eq(ASPECT_COLUMN, ModelUtils.getAspectName(aspectClass)) - .ne(VERSION_COLUMN, LATEST_VERSION) - .le(VERSION_COLUMN, largestVersion - retention.getMaxVersionsToRetain() + 1) - .delete(); + if (_changeLogEnabled) { + // only apply version based retention when changeLog is enabled + _server.find(EbeanMetadataAspect.class) + .where() + .eq(URN_COLUMN, urn.toString()) + .eq(ASPECT_COLUMN, ModelUtils.getAspectName(aspectClass)) + .ne(VERSION_COLUMN, LATEST_VERSION) + .le(VERSION_COLUMN, largestVersion - retention.getMaxVersionsToRetain() + 1) + .delete(); + } } @Override protected void applyTimeBasedRetention(@Nonnull Class aspectClass, @Nonnull URN urn, @Nonnull TimeBasedRetention retention, long currentTime) { - - _server.find(EbeanMetadataAspect.class) - .where() - .eq(URN_COLUMN, urn.toString()) - .eq(ASPECT_COLUMN, ModelUtils.getAspectName(aspectClass)) - .lt(CREATED_ON_COLUMN, new Timestamp(currentTime - retention.getMaxAgeToRetain())) - .delete(); + if (_changeLogEnabled) { + // only apply time based retention when changeLog is enabled + _server.find(EbeanMetadataAspect.class) + .where() + .eq(URN_COLUMN, urn.toString()) + .eq(ASPECT_COLUMN, ModelUtils.getAspectName(aspectClass)) + .lt(CREATED_ON_COLUMN, new Timestamp(currentTime - retention.getMaxAgeToRetain())) + .delete(); + } } @Override @@ -991,27 +1047,6 @@ private List batchGetUnion(@Nonnull List batchGetOr(@Nonnull List> keys, - int keysCount, int position) { - ExpressionList query = _server.find(EbeanMetadataAspect.class).select(ALL_COLUMNS).where(); - - // add or if it is not the last element - if (position != keys.size() - 1) { - query = query.or(); - } - - for (int index = position; index < keys.size() && index < position + keysCount; index++) { - query = query.and() - .eq(URN_COLUMN, keys.get(index).getUrn().toString()) - .eq(ASPECT_COLUMN, ModelUtils.getAspectName(keys.get(index).getAspectClass())) - .eq(VERSION_COLUMN, keys.get(index).getVersion()) - .endAnd(); - } - - return query.findList(); - } - @Nonnull @SuppressWarnings({"checkstyle:FallThrough", "checkstyle:DefaultComesLast"}) List batchGetHelper(@Nonnull List> keys, @@ -1024,13 +1059,13 @@ List batchGetHelper(@Nonnull List resultsOldSchema = batchGetUnion(keys, keysCount, position); - final List resultsNewSchema = _localAccess.batchGetUnion(keys, keysCount, position); + final List resultsNewSchema = _localAccess.batchGetUnion(keys, keysCount, position, false); EBeanDAOUtils.compareResults(resultsOldSchema, resultsNewSchema, "batchGet"); return resultsOldSchema; } @@ -1053,31 +1088,34 @@ boolean matchKeys(@Nonnull AspectKey aspectKey, @ @Nonnull public ListResult listVersions(@Nonnull Class aspectClass, @Nonnull URN urn, int start, int pageSize) { - checkValidAspect(aspectClass); - - final PagedList pagedList = _server.find(EbeanMetadataAspect.class) - .select(KEY_ID) - .where() - .eq(URN_COLUMN, urn.toString()) - .eq(ASPECT_COLUMN, ModelUtils.getAspectName(aspectClass)) - .ne(METADATA_COLUMN, DELETED_VALUE) - .setFirstRow(start) - .setMaxRows(pageSize) - .orderBy() - .asc(VERSION_COLUMN) - .findPagedList(); - - final List versions = - pagedList.getList().stream().map(a -> a.getKey().getVersion()).collect(Collectors.toList()); - return toListResult(versions, null, pagedList, start); + if (_changeLogEnabled) { + PagedList pagedList = _server.find(EbeanMetadataAspect.class) + .select(KEY_ID) + .where() + .eq(URN_COLUMN, urn.toString()) + .eq(ASPECT_COLUMN, ModelUtils.getAspectName(aspectClass)) + .ne(METADATA_COLUMN, DELETED_VALUE) + .setFirstRow(start) + .setMaxRows(pageSize) + .orderBy() + .asc(VERSION_COLUMN) + .findPagedList(); + final List versions = + pagedList.getList().stream().map(a -> a.getKey().getVersion()).collect(Collectors.toList()); + return toListResult(versions, null, pagedList, start); + } else { + ListResult aspectListResult = _localAccess.list(aspectClass, urn, start, pageSize); + return transformListResult(aspectListResult, aspect -> LATEST_VERSION); + } } @Override @Nonnull public ListResult listUrns(@Nonnull Class aspectClass, int start, int pageSize) { - if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) { + if (_schemaConfig != SchemaConfig.OLD_SCHEMA_ONLY) { + // decouple from old schema return _localAccess.listUrns(aspectClass, start, pageSize); } checkValidAspect(aspectClass); @@ -1125,22 +1163,26 @@ ListResult getListResult(@Nonnull Class< @Nonnull public ListResult list(@Nonnull Class aspectClass, @Nonnull URN urn, int start, int pageSize) { - checkValidAspect(aspectClass); - - final PagedList pagedList = _server.find(EbeanMetadataAspect.class) - .select(ALL_COLUMNS) - .where() - .eq(URN_COLUMN, urn.toString()) - .eq(ASPECT_COLUMN, ModelUtils.getAspectName(aspectClass)) - .ne(METADATA_COLUMN, DELETED_VALUE) - .setFirstRow(start) - .setMaxRows(pageSize) - .orderBy() - .asc(VERSION_COLUMN) - .findPagedList(); - - return getListResult(aspectClass, pagedList, start); + PagedList pagedList; + if (_changeLogEnabled) { + pagedList = _server.find(EbeanMetadataAspect.class) + .select(ALL_COLUMNS) + .where() + .eq(URN_COLUMN, urn.toString()) + .eq(ASPECT_COLUMN, ModelUtils.getAspectName(aspectClass)) + .ne(METADATA_COLUMN, DELETED_VALUE) + .setFirstRow(start) + .setMaxRows(pageSize) + .orderBy() + .asc(VERSION_COLUMN) + .findPagedList(); + return getListResult(aspectClass, pagedList, start); + } else { + // if changeLog is disabled, then list all the non-null, + // non-solft deleted, version-0) aspects from new schema table + return _localAccess.list(aspectClass, urn, start, pageSize); + } } @Override @@ -1150,19 +1192,29 @@ public ListResult list(@Nonnull Class pagedList = _server.find(EbeanMetadataAspect.class) - .select(ALL_COLUMNS) - .where() - .eq(ASPECT_COLUMN, ModelUtils.getAspectName(aspectClass)) - .eq(VERSION_COLUMN, version) - .ne(METADATA_COLUMN, DELETED_VALUE) - .setFirstRow(start) - .setMaxRows(pageSize) - .orderBy() - .asc(URN_COLUMN) - .findPagedList(); + if (_changeLogEnabled) { + + PagedList pagedList = _server.find(EbeanMetadataAspect.class) + .select(ALL_COLUMNS) + .where() + .eq(ASPECT_COLUMN, ModelUtils.getAspectName(aspectClass)) + .eq(VERSION_COLUMN, version) + .ne(METADATA_COLUMN, DELETED_VALUE) + .setFirstRow(start) + .setMaxRows(pageSize) + .orderBy() + .asc(URN_COLUMN) + .findPagedList(); - return getListResult(aspectClass, pagedList, start); + return getListResult(aspectClass, pagedList, start); + } else { + if (version != LATEST_VERSION) { + throw new UnsupportedOperationException( + "non-current version based list is not supported when ChangeLog is disabled"); + } + + return _localAccess.list(aspectClass, start, pageSize); + } } @Override @@ -1202,6 +1254,29 @@ static Optional> toR extraInfo)); } + + /** + * Transform list result from type T to type R. + * @param listResult input list result + * @param function transform function + * @param input data type + * @param output data type + * @return ListResult of type R + */ + @Nonnull + public static ListResult transformListResult(@Nonnull ListResult listResult, + @Nonnull Function function) { + List values = listResult.getValues().stream().map(function).collect(Collectors.toList()); + return ListResult.builder().values(values) + .metadata(listResult.getMetadata()) + .nextStart(listResult.getNextStart()) + .havingMore(listResult.isHavingMore()) + .totalCount(listResult.getTotalCount()) + .totalPageCount(listResult.getTotalPageCount()) + .pageSize(listResult.getPageSize()) + .build(); + } + @Nonnull private ListResult toListResult(@Nonnull List values, @Nullable ListResultMetadata listResultMetadata, @Nonnull PagedList pagedList, @Nullable Integer start) { @@ -1234,14 +1309,13 @@ static ExtraInfo toExtraInfo(@Nonnull EbeanMetadataAspect aspect) { } @Nonnull - static AuditStamp makeAuditStamp(@Nonnull EbeanMetadataAspect aspect) { + static AuditStamp makeAuditStamp(@Nonnull Timestamp timestamp, @Nonnull String actor, @Nullable String impersonator) { final AuditStamp auditStamp = new AuditStamp(); - auditStamp.setTime(aspect.getCreatedOn().getTime()); - + auditStamp.setTime(timestamp.getTime()); try { - auditStamp.setActor(new Urn(aspect.getCreatedBy())); - if (aspect.getCreatedFor() != null) { - auditStamp.setImpersonator(new Urn(aspect.getCreatedFor())); + auditStamp.setActor(new Urn(actor)); + if (impersonator != null) { + auditStamp.setImpersonator(new Urn(impersonator)); } } catch (URISyntaxException e) { throw new RuntimeException(e); @@ -1249,6 +1323,11 @@ static AuditStamp makeAuditStamp(@Nonnull EbeanMetadataAspect aspect) { return auditStamp; } + @Nonnull + static AuditStamp makeAuditStamp(@Nonnull EbeanMetadataAspect aspect) { + return makeAuditStamp(aspect.getCreatedOn(), aspect.getCreatedBy(), aspect.getCreatedFor()); + } + @Nonnull private ListResultMetadata makeListResultMetadata(@Nonnull List extraInfos) { final ListResultMetadata listResultMetadata = new ListResultMetadata(); diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java index 690617bfe..59e35a259 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java @@ -49,12 +49,13 @@ List addRelationships( * @param keys {@link AspectKey} to retrieve aspect metadata * @param keysCount pagination key count limit * @param position starting position of pagination + * @param includeSoftDeleted include soft deleted aspects, default false * @param metadata aspect value * @return a list of {@link EbeanMetadataAspect} as get response */ @Nonnull List batchGetUnion(@Nonnull List> keys, - int keysCount, int position); + int keysCount, int position, boolean includeSoftDeleted); /** * Returns list of urns that satisfy the given filter conditions. @@ -112,6 +113,37 @@ Map countAggregate(@Nonnull IndexFilter indexFilter, @Nonnull ListResult listUrns(@Nonnull Class aspectClass, int start, int pageSize); + + /** + * Paginates over all versions of an aspect for a specific Urn. It does not return metadata corresponding to versions + * indicating soft deleted aspect(s). + * + * @param aspectClass the type of the aspect to query + * @param urn {@link Urn} for the entity + * @param start the starting offset of the page + * @param pageSize the size of the page + * @param must be a supported aspect type in {@code ASPECT_UNION}. + * @return a {@link ListResult} containing a list of aspects and other pagination information + */ + @Nonnull + ListResult list(@Nonnull Class aspectClass, + @Nonnull URN urn, int start, int pageSize); + + /** + * Paginates over a specific version of a specific aspect for all Urns. The result does not include soft deleted + * aspect if the specific version of a specific aspect was soft deleted. + * + * @param aspectClass the type of the aspect to query + * @param start the starting offset of the page + * @param pageSize the size of the page + * @param must be a supported aspect type in {@code ASPECT_UNION}. + * @return a {@link ListResult} containing a list of aspects and other pagination information + */ + @Nonnull + ListResult list(@Nonnull Class aspectClass, + int start, int pageSize); + + /** * Provide a local relationship builder registry. Local relationships will be built based on the builders during data ingestion. * @param localRelationshipBuilderRegistry All local relationship builders should be registered in this registry. diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java index 99b612898..f48c9491b 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java @@ -15,6 +15,7 @@ import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -154,6 +155,7 @@ public static boolean isSoftDeletedAspect(@Nonnu * Read {@link SqlRow} list into a {@link EbeanMetadataAspect} list. * @param sqlRows list of {@link SqlRow} * @return list of {@link EbeanMetadataAspect} + * @deprecated This method has been deprecated, use {@link #readSqlRow(SqlRow, Class)} instead. */ // TODO: make this method private once all users' custom SQL queries have been replaced by DAO-supported methods public static List readSqlRows(List sqlRows) { @@ -176,6 +178,73 @@ public static List readSqlRows(List sqlRows) { }).collect(Collectors.toList()); } + /** + * Read {@link SqlRow} list into a {@link EbeanMetadataAspect} list. + * @param sqlRows a list of {@link SqlRow} to aspect {@link Class} map. + * @param aspect class type + * @return list of {@link EbeanMetadataAspect} + */ + public static List readSqlRows( + @Nonnull Map> sqlRows) { + return sqlRows.entrySet().stream().flatMap(entry -> { + List columns = new ArrayList<>(); + SqlRow sqlRow = entry.getKey(); + sqlRow.keySet() + .stream() + .filter(key -> key.startsWith(SQLSchemaUtils.ASPECT_PREFIX) && sqlRow.get(key) != null) + .forEach(columns::add); + + return columns.stream().map(columnName -> readSqlRow(sqlRow, entry.getValue())); + }).collect(Collectors.toList()); + } + + /** + * Read EbeanMetadataAspect from {@link SqlRow}. + * @param sqlRow {@link SqlRow} + * @param aspectClass aspect class + * @param aspect type + * @return {@link EbeanMetadataAspect} + */ + private static EbeanMetadataAspect readSqlRow(SqlRow sqlRow, + Class aspectClass) { + final String columnName = SQLSchemaUtils.getAspectColumnName(aspectClass); + final EbeanMetadataAspect ebeanMetadataAspect = new EbeanMetadataAspect(); + final String urn = sqlRow.getString("urn"); + EbeanMetadataAspect.PrimaryKey primaryKey; + + if (isSoftDeletedAspect(sqlRow, columnName)) { + primaryKey = new EbeanMetadataAspect.PrimaryKey(urn, aspectClass.getCanonicalName(), LATEST_VERSION); + ebeanMetadataAspect.setCreatedBy(sqlRow.getString("lastmodifiedby")); + ebeanMetadataAspect.setCreatedOn(sqlRow.getTimestamp("lastmodifiedon")); + ebeanMetadataAspect.setCreatedFor(sqlRow.getString("createdfor")); + ebeanMetadataAspect.setMetadata(DELETED_VALUE); + } else { + AuditedAspect auditedAspect = RecordUtils.toRecordTemplate(AuditedAspect.class, sqlRow.getString(columnName)); + primaryKey = new EbeanMetadataAspect.PrimaryKey(urn, auditedAspect.getCanonicalName(), LATEST_VERSION); + ebeanMetadataAspect.setCreatedBy(auditedAspect.getLastmodifiedby()); + ebeanMetadataAspect.setCreatedOn(Timestamp.valueOf(auditedAspect.getLastmodifiedon())); + ebeanMetadataAspect.setCreatedFor(auditedAspect.getCreatedfor()); + ebeanMetadataAspect.setMetadata(extractAspectJsonString(sqlRow.getString(columnName))); + } + ebeanMetadataAspect.setKey(primaryKey); + return ebeanMetadataAspect; + } + + /** + * Checks whether the entity table record has been soft deleted. + * @param sqlRow {@link SqlRow} result from MySQL server + * @param columnName column name of entity table + * @return boolean representing whether the aspect record has been soft deleted + */ + public static boolean isSoftDeletedAspect(@Nonnull SqlRow sqlRow, @Nonnull String columnName) { + try { + SoftDeletedAspect aspect = RecordUtils.toRecordTemplate(SoftDeletedAspect.class, sqlRow.getString(columnName)); + return aspect.hasGma_deleted() && aspect.isGma_deleted(); + } catch (Exception e) { + return false; + } + } + /** * Extract aspect json string from an AuditedAspect string in its DB format. Return null if aspect json string does not exist. * @param auditedAspect an AuditedAspect string in its DB format diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java index 8fed0f821..8d6364948 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java @@ -38,6 +38,8 @@ public class SQLStatementUtils { public static final String SOFT_DELETED_CHECK = "JSON_EXTRACT(%s, '$.gma_deleted') IS NULL"; // true when not soft deleted + public static final String NONNULL_CHECK = "%s IS NOT NULL"; // true when the value of aspect_column is not NULL + private static final String SQL_UPSERT_ASPECT_TEMPLATE = "INSERT INTO %s (urn, %s, lastmodifiedon, lastmodifiedby) VALUE (:urn, :metadata, :lastmodifiedon, :lastmodifiedby) " + "ON DUPLICATE KEY UPDATE %s = :metadata, lastmodifiedon = :lastmodifiedon;"; @@ -49,6 +51,23 @@ public class SQLStatementUtils { private static final String SQL_READ_ASPECT_TEMPLATE = String.format("SELECT urn, %%s, lastmodifiedon, lastmodifiedby FROM %%s WHERE urn = '%%s' AND %s", SOFT_DELETED_CHECK); + private static final String SQL_LIST_ASPECT_BY_URN_TEMPLATE = + String.format("SELECT urn, %%s, lastmodifiedon, lastmodifiedby, createdfor FROM %%s WHERE urn = '%%s' AND %s AND %s", NONNULL_CHECK, SOFT_DELETED_CHECK); + + private static final String SQL_LIST_ASPECT_BY_URN_WITH_SOFT_DELETED_TEMPLATE = + String.format("SELECT urn, %%s, lastmodifiedon, lastmodifiedby, createdfor FROM %%s WHERE urn = '%%s' AND %s", NONNULL_CHECK); + + private static final String SQL_LIST_ASPECT_WITH_PAGINATION_TEMPLATE = + String.format("SELECT urn, %%s, lastmodifiedon, lastmodifiedby, createdfor, (SELECT COUNT(urn) FROM %%s WHERE %s AND %s) " + + "as _total_count FROM %%s WHERE %s AND %s LIMIT %%s OFFSET %%s", NONNULL_CHECK, SOFT_DELETED_CHECK, NONNULL_CHECK, SOFT_DELETED_CHECK); + + private static final String SQL_LIST_ASPECT_WITH_PAGINATION_WITH_SOFT_DELETED_TEMPLATE = + String.format("SELECT urn, %%s, lastmodifiedon, lastmodifiedby, createdfor, (SELECT COUNT(urn) FROM %%s WHERE %s) " + + "as _total_count FROM %%s WHERE %s LIMIT %%s OFFSET %%s", NONNULL_CHECK, NONNULL_CHECK); + + private static final String SQL_READ_ASPECT_WITH_SOFT_DELETED_TEMPLATE = + "SELECT urn, %s, lastmodifiedon, lastmodifiedby FROM %s WHERE urn = '%s'"; + private static final String INDEX_GROUP_BY_CRITERION = "SELECT count(*) as COUNT, %s FROM %s"; private static final String SQL_COLUMN_EXISTS_TEMPLATE = @@ -107,24 +126,72 @@ public static String createExistSql(@Nonnull Urn urn) { *

* @param aspectClass aspect class to query for * @param urns a Set of Urns to query for + * @param includeSoftDeleted a flag to include soft deleted records * @param aspect type * @return aspect read sql statement for a single aspect (across multiple tables and urns) */ public static String createAspectReadSql(@Nonnull Class aspectClass, - @Nonnull Set urns) { + @Nonnull Set urns, boolean includeSoftDeleted) { if (urns.size() == 0) { throw new IllegalArgumentException("Need at least 1 urn to query."); } final String columnName = getAspectColumnName(aspectClass); StringBuilder stringBuilder = new StringBuilder(); List selectStatements = urns.stream().map(urn -> { - final String tableName = getTableName(urn); - return String.format(SQL_READ_ASPECT_TEMPLATE, columnName, tableName, escapeReservedCharInUrn(urn.toString()), columnName); - }).collect(Collectors.toList()); + final String tableName = getTableName(urn); + final String sqlTemplate = + includeSoftDeleted ? SQL_READ_ASPECT_WITH_SOFT_DELETED_TEMPLATE : SQL_READ_ASPECT_TEMPLATE; + return String.format(sqlTemplate, columnName, tableName, escapeReservedCharInUrn(urn.toString()), columnName); + }).collect(Collectors.toList()); stringBuilder.append(String.join(" UNION ALL ", selectStatements)); return stringBuilder.toString(); } + /** + * List all the aspect record (0 or 1) for a given entity urn and aspect type. + * @param aspectClass aspect type + * @param urn entity urn + * @param includeSoftDeleted whether to include soft deleted aspects + * @param aspect type + * @return a SQL to run listing aspect query + */ + public static String createListAspectByUrnSql(@Nonnull Class aspectClass, + @Nonnull Urn urn, boolean includeSoftDeleted) { + final String columnName = getAspectColumnName(aspectClass); + final String tableName = getTableName(urn); + if (includeSoftDeleted) { + return String.format(SQL_LIST_ASPECT_BY_URN_WITH_SOFT_DELETED_TEMPLATE, columnName, tableName, + escapeReservedCharInUrn(urn.toString()), columnName); + } else { + return String.format(SQL_LIST_ASPECT_BY_URN_TEMPLATE, columnName, tableName, + escapeReservedCharInUrn(urn.toString()), columnName, columnName); + } + } + + /** + * List all the aspects for a given entity type and aspect type. + * @param aspectClass aspect type + * @param tableName table name + * @param includeSoftDeleted whether to include soft deleted aspects + * @param start pagination offset + * @param pageSize page size + * @param aspect type + * @return a SQL to run listing aspect query with pagination. + */ + public static String createListAspectWithPaginationSql(@Nonnull Class aspectClass, + String tableName, boolean includeSoftDeleted, int start, int pageSize) { + final String columnName = getAspectColumnName(aspectClass); + if (includeSoftDeleted) { + return String.format(SQL_LIST_ASPECT_WITH_PAGINATION_WITH_SOFT_DELETED_TEMPLATE, columnName, tableName, + columnName, tableName, columnName, pageSize, start); + } else { + return String.format(SQL_LIST_ASPECT_WITH_PAGINATION_TEMPLATE, columnName, tableName, columnName, columnName, + tableName, columnName, columnName, pageSize, start); + } + } + + + /** * Create Upsert SQL statement. * @param urn entity urn diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java index 46f1f1cd7..850010357 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java @@ -44,6 +44,8 @@ import static com.linkedin.common.AuditStamps.*; import static com.linkedin.testing.TestUtils.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.AssertJUnit.*; public class EbeanLocalAccessTest { @@ -93,7 +95,7 @@ public void testGetAspect() { // When get AspectFoo from urn:li:foo:0 List ebeanMetadataAspectList = - _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey), 1000, 0); + _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey), 1000, 0, false); assertEquals(1, ebeanMetadataAspectList.size()); EbeanMetadataAspect ebeanMetadataAspect = ebeanMetadataAspectList.get(0); @@ -110,7 +112,7 @@ public void testGetAspect() { // When get AspectFoo from urn:li:foo:9999 (does not exist) FooUrn nonExistFooUrn = makeFooUrn(9999); AspectKey nonExistKey = new AspectKey(AspectFoo.class, nonExistFooUrn, 0L); - ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(nonExistKey), 1000, 0); + ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(nonExistKey), 1000, 0, false); // Expect: get AspectFoo from urn:li:foo:9999 returns empty result assertTrue(ebeanMetadataAspectList.isEmpty()); @@ -281,7 +283,7 @@ public void testEscapeSpecialCharInUrn() { _ebeanLocalAccessBurger.add(johnsBurgerUrn1, aspectFoo, AspectFoo.class, auditStamp); AspectKey aspectKey1 = new AspectKey(AspectFoo.class, johnsBurgerUrn1, 0L); - List ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey1), 1, 0); + List ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey1), 1, 0, false); assertEquals(1, ebeanMetadataAspectList.size()); assertEquals(ebeanMetadataAspectList.get(0).getKey().getUrn(), johnsBurgerUrn1.toString()); @@ -290,7 +292,7 @@ public void testEscapeSpecialCharInUrn() { _ebeanLocalAccessBurger.add(johnsBurgerUrn2, aspectFoo, AspectFoo.class, auditStamp); AspectKey aspectKey2 = new AspectKey(AspectFoo.class, johnsBurgerUrn2, 0L); - ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey2), 1, 0); + ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey2), 1, 0, false); assertEquals(1, ebeanMetadataAspectList.size()); assertEquals(ebeanMetadataAspectList.get(0).getKey().getUrn(), johnsBurgerUrn2.toString()); @@ -299,7 +301,7 @@ public void testEscapeSpecialCharInUrn() { _ebeanLocalAccessBurger.add(johnsBurgerUrn3, aspectFoo, AspectFoo.class, auditStamp); AspectKey aspectKey3 = new AspectKey(AspectFoo.class, johnsBurgerUrn3, 0L); - ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey3), 1, 0); + ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey3), 1, 0, false); assertEquals(1, ebeanMetadataAspectList.size()); assertEquals(ebeanMetadataAspectList.get(0).getKey().getUrn(), johnsBurgerUrn3.toString()); } @@ -324,7 +326,7 @@ public void testAddWithLocalRelationshipBuilder() throws URISyntaxException { BarSnapshot.class, EMPTY_FILTER, FooSnapshot.class, EMPTY_FILTER, BelongsTo.class, EMPTY_FILTER, 0, 10); AspectKey key = new AspectKey<>(AspectFooBar.class, fooUrn, 0L); - List aspects = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(key), 10, 0); + List aspects = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(key), 10, 0, false); assertEquals(3, relationships.size()); assertEquals(1, aspects.size()); @@ -435,4 +437,18 @@ public void testFindLatestMetadataAspect() throws URISyntaxException { assertNull(EbeanLocalAccess.findLatestMetadataAspect(_server, foo9999, AspectFoo.class)); } + @Test + public void testGetAspectNoSoftDeleteCheck() { + FooUrn fooUrn = makeFooUrn(0); + _ebeanLocalAccessFoo.add(fooUrn, null, AspectFoo.class, makeAuditStamp("foo", System.currentTimeMillis())); + AspectKey aspectKey = new AspectKey(AspectFoo.class, fooUrn, 0L); + List ebeanMetadataAspectList = + _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey), 1000, 0, false); + assertEquals(0, ebeanMetadataAspectList.size()); + + ebeanMetadataAspectList = + _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey), 1000, 0, true); + assertFalse(ebeanMetadataAspectList.isEmpty()); + assertEquals(fooUrn.toString(), ebeanMetadataAspectList.get(0).getKey().getUrn()); + } } \ No newline at end of file diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java index c7ce3e49b..c738e4340 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java @@ -8,7 +8,9 @@ import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.Urns; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.template.SetMode; import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.aspect.AuditedAspect; import com.linkedin.metadata.backfill.BackfillMode; import com.linkedin.metadata.dao.EbeanLocalDAO.FindMethodology; import com.linkedin.metadata.dao.EbeanLocalDAO.SchemaConfig; @@ -68,6 +70,7 @@ import io.ebean.OrderBy; import io.ebean.PagedList; import io.ebean.Query; +import io.ebean.SqlQuery; import io.ebean.SqlRow; import io.ebean.Transaction; import io.ebean.config.ServerConfig; @@ -127,14 +130,17 @@ public class EbeanLocalDAOTest { // run the tests 1 time for each of EbeanLocalDAO.FindMethodology values (3 total) private final FindMethodology _findMethodology; + private final boolean _enableChangeLog; + private static final String NEW_SCHEMA_CREATE_ALL_SQL = "ebean-local-dao-create-all.sql"; private static final String GMA_CREATE_ALL_SQL = "gma-create-all.sql"; private static final String GMA_DROP_ALL_SQL = "gma-drop-all.sql"; @Factory(dataProvider = "inputList") - public EbeanLocalDAOTest(SchemaConfig schemaConfig, FindMethodology findMethodology) { + public EbeanLocalDAOTest(SchemaConfig schemaConfig, FindMethodology findMethodology, boolean enableChangeLog) { _schemaConfig = schemaConfig; _findMethodology = findMethodology; + _enableChangeLog = enableChangeLog; } @Nonnull @@ -149,12 +155,18 @@ private String readSQLfromFile(@Nonnull String resourcePath) { @DataProvider public static Object[][] inputList() { return new Object[][]{ - {SchemaConfig.OLD_SCHEMA_ONLY, FindMethodology.UNIQUE_ID}, - {SchemaConfig.NEW_SCHEMA_ONLY, FindMethodology.UNIQUE_ID}, - {SchemaConfig.DUAL_SCHEMA, FindMethodology.UNIQUE_ID}, - {SchemaConfig.OLD_SCHEMA_ONLY, FindMethodology.DIRECT_SQL}, - {SchemaConfig.NEW_SCHEMA_ONLY, FindMethodology.DIRECT_SQL}, - {SchemaConfig.DUAL_SCHEMA, FindMethodology.DIRECT_SQL} + {SchemaConfig.OLD_SCHEMA_ONLY, FindMethodology.UNIQUE_ID, true}, + {SchemaConfig.OLD_SCHEMA_ONLY, FindMethodology.UNIQUE_ID, false}, + {SchemaConfig.NEW_SCHEMA_ONLY, FindMethodology.UNIQUE_ID, true}, + {SchemaConfig.NEW_SCHEMA_ONLY, FindMethodology.UNIQUE_ID, false}, + {SchemaConfig.DUAL_SCHEMA, FindMethodology.UNIQUE_ID, true}, + {SchemaConfig.DUAL_SCHEMA, FindMethodology.UNIQUE_ID, false}, + {SchemaConfig.OLD_SCHEMA_ONLY, FindMethodology.DIRECT_SQL, true}, + {SchemaConfig.OLD_SCHEMA_ONLY, FindMethodology.DIRECT_SQL, false}, + {SchemaConfig.NEW_SCHEMA_ONLY, FindMethodology.DIRECT_SQL, true}, + {SchemaConfig.NEW_SCHEMA_ONLY, FindMethodology.DIRECT_SQL, false}, + {SchemaConfig.DUAL_SCHEMA, FindMethodology.DIRECT_SQL, true}, + {SchemaConfig.DUAL_SCHEMA, FindMethodology.DIRECT_SQL, false}, }; } @@ -196,6 +208,7 @@ private EbeanLocalDAO createDao(@Nonnu dao.setUrnPathExtractor((UrnPathExtractor) new BarUrnPathExtractor()); } dao.setEmitAuditEvent(true); + dao.setChangeLogEnabled(_enableChangeLog); return dao; } @@ -290,7 +303,10 @@ public void testAddOne() { assertEquals(aspect.getKey().getVersion(), 0); assertEquals(aspect.getCreatedOn(), new Timestamp(_now)); assertEquals(aspect.getCreatedBy(), "urn:li:test:actor"); - assertEquals(aspect.getCreatedFor(), "urn:li:test:impersonator"); + if (_schemaConfig != SchemaConfig.NEW_SCHEMA_ONLY) { + // didn't even implement this in the new schema since the createdfor column is not being read by anyone. so skipping this check. + assertEquals(aspect.getCreatedFor(), "urn:li:test:impersonator"); + } AspectFoo actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata()); assertEquals(actual, expected); @@ -314,9 +330,11 @@ public void testAddTwo() { AspectFoo actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata()); assertEquals(actual, v0); - aspect = getMetadata(urn, aspectName, 1); - actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata()); - assertEquals(actual, v1); + if (dao.isChangeLogEnabled()) { + aspect = getMetadata(urn, aspectName, 1); + actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata()); + assertEquals(actual, v1); + } InOrder inOrder = inOrder(_mockProducer); inOrder.verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, null, v1); @@ -342,13 +360,16 @@ public void testDefaultEqualityTester() { AspectFoo actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata()); assertEquals(actual, bar); - // v1: foo - aspect = getMetadata(urn, aspectName, 1); - actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata()); - assertEquals(actual, foo); + if (dao.isChangeLogEnabled()) { + // v1: foo + aspect = getMetadata(urn, aspectName, 1); + actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata()); + assertEquals(actual, foo); + + // no v2 + assertNull(getMetadata(urn, aspectName, 2)); + } - // no v2 - assertNull(getMetadata(urn, aspectName, 2)); verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, null, foo); verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, foo, bar); @@ -371,9 +392,11 @@ public void testAlwaysFalseEqualityTester() { AspectFoo actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata()); assertEquals(actual, foo1); - aspect = getMetadata(urn, aspectName, 1); - actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata()); - assertEquals(actual, foo2); + if (dao.isChangeLogEnabled()) { + aspect = getMetadata(urn, aspectName, 1); + actual = RecordUtils.toRecordTemplate(AspectFoo.class, aspect.getMetadata()); + assertEquals(actual, foo2); + } verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, null, foo1); verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, foo1, foo2); @@ -394,8 +417,10 @@ public void testVersionBasedRetention() { dao.add(urn, v2, _dummyAuditStamp); dao.add(urn, v0, _dummyAuditStamp); - assertNull(getMetadata(urn, aspectName, 1)); - assertNotNull(getMetadata(urn, aspectName, 2)); + if (dao.isChangeLogEnabled()) { + assertNull(getMetadata(urn, aspectName, 1)); + assertNotNull(getMetadata(urn, aspectName, 2)); + } assertNotNull(getMetadata(urn, aspectName, 0)); } @@ -422,8 +447,10 @@ public void testTimeBasedRetention() { dao.add(urn, v2, makeAuditStamp("foo", baseTime + 3000L)); dao.add(urn, v0, makeAuditStamp("foo", baseTime + 5000L)); - assertNull(getMetadata(urn, aspectName, 1)); - assertNotNull(getMetadata(urn, aspectName, 2)); + if (dao.isChangeLogEnabled()) { + assertNull(getMetadata(urn, aspectName, 1)); + assertNotNull(getMetadata(urn, aspectName, 2)); + } assertNotNull(getMetadata(urn, aspectName, 0)); } @@ -461,10 +488,13 @@ public void testAddSuccessAfterRetry() { public void testAddFailedAfterRetry() { EbeanServer server = mock(EbeanServer.class); Transaction mockTransaction = mock(Transaction.class); + SqlQuery mockSqlQuery = mock(SqlQuery.class); when(server.beginTransaction()).thenReturn(mockTransaction); when(server.find(any(), ArgumentMatchers.any(PrimaryKey.class))).thenReturn(null); doThrow(RollbackException.class).when(server).insert(any(EbeanMetadataAspect.class)); doThrow(RollbackException.class).when(server).createSqlUpdate(any()); + when(server.createSqlQuery(any())).thenReturn(mockSqlQuery); + when(mockSqlQuery.findList()).thenReturn(Collections.emptyList()); Query mockQuery = mock(Query.class); when(mockQuery.findList()).thenReturn(Collections.emptyList()); @@ -571,9 +601,9 @@ public void testGetLatestVersion() { EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn = makeFooUrn(1); AspectFoo v0 = new AspectFoo().setValue("foo"); - addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, v0); + addMetadata(urn, AspectFoo.class, 0, v0); AspectFoo v1 = new AspectFoo().setValue("bar"); - addMetadata(urn, AspectFoo.class.getCanonicalName(), 1, v1); + addMetadata(urn, AspectFoo.class, 1, v1); Optional foo = dao.get(AspectFoo.class, urn); @@ -591,9 +621,9 @@ public void testGetSpecificVersion() { EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn = makeFooUrn(1); AspectFoo v0 = new AspectFoo().setValue("foo"); - addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, v0); + addMetadata(urn, AspectFoo.class, 0, v0); AspectFoo v1 = new AspectFoo().setValue("bar"); - addMetadata(urn, AspectFoo.class.getCanonicalName(), 1, v1); + addMetadata(urn, AspectFoo.class, 1, v1); Optional foo = dao.get(AspectFoo.class, urn, 1); @@ -703,7 +733,7 @@ public void testLocalSecondaryIndexBackfillDisabled() { FooUrn urn = makeFooUrn(1); AspectFoo expected = new AspectFoo().setValue("foo"); - addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, expected); + addMetadata(urn, AspectFoo.class, 0, expected); dao.backfill(AspectFoo.class, urn); // then when @@ -856,8 +886,8 @@ public void testBackfillUsingSCSI() { // update metadata_aspects table aspects.put(urn, ImmutableMap.of(AspectFoo.class, aspectFoo, AspectBar.class, aspectBar)); - addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, aspectFoo); - addMetadata(urn, AspectBar.class.getCanonicalName(), 0, aspectBar); + addMetadata(urn, AspectFoo.class, 0, aspectFoo); + addMetadata(urn, AspectBar.class, 0, aspectBar); // only index urn addIndex(urn, FooUrn.class.getCanonicalName(), "/fooId", urn.getFooIdEntity()); @@ -940,11 +970,19 @@ public void testListVersions() { List versions = new ArrayList<>(); for (long i = 0; i < 6; i++) { AspectFoo foo = new AspectFoo().setValue("foo" + i); - addMetadata(urn, AspectFoo.class.getCanonicalName(), i, foo); + addMetadata(urn, AspectFoo.class, i, foo); versions.add(i); } ListResult results = dao.listVersions(AspectFoo.class, urn, 0, 5); + if (!dao.isChangeLogEnabled()) { + // when: change log is disabled, + // expect: listVersion should only return 1 result which is the LATEST_VERSION + assertFalse(results.isHavingMore()); + assertEquals(results.getTotalCount(), 1); + assertEquals(results.getValues(), versions.subList(0, 1)); + return; + } assertTrue(results.isHavingMore()); assertEquals(results.getNextStart(), 5); @@ -1478,22 +1516,22 @@ public void testGetAspectsWithIndexFilter() { FooUrn urn1 = makeFooUrn(1); AspectFoo e1foo1 = new AspectFoo().setValue("val1"); - addMetadata(urn1, AspectFoo.class.getCanonicalName(), 0, e1foo1); + addMetadata(urn1, AspectFoo.class, 0, e1foo1); AspectFoo e1foo2 = new AspectFoo().setValue("val2"); - addMetadata(urn1, AspectFoo.class.getCanonicalName(), 1, e1foo2); + addMetadata(urn1, AspectFoo.class, 1, e1foo2); AspectBar e1bar1 = new AspectBar().setValue("val1"); - addMetadata(urn1, AspectBar.class.getCanonicalName(), 0, e1bar1); + addMetadata(urn1, AspectBar.class, 0, e1bar1); AspectBar e1bar2 = new AspectBar().setValue("val2"); - addMetadata(urn1, AspectBar.class.getCanonicalName(), 1, e1bar2); + addMetadata(urn1, AspectBar.class, 1, e1bar2); FooUrn urn2 = makeFooUrn(2); AspectFoo e2foo1 = new AspectFoo().setValue("val1"); - addMetadata(urn2, AspectFoo.class.getCanonicalName(), 0, e2foo1); + addMetadata(urn2, AspectFoo.class, 0, e2foo1); AspectFoo e2foo2 = new AspectFoo().setValue("val2"); - addMetadata(urn2, AspectFoo.class.getCanonicalName(), 1, e2foo2); + addMetadata(urn2, AspectFoo.class, 1, e2foo2); AspectBar e2bar1 = new AspectBar().setValue("val1"); - addMetadata(urn2, AspectBar.class.getCanonicalName(), 0, e2bar1); + addMetadata(urn2, AspectBar.class, 0, e2bar1); AspectBar e2bar2 = new AspectBar().setValue("val2"); - addMetadata(urn2, AspectBar.class.getCanonicalName(), 1, e2bar2); + addMetadata(urn2, AspectBar.class, 1, e2bar2); dao.updateLocalIndex(urn1, e1foo1, 0); dao.updateLocalIndex(urn1, e1bar1, 0); @@ -1535,7 +1573,7 @@ public void testList() { for (int j = 0; j < 10; j++) { AspectFoo foo = new AspectFoo().setValue("foo" + j); - addMetadata(urn, AspectFoo.class.getCanonicalName(), j, foo); + addMetadata(urn, AspectFoo.class, j, foo); if (i == 0) { foos.add(foo); } @@ -1546,29 +1584,48 @@ public void testList() { ListResult results = dao.list(AspectFoo.class, urn0, 0, 5); - assertTrue(results.isHavingMore()); - assertEquals(results.getNextStart(), 5); - assertEquals(results.getTotalCount(), 10); - assertEquals(results.getPageSize(), 5); - assertEquals(results.getTotalPageCount(), 2); - assertEquals(results.getValues(), foos.subList(0, 5)); - - assertNotNull(results.getMetadata()); - List expectedVersions = Arrays.asList(0L, 1L, 2L, 3L, 4L); - List expectedUrns = Collections.singletonList(urn0); - assertVersionMetadata(results.getMetadata(), expectedVersions, expectedUrns, _now, - Urns.createFromTypeSpecificString("test", "foo"), Urns.createFromTypeSpecificString("test", "bar")); - - // List next page - results = dao.list(AspectFoo.class, urn0, 5, 9); - - assertFalse(results.isHavingMore()); - assertEquals(results.getNextStart(), ListResult.INVALID_NEXT_START); - assertEquals(results.getTotalCount(), 10); - assertEquals(results.getPageSize(), 9); - assertEquals(results.getTotalPageCount(), 2); - assertEquals(results.getValues(), foos.subList(5, 10)); - assertNotNull(results.getMetadata()); + if (dao.isChangeLogEnabled()) { + assertTrue(results.isHavingMore()); + assertEquals(results.getNextStart(), 5); + assertEquals(results.getTotalCount(), 10); + assertEquals(results.getPageSize(), 5); + assertEquals(results.getTotalPageCount(), 2); + assertEquals(results.getValues(), foos.subList(0, 5)); + + assertNotNull(results.getMetadata()); + List expectedVersions = Arrays.asList(0L, 1L, 2L, 3L, 4L); + List expectedUrns = Collections.singletonList(urn0); + assertVersionMetadata(results.getMetadata(), expectedVersions, expectedUrns, _now, + Urns.createFromTypeSpecificString("test", "foo"), + Urns.createFromTypeSpecificString("test", "bar")); + + // List next page + results = dao.list(AspectFoo.class, urn0, 5, 9); + + assertFalse(results.isHavingMore()); + assertEquals(results.getNextStart(), ListResult.INVALID_NEXT_START); + assertEquals(results.getTotalCount(), 10); + assertEquals(results.getPageSize(), 9); + assertEquals(results.getTotalPageCount(), 2); + assertEquals(results.getValues(), foos.subList(5, 10)); + assertNotNull(results.getMetadata()); + } else { + // when: change log is not enabled + // expect: + assertFalse(results.isHavingMore()); + assertEquals(results.getNextStart(), ListResult.INVALID_NEXT_START); + assertEquals(results.getTotalCount(), 1); + assertEquals(results.getPageSize(), 5); + assertEquals(results.getTotalPageCount(), 1); + // expect: only the latest version is loaded + assertEquals(results.getValues(), foos.subList(0, 1)); + assertNotNull(results.getMetadata()); + List expectedVersions = Arrays.asList(0L); + List expectedUrns = Collections.singletonList(urn0); + assertVersionMetadata(results.getMetadata(), expectedVersions, expectedUrns, _now, + Urns.createFromTypeSpecificString("test", "foo"), + Urns.createFromTypeSpecificString("test", "bar")); + } } private static LocalDAOStorageConfig makeLocalDAOStorageConfig(Class aspectClass, @@ -1620,12 +1677,12 @@ void testStrongConsistentIndexPaths() { @Test public void testListAspectsForAllUrns() { EbeanLocalDAO dao = createDao(FooUrn.class); + for (int i = 0; i < 3; i++) { FooUrn urn = makeFooUrn(i); - for (int j = 0; j < 10; j++) { AspectFoo foo = new AspectFoo().setValue("foo" + i + j); - addMetadata(urn, AspectFoo.class.getCanonicalName(), j, foo); + addMetadata(urn, AspectFoo.class, j, foo); } } @@ -1655,19 +1712,27 @@ public void testListAspectsForAllUrns() { assertEquals(results.getTotalPageCount(), 2); assertNotNull(results.getMetadata()); - // Test list for a non-zero version - results = dao.list(AspectFoo.class, 1, 0, 5); + if (dao.isChangeLogEnabled()) { + // Test list for a non-zero version + results = dao.list(AspectFoo.class, 1, 0, 5); - assertFalse(results.isHavingMore()); - assertEquals(results.getNextStart(), ListResult.INVALID_NEXT_START); - assertEquals(results.getTotalCount(), 3); - assertEquals(results.getPageSize(), 5); - assertEquals(results.getTotalPageCount(), 1); + assertFalse(results.isHavingMore()); + assertEquals(results.getNextStart(), ListResult.INVALID_NEXT_START); + assertEquals(results.getTotalCount(), 3); + assertEquals(results.getPageSize(), 5); + assertEquals(results.getTotalPageCount(), 1); - assertNotNull(results.getMetadata()); - assertVersionMetadata(results.getMetadata(), Collections.singletonList(1L), - Arrays.asList(makeFooUrn(0), makeFooUrn(1), makeFooUrn(2)), _now, - Urns.createFromTypeSpecificString("test", "foo"), Urns.createFromTypeSpecificString("test", "bar")); + assertNotNull(results.getMetadata()); + assertVersionMetadata(results.getMetadata(), Collections.singletonList(1L), + Arrays.asList(makeFooUrn(0), makeFooUrn(1), makeFooUrn(2)), _now, + Urns.createFromTypeSpecificString("test", "foo"), Urns.createFromTypeSpecificString("test", "bar")); + } else { + try { + dao.list(AspectFoo.class, 1, 0, 5); + fail("list aspect by non-0 version is not supported when change log is disabled"); + } catch (UnsupportedOperationException uoe) { + } + } } @Test @@ -2339,10 +2404,10 @@ public void testGetWithExtraInfoLatestVersion() { Urn impersonator1 = Urns.createFromTypeSpecificString("test", "testImpersonator1"); Urn creator2 = Urns.createFromTypeSpecificString("test", "testCreator2"); Urn impersonator2 = Urns.createFromTypeSpecificString("test", "testImpersonator2"); - addMetadataWithAuditStamp(urn, AspectFoo.class.getCanonicalName(), 0, v0, _now, creator1.toString(), + addMetadataWithAuditStamp(urn, AspectFoo.class, 0, v0, _now, creator1.toString(), impersonator1.toString()); AspectFoo v1 = new AspectFoo().setValue("bar"); - addMetadataWithAuditStamp(urn, AspectFoo.class.getCanonicalName(), 1, v1, _now, creator2.toString(), + addMetadataWithAuditStamp(urn, AspectFoo.class, 1, v1, _now, creator2.toString(), impersonator2.toString()); Optional> foo = dao.getWithExtraInfo(AspectFoo.class, urn); @@ -2366,10 +2431,10 @@ public void testGetWithExtraInfoSpecificVersion() { Urn impersonator1 = Urns.createFromTypeSpecificString("test", "testImpersonator1"); Urn creator2 = Urns.createFromTypeSpecificString("test", "testCreator2"); Urn impersonator2 = Urns.createFromTypeSpecificString("test", "testImpersonator2"); - addMetadataWithAuditStamp(urn, AspectFoo.class.getCanonicalName(), 0, v0, _now, creator1.toString(), + addMetadataWithAuditStamp(urn, AspectFoo.class, 0, v0, _now, creator1.toString(), impersonator1.toString()); AspectFoo v1 = new AspectFoo().setValue("bar"); - addMetadataWithAuditStamp(urn, AspectFoo.class.getCanonicalName(), 1, v1, _now, creator2.toString(), + addMetadataWithAuditStamp(urn, AspectFoo.class, 1, v1, _now, creator2.toString(), impersonator2.toString()); Optional> foo = dao.getWithExtraInfo(AspectFoo.class, urn, 1); @@ -2388,9 +2453,9 @@ public void testGetLatestVersionForSoftDeletedAspect() { Urn impersonator1 = Urns.createFromTypeSpecificString("test", "testImpersonator1"); Urn creator2 = Urns.createFromTypeSpecificString("test", "testCreator2"); Urn impersonator2 = Urns.createFromTypeSpecificString("test", "testImpersonator2"); - addMetadataWithAuditStamp(urn, AspectFoo.class.getCanonicalName(), 0, null, _now, creator1.toString(), + addMetadataWithAuditStamp(urn, AspectFoo.class, 0, null, _now, creator1.toString(), impersonator1.toString()); - addMetadataWithAuditStamp(urn, AspectFoo.class.getCanonicalName(), 1, v1, _now, creator2.toString(), + addMetadataWithAuditStamp(urn, AspectFoo.class, 1, v1, _now, creator2.toString(), impersonator2.toString()); Optional foo = dao.get(AspectFoo.class, urn); @@ -2407,9 +2472,9 @@ public void testGetNonLatestVersionForSoftDeletedAspect() { Urn impersonator1 = Urns.createFromTypeSpecificString("test", "testImpersonator1"); Urn creator2 = Urns.createFromTypeSpecificString("test", "testCreator2"); Urn impersonator2 = Urns.createFromTypeSpecificString("test", "testImpersonator2"); - addMetadataWithAuditStamp(urn, AspectFoo.class.getCanonicalName(), 0, v0, _now, creator1.toString(), + addMetadataWithAuditStamp(urn, AspectFoo.class, 0, v0, _now, creator1.toString(), impersonator1.toString()); - addMetadataWithAuditStamp(urn, AspectFoo.class.getCanonicalName(), 1, null, _now, creator2.toString(), + addMetadataWithAuditStamp(urn, AspectFoo.class, 1, null, _now, creator2.toString(), impersonator2.toString()); Optional> foo = dao.getWithExtraInfo(AspectFoo.class, urn, 1); @@ -2426,7 +2491,7 @@ public void testListSoftDeletedAspectGivenUrn() { for (int j = 0; j < 10; j++) { AspectFoo foo = new AspectFoo().setValue("foo" + j); - addMetadata(urn, AspectFoo.class.getCanonicalName(), j, foo); + addMetadata(urn, AspectFoo.class, j, foo); if (i == 0) { foos.add(foo); } @@ -2439,6 +2504,12 @@ public void testListSoftDeletedAspectGivenUrn() { ListResult results = dao.list(AspectFoo.class, urn0, 0, 5); + if (!dao.isChangeLogEnabled()) { + // if change log is not enabled, and the entity has been soft deleted, expect no more change history + assertFalse(results.isHavingMore()); + return; + } + assertTrue(results.isHavingMore()); assertEquals(results.getNextStart(), 5); assertEquals(results.getTotalCount(), 10); @@ -2484,14 +2555,24 @@ public void testListSpecificVersionSoftDeletedAspect() { } // version=10 corresponds to soft deleted aspect - ListResult results = dao.list(AspectFoo.class, 10, 0, 2); - - assertFalse(results.isHavingMore()); - assertEquals(results.getNextStart(), -1); - assertEquals(results.getTotalCount(), 0); - assertEquals(results.getPageSize(), 2); - assertEquals(results.getTotalPageCount(), 0); - assertEquals(results.getValues().size(), 0); + if (!dao.isChangeLogEnabled()) { + // version based query is not applicable if + try { + dao.list(AspectFoo.class, 10, 0, 2); + fail("UnsupportedOperationException should be thrown"); + } catch (UnsupportedOperationException uoe) { + // expected, do nothing + } + return; + } else { + ListResult results = dao.list(AspectFoo.class, 10, 0, 2); + assertFalse(results.isHavingMore()); + assertEquals(results.getNextStart(), -1); + assertEquals(results.getTotalCount(), 0); + assertEquals(results.getPageSize(), 2); + assertEquals(results.getTotalPageCount(), 0); + assertEquals(results.getValues().size(), 0); + } } @Test @@ -2512,15 +2593,18 @@ public void testGetSoftDeletedAspect() { Optional fooOptional = dao.get(AspectFoo.class, urn); assertFalse(fooOptional.isPresent()); - // version=1 should be non-null - fooOptional = dao.get(AspectFoo.class, urn, 1); - assertTrue(fooOptional.isPresent()); - assertEquals(fooOptional.get(), v1); + if (dao.isChangeLogEnabled()) { + // version=1 should be non-null + fooOptional = dao.get(AspectFoo.class, urn, 1); + assertTrue(fooOptional.isPresent()); + assertEquals(fooOptional.get(), v1); + + // version=2 should be non-null + fooOptional = dao.get(AspectFoo.class, urn, 2); + assertTrue(fooOptional.isPresent()); + assertEquals(fooOptional.get(), v0); + } - // version=2 should be non-null - fooOptional = dao.get(AspectFoo.class, urn, 2); - assertTrue(fooOptional.isPresent()); - assertEquals(fooOptional.get(), v0); InOrder inOrder = inOrder(_mockProducer); inOrder.verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, null, v1); @@ -2554,13 +2638,21 @@ public void testListVersionsForSoftDeletedAspect() { FooUrn urn = makeFooUrn(1); for (long i = 0; i < 6; i++) { AspectFoo foo = new AspectFoo().setValue("foo" + i); - addMetadata(urn, AspectFoo.class.getCanonicalName(), i, foo); + addMetadata(urn, AspectFoo.class, i, foo); } // soft delete the latest version dao.delete(urn, AspectFoo.class, _dummyAuditStamp); ListResult results = dao.listVersions(AspectFoo.class, urn, 0, 5); + if (!dao.isChangeLogEnabled()) { + // When: change log is disabled, + // Expect list version will return empty if the entity has been soft-deleted. + assertFalse(results.isHavingMore()); + assertTrue(results.getValues().isEmpty()); + return; + } + assertTrue(results.isHavingMore()); assertEquals(results.getNextStart(), 5); assertEquals(results.getTotalCount(), 6); @@ -2605,6 +2697,7 @@ public void testListUrnsForSoftDeletedAspect() { @Test public void testListUrnsAfterUndeleteSoftDeletedAspect() { EbeanLocalDAO dao = createDao(FooUrn.class); + List urns = new ArrayList<>(); for (int i = 0; i < 3; i++) { FooUrn urn = makeFooUrn(i); @@ -2633,7 +2726,6 @@ public void testListUrnsAfterUndeleteSoftDeletedAspect() { public void testGetWithKeysSoftDeletedAspect() { // given EbeanLocalDAO dao = createDao(FooUrn.class); - FooUrn fooUrn = makeFooUrn(1); // both aspect keys exist @@ -2641,9 +2733,9 @@ public void testGetWithKeysSoftDeletedAspect() { AspectKey aspectKey2 = new AspectKey<>(AspectBar.class, fooUrn, 0L); // add metadata - addMetadata(fooUrn, AspectFoo.class.getCanonicalName(), 1, null); + addMetadata(fooUrn, AspectFoo.class, 1, null); AspectBar barV0 = new AspectBar().setValue("bar"); - addMetadata(fooUrn, AspectBar.class.getCanonicalName(), 0, barV0); + addMetadata(fooUrn, AspectBar.class, 0, barV0); // when Map, Optional> records = @@ -2671,27 +2763,33 @@ public void testUndeleteSoftDeletedAspect() { // next undelete the soft deleted aspect AspectFoo foo = new AspectFoo().setValue("baz"); dao.add(urn, foo, _dummyAuditStamp); - - // latest version of metadata should be non-null and correspond to the metadata added after soft deleting the aspect Optional fooOptional = dao.get(AspectFoo.class, urn); - assertTrue(fooOptional.isPresent()); - assertEquals(fooOptional.get(), foo); - - // version=3 should correspond to soft deleted metadata - EbeanMetadataAspect aspect = getMetadata(urn, aspectName, 3); - assertTrue(isSoftDeletedAspect(aspect, AspectFoo.class)); - fooOptional = dao.get(AspectFoo.class, urn, 3); - assertFalse(fooOptional.isPresent()); + // latest version of metadata should be non-null and correspond to the metadata added after soft deleting the aspect - // version=2 should be non-null - fooOptional = dao.get(AspectFoo.class, urn, 2); - assertTrue(fooOptional.isPresent()); - assertEquals(fooOptional.get(), v0); + if (!dao.isChangeLogEnabled() && _schemaConfig != SchemaConfig.NEW_SCHEMA_ONLY) { + // skip if change log is disabled and schemaConfig is not NEW_SCHEMA_ONLY + } else { + assertTrue(fooOptional.isPresent()); + assertEquals(fooOptional.get(), foo); + } - // version=1 should be non-null again - fooOptional = dao.get(AspectFoo.class, urn, 1); - assertTrue(fooOptional.isPresent()); - assertEquals(fooOptional.get(), v1); + if (dao.isChangeLogEnabled()) { + // version=3 should correspond to soft deleted metadata + EbeanMetadataAspect aspect = getMetadata(urn, aspectName, 3); + assertTrue(isSoftDeletedAspect(aspect, AspectFoo.class)); + fooOptional = dao.get(AspectFoo.class, urn, 3); + assertFalse(fooOptional.isPresent()); + + // version=2 should be non-null + fooOptional = dao.get(AspectFoo.class, urn, 2); + assertTrue(fooOptional.isPresent()); + assertEquals(fooOptional.get(), v0); + + // version=1 should be non-null again + fooOptional = dao.get(AspectFoo.class, urn, 1); + assertTrue(fooOptional.isPresent()); + assertEquals(fooOptional.get(), v1); + } InOrder inOrder = inOrder(_mockProducer); inOrder.verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, null, v1); @@ -2712,13 +2810,13 @@ public void testGetWithExtraInfoMultipleKeys() { Urn creator3 = Urns.createFromTypeSpecificString("test", "testCreator3"); Urn impersonator3 = Urns.createFromTypeSpecificString("test", "testImpersonator3"); AspectFoo fooV0 = new AspectFoo().setValue("foo"); - addMetadataWithAuditStamp(urn, AspectFoo.class.getCanonicalName(), 0, fooV0, _now, creator1.toString(), + addMetadataWithAuditStamp(urn, AspectFoo.class, 0, fooV0, _now, creator1.toString(), impersonator1.toString()); AspectFoo fooV1 = new AspectFoo().setValue("bar"); - addMetadataWithAuditStamp(urn, AspectFoo.class.getCanonicalName(), 1, fooV1, _now, creator2.toString(), + addMetadataWithAuditStamp(urn, AspectFoo.class, 1, fooV1, _now, creator2.toString(), impersonator2.toString()); AspectBar barV0 = new AspectBar().setValue("bar"); - addMetadataWithAuditStamp(urn, AspectBar.class.getCanonicalName(), 0, barV0, _now, creator3.toString(), + addMetadataWithAuditStamp(urn, AspectBar.class, 0, barV0, _now, creator3.toString(), impersonator3.toString()); // both aspect keys exist @@ -2757,9 +2855,9 @@ public void testGetWithKeysCount() { // add metadata AspectFoo fooV1 = new AspectFoo().setValue("foo"); - addMetadata(fooUrn, AspectFoo.class.getCanonicalName(), 1, fooV1); + addMetadata(fooUrn, AspectFoo.class, 1, fooV1); AspectBar barV0 = new AspectBar().setValue("bar"); - addMetadata(fooUrn, AspectBar.class.getCanonicalName(), 0, barV0); + addMetadata(fooUrn, AspectBar.class, 0, barV0); // batch get without query keys count set // when @@ -2921,9 +3019,9 @@ public void testGetWithQuerySize(int querySize) { // add metadata AspectFoo fooV1 = new AspectFoo().setValue("foo"); - addMetadata(fooUrn, AspectFoo.class.getCanonicalName(), 1, fooV1); + addMetadata(fooUrn, AspectFoo.class, 1, fooV1); AspectBar barV0 = new AspectBar().setValue("bar"); - addMetadata(fooUrn, AspectBar.class.getCanonicalName(), 0, barV0); + addMetadata(fooUrn, AspectBar.class, 0, barV0); FooUrn fooUrn2 = makeFooUrn(2); AspectKey aspectKey3 = new AspectKey<>(AspectFoo.class, fooUrn2, 0L); @@ -2932,11 +3030,11 @@ public void testGetWithQuerySize(int querySize) { // add metadata AspectFoo fooV3 = new AspectFoo().setValue("foo3"); - addMetadata(fooUrn2, AspectFoo.class.getCanonicalName(), 0, fooV3); + addMetadata(fooUrn2, AspectFoo.class, 0, fooV3); AspectFoo fooV4 = new AspectFoo().setValue("foo4"); - addMetadata(fooUrn2, AspectFoo.class.getCanonicalName(), 1, fooV4); + addMetadata(fooUrn2, AspectFoo.class, 1, fooV4); AspectBar barV5 = new AspectBar().setValue("bar5"); - addMetadata(fooUrn2, AspectBar.class.getCanonicalName(), 0, barV5); + addMetadata(fooUrn2, AspectBar.class, 0, barV5); dao.setQueryKeysCount(querySize); @@ -3037,7 +3135,10 @@ public void testUpdateEntityTables() throws URISyntaxException { // fill in old schema FooUrn urn1 = new FooUrn(1); AspectFoo foo = new AspectFoo().setValue("foo"); - addMetadata(urn1, AspectFoo.class.getCanonicalName(), 0, foo); // this function only adds to old schema + // this function only adds to old schema + String aspectName = AspectFoo.class.getCanonicalName(); + EbeanMetadataAspect ema = getMetadata(urn1, aspectName, 0, foo); + _server.save(ema); // check that there is nothing in the entity table right now if (_schemaConfig != SchemaConfig.OLD_SCHEMA_ONLY) { @@ -3208,6 +3309,29 @@ public void testNewSchemaExactMatchEmptyArray() { } } + @Test + public void testDataNotWrittenIntoOldSchemaWhenChangeLogIsDisabled() { + EbeanLocalDAO dao = createDao(FooUrn.class); + if (dao.isChangeLogEnabled()) { + // this test is only applicable when changeLog is disabled + return; + } + + // Given: an empty old schema and empty new schema and changelog is disabled + EbeanLocalDAO legacyDao = createDao(FooUrn.class); + legacyDao.setSchemaConfig(SchemaConfig.OLD_SCHEMA_ONLY); + legacyDao.setChangeLogEnabled(false); + + // When: AspectFoo is written into the dao. + FooUrn fooUrn = makeFooUrn(1); + AspectFoo v1 = new AspectFoo().setValue("foo"); + dao.add(fooUrn, v1, _dummyAuditStamp); + + // Expect: the aspect foo is only written into the new schema. + assertTrue(dao.get(AspectFoo.class, fooUrn).isPresent()); + assertFalse(legacyDao.get(AspectFoo.class, fooUrn).isPresent()); + } + @Nonnull private EbeanMetadataAspect getMetadata(Urn urn, String aspectName, long version, @Nullable RecordTemplate metadata) { EbeanMetadataAspect aspect = new EbeanMetadataAspect(); @@ -3223,18 +3347,52 @@ private EbeanMetadataAspect getMetadata(Urn urn, String aspectName, long version return aspect; } - private void addMetadata(Urn urn, String aspectName, long version, @Nullable RecordTemplate metadata) { - EbeanMetadataAspect aspect = getMetadata(urn, aspectName, version, metadata); - _server.save(aspect); + private void addMetadata(Urn urn, Class aspectClass, long version, @Nullable RecordTemplate metadata) { + String aspectName = aspectClass.getCanonicalName(); + EbeanMetadataAspect ema = getMetadata(urn, aspectName, version, metadata); + _server.save(ema); + + if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY || _schemaConfig == SchemaConfig.DUAL_SCHEMA) { + addMetadataEntityTable(urn, aspectClass, metadata, version, _now, ema.getCreatedBy(), + ema.getCreatedFor()); + } } - private void addMetadataWithAuditStamp(Urn urn, String aspectName, long version, RecordTemplate metadata, + private void addMetadataEntityTable(Urn urn, Class aspectClass, @Nullable RecordTemplate metadata, long version, + long createdOn, String createdBy, String createdFor) { + if (version != 0) { + return; + } + String aspectName = aspectClass.getCanonicalName(); + String columnName = SQLSchemaUtils.getAspectColumnName(aspectName); + String template = "insert into metadata_entity_%s (urn, %s, lastmodifiedon, lastmodifiedby, createdfor) value" + + "('%s', '%s', '%s', '%s', '%s') ON DUPLICATE KEY UPDATE %s = '%s';"; + String query = String.format(template, urn.getEntityType(), columnName, urn, createAuditedAspect(metadata, aspectClass, createdOn, createdBy, createdFor), + new Timestamp(createdOn), createdBy, createdFor, columnName, createAuditedAspect(metadata, aspectClass, createdOn, createdBy, createdFor)); + _server.createSqlUpdate(query).execute(); + } + + private String createAuditedAspect(RecordTemplate metadata, Class aspectClass, + long createdOn, String createdBy, String createdFor) { + return metadata == null ? DELETED_VALUE : EbeanLocalAccess.toJsonString(new AuditedAspect() + .setAspect(RecordUtils.toJsonString(metadata)) + .setCanonicalName(aspectClass.getCanonicalName()) + .setLastmodifiedby(createdBy) + .setLastmodifiedon(new Timestamp(createdOn).toString()) + .setCreatedfor(createdFor, SetMode.IGNORE_NULL)); + } + + private void addMetadataWithAuditStamp(Urn urn, Class aspectClass, long version, RecordTemplate metadata, long timeStamp, String creator, String impersonator) { - EbeanMetadataAspect aspect = getMetadata(urn, aspectName, version, metadata); + EbeanMetadataAspect aspect = getMetadata(urn, aspectClass.getCanonicalName(), version, metadata); aspect.setCreatedOn(new Timestamp(timeStamp)); aspect.setCreatedBy(creator); aspect.setCreatedFor(impersonator); _server.save(aspect); + + if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY || _schemaConfig == SchemaConfig.DUAL_SCHEMA) { + addMetadataEntityTable(urn, aspectClass, metadata, version, timeStamp, creator, impersonator); + } } private EbeanMetadataIndex getRecordFromLocalIndex(long id) { @@ -3327,6 +3485,26 @@ private void addIndex(Urn urn, String aspectName, String pathName, Object val) { } private EbeanMetadataAspect getMetadata(Urn urn, String aspectName, long version) { + if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY && version == 0) { + String aspectColumn = getAspectColumnName(aspectName); + String template = "select urn, lastmodifiedon, lastmodifiedby, createdfor, %s from metadata_entity_%s"; + String query = String.format(template, aspectColumn, urn.getEntityType()); + SqlRow result = _server.createSqlQuery(query).findOne(); + if (result != null) { + EbeanMetadataAspect ema = new EbeanMetadataAspect(); + String metadata = extractAspectJsonString(result.getString(aspectColumn)); + if (metadata == null) { + metadata = DELETED_VALUE; + } + ema.setMetadata(metadata); + ema.setKey(new PrimaryKey(urn.toString(), aspectName, version)); + ema.setCreatedOn(result.getTimestamp("lastmodifiedon")); + ema.setCreatedBy(result.getString("lastmodifiedby")); + ema.setCreatedFor(result.getString("creatdfor")); + return ema; + } + return null; + } return _server.find(EbeanMetadataAspect.class, new EbeanMetadataAspect.PrimaryKey(urn.toString(), aspectName, version)); } diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/EBeanDAOUtilsTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/EBeanDAOUtilsTest.java index 5826ebc43..bf51fb91a 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/EBeanDAOUtilsTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/EBeanDAOUtilsTest.java @@ -11,6 +11,7 @@ import com.linkedin.testing.urn.FooUrn; import io.ebean.Ebean; import io.ebean.EbeanServer; +import io.ebean.SqlRow; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -25,7 +26,9 @@ import org.testng.annotations.Test; import static com.linkedin.testing.TestUtils.*; -import static org.testng.Assert.*; +import static org.mockito.Mockito.*; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertNotNull; @@ -513,4 +516,18 @@ public void testGetFromJdbc() { // sanity test on JDBC functions assertNotNull(EBeanDAOUtils.getWithJdbc(fooUrn.toString(), fooAspect.getClass().getCanonicalName(), server, key)); } + + @Test + public void testIsSoftDeletedAspect() { + SqlRow sqlRow = mock(SqlRow.class); + when(sqlRow.getString("a_aspectfoo")).thenReturn("{\"gma_deleted\": true}"); + assertTrue(EBeanDAOUtils.isSoftDeletedAspect(sqlRow, "a_aspectfoo")); + + when(sqlRow.getString("a_aspectbar")).thenReturn( + "{\"aspect\": {\"value\": \"bar\"}, \"lastmodifiedby\": \"urn:li:tester\"}"); + assertFalse(EBeanDAOUtils.isSoftDeletedAspect(sqlRow, "a_aspectbar")); + + when(sqlRow.getString("a_aspectbaz")).thenReturn("{\"random_value\": \"baz\"}"); + assertFalse(EBeanDAOUtils.isSoftDeletedAspect(sqlRow, "a_aspectbaz")); + } } \ No newline at end of file diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/SQLStatementUtilsTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/SQLStatementUtilsTest.java index ae6b1d7c7..606217147 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/SQLStatementUtilsTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/SQLStatementUtilsTest.java @@ -17,6 +17,7 @@ import com.linkedin.metadata.query.UrnField; import com.linkedin.testing.AspectFoo; import com.linkedin.testing.urn.FooUrn; +import java.net.URISyntaxException; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -56,7 +57,7 @@ public void testCreateAspectReadSql() { "SELECT urn, a_aspectfoo, lastmodifiedon, lastmodifiedby FROM metadata_entity_foo WHERE urn = 'urn:li:foo:1' " + "AND JSON_EXTRACT(a_aspectfoo, '$.gma_deleted') IS NULL UNION ALL SELECT urn, a_aspectfoo, lastmodifiedon, lastmodifiedby " + "FROM metadata_entity_foo WHERE urn = 'urn:li:foo:2' AND JSON_EXTRACT(a_aspectfoo, '$.gma_deleted') IS NULL"; - assertEquals(SQLStatementUtils.createAspectReadSql(AspectFoo.class, set), expectedSql); + assertEquals(SQLStatementUtils.createAspectReadSql(AspectFoo.class, set, false), expectedSql); } @Test @@ -260,4 +261,32 @@ public void testWhereClauseMultiFilters() { new Pair<>(filter2, "bar")), "(foo.i_aspectfoo$value='value2' AND (foo.urn='value1' OR foo.urn='value3')" + " AND foo.metadata$value='value4') AND (bar.urn='value1' OR bar.urn='value2')"); } + + @Test + public void testCreateListAspectByUrnSql() throws URISyntaxException { + FooUrn fooUrn = new FooUrn(1); + assertEquals(SQLStatementUtils.createListAspectByUrnSql(AspectFoo.class, fooUrn, true), + "SELECT urn, a_aspectfoo, lastmodifiedon, lastmodifiedby, createdfor FROM " + + "metadata_entity_foo WHERE urn = 'urn:li:foo:1' AND a_aspectfoo IS NOT NULL"); + assertEquals(SQLStatementUtils.createListAspectByUrnSql(AspectFoo.class, fooUrn, false), + "SELECT urn, a_aspectfoo, lastmodifiedon, lastmodifiedby, createdfor FROM " + + "metadata_entity_foo WHERE urn = 'urn:li:foo:1' AND a_aspectfoo IS NOT NULL AND JSON_EXTRACT(a_aspectfoo, '$.gma_deleted') IS NULL"); + } + + @Test + public void testCreateListAspectSql() throws URISyntaxException { + FooUrn fooUrn = new FooUrn(1); + String tableName = SQLSchemaUtils.getTableName(fooUrn.getEntityType()); + assertEquals( + SQLStatementUtils.createListAspectWithPaginationSql(AspectFoo.class, tableName, true, 0, 5), + "SELECT urn, a_aspectfoo, lastmodifiedon, lastmodifiedby, createdfor, (SELECT COUNT(urn) FROM " + + "metadata_entity_foo WHERE a_aspectfoo IS NOT NULL) as _total_count FROM metadata_entity_foo " + + "WHERE a_aspectfoo IS NOT NULL LIMIT 5 OFFSET 0"); + assertEquals( + SQLStatementUtils.createListAspectWithPaginationSql(AspectFoo.class, tableName, false, 0, 5), + "SELECT urn, a_aspectfoo, lastmodifiedon, lastmodifiedby, createdfor, (SELECT COUNT(urn) FROM " + + "metadata_entity_foo WHERE a_aspectfoo IS NOT NULL AND JSON_EXTRACT(a_aspectfoo, '$.gma_deleted') IS NULL) " + + "as _total_count FROM metadata_entity_foo WHERE a_aspectfoo IS NOT NULL AND " + + "JSON_EXTRACT(a_aspectfoo, '$.gma_deleted') IS NULL LIMIT 5 OFFSET 0"); + } } \ No newline at end of file