Skip to content

Commit

Permalink
feat(api): add AuditStamp to the V3 API entity/aspect response
Browse files Browse the repository at this point in the history
  • Loading branch information
ajoymajumdar committed Aug 8, 2024
1 parent d6e46b9 commit d4a60ce
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.datahubproject.openapi.v3.models;

import com.linkedin.common.AuditStamp;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.mxe.SystemMetadata;
import lombok.Builder;
import lombok.Value;

@Builder(toBuilder = true)
@Value
public class AspectItem {
RecordTemplate aspect;
SystemMetadata systemMetadata;
AuditStamp auditStamp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ public class GenericAspectV3 implements GenericAspect {
@Nonnull Map<String, Object> value;
@Nullable Map<String, Object> systemMetadata;
@Nullable Map<String, String> headers;
@Nullable Map<String, Object> auditStamp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static class GenericEntityV3Builder {
public GenericEntityV3 build(
ObjectMapper objectMapper,
@Nonnull Urn urn,
Map<String, Pair<RecordTemplate, SystemMetadata>> aspects) {
Map<String, AspectItem> aspects) {
Map<String, GenericAspectV3> jsonObjectMap =
aspects.entrySet().stream()
.map(
Expand All @@ -53,20 +53,24 @@ public GenericEntityV3 build(
String aspectName = entry.getKey();
Map<String, Object> aspectValue =
objectMapper.readValue(
RecordUtils.toJsonString(entry.getValue().getFirst())
RecordUtils.toJsonString(entry.getValue().getAspect())
.getBytes(StandardCharsets.UTF_8),
new TypeReference<>() {});
Map<String, Object> systemMetadata =
entry.getValue().getSecond() != null
entry.getValue().getSystemMetadata() != null
? objectMapper.convertValue(
entry.getValue().getSecond(), new TypeReference<>() {})
entry.getValue().getSystemMetadata(), new TypeReference<>() {})
: null;
Map<String, Object> auditStamp =
entry.getValue().getAuditStamp() != null
? objectMapper.convertValue(
entry.getValue().getAuditStamp().data(), new TypeReference<>() {})
: null;

return Map.entry(
aspectName,
GenericAspectV3.builder()
.value(aspectValue)
.systemMetadata(systemMetadata)
.value(aspectValue).systemMetadata(systemMetadata).auditStamp(auditStamp)
.build());
} catch (IOException ex) {
throw new RuntimeException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class OpenAPIV3Generator {
private static final String NAME_QUERY = "query";
private static final String NAME_PATH = "path";
private static final String NAME_SYSTEM_METADATA = "systemMetadata";
private static final String NAME_ASYNC = "async";
private static final String NAME_AUDIT_STAMP = "auditStamp";
private static final String NAME_VERSION = "version";
private static final String NAME_SCROLL_ID = "scrollId";
private static final String NAME_INCLUDE_SOFT_DELETE = "includeSoftDelete";
Expand Down Expand Up @@ -77,9 +77,6 @@ public static OpenAPI generateOpenApiSpec(EntityRegistry entityRegistry) {
// Components
final Components components = new Components();
// --> Aspect components
// TODO: Correct handling of SystemMetadata and SortOrder
components.addSchemas(
"SystemMetadata", new Schema().type(TYPE_OBJECT).additionalProperties(true));
components.addSchemas("SortOrder", new Schema()._enum(List.of("ASCENDING", "DESCENDING")));
components.addSchemas("AspectPatch", buildAspectPatchSchema());
components.addSchemas(
Expand Down Expand Up @@ -167,6 +164,10 @@ public static OpenAPI generateOpenApiSpec(EntityRegistry entityRegistry) {
buildSingleEntityAspectPath(
e, a.getName(), a.getPegasusSchema().getName())));
});
// TODO: Correct handling of SystemMetadata and AuditStamp
components.addSchemas(
"SystemMetadata", new Schema().type(TYPE_OBJECT).additionalProperties(true));
components.addSchemas("AuditStamp", new Schema().type(TYPE_OBJECT).additionalProperties(true));
return new OpenAPI().openapi("3.0.1").info(info).paths(paths).components(components);
}

Expand All @@ -185,7 +186,7 @@ private static PathItem buildSingleEntityPath(final EntitySpec entity) {
.schema(new Schema().type(TYPE_STRING)),
new Parameter()
.in(NAME_QUERY)
.name("systemMetadata")
.name(NAME_SYSTEM_METADATA)
.description("Include systemMetadata with response.")
.schema(new Schema().type(TYPE_BOOLEAN)._default(false)),
new Parameter()
Expand Down Expand Up @@ -424,7 +425,7 @@ private static PathItem buildBatchGetEntityPath(final EntitySpec entity) {
List.of(
new Parameter()
.in(NAME_QUERY)
.name("systemMetadata")
.name(NAME_SYSTEM_METADATA)
.description("Include systemMetadata with response.")
.schema(new Schema().type(TYPE_BOOLEAN)._default(false))))
.requestBody(
Expand Down Expand Up @@ -575,12 +576,19 @@ private static Schema buildAspectRefResponseSchema(final String aspectName) {
.required(List.of(PROPERTY_VALUE))
.addProperty(PROPERTY_VALUE, new Schema<>().$ref(PATH_DEFINITIONS + aspectName));
result.addProperty(
"systemMetadata",
NAME_SYSTEM_METADATA,
new Schema<>()
.type(TYPE_OBJECT)
.anyOf(List.of(new Schema().$ref(PATH_DEFINITIONS + "SystemMetadata")))
.description("System metadata for the aspect.")
.nullable(true));
result.addProperty(
NAME_AUDIT_STAMP,
new Schema<>()
.type(TYPE_OBJECT)
.anyOf(List.of(new Schema().$ref(PATH_DEFINITIONS + "AuditStamp")))
.description("Audit stamp for the aspect.")
.nullable(true));
return result;
}

Expand All @@ -592,7 +600,7 @@ private static Schema buildAspectRefRequestSchema(final String aspectName) {
.required(List.of(PROPERTY_VALUE))
.addProperty(PROPERTY_VALUE, new Schema<>().$ref(PATH_DEFINITIONS + aspectName));
result.addProperty(
"systemMetadata",
NAME_SYSTEM_METADATA,
new Schema<>()
.type(TYPE_OBJECT)
.anyOf(List.of(new Schema().$ref(PATH_DEFINITIONS + "SystemMetadata")))
Expand Down Expand Up @@ -867,7 +875,7 @@ private static PathItem buildSingleEntityAspectPath(
List.of(
new Parameter()
.in(NAME_QUERY)
.name("systemMetadata")
.name(NAME_SYSTEM_METADATA)
.description("Include systemMetadata with response.")
.schema(new Schema().type(TYPE_BOOLEAN)._default(false))))
.summary(String.format("Patch aspect %s on %s ", aspect, upperFirstEntity))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.ByteString;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.aspect.batch.BatchItem;
Expand All @@ -28,12 +27,12 @@
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.RequestContext;
import io.datahubproject.openapi.controller.GenericEntitiesController;
import io.datahubproject.openapi.exception.InvalidUrnException;
import io.datahubproject.openapi.exception.UnauthorizedException;
import io.datahubproject.openapi.v3.models.AspectItem;
import io.datahubproject.openapi.v3.models.GenericAspectV3;
import io.datahubproject.openapi.v3.models.GenericEntityScrollResultV3;
import io.datahubproject.openapi.v3.models.GenericEntityV3;
Expand Down Expand Up @@ -143,11 +142,27 @@ protected List<GenericEntityV3> buildEntityVersionedAspectList(
.map(
u ->
GenericEntityV3.builder()
.build(objectMapper, u, toAspectMap(u, aspects.get(u), withSystemMetadata)))
.build(
objectMapper, u, toAspectItemMap(u, aspects.get(u), withSystemMetadata)))
.collect(Collectors.toList());
}
}

private Map<String, AspectItem> toAspectItemMap(
Urn urn, List<EnvelopedAspect> aspects, boolean withSystemMetadata) {
return aspects.stream()
.map(
a ->
Map.entry(
a.getName(),
AspectItem.builder()
.aspect(toRecordTemplate(lookupAspectSpec(urn, a.getName()), a))
.systemMetadata(withSystemMetadata ? a.getSystemMetadata() : null)
.auditStamp(withSystemMetadata ? a.getCreated() : null)
.build()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
protected List<GenericEntityV3> buildEntityList(
Set<IngestResult> ingestResults, boolean withSystemMetadata) {
Expand All @@ -156,15 +171,21 @@ protected List<GenericEntityV3> buildEntityList(
Map<Urn, List<IngestResult>> entityMap =
ingestResults.stream().collect(Collectors.groupingBy(IngestResult::getUrn));
for (Map.Entry<Urn, List<IngestResult>> urnAspects : entityMap.entrySet()) {
Map<String, Pair<RecordTemplate, SystemMetadata>> aspectsMap =
Map<String, AspectItem> aspectsMap =
urnAspects.getValue().stream()
.map(
ingest ->
Map.entry(
ingest.getRequest().getAspectName(),
Pair.of(
ingest.getRequest().getRecordTemplate(),
withSystemMetadata ? ingest.getRequest().getSystemMetadata() : null)))
AspectItem.builder()
.aspect(ingest.getRequest().getRecordTemplate())
.systemMetadata(
withSystemMetadata
? ingest.getRequest().getSystemMetadata()
: null)
.auditStamp(
withSystemMetadata ? ingest.getRequest().getAuditStamp() : null)
.build()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
responseList.add(
GenericEntityV3.builder().build(objectMapper, urnAspects.getKey(), aspectsMap));
Expand All @@ -183,9 +204,12 @@ protected GenericEntityV3 buildGenericEntity(
updateAspectResult.getUrn(),
Map.of(
aspectName,
Pair.of(
updateAspectResult.getNewValue(),
withSystemMetadata ? updateAspectResult.getNewSystemMetadata() : null)));
AspectItem.builder()
.aspect(updateAspectResult.getNewValue())
.systemMetadata(
withSystemMetadata ? updateAspectResult.getNewSystemMetadata() : null)
.auditStamp(withSystemMetadata ? updateAspectResult.getAuditStamp() : null)
.build()));
}

private List<GenericEntityV3> toRecordTemplates(
Expand Down

0 comments on commit d4a60ce

Please sign in to comment.