Skip to content

Commit

Permalink
feat(relationship 2.0): use REMOVE_ALL_EDGES_FROM_SOURCE by default f…
Browse files Browse the repository at this point in the history
…or ingesting 2.0 relationships (#463)
  • Loading branch information
jsdonn authored Nov 6, 2024
1 parent cf25612 commit a630243
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry;
import com.linkedin.metadata.dao.exception.ModelConversionException;
Expand All @@ -18,10 +17,10 @@
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.TimeBasedRetention;
import com.linkedin.metadata.dao.retention.VersionBasedRetention;
import com.linkedin.metadata.dao.urnpath.EmptyPathExtractor;
import com.linkedin.metadata.dao.urnpath.UrnPathExtractor;
import com.linkedin.metadata.dao.storage.LocalDAOStorageConfig;
import com.linkedin.metadata.dao.tracking.BaseTrackingManager;
import com.linkedin.metadata.dao.urnpath.EmptyPathExtractor;
import com.linkedin.metadata.dao.urnpath.UrnPathExtractor;
import com.linkedin.metadata.dao.utils.EBeanDAOUtils;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.dao.utils.QueryUtils;
Expand Down Expand Up @@ -896,8 +895,8 @@ public <ASPECT extends RecordTemplate, RELATIONSHIP extends RecordTemplate> List
List<List<RELATIONSHIP>> allRelationships = EBeanDAOUtils.extractRelationshipsFromAspect(aspect);
localRelationshipUpdates = allRelationships.stream()
.filter(relationships -> !relationships.isEmpty()) // ensure at least 1 relationship in sublist to avoid index out of bounds
.map(relationships -> new BaseLocalRelationshipBuilder.LocalRelationshipUpdates(
relationships, relationships.get(0).getClass(), BaseGraphWriterDAO.RemovalOption.REMOVE_NONE))
.map(relationships -> new LocalRelationshipUpdates(
relationships, relationships.get(0).getClass(), BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE))
.collect(Collectors.toList());
} else if (_relationshipSource == RelationshipSource.RELATIONSHIP_BUILDERS) {
if (_localRelationshipBuilderRegistry != null && _localRelationshipBuilderRegistry.isRegistered(aspectClass)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import com.linkedin.testing.FooSnapshot;
import com.linkedin.testing.MixedRecord;
import com.linkedin.testing.localrelationship.AspectFooBar;
import com.linkedin.testing.localrelationship.AspectFooBarBaz;
import com.linkedin.testing.localrelationship.BelongsTo;
import com.linkedin.testing.localrelationship.BelongsToArray;
import com.linkedin.testing.localrelationship.ReportsTo;
Expand Down Expand Up @@ -2549,11 +2548,11 @@ public void testRemoveRelationshipsDuringAspectSoftDeletion() throws URISyntaxEx
// add an aspect (AspectFooBar) which includes BelongsTo relationships and ReportsTo relationships
FooUrn fooUrn = makeFooUrn(1);
BarUrn barUrn1 = BarUrn.createFromString("urn:li:bar:1");
BelongsTo belongsTo1 = new BelongsTo().setSource(barUrn1).setDestination(fooUrn);
BelongsTo belongsTo1 = new BelongsTo().setSource(fooUrn).setDestination(barUrn1);
BarUrn barUrn2 = BarUrn.createFromString("urn:li:bar:2");
BelongsTo belongsTo2 = new BelongsTo().setSource(barUrn2).setDestination(fooUrn);
BelongsTo belongsTo2 = new BelongsTo().setSource(fooUrn).setDestination(barUrn2);
BarUrn barUrn3 = BarUrn.createFromString("urn:li:bar:3");
BelongsTo belongsTo3 = new BelongsTo().setSource(barUrn3).setDestination(fooUrn);
BelongsTo belongsTo3 = new BelongsTo().setSource(fooUrn).setDestination(barUrn3);
BelongsToArray belongsToArray = new BelongsToArray(belongsTo1, belongsTo2, belongsTo3);
ReportsTo reportsTo = new ReportsTo().setSource(fooUrn).setDestination(barUrn1);
ReportsToArray reportsToArray = new ReportsToArray(reportsTo);
Expand All @@ -2566,22 +2565,15 @@ public void testRemoveRelationshipsDuringAspectSoftDeletion() throws URISyntaxEx
barDao.add(barUrn2, new AspectFoo().setValue("2"), auditStamp);
barDao.add(barUrn3, new AspectFoo().setValue("3"), auditStamp);

// add another aspect (AspectFooBarBaz) which includes BelongsTo relationship(s)
BarUrn barUrn4 = BarUrn.createFromString("urn:li:bar:4");
BelongsTo belongsTo4 = new BelongsTo().setSource(barUrn4).setDestination(fooUrn);
AspectFooBarBaz aspectFooBarBaz = new AspectFooBarBaz().setBars(new BarUrnArray(barUrn4)).setBelongsTos(new BelongsToArray(belongsTo4));
fooDao.add(fooUrn, aspectFooBarBaz, auditStamp);
barDao.add(barUrn4, new AspectFoo().setValue("4"), auditStamp);

// Verify local relationships and entities are added.
EbeanLocalRelationshipQueryDAO ebeanLocalRelationshipQueryDAO = new EbeanLocalRelationshipQueryDAO(_server);
ebeanLocalRelationshipQueryDAO.setSchemaConfig(_schemaConfig);

List<BelongsTo> resultBelongsTos =
ebeanLocalRelationshipQueryDAO.findRelationships(BarSnapshot.class, EMPTY_FILTER, FooSnapshot.class,
ebeanLocalRelationshipQueryDAO.findRelationships(FooSnapshot.class, EMPTY_FILTER, BarSnapshot.class,
EMPTY_FILTER, BelongsTo.class, OUTGOING_FILTER, 0, 10);

assertEquals(resultBelongsTos.size(), 4);
assertEquals(resultBelongsTos.size(), 3);

List<ReportsTo> resultReportsTos =
ebeanLocalRelationshipQueryDAO.findRelationships(FooSnapshot.class, EMPTY_FILTER, BarSnapshot.class,
Expand All @@ -2598,14 +2590,9 @@ public void testRemoveRelationshipsDuringAspectSoftDeletion() throws URISyntaxEx
fooDao.delete(fooUrn, AspectFooBar.class, _dummyAuditStamp);

// check that the belongsTo relationships 1, 2, & 3 were soft deleted
resultBelongsTos = ebeanLocalRelationshipQueryDAO.findRelationships(BarSnapshot.class, EMPTY_FILTER, FooSnapshot.class,
resultBelongsTos = ebeanLocalRelationshipQueryDAO.findRelationships(FooSnapshot.class, EMPTY_FILTER, BarSnapshot.class,
EMPTY_FILTER, BelongsTo.class, OUTGOING_FILTER, 0, 10);

// but ensure that belongsTo4 (from AspectFooBarBaz) was left untouched
assertEquals(resultBelongsTos.size(), 1);
assertEquals(resultBelongsTos.get(0).getSource(), barUrn4);
assertEquals(resultBelongsTos.get(0).getDestination(), fooUrn);

// check that the reportsTo relationship was soft deleted
resultReportsTos =
ebeanLocalRelationshipQueryDAO.findRelationships(FooSnapshot.class, EMPTY_FILTER, BarSnapshot.class,
Expand Down Expand Up @@ -3056,23 +3043,20 @@ public void testBackfillLocalRelationshipsFromEntityTables() throws URISyntaxExc
List<SqlRow> results = _server.createSqlQuery("select * from metadata_relationship_belongsto").findList();
assertEquals(results.size(), 3);
assertEquals(relationshipUpdates.size(), 1);
assertEquals(relationshipUpdates.get(0).getRemovalOption(), REMOVE_ALL_EDGES_TO_DESTINATION);
assertEquals(relationshipUpdates.get(0).getRemovalOption(), REMOVE_ALL_EDGES_FROM_SOURCE);

BarUrnArray sources = new BarUrnArray();
for (int i = 0; i < results.size(); i++) {
try {
RecordTemplate relationship = (RecordTemplate) relationshipUpdates.get(0).getRelationships().get(i);
Urn source = (Urn) relationship.getClass().getMethod("getSource").invoke(relationship);
Urn dest = (Urn) relationship.getClass().getMethod("getDestination").invoke(relationship);
assertEquals(dest.toString(), "urn:li:foo:1");
sources.add(BarUrn.createFromString(source.toString()));
assertEquals(source.toString(), "urn:li:foo:1");
assertEquals(dest.toString(), "urn:li:bar:" + (i + 1));
assertEquals(relationshipUpdates.get(0).getRelationships().get(i).getClass().getSimpleName(), "BelongsTo");
} catch (Exception e) {
throw new RuntimeException(e);
}
}

assertEquals(sources, barUrns);
}

@Test
Expand All @@ -3099,7 +3083,7 @@ public void testAddRelationships() throws URISyntaxException {
EbeanLocalRelationshipQueryDAO ebeanLocalRelationshipQueryDAO = new EbeanLocalRelationshipQueryDAO(_server);
ebeanLocalRelationshipQueryDAO.setSchemaConfig(_schemaConfig);
List<BelongsTo> relationships = ebeanLocalRelationshipQueryDAO.findRelationships(
BarSnapshot.class, EMPTY_FILTER, FooSnapshot.class, EMPTY_FILTER, BelongsTo.class, OUTGOING_FILTER, 0, 10);
FooSnapshot.class, EMPTY_FILTER, BarSnapshot.class, EMPTY_FILTER, BelongsTo.class, OUTGOING_FILTER, 0, 10);
assertEquals(relationships.size(), 0);

// Turn on local relationship ingestion now
Expand All @@ -3110,7 +3094,7 @@ public void testAddRelationships() throws URISyntaxException {

// Verify that the local relationships were added
relationships = ebeanLocalRelationshipQueryDAO.findRelationships(
BarSnapshot.class, EMPTY_FILTER, FooSnapshot.class, EMPTY_FILTER, BelongsTo.class, OUTGOING_FILTER, 0, 10);
FooSnapshot.class, EMPTY_FILTER, BarSnapshot.class, EMPTY_FILTER, BelongsTo.class, OUTGOING_FILTER, 0, 10);

assertEquals(relationships.size(), 3);
}
Expand Down Expand Up @@ -3139,7 +3123,7 @@ public void testAddWithLocalRelationshipBuilder() throws URISyntaxException {
ebeanLocalRelationshipQueryDAO.setSchemaConfig(_schemaConfig);

List<BelongsTo> relationships = ebeanLocalRelationshipQueryDAO.findRelationships(
BarSnapshot.class, EMPTY_FILTER, FooSnapshot.class, EMPTY_FILTER, BelongsTo.class, OUTGOING_FILTER, 0, 10);
FooSnapshot.class, EMPTY_FILTER, BarSnapshot.class, EMPTY_FILTER, BelongsTo.class, OUTGOING_FILTER, 0, 10);

AspectKey<FooUrn, AspectFooBar> key = new AspectKey<>(AspectFooBar.class, fooUrn, 0L);
List<EbeanMetadataAspect> aspects = fooDao.batchGetHelper(Collections.singletonList(key), 1, 0);
Expand All @@ -3157,11 +3141,11 @@ public void testAddRelationshipsFromAspect() throws URISyntaxException {

FooUrn fooUrn = makeFooUrn(1);
BarUrn barUrn1 = BarUrn.createFromString("urn:li:bar:1");
BelongsTo belongsTo1 = new BelongsTo().setSource(barUrn1).setDestination(fooUrn);
BelongsTo belongsTo1 = new BelongsTo().setSource(fooUrn).setDestination(barUrn1);
BarUrn barUrn2 = BarUrn.createFromString("urn:li:bar:2");
BelongsTo belongsTo2 = new BelongsTo().setSource(barUrn2).setDestination(fooUrn);
BelongsTo belongsTo2 = new BelongsTo().setSource(fooUrn).setDestination(barUrn2);
BarUrn barUrn3 = BarUrn.createFromString("urn:li:bar:3");
BelongsTo belongsTo3 = new BelongsTo().setSource(barUrn3).setDestination(fooUrn);
BelongsTo belongsTo3 = new BelongsTo().setSource(fooUrn).setDestination(barUrn3);
BelongsToArray belongsToArray = new BelongsToArray(belongsTo1, belongsTo2, belongsTo3);
AspectFooBar aspectFooBar = new AspectFooBar().setBars(new BarUrnArray(barUrn1, barUrn2, barUrn3)).setBelongsTos(belongsToArray);
AuditStamp auditStamp = makeAuditStamp("foo", System.currentTimeMillis());
Expand All @@ -3176,7 +3160,7 @@ public void testAddRelationshipsFromAspect() throws URISyntaxException {
ebeanLocalRelationshipQueryDAO.setSchemaConfig(_schemaConfig);

List<BelongsTo> relationships =
ebeanLocalRelationshipQueryDAO.findRelationships(BarSnapshot.class, EMPTY_FILTER, FooSnapshot.class,
ebeanLocalRelationshipQueryDAO.findRelationships(FooSnapshot.class, EMPTY_FILTER, BarSnapshot.class,
EMPTY_FILTER, BelongsTo.class, OUTGOING_FILTER, 0, 10);

AspectKey<FooUrn, AspectFooBar> key = new AspectKey<>(AspectFooBar.class, fooUrn, 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public void init() throws IOException {

@Test
public void testAddRelationshipWithRemoveAllEdgesToDestination() throws URISyntaxException {
_server.execute(Ebean.createSqlUpdate(insertRelationships("metadata_relationship_belongsto", "urn:li:bar:000",
"bar", "urn:li:foo:123", "foo")));
_server.execute(Ebean.createSqlUpdate(insertRelationships("metadata_relationship_belongsto", "urn:li:foo:123",
"foo", "urn:li:bar:000", "bar")));

AspectFooBar aspectFooBar = new AspectFooBar().setBars(new BarUrnArray(
BarUrn.createFromString("urn:li:bar:123"),
Expand All @@ -54,7 +54,7 @@ public void testAddRelationshipWithRemoveAllEdgesToDestination() throws URISynta
.buildRelationships(FooUrn.createFromString("urn:li:foo:123"), aspectFooBar);

// Before processing
List<SqlRow> before = _server.createSqlQuery("select * from metadata_relationship_belongsto where source='urn:li:bar:000'").findList();
List<SqlRow> before = _server.createSqlQuery("select * from metadata_relationship_belongsto where destination='urn:li:bar:000'").findList();
assertEquals(before.size(), 1);

_localRelationshipWriterDAO.processLocalRelationshipUpdates(FooUrn.createFromString("urn:li:foo:123"), updates, false);
Expand All @@ -67,7 +67,7 @@ public void testAddRelationshipWithRemoveAllEdgesToDestination() throws URISynta
assertEquals(softDeleted.size(), 1); // 1 soft deleted edge

List<SqlRow> newEdges = _server.createSqlQuery(
"select * from metadata_relationship_belongsto where destination='urn:li:foo:123' and deleted_ts IS NULL").findList();
"select * from metadata_relationship_belongsto where source='urn:li:foo:123' and deleted_ts IS NULL").findList();

assertEquals(newEdges.size(), 3); // 3 new edges added.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ public <URN extends Urn> List<LocalRelationshipUpdates> buildRelationships(@Nonn
@Nonnull AspectFooBar aspectFooBar) {
List<BelongsTo> belongsToRelationships = new ArrayList<>();
for (Urn barUrn : aspectFooBar.getBars()) {
belongsToRelationships.add(new BelongsTo().setSource(barUrn).setDestination(urn));
belongsToRelationships.add(new BelongsTo().setSource(urn).setDestination(barUrn));
}

LocalRelationshipUpdates localRelationshipUpdates =
new LocalRelationshipUpdates(belongsToRelationships, BelongsTo.class,
BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_TO_DESTINATION);
BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE);

return Collections.singletonList(localRelationshipUpdates);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
import com.linkedin.metadata.query.RelationshipField;
import com.linkedin.metadata.query.UrnField;
import com.linkedin.testing.AnnotatedAspectBarWithRelationshipFields;
import com.linkedin.testing.AnnotatedAspectFooWithRelationshipField;
import com.linkedin.testing.AnnotatedRelationshipBar;
import com.linkedin.testing.AnnotatedRelationshipBarArray;
import com.linkedin.testing.AnnotatedRelationshipFoo;
import com.linkedin.testing.AnnotatedRelationshipFooArray;
import com.linkedin.testing.AspectFoo;
import com.linkedin.testing.AnnotatedAspectFooWithRelationshipField;
import com.linkedin.testing.CommonAspect;
import com.linkedin.testing.CommonAspectArray;
import com.linkedin.testing.urn.BurgerUrn;
Expand Down Expand Up @@ -639,6 +639,4 @@ public void testExtractRelationshipsFromAspect() {
assertEquals(new AnnotatedRelationshipFoo(), results.get(1).get(1));
assertEquals(new AnnotatedRelationshipBar(), results.get(2).get(0));
}


}

0 comments on commit a630243

Please sign in to comment.