Skip to content

Commit

Permalink
Read column from aspect-alias instead of column annotation - part 1 (#…
Browse files Browse the repository at this point in the history
…433)

* Read column from aspect-alias instead of column annotation

* address comments

---------

Co-authored-by: Yang Yang <[email protected]>
  • Loading branch information
yangyangv2 and Yang Yang authored Sep 26, 2024
1 parent 12491aa commit 3f21e7c
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ namespace com.linkedin.metadata.query

record AspectField {

/**
* Asset type (entityType) this aspect belongs to. When running a query filtered by an aspect field (aspect alias),
* depends on which asset this aspect belongs to, the field (aspect alias) can be different. For more context, check
* the decision on aspect alias: go/mg/aspect-alias-decision. The underlying logic requires asset type information
* to query on the right target. e.g. DB column name, which is from the aspect-alias defined on Asset.
*/
asset: string

/**
* FQCN of the aspect class e.g. com.linkedin.common.Status
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> batchGetUnion(
final Urn entityUrn = aspectKeys.get(index).getUrn();
final Class<ASPECT> aspectClass = (Class<ASPECT>) aspectKeys.get(index).getAspectClass();
if (checkColumnExists(isTestMode ? getTestTableName(entityUrn) : getTableName(entityUrn),
getAspectColumnName(aspectClass))) {
getAspectColumnName(entityUrn.getEntityType(), aspectClass))) {
keysToQueryMap.computeIfAbsent(aspectClass, unused -> new HashSet<>()).add(entityUrn);
}
}
Expand Down Expand Up @@ -262,7 +262,7 @@ public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<AS
} else {
sqlRow.set("_total_count", 1);
final ASPECT aspect = RecordUtils.toRecordTemplate(aspectClass,
extractAspectJsonString(sqlRow.getString(getAspectColumnName(aspectClass))));
extractAspectJsonString(sqlRow.getString(getAspectColumnName(urn.getEntityType(), aspectClass))));
final ListResultMetadata listResultMetadata = new ListResultMetadata().setExtraInfos(new ExtraInfoArray());
final ExtraInfo extraInfo = new ExtraInfo().setUrn(urn)
.setVersion(LATEST_VERSION)
Expand All @@ -283,8 +283,8 @@ public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<AS
public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<ASPECT> aspectClass, int start,
int pageSize) {

final String tableName = SQLSchemaUtils.getTableName(_entityType);
final String listAspectSql = SQLStatementUtils.createListAspectWithPaginationSql(aspectClass, tableName, false, start, pageSize);

final String listAspectSql = SQLStatementUtils.createListAspectWithPaginationSql(aspectClass, _entityType, false, start, pageSize);
final SqlQuery sqlQuery = _server.createSqlQuery(listAspectSql);
final List<SqlRow> sqlRows = sqlQuery.findList();
if (sqlRows.isEmpty()) {
Expand All @@ -298,7 +298,7 @@ public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<AS
sqlRow.getString("createdfor")));
listResultMetadata.getExtraInfos().add(extraInfo);
return RecordUtils.toRecordTemplate(aspectClass,
extractAspectJsonString(sqlRow.getString(getAspectColumnName(aspectClass))));
extractAspectJsonString(sqlRow.getString(getAspectColumnName(_entityType, aspectClass))));
}).collect(Collectors.toList());
return toListResult(aspectList, sqlRows, listResultMetadata, start, pageSize);
}
Expand All @@ -309,15 +309,17 @@ public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(@Nonnull Class<AS
public Map<String, Long> countAggregate(@Nullable IndexFilter indexFilter,
@Nonnull IndexGroupByCriterion indexGroupByCriterion) {
final String tableName = SQLSchemaUtils.getTableName(_entityType);
final String groupByColumn = getGeneratedColumnName(indexGroupByCriterion.getAspect(), indexGroupByCriterion.getPath(), _nonDollarVirtualColumnsEnabled);
final String groupByColumn =
getGeneratedColumnName(_entityType, indexGroupByCriterion.getAspect(), indexGroupByCriterion.getPath(),
_nonDollarVirtualColumnsEnabled);
// first, check for existence of the column we want to GROUP BY
if (!checkColumnExists(tableName, groupByColumn)) {
// if we are trying to GROUP BY the results on a column that does not exist, just return an empty map
return Collections.emptyMap();
}

// now run the actual GROUP BY query
final String groupBySql = SQLStatementUtils.createGroupBySql(tableName, indexFilter, indexGroupByCriterion, _nonDollarVirtualColumnsEnabled);
final String groupBySql = SQLStatementUtils.createGroupBySql(_entityType, indexFilter, indexGroupByCriterion, _nonDollarVirtualColumnsEnabled);
final SqlQuery sqlQuery = _server.createSqlQuery(groupBySql);
final List<SqlRow> sqlRows = sqlQuery.findList();
Map<String, Long> resultMap = new HashMap<>();
Expand All @@ -343,12 +345,10 @@ public Map<String, Long> countAggregate(@Nullable IndexFilter indexFilter,
*/
private SqlQuery createFilterSqlQuery(@Nullable IndexFilter indexFilter,
@Nullable IndexSortCriterion indexSortCriterion, int offset, int pageSize) {

final String tableName = SQLSchemaUtils.getTableName(_entityType);
StringBuilder filterSql = new StringBuilder();
filterSql.append(SQLStatementUtils.createFilterSql(tableName, indexFilter, true, _nonDollarVirtualColumnsEnabled));
filterSql.append(SQLStatementUtils.createFilterSql(_entityType, indexFilter, true, _nonDollarVirtualColumnsEnabled));
filterSql.append("\n");
filterSql.append(parseSortCriteria(indexSortCriterion, _nonDollarVirtualColumnsEnabled));
filterSql.append(parseSortCriteria(_entityType, indexSortCriterion, _nonDollarVirtualColumnsEnabled));
filterSql.append(String.format(" LIMIT %d", Math.max(pageSize, 0)));
filterSql.append(String.format(" OFFSET %d", Math.max(offset, 0)));
return _server.createSqlQuery(filterSql.toString());
Expand All @@ -360,8 +360,7 @@ private SqlQuery createFilterSqlQuery(@Nullable IndexFilter indexFilter,
private SqlQuery createFilterSqlQuery(@Nullable IndexFilter indexFilter,
@Nullable IndexSortCriterion indexSortCriterion, @Nullable URN lastUrn, int pageSize) {
StringBuilder filterSql = new StringBuilder();
final String tableName = SQLSchemaUtils.getTableName(_entityType);
filterSql.append(SQLStatementUtils.createFilterSql(tableName, indexFilter, false, _nonDollarVirtualColumnsEnabled));
filterSql.append(SQLStatementUtils.createFilterSql(_entityType, indexFilter, false, _nonDollarVirtualColumnsEnabled));

if (lastUrn != null) {
// because createFilterSql will only include a WHERE clause if there are non-urn filters, we need to make sure
Expand All @@ -378,7 +377,7 @@ private SqlQuery createFilterSqlQuery(@Nullable IndexFilter indexFilter,
}

filterSql.append("\n");
filterSql.append(parseSortCriteria(indexSortCriterion, _nonDollarVirtualColumnsEnabled));
filterSql.append(parseSortCriteria(_entityType, indexSortCriterion, _nonDollarVirtualColumnsEnabled));
filterSql.append(String.format(" LIMIT %d", Math.max(pageSize, 0)));
return _server.createSqlQuery(filterSql.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ private <SNAPSHOT extends RecordTemplate> SNAPSHOT constructSnapshot(@Nonnull fi
final List<UnionTemplate> aspects = new ArrayList<>();

for (String aspectCanonicalName : ModelUtils.getAspectClassNames(unionTemplateClass)) {
String colName = SQLSchemaUtils.getAspectColumnName(aspectCanonicalName);
String colName =
SQLSchemaUtils.getAspectColumnName(ModelUtils.getUrnTypeFromSnapshot(snapshotClass), aspectCanonicalName);
String auditedAspectStr = sqlRow.getString(colName);

if (auditedAspectStr != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.dao.utils;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
Expand All @@ -18,6 +19,7 @@
import io.ebean.SqlRow;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand Down Expand Up @@ -235,11 +237,16 @@ public static <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> readSqlR
*/
private static <ASPECT extends RecordTemplate> EbeanMetadataAspect readSqlRow(SqlRow sqlRow,
Class<ASPECT> aspectClass) {
final String columnName = SQLSchemaUtils.getAspectColumnName(aspectClass);

final EbeanMetadataAspect ebeanMetadataAspect = new EbeanMetadataAspect();
final String urn = sqlRow.getString("urn");
EbeanMetadataAspect.PrimaryKey primaryKey;

final String columnName;
try {
columnName = SQLSchemaUtils.getAspectColumnName(Urn.createFromString(urn).getEntityType(), aspectClass);
} catch (URISyntaxException e) {
throw new RuntimeException("Invalid urn format: " + urn, e);
}
if (isSoftDeletedAspect(sqlRow, columnName)) {
primaryKey = new EbeanMetadataAspect.PrimaryKey(urn, aspectClass.getCanonicalName(), LATEST_VERSION);
ebeanMetadataAspect.setCreatedBy(sqlRow.getString("lastmodifiedby"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,19 @@ private static String parseIndexValue(@Nullable IndexValue indexValue) {

/**
* Parse {@link IndexSortCriterion} into SQL syntax.
* @param entityType entity type from the Urn
* @param indexSortCriterion filter sorting criterion
* @param nonDollarVirtualColumnsEnabled true if virtual column does not contain $, false otherwise
* @return SQL statement of sorting, e.g. ORDER BY ... DESC ..etc.
*/
public static String parseSortCriteria(@Nullable IndexSortCriterion indexSortCriterion, boolean nonDollarVirtualColumnsEnabled) {
public static String parseSortCriteria(@Nonnull String entityType, @Nullable IndexSortCriterion indexSortCriterion,
boolean nonDollarVirtualColumnsEnabled) {
if (indexSortCriterion == null) {
// Default to order by urn if user does not provide sort criterion.
return "ORDER BY URN";
}
final String indexColumn =
SQLSchemaUtils.getGeneratedColumnName(indexSortCriterion.getAspect(), indexSortCriterion.getPath(),
SQLSchemaUtils.getGeneratedColumnName(entityType, indexSortCriterion.getAspect(), indexSortCriterion.getPath(),
nonDollarVirtualColumnsEnabled);

if (!indexSortCriterion.hasOrder()) {
Expand All @@ -87,12 +89,12 @@ public static String parseSortCriteria(@Nullable IndexSortCriterion indexSortCri

/**
* Parse {@link IndexFilter} into MySQL syntax.
*
* @param entityType entity type from the Urn
* @param indexFilter index filter
* @param nonDollarVirtualColumnsEnabled whether to enable non-dollar virtual columns
* @return translated SQL condition expression, e.g. WHERE ...
*/
public static String parseIndexFilter(@Nullable IndexFilter indexFilter, boolean nonDollarVirtualColumnsEnabled) {
public static String parseIndexFilter(@Nonnull String entityType, @Nullable IndexFilter indexFilter, boolean nonDollarVirtualColumnsEnabled) {
List<String> sqlFilters = new ArrayList<>();

if (indexFilter == null || !indexFilter.hasCriteria()) {
Expand All @@ -103,7 +105,7 @@ public static String parseIndexFilter(@Nullable IndexFilter indexFilter, boolean
final String aspect = indexCriterion.getAspect();
if (!isUrn(aspect)) {
// if aspect is not urn, then check aspect is not soft deleted and is not null
final String aspectColumn = getAspectColumnName(indexCriterion.getAspect());
final String aspectColumn = getAspectColumnName(entityType, indexCriterion.getAspect());
sqlFilters.add(aspectColumn + " IS NOT NULL");
sqlFilters.add(String.format(SOFT_DELETED_CHECK, aspectColumn));
}
Expand All @@ -112,7 +114,7 @@ public static String parseIndexFilter(@Nullable IndexFilter indexFilter, boolean
if (pathParams != null) {
validateConditionAndValue(indexCriterion);
final Condition condition = pathParams.getCondition();
final String indexColumn = getGeneratedColumnName(aspect, pathParams.getPath(), nonDollarVirtualColumnsEnabled);
final String indexColumn = getGeneratedColumnName(entityType, aspect, pathParams.getPath(), nonDollarVirtualColumnsEnabled);
sqlFilters.add(parseSqlFilter(indexColumn, condition, pathParams.getValue()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
import com.linkedin.metadata.dao.exception.MissingAnnotationException;
import java.util.Map;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.dao.utils.SQLStatementUtils.*;


/**
* Generate schema related SQL script, such as normalized table / column names ..etc
*/
@Slf4j
public class SQLSchemaUtils {

private static final String GMA = "gma";
public static final String ENTITY_TABLE_PREFIX = "metadata_entity_";
public static final String RELATIONSHIP_TABLE_PREFIX = "metadata_relationship_";
Expand All @@ -24,6 +27,17 @@ public class SQLSchemaUtils {

private static final int MYSQL_MAX_COLUMN_NAME_LENGTH = 64 - ASPECT_PREFIX.length();

/**
* This field is used when asset field in {@link com.linkedin.metadata.query.AspectField} is not provided in the
* legacy implementation. When this is field is set, the getColumnNameFromAnnotation() will retrieve
* "column" from the "column" annotation from the Aspect. However, the going forward way is to retrieve "column"
* information from aspect alias defined in the asset.
*
* <p>For more context, see: Decision - Using Proto Field Name as Aspect URI
* go/mg/aspect-alias-decision
*/
protected static final String UNKNOWN_ASSET = "UNKNOWN_ASSET";

private SQLSchemaUtils() {
}

Expand Down Expand Up @@ -113,8 +127,8 @@ public static <RELATIONSHIP extends RecordTemplate> String getTestRelationshipTa
* Get column name from aspect class canonical name.
*/
@Nonnull
public static String getAspectColumnName(@Nonnull final String aspectCanonicalName) {
return ASPECT_PREFIX + getColumnNameFromAnnotation(aspectCanonicalName);
public static String getAspectColumnName(@Nonnull final String entityType, @Nonnull final String aspectCanonicalName) {
return ASPECT_PREFIX + getColumnNameFromAnnotation(entityType, aspectCanonicalName);
}

/**
Expand All @@ -123,21 +137,25 @@ public static String getAspectColumnName(@Nonnull final String aspectCanonicalNa
* @param <ASPECT> aspect that extends {@link RecordTemplate}
* @return aspect column name
*/
public static <ASPECT extends RecordTemplate> String getAspectColumnName(@Nonnull Class<ASPECT> aspectClass) {
return getAspectColumnName(aspectClass.getCanonicalName());
public static <ASPECT extends RecordTemplate> String getAspectColumnName(@Nonnull final String entityType,
@Nonnull Class<ASPECT> aspectClass) {
return getAspectColumnName(entityType, aspectClass.getCanonicalName());
}

/**
* Get generated column name from aspect and path.
*/
@Nonnull
public static String getGeneratedColumnName(@Nonnull String aspect, @Nonnull String path, boolean nonDollarVirtualColumnsEnabled) {
public static String getGeneratedColumnName(@Nonnull String assetType, @Nonnull String aspect, @Nonnull String path,
boolean nonDollarVirtualColumnsEnabled) {
char delimiter = nonDollarVirtualColumnsEnabled ? '0' : '$';
if (isUrn(aspect)) {
return INDEX_PREFIX + "urn" + processPath(path, delimiter);
}

return INDEX_PREFIX + getColumnNameFromAnnotation(aspect) + processPath(path, delimiter);
if (UNKNOWN_ASSET.equals(assetType)) {
log.warn("query with unknown asset type. aspect = {}, path ={}, delimiter = {}", aspect, path, delimiter);
}
return INDEX_PREFIX + getColumnNameFromAnnotation(assetType, aspect) + processPath(path, delimiter);
}

/**
Expand Down Expand Up @@ -165,11 +183,13 @@ public static String processPath(@Nonnull String path, char delimiter) {
/**
* Get Column name from aspect canonical name.
*
* @param assetType entity type from Urn definition.
* @param aspectCanonicalName aspect name in canonical form.
* @return aspect column name
*/
@Nonnull
private static String getColumnNameFromAnnotation(@Nonnull final String aspectCanonicalName) {
private static String getColumnNameFromAnnotation(@Nonnull final String assetType, @Nonnull final String aspectCanonicalName) {
// TODO(yanyang) implement a map of (assetType, aspectClass) --> aspect_alias (column)
try {
final RecordDataSchema schema = (RecordDataSchema) DataTemplateUtil.getSchema(ClassUtils.loadClass(aspectCanonicalName));
final Map<String, Object> properties = schema.getProperties();
Expand Down
Loading

0 comments on commit 3f21e7c

Please sign in to comment.