From 8a1122049c02c4929d8029c25dac517e5fdafc48 Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Mon, 11 Dec 2023 14:25:43 -0800 Subject: [PATCH 1/7] feat(patch): support fine grained lineage patches (#9408) Co-authored-by: Harshal Sheth --- .../dataset/UpstreamLineageTemplate.java | 271 ++++++++++++- .../registry/UpstreamLineageTemplateTest.java | 359 ++++++++++++++++++ .../java/com/linkedin/metadata/Constants.java | 5 + .../src/datahub/specific/dataset.py | 107 +++++- .../unit/patch/complex_dataset_patch.json | 45 ++- .../tests/unit/patch/test_patch_builder.py | 16 + .../dataset/UpstreamLineagePatchBuilder.java | 231 ++++++++++- .../java/datahub/client/patch/PatchTest.java | 24 +- 8 files changed, 1023 insertions(+), 35 deletions(-) create mode 100644 entity-registry/src/test/java/com/linkedin/metadata/models/registry/UpstreamLineageTemplateTest.java diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/registry/template/dataset/UpstreamLineageTemplate.java b/entity-registry/src/main/java/com/linkedin/metadata/models/registry/template/dataset/UpstreamLineageTemplate.java index 35816895669beb..81a4065dedb1a2 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/models/registry/template/dataset/UpstreamLineageTemplate.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/registry/template/dataset/UpstreamLineageTemplate.java @@ -1,20 +1,41 @@ package com.linkedin.metadata.models.registry.template.dataset; +import static com.fasterxml.jackson.databind.node.JsonNodeFactory.*; +import static com.linkedin.metadata.Constants.*; + import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Streams; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.RecordTemplate; import com.linkedin.dataset.FineGrainedLineageArray; import com.linkedin.dataset.UpstreamArray; import com.linkedin.dataset.UpstreamLineage; -import com.linkedin.metadata.models.registry.template.ArrayMergingTemplate; +import com.linkedin.metadata.models.registry.template.CompoundKeyTemplate; import java.util.Collections; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; -public class UpstreamLineageTemplate implements ArrayMergingTemplate { +public class UpstreamLineageTemplate extends CompoundKeyTemplate { + // Fields private static final String UPSTREAMS_FIELD_NAME = "upstreams"; private static final String DATASET_FIELD_NAME = "dataset"; + private static final String FINE_GRAINED_LINEAGES_FIELD_NAME = "fineGrainedLineages"; + private static final String FINE_GRAINED_UPSTREAM_TYPE = "upstreamType"; + private static final String FINE_GRAINED_UPSTREAMS = "upstreams"; + private static final String FINE_GRAINED_DOWNSTREAM_TYPE = "downstreamType"; + private static final String FINE_GRAINED_DOWNSTREAMS = "downstreams"; + private static final String FINE_GRAINED_TRANSFORMATION_OPERATION = "transformOperation"; + private static final String FINE_GRAINED_CONFIDENCE_SCORE = "confidenceScore"; - // TODO: Fine Grained Lineages not patchable at this time, they don't have a well established key + // Template support + private static final String NONE_TRANSFORMATION_TYPE = "NONE"; + private static final Float DEFAULT_CONFIDENCE_SCORE = 1.0f; @Override public UpstreamLineage getSubtype(RecordTemplate recordTemplate) throws ClassCastException { @@ -42,14 +63,250 @@ public UpstreamLineage getDefault() { @Nonnull @Override public JsonNode transformFields(JsonNode baseNode) { - return arrayFieldToMap( - baseNode, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME)); + JsonNode transformedNode = + arrayFieldToMap( + baseNode, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME)); + ((ObjectNode) transformedNode) + .set( + FINE_GRAINED_LINEAGES_FIELD_NAME, + combineAndTransformFineGrainedLineages( + transformedNode.get(FINE_GRAINED_LINEAGES_FIELD_NAME))); + + return transformedNode; } @Nonnull @Override public JsonNode rebaseFields(JsonNode patched) { - return transformedMapToArray( - patched, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME)); + JsonNode rebasedNode = + transformedMapToArray( + patched, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME)); + ((ObjectNode) rebasedNode) + .set( + FINE_GRAINED_LINEAGES_FIELD_NAME, + reconstructFineGrainedLineages(rebasedNode.get(FINE_GRAINED_LINEAGES_FIELD_NAME))); + return rebasedNode; + } + + /** + * Combines fine grained lineage array into a map using upstream and downstream types as keys, + * defaulting when not present. Due to this construction, patches will look like: path: + * /fineGrainedLineages/TRANSFORMATION_OPERATION/(upstreamType || downstreamType)/TYPE/FIELD_URN, + * op: ADD/REMOVE, value: float (confidenceScore) Due to the way FineGrainedLineage was designed + * it doesn't necessarily have a consistent key we can reference, so this specialized method + * mimics the arrayFieldToMap of the super class with the specialization that it does not put the + * full value of the aspect at the end of the key, just the particular array. This prevents + * unintended overwrites through improper MCP construction that is technically allowed by the + * schema when combining under fields that form the natural key. + * + * @param fineGrainedLineages the fine grained lineage array node + * @return the modified {@link JsonNode} with array fields transformed to maps + */ + private JsonNode combineAndTransformFineGrainedLineages(@Nullable JsonNode fineGrainedLineages) { + ObjectNode mapNode = instance.objectNode(); + if (!(fineGrainedLineages instanceof ArrayNode) || fineGrainedLineages.isEmpty()) { + return mapNode; + } + JsonNode lineageCopy = fineGrainedLineages.deepCopy(); + + lineageCopy + .elements() + .forEachRemaining( + node -> { + JsonNode nodeClone = node.deepCopy(); + String transformationOperation = + nodeClone.has(FINE_GRAINED_TRANSFORMATION_OPERATION) + ? nodeClone.get(FINE_GRAINED_TRANSFORMATION_OPERATION).asText() + : NONE_TRANSFORMATION_TYPE; + + if (!mapNode.has(transformationOperation)) { + mapNode.set(transformationOperation, instance.objectNode()); + } + ObjectNode transformationOperationNode = + (ObjectNode) mapNode.get(transformationOperation); + + Float confidenceScore = + nodeClone.has(FINE_GRAINED_CONFIDENCE_SCORE) + ? nodeClone.get(FINE_GRAINED_CONFIDENCE_SCORE).floatValue() + : DEFAULT_CONFIDENCE_SCORE; + + String upstreamType = + nodeClone.has(FINE_GRAINED_UPSTREAM_TYPE) + ? nodeClone.get(FINE_GRAINED_UPSTREAM_TYPE).asText() + : null; + String downstreamType = + nodeClone.has(FINE_GRAINED_DOWNSTREAM_TYPE) + ? nodeClone.get(FINE_GRAINED_DOWNSTREAM_TYPE).asText() + : null; + ArrayNode upstreams = + nodeClone.has(FINE_GRAINED_UPSTREAMS) + ? (ArrayNode) nodeClone.get(FINE_GRAINED_UPSTREAMS) + : null; + ArrayNode downstreams = + nodeClone.has(FINE_GRAINED_DOWNSTREAMS) + ? (ArrayNode) nodeClone.get(FINE_GRAINED_DOWNSTREAMS) + : null; + + // Handle upstreams + if (upstreamType == null) { + // Determine default type + Urn upstreamUrn = + upstreams != null ? UrnUtils.getUrn(upstreams.get(0).asText()) : null; + if (upstreamUrn != null + && SCHEMA_FIELD_ENTITY_NAME.equals(upstreamUrn.getEntityType())) { + upstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE; + } else { + upstreamType = FINE_GRAINED_LINEAGE_DATASET_TYPE; + } + } + if (!transformationOperationNode.has(FINE_GRAINED_UPSTREAM_TYPE)) { + transformationOperationNode.set(FINE_GRAINED_UPSTREAM_TYPE, instance.objectNode()); + } + ObjectNode upstreamTypeNode = + (ObjectNode) transformationOperationNode.get(FINE_GRAINED_UPSTREAM_TYPE); + if (!upstreamTypeNode.has(upstreamType)) { + upstreamTypeNode.set(upstreamType, instance.objectNode()); + } + if (upstreams != null) { + addUrnsToSubType(upstreamTypeNode, upstreams, upstreamType, confidenceScore); + } + + // Handle downstreams + if (downstreamType == null) { + // Determine default type + if (downstreams != null && downstreams.size() > 1) { + downstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE; + } else { + downstreamType = FINE_GRAINED_LINEAGE_FIELD_TYPE; + } + } + if (!transformationOperationNode.has(FINE_GRAINED_DOWNSTREAM_TYPE)) { + transformationOperationNode.set( + FINE_GRAINED_DOWNSTREAM_TYPE, instance.objectNode()); + } + ObjectNode downstreamTypeNode = + (ObjectNode) transformationOperationNode.get(FINE_GRAINED_DOWNSTREAM_TYPE); + if (!downstreamTypeNode.has(downstreamType)) { + downstreamTypeNode.set(downstreamType, instance.objectNode()); + } + if (downstreams != null) { + addUrnsToSubType(downstreamTypeNode, downstreams, downstreamType, confidenceScore); + } + }); + return mapNode; + } + + private void addUrnsToSubType( + JsonNode superType, ArrayNode urnsList, String subType, Float confidenceScore) { + ObjectNode upstreamSubTypeNode = (ObjectNode) superType.get(subType); + // Will overwrite repeat urns with different confidence scores with the most recently seen + upstreamSubTypeNode.setAll( + Streams.stream(urnsList.elements()) + .map(JsonNode::asText) + .distinct() + .collect(Collectors.toMap(urn -> urn, urn -> instance.numberNode(confidenceScore)))); + } + + /** + * Takes the transformed fine grained lineages map from pre-processing and reconstructs an array + * of FineGrainedLineages Avoids producing side effects by copying nodes, use resulting node and + * not the original + * + * @param transformedFineGrainedLineages the transformed fine grained lineage map + * @return the modified {@link JsonNode} formatted consistent with the original schema + */ + private ArrayNode reconstructFineGrainedLineages(JsonNode transformedFineGrainedLineages) { + if (transformedFineGrainedLineages instanceof ArrayNode) { + // We already have an ArrayNode, no need to transform. This happens during `replace` + // operations + return (ArrayNode) transformedFineGrainedLineages; + } + ObjectNode mapNode = (ObjectNode) transformedFineGrainedLineages; + ArrayNode arrayNode = instance.arrayNode(); + + mapNode + .fieldNames() + .forEachRemaining( + transformationOperation -> { + final ObjectNode transformationOperationNode = + (ObjectNode) mapNode.get(transformationOperation); + final ObjectNode upstreamType = + transformationOperationNode.has(FINE_GRAINED_UPSTREAM_TYPE) + ? (ObjectNode) transformationOperationNode.get(FINE_GRAINED_UPSTREAM_TYPE) + : instance.objectNode(); + final ObjectNode downstreamType = + transformationOperationNode.has(FINE_GRAINED_DOWNSTREAM_TYPE) + ? (ObjectNode) transformationOperationNode.get(FINE_GRAINED_DOWNSTREAM_TYPE) + : instance.objectNode(); + + // Handle upstreams + if (!upstreamType.isEmpty()) { + populateTypeNode( + upstreamType, + transformationOperation, + FINE_GRAINED_UPSTREAM_TYPE, + FINE_GRAINED_UPSTREAMS, + FINE_GRAINED_DOWNSTREAM_TYPE, + arrayNode); + } + + // Handle downstreams + if (!downstreamType.isEmpty()) { + populateTypeNode( + downstreamType, + transformationOperation, + FINE_GRAINED_DOWNSTREAM_TYPE, + FINE_GRAINED_DOWNSTREAMS, + FINE_GRAINED_UPSTREAM_TYPE, + arrayNode); + } + }); + + return arrayNode; + } + + private void populateTypeNode( + JsonNode typeNode, + String transformationOperation, + String typeName, + String arrayTypeName, + String defaultTypeName, + ArrayNode arrayNode) { + typeNode + .fieldNames() + .forEachRemaining( + subTypeName -> { + ObjectNode subType = (ObjectNode) typeNode.get(subTypeName); + if (!subType.isEmpty()) { + ObjectNode fineGrainedLineage = instance.objectNode(); + AtomicReference minimumConfidenceScore = new AtomicReference<>(1.0f); + + fineGrainedLineage.put(typeName, subTypeName); + fineGrainedLineage.put( + FINE_GRAINED_TRANSFORMATION_OPERATION, transformationOperation); + // Array to actually be filled out + fineGrainedLineage.set(arrayTypeName, instance.arrayNode()); + // Added to pass model validation, because we have no way of appropriately pairing + // upstreams and downstreams + // within fine grained lineages consistently due to being able to have multiple + // downstream types paired with a single + // transform operation, we just set a default type because it's a required property + fineGrainedLineage.put(defaultTypeName, FINE_GRAINED_LINEAGE_FIELD_SET_TYPE); + subType + .fieldNames() + .forEachRemaining( + subTypeKey -> { + ((ArrayNode) fineGrainedLineage.get(arrayTypeName)).add(subTypeKey); + Float scoreValue = subType.get(subTypeKey).floatValue(); + if (scoreValue <= minimumConfidenceScore.get()) { + minimumConfidenceScore.set(scoreValue); + fineGrainedLineage.set( + FINE_GRAINED_CONFIDENCE_SCORE, + instance.numberNode(minimumConfidenceScore.get())); + } + }); + arrayNode.add(fineGrainedLineage); + } + }); } } diff --git a/entity-registry/src/test/java/com/linkedin/metadata/models/registry/UpstreamLineageTemplateTest.java b/entity-registry/src/test/java/com/linkedin/metadata/models/registry/UpstreamLineageTemplateTest.java new file mode 100644 index 00000000000000..07982a87be56cb --- /dev/null +++ b/entity-registry/src/test/java/com/linkedin/metadata/models/registry/UpstreamLineageTemplateTest.java @@ -0,0 +1,359 @@ +package com.linkedin.metadata.models.registry; + +import static com.fasterxml.jackson.databind.node.JsonNodeFactory.*; + +import com.fasterxml.jackson.databind.node.NumericNode; +import com.github.fge.jackson.jsonpointer.JsonPointer; +import com.github.fge.jsonpatch.AddOperation; +import com.github.fge.jsonpatch.JsonPatch; +import com.github.fge.jsonpatch.JsonPatchOperation; +import com.github.fge.jsonpatch.RemoveOperation; +import com.linkedin.common.UrnArray; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.DataMap; +import com.linkedin.dataset.FineGrainedLineage; +import com.linkedin.dataset.FineGrainedLineageDownstreamType; +import com.linkedin.dataset.FineGrainedLineageUpstreamType; +import com.linkedin.dataset.UpstreamLineage; +import com.linkedin.metadata.models.registry.template.dataset.UpstreamLineageTemplate; +import java.util.ArrayList; +import java.util.List; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class UpstreamLineageTemplateTest { + @Test + public void testPatchUpstream() throws Exception { + UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate(); + UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault(); + List patchOperations = new ArrayList<>(); + NumericNode upstreamConfidenceScore = instance.numberNode(1.0f); + JsonPatchOperation operation = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"), + upstreamConfidenceScore); + patchOperations.add(operation); + JsonPatch jsonPatch = new JsonPatch(patchOperations); + + // Initial population test + UpstreamLineage result = upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatch); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap = new DataMap(); + dataMap.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap); + UrnArray urns = new UrnArray(); + Urn urn1 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"); + urns.add(urn1); + fineGrainedLineage.setUpstreams(urns); + fineGrainedLineage.setTransformOperation("CREATE"); + fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + Assert.assertEquals(result.getFineGrainedLineages().get(0), fineGrainedLineage); + + // Test non-overwrite upstreams and correct confidence score + JsonPatchOperation operation2 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), + upstreamConfidenceScore); + NumericNode upstreamConfidenceScore2 = instance.numberNode(0.1f); + JsonPatchOperation operation3 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), + upstreamConfidenceScore2); + List patchOperations2 = new ArrayList<>(); + patchOperations2.add(operation2); + patchOperations2.add(operation3); + JsonPatch jsonPatch2 = new JsonPatch(patchOperations2); + UpstreamLineage result2 = upstreamLineageTemplate.applyPatch(result, jsonPatch2); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap2 = new DataMap(); + dataMap2.put("confidenceScore", 0.1); + FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2); + UrnArray urns2 = new UrnArray(); + Urn urn2 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"); + urns2.add(urn1); + urns2.add(urn2); + fineGrainedLineage2.setUpstreams(urns2); + fineGrainedLineage2.setTransformOperation("CREATE"); + fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + Assert.assertEquals(result2.getFineGrainedLineages().get(0), fineGrainedLineage2); + + // Check different upstream types + JsonPatchOperation operation4 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"), + upstreamConfidenceScore); + List patchOperations3 = new ArrayList<>(); + patchOperations3.add(operation4); + JsonPatch jsonPatch3 = new JsonPatch(patchOperations3); + UpstreamLineage result3 = upstreamLineageTemplate.applyPatch(result2, jsonPatch3); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap3 = new DataMap(); + dataMap3.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage3 = new FineGrainedLineage(dataMap3); + UrnArray urns3 = new UrnArray(); + Urn urn3 = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"); + urns3.add(urn3); + fineGrainedLineage3.setUpstreams(urns3); + fineGrainedLineage3.setTransformOperation("CREATE"); + fineGrainedLineage3.setUpstreamType(FineGrainedLineageUpstreamType.DATASET); + fineGrainedLineage3.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + // Splits into two for different types + Assert.assertEquals(result3.getFineGrainedLineages().get(1), fineGrainedLineage3); + + // Check different transform types + JsonPatchOperation operation5 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"), + upstreamConfidenceScore); + List patchOperations4 = new ArrayList<>(); + patchOperations4.add(operation5); + JsonPatch jsonPatch4 = new JsonPatch(patchOperations4); + UpstreamLineage result4 = upstreamLineageTemplate.applyPatch(result3, jsonPatch4); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap4 = new DataMap(); + dataMap4.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage4 = new FineGrainedLineage(dataMap4); + UrnArray urns4 = new UrnArray(); + Urn urn4 = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"); + urns4.add(urn4); + fineGrainedLineage4.setUpstreams(urns4); + fineGrainedLineage4.setTransformOperation("TRANSFORM"); + fineGrainedLineage4.setUpstreamType(FineGrainedLineageUpstreamType.DATASET); + fineGrainedLineage4.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + // New entry in array because of new transformation type + Assert.assertEquals(result4.getFineGrainedLineages().get(2), fineGrainedLineage4); + + // Remove + JsonPatchOperation removeOperation = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)")); + JsonPatchOperation removeOperation2 = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)")); + JsonPatchOperation removeOperation3 = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)")); + JsonPatchOperation removeOperation4 = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)")); + + List removeOperations = new ArrayList<>(); + removeOperations.add(removeOperation); + removeOperations.add(removeOperation2); + removeOperations.add(removeOperation3); + removeOperations.add(removeOperation4); + JsonPatch removePatch = new JsonPatch(removeOperations); + UpstreamLineage finalResult = upstreamLineageTemplate.applyPatch(result4, removePatch); + Assert.assertEquals(upstreamLineageTemplate.getDefault(), finalResult); + } + + @Test + public void testPatchDownstream() throws Exception { + UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate(); + UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault(); + List patchOperations = new ArrayList<>(); + NumericNode downstreamConfidenceScore = instance.numberNode(1.0f); + JsonPatchOperation operation = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"), + downstreamConfidenceScore); + patchOperations.add(operation); + JsonPatch jsonPatch = new JsonPatch(patchOperations); + + // Initial population test + UpstreamLineage result = upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatch); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap = new DataMap(); + dataMap.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap); + UrnArray urns = new UrnArray(); + Urn urn1 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"); + urns.add(urn1); + fineGrainedLineage.setDownstreams(urns); + fineGrainedLineage.setTransformOperation("CREATE"); + fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + Assert.assertEquals(result.getFineGrainedLineages().get(0), fineGrainedLineage); + + // Test non-overwrite downstreams and correct confidence score + JsonPatchOperation operation2 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), + downstreamConfidenceScore); + NumericNode downstreamConfidenceScore2 = instance.numberNode(0.1f); + JsonPatchOperation operation3 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), + downstreamConfidenceScore2); + List patchOperations2 = new ArrayList<>(); + patchOperations2.add(operation2); + patchOperations2.add(operation3); + JsonPatch jsonPatch2 = new JsonPatch(patchOperations2); + UpstreamLineage result2 = upstreamLineageTemplate.applyPatch(result, jsonPatch2); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap2 = new DataMap(); + dataMap2.put("confidenceScore", 0.1); + FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2); + UrnArray urns2 = new UrnArray(); + Urn urn2 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"); + urns2.add(urn1); + urns2.add(urn2); + fineGrainedLineage2.setDownstreams(urns2); + fineGrainedLineage2.setTransformOperation("CREATE"); + fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + Assert.assertEquals(result2.getFineGrainedLineages().get(0), fineGrainedLineage2); + + // Check different downstream types + JsonPatchOperation operation4 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"), + downstreamConfidenceScore); + List patchOperations3 = new ArrayList<>(); + patchOperations3.add(operation4); + JsonPatch jsonPatch3 = new JsonPatch(patchOperations3); + UpstreamLineage result3 = upstreamLineageTemplate.applyPatch(result2, jsonPatch3); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap3 = new DataMap(); + dataMap3.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage3 = new FineGrainedLineage(dataMap3); + UrnArray urns3 = new UrnArray(); + Urn urn3 = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"); + urns3.add(urn3); + fineGrainedLineage3.setDownstreams(urns3); + fineGrainedLineage3.setTransformOperation("CREATE"); + fineGrainedLineage3.setDownstreamType(FineGrainedLineageDownstreamType.FIELD); + fineGrainedLineage3.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + // Splits into two for different types + Assert.assertEquals(result3.getFineGrainedLineages().get(1), fineGrainedLineage3); + + // Check different transform types + JsonPatchOperation operation5 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/TRANSFORM/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"), + downstreamConfidenceScore); + List patchOperations4 = new ArrayList<>(); + patchOperations4.add(operation5); + JsonPatch jsonPatch4 = new JsonPatch(patchOperations4); + UpstreamLineage result4 = upstreamLineageTemplate.applyPatch(result3, jsonPatch4); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap4 = new DataMap(); + dataMap4.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage4 = new FineGrainedLineage(dataMap4); + UrnArray urns4 = new UrnArray(); + Urn urn4 = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"); + urns4.add(urn4); + fineGrainedLineage4.setDownstreams(urns4); + fineGrainedLineage4.setTransformOperation("TRANSFORM"); + fineGrainedLineage4.setDownstreamType(FineGrainedLineageDownstreamType.FIELD); + fineGrainedLineage4.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + // New entry in array because of new transformation type + Assert.assertEquals(result4.getFineGrainedLineages().get(2), fineGrainedLineage4); + + // Remove + JsonPatchOperation removeOperation = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)")); + JsonPatchOperation removeOperation2 = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)")); + JsonPatchOperation removeOperation3 = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)")); + JsonPatchOperation removeOperation4 = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/TRANSFORM/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)")); + + List removeOperations = new ArrayList<>(); + removeOperations.add(removeOperation); + removeOperations.add(removeOperation2); + removeOperations.add(removeOperation3); + removeOperations.add(removeOperation4); + JsonPatch removePatch = new JsonPatch(removeOperations); + UpstreamLineage finalResult = upstreamLineageTemplate.applyPatch(result4, removePatch); + Assert.assertEquals(upstreamLineageTemplate.getDefault(), finalResult); + } + + @Test + public void testUpAndDown() throws Exception { + UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate(); + UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault(); + List patchOperations = new ArrayList<>(); + NumericNode downstreamConfidenceScore = instance.numberNode(1.0f); + JsonPatchOperation operation = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"), + downstreamConfidenceScore); + patchOperations.add(operation); + NumericNode upstreamConfidenceScore = instance.numberNode(1.0f); + JsonPatchOperation operation2 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"), + upstreamConfidenceScore); + patchOperations.add(operation2); + JsonPatch jsonPatch = new JsonPatch(patchOperations); + + // Initial population test + UpstreamLineage result = upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatch); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap = new DataMap(); + dataMap.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap); + UrnArray urns = new UrnArray(); + Urn urn1 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"); + urns.add(urn1); + fineGrainedLineage.setTransformOperation("CREATE"); + fineGrainedLineage.setUpstreams(urns); + fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + fineGrainedLineage.setDownstreams(urns); + + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap2 = new DataMap(); + dataMap2.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2); + fineGrainedLineage2.setTransformOperation("CREATE"); + fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + fineGrainedLineage2.setDownstreams(urns); + + Assert.assertEquals(result.getFineGrainedLineages().get(1), fineGrainedLineage2); + } +} diff --git a/li-utils/src/main/java/com/linkedin/metadata/Constants.java b/li-utils/src/main/java/com/linkedin/metadata/Constants.java index f5a3c9c12ff70e..3d9b533dc8f720 100644 --- a/li-utils/src/main/java/com/linkedin/metadata/Constants.java +++ b/li-utils/src/main/java/com/linkedin/metadata/Constants.java @@ -125,6 +125,11 @@ public class Constants { public static final String VIEW_PROPERTIES_ASPECT_NAME = "viewProperties"; public static final String DATASET_PROFILE_ASPECT_NAME = "datasetProfile"; + // Aspect support + public static final String FINE_GRAINED_LINEAGE_DATASET_TYPE = "DATASET"; + public static final String FINE_GRAINED_LINEAGE_FIELD_SET_TYPE = "FIELD_SET"; + public static final String FINE_GRAINED_LINEAGE_FIELD_TYPE = "FIELD"; + // Chart public static final String CHART_KEY_ASPECT_NAME = "chartKey"; public static final String CHART_INFO_ASPECT_NAME = "chartInfo"; diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index fcfe049fb15cf9..294a80572669b8 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -1,4 +1,4 @@ -from typing import Dict, Generic, List, Optional, TypeVar, Union +from typing import Dict, Generic, List, Optional, Tuple, TypeVar, Union from urllib.parse import quote from datahub.emitter.mcp_patch_builder import MetadataPatchProposal @@ -6,6 +6,9 @@ DatasetPropertiesClass as DatasetProperties, EditableDatasetPropertiesClass as EditableDatasetProperties, EditableSchemaMetadataClass as EditableSchemaMetadata, + FineGrainedLineageClass as FineGrainedLineage, + FineGrainedLineageDownstreamTypeClass as FineGrainedLineageDownstreamType, + FineGrainedLineageUpstreamTypeClass as FineGrainedLineageUpstreamType, GlobalTagsClass as GlobalTags, GlossaryTermAssociationClass as Term, GlossaryTermsClass as GlossaryTerms, @@ -144,6 +147,108 @@ def set_upstream_lineages(self, upstreams: List[Upstream]) -> "DatasetPatchBuild ) return self + def add_fine_grained_upstream_lineage( + self, fine_grained_lineage: FineGrainedLineage + ) -> "DatasetPatchBuilder": + ( + transform_op, + upstream_type, + downstream_type, + ) = DatasetPatchBuilder.get_fine_grained_key(fine_grained_lineage) + for upstream_urn in fine_grained_lineage.upstreams or []: + self._add_patch( + UpstreamLineage.ASPECT_NAME, + "add", + path=DatasetPatchBuilder.quote_fine_grained_upstream_path( + transform_op, upstream_type, upstream_urn + ), + value=fine_grained_lineage.confidenceScore, + ) + for downstream_urn in fine_grained_lineage.downstreams or []: + self._add_patch( + UpstreamLineage.ASPECT_NAME, + "add", + path=DatasetPatchBuilder.quote_fine_grained_downstream_path( + transform_op, downstream_type, downstream_urn + ), + value=fine_grained_lineage.confidenceScore, + ) + return self + + @staticmethod + def get_fine_grained_key( + fine_grained_lineage: FineGrainedLineage, + ) -> Tuple[str, str, str]: + transform_op = fine_grained_lineage.transformOperation or "NONE" + upstream_type = ( + fine_grained_lineage.upstreamType + if isinstance(fine_grained_lineage.upstreamType, str) + else FineGrainedLineageUpstreamType.FIELD_SET + ) + downstream_type = ( + fine_grained_lineage.downstreamType + if isinstance(fine_grained_lineage.downstreamType, str) + else FineGrainedLineageDownstreamType.FIELD_SET + ) + return transform_op, upstream_type, downstream_type + + @staticmethod + def quote_fine_grained_downstream_path( + transform_op: str, downstream_type: str, downstream_urn: str + ) -> str: + return ( + f"/fineGrainedLineages/{quote(transform_op, safe='')}/downstreamType/" + f"{quote(downstream_type, safe='')}/{quote(downstream_urn, safe='')}" + ) + + @staticmethod + def quote_fine_grained_upstream_path( + transform_op: str, upstream_type: str, upstream_urn: str + ) -> str: + return ( + f"/fineGrainedLineages/{quote(transform_op, safe='')}/upstreamType/" + f"{quote(upstream_type, safe='')}/{quote(upstream_urn, safe='')}" + ) + + def remove_fine_grained_upstream_lineage( + self, fine_grained_lineage: FineGrainedLineage + ) -> "DatasetPatchBuilder": + ( + transform_op, + upstream_type, + downstream_type, + ) = DatasetPatchBuilder.get_fine_grained_key(fine_grained_lineage) + for upstream_urn in fine_grained_lineage.upstreams or []: + self._add_patch( + UpstreamLineage.ASPECT_NAME, + "remove", + path=DatasetPatchBuilder.quote_fine_grained_upstream_path( + transform_op, upstream_type, upstream_urn + ), + value={}, + ) + for downstream_urn in fine_grained_lineage.downstreams or []: + self._add_patch( + UpstreamLineage.ASPECT_NAME, + "remove", + path=DatasetPatchBuilder.quote_fine_grained_downstream_path( + transform_op, downstream_type, downstream_urn + ), + value={}, + ) + return self + + def set_fine_grained_upstream_lineages( + self, fine_grained_lineages: List[FineGrainedLineage] + ) -> "DatasetPatchBuilder": + self._add_patch( + UpstreamLineage.ASPECT_NAME, + "add", + path="/fineGrainedLineages", + value=fine_grained_lineages, + ) + return self + def add_tag(self, tag: Tag) -> "DatasetPatchBuilder": self._add_patch( GlobalTags.ASPECT_NAME, "add", path=f"/tags/{tag.tag}", value=tag diff --git a/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json b/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json index d5dfe125942fba..ed5a7723ac2bf1 100644 --- a/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json +++ b/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json @@ -42,26 +42,31 @@ } }, { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", - "changeType": "PATCH", - "aspectName": "upstreamLineage", - "aspect": { - "json": [ - { - "op": "add", - "path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29", - "value": { - "auditStamp": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)", - "type": "TRANSFORMED" - } - } - ] - } + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "upstreamLineage", + "aspect": { + "json": [ + { + "op": "add", + "path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29", + "value": { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)", + "type": "TRANSFORMED" + } + }, + { + "op": "add", + "path": "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29", + "value": 1.0 + } + ] + } }, { "entityType": "dataset", diff --git a/metadata-ingestion/tests/unit/patch/test_patch_builder.py b/metadata-ingestion/tests/unit/patch/test_patch_builder.py index 0701b3d6968959..f05c4978f8644e 100644 --- a/metadata-ingestion/tests/unit/patch/test_patch_builder.py +++ b/metadata-ingestion/tests/unit/patch/test_patch_builder.py @@ -7,6 +7,9 @@ from datahub.ingestion.sink.file import write_metadata_file from datahub.metadata.schema_classes import ( DatasetLineageTypeClass, + FineGrainedLineageClass, + FineGrainedLineageDownstreamTypeClass, + FineGrainedLineageUpstreamTypeClass, GenericAspectClass, MetadataChangeProposalClass, TagAssociationClass, @@ -53,6 +56,19 @@ def test_complex_dataset_patch( type=DatasetLineageTypeClass.TRANSFORMED, ) ) + .add_fine_grained_upstream_lineage( + fine_grained_lineage=FineGrainedLineageClass( + upstreamType=FineGrainedLineageUpstreamTypeClass.DATASET, + upstreams=[ + make_dataset_urn( + platform="hive", name="fct_users_created_upstream", env="PROD" + ) + ], + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD_SET, + transformOperation="TRANSFORM", + confidenceScore=1.0, + ) + ) ) patcher.for_field("field1").add_tag(TagAssociationClass(tag=make_tag_urn("tag1"))) diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/patch/dataset/UpstreamLineagePatchBuilder.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/patch/dataset/UpstreamLineagePatchBuilder.java index 6ded8a25b4e22c..9db2ebc522e093 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/patch/dataset/UpstreamLineagePatchBuilder.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/patch/dataset/UpstreamLineagePatchBuilder.java @@ -5,10 +5,14 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.common.urn.Urn; import com.linkedin.dataset.DatasetLineageType; +import com.linkedin.dataset.FineGrainedLineageDownstreamType; +import com.linkedin.dataset.FineGrainedLineageUpstreamType; import datahub.client.patch.AbstractMultiFieldPatchBuilder; import datahub.client.patch.PatchOperationType; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.ToString; import org.apache.commons.lang3.tuple.ImmutableTriple; @@ -16,7 +20,8 @@ public class UpstreamLineagePatchBuilder extends AbstractMultiFieldPatchBuilder { - private static final String PATH_START = "/upstreams/"; + private static final String UPSTREAMS_PATH_START = "/upstreams/"; + private static final String FINE_GRAINED_PATH_START = "/fineGrainedLineages/"; private static final String DATASET_KEY = "dataset"; private static final String AUDIT_STAMP_KEY = "auditStamp"; private static final String TIME_KEY = "time"; @@ -34,13 +39,233 @@ public UpstreamLineagePatchBuilder addUpstream( .set(AUDIT_STAMP_KEY, auditStamp); pathValues.add( - ImmutableTriple.of(PatchOperationType.ADD.getValue(), PATH_START + datasetUrn, value)); + ImmutableTriple.of( + PatchOperationType.ADD.getValue(), UPSTREAMS_PATH_START + datasetUrn, value)); return this; } public UpstreamLineagePatchBuilder removeUpstream(@Nonnull DatasetUrn datasetUrn) { pathValues.add( - ImmutableTriple.of(PatchOperationType.REMOVE.getValue(), PATH_START + datasetUrn, null)); + ImmutableTriple.of( + PatchOperationType.REMOVE.getValue(), UPSTREAMS_PATH_START + datasetUrn, null)); + return this; + } + + /** + * Method for adding an upstream FineGrained Dataset + * + * @param datasetUrn dataset to be set as upstream + * @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for + * full confidence + * @param transformationOperation string operation type that describes the transformation + * operation happening in the lineage edge + * @return this builder + */ + public UpstreamLineagePatchBuilder addFineGrainedUpstreamDataset( + @Nonnull DatasetUrn datasetUrn, + @Nullable Float confidenceScore, + @Nonnull String transformationOperation) { + Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); + + pathValues.add( + ImmutableTriple.of( + PatchOperationType.ADD.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + "upstreamType" + + "/" + + "DATASET" + + "/" + + datasetUrn, + instance.numberNode(finalConfidenceScore))); + return this; + } + + /** + * Adds a field as a fine grained upstream + * + * @param schemaFieldUrn a schema field to be marked as upstream, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for + * full confidence + * @param transformationOperation string operation type that describes the transformation + * operation happening in the lineage edge + * @param type the upstream lineage type, either Field or Field Set + * @return this builder + */ + public UpstreamLineagePatchBuilder addFineGrainedUpstreamField( + @Nonnull Urn schemaFieldUrn, + @Nullable Float confidenceScore, + @Nonnull String transformationOperation, + @Nullable FineGrainedLineageUpstreamType type) { + Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); + String finalType; + if (type == null) { + // Default to set of fields if not explicitly a single field + finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString(); + } else { + finalType = type.toString(); + } + + pathValues.add( + ImmutableTriple.of( + PatchOperationType.ADD.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + "upstreamType" + + "/" + + finalType + + "/" + + schemaFieldUrn, + instance.numberNode(finalConfidenceScore))); + + return this; + } + + /** + * Adds a field as a fine grained downstream + * + * @param schemaFieldUrn a schema field to be marked as downstream, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for + * full confidence + * @param transformationOperation string operation type that describes the transformation + * operation happening in the lineage edge + * @param type the downstream lineage type, either Field or Field Set + * @return this builder + */ + public UpstreamLineagePatchBuilder addFineGrainedDownstreamField( + @Nonnull Urn schemaFieldUrn, + @Nullable Float confidenceScore, + @Nonnull String transformationOperation, + @Nullable FineGrainedLineageDownstreamType type) { + Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); + String finalType; + if (type == null) { + // Default to set of fields if not explicitly a single field + finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString(); + } else { + finalType = type.toString(); + } + + pathValues.add( + ImmutableTriple.of( + PatchOperationType.ADD.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + "downstreamType" + + "/" + + finalType + + "/" + + schemaFieldUrn, + instance.numberNode(finalConfidenceScore))); + return this; + } + + private Float getConfidenceScoreOrDefault(@Nullable Float confidenceScore) { + float finalConfidenceScore; + if (confidenceScore != null && confidenceScore > 0 && confidenceScore <= 1.0f) { + finalConfidenceScore = confidenceScore; + } else { + finalConfidenceScore = 1.0f; + } + + return finalConfidenceScore; + } + + /** + * Removes a field as a fine grained upstream + * + * @param schemaFieldUrn a schema field to be marked as upstream, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param transformationOperation string operation type that describes the transformation + * operation happening in the lineage edge + * @param type the upstream lineage type, either Field or Field Set + * @return this builder + */ + public UpstreamLineagePatchBuilder removeFineGrainedUpstreamField( + @Nonnull Urn schemaFieldUrn, + @Nonnull String transformationOperation, + @Nullable FineGrainedLineageUpstreamType type) { + String finalType; + if (type == null) { + // Default to set of fields if not explicitly a single field + finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString(); + } else { + finalType = type.toString(); + } + + pathValues.add( + ImmutableTriple.of( + PatchOperationType.REMOVE.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + "upstreamType" + + "/" + + finalType + + "/" + + schemaFieldUrn, + null)); + + return this; + } + + public UpstreamLineagePatchBuilder removeFineGrainedUpstreamDataset( + @Nonnull DatasetUrn datasetUrn, @Nonnull String transformationOperation) { + + pathValues.add( + ImmutableTriple.of( + PatchOperationType.REMOVE.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + "upstreamType" + + "/" + + "DATASET" + + "/" + + datasetUrn, + null)); + return this; + } + + /** + * Adds a field as a fine grained downstream + * + * @param schemaFieldUrn a schema field to be marked as downstream, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param transformationOperation string operation type that describes the transformation + * operation happening in the lineage edge + * @param type the downstream lineage type, either Field or Field Set + * @return this builder + */ + public UpstreamLineagePatchBuilder removeFineGrainedDownstreamField( + @Nonnull Urn schemaFieldUrn, + @Nonnull String transformationOperation, + @Nullable FineGrainedLineageDownstreamType type) { + String finalType; + if (type == null) { + // Default to set of fields if not explicitly a single field + finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString(); + } else { + finalType = type.toString(); + } + + pathValues.add( + ImmutableTriple.of( + PatchOperationType.REMOVE.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + "downstreamType" + + "/" + + finalType + + "/" + + schemaFieldUrn, + null)); return this; } diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java index 1d387acb0ce12b..563742990f5468 100644 --- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java +++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java @@ -14,6 +14,7 @@ import com.linkedin.common.urn.DatasetUrn; import com.linkedin.common.urn.GlossaryTermUrn; import com.linkedin.common.urn.TagUrn; +import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.dataset.DatasetLineageType; import com.linkedin.metadata.graph.LineageDirection; @@ -49,15 +50,21 @@ public class PatchTest { public void testLocalUpstream() { RestEmitter restEmitter = new RestEmitter(RestEmitterConfig.builder().build()); try { + DatasetUrn upstreamUrn = + DatasetUrn.createFromString( + "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"); + Urn schemaFieldUrn = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), foo)"); MetadataChangeProposal upstreamPatch = new UpstreamLineagePatchBuilder() .urn( UrnUtils.getUrn( "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)")) - .addUpstream( - DatasetUrn.createFromString( - "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"), - DatasetLineageType.TRANSFORMED) + .addUpstream(upstreamUrn, DatasetLineageType.TRANSFORMED) + .addFineGrainedUpstreamDataset(upstreamUrn, null, "TRANSFORM") + .addFineGrainedUpstreamField(schemaFieldUrn, null, "TRANSFORM", null) + .addFineGrainedDownstreamField(schemaFieldUrn, null, "TRANSFORM", null) .build(); Future response = restEmitter.emit(upstreamPatch); @@ -73,6 +80,12 @@ public void testLocalUpstream() { public void testLocalUpstreamRemove() { RestEmitter restEmitter = new RestEmitter(RestEmitterConfig.builder().build()); try { + DatasetUrn upstreamUrn = + DatasetUrn.createFromString( + "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"); + Urn schemaFieldUrn = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), foo)"); MetadataChangeProposal upstreamPatch = new UpstreamLineagePatchBuilder() .urn( @@ -81,6 +94,9 @@ public void testLocalUpstreamRemove() { .removeUpstream( DatasetUrn.createFromString( "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)")) + .removeFineGrainedUpstreamDataset(upstreamUrn, "TRANSFORM") + .removeFineGrainedUpstreamField(schemaFieldUrn, "TRANSFORM", null) + .removeFineGrainedDownstreamField(schemaFieldUrn, "TRANSFORM", null) .build(); Future response = restEmitter.emit(upstreamPatch); From 79ccbc57d1c3266025c8e52ce18fbfcff550c387 Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Mon, 11 Dec 2023 14:41:23 -0800 Subject: [PATCH 2/7] fix(CVE-2023-6378): update logback classic (#9438) --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index f5e5403e822e77..b16e3ca169c717 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ buildscript { ext.playVersion = '2.8.18' ext.log4jVersion = '2.19.0' ext.slf4jVersion = '1.7.36' - ext.logbackClassic = '1.2.12' + ext.logbackClassic = '1.2.13' ext.hadoop3Version = '3.3.5' ext.kafkaVersion = '2.3.0' ext.hazelcastVersion = '5.3.6' From ee4e8dd74c569d0dfc98e8eb13034c91b0ad61a8 Mon Sep 17 00:00:00 2001 From: Salman-Apptware <101426513+Salman-Apptware@users.noreply.github.com> Date: Tue, 12 Dec 2023 15:03:30 +0530 Subject: [PATCH 3/7] feat: allow the sidebar size to be draggable (#9401) --- .../src/app/search/SearchResults.tsx | 2 +- .../src/app/search/sidebar/BrowseSidebar.tsx | 51 ++++++++++++------- .../src/app/search/sidebar/EntityNode.tsx | 3 +- .../cypress/cypress/e2e/browse/browseV2.js | 10 ++-- 4 files changed, 41 insertions(+), 25 deletions(-) diff --git a/datahub-web-react/src/app/search/SearchResults.tsx b/datahub-web-react/src/app/search/SearchResults.tsx index 56e83e42350270..d7ad6d517d8fed 100644 --- a/datahub-web-react/src/app/search/SearchResults.tsx +++ b/datahub-web-react/src/app/search/SearchResults.tsx @@ -197,7 +197,7 @@ export const SearchResults = ({ {showBrowseV2 && ( - + )} diff --git a/datahub-web-react/src/app/search/sidebar/BrowseSidebar.tsx b/datahub-web-react/src/app/search/sidebar/BrowseSidebar.tsx index 822e75b65febc3..c16bcdcaf6c727 100644 --- a/datahub-web-react/src/app/search/sidebar/BrowseSidebar.tsx +++ b/datahub-web-react/src/app/search/sidebar/BrowseSidebar.tsx @@ -1,4 +1,4 @@ -import React from 'react'; +import React, { useState } from 'react'; import styled from 'styled-components'; import { Typography } from 'antd'; import EntityNode from './EntityNode'; @@ -7,10 +7,16 @@ import SidebarLoadingError from './SidebarLoadingError'; import { SEARCH_RESULTS_BROWSE_SIDEBAR_ID } from '../../onboarding/config/SearchOnboardingConfig'; import useSidebarEntities from './useSidebarEntities'; import { ANTD_GRAY_V2 } from '../../entity/shared/constants'; +import { ProfileSidebarResizer } from '../../entity/shared/containers/profile/sidebar/ProfileSidebarResizer'; -const Sidebar = styled.div<{ visible: boolean; width: number }>` + +export const MAX_BROWSER_WIDTH = 500; +export const MIN_BROWSWER_WIDTH = 200; + +export const SidebarWrapper = styled.div<{ visible: boolean; width: number }>` height: 100%; width: ${(props) => (props.visible ? `${props.width}px` : '0')}; + min-width: ${(props) => (props.visible ? `${props.width}px` : '0')}; transition: width 250ms ease-in-out; border-right: 1px solid ${(props) => props.theme.styles['border-color-base']}; background-color: ${ANTD_GRAY_V2[1]}; @@ -37,29 +43,38 @@ const SidebarBody = styled.div<{ visible: boolean }>` type Props = { visible: boolean; - width: number; }; -const BrowseSidebar = ({ visible, width }: Props) => { +const BrowseSidebar = ({ visible }: Props) => { const { error, entityAggregations, retry } = useSidebarEntities({ skip: !visible, }); + const [browserWidth, setBrowserWith] = useState(window.innerWidth * 0.2); return ( - - - Navigate - - - {entityAggregations && !entityAggregations.length &&
No results found
} - {entityAggregations?.map((entityAggregation) => ( - - - - ))} - {error && } -
-
+ <> + + + Navigate + + + {entityAggregations && !entityAggregations.length &&
No results found
} + {entityAggregations?.map((entityAggregation) => ( + + + + ))} + {error && } +
+
+ + setBrowserWith(Math.min(Math.max(widthProp, MIN_BROWSWER_WIDTH), MAX_BROWSER_WIDTH)) + } + initialSize={browserWidth} + isSidebarOnLeft + /> + ); }; diff --git a/datahub-web-react/src/app/search/sidebar/EntityNode.tsx b/datahub-web-react/src/app/search/sidebar/EntityNode.tsx index e04e4253dca134..627d19c4fb10c1 100644 --- a/datahub-web-react/src/app/search/sidebar/EntityNode.tsx +++ b/datahub-web-react/src/app/search/sidebar/EntityNode.tsx @@ -38,7 +38,8 @@ const EntityNode = () => { onToggle: (isNowOpen: boolean) => trackToggleNodeEvent(isNowOpen, 'entity'), }); - const onClickHeader = () => { + const onClickHeader = (e) => { + e.preventDefault(); if (count) toggle(); }; diff --git a/smoke-test/tests/cypress/cypress/e2e/browse/browseV2.js b/smoke-test/tests/cypress/cypress/e2e/browse/browseV2.js index a61b9030b13c6f..f45edc5fa04819 100644 --- a/smoke-test/tests/cypress/cypress/e2e/browse/browseV2.js +++ b/smoke-test/tests/cypress/cypress/e2e/browse/browseV2.js @@ -46,31 +46,31 @@ describe("search", () => { cy.get("[data-testid=browse-v2") .invoke("css", "width") - .should("match", /^\d\d\dpx$/); + .should("match", /\d\d\dpx$/); cy.get("[data-testid=browse-v2-toggle").click(); cy.get("[data-testid=browse-v2") .invoke("css", "width") - .should("match", /^\dpx$/); + .should("match", /\dpx$/); cy.reload(); cy.get("[data-testid=browse-v2") .invoke("css", "width") - .should("match", /^\dpx$/); + .should("match", /\dpx$/); cy.get("[data-testid=browse-v2-toggle").click(); cy.get("[data-testid=browse-v2") .invoke("css", "width") - .should("match", /^\d\d\dpx$/); + .should("match", /\d\d\dpx$/); cy.reload(); cy.get("[data-testid=browse-v2") .invoke("css", "width") - .should("match", /^\d\d\dpx$/); + .should("match", /\d\d\dpx$/); }); it("should take you to the old browse experience when clicking entity type on home page with the browse flag off", () => { From abbc4cdc577647d7b97a03117c4317805a3a8ce3 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Tue, 12 Dec 2023 17:26:29 +0530 Subject: [PATCH 4/7] fix(json-schema): do not send invalid URLs (#9417) --- .../ingestion/source/schema/json_schema.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/schema/json_schema.py b/metadata-ingestion/src/datahub/ingestion/source/schema/json_schema.py index f6e944f4fc3cb3..c7e8a15d8dfa48 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/schema/json_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/schema/json_schema.py @@ -9,6 +9,7 @@ from os.path import basename, dirname from pathlib import Path from typing import Any, Iterable, List, Optional, Union +from urllib.parse import urlparse import jsonref from pydantic import AnyHttpUrl, DirectoryPath, FilePath, validator @@ -53,6 +54,16 @@ logger = logging.getLogger(__name__) +def is_url_valid(url: Optional[str]) -> bool: + if url is None: + return False + try: + result = urlparse(url) + return all([result.scheme, result.netloc]) + except Exception: + return False + + class URIReplacePattern(ConfigModel): match: str = Field( description="Pattern to match on uri-s as part of reference resolution. See replace field", @@ -281,12 +292,14 @@ def _load_one_file( entityUrn=dataset_urn, aspect=models.StatusClass(removed=False) ).as_workunit() + external_url = JsonSchemaTranslator._get_id_from_any_schema(schema_dict) + if not is_url_valid(external_url): + external_url = None + yield MetadataChangeProposalWrapper( entityUrn=dataset_urn, aspect=models.DatasetPropertiesClass( - externalUrl=JsonSchemaTranslator._get_id_from_any_schema( - schema_dict - ), + externalUrl=external_url, name=dataset_simple_name, description=JsonSchemaTranslator._get_description_from_any_schema( schema_dict From ffccc6556110ea197402ad1de72117ffd5509a8d Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Tue, 12 Dec 2023 18:31:58 +0100 Subject: [PATCH 5/7] fix(ingest/profiling) Fixing profile eligibility check (#9446) --- .../datahub/ingestion/source/sql/sql_generic_profiler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py index 844a458d9f1ab6..a2f91e5fae1a98 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py @@ -274,16 +274,16 @@ def is_dataset_eligible_for_profiling( return False if self.config.profiling.profile_table_size_limit is not None and ( - size_in_bytes is None - or size_in_bytes / (2**30) + size_in_bytes is not None + and size_in_bytes / (2**30) > self.config.profiling.profile_table_size_limit ): self.report.profiling_skipped_size_limit[schema_name] += 1 return False if self.config.profiling.profile_table_row_limit is not None and ( - rows_count is None - or rows_count > self.config.profiling.profile_table_row_limit + rows_count is not None + and rows_count > self.config.profiling.profile_table_row_limit ): self.report.profiling_skipped_row_limit[schema_name] += 1 return False From 66f90c7ffd483f397c99dbf494280d3cd9ef10dd Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 12 Dec 2023 12:32:59 -0500 Subject: [PATCH 6/7] fix(ingest): avoid git dependency in dbt (#9447) --- metadata-ingestion/src/datahub/configuration/git.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/configuration/git.py b/metadata-ingestion/src/datahub/configuration/git.py index 9ea9007553839b..a5f88744661a4a 100644 --- a/metadata-ingestion/src/datahub/configuration/git.py +++ b/metadata-ingestion/src/datahub/configuration/git.py @@ -6,7 +6,6 @@ from datahub.configuration.common import ConfigModel from datahub.configuration.validate_field_rename import pydantic_renamed_field -from datahub.ingestion.source.git.git_import import GitClone _GITHUB_PREFIX = "https://github.com/" _GITLAB_PREFIX = "https://gitlab.com/" @@ -151,6 +150,9 @@ def clone( ) -> pathlib.Path: """Clones the repo into a temporary directory and returns the path to the checkout.""" + # We import this here to avoid a hard dependency on gitpython. + from datahub.ingestion.source.git.git_import import GitClone + assert self.repo_ssh_locator git_clone = GitClone(str(tmp_path)) From 02982ed88600f9b11c2387e540299c437ca21ed6 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 12 Dec 2023 12:38:21 -0500 Subject: [PATCH 7/7] feat(ingest): add retries for tableau (#9437) --- .../src/datahub/ingestion/source/tableau.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index da44d09121c6c1..f870e99df27c5f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -21,7 +21,7 @@ import tableauserverclient as TSC from pydantic import root_validator, validator from pydantic.fields import Field -from requests.adapters import ConnectionError +from requests.adapters import ConnectionError, HTTPAdapter from tableauserverclient import ( PersonalAccessTokenAuth, Server, @@ -29,6 +29,7 @@ TableauAuth, ) from tableauserverclient.server.endpoint.exceptions import NonXMLResponseError +from urllib3 import Retry import datahub.emitter.mce_builder as builder import datahub.utilities.sqlglot_lineage as sqlglot_l @@ -174,6 +175,7 @@ class TableauConnectionConfig(ConfigModel): description="Unique relationship between the Tableau Server and site", ) + max_retries: int = Field(3, description="Number of retries for failed requests.") ssl_verify: Union[bool, str] = Field( default=True, description="Whether to verify SSL certificates. If using self-signed certificates, set to false or provide the path to the .pem certificate bundle.", @@ -224,6 +226,17 @@ def make_tableau_client(self) -> Server: # From https://stackoverflow.com/a/50159273/5004662. server._session.trust_env = False + # Setup request retries. + adapter = HTTPAdapter( + max_retries=Retry( + total=self.max_retries, + backoff_factor=1, + status_forcelist=[429, 500, 502, 503, 504], + ) + ) + server._session.mount("http://", adapter) + server._session.mount("https://", adapter) + server.auth.sign_in(authentication) return server except ServerResponseError as e: