diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index 4631fe24b..7fe5ddcdb 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -30,6 +30,8 @@ import io.ebean.ExpressionList; import io.ebean.PagedList; import io.ebean.Query; +import io.ebean.RawSql; +import io.ebean.RawSqlBuilder; import io.ebean.Transaction; import io.ebean.config.ServerConfig; import io.ebean.datasource.DataSourceConfig; @@ -51,6 +53,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.persistence.RollbackException; +import javax.persistence.Table; import lombok.Value; import static com.linkedin.metadata.dao.EbeanMetadataAspect.*; @@ -71,6 +74,9 @@ public class EbeanLocalDAO private UrnPathExtractor _urnPathExtractor; private int _queryKeysCount = 0; // 0 means no pagination on keys + // TODO feature flag, remove when vetted. + private boolean _useUnionForBatch = false; + @Value static class GMAIndexPair { public String valueType; @@ -154,6 +160,18 @@ public EbeanLocalDAO(@Nonnull BaseMetadataEventProducer producer, @Nonnull Serve this(producer, createServer(serverConfig), storageConfig, urnClass, new EmptyPathExtractor<>()); } + /** + * Determines whether we should use UNION ALL statements for batch gets, rather than a large series of OR statements. + * + *

DO NOT USE THIS FLAG! This is for LinkedIn use to help us test this feature without a rollback. Once we've + * vetted this in production we will be removing this flag and making the the default behavior. So if you set this + * to true by calling this method, your code will break when we remove this method. Just wait a bit for us to turn + * it on by default! + */ + public void setUseUnionForBatch(boolean useUnionForBatch) { + _useUnionForBatch = useUnionForBatch; + } + @Nonnull private static EbeanServer createServer(@Nonnull ServerConfig serverConfig) { // Make sure that the serverConfig includes the package that contains DAO's Ebean model. @@ -246,7 +264,8 @@ protected T runInTransactionWithRetry(@Nonnull Supplier block, int maxTra @Override protected long saveLatest(@Nonnull URN urn, @Nonnull Class aspectClass, - @Nullable ASPECT oldValue, @Nullable AuditStamp oldAuditStamp, @Nonnull ASPECT newValue, @Nonnull AuditStamp newAuditStamp) { + @Nullable ASPECT oldValue, @Nullable AuditStamp oldAuditStamp, @Nonnull ASPECT newValue, + @Nonnull AuditStamp newAuditStamp) { // Save oldValue as the largest version + 1 long largestVersion = 0; if (oldValue != null && oldAuditStamp != null) { @@ -260,8 +279,8 @@ protected long saveLatest(@Nonnull URN urn, @Non } @Override - protected void updateLocalIndex(@Nonnull URN urn, - @Nonnull ASPECT newValue, long version) { + protected void updateLocalIndex(@Nonnull URN urn, @Nonnull ASPECT newValue, + long version) { if (!isLocalSecondaryIndexEnabled()) { throw new UnsupportedOperationException("Local secondary index isn't supported"); } @@ -311,13 +330,10 @@ protected void save(@Nonnull URN urn, @Nonnull RecordTemplate value, @Nonnull Au } } - protected long saveSingleRecordToLocalIndex(@Nonnull URN urn, @Nonnull String aspect, - @Nonnull String path, @Nonnull Object value) { + protected long saveSingleRecordToLocalIndex(@Nonnull URN urn, @Nonnull String aspect, @Nonnull String path, + @Nonnull Object value) { - final EbeanMetadataIndex record = new EbeanMetadataIndex() - .setUrn(urn.toString()) - .setAspect(aspect) - .setPath(path); + final EbeanMetadataIndex record = new EbeanMetadataIndex().setUrn(urn.toString()).setAspect(aspect).setPath(path); if (value instanceof Integer || value instanceof Long) { record.setLongVal(Long.valueOf(value.toString())); } else if (value instanceof Float || value instanceof Double) { @@ -342,15 +358,13 @@ private void updateUrnInLocalIndex(@Nonnull URN urn) { final Map pathValueMap = _urnPathExtractor.extractPaths(urn); pathValueMap.forEach( - (path, value) -> saveSingleRecordToLocalIndex(urn, urn.getClass().getCanonicalName(), path, value) - ); + (path, value) -> saveSingleRecordToLocalIndex(urn, urn.getClass().getCanonicalName(), path, value)); } private void updateAspectInLocalIndex(@Nonnull URN urn, @Nonnull ASPECT newValue) { if (!_storageConfig.getAspectStorageConfigMap().containsKey(newValue.getClass()) - || _storageConfig.getAspectStorageConfigMap().get(newValue.getClass()) == null - ) { + || _storageConfig.getAspectStorageConfigMap().get(newValue.getClass()) == null) { return; } // step1: remove all rows from the index table corresponding to pair @@ -422,7 +436,7 @@ protected void applyTimeBasedRetention(@Nonnull final List records; if (_queryKeysCount == 0) { - records = batchGet(keys); + records = batchGet(keys, keys.size()); } else { records = batchGet(keys, _queryKeysCount); } @@ -443,7 +457,7 @@ protected void applyTimeBasedRetention(@Nonnull return Collections.emptyMap(); } - final List records = batchGet(keys); + final List records = batchGet(keys, keys.size()); final Map, AspectWithExtraInfo> result = new HashMap<>(); @@ -454,31 +468,8 @@ protected void applyTimeBasedRetention(@Nonnull return result; } - public boolean existsInLocalIndex(@Nonnull URN urn) { - return _server.find(EbeanMetadataIndex.class) - .where().eq(URN_COLUMN, urn.toString()) - .exists(); - } - - // Will be migrated to use {@link #batchGet(Set>, int)} - @Nonnull - private List batchGet(@Nonnull Set> keys) { - - ExpressionList query = _server.find(EbeanMetadataAspect.class).select(ALL_COLUMNS).where(); - if (keys.size() > 1) { - query = query.or(); - } - - for (AspectKey key : keys) { - query = query.and() - .eq(URN_COLUMN, key.getUrn().toString()) - .eq(ASPECT_COLUMN, ModelUtils.getAspectName(key.getAspectClass())) - .eq(VERSION_COLUMN, key.getVersion()) - .endAnd(); - } - - return query.findList(); + return _server.find(EbeanMetadataIndex.class).where().eq(URN_COLUMN, urn.toString()).exists(); } /** @@ -499,7 +490,8 @@ public void setQueryKeysCount(int keysCount) { * @param keysCount the max number of keys for each sub query */ @Nonnull - private List batchGet(@Nonnull Set> keys, int keysCount) { + private List batchGet(@Nonnull Set> keys, + int keysCount) { int position = 0; final int totalPageCount = QueryUtils.getTotalPageCount(keys.size(), keysCount); @@ -513,8 +505,68 @@ private List batchGet(@Nonnull Set outputParamsToValues) { + final String urnArg = "urn" + selectId; + final String aspectArg = "aspect" + selectId; + final String versionArg = "version" + selectId; + + outputParamsToValues.put(urnArg, urn); + outputParamsToValues.put(aspectArg, aspect); + outputParamsToValues.put(versionArg, version); + + return String.format("SELECT urn, aspect, version, metadata, createdOn, createdBy, createdFor " + + "FROM %s WHERE urn = :%s AND aspect = :%s AND version = :%s", + EbeanMetadataAspect.class.getAnnotation(Table.class).name(), urnArg, aspectArg, versionArg); + } + @Nonnull - private List batchGetHelper(@Nonnull List> keys, int keysCount, int position) { + private List batchGetUnion(@Nonnull List> keys, + int keysCount, int position) { + + // Build one SELECT per key and then UNION ALL the results. This can be much more performant than OR'ing the + // conditions together. Our query will look like: + // SELECT * FROM metadata_aspect WHERE urn = 'urn0' AND aspect = 'aspect0' AND version = 0 + // UNION ALL + // SELECT * FROM metadata_aspect WHERE urn = 'urn0' AND aspect = 'aspect1' AND version = 0 + // ... + // Note: UNION ALL should be safe and more performant than UNION. We're selecting the entire entity key (as well + // as data), so each result should be unique. No need to deduplicate. + // Another note: ebean doesn't support UNION ALL, so we need to manually build the SQL statement ourselves. + final StringBuilder sb = new StringBuilder(); + final int end = Math.min(keys.size(), position + keysCount); + final Map params = new HashMap<>(); + for (int index = position; index < end; index++) { + sb.append(batchGetSelect(index - position, keys.get(index).getUrn().toString(), + ModelUtils.getAspectName(keys.get(index).getAspectClass()), keys.get(index).getVersion(), params)); + + if (index != end - 1) { + sb.append(" UNION ALL "); + } + } + + final RawSql rawSql = RawSqlBuilder.parse(sb.toString()) + .columnMapping(URN_COLUMN, "key.urn") + .columnMapping(ASPECT_COLUMN, "key.aspect") + .columnMapping(VERSION_COLUMN, "key.version") + .create(); + + final Query query = _server.find(EbeanMetadataAspect.class).setRawSql(rawSql); + + for (Map.Entry param : params.entrySet()) { + query.setParameter(param.getKey(), param.getValue()); + } + + return query.findList(); + } + + @Nonnull + private List batchGetOr(@Nonnull List> keys, + int keysCount, int position) { ExpressionList query = _server.find(EbeanMetadataAspect.class).select(ALL_COLUMNS).where(); // add or if it is not the last element @@ -533,6 +585,17 @@ private List batchGetHelper(@Nonnull List batchGetHelper(@Nonnull List> keys, + int keysCount, int position) { + // TODO remove batchGetOr, make batchGetUnion the only implementation. + if (_useUnionForBatch) { + return batchGetUnion(keys, keysCount, position); + } else { + return batchGetOr(keys, keysCount, position); + } + } + /** * Checks if an {@link AspectKey} and a {@link PrimaryKey} for Ebean are equivalent. * @@ -584,7 +647,8 @@ public ListResult listUrns(@Nonnull Class urns = pagedList.getList().stream().map(entry -> getUrn(entry.getKey().getUrn())).collect(Collectors.toList()); + final List urns = + pagedList.getList().stream().map(entry -> getUrn(entry.getKey().getUrn())).collect(Collectors.toList()); return toListResult(urns, null, pagedList, start); } @@ -608,8 +672,8 @@ public ListResult list(@Nonnull Class aspects = pagedList.getList().stream().map(a -> toRecordTemplate(aspectClass, a)).collect(Collectors.toList()); - final ListResultMetadata listResultMetadata = - makeListResultMetadata(pagedList.getList().stream().map(EbeanLocalDAO::toExtraInfo).collect(Collectors.toList())); + final ListResultMetadata listResultMetadata = makeListResultMetadata( + pagedList.getList().stream().map(EbeanLocalDAO::toExtraInfo).collect(Collectors.toList())); return toListResult(aspects, listResultMetadata, pagedList, start); } @@ -633,8 +697,8 @@ public ListResult list(@Nonnull Class aspects = pagedList.getList().stream().map(a -> toRecordTemplate(aspectClass, a)).collect(Collectors.toList()); - final ListResultMetadata listResultMetadata = - makeListResultMetadata(pagedList.getList().stream().map(EbeanLocalDAO::toExtraInfo).collect(Collectors.toList())); + final ListResultMetadata listResultMetadata = makeListResultMetadata( + pagedList.getList().stream().map(EbeanLocalDAO::toExtraInfo).collect(Collectors.toList())); return toListResult(aspects, listResultMetadata, pagedList, start); } @@ -671,7 +735,8 @@ private static AspectWithExtraInfo toRec @Nonnull private ListResult toListResult(@Nonnull List values, @Nullable ListResultMetadata listResultMetadata, @Nonnull PagedList pagedList, @Nullable Integer start) { - final int nextStart = (start != null && pagedList.hasNext()) ? start + pagedList.getList().size() : ListResult.INVALID_NEXT_START; + final int nextStart = + (start != null && pagedList.hasNext()) ? start + pagedList.getList().size() : ListResult.INVALID_NEXT_START; return ListResult.builder() // Format .values(values) @@ -745,7 +810,7 @@ static GMAIndexPair getGMAIndexPair(@Nonnull IndexValue indexValue) { if (indexValue.isBoolean()) { object = indexValue.getBoolean().toString(); return new GMAIndexPair(EbeanMetadataIndex.STRING_COLUMN, object); - } else if (indexValue.isDouble()) { + } else if (indexValue.isDouble()) { object = indexValue.getDouble(); return new GMAIndexPair(EbeanMetadataIndex.DOUBLE_COLUMN, object); } else if (indexValue.isFloat()) { @@ -775,8 +840,8 @@ static GMAIndexPair getGMAIndexPair(@Nonnull IndexValue indexValue) { * @param lastUrn string representation of the urn whose value is used to set the last urn parameter in index query * @param pageSize maximum number of distinct urns to return which is essentially the LIMIT clause of SQL query */ - private static void setParameters(@Nonnull IndexCriterionArray indexCriterionArray, @Nonnull Query indexQuery, - @Nonnull String lastUrn, int pageSize) { + private static void setParameters(@Nonnull IndexCriterionArray indexCriterionArray, + @Nonnull Query indexQuery, @Nonnull String lastUrn, int pageSize) { indexQuery.setParameter(1, lastUrn); int pos = 2; for (IndexCriterion criterion : indexCriterionArray) { @@ -792,7 +857,8 @@ private static void setParameters(@Nonnull IndexCriterionArray indexCriterionArr @Nonnull private static String getStringForOperator(@Nonnull Condition condition) { if (!CONDITION_STRING_MAP.containsKey(condition)) { - throw new UnsupportedOperationException(condition.toString() + " condition is not supported in local secondary index"); + throw new UnsupportedOperationException( + condition.toString() + " condition is not supported in local secondary index"); } return CONDITION_STRING_MAP.get(condition); } @@ -807,8 +873,9 @@ private static String getStringForOperator(@Nonnull Condition condition) { @Nonnull private static String constructSQLQuery(@Nonnull IndexCriterionArray indexCriterionArray) { String selectClause = "SELECT DISTINCT(t0.urn) FROM metadata_index t0"; - selectClause += IntStream.range(1, indexCriterionArray.size()).mapToObj(i -> " INNER JOIN metadata_index " + "t" - + i + " ON t0.urn = " + "t" + i + ".urn").collect(Collectors.joining("")); + selectClause += IntStream.range(1, indexCriterionArray.size()) + .mapToObj(i -> " INNER JOIN metadata_index " + "t" + i + " ON t0.urn = " + "t" + i + ".urn") + .collect(Collectors.joining("")); final StringBuilder whereClause = new StringBuilder("WHERE t0.urn > ?"); IntStream.range(0, indexCriterionArray.size()).forEach(i -> { final IndexCriterion criterion = indexCriterionArray.get(i); @@ -860,7 +927,8 @@ public List listUrns(@Nonnull IndexFilter indexFilter, @Nullable URN lastUr throw new UnsupportedOperationException("Empty Index Filter is not supported by EbeanLocalDAO"); } if (indexCriterionArray.size() > 10) { - throw new UnsupportedOperationException("Currently more than 10 filter conditions is not supported by EbeanLocalDAO"); + throw new UnsupportedOperationException( + "Currently more than 10 filter conditions is not supported by EbeanLocalDAO"); } addEntityTypeFilter(indexFilter); @@ -872,8 +940,6 @@ public List listUrns(@Nonnull IndexFilter indexFilter, @Nullable URN lastUr final List pagedList = query.findList(); - return pagedList.stream() - .map(entry -> getUrn(entry.getUrn())) - .collect(Collectors.toList()); + return pagedList.stream().map(entry -> getUrn(entry.getUrn())).collect(Collectors.toList()); } } \ No newline at end of file diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java index a9a6c8891..b5b40c4e4 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java @@ -55,9 +55,12 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.annotation.Nonnull; import javax.persistence.RollbackException; import org.mockito.InOrder; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; import org.testng.annotations.Test; import static com.linkedin.common.AuditStamps.*; @@ -71,6 +74,20 @@ public class EbeanLocalDAOTest { private EbeanServer _server; private BaseMetadataEventProducer _mockProducer; private AuditStamp _dummyAuditStamp; + // TODO delete this flag and stop running all tests twice for it. + private boolean _useUnionForBatch; + + @Factory(dataProvider = "inputList") + public EbeanLocalDAOTest(boolean useUnionForBatch) { + _useUnionForBatch = useUnionForBatch; + } + + @DataProvider + public static Object[][] inputList() { + return new Object[][]{ + {false}, {true} + }; + } @BeforeMethod public void setupTest() { @@ -79,10 +96,23 @@ public void setupTest() { _dummyAuditStamp = makeAuditStamp("foo", 1234); } + @Nonnull + private EbeanLocalDAO createDao(@Nonnull EbeanServer server, + @Nonnull Class urnClass) { + final EbeanLocalDAO dao = + new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, server, urnClass); + dao.setUseUnionForBatch(_useUnionForBatch); + return dao; + } + + @Nonnull + private EbeanLocalDAO createDao(@Nonnull Class urnClass) { + return createDao(_server, urnClass); + } + @Test(expectedExceptions = InvalidMetadataType.class) public void testMetadataAspectCheck() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); dao.add(makeFooUrn(1), new AspectInvalid().setValue("invalid"), _dummyAuditStamp); } @@ -91,8 +121,7 @@ public void testMetadataAspectCheck() { public void testAddOne() { Clock mockClock = mock(Clock.class); when(mockClock.millis()).thenReturn(1234L); - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); dao.setClock(mockClock); FooUrn urn = makeFooUrn(1); String aspectName = ModelUtils.getAspectName(AspectFoo.class); @@ -121,8 +150,7 @@ public void testAddOne() { @Test public void testAddTwo() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn = makeFooUrn(1); String aspectName = ModelUtils.getAspectName(AspectFoo.class); AspectFoo v1 = new AspectFoo().setValue("foo"); @@ -147,8 +175,7 @@ public void testAddTwo() { @Test public void testDefaultEqualityTester() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); dao.setEqualityTester(AspectFoo.class, DefaultEqualityTester.newInstance()); FooUrn urn = makeFooUrn(1); String aspectName = ModelUtils.getAspectName(AspectFoo.class); @@ -179,8 +206,7 @@ public void testDefaultEqualityTester() { @Test public void testAlwaysFalseEqualityTester() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); dao.setEqualityTester(AspectFoo.class, AlwaysFalseEqualityTester.newInstance()); FooUrn urn = makeFooUrn(1); String aspectName = ModelUtils.getAspectName(AspectFoo.class); @@ -205,8 +231,7 @@ public void testAlwaysFalseEqualityTester() { @Test public void testVersionBasedRetention() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); dao.setRetention(AspectFoo.class, new VersionBasedRetention(2)); FooUrn urn = makeFooUrn(1); String aspectName = ModelUtils.getAspectName(AspectFoo.class); @@ -232,8 +257,7 @@ public void testTimeBasedRetention() { .thenReturn(20L) // v2 age check .thenReturn(120L); // v3 age check - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); dao.setClock(mockClock); dao.setRetention(AspectFoo.class, new TimeBasedRetention(100)); FooUrn urn = makeFooUrn(1); @@ -258,8 +282,7 @@ public void testAddSuccessAfterRetry() { when(server.beginTransaction()).thenReturn(mockTransaction); when(server.find(any(), any())).thenReturn(null); doThrow(RollbackException.class).doNothing().when(server).insert(any(EbeanMetadataAspect.class)); - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, server, FooUrn.class); + EbeanLocalDAO dao = createDao(server, FooUrn.class); dao.add(makeFooUrn(1), new AspectFoo().setValue("foo"), _dummyAuditStamp); } @@ -271,16 +294,14 @@ public void testAddFailedAfterRetry() { when(server.beginTransaction()).thenReturn(mockTransaction); when(server.find(any(), any())).thenReturn(null); doThrow(RollbackException.class).when(server).insert(any(EbeanMetadataAspect.class)); - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, server, FooUrn.class); + EbeanLocalDAO dao = createDao(server, FooUrn.class); dao.add(makeFooUrn(1), new AspectFoo().setValue("foo"), _dummyAuditStamp); } @Test public void testGetNonExisting() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn = makeFooUrn(1); Optional foo = dao.get(AspectFoo.class, urn); @@ -290,8 +311,7 @@ public void testGetNonExisting() { @Test public void testGetCapsSensitivity() { - final EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, Urn.class); + final EbeanLocalDAO dao = createDao(Urn.class); final Urn urnCaps = makeUrn("Dataset"); final Urn urnLower = makeUrn("dataset"); @@ -313,8 +333,7 @@ public void testGetCapsSensitivity() { @Test public void testGetLatestVersion() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn = makeFooUrn(1); AspectFoo v0 = new AspectFoo().setValue("foo"); addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, v0); @@ -329,8 +348,7 @@ public void testGetLatestVersion() { @Test public void testGetSpecificVersion() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn = makeFooUrn(1); AspectFoo v0 = new AspectFoo().setValue("foo"); addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, v0); @@ -345,8 +363,7 @@ public void testGetSpecificVersion() { @Test public void testGetMultipleAspects() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn = makeFooUrn(1); AspectFoo fooV0 = new AspectFoo().setValue("foo"); addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, fooV0); @@ -365,8 +382,7 @@ public void testGetMultipleAspects() { @Test public void testGetMultipleAspectsForMultipleUrns() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); // urn1 has both foo & bar FooUrn urn1 = makeFooUrn(1); @@ -397,8 +413,7 @@ public void testGetMultipleAspectsForMultipleUrns() { @Test public void testBackfill() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn = makeFooUrn(1); AspectFoo expected = new AspectFoo().setValue("foo"); @@ -413,23 +428,38 @@ public void testBackfill() { } @Test - public void testLocalSecondaryIndexBackfill() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + public void testLocalSecondaryIndexBackfillDisabled() { + // given + EbeanLocalDAO dao = createDao(FooUrn.class); dao.setUrnPathExtractor(new FooUrnPathExtractor()); FooUrn urn = makeFooUrn(1); AspectFoo expected = new AspectFoo().setValue("foo"); addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, expected); - - // Check if backfilled: _writeToLocalSecondary = false dao.backfill(AspectFoo.class, urn); + + // then when assertEquals(getAllRecordsFromLocalIndex(urn).size(), 0); + } + + + @Test + public void testLocalSecondaryIndexBackfillEnabled() { + // given + EbeanLocalDAO dao = createDao(FooUrn.class); + dao.setUrnPathExtractor(new FooUrnPathExtractor()); + + FooUrn urn = makeFooUrn(1); + AspectFoo expected = new AspectFoo().setValue("foo"); + addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, expected); - // Check if backfilled: _writeToLocalSecondary = true dao.enableLocalSecondaryIndex(true); dao.backfill(AspectFoo.class, urn); + + // when List fooRecords = getAllRecordsFromLocalIndex(urn); + + // then assertEquals(fooRecords.size(), 1); EbeanMetadataIndex fooRecord = fooRecords.get(0); assertEquals(fooRecord.getUrn(), urn.toString()); @@ -439,9 +469,9 @@ public void testLocalSecondaryIndexBackfill() { } @Test - public void testBackfillWithUrns() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + public void testBackfillSingleAspect() { + // given + EbeanLocalDAO dao = createDao(FooUrn.class); List urns = ImmutableList.of(makeFooUrn(1), makeFooUrn(2), makeFooUrn(3)); Map, RecordTemplate>> aspects = new HashMap<>(); @@ -454,28 +484,67 @@ public void testBackfillWithUrns() { addMetadata(urn, AspectBar.class.getCanonicalName(), 0, aspectBar); }); - // Backfill single aspect for set of urns + // when Map, Optional>> backfilledAspects = dao.backfill(Collections.singleton(AspectFoo.class), new HashSet<>(urns)); + + // then for (Urn urn : urns) { RecordTemplate aspect = aspects.get(urn).get(AspectFoo.class); assertEquals(backfilledAspects.get(urn).get(AspectFoo.class).get(), aspect); verify(_mockProducer, times(1)).produceMetadataAuditEvent(urn, aspect, aspect); } - clearInvocations(_mockProducer); + } - // Backfill set of aspects for a single urn - backfilledAspects = + @Test + public void testBackfillMultipleAspectsOneUrn() { + // given + EbeanLocalDAO dao = createDao(FooUrn.class); + List urns = ImmutableList.of(makeFooUrn(1)); + + Map, RecordTemplate>> aspects = new HashMap<>(); + + urns.forEach(urn -> { + AspectFoo aspectFoo = new AspectFoo().setValue("foo"); + AspectBar aspectBar = new AspectBar().setValue("bar"); + aspects.put(urn, ImmutableMap.of(AspectFoo.class, aspectFoo, AspectBar.class, aspectBar)); + addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, aspectFoo); + addMetadata(urn, AspectBar.class.getCanonicalName(), 0, aspectBar); + }); + + // when + Map, Optional>> backfilledAspects = dao.backfill(ImmutableSet.of(AspectFoo.class, AspectBar.class), Collections.singleton(urns.get(0))); + + // then for (Class clazz : aspects.get(urns.get(0)).keySet()) { RecordTemplate aspect = aspects.get(urns.get(0)).get(clazz); assertEquals(backfilledAspects.get(urns.get(0)).get(clazz).get(), aspect); verify(_mockProducer, times(1)).produceMetadataAuditEvent(urns.get(0), aspect, aspect); } - clearInvocations(_mockProducer); + } + + @Test + public void testBackfillMultipleAspectsMultipleUrns() { + // given + EbeanLocalDAO dao = createDao(FooUrn.class); + List urns = ImmutableList.of(makeFooUrn(1), makeFooUrn(2), makeFooUrn(3)); + + Map, RecordTemplate>> aspects = new HashMap<>(); - // Backfill set of aspects for set of urns - backfilledAspects = dao.backfill(ImmutableSet.of(AspectFoo.class, AspectBar.class), new HashSet<>(urns)); + urns.forEach(urn -> { + AspectFoo aspectFoo = new AspectFoo().setValue("foo"); + AspectBar aspectBar = new AspectBar().setValue("bar"); + aspects.put(urn, ImmutableMap.of(AspectFoo.class, aspectFoo, AspectBar.class, aspectBar)); + addMetadata(urn, AspectFoo.class.getCanonicalName(), 0, aspectFoo); + addMetadata(urn, AspectBar.class.getCanonicalName(), 0, aspectBar); + }); + + // when + Map, Optional>> backfilledAspects = + dao.backfill(ImmutableSet.of(AspectFoo.class, AspectBar.class), new HashSet<>(urns)); + + // then for (Urn urn : urns) { for (Class clazz : aspects.get(urn).keySet()) { RecordTemplate aspect = aspects.get(urn).get(clazz); @@ -492,7 +561,8 @@ public void testBackfillUsingSCSI() { makeLocalDAOStorageConfig(AspectFoo.class, Collections.singletonList("/value"), AspectBar.class, Collections.singletonList("/value")); EbeanLocalDAO dao = - new EbeanLocalDAO(_mockProducer, _server, storageConfig, FooUrn.class); + new EbeanLocalDAO<>(_mockProducer, _server, storageConfig, FooUrn.class); + dao.setUseUnionForBatch(_useUnionForBatch); dao.enableLocalSecondaryIndex(true); List urns = ImmutableList.of(makeFooUrn(1), makeFooUrn(2), makeFooUrn(3)); @@ -555,8 +625,7 @@ public void testBackfillUsingSCSI() { @Test public void testListVersions() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn = makeFooUrn(1); List versions = new ArrayList<>(); for (long i = 0; i < 6; i++) { @@ -603,8 +672,7 @@ private static IndexCriterionArray makeIndexCriterionArray(int size) { @Test void testListUrnsFromIndexManyFilters() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); dao.enableLocalSecondaryIndex(true); FooUrn urn1 = makeFooUrn(1); FooUrn urn2 = makeFooUrn(2); @@ -735,8 +803,7 @@ void testListUrnsFromIndexManyFilters() { @Test public void testListUrns() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); AspectFoo foo = new AspectFoo().setValue("foo"); List urns = new ArrayList<>(); for (int i = 0; i < 3; i++) { @@ -780,7 +847,8 @@ public void testGetAspectsWithIndexFilter() { LocalDAOStorageConfig storageConfig = makeLocalDAOStorageConfig(AspectFoo.class, Collections.singletonList("/value")); EbeanLocalDAO dao = - new EbeanLocalDAO(_mockProducer, _server, storageConfig, FooUrn.class); + new EbeanLocalDAO<>(_mockProducer, _server, storageConfig, FooUrn.class); + dao.setUseUnionForBatch(_useUnionForBatch); dao.enableLocalSecondaryIndex(true); dao.setUrnPathExtractor(new FooUrnPathExtractor()); @@ -826,8 +894,7 @@ public void testGetAspectsWithIndexFilter() { @Test public void testList() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); List foos = new LinkedList<>(); for (int i = 0; i < 3; i++) { FooUrn urn = makeFooUrn(i); @@ -918,8 +985,7 @@ void testStrongConsistentIndexPaths() { @Test public void testListAspectsForAllUrns() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); for (int i = 0; i < 3; i++) { FooUrn urn = makeFooUrn(i); @@ -971,8 +1037,7 @@ public void testListAspectsForAllUrns() { @Test void testNewStringId() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); String id1 = dao.newStringId(); String id2 = dao.newStringId(); @@ -985,8 +1050,7 @@ void testNewStringId() { @Test void testNewNumericId() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); long id1 = dao.newNumericId("namespace"); long id2 = dao.newNumericId("namespace"); long id3 = dao.newNumericId("another namespace"); @@ -998,8 +1062,7 @@ void testNewNumericId() { @Test void testSaveSingleEntryToLocalIndex() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, BarUrn.class); + EbeanLocalDAO dao = createDao(BarUrn.class); BarUrn urn = makeBarUrn(0); // Test indexing integer typed value @@ -1059,8 +1122,7 @@ record = getRecordFromLocalIndex(recordId); @Test void testExistsInLocalIndex() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, BarUrn.class); + EbeanLocalDAO dao = createDao(BarUrn.class); BarUrn urn = makeBarUrn(0); assertFalse(dao.existsInLocalIndex(urn)); @@ -1072,12 +1134,10 @@ void testExistsInLocalIndex() { @Test void testUpdateUrnInLocalIndex() { // only urn will be updated since storage config has not been provided - EbeanLocalDAO dao1 = - new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server, BarUrn.class); + EbeanLocalDAO dao1 = createDao(BarUrn.class); dao1.enableLocalSecondaryIndex(true); dao1.setUrnPathExtractor(new BarUrnPathExtractor()); - EbeanLocalDAO dao2 = - new EbeanLocalDAO(EntityAspectUnion.class, _mockProducer, _server, BazUrn.class); + EbeanLocalDAO dao2 = createDao(BazUrn.class); dao2.enableLocalSecondaryIndex(true); dao2.setUrnPathExtractor(new BazUrnPathExtractor()); @@ -1125,6 +1185,7 @@ void testEmptyAspectStorageConfigMap() { EbeanLocalDAO dao = new EbeanLocalDAO(_mockProducer, _server, storageConfig, FooUrn.class, new FooUrnPathExtractor()); + dao.setUseUnionForBatch(_useUnionForBatch); dao.enableLocalSecondaryIndex(true); AspectFoo aspect = new AspectFoo().setValue("val1"); @@ -1150,7 +1211,8 @@ void testNullPathStorageConfigMap() { LocalDAOStorageConfig storageConfig = LocalDAOStorageConfig.builder().aspectStorageConfigMap(aspectStorageConfigMap).build(); EbeanLocalDAO dao = - new EbeanLocalDAO(_mockProducer, _server, storageConfig, FooUrn.class); + new EbeanLocalDAO<>(_mockProducer, _server, storageConfig, FooUrn.class); + dao.setUseUnionForBatch(_useUnionForBatch); dao.enableLocalSecondaryIndex(true); dao.setUrnPathExtractor(new FooUrnPathExtractor()); AspectFoo aspect = new AspectFoo().setValue("val2"); @@ -1168,8 +1230,9 @@ void testNullPathStorageConfigMap() { @Test void testUpdateUrnAndAspectInLocalIndex() { - EbeanLocalDAO dao = new EbeanLocalDAO(_mockProducer, _server, + EbeanLocalDAO dao = new EbeanLocalDAO<>(_mockProducer, _server, makeLocalDAOStorageConfig(AspectFooEvolved.class, Arrays.asList("/value", "/newValue")), FooUrn.class); + dao.setUseUnionForBatch(_useUnionForBatch); dao.enableLocalSecondaryIndex(true); dao.setUrnPathExtractor(new FooUrnPathExtractor()); FooUrn urn = makeFooUrn(1); @@ -1234,8 +1297,7 @@ void testUpdateUrnAndAspectInLocalIndex() { @Test void testUpdateLocalIndex() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, BarUrn.class); + EbeanLocalDAO dao = createDao(BarUrn.class); dao.enableLocalSecondaryIndex(true); dao.setUrnPathExtractor(new BarUrnPathExtractor()); @@ -1300,8 +1362,7 @@ void testGetGMAIndexPair() { @Test void testListUrnsFromIndex() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn1 = makeFooUrn(1); FooUrn urn2 = makeFooUrn(2); FooUrn urn3 = makeFooUrn(3); @@ -1366,8 +1427,7 @@ void testListUrnsFromIndex() { @Test void testListUrnsFromIndexZeroSize() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn1 = makeFooUrn(1); FooUrn urn2 = makeFooUrn(2); String aspect = "aspect" + System.currentTimeMillis(); @@ -1389,8 +1449,7 @@ void testListUrnsFromIndexZeroSize() { @Test void testAddEntityTypeFilter() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); String aspect = "aspect" + System.currentTimeMillis(); IndexValue indexValue = new IndexValue(); @@ -1417,10 +1476,8 @@ void testAddEntityTypeFilter() { @Test void testListUrnsFromIndexForAnEntity() { - EbeanLocalDAO dao1 = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); - EbeanLocalDAO dao2 = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, BarUrn.class); + EbeanLocalDAO dao1 = createDao(FooUrn.class); + EbeanLocalDAO dao2 = createDao(BarUrn.class); dao1.enableLocalSecondaryIndex(true); dao2.enableLocalSecondaryIndex(true); dao1.setUrnPathExtractor(new FooUrnPathExtractor()); @@ -1450,8 +1507,7 @@ void testListUrnsFromIndexForAnEntity() { @Test void testGetUrn() { // case 1: valid urn - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); String urn1 = "urn:li:foo:1"; FooUrn fooUrn = makeFooUrn(1); @@ -1468,8 +1524,7 @@ void testGetUrn() { @Test public void testGetWithExtraInfoLatestVersion() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn = makeFooUrn(1); AspectFoo v0 = new AspectFoo().setValue("foo"); Urn creator1 = Urns.createFromTypeSpecificString("test", "testCreator1"); @@ -1491,8 +1546,7 @@ public void testGetWithExtraInfoLatestVersion() { @Test public void testGetWithExtraInfoSpecificVersion() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn = makeFooUrn(1); AspectFoo v0 = new AspectFoo().setValue("foo"); Urn creator1 = Urns.createFromTypeSpecificString("test", "testCreator1"); @@ -1514,8 +1568,7 @@ public void testGetWithExtraInfoSpecificVersion() { @Test public void testGetWithExtraInfoMultipleKeys() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn urn = makeFooUrn(1); Urn creator1 = Urns.createFromTypeSpecificString("test", "testCreator1"); Urn impersonator1 = Urns.createFromTypeSpecificString("test", "testImpersonator1"); @@ -1558,8 +1611,8 @@ public void testGetWithExtraInfoMultipleKeys() { @Test public void testGetWithKeysCount() { - EbeanLocalDAO dao = - new EbeanLocalDAO<>(EntityAspectUnion.class, _mockProducer, _server, FooUrn.class); + // given + EbeanLocalDAO dao = createDao(FooUrn.class); FooUrn fooUrn = makeFooUrn(1); @@ -1574,9 +1627,37 @@ public void testGetWithKeysCount() { addMetadata(fooUrn, AspectBar.class.getCanonicalName(), 0, barV0); // batch get without query keys count set + // when Map, Optional> records = dao.get(new HashSet<>(Arrays.asList(aspectKey1, aspectKey2))); + + // then assertEquals(records.size(), 2); + } + + @Test + public void testNegativeIsInvalidKeyCount() { + // given + EbeanLocalDAO dao = createDao(FooUrn.class); + + // expect + assertThrows(IllegalArgumentException.class, () -> dao.setQueryKeysCount(-1)); + } + + public void testGetWithQuerySize(int querySize) { + // given + EbeanLocalDAO dao = createDao(FooUrn.class); + FooUrn fooUrn = makeFooUrn(1); + + // both aspect keys exist + AspectKey aspectKey1 = new AspectKey<>(AspectFoo.class, fooUrn, 1L); + AspectKey aspectKey2 = new AspectKey<>(AspectBar.class, fooUrn, 0L); + + // add metadata + AspectFoo fooV1 = new AspectFoo().setValue("foo"); + addMetadata(fooUrn, AspectFoo.class.getCanonicalName(), 1, fooV1); + AspectBar barV0 = new AspectBar().setValue("bar"); + addMetadata(fooUrn, AspectBar.class.getCanonicalName(), 0, barV0); FooUrn fooUrn2 = makeFooUrn(2); AspectKey aspectKey3 = new AspectKey<>(AspectFoo.class, fooUrn2, 0L); @@ -1591,42 +1672,49 @@ public void testGetWithKeysCount() { AspectBar barV5 = new AspectBar().setValue("bar5"); addMetadata(fooUrn2, AspectBar.class.getCanonicalName(), 0, barV5); - assertThrows(IllegalArgumentException.class, () -> dao.setQueryKeysCount(-1)); + dao.setQueryKeysCount(querySize); - dao.setQueryKeysCount(0); + // when Map, Optional> fiveRecords = dao.get(new HashSet<>(Arrays.asList(aspectKey1, aspectKey2, aspectKey3, aspectKey4, aspectKey5))); + + // then assertEquals(fiveRecords.size(), 5); + } - dao.setQueryKeysCount(1); - Map, Optional> fiveRecords1 = - dao.get(new HashSet<>(Arrays.asList(aspectKey1, aspectKey2, aspectKey3, aspectKey4, aspectKey5))); - assertEquals(fiveRecords1, fiveRecords); + @Test + public void testNoPaging() { + testGetWithQuerySize(0); + } - dao.setQueryKeysCount(2); - Map, Optional> fiveRecords2 = - dao.get(new HashSet<>(Arrays.asList(aspectKey1, aspectKey2, aspectKey3, aspectKey4, aspectKey5))); - assertEquals(fiveRecords2, fiveRecords); + @Test + public void testPageSizeOne() { + testGetWithQuerySize(1); + } - dao.setQueryKeysCount(3); - Map, Optional> fiveRecords3 = - dao.get(new HashSet<>(Arrays.asList(aspectKey1, aspectKey2, aspectKey3, aspectKey4, aspectKey5))); - assertEquals(fiveRecords3, fiveRecords); + @Test + public void testPageSizeTwo() { + testGetWithQuerySize(2); + } - dao.setQueryKeysCount(4); - Map, Optional> fiveRecords4 = - dao.get(new HashSet<>(Arrays.asList(aspectKey1, aspectKey2, aspectKey3, aspectKey4, aspectKey5))); - assertEquals(fiveRecords4, fiveRecords); + @Test + public void testPageSizeThree() { + testGetWithQuerySize(3); + } - dao.setQueryKeysCount(5); - Map, Optional> fiveRecords5 = - dao.get(new HashSet<>(Arrays.asList(aspectKey1, aspectKey2, aspectKey3, aspectKey4, aspectKey5))); - assertEquals(fiveRecords5, fiveRecords); + @Test + public void testPageSizeFour() { + testGetWithQuerySize(4); + } - dao.setQueryKeysCount(1000); - Map, Optional> fiveRecord1000 = - dao.get(new HashSet<>(Arrays.asList(aspectKey1, aspectKey2, aspectKey3, aspectKey4, aspectKey5))); - assertEquals(fiveRecord1000, fiveRecords); + @Test + public void testPageSizeSameAsResultSize() { + testGetWithQuerySize(5); + } + + @Test + public void testPageSizeGreaterThanResultsSize() { + testGetWithQuerySize(1000); } private void addMetadata(Urn urn, String aspectName, long version, RecordTemplate metadata) {