diff --git a/build.gradle b/build.gradle
index f5e5403e822e77..b16e3ca169c717 100644
--- a/build.gradle
+++ b/build.gradle
@@ -16,7 +16,7 @@ buildscript {
ext.playVersion = '2.8.18'
ext.log4jVersion = '2.19.0'
ext.slf4jVersion = '1.7.36'
- ext.logbackClassic = '1.2.12'
+ ext.logbackClassic = '1.2.13'
ext.hadoop3Version = '3.3.5'
ext.kafkaVersion = '2.3.0'
ext.hazelcastVersion = '5.3.6'
diff --git a/datahub-web-react/src/app/search/SearchResults.tsx b/datahub-web-react/src/app/search/SearchResults.tsx
index 56e83e42350270..d7ad6d517d8fed 100644
--- a/datahub-web-react/src/app/search/SearchResults.tsx
+++ b/datahub-web-react/src/app/search/SearchResults.tsx
@@ -197,7 +197,7 @@ export const SearchResults = ({
{showBrowseV2 && (
-                    <BrowseSidebar visible={sidebarOpen} width={browseSidebarWidth} />
+                    <BrowseSidebar visible={sidebarOpen} />
)}
diff --git a/datahub-web-react/src/app/search/sidebar/BrowseSidebar.tsx b/datahub-web-react/src/app/search/sidebar/BrowseSidebar.tsx
index 822e75b65febc3..c16bcdcaf6c727 100644
--- a/datahub-web-react/src/app/search/sidebar/BrowseSidebar.tsx
+++ b/datahub-web-react/src/app/search/sidebar/BrowseSidebar.tsx
@@ -1,4 +1,4 @@
-import React from 'react';
+import React, { useState } from 'react';
import styled from 'styled-components';
import { Typography } from 'antd';
import EntityNode from './EntityNode';
@@ -7,10 +7,16 @@ import SidebarLoadingError from './SidebarLoadingError';
import { SEARCH_RESULTS_BROWSE_SIDEBAR_ID } from '../../onboarding/config/SearchOnboardingConfig';
import useSidebarEntities from './useSidebarEntities';
import { ANTD_GRAY_V2 } from '../../entity/shared/constants';
+import { ProfileSidebarResizer } from '../../entity/shared/containers/profile/sidebar/ProfileSidebarResizer';
-const Sidebar = styled.div<{ visible: boolean; width: number }>`
+
+export const MAX_BROWSER_WIDTH = 500;
+export const MIN_BROWSER_WIDTH = 200;
+
+export const SidebarWrapper = styled.div<{ visible: boolean; width: number }>`
height: 100%;
width: ${(props) => (props.visible ? `${props.width}px` : '0')};
+ min-width: ${(props) => (props.visible ? `${props.width}px` : '0')};
transition: width 250ms ease-in-out;
border-right: 1px solid ${(props) => props.theme.styles['border-color-base']};
background-color: ${ANTD_GRAY_V2[1]};
@@ -37,29 +43,38 @@ const SidebarBody = styled.div<{ visible: boolean }>`
type Props = {
visible: boolean;
- width: number;
};
-const BrowseSidebar = ({ visible, width }: Props) => {
+const BrowseSidebar = ({ visible }: Props) => {
const { error, entityAggregations, retry } = useSidebarEntities({
skip: !visible,
});
+    const [browserWidth, setBrowserWidth] = useState(window.innerWidth * 0.2);
return (
-        <Sidebar visible={visible} width={width} id={SEARCH_RESULTS_BROWSE_SIDEBAR_ID}>
-        </Sidebar>
+        <>
+            <SidebarWrapper visible={visible} width={browserWidth} id={SEARCH_RESULTS_BROWSE_SIDEBAR_ID}>
+                {/* header and entity list unchanged */}
+            </SidebarWrapper>
+            <ProfileSidebarResizer
+                setSidePanelWidth={(widthProp) =>
+                    setBrowserWidth(Math.min(Math.max(widthProp, MIN_BROWSER_WIDTH), MAX_BROWSER_WIDTH))
+                }
+                initialSize={browserWidth}
+                isSidebarOnLeft
+            />
+        </>
);
};
diff --git a/datahub-web-react/src/app/search/sidebar/EntityNode.tsx b/datahub-web-react/src/app/search/sidebar/EntityNode.tsx
index e04e4253dca134..627d19c4fb10c1 100644
--- a/datahub-web-react/src/app/search/sidebar/EntityNode.tsx
+++ b/datahub-web-react/src/app/search/sidebar/EntityNode.tsx
@@ -38,7 +38,8 @@ const EntityNode = () => {
onToggle: (isNowOpen: boolean) => trackToggleNodeEvent(isNowOpen, 'entity'),
});
- const onClickHeader = () => {
+    const onClickHeader = (e: React.MouseEvent) => {
+ e.preventDefault();
if (count) toggle();
};
diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/registry/template/dataset/UpstreamLineageTemplate.java b/entity-registry/src/main/java/com/linkedin/metadata/models/registry/template/dataset/UpstreamLineageTemplate.java
index 35816895669beb..81a4065dedb1a2 100644
--- a/entity-registry/src/main/java/com/linkedin/metadata/models/registry/template/dataset/UpstreamLineageTemplate.java
+++ b/entity-registry/src/main/java/com/linkedin/metadata/models/registry/template/dataset/UpstreamLineageTemplate.java
@@ -1,20 +1,41 @@
package com.linkedin.metadata.models.registry.template.dataset;
+import static com.fasterxml.jackson.databind.node.JsonNodeFactory.*;
+import static com.linkedin.metadata.Constants.*;
+
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Streams;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.FineGrainedLineageArray;
import com.linkedin.dataset.UpstreamArray;
import com.linkedin.dataset.UpstreamLineage;
-import com.linkedin.metadata.models.registry.template.ArrayMergingTemplate;
+import com.linkedin.metadata.models.registry.template.CompoundKeyTemplate;
import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
-public class UpstreamLineageTemplate implements ArrayMergingTemplate<UpstreamLineage> {
+public class UpstreamLineageTemplate extends CompoundKeyTemplate<UpstreamLineage> {
+ // Fields
private static final String UPSTREAMS_FIELD_NAME = "upstreams";
private static final String DATASET_FIELD_NAME = "dataset";
+ private static final String FINE_GRAINED_LINEAGES_FIELD_NAME = "fineGrainedLineages";
+ private static final String FINE_GRAINED_UPSTREAM_TYPE = "upstreamType";
+ private static final String FINE_GRAINED_UPSTREAMS = "upstreams";
+ private static final String FINE_GRAINED_DOWNSTREAM_TYPE = "downstreamType";
+ private static final String FINE_GRAINED_DOWNSTREAMS = "downstreams";
+ private static final String FINE_GRAINED_TRANSFORMATION_OPERATION = "transformOperation";
+ private static final String FINE_GRAINED_CONFIDENCE_SCORE = "confidenceScore";
- // TODO: Fine Grained Lineages not patchable at this time, they don't have a well established key
+ // Template support
+ private static final String NONE_TRANSFORMATION_TYPE = "NONE";
+ private static final Float DEFAULT_CONFIDENCE_SCORE = 1.0f;
@Override
public UpstreamLineage getSubtype(RecordTemplate recordTemplate) throws ClassCastException {
@@ -42,14 +63,250 @@ public UpstreamLineage getDefault() {
@Nonnull
@Override
public JsonNode transformFields(JsonNode baseNode) {
- return arrayFieldToMap(
- baseNode, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME));
+ JsonNode transformedNode =
+ arrayFieldToMap(
+ baseNode, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME));
+ ((ObjectNode) transformedNode)
+ .set(
+ FINE_GRAINED_LINEAGES_FIELD_NAME,
+ combineAndTransformFineGrainedLineages(
+ transformedNode.get(FINE_GRAINED_LINEAGES_FIELD_NAME)));
+
+ return transformedNode;
}
@Nonnull
@Override
public JsonNode rebaseFields(JsonNode patched) {
- return transformedMapToArray(
- patched, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME));
+ JsonNode rebasedNode =
+ transformedMapToArray(
+ patched, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME));
+ ((ObjectNode) rebasedNode)
+ .set(
+ FINE_GRAINED_LINEAGES_FIELD_NAME,
+ reconstructFineGrainedLineages(rebasedNode.get(FINE_GRAINED_LINEAGES_FIELD_NAME)));
+ return rebasedNode;
+ }
+
+ /**
+   * Combines the fine grained lineage array into a map keyed on upstream and downstream types,
+   * defaulting when a type is not present. With this construction, patches look like: path:
+   * /fineGrainedLineages/TRANSFORMATION_OPERATION/(upstreamType || downstreamType)/TYPE/FIELD_URN,
+   * op: ADD/REMOVE, value: float (confidenceScore). Because FineGrainedLineage was designed
+   * without a consistent key we can reference, this specialized method mimics the superclass's
+   * arrayFieldToMap, with the specialization that it does not put the full value of the aspect at
+   * the end of the key, just the particular array. This prevents unintended overwrites through
+   * improper MCP construction that is technically allowed by the schema when combining under the
+   * fields that form the natural key.
+ *
+ * @param fineGrainedLineages the fine grained lineage array node
+ * @return the modified {@link JsonNode} with array fields transformed to maps
+ */
+ private JsonNode combineAndTransformFineGrainedLineages(@Nullable JsonNode fineGrainedLineages) {
+ ObjectNode mapNode = instance.objectNode();
+ if (!(fineGrainedLineages instanceof ArrayNode) || fineGrainedLineages.isEmpty()) {
+ return mapNode;
+ }
+ JsonNode lineageCopy = fineGrainedLineages.deepCopy();
+
+ lineageCopy
+ .elements()
+ .forEachRemaining(
+ node -> {
+ JsonNode nodeClone = node.deepCopy();
+ String transformationOperation =
+ nodeClone.has(FINE_GRAINED_TRANSFORMATION_OPERATION)
+ ? nodeClone.get(FINE_GRAINED_TRANSFORMATION_OPERATION).asText()
+ : NONE_TRANSFORMATION_TYPE;
+
+ if (!mapNode.has(transformationOperation)) {
+ mapNode.set(transformationOperation, instance.objectNode());
+ }
+ ObjectNode transformationOperationNode =
+ (ObjectNode) mapNode.get(transformationOperation);
+
+ Float confidenceScore =
+ nodeClone.has(FINE_GRAINED_CONFIDENCE_SCORE)
+ ? nodeClone.get(FINE_GRAINED_CONFIDENCE_SCORE).floatValue()
+ : DEFAULT_CONFIDENCE_SCORE;
+
+ String upstreamType =
+ nodeClone.has(FINE_GRAINED_UPSTREAM_TYPE)
+ ? nodeClone.get(FINE_GRAINED_UPSTREAM_TYPE).asText()
+ : null;
+ String downstreamType =
+ nodeClone.has(FINE_GRAINED_DOWNSTREAM_TYPE)
+ ? nodeClone.get(FINE_GRAINED_DOWNSTREAM_TYPE).asText()
+ : null;
+ ArrayNode upstreams =
+ nodeClone.has(FINE_GRAINED_UPSTREAMS)
+ ? (ArrayNode) nodeClone.get(FINE_GRAINED_UPSTREAMS)
+ : null;
+ ArrayNode downstreams =
+ nodeClone.has(FINE_GRAINED_DOWNSTREAMS)
+ ? (ArrayNode) nodeClone.get(FINE_GRAINED_DOWNSTREAMS)
+ : null;
+
+ // Handle upstreams
+ if (upstreamType == null) {
+ // Determine default type
+ Urn upstreamUrn =
+ upstreams != null ? UrnUtils.getUrn(upstreams.get(0).asText()) : null;
+ if (upstreamUrn != null
+ && SCHEMA_FIELD_ENTITY_NAME.equals(upstreamUrn.getEntityType())) {
+ upstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE;
+ } else {
+ upstreamType = FINE_GRAINED_LINEAGE_DATASET_TYPE;
+ }
+ }
+ if (!transformationOperationNode.has(FINE_GRAINED_UPSTREAM_TYPE)) {
+ transformationOperationNode.set(FINE_GRAINED_UPSTREAM_TYPE, instance.objectNode());
+ }
+ ObjectNode upstreamTypeNode =
+ (ObjectNode) transformationOperationNode.get(FINE_GRAINED_UPSTREAM_TYPE);
+ if (!upstreamTypeNode.has(upstreamType)) {
+ upstreamTypeNode.set(upstreamType, instance.objectNode());
+ }
+ if (upstreams != null) {
+ addUrnsToSubType(upstreamTypeNode, upstreams, upstreamType, confidenceScore);
+ }
+
+ // Handle downstreams
+ if (downstreamType == null) {
+ // Determine default type
+ if (downstreams != null && downstreams.size() > 1) {
+ downstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE;
+ } else {
+ downstreamType = FINE_GRAINED_LINEAGE_FIELD_TYPE;
+ }
+ }
+ if (!transformationOperationNode.has(FINE_GRAINED_DOWNSTREAM_TYPE)) {
+ transformationOperationNode.set(
+ FINE_GRAINED_DOWNSTREAM_TYPE, instance.objectNode());
+ }
+ ObjectNode downstreamTypeNode =
+ (ObjectNode) transformationOperationNode.get(FINE_GRAINED_DOWNSTREAM_TYPE);
+ if (!downstreamTypeNode.has(downstreamType)) {
+ downstreamTypeNode.set(downstreamType, instance.objectNode());
+ }
+ if (downstreams != null) {
+ addUrnsToSubType(downstreamTypeNode, downstreams, downstreamType, confidenceScore);
+ }
+ });
+ return mapNode;
+ }
+
+ private void addUrnsToSubType(
+ JsonNode superType, ArrayNode urnsList, String subType, Float confidenceScore) {
+ ObjectNode upstreamSubTypeNode = (ObjectNode) superType.get(subType);
+ // Will overwrite repeat urns with different confidence scores with the most recently seen
+ upstreamSubTypeNode.setAll(
+ Streams.stream(urnsList.elements())
+ .map(JsonNode::asText)
+ .distinct()
+ .collect(Collectors.toMap(urn -> urn, urn -> instance.numberNode(confidenceScore))));
+ }
+
+ /**
+ * Takes the transformed fine grained lineages map from pre-processing and reconstructs an array
+   * of FineGrainedLineages. Avoids producing side effects by copying nodes; use the resulting
+   * node, not the original.
+ *
+ * @param transformedFineGrainedLineages the transformed fine grained lineage map
+ * @return the modified {@link JsonNode} formatted consistent with the original schema
+ */
+ private ArrayNode reconstructFineGrainedLineages(JsonNode transformedFineGrainedLineages) {
+ if (transformedFineGrainedLineages instanceof ArrayNode) {
+ // We already have an ArrayNode, no need to transform. This happens during `replace`
+ // operations
+ return (ArrayNode) transformedFineGrainedLineages;
+ }
+ ObjectNode mapNode = (ObjectNode) transformedFineGrainedLineages;
+ ArrayNode arrayNode = instance.arrayNode();
+
+ mapNode
+ .fieldNames()
+ .forEachRemaining(
+ transformationOperation -> {
+ final ObjectNode transformationOperationNode =
+ (ObjectNode) mapNode.get(transformationOperation);
+ final ObjectNode upstreamType =
+ transformationOperationNode.has(FINE_GRAINED_UPSTREAM_TYPE)
+ ? (ObjectNode) transformationOperationNode.get(FINE_GRAINED_UPSTREAM_TYPE)
+ : instance.objectNode();
+ final ObjectNode downstreamType =
+ transformationOperationNode.has(FINE_GRAINED_DOWNSTREAM_TYPE)
+ ? (ObjectNode) transformationOperationNode.get(FINE_GRAINED_DOWNSTREAM_TYPE)
+ : instance.objectNode();
+
+ // Handle upstreams
+ if (!upstreamType.isEmpty()) {
+ populateTypeNode(
+ upstreamType,
+ transformationOperation,
+ FINE_GRAINED_UPSTREAM_TYPE,
+ FINE_GRAINED_UPSTREAMS,
+ FINE_GRAINED_DOWNSTREAM_TYPE,
+ arrayNode);
+ }
+
+ // Handle downstreams
+ if (!downstreamType.isEmpty()) {
+ populateTypeNode(
+ downstreamType,
+ transformationOperation,
+ FINE_GRAINED_DOWNSTREAM_TYPE,
+ FINE_GRAINED_DOWNSTREAMS,
+ FINE_GRAINED_UPSTREAM_TYPE,
+ arrayNode);
+ }
+ });
+
+ return arrayNode;
+ }
+
+ private void populateTypeNode(
+ JsonNode typeNode,
+ String transformationOperation,
+ String typeName,
+ String arrayTypeName,
+ String defaultTypeName,
+ ArrayNode arrayNode) {
+ typeNode
+ .fieldNames()
+ .forEachRemaining(
+ subTypeName -> {
+ ObjectNode subType = (ObjectNode) typeNode.get(subTypeName);
+ if (!subType.isEmpty()) {
+ ObjectNode fineGrainedLineage = instance.objectNode();
+                AtomicReference<Float> minimumConfidenceScore = new AtomicReference<>(1.0f);
+
+ fineGrainedLineage.put(typeName, subTypeName);
+ fineGrainedLineage.put(
+ FINE_GRAINED_TRANSFORMATION_OPERATION, transformationOperation);
+ // Array to actually be filled out
+ fineGrainedLineage.set(arrayTypeName, instance.arrayNode());
+                // Added to pass model validation: we have no way of consistently pairing
+                // upstreams and downstreams within fine grained lineages, since multiple
+                // downstream types may be paired with a single transform operation, so we
+                // just set a default type because it is a required property
+ fineGrainedLineage.put(defaultTypeName, FINE_GRAINED_LINEAGE_FIELD_SET_TYPE);
+ subType
+ .fieldNames()
+ .forEachRemaining(
+ subTypeKey -> {
+ ((ArrayNode) fineGrainedLineage.get(arrayTypeName)).add(subTypeKey);
+ Float scoreValue = subType.get(subTypeKey).floatValue();
+ if (scoreValue <= minimumConfidenceScore.get()) {
+ minimumConfidenceScore.set(scoreValue);
+ fineGrainedLineage.set(
+ FINE_GRAINED_CONFIDENCE_SCORE,
+ instance.numberNode(minimumConfidenceScore.get()));
+ }
+ });
+ arrayNode.add(fineGrainedLineage);
+ }
+ });
}
}
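
For orientation, `transformFields` flattens each FineGrainedLineage into a fixed nesting of transform operation, direction, type, and URN, with the confidence score as the leaf; `rebaseFields` walks the same shape back into an array. A minimal sketch of that intermediate shape, inferred from the patch paths exercised in the tests below (illustrative only, not generated output):

```python
# Hypothetical intermediate map after applying the first ADD operation from the
# test below. Each nesting level corresponds to one patch path segment:
#   /fineGrainedLineages/<transformOperation>/<upstreamType|downstreamType>/<TYPE>/<URN>
transformed_fine_grained_lineages = {
    "CREATE": {
        "upstreamType": {
            "FIELD_SET": {
                # leaf value is the confidence score for this edge
                "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)": 1.0,
            },
        },
    },
}
```

Rebasing this map yields a single FineGrainedLineage with transformOperation CREATE, upstreamType FIELD_SET, that one upstream URN, and a defaulted downstreamType, which is exactly what the first assertion in the test checks.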
diff --git a/entity-registry/src/test/java/com/linkedin/metadata/models/registry/UpstreamLineageTemplateTest.java b/entity-registry/src/test/java/com/linkedin/metadata/models/registry/UpstreamLineageTemplateTest.java
new file mode 100644
index 00000000000000..07982a87be56cb
--- /dev/null
+++ b/entity-registry/src/test/java/com/linkedin/metadata/models/registry/UpstreamLineageTemplateTest.java
@@ -0,0 +1,359 @@
+package com.linkedin.metadata.models.registry;
+
+import static com.fasterxml.jackson.databind.node.JsonNodeFactory.*;
+
+import com.fasterxml.jackson.databind.node.NumericNode;
+import com.github.fge.jackson.jsonpointer.JsonPointer;
+import com.github.fge.jsonpatch.AddOperation;
+import com.github.fge.jsonpatch.JsonPatch;
+import com.github.fge.jsonpatch.JsonPatchOperation;
+import com.github.fge.jsonpatch.RemoveOperation;
+import com.linkedin.common.UrnArray;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.common.urn.UrnUtils;
+import com.linkedin.data.DataMap;
+import com.linkedin.dataset.FineGrainedLineage;
+import com.linkedin.dataset.FineGrainedLineageDownstreamType;
+import com.linkedin.dataset.FineGrainedLineageUpstreamType;
+import com.linkedin.dataset.UpstreamLineage;
+import com.linkedin.metadata.models.registry.template.dataset.UpstreamLineageTemplate;
+import java.util.ArrayList;
+import java.util.List;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class UpstreamLineageTemplateTest {
+ @Test
+ public void testPatchUpstream() throws Exception {
+ UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate();
+ UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault();
+    List<JsonPatchOperation> patchOperations = new ArrayList<>();
+ NumericNode upstreamConfidenceScore = instance.numberNode(1.0f);
+ JsonPatchOperation operation =
+ new AddOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"),
+ upstreamConfidenceScore);
+ patchOperations.add(operation);
+ JsonPatch jsonPatch = new JsonPatch(patchOperations);
+
+ // Initial population test
+ UpstreamLineage result = upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatch);
+ // Hack because Jackson parses values to doubles instead of floats
+ DataMap dataMap = new DataMap();
+ dataMap.put("confidenceScore", 1.0);
+ FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap);
+ UrnArray urns = new UrnArray();
+ Urn urn1 =
+ UrnUtils.getUrn(
+ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)");
+ urns.add(urn1);
+ fineGrainedLineage.setUpstreams(urns);
+ fineGrainedLineage.setTransformOperation("CREATE");
+ fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
+ fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
+ Assert.assertEquals(result.getFineGrainedLineages().get(0), fineGrainedLineage);
+
+ // Test non-overwrite upstreams and correct confidence score
+ JsonPatchOperation operation2 =
+ new AddOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"),
+ upstreamConfidenceScore);
+ NumericNode upstreamConfidenceScore2 = instance.numberNode(0.1f);
+ JsonPatchOperation operation3 =
+ new AddOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"),
+ upstreamConfidenceScore2);
+    List<JsonPatchOperation> patchOperations2 = new ArrayList<>();
+ patchOperations2.add(operation2);
+ patchOperations2.add(operation3);
+ JsonPatch jsonPatch2 = new JsonPatch(patchOperations2);
+ UpstreamLineage result2 = upstreamLineageTemplate.applyPatch(result, jsonPatch2);
+ // Hack because Jackson parses values to doubles instead of floats
+ DataMap dataMap2 = new DataMap();
+ dataMap2.put("confidenceScore", 0.1);
+ FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2);
+ UrnArray urns2 = new UrnArray();
+ Urn urn2 =
+ UrnUtils.getUrn(
+ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)");
+ urns2.add(urn1);
+ urns2.add(urn2);
+ fineGrainedLineage2.setUpstreams(urns2);
+ fineGrainedLineage2.setTransformOperation("CREATE");
+ fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
+ fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
+ Assert.assertEquals(result2.getFineGrainedLineages().get(0), fineGrainedLineage2);
+
+ // Check different upstream types
+ JsonPatchOperation operation4 =
+ new AddOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"),
+ upstreamConfidenceScore);
+    List<JsonPatchOperation> patchOperations3 = new ArrayList<>();
+ patchOperations3.add(operation4);
+ JsonPatch jsonPatch3 = new JsonPatch(patchOperations3);
+ UpstreamLineage result3 = upstreamLineageTemplate.applyPatch(result2, jsonPatch3);
+ // Hack because Jackson parses values to doubles instead of floats
+ DataMap dataMap3 = new DataMap();
+ dataMap3.put("confidenceScore", 1.0);
+ FineGrainedLineage fineGrainedLineage3 = new FineGrainedLineage(dataMap3);
+ UrnArray urns3 = new UrnArray();
+ Urn urn3 =
+ UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)");
+ urns3.add(urn3);
+ fineGrainedLineage3.setUpstreams(urns3);
+ fineGrainedLineage3.setTransformOperation("CREATE");
+ fineGrainedLineage3.setUpstreamType(FineGrainedLineageUpstreamType.DATASET);
+ fineGrainedLineage3.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
+ // Splits into two for different types
+ Assert.assertEquals(result3.getFineGrainedLineages().get(1), fineGrainedLineage3);
+
+ // Check different transform types
+ JsonPatchOperation operation5 =
+ new AddOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"),
+ upstreamConfidenceScore);
+    List<JsonPatchOperation> patchOperations4 = new ArrayList<>();
+ patchOperations4.add(operation5);
+ JsonPatch jsonPatch4 = new JsonPatch(patchOperations4);
+ UpstreamLineage result4 = upstreamLineageTemplate.applyPatch(result3, jsonPatch4);
+ // Hack because Jackson parses values to doubles instead of floats
+ DataMap dataMap4 = new DataMap();
+ dataMap4.put("confidenceScore", 1.0);
+ FineGrainedLineage fineGrainedLineage4 = new FineGrainedLineage(dataMap4);
+ UrnArray urns4 = new UrnArray();
+ Urn urn4 =
+ UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)");
+ urns4.add(urn4);
+ fineGrainedLineage4.setUpstreams(urns4);
+ fineGrainedLineage4.setTransformOperation("TRANSFORM");
+ fineGrainedLineage4.setUpstreamType(FineGrainedLineageUpstreamType.DATASET);
+ fineGrainedLineage4.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
+ // New entry in array because of new transformation type
+ Assert.assertEquals(result4.getFineGrainedLineages().get(2), fineGrainedLineage4);
+
+ // Remove
+ JsonPatchOperation removeOperation =
+ new RemoveOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"));
+ JsonPatchOperation removeOperation2 =
+ new RemoveOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"));
+ JsonPatchOperation removeOperation3 =
+ new RemoveOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"));
+ JsonPatchOperation removeOperation4 =
+ new RemoveOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"));
+
+    List<JsonPatchOperation> removeOperations = new ArrayList<>();
+ removeOperations.add(removeOperation);
+ removeOperations.add(removeOperation2);
+ removeOperations.add(removeOperation3);
+ removeOperations.add(removeOperation4);
+ JsonPatch removePatch = new JsonPatch(removeOperations);
+ UpstreamLineage finalResult = upstreamLineageTemplate.applyPatch(result4, removePatch);
+ Assert.assertEquals(upstreamLineageTemplate.getDefault(), finalResult);
+ }
+
+ @Test
+ public void testPatchDownstream() throws Exception {
+ UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate();
+ UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault();
+    List<JsonPatchOperation> patchOperations = new ArrayList<>();
+ NumericNode downstreamConfidenceScore = instance.numberNode(1.0f);
+ JsonPatchOperation operation =
+ new AddOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"),
+ downstreamConfidenceScore);
+ patchOperations.add(operation);
+ JsonPatch jsonPatch = new JsonPatch(patchOperations);
+
+ // Initial population test
+ UpstreamLineage result = upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatch);
+ // Hack because Jackson parses values to doubles instead of floats
+ DataMap dataMap = new DataMap();
+ dataMap.put("confidenceScore", 1.0);
+ FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap);
+ UrnArray urns = new UrnArray();
+ Urn urn1 =
+ UrnUtils.getUrn(
+ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)");
+ urns.add(urn1);
+ fineGrainedLineage.setDownstreams(urns);
+ fineGrainedLineage.setTransformOperation("CREATE");
+ fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
+ fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
+ Assert.assertEquals(result.getFineGrainedLineages().get(0), fineGrainedLineage);
+
+ // Test non-overwrite downstreams and correct confidence score
+ JsonPatchOperation operation2 =
+ new AddOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"),
+ downstreamConfidenceScore);
+ NumericNode downstreamConfidenceScore2 = instance.numberNode(0.1f);
+ JsonPatchOperation operation3 =
+ new AddOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"),
+ downstreamConfidenceScore2);
+    List<JsonPatchOperation> patchOperations2 = new ArrayList<>();
+ patchOperations2.add(operation2);
+ patchOperations2.add(operation3);
+ JsonPatch jsonPatch2 = new JsonPatch(patchOperations2);
+ UpstreamLineage result2 = upstreamLineageTemplate.applyPatch(result, jsonPatch2);
+ // Hack because Jackson parses values to doubles instead of floats
+ DataMap dataMap2 = new DataMap();
+ dataMap2.put("confidenceScore", 0.1);
+ FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2);
+ UrnArray urns2 = new UrnArray();
+ Urn urn2 =
+ UrnUtils.getUrn(
+ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)");
+ urns2.add(urn1);
+ urns2.add(urn2);
+ fineGrainedLineage2.setDownstreams(urns2);
+ fineGrainedLineage2.setTransformOperation("CREATE");
+ fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
+ fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
+ Assert.assertEquals(result2.getFineGrainedLineages().get(0), fineGrainedLineage2);
+
+ // Check different downstream types
+ JsonPatchOperation operation4 =
+ new AddOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"),
+ downstreamConfidenceScore);
+    List<JsonPatchOperation> patchOperations3 = new ArrayList<>();
+ patchOperations3.add(operation4);
+ JsonPatch jsonPatch3 = new JsonPatch(patchOperations3);
+ UpstreamLineage result3 = upstreamLineageTemplate.applyPatch(result2, jsonPatch3);
+ // Hack because Jackson parses values to doubles instead of floats
+ DataMap dataMap3 = new DataMap();
+ dataMap3.put("confidenceScore", 1.0);
+ FineGrainedLineage fineGrainedLineage3 = new FineGrainedLineage(dataMap3);
+ UrnArray urns3 = new UrnArray();
+ Urn urn3 =
+ UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)");
+ urns3.add(urn3);
+ fineGrainedLineage3.setDownstreams(urns3);
+ fineGrainedLineage3.setTransformOperation("CREATE");
+ fineGrainedLineage3.setDownstreamType(FineGrainedLineageDownstreamType.FIELD);
+ fineGrainedLineage3.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
+ // Splits into two for different types
+ Assert.assertEquals(result3.getFineGrainedLineages().get(1), fineGrainedLineage3);
+
+ // Check different transform types
+ JsonPatchOperation operation5 =
+ new AddOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/TRANSFORM/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"),
+ downstreamConfidenceScore);
+    List<JsonPatchOperation> patchOperations4 = new ArrayList<>();
+ patchOperations4.add(operation5);
+ JsonPatch jsonPatch4 = new JsonPatch(patchOperations4);
+ UpstreamLineage result4 = upstreamLineageTemplate.applyPatch(result3, jsonPatch4);
+ // Hack because Jackson parses values to doubles instead of floats
+ DataMap dataMap4 = new DataMap();
+ dataMap4.put("confidenceScore", 1.0);
+ FineGrainedLineage fineGrainedLineage4 = new FineGrainedLineage(dataMap4);
+ UrnArray urns4 = new UrnArray();
+ Urn urn4 =
+ UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)");
+ urns4.add(urn4);
+ fineGrainedLineage4.setDownstreams(urns4);
+ fineGrainedLineage4.setTransformOperation("TRANSFORM");
+ fineGrainedLineage4.setDownstreamType(FineGrainedLineageDownstreamType.FIELD);
+ fineGrainedLineage4.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
+ // New entry in array because of new transformation type
+ Assert.assertEquals(result4.getFineGrainedLineages().get(2), fineGrainedLineage4);
+
+ // Remove
+ JsonPatchOperation removeOperation =
+ new RemoveOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"));
+ JsonPatchOperation removeOperation2 =
+ new RemoveOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"));
+ JsonPatchOperation removeOperation3 =
+ new RemoveOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"));
+ JsonPatchOperation removeOperation4 =
+ new RemoveOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/TRANSFORM/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"));
+
+    List<JsonPatchOperation> removeOperations = new ArrayList<>();
+ removeOperations.add(removeOperation);
+ removeOperations.add(removeOperation2);
+ removeOperations.add(removeOperation3);
+ removeOperations.add(removeOperation4);
+ JsonPatch removePatch = new JsonPatch(removeOperations);
+ UpstreamLineage finalResult = upstreamLineageTemplate.applyPatch(result4, removePatch);
+ Assert.assertEquals(upstreamLineageTemplate.getDefault(), finalResult);
+ }
+
+ @Test
+ public void testUpAndDown() throws Exception {
+ UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate();
+ UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault();
+    List<JsonPatchOperation> patchOperations = new ArrayList<>();
+ NumericNode downstreamConfidenceScore = instance.numberNode(1.0f);
+ JsonPatchOperation operation =
+ new AddOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"),
+ downstreamConfidenceScore);
+ patchOperations.add(operation);
+ NumericNode upstreamConfidenceScore = instance.numberNode(1.0f);
+ JsonPatchOperation operation2 =
+ new AddOperation(
+ new JsonPointer(
+ "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"),
+ upstreamConfidenceScore);
+ patchOperations.add(operation2);
+ JsonPatch jsonPatch = new JsonPatch(patchOperations);
+
+ // Initial population test
+ UpstreamLineage result = upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatch);
+ // Hack because Jackson parses values to doubles instead of floats
+ DataMap dataMap = new DataMap();
+ dataMap.put("confidenceScore", 1.0);
+ FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap);
+ UrnArray urns = new UrnArray();
+ Urn urn1 =
+ UrnUtils.getUrn(
+ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)");
+ urns.add(urn1);
+ fineGrainedLineage.setTransformOperation("CREATE");
+ fineGrainedLineage.setUpstreams(urns);
+ fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
+ fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
+ fineGrainedLineage.setDownstreams(urns);
+
+ // Hack because Jackson parses values to doubles instead of floats
+ DataMap dataMap2 = new DataMap();
+ dataMap2.put("confidenceScore", 1.0);
+ FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2);
+ fineGrainedLineage2.setTransformOperation("CREATE");
+ fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
+ fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
+ fineGrainedLineage2.setDownstreams(urns);
+
+ Assert.assertEquals(result.getFineGrainedLineages().get(1), fineGrainedLineage2);
+ }
+}
diff --git a/li-utils/src/main/java/com/linkedin/metadata/Constants.java b/li-utils/src/main/java/com/linkedin/metadata/Constants.java
index f5a3c9c12ff70e..3d9b533dc8f720 100644
--- a/li-utils/src/main/java/com/linkedin/metadata/Constants.java
+++ b/li-utils/src/main/java/com/linkedin/metadata/Constants.java
@@ -125,6 +125,11 @@ public class Constants {
public static final String VIEW_PROPERTIES_ASPECT_NAME = "viewProperties";
public static final String DATASET_PROFILE_ASPECT_NAME = "datasetProfile";
+ // Aspect support
+ public static final String FINE_GRAINED_LINEAGE_DATASET_TYPE = "DATASET";
+ public static final String FINE_GRAINED_LINEAGE_FIELD_SET_TYPE = "FIELD_SET";
+ public static final String FINE_GRAINED_LINEAGE_FIELD_TYPE = "FIELD";
+
// Chart
public static final String CHART_KEY_ASPECT_NAME = "chartKey";
public static final String CHART_INFO_ASPECT_NAME = "chartInfo";
diff --git a/metadata-ingestion/src/datahub/configuration/git.py b/metadata-ingestion/src/datahub/configuration/git.py
index 9ea9007553839b..a5f88744661a4a 100644
--- a/metadata-ingestion/src/datahub/configuration/git.py
+++ b/metadata-ingestion/src/datahub/configuration/git.py
@@ -6,7 +6,6 @@
from datahub.configuration.common import ConfigModel
from datahub.configuration.validate_field_rename import pydantic_renamed_field
-from datahub.ingestion.source.git.git_import import GitClone
_GITHUB_PREFIX = "https://github.com/"
_GITLAB_PREFIX = "https://gitlab.com/"
@@ -151,6 +150,9 @@ def clone(
) -> pathlib.Path:
"""Clones the repo into a temporary directory and returns the path to the checkout."""
+ # We import this here to avoid a hard dependency on gitpython.
+ from datahub.ingestion.source.git.git_import import GitClone
+
assert self.repo_ssh_locator
git_clone = GitClone(str(tmp_path))
diff --git a/metadata-ingestion/src/datahub/ingestion/source/schema/json_schema.py b/metadata-ingestion/src/datahub/ingestion/source/schema/json_schema.py
index f6e944f4fc3cb3..c7e8a15d8dfa48 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/schema/json_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/schema/json_schema.py
@@ -9,6 +9,7 @@
from os.path import basename, dirname
from pathlib import Path
from typing import Any, Iterable, List, Optional, Union
+from urllib.parse import urlparse
import jsonref
from pydantic import AnyHttpUrl, DirectoryPath, FilePath, validator
@@ -53,6 +54,16 @@
logger = logging.getLogger(__name__)
+def is_url_valid(url: Optional[str]) -> bool:
+ if url is None:
+ return False
+ try:
+ result = urlparse(url)
+ return all([result.scheme, result.netloc])
+ except Exception:
+ return False
+
+
class URIReplacePattern(ConfigModel):
match: str = Field(
description="Pattern to match on uri-s as part of reference resolution. See replace field",
@@ -281,12 +292,14 @@ def _load_one_file(
entityUrn=dataset_urn, aspect=models.StatusClass(removed=False)
).as_workunit()
+ external_url = JsonSchemaTranslator._get_id_from_any_schema(schema_dict)
+ if not is_url_valid(external_url):
+ external_url = None
+
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=models.DatasetPropertiesClass(
- externalUrl=JsonSchemaTranslator._get_id_from_any_schema(
- schema_dict
- ),
+ externalUrl=external_url,
name=dataset_simple_name,
description=JsonSchemaTranslator._get_description_from_any_schema(
schema_dict
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
index 844a458d9f1ab6..a2f91e5fae1a98 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
@@ -274,16 +274,16 @@ def is_dataset_eligible_for_profiling(
return False
if self.config.profiling.profile_table_size_limit is not None and (
- size_in_bytes is None
- or size_in_bytes / (2**30)
+ size_in_bytes is not None
+ and size_in_bytes / (2**30)
> self.config.profiling.profile_table_size_limit
):
self.report.profiling_skipped_size_limit[schema_name] += 1
return False
if self.config.profiling.profile_table_row_limit is not None and (
- rows_count is None
- or rows_count > self.config.profiling.profile_table_row_limit
+ rows_count is not None
+ and rows_count > self.config.profiling.profile_table_row_limit
):
self.report.profiling_skipped_row_limit[schema_name] += 1
return False
diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
index da44d09121c6c1..f870e99df27c5f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -21,7 +21,7 @@
import tableauserverclient as TSC
from pydantic import root_validator, validator
from pydantic.fields import Field
-from requests.adapters import ConnectionError
+from requests.adapters import ConnectionError, HTTPAdapter
from tableauserverclient import (
PersonalAccessTokenAuth,
Server,
@@ -29,6 +29,7 @@
TableauAuth,
)
from tableauserverclient.server.endpoint.exceptions import NonXMLResponseError
+from urllib3 import Retry
import datahub.emitter.mce_builder as builder
import datahub.utilities.sqlglot_lineage as sqlglot_l
@@ -174,6 +175,7 @@ class TableauConnectionConfig(ConfigModel):
description="Unique relationship between the Tableau Server and site",
)
+ max_retries: int = Field(3, description="Number of retries for failed requests.")
ssl_verify: Union[bool, str] = Field(
default=True,
description="Whether to verify SSL certificates. If using self-signed certificates, set to false or provide the path to the .pem certificate bundle.",
@@ -224,6 +226,17 @@ def make_tableau_client(self) -> Server:
# From https://stackoverflow.com/a/50159273/5004662.
server._session.trust_env = False
+ # Setup request retries.
+ adapter = HTTPAdapter(
+ max_retries=Retry(
+ total=self.max_retries,
+ backoff_factor=1,
+ status_forcelist=[429, 500, 502, 503, 504],
+ )
+ )
+ server._session.mount("http://", adapter)
+ server._session.mount("https://", adapter)
+
server.auth.sign_in(authentication)
return server
except ServerResponseError as e:
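
The new `max_retries` field rides on the existing connection config, so it can be tuned per recipe. A hedged sketch of a source config using it (connect_uri and site values are placeholders; only `max_retries` is new in this diff):

```python
# Hypothetical Tableau source config fragment (Python dict form of a recipe).
# Requests failing with 429/500/502/503/504 are retried up to max_retries
# times with backoff_factor=1, per the HTTPAdapter mounted above.
tableau_config = {
    "connect_uri": "https://tableau.example.com",  # placeholder
    "site": "my_site",  # placeholder
    "max_retries": 5,  # new field; defaults to 3
    "ssl_verify": True,
}
```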
diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py
index fcfe049fb15cf9..294a80572669b8 100644
--- a/metadata-ingestion/src/datahub/specific/dataset.py
+++ b/metadata-ingestion/src/datahub/specific/dataset.py
@@ -1,4 +1,4 @@
-from typing import Dict, Generic, List, Optional, TypeVar, Union
+from typing import Dict, Generic, List, Optional, Tuple, TypeVar, Union
from urllib.parse import quote
from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
@@ -6,6 +6,9 @@
DatasetPropertiesClass as DatasetProperties,
EditableDatasetPropertiesClass as EditableDatasetProperties,
EditableSchemaMetadataClass as EditableSchemaMetadata,
+ FineGrainedLineageClass as FineGrainedLineage,
+ FineGrainedLineageDownstreamTypeClass as FineGrainedLineageDownstreamType,
+ FineGrainedLineageUpstreamTypeClass as FineGrainedLineageUpstreamType,
GlobalTagsClass as GlobalTags,
GlossaryTermAssociationClass as Term,
GlossaryTermsClass as GlossaryTerms,
@@ -144,6 +147,108 @@ def set_upstream_lineages(self, upstreams: List[Upstream]) -> "DatasetPatchBuild
)
return self
+ def add_fine_grained_upstream_lineage(
+ self, fine_grained_lineage: FineGrainedLineage
+ ) -> "DatasetPatchBuilder":
+ (
+ transform_op,
+ upstream_type,
+ downstream_type,
+ ) = DatasetPatchBuilder.get_fine_grained_key(fine_grained_lineage)
+ for upstream_urn in fine_grained_lineage.upstreams or []:
+ self._add_patch(
+ UpstreamLineage.ASPECT_NAME,
+ "add",
+ path=DatasetPatchBuilder.quote_fine_grained_upstream_path(
+ transform_op, upstream_type, upstream_urn
+ ),
+ value=fine_grained_lineage.confidenceScore,
+ )
+ for downstream_urn in fine_grained_lineage.downstreams or []:
+ self._add_patch(
+ UpstreamLineage.ASPECT_NAME,
+ "add",
+ path=DatasetPatchBuilder.quote_fine_grained_downstream_path(
+ transform_op, downstream_type, downstream_urn
+ ),
+ value=fine_grained_lineage.confidenceScore,
+ )
+ return self
+
+ @staticmethod
+ def get_fine_grained_key(
+ fine_grained_lineage: FineGrainedLineage,
+ ) -> Tuple[str, str, str]:
+ transform_op = fine_grained_lineage.transformOperation or "NONE"
+ upstream_type = (
+ fine_grained_lineage.upstreamType
+ if isinstance(fine_grained_lineage.upstreamType, str)
+ else FineGrainedLineageUpstreamType.FIELD_SET
+ )
+ downstream_type = (
+ fine_grained_lineage.downstreamType
+ if isinstance(fine_grained_lineage.downstreamType, str)
+ else FineGrainedLineageDownstreamType.FIELD_SET
+ )
+ return transform_op, upstream_type, downstream_type
+
+ @staticmethod
+ def quote_fine_grained_downstream_path(
+ transform_op: str, downstream_type: str, downstream_urn: str
+ ) -> str:
+ return (
+ f"/fineGrainedLineages/{quote(transform_op, safe='')}/downstreamType/"
+ f"{quote(downstream_type, safe='')}/{quote(downstream_urn, safe='')}"
+ )
+
+ @staticmethod
+ def quote_fine_grained_upstream_path(
+ transform_op: str, upstream_type: str, upstream_urn: str
+ ) -> str:
+ return (
+ f"/fineGrainedLineages/{quote(transform_op, safe='')}/upstreamType/"
+ f"{quote(upstream_type, safe='')}/{quote(upstream_urn, safe='')}"
+ )
+
+ def remove_fine_grained_upstream_lineage(
+ self, fine_grained_lineage: FineGrainedLineage
+ ) -> "DatasetPatchBuilder":
+ (
+ transform_op,
+ upstream_type,
+ downstream_type,
+ ) = DatasetPatchBuilder.get_fine_grained_key(fine_grained_lineage)
+ for upstream_urn in fine_grained_lineage.upstreams or []:
+ self._add_patch(
+ UpstreamLineage.ASPECT_NAME,
+ "remove",
+ path=DatasetPatchBuilder.quote_fine_grained_upstream_path(
+ transform_op, upstream_type, upstream_urn
+ ),
+ value={},
+ )
+ for downstream_urn in fine_grained_lineage.downstreams or []:
+ self._add_patch(
+ UpstreamLineage.ASPECT_NAME,
+ "remove",
+ path=DatasetPatchBuilder.quote_fine_grained_downstream_path(
+ transform_op, downstream_type, downstream_urn
+ ),
+ value={},
+ )
+ return self
+
+ def set_fine_grained_upstream_lineages(
+ self, fine_grained_lineages: List[FineGrainedLineage]
+ ) -> "DatasetPatchBuilder":
+ self._add_patch(
+ UpstreamLineage.ASPECT_NAME,
+ "add",
+ path="/fineGrainedLineages",
+ value=fine_grained_lineages,
+ )
+ return self
+
def add_tag(self, tag: Tag) -> "DatasetPatchBuilder":
self._add_patch(
GlobalTags.ASPECT_NAME, "add", path=f"/tags/{tag.tag}", value=tag
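
Taken together, a minimal usage sketch of the new fine grained lineage patch methods (it mirrors the unit test further below; emitting the resulting MCPs is left out):

```python
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.metadata.schema_classes import (
    FineGrainedLineageClass,
    FineGrainedLineageDownstreamTypeClass,
    FineGrainedLineageUpstreamTypeClass,
)
from datahub.specific.dataset import DatasetPatchBuilder

# Build PATCH proposals adding a dataset-level fine grained upstream; the
# builder renders this as /fineGrainedLineages/TRANSFORM/upstreamType/DATASET/<urn>.
patch_mcps = (
    DatasetPatchBuilder(make_dataset_urn("hive", "fct_users_created", "PROD"))
    .add_fine_grained_upstream_lineage(
        FineGrainedLineageClass(
            upstreamType=FineGrainedLineageUpstreamTypeClass.DATASET,
            upstreams=[make_dataset_urn("hive", "fct_users_created_upstream", "PROD")],
            downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD_SET,
            transformOperation="TRANSFORM",
            confidenceScore=1.0,
        )
    )
    .build()
)
```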
diff --git a/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json b/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json
index d5dfe125942fba..ed5a7723ac2bf1 100644
--- a/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json
+++ b/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json
@@ -42,26 +42,31 @@
}
},
{
- "entityType": "dataset",
- "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
- "changeType": "PATCH",
- "aspectName": "upstreamLineage",
- "aspect": {
- "json": [
- {
- "op": "add",
- "path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29",
- "value": {
- "auditStamp": {
- "time": 0,
- "actor": "urn:li:corpuser:unknown"
- },
- "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)",
- "type": "TRANSFORMED"
- }
- }
- ]
- }
+ "entityType": "dataset",
+ "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
+ "changeType": "PATCH",
+ "aspectName": "upstreamLineage",
+ "aspect": {
+ "json": [
+ {
+ "op": "add",
+ "path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29",
+ "value": {
+ "auditStamp": {
+ "time": 0,
+ "actor": "urn:li:corpuser:unknown"
+ },
+ "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)",
+ "type": "TRANSFORMED"
+ }
+ },
+ {
+ "op": "add",
+ "path": "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29",
+ "value": 1.0
+ }
+ ]
+ }
},
{
"entityType": "dataset",
diff --git a/metadata-ingestion/tests/unit/patch/test_patch_builder.py b/metadata-ingestion/tests/unit/patch/test_patch_builder.py
index 0701b3d6968959..f05c4978f8644e 100644
--- a/metadata-ingestion/tests/unit/patch/test_patch_builder.py
+++ b/metadata-ingestion/tests/unit/patch/test_patch_builder.py
@@ -7,6 +7,9 @@
from datahub.ingestion.sink.file import write_metadata_file
from datahub.metadata.schema_classes import (
DatasetLineageTypeClass,
+ FineGrainedLineageClass,
+ FineGrainedLineageDownstreamTypeClass,
+ FineGrainedLineageUpstreamTypeClass,
GenericAspectClass,
MetadataChangeProposalClass,
TagAssociationClass,
@@ -53,6 +56,19 @@ def test_complex_dataset_patch(
type=DatasetLineageTypeClass.TRANSFORMED,
)
)
+ .add_fine_grained_upstream_lineage(
+ fine_grained_lineage=FineGrainedLineageClass(
+ upstreamType=FineGrainedLineageUpstreamTypeClass.DATASET,
+ upstreams=[
+ make_dataset_urn(
+ platform="hive", name="fct_users_created_upstream", env="PROD"
+ )
+ ],
+ downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD_SET,
+ transformOperation="TRANSFORM",
+ confidenceScore=1.0,
+ )
+ )
)
patcher.for_field("field1").add_tag(TagAssociationClass(tag=make_tag_urn("tag1")))
diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/patch/dataset/UpstreamLineagePatchBuilder.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/patch/dataset/UpstreamLineagePatchBuilder.java
index 6ded8a25b4e22c..9db2ebc522e093 100644
--- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/patch/dataset/UpstreamLineagePatchBuilder.java
+++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/patch/dataset/UpstreamLineagePatchBuilder.java
@@ -5,10 +5,14 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.common.urn.DatasetUrn;
+import com.linkedin.common.urn.Urn;
import com.linkedin.dataset.DatasetLineageType;
+import com.linkedin.dataset.FineGrainedLineageDownstreamType;
+import com.linkedin.dataset.FineGrainedLineageUpstreamType;
import datahub.client.patch.AbstractMultiFieldPatchBuilder;
import datahub.client.patch.PatchOperationType;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import lombok.ToString;
import org.apache.commons.lang3.tuple.ImmutableTriple;
@@ -16,7 +20,8 @@
public class UpstreamLineagePatchBuilder
    extends AbstractMultiFieldPatchBuilder<UpstreamLineagePatchBuilder> {
- private static final String PATH_START = "/upstreams/";
+ private static final String UPSTREAMS_PATH_START = "/upstreams/";
+ private static final String FINE_GRAINED_PATH_START = "/fineGrainedLineages/";
private static final String DATASET_KEY = "dataset";
private static final String AUDIT_STAMP_KEY = "auditStamp";
private static final String TIME_KEY = "time";
@@ -34,13 +39,233 @@ public UpstreamLineagePatchBuilder addUpstream(
.set(AUDIT_STAMP_KEY, auditStamp);
pathValues.add(
- ImmutableTriple.of(PatchOperationType.ADD.getValue(), PATH_START + datasetUrn, value));
+ ImmutableTriple.of(
+ PatchOperationType.ADD.getValue(), UPSTREAMS_PATH_START + datasetUrn, value));
return this;
}
public UpstreamLineagePatchBuilder removeUpstream(@Nonnull DatasetUrn datasetUrn) {
pathValues.add(
- ImmutableTriple.of(PatchOperationType.REMOVE.getValue(), PATH_START + datasetUrn, null));
+ ImmutableTriple.of(
+ PatchOperationType.REMOVE.getValue(), UPSTREAMS_PATH_START + datasetUrn, null));
+ return this;
+ }
+
+ /**
+   * Adds a dataset as a fine grained upstream
+ *
+ * @param datasetUrn dataset to be set as upstream
+ * @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
+ * full confidence
+ * @param transformationOperation string operation type that describes the transformation
+ * operation happening in the lineage edge
+ * @return this builder
+ */
+ public UpstreamLineagePatchBuilder addFineGrainedUpstreamDataset(
+ @Nonnull DatasetUrn datasetUrn,
+ @Nullable Float confidenceScore,
+ @Nonnull String transformationOperation) {
+ Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
+
+ pathValues.add(
+ ImmutableTriple.of(
+ PatchOperationType.ADD.getValue(),
+ FINE_GRAINED_PATH_START
+ + transformationOperation
+ + "/"
+ + "upstreamType"
+ + "/"
+ + "DATASET"
+ + "/"
+ + datasetUrn,
+ instance.numberNode(finalConfidenceScore)));
+ return this;
+ }
+
+ /**
+ * Adds a field as a fine grained upstream
+ *
+ * @param schemaFieldUrn a schema field to be marked as upstream, format:
+   *     urn:li:schemaField:(DATASET_URN, COLUMN NAME)
+ * @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
+ * full confidence
+ * @param transformationOperation string operation type that describes the transformation
+ * operation happening in the lineage edge
+ * @param type the upstream lineage type, either Field or Field Set
+ * @return this builder
+ */
+ public UpstreamLineagePatchBuilder addFineGrainedUpstreamField(
+ @Nonnull Urn schemaFieldUrn,
+ @Nullable Float confidenceScore,
+ @Nonnull String transformationOperation,
+ @Nullable FineGrainedLineageUpstreamType type) {
+ Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
+ String finalType;
+ if (type == null) {
+ // Default to set of fields if not explicitly a single field
+ finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString();
+ } else {
+ finalType = type.toString();
+ }
+
+ pathValues.add(
+ ImmutableTriple.of(
+ PatchOperationType.ADD.getValue(),
+ FINE_GRAINED_PATH_START
+ + transformationOperation
+ + "/"
+ + "upstreamType"
+ + "/"
+ + finalType
+ + "/"
+ + schemaFieldUrn,
+ instance.numberNode(finalConfidenceScore)));
+
+ return this;
+ }
+
+ /**
+ * Adds a field as a fine grained downstream
+ *
+ * @param schemaFieldUrn a schema field to be marked as downstream, format:
+   *     urn:li:schemaField:(DATASET_URN, COLUMN NAME)
+ * @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
+ * full confidence
+ * @param transformationOperation string operation type that describes the transformation
+ * operation happening in the lineage edge
+ * @param type the downstream lineage type, either Field or Field Set
+ * @return this builder
+ */
+ public UpstreamLineagePatchBuilder addFineGrainedDownstreamField(
+ @Nonnull Urn schemaFieldUrn,
+ @Nullable Float confidenceScore,
+ @Nonnull String transformationOperation,
+ @Nullable FineGrainedLineageDownstreamType type) {
+ Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
+ String finalType;
+ if (type == null) {
+ // Default to set of fields if not explicitly a single field
+ finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString();
+ } else {
+ finalType = type.toString();
+ }
+
+ pathValues.add(
+ ImmutableTriple.of(
+ PatchOperationType.ADD.getValue(),
+ FINE_GRAINED_PATH_START
+ + transformationOperation
+ + "/"
+ + "downstreamType"
+ + "/"
+ + finalType
+ + "/"
+ + schemaFieldUrn,
+ instance.numberNode(finalConfidenceScore)));
+ return this;
+ }
+
+ private Float getConfidenceScoreOrDefault(@Nullable Float confidenceScore) {
+ float finalConfidenceScore;
+ if (confidenceScore != null && confidenceScore > 0 && confidenceScore <= 1.0f) {
+ finalConfidenceScore = confidenceScore;
+ } else {
+ finalConfidenceScore = 1.0f;
+ }
+
+ return finalConfidenceScore;
+ }
+
+ /**
+ * Removes a field as a fine grained upstream
+ *
+ * @param schemaFieldUrn a schema field to be marked as upstream, format:
+   *     urn:li:schemaField:(DATASET_URN, COLUMN NAME)
+ * @param transformationOperation string operation type that describes the transformation
+ * operation happening in the lineage edge
+ * @param type the upstream lineage type, either Field or Field Set
+ * @return this builder
+ */
+ public UpstreamLineagePatchBuilder removeFineGrainedUpstreamField(
+ @Nonnull Urn schemaFieldUrn,
+ @Nonnull String transformationOperation,
+ @Nullable FineGrainedLineageUpstreamType type) {
+ String finalType;
+ if (type == null) {
+ // Default to set of fields if not explicitly a single field
+ finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString();
+ } else {
+ finalType = type.toString();
+ }
+
+ pathValues.add(
+ ImmutableTriple.of(
+ PatchOperationType.REMOVE.getValue(),
+ FINE_GRAINED_PATH_START
+ + transformationOperation
+ + "/"
+ + "upstreamType"
+ + "/"
+ + finalType
+ + "/"
+ + schemaFieldUrn,
+ null));
+
+ return this;
+ }
+
+ public UpstreamLineagePatchBuilder removeFineGrainedUpstreamDataset(
+ @Nonnull DatasetUrn datasetUrn, @Nonnull String transformationOperation) {
+
+ pathValues.add(
+ ImmutableTriple.of(
+ PatchOperationType.REMOVE.getValue(),
+ FINE_GRAINED_PATH_START
+ + transformationOperation
+ + "/"
+ + "upstreamType"
+ + "/"
+ + "DATASET"
+ + "/"
+ + datasetUrn,
+ null));
+ return this;
+ }
+
+ /**
+   * Removes a field as a fine grained downstream
+ *
+ * @param schemaFieldUrn a schema field to be marked as downstream, format:
+   *     urn:li:schemaField:(DATASET_URN, COLUMN NAME)
+ * @param transformationOperation string operation type that describes the transformation
+ * operation happening in the lineage edge
+ * @param type the downstream lineage type, either Field or Field Set
+ * @return this builder
+ */
+ public UpstreamLineagePatchBuilder removeFineGrainedDownstreamField(
+ @Nonnull Urn schemaFieldUrn,
+ @Nonnull String transformationOperation,
+ @Nullable FineGrainedLineageDownstreamType type) {
+ String finalType;
+ if (type == null) {
+ // Default to set of fields if not explicitly a single field
+ finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString();
+ } else {
+ finalType = type.toString();
+ }
+
+ pathValues.add(
+ ImmutableTriple.of(
+ PatchOperationType.REMOVE.getValue(),
+ FINE_GRAINED_PATH_START
+ + transformationOperation
+ + "/"
+ + "downstreamType"
+ + "/"
+ + finalType
+ + "/"
+ + schemaFieldUrn,
+ null));
return this;
}
diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java
index 1d387acb0ce12b..563742990f5468 100644
--- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java
+++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java
@@ -14,6 +14,7 @@
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.GlossaryTermUrn;
import com.linkedin.common.urn.TagUrn;
+import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.dataset.DatasetLineageType;
import com.linkedin.metadata.graph.LineageDirection;
@@ -49,15 +50,21 @@ public class PatchTest {
public void testLocalUpstream() {
RestEmitter restEmitter = new RestEmitter(RestEmitterConfig.builder().build());
try {
+ DatasetUrn upstreamUrn =
+ DatasetUrn.createFromString(
+ "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)");
+ Urn schemaFieldUrn =
+ UrnUtils.getUrn(
+ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), foo)");
MetadataChangeProposal upstreamPatch =
new UpstreamLineagePatchBuilder()
.urn(
UrnUtils.getUrn(
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"))
- .addUpstream(
- DatasetUrn.createFromString(
- "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"),
- DatasetLineageType.TRANSFORMED)
+ .addUpstream(upstreamUrn, DatasetLineageType.TRANSFORMED)
+ .addFineGrainedUpstreamDataset(upstreamUrn, null, "TRANSFORM")
+ .addFineGrainedUpstreamField(schemaFieldUrn, null, "TRANSFORM", null)
+ .addFineGrainedDownstreamField(schemaFieldUrn, null, "TRANSFORM", null)
.build();
      Future<MetadataWriteResponse> response = restEmitter.emit(upstreamPatch);
@@ -73,6 +80,12 @@ public void testLocalUpstream() {
public void testLocalUpstreamRemove() {
RestEmitter restEmitter = new RestEmitter(RestEmitterConfig.builder().build());
try {
+ DatasetUrn upstreamUrn =
+ DatasetUrn.createFromString(
+ "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)");
+ Urn schemaFieldUrn =
+ UrnUtils.getUrn(
+ "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), foo)");
MetadataChangeProposal upstreamPatch =
new UpstreamLineagePatchBuilder()
.urn(
@@ -81,6 +94,9 @@ public void testLocalUpstreamRemove() {
.removeUpstream(
DatasetUrn.createFromString(
"urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"))
+ .removeFineGrainedUpstreamDataset(upstreamUrn, "TRANSFORM")
+ .removeFineGrainedUpstreamField(schemaFieldUrn, "TRANSFORM", null)
+ .removeFineGrainedDownstreamField(schemaFieldUrn, "TRANSFORM", null)
.build();
      Future<MetadataWriteResponse> response = restEmitter.emit(upstreamPatch);
diff --git a/smoke-test/tests/cypress/cypress/e2e/browse/browseV2.js b/smoke-test/tests/cypress/cypress/e2e/browse/browseV2.js
index a61b9030b13c6f..f45edc5fa04819 100644
--- a/smoke-test/tests/cypress/cypress/e2e/browse/browseV2.js
+++ b/smoke-test/tests/cypress/cypress/e2e/browse/browseV2.js
@@ -46,31 +46,31 @@ describe("search", () => {
cy.get("[data-testid=browse-v2")
.invoke("css", "width")
- .should("match", /^\d\d\dpx$/);
+ .should("match", /\d\d\dpx$/);
cy.get("[data-testid=browse-v2-toggle").click();
cy.get("[data-testid=browse-v2")
.invoke("css", "width")
- .should("match", /^\dpx$/);
+ .should("match", /\dpx$/);
cy.reload();
cy.get("[data-testid=browse-v2")
.invoke("css", "width")
- .should("match", /^\dpx$/);
+ .should("match", /\dpx$/);
cy.get("[data-testid=browse-v2-toggle").click();
cy.get("[data-testid=browse-v2")
.invoke("css", "width")
- .should("match", /^\d\d\dpx$/);
+ .should("match", /\d\d\dpx$/);
cy.reload();
cy.get("[data-testid=browse-v2")
.invoke("css", "width")
- .should("match", /^\d\d\dpx$/);
+ .should("match", /\d\d\dpx$/);
});
it("should take you to the old browse experience when clicking entity type on home page with the browse flag off", () => {