Skip to content

Commit

Permalink
read column from aspect alias part 2 (linkedin#436)
Browse files Browse the repository at this point in the history
Co-authored-by: Yang Yang <[email protected]>
  • Loading branch information
yangyangv2 and Yang Yang authored Sep 29, 2024
1 parent 3f21e7c commit 56c19cb
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -203,6 +204,20 @@ public static <SNAPSHOT extends RecordTemplate> String getUrnTypeFromSnapshot(@N
}
}

/**
* Get the asset type of urn inside a asset class.
* @param asset an assset class
* @return entity type of urn
*/
@Nonnull
public static <ASSET extends RecordTemplate> String getUrnTypeFromAsset(@Nonnull Class<ASSET> asset) {
try {
return (String) asset.getMethod("getUrn").getReturnType().getField("ENTITY_TYPE").get(null);
} catch (Exception ignored) {
throw new IllegalArgumentException(String.format("The snapshot class %s is not valid.", asset.getCanonicalName()));
}
}

/**
* Similar to {@link #getUrnFromSnapshot(RecordTemplate)} but extracts from a Snapshot union instead.
*/
Expand Down Expand Up @@ -350,6 +365,43 @@ public static <ASSET extends RecordTemplate> List<RecordTemplate> getAspectsFrom
}
}

private final static ConcurrentHashMap<Class<? extends RecordTemplate>, Map<String, String>> ASPECT_ALIAS_CACHE =
new ConcurrentHashMap<>();

/**
* Return aspect alias (in lower cases) from given asset class and aspect FQCN (Fully Qualified Class Name).
* @param assetClass asset class
* @param aspectFQCN aspect FQCN
* @return alias names in lower cases
* @param <ASSET> Asset class
*/
@Nullable
public static <ASSET extends RecordTemplate> String getAspectAlias(@Nonnull Class<ASSET> assetClass,
@Nonnull String aspectFQCN) {
return ASPECT_ALIAS_CACHE.computeIfAbsent(assetClass, key -> {
AssetValidator.validateAssetSchema(assetClass);
final Field[] declaredFields = assetClass.getDeclaredFields();
Map<String, String> map = new HashMap<>();
for (Field declaredField : declaredFields) {
if (!declaredField.getName().startsWith(FIELD_FIELD_PREFIX)) {
continue;
}
String fieldName = declaredField.getName().substring(FIELD_FIELD_PREFIX.length());
if (fieldName.equalsIgnoreCase(URN_FIELD)) {
continue;
}
String methodName = "get" + fieldName;
try {
String aspectClass = assetClass.getMethod(methodName).getReturnType().getCanonicalName();
map.put(aspectClass, fieldName.toLowerCase());
} catch (NoSuchMethodException e) {
throw new RuntimeException("Method not found: " + methodName, e);
}
}
return map;
}).get(aspectFQCN);
}

/**
* Extracts given aspect from a snapshot.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ namespace com.linkedin.metadata.query
record AspectField {

/**
* Asset type (entityType) this aspect belongs to. When running a query filtered by an aspect field (aspect alias),
* FQCN of the asset this aspect belongs to. When running a query filtered by an aspect field (aspect alias),
* depends on which asset this aspect belongs to, the field (aspect alias) can be different. For more context, check
* the decision on aspect alias: go/mg/aspect-alias-decision. The underlying logic requires asset type information
* to query on the right target. e.g. DB column name, which is from the aspect-alias defined on Asset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.linkedin.metadata.validator.NullFieldException;
import com.linkedin.testing.AspectAttributes;
import com.linkedin.testing.AspectUnionWithSoftDeletedAspect;
import com.linkedin.testing.BarAsset;
import com.linkedin.testing.BarUrnArray;
import com.linkedin.testing.DatasetInfo;
import com.linkedin.testing.DeltaUnionAlias;
Expand Down Expand Up @@ -794,4 +795,22 @@ public void testGetEntityType() {
assertEquals(entityType, expectedUrn.getEntityType());
assertNull(ModelUtils.getEntityType(null));
}

@Test
public void testGetUrnTypeFromAssetType() {
String assetType = ModelUtils.getUrnTypeFromAsset(BarAsset.class);
assertEquals(BarUrn.ENTITY_TYPE, assetType);
}

@Test
public void testGetAspectAlias() {
assertEquals(ModelUtils.getAspectAlias(BarAsset.class, AspectBar.class.getCanonicalName()), "aspect_bar");
assertNull(ModelUtils.getAspectAlias(BarAsset.class, AspectFoo.class.getCanonicalName()), "aspect_foo");
try {
ModelUtils.getAspectAlias(AspectBar.class, AspectFoo.class.getCanonicalName());
fail("should fail on non-Asset template");
} catch (Exception e) {

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.linkedin.metadata.dao;

import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.validator.AssetValidator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.reflections.Reflections;


/**
* This class tracks the (entityType, Asset) map.
*/
@Slf4j
public class GlobalAssetRegistry {
private final Map<String, Class<? extends RecordTemplate>> registry = new ConcurrentHashMap<>();

private GlobalAssetRegistry() {
preLoadInternalAssets();
}

// thread-safe, lazy-load singleton instance.
// JVM guarantees static fields is only instantiated once and in a thread-safe manner when class is first being loaded.
// Putting it in the inner class makes this inner only being loaded when getInstance() is called.
private static class InnerHolder {
private static final GlobalAssetRegistry INSTANCE = new GlobalAssetRegistry();
}

private static GlobalAssetRegistry getInstance() {
return InnerHolder.INSTANCE;
}

public static void register(@Nonnull String assetType, @Nonnull Class<? extends RecordTemplate> assetClass) {
AssetValidator.validateAssetSchema(assetClass);
getInstance().registry.put(assetType, assetClass);
}

public static Class<? extends RecordTemplate> get(@Nonnull String assetType) {
return getInstance().registry.get(assetType);
}

/**
* TODO: moving this loading logic into internal-models.
*/
private void preLoadInternalAssets() {
Reflections reflections = new Reflections(INTERNAL_ASSET_PACKAGE); // Change to your package
Set<Class<? extends RecordTemplate>> assetClasses = reflections.getSubTypesOf(RecordTemplate.class);
for (Class<? extends RecordTemplate> assetClass : assetClasses) {
try {
register(ModelUtils.getUrnTypeFromAsset(assetClass), assetClass);
} catch (Exception e) {
log.error("failed to load asset: " + assetClass, e);
}
}
}

private static final String INTERNAL_ASSET_PACKAGE = "pegasus.com.linkedin.metadata.asset";
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.aspect.AspectColumnMetadata;
import com.linkedin.metadata.dao.GlobalAssetRegistry;
import com.linkedin.metadata.dao.exception.MissingAnnotationException;
import com.linkedin.metadata.dao.exception.ModelValidationException;
import java.util.Map;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.dao.utils.SQLStatementUtils.*;


/**
* Generate schema related SQL script, such as normalized table / column names ..etc
Expand Down Expand Up @@ -128,7 +128,7 @@ public static <RELATIONSHIP extends RecordTemplate> String getTestRelationshipTa
*/
@Nonnull
public static String getAspectColumnName(@Nonnull final String entityType, @Nonnull final String aspectCanonicalName) {
return ASPECT_PREFIX + getColumnNameFromAnnotation(entityType, aspectCanonicalName);
return ASPECT_PREFIX + getColumnName(entityType, aspectCanonicalName);
}

/**
Expand All @@ -155,7 +155,7 @@ public static String getGeneratedColumnName(@Nonnull String assetType, @Nonnull
if (UNKNOWN_ASSET.equals(assetType)) {
log.warn("query with unknown asset type. aspect = {}, path ={}, delimiter = {}", aspect, path, delimiter);
}
return INDEX_PREFIX + getColumnNameFromAnnotation(assetType, aspect) + processPath(path, delimiter);
return INDEX_PREFIX + getColumnName(assetType, aspect) + processPath(path, delimiter);
}

/**
Expand Down Expand Up @@ -188,17 +188,46 @@ public static String processPath(@Nonnull String path, char delimiter) {
* @return aspect column name
*/
@Nonnull
private static String getColumnNameFromAnnotation(@Nonnull final String assetType, @Nonnull final String aspectCanonicalName) {
// TODO(yanyang) implement a map of (assetType, aspectClass) --> aspect_alias (column)
private static String getColumnName(@Nonnull final String assetType,
@Nonnull final String aspectCanonicalName) {

Class<? extends RecordTemplate> assetClass = GlobalAssetRegistry.get(assetType);
if (assetClass == null) {
return getColumnNameFromAnnotation(assetType, aspectCanonicalName);
} else {
String aspectAlias = ModelUtils.getAspectAlias(assetClass, aspectCanonicalName);
if (aspectAlias == null) {
throw new ModelValidationException(
"failed to get aspect alias for: " + aspectCanonicalName + " from asset: " + assetClass);
} else {
return aspectAlias;
}
}
}

/**
* Get Column name from aspect column annotation (legacy).
*
* @param assetType entity type from Urn definition.
* @param aspectCanonicalName aspect name in canonical form.
* @return aspect column name
*/
@Nonnull
private static String getColumnNameFromAnnotation(@Nonnull final String assetType,
@Nonnull final String aspectCanonicalName) {
log.warn("loading column name from legacy 'column' annotation. asset: {}, aspect: {}", assetType,
aspectCanonicalName);
// load column from Aspect annotation (legacy way)
try {
final RecordDataSchema schema = (RecordDataSchema) DataTemplateUtil.getSchema(ClassUtils.loadClass(aspectCanonicalName));
final RecordDataSchema schema =
(RecordDataSchema) DataTemplateUtil.getSchema(ClassUtils.loadClass(aspectCanonicalName));
final Map<String, Object> properties = schema.getProperties();
final Object gmaObj = properties.get(GMA);
final AspectColumnMetadata gmaAnnotation = DataTemplateUtil.wrap(gmaObj, AspectColumnMetadata.class);
return gmaAnnotation.getAspect().getColumn().getName();
} catch (Exception e) {
throw new MissingAnnotationException(String.format("Aspect %s should be annotated with @gma.aspect.column.name.",
aspectCanonicalName), e);
throw new MissingAnnotationException(
String.format("Aspect %s should be annotated with @gma.aspect.column.name.", aspectCanonicalName), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO;
import com.linkedin.metadata.query.AspectField;
import com.linkedin.metadata.query.Condition;
import com.linkedin.metadata.query.IndexFilter;
import com.linkedin.metadata.query.IndexGroupByCriterion;
Expand Down Expand Up @@ -476,14 +477,34 @@ private static String parseLocalRelationshipField(

if (field.isAspectField()) {
// entity type from Urn definition.
String asset = field.getAspectField().hasAsset() ? field.getAspectField().getAsset() : UNKNOWN_ASSET;
return tablePrefix + SQLSchemaUtils.getGeneratedColumnName(asset, field.getAspectField().getAspect(),
String assetType = getAssetType(field.getAspectField());
return tablePrefix + SQLSchemaUtils.getGeneratedColumnName(assetType, field.getAspectField().getAspect(),
field.getAspectField().getPath(), nonDollarVirtualColumnsEnabled);
}

throw new IllegalArgumentException("Unrecognized field type");
}

/**
* Get asset type from an aspectField.
* @param aspectField {@link AspectField}
* @return asset type, which is equivalent to Urn's entity type
*/
protected static String getAssetType(AspectField aspectField) {

String assetType = UNKNOWN_ASSET;
if (aspectField.hasAsset()) {
try {
assetType =
ModelUtils.getUrnTypeFromAsset(Class.forName(aspectField.getAsset()).asSubclass(RecordTemplate.class));
} catch (ClassNotFoundException | ClassCastException e) {
throw new IllegalArgumentException("Unrecognized asset type: " + aspectField.getAsset());
}
}
return assetType;
}


private static String parseLocalRelationshipValue(@Nonnull final LocalRelationshipValue localRelationshipValue) {
if (localRelationshipValue.isArray()) {
return "(" + localRelationshipValue.getArray().stream().map(s -> "'" + StringEscapeUtils.escapeSql(s) + "'")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.linkedin.metadata.dao;

import com.linkedin.testing.AspectBar;
import com.linkedin.testing.BarAsset;
import com.linkedin.testing.urn.BarUrn;
import org.testng.annotations.Test;

import static org.testng.Assert.*;


public class GlobalAssetRegistryTest {
@Test
public void testGetInstance() {
GlobalAssetRegistry.register(BarUrn.ENTITY_TYPE, BarAsset.class);
try {
GlobalAssetRegistry.register(BarUrn.ENTITY_TYPE, AspectBar.class);
fail("should fail because of invalid aspect");
} catch (Exception e) {
}

assertEquals(GlobalAssetRegistry.get(BarUrn.ENTITY_TYPE), BarAsset.class);
assertNull(GlobalAssetRegistry.get("unknownType"));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.linkedin.metadata.dao.utils;

import com.linkedin.metadata.dao.GlobalAssetRegistry;
import com.linkedin.testing.AspectFoo;
import com.linkedin.testing.BarAsset;
import com.linkedin.testing.urn.BarUrn;
import com.linkedin.testing.urn.FooUrn;
import org.testng.annotations.Test;
Expand All @@ -24,7 +26,8 @@ public void testGetGeneratedColumnName() {

@Test
public void testGetAspectColumnName() {
assertEquals("a_aspectbar",
GlobalAssetRegistry.register(BarUrn.ENTITY_TYPE, BarAsset.class);
assertEquals("a_aspect_bar",
SQLSchemaUtils.getAspectColumnName(BarUrn.ENTITY_TYPE, "com.linkedin.testing.AspectBar"));
}
}
Loading

0 comments on commit 56c19cb

Please sign in to comment.