Skip to content

Commit

Permalink
Backfill relationship from entity table (#302)
Browse files Browse the repository at this point in the history
* Add rest endpoint to support backfill relationship

* add unit test

* revert

* Address comment

* fix style

---------

Co-authored-by: Jesse Jia <[email protected]>
  • Loading branch information
zhixuanjia and Jesse Jia authored Oct 16, 2023
1 parent 0d23d2c commit 00c2142
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.backfill.BackfillMode;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.equality.DefaultEqualityTester;
import com.linkedin.metadata.dao.equality.EqualityTester;
import com.linkedin.metadata.dao.exception.ModelValidationException;
Expand Down Expand Up @@ -806,8 +807,10 @@ public abstract <ASPECT extends RecordTemplate> void updateLocalIndex(@Nonnull U
*
* @param urn the URN for the entity the aspect (which the local relationship is derived from) is attached to
* @param aspectClass class of the aspect to backfill
* @return A list of local relationship updates executed.
*/
public abstract <ASPECT extends RecordTemplate> void backfillLocalRelationshipsFromEntityTables(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass);
public abstract <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLocalRelationshipsFromEntityTables(
@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass);

/**
* Returns list of urns from local secondary index that satisfy the given filter conditions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
*/
public abstract class BaseLocalRelationshipBuilder<ASPECT extends RecordTemplate> {

private Class<ASPECT> _aspectClass;
private final Class<ASPECT> _aspectClass;

@Value
public class LocalRelationshipUpdates {
public static class LocalRelationshipUpdates {
List<? extends RecordTemplate> relationships;
BaseGraphWriterDAO.RemovalOption removalOption;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.common.AuditStamp;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.TimeBasedRetention;
Expand Down Expand Up @@ -84,8 +85,9 @@ public <ASPECT extends RecordTemplate> void updateEntityTables(@Nonnull FooUrn u
}

@Override
public <ASPECT extends RecordTemplate> void backfillLocalRelationshipsFromEntityTables(@Nonnull FooUrn urn, @Nonnull Class<ASPECT> aspectClass) {

public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLocalRelationshipsFromEntityTables(
@Nonnull FooUrn urn, @Nonnull Class<ASPECT> aspectClass) {
return null;
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.metadata.aspect.AuditedAspect;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry;
import com.linkedin.metadata.dao.scsi.EmptyPathExtractor;
import com.linkedin.metadata.dao.scsi.UrnPathExtractor;
Expand All @@ -29,6 +29,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -134,13 +135,18 @@ public <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPEC
}

@Override
public <ASPECT extends RecordTemplate> void addRelationships(@Nonnull URN urn, @Nonnull ASPECT aspect, @Nonnull Class<ASPECT> aspectClass) {
public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates>
addRelationships(@Nonnull URN urn, @Nonnull ASPECT aspect, @Nonnull Class<ASPECT> aspectClass) {
if (_localRelationshipBuilderRegistry != null && _localRelationshipBuilderRegistry.isRegistered(aspectClass)) {
List<BaseLocalRelationshipBuilder<ASPECT>.LocalRelationshipUpdates> localRelationshipUpdates =
List<LocalRelationshipUpdates> localRelationshipUpdates =
_localRelationshipBuilderRegistry.getLocalRelationshipBuilder(aspect).buildRelationships(urn, aspect);

_localRelationshipWriterDAO.processLocalRelationshipUpdates(localRelationshipUpdates);

return localRelationshipUpdates;
}

return new ArrayList<>();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry;
import com.linkedin.metadata.dao.exception.ModelConversionException;
import com.linkedin.metadata.dao.exception.RetryLimitReached;
Expand Down Expand Up @@ -578,19 +579,19 @@ public <ASPECT extends RecordTemplate> void updateEntityTables(@Nonnull URN urn,
}, 1);
}

public <ASPECT extends RecordTemplate> void backfillLocalRelationshipsFromEntityTables(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass) {
public <ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> backfillLocalRelationshipsFromEntityTables(
@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass) {
if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {
throw new UnsupportedOperationException("Local relationship tables cannot be used in OLD_SCHEMA_ONLY mode, so they cannot be backfilled.");
}
AspectKey<URN, ASPECT> key = new AspectKey<>(aspectClass, urn, LATEST_VERSION);
runInTransactionWithRetry(() -> {
return runInTransactionWithRetry(() -> {
List<EbeanMetadataAspect> results = _localAccess.batchGetUnion(Collections.singletonList(key), 1, 0);
if (results.size() == 0) {
return null; // unused
return new ArrayList<>();
}
Optional<ASPECT> aspect = toRecordTemplate(aspectClass, results.get(0));
aspect.ifPresent(value -> _localAccess.addRelationships(urn, value, aspectClass));
return null; // unused
return aspect.map(value -> _localAccess.addRelationships(urn, value, aspectClass)).orElse(new ArrayList<>());
}, 1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO;
import com.linkedin.metadata.dao.utils.GraphUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
Expand Down Expand Up @@ -48,9 +48,9 @@ public EbeanLocalRelationshipWriterDAO(EbeanServer server) {
*/
@Transactional
public <ASPECT extends RecordTemplate> void processLocalRelationshipUpdates(
@Nonnull List<BaseLocalRelationshipBuilder<ASPECT>.LocalRelationshipUpdates> relationshipUpdates) {
@Nonnull List<LocalRelationshipUpdates> relationshipUpdates) {

for (BaseLocalRelationshipBuilder<ASPECT>.LocalRelationshipUpdates relationshipUpdate : relationshipUpdates) {
for (LocalRelationshipUpdates relationshipUpdate : relationshipUpdates) {
addRelationships(relationshipUpdate.getRelationships(), relationshipUpdate.getRemovalOption());
}
}
Expand Down Expand Up @@ -95,7 +95,7 @@ private <RELATIONSHIP extends RecordTemplate> void addRelationshipGroup(@Nonnull
RELATIONSHIP firstRelationship = relationshipGroup.get(0);
RelationshipValidator.validateRelationshipSchema(firstRelationship.getClass());

// Process remove option to delete some local relationships if nedded before adding new relationships.
// Process remove option to delete some local relationships if needed before adding new relationships.
processRemovalOption(SQLSchemaUtils.getRelationshipTableName(firstRelationship), firstRelationship, removalOption);

long now = Instant.now().toEpochMilli();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry;
import com.linkedin.metadata.dao.scsi.UrnPathExtractor;
import com.linkedin.metadata.query.IndexFilter;
Expand Down Expand Up @@ -37,8 +38,11 @@ <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPECT newVa
* @param urn urn associated with the relationships
* @param relationship aspect from which the relationships are derived from
* @param aspectClass class of the aspect
* @return relationship updates applied on relationship table
*/
<ASPECT extends RecordTemplate> void addRelationships(@Nonnull URN urn, @Nonnull ASPECT relationship, @Nonnull Class<ASPECT> aspectClass);
@Nonnull
<ASPECT extends RecordTemplate> List<LocalRelationshipUpdates> addRelationships(@Nonnull URN urn,
@Nonnull ASPECT relationship, @Nonnull Class<ASPECT> aspectClass);

/**
* Get read aspects from entity table. This a new schema implementation for batchGetUnion() in {@link EbeanLocalDAO}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.metadata.dao.EbeanLocalDAO.FindMethodology;
import com.linkedin.metadata.dao.EbeanLocalDAO.SchemaConfig;
import com.linkedin.metadata.dao.EbeanMetadataAspect.PrimaryKey;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder;
import com.linkedin.metadata.dao.equality.AlwaysFalseEqualityTester;
import com.linkedin.metadata.dao.equality.DefaultEqualityTester;
import com.linkedin.metadata.dao.exception.InvalidMetadataType;
Expand Down Expand Up @@ -104,12 +105,14 @@
import org.testng.annotations.Test;

import static com.linkedin.common.AuditStamps.*;
import static com.linkedin.metadata.dao.internal.BaseGraphWriterDAO.RemovalOption.*;
import static com.linkedin.metadata.dao.utils.EBeanDAOUtils.*;
import static com.linkedin.metadata.dao.utils.SQLSchemaUtils.*;
import static com.linkedin.testing.TestUtils.*;
import static org.mockito.Mockito.*;
import static org.testng.Assert.*;


public class EbeanLocalDAOTest {
private long _now;
private EbeanServer _server;
Expand Down Expand Up @@ -3061,16 +3064,36 @@ public void testBackfillLocalRelationshipsFromEntityTables() throws URISyntaxExc
BarUrn barUrn1 = BarUrn.createFromString("urn:li:bar:1");
BarUrn barUrn2 = BarUrn.createFromString("urn:li:bar:2");
BarUrn barUrn3 = BarUrn.createFromString("urn:li:bar:3");
AspectFooBar aspectFooBar = new AspectFooBar().setBars(new BarUrnArray(barUrn1, barUrn2, barUrn3));
BarUrnArray barUrns = new BarUrnArray(barUrn1, barUrn2, barUrn3);
AspectFooBar aspectFooBar = new AspectFooBar().setBars(barUrns);
dao.add(fooUrn, aspectFooBar, _dummyAuditStamp);

// clear local relationship table
_server.createSqlUpdate("delete from metadata_relationship_belongsto").execute();

List<BaseLocalRelationshipBuilder.LocalRelationshipUpdates> relationshipUpdates =
dao.backfillLocalRelationshipsFromEntityTables(fooUrn, AspectFooBar.class);

List<SqlRow> results = _server.createSqlQuery("select * from metadata_relationship_belongsto").findList();
assertEquals(3, results.size());
assertEquals(results.size(), 3);
assertEquals(relationshipUpdates.size(), 1);
assertEquals(relationshipUpdates.get(0).getRemovalOption(), REMOVE_ALL_EDGES_TO_DESTINATION);

BarUrnArray sources = new BarUrnArray();
for (int i = 0; i < results.size(); i++) {
try {
RecordTemplate relationship = relationshipUpdates.get(0).getRelationships().get(i);
Urn source = (Urn) relationship.getClass().getMethod("getSource").invoke(relationship);
Urn dest = (Urn) relationship.getClass().getMethod("getDestination").invoke(relationship);
assertEquals(dest.toString(), "urn:li:foo:1");
sources.add(BarUrn.createFromString(source.toString()));
assertEquals(relationshipUpdates.get(0).getRelationships().get(i).getClass().getSimpleName(), "BelongsTo");
} catch (Exception e) {
throw new RuntimeException(e);
}
}

assertEquals(sources, barUrns);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.linkedin.metadata.dao.localrelationship.builder.ReportsToLocalRelationshipBuilder;
import com.linkedin.metadata.dao.localrelationship.builder.VersionOfLocalRelationshipBuilder;
import com.linkedin.metadata.dao.utils.EmbeddedMariaInstance;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.testing.BarUrnArray;
import com.linkedin.testing.localrelationship.AspectFooBar;
import com.linkedin.testing.urn.BarUrn;
Expand Down Expand Up @@ -47,7 +47,7 @@ public void testAddRelationshipWithRemoveAllEdgesToDestination() throws URISynta
BarUrn.createFromString("urn:li:bar:456"),
BarUrn.createFromString("urn:li:bar:789")));

List<BaseLocalRelationshipBuilder<AspectFooBar>.LocalRelationshipUpdates> updates = new BelongsToLocalRelationshipBuilder(AspectFooBar.class)
List<LocalRelationshipUpdates> updates = new BelongsToLocalRelationshipBuilder(AspectFooBar.class)
.buildRelationships(FooUrn.createFromString("urn:li:foo:123"), aspectFooBar);

// Before processing
Expand Down Expand Up @@ -82,7 +82,7 @@ public void testAddRelationshipWithRemoveNone() throws URISyntaxException {
BarUrn.createFromString("urn:li:bar:456"),
BarUrn.createFromString("urn:li:bar:789")));

List<BaseLocalRelationshipBuilder<AspectFooBar>.LocalRelationshipUpdates> updates = new ReportsToLocalRelationshipBuilder(AspectFooBar.class)
List<LocalRelationshipUpdates> updates = new ReportsToLocalRelationshipBuilder(AspectFooBar.class)
.buildRelationships(FooUrn.createFromString("urn:li:foo:123"), aspectFooBar);

// Before processing
Expand Down Expand Up @@ -114,7 +114,7 @@ public void testAddRelationshipWithRemoveAllEdgesFromSourceToDestination() throw

AspectFooBar aspectFooBar = new AspectFooBar().setBars(new BarUrnArray(BarUrn.createFromString("urn:li:bar:123")));

List<BaseLocalRelationshipBuilder<AspectFooBar>.LocalRelationshipUpdates> updates = new PairsWithLocalRelationshipBuilder(AspectFooBar.class)
List<LocalRelationshipUpdates> updates = new PairsWithLocalRelationshipBuilder(AspectFooBar.class)
.buildRelationships(FooUrn.createFromString("urn:li:foo:123"), aspectFooBar);

// Before processing
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testAddRelationshipWithRemoveAllEdgesFromSource() throws URISyntaxEx

AspectFooBar aspectFooBar = new AspectFooBar().setBars(new BarUrnArray(BarUrn.createFromString("urn:li:bar:123")));

List<BaseLocalRelationshipBuilder<AspectFooBar>.LocalRelationshipUpdates> updates = new VersionOfLocalRelationshipBuilder(AspectFooBar.class)
List<LocalRelationshipUpdates> updates = new VersionOfLocalRelationshipBuilder(AspectFooBar.class)
.buildRelationships(FooUrn.createFromString("urn:li:foo:123"), aspectFooBar);

// Before processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public <URN extends Urn> List<LocalRelationshipUpdates> buildRelationships(@Nonn
reportsToRelationships.add(new ReportsTo().setSource(barUrn).setDestination(urn));
}

LocalRelationshipUpdates localRelationshipUpdates = new LocalRelationshipUpdates(reportsToRelationships,
BaseGraphWriterDAO.RemovalOption.REMOVE_NONE);
LocalRelationshipUpdates localRelationshipUpdates =
new LocalRelationshipUpdates(reportsToRelationships, BaseGraphWriterDAO.RemovalOption.REMOVE_NONE);

return Collections.singletonList(localRelationshipUpdates);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,42 @@ public Task<BackfillResult> backfillEntityTables(@ActionParam(PARAM_URNS) @Nonnu
});
}

/**
* Backfill the relationship tables from entity table.
*/
@Action(name = ACTION_BACKFILL_RELATIONSHIP_TABLES)
@Nonnull
public Task<BackfillResult> backfillRelationshipTables(@ActionParam(PARAM_URNS) @Nonnull String[] urns,
@ActionParam(PARAM_ASPECTS) @Nonnull String[] aspectNames) {
final BackfillResult backfillResult = new BackfillResult()
.setEntities(new BackfillResultEntityArray())
.setRelationships(new BackfillResultRelationshipArray());

for (String urn : urns) {
for (Class<? extends RecordTemplate> aspect : parseAspectsParam(aspectNames)) {
getLocalDAO().backfillLocalRelationshipsFromEntityTables(parseUrnParam(urn), aspect).forEach(relationshipUpdates -> {
relationshipUpdates.getRelationships().forEach(relationship -> {
try {
Urn source = (Urn) relationship.getClass().getMethod("getSource").invoke(relationship);
Urn dest = (Urn) relationship.getClass().getMethod("getDestination").invoke(relationship);
BackfillResultRelationship backfillResultRelationship = new BackfillResultRelationship()
.setSource(source)
.setDestination(dest)
.setRemovalOption(relationshipUpdates.getRemovalOption().name())
.setRelationship(relationship.getClass().getSimpleName());

backfillResult.getRelationships().add(backfillResultRelationship);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
});
});
}
}

return RestliUtils.toTask(() -> backfillResult);
}

/**
* An action method for emitting MAE backfill messages for a set of entities using SCSI.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ private RestliConstants() { }
public static final String ACTION_AUTOCOMPLETE = "autocomplete";
public static final String ACTION_BACKFILL = "backfill";
public static final String ACTION_BACKFILL_ENTITY_TABLES = "backfillEntityTables";
public static final String ACTION_BACKFILL_RELATIONSHIP_TABLES = "backfillRelationshipTables";
public static final String ACTION_BACKFILL_WITH_URNS = "backfillWithUrns";
public static final String ACTION_BACKFILL_WITH_NEW_VALUE = "backfillWithNewValue";
public static final String ACTION_BACKFILL_LEGACY = "backfillLegacy";
Expand Down
Loading

0 comments on commit 00c2142

Please sign in to comment.