Skip to content

Commit

Permalink
feat: add new findRelationships API and unit tests (#390)
Browse files Browse the repository at this point in the history
* feat: add new findRelationships API and unit tests

* remove unnecessary validation & fix

* minor bug fix

* address comments

* validate non-mg entity filter

* add one unit test
  • Loading branch information
ybz1013 authored Aug 2, 2024
1 parent d1584da commit fea297e
Show file tree
Hide file tree
Showing 6 changed files with 567 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.metadata.dao;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.utils.ClassUtils;
Expand All @@ -20,22 +22,28 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.javatuples.Pair;


/**
* An Ebean implementation of {@link BaseQueryDAO} backed by local relationship tables.
*/
@Slf4j
public class EbeanLocalRelationshipQueryDAO {
public static final String URN_PATTERN = "urn:li:[a-zA-Z]+:?.*";
private final EbeanServer _server;
private final MultiHopsTraversalSqlGenerator _sqlGenerator;

private final EBeanDAOConfig _eBeanDAOConfig;

private Set<String> _mgEntityTypeNameSet;

public EbeanLocalRelationshipQueryDAO(EbeanServer server, EBeanDAOConfig eBeanDAOConfig) {
_server = server;
_eBeanDAOConfig = eBeanDAOConfig;
Expand Down Expand Up @@ -137,7 +145,7 @@ public <SRC_SNAPSHOT extends RecordTemplate, DEST_SNAPSHOT extends RecordTemplat
@Nonnull Class<RELATIONSHIP> relationshipType, @Nonnull LocalRelationshipFilter relationshipFilter, int offset, int count) {
validateEntityFilter(sourceEntityFilter, sourceEntityClass);
validateEntityFilter(destinationEntityFilter, destinationEntityClass);
validateEntityFilter(relationshipFilter, relationshipType);
validateRelationshipFilter(relationshipFilter);

String destTableName = null;
if (destinationEntityClass != null) {
Expand All @@ -152,12 +160,14 @@ public <SRC_SNAPSHOT extends RecordTemplate, DEST_SNAPSHOT extends RecordTemplat
final String relationshipTableName = SQLSchemaUtils.getRelationshipTableName(relationshipType);

final String sql = buildFindRelationshipSQL(
destTableName,
sourceTableName,
relationshipTableName,
relationshipFilter,
sourceTableName,
sourceEntityFilter,
destTableName,
destinationEntityFilter,
relationshipFilter);
count,
offset);

return _server.createSqlQuery(sql).findList().stream()
.map(row -> RecordUtils.toRecordTemplate(relationshipType, row.getString("metadata")))
Expand All @@ -174,17 +184,64 @@ public <SRC_SNAPSHOT extends RecordTemplate, DEST_SNAPSHOT extends RecordTemplat
* @param relationshipType the type of relationship to query
* @param relationshipFilter the filter to apply to relationship when querying
* @param offset the offset query should start at. Ignored if set to a negative value.
* @param count the maximum number of entities to return. Ignored if set to a non-positive value. * @return A list of relationship records.
* @param count the maximum number of entities to return. Ignored if set to a non-positive value.
* @return A list of relationship records.
*/
@Nonnull
public <RELATIONSHIP extends RecordTemplate> List<RELATIONSHIP> findRelationships(
@Nullable String sourceEntityUrn, @Nullable LocalRelationshipFilter sourceEntityFilter,
@Nullable String destinationEntityUrn, @Nullable LocalRelationshipFilter destinationEntityFilter,
@Nullable Urn sourceEntityUrn, @Nullable LocalRelationshipFilter sourceEntityFilter,
@Nullable Urn destinationEntityUrn, @Nullable LocalRelationshipFilter destinationEntityFilter,
@Nonnull Class<RELATIONSHIP> relationshipType, @Nonnull LocalRelationshipFilter relationshipFilter,
int offset, int count) {
// NOTE: additional validation for sourceEntityUrn and sourceEntityUrn first.
// for non-MG entities, filters need to be null or ignored.
throw new RuntimeException("findRelationships is not implemented.");
validateEntityUrnAndFilter(sourceEntityFilter, sourceEntityUrn);
validateEntityUrnAndFilter(destinationEntityFilter, destinationEntityUrn);
validateRelationshipFilter(relationshipFilter);

// the assumption is we have the table for every MG entity. For non-MG entities, sourceTableName will be null.
final String sourceTableName = getMgEntityTableName(sourceEntityUrn);
final String destTableName = getMgEntityTableName(destinationEntityUrn);
final String relationshipTableName = SQLSchemaUtils.getRelationshipTableName(relationshipType);

final String sql = buildFindRelationshipSQL(
relationshipTableName, relationshipFilter,
sourceTableName, sourceEntityFilter,
destTableName, destinationEntityFilter,
count, offset);

return _server.createSqlQuery(sql).findList().stream()
.map(row -> RecordUtils.toRecordTemplate(relationshipType, row.getString("metadata")))
.collect(Collectors.toList());
}

private boolean isValidUrn(@Nonnull Urn entityUrn) {
return entityUrn.toString().matches(URN_PATTERN);
}

/**
* Checks if entity type name can be extracted from urn, and that entity type has a table in db.
*/
@VisibleForTesting
protected boolean isMgEntityUrn(@Nonnull Urn entityUrn) {
// there is some race condition, the local relationship db might not be ready when EbeanLocalRelationshipQueryDAO inits.
// so we can't init the _mgEntityTypeNameSet in constructor.
if (_mgEntityTypeNameSet == null) {
initMgEntityTypeNameSet();
}

return _mgEntityTypeNameSet.contains(StringUtils.lowerCase(entityUrn.getEntityType()));
}

/**
* Extracts the table name from an entity urn for MG entities. If entityUrn is not for MG entity, return null.
* @param entityUrn should match pattern "urn:li:[a-zA-Z0-9]+:?\(?[a-zA-Z0-9]*\)?"
* @return metadata_entity_entity_type_name or null
*/
@Nullable
private String getMgEntityTableName(@Nullable Urn entityUrn) {
if (entityUrn == null || !isMgEntityUrn(entityUrn)) {
return null;
}
return SQLSchemaUtils.getTableName(entityUrn);
}

/**
Expand All @@ -201,6 +258,52 @@ private <ENTITY extends RecordTemplate> void validateEntityFilter(@Nonnull Local
validateFilterCriteria(filter.getCriteria().stream().map(LocalRelationshipCriterion::getCondition).collect(Collectors.toList()));
}

/**
* Validate:
* 1. if entity urn is null or empty, then filter should be emtpy.
* 2. urn should be in valid format
* 3. the entity filter only contains supported condition.
* If any of above is violated, throw IllegalArgumentException.
*/
private void validateEntityUrnAndFilter(@Nullable LocalRelationshipFilter filter, @Nullable Urn entityUrn) {
if ((entityUrn == null || StringUtils.isBlank(entityUrn.getEntityType())) && filter != null && filter.hasCriteria() && !filter.getCriteria()
.isEmpty()) {
throw new IllegalArgumentException("Entity urn is null or empty but filter is not empty.");
}

if (entityUrn != null && !isValidUrn(entityUrn)) {
throw new IllegalArgumentException(String.format("Entity urn is not valid: %s", entityUrn));
}

if (filter != null) {
validateFilterCriteria(
filter.getCriteria().stream().map(LocalRelationshipCriterion::getCondition).collect(Collectors.toList()));
}
}

/**
* Ensure that the source and destination entity filters.
* 1) include no more than 1 criterion
* 2) that 1 criterion must be on the urn field
* 3) the passed in condition is supported by this DAO
*/
private void validateEntityFilterOnlyOneUrn(@Nonnull LocalRelationshipFilter filter) {
if (filter.hasCriteria() && !filter.getCriteria().isEmpty()) {
if (filter.getCriteria().size() > 1) {
throw new IllegalArgumentException("Only 1 filter is allowed in non-mg entity filter.");
}
LocalRelationshipCriterion criterion = filter.getCriteria().get(0);

if (!criterion.hasField() || !criterion.getField().isUrnField()) {
throw new IllegalArgumentException("Only urn filter is allowed in non-mg entity filter.");
}
Condition condition = filter.getCriteria().get(0).getCondition();
if (!SUPPORTED_CONDITIONS.containsKey(condition)) {
throw new IllegalArgumentException(String.format("Condition %s is not supported by local relationship DAO.", condition));
}
}
}

/**
* Validate:
* 1. The relationship filter only contains supported condition.
Expand Down Expand Up @@ -256,39 +359,86 @@ private <SNAPSHOT extends RecordTemplate> SNAPSHOT constructSnapshot(@Nonnull fi
}

/**
* Construct SQL similar to following:
* Constructs SQL similar to following.
*
* <p>SELECT rt.* FROM relationship_table rt
* INNER JOIN destination_entity_table dt ON dt.urn = rt.destinationEntityUrn
* INNER JOIN source_entity_table st ON st.urn = rt.sourceEntityUrn
* WHERE destination entity filters AND source entity filters AND relationship filters
* WHERE destination entity filters AND source entity filters AND relationship filters</p>
*
* @param relationshipTableName relationship table name
* @param relationshipFilter filter on relationship
* @param sourceTableName source entity table name
* @param sourceEntityFilter filter on source entity.
* @param destTableName destination entity table name. Always null if building relationship with non-mg
* entity.
* @param destinationEntityFilter filter on destination entity.
* @param limit max number of records to return. If < 0, will return all records.
* @param offset offset to start from. If < 0, will start from 0.
*/
@Nonnull
private String buildFindRelationshipSQL(@Nullable final String destTableName, @Nullable final String sourceTableName,
@Nonnull final String relationshipTableName, @Nonnull final LocalRelationshipFilter sourceEntityFilter,
@Nonnull final LocalRelationshipFilter destinationEntityFilter, @Nonnull final LocalRelationshipFilter relationshipFilter) {
private String buildFindRelationshipSQL(
@Nonnull final String relationshipTableName, @Nonnull final LocalRelationshipFilter relationshipFilter,
@Nullable final String sourceTableName, @Nullable final LocalRelationshipFilter sourceEntityFilter,
@Nullable final String destTableName, @Nullable final LocalRelationshipFilter destinationEntityFilter,
int limit, int offset) {

StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("SELECT rt.* FROM ").append(relationshipTableName).append(" rt ");

List<Pair<LocalRelationshipFilter, String>> filters = new ArrayList<>();

if (destTableName != null) {
sqlBuilder.append("INNER JOIN ").append(destTableName).append(" dt ON dt.urn=rt.destination ");

if (destinationEntityFilter != null) {
filters.add(new Pair<>(destinationEntityFilter, "dt"));
}
} else if (destinationEntityFilter != null) {
validateEntityFilterOnlyOneUrn(destinationEntityFilter);
// non-mg entity case, applying dest filter on relationship table
filters.add(new Pair<>(destinationEntityFilter, "rt"));
}

if (sourceTableName != null) {
sqlBuilder.append("INNER JOIN ").append(sourceTableName).append(" st ON st.urn=rt.source ");

if (sourceEntityFilter != null) {
filters.add(new Pair<>(sourceEntityFilter, "st"));
}
}

sqlBuilder.append("WHERE deleted_ts is NULL");

filters.add(new Pair<>(relationshipFilter, "rt"));

String whereClause = SQLStatementUtils.whereClause(SUPPORTED_CONDITIONS,
_eBeanDAOConfig.isNonDollarVirtualColumnsEnabled(),
new Pair<>(sourceEntityFilter, "st"),
new Pair<>(destinationEntityFilter, "dt"),
new Pair<>(relationshipFilter, "rt"));
filters.toArray(new Pair[filters.size()]));

if (whereClause != null) {
sqlBuilder.append(" AND ").append(whereClause);
}

if (limit > 0) {
sqlBuilder.append(" LIMIT ").append(limit);

if (offset > 0) {
sqlBuilder.append(" OFFSET ").append(offset);
}
}

return sqlBuilder.toString();
}

/**
* Creates a set of MG entity type names by querying the database.
*/
private void initMgEntityTypeNameSet() {
final String sql = "SELECT table_name FROM information_schema.tables"
+ " WHERE table_type = 'BASE TABLE' AND TABLE_SCHEMA=DATABASE() AND table_name LIKE 'metadata_entity_%'";
_mgEntityTypeNameSet = _server.createSqlQuery(sql).findList().stream()
.map(row -> row.getString("table_name").replace("metadata_entity_", ""))
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.linkedin.metadata.query.IndexValue;
import com.linkedin.metadata.query.LocalRelationshipCriterionArray;
import com.linkedin.metadata.query.LocalRelationshipFilter;
import com.linkedin.metadata.query.RelationshipDirection;
import com.linkedin.metadata.query.SortOrder;
import com.linkedin.testing.AspectFoo;
import com.linkedin.testing.BarSnapshot;
Expand Down Expand Up @@ -59,7 +60,8 @@ public class EbeanLocalAccessTest {
private static IEbeanLocalAccess<BurgerUrn> _ebeanLocalAccessBurger;
private static long _now;
private final EBeanDAOConfig _ebeanConfig = new EBeanDAOConfig();
private static final LocalRelationshipFilter EMPTY_FILTER = new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray());
private static final LocalRelationshipFilter EMPTY_FILTER = new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray())
.setDirection(RelationshipDirection.UNDIRECTED);

@Factory(dataProvider = "inputList")
public EbeanLocalAccessTest(boolean nonDollarVirtualColumnsEnabled) {
Expand Down
Loading

0 comments on commit fea297e

Please sign in to comment.