From 6fc4fb69521229758739bd6cea77833d81351909 Mon Sep 17 00:00:00 2001
From: Shirshanka Das
Date: Tue, 10 Sep 2024 22:53:42 -0700
Subject: [PATCH] Address review comments

---
 .../propagation/docs/propagation_action.py    | 31 +++++--------------
 .../utils_requests_wrapper.py                 |  1 +
 .../doc_propagation/test_propagation.py       |  7 +----
 smoke-test/tests/conftest.py                  |  2 ++
 smoke-test/tests/consistency_utils.py         |  1 +
 smoke-test/tests/utils.py                     |  1 +
 6 files changed, 13 insertions(+), 30 deletions(-)

diff --git a/datahub-actions/src/datahub_actions/plugin/action/propagation/docs/propagation_action.py b/datahub-actions/src/datahub_actions/plugin/action/propagation/docs/propagation_action.py
index 2774b900..109597a2 100644
--- a/datahub-actions/src/datahub_actions/plugin/action/propagation/docs/propagation_action.py
+++ b/datahub-actions/src/datahub_actions/plugin/action/propagation/docs/propagation_action.py
@@ -15,7 +15,7 @@
 import json
 import logging
 import time
-from typing import Any, Iterable, List, Optional
+from typing import Any, Iterable, Optional
 
 from datahub.configuration.common import ConfigModel
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -31,6 +31,7 @@
     MetadataAttributionClass,
     MetadataChangeLogClass,
 )
+from datahub.metadata.urns import DatasetUrn
 from datahub.utilities.urns.urn import Urn
 from pydantic import BaseModel, Field, validator
 
@@ -314,9 +315,11 @@ def modify_docs_on_columns(
             # No need to propagate to self
             return None
 
-        if not dataset_urn.startswith("urn:li:dataset"):
+        try:
+            DatasetUrn.from_string(dataset_urn)
+        except Exception as e:
             logger.error(
-                f"Invalid dataset urn {dataset_urn}. Must start with urn:li:dataset"
+                f"Invalid dataset urn {dataset_urn}. {e}. Skipping documentation propagation."
             )
             return None
 
@@ -441,22 +444,6 @@ def _is_settings_change(self, event: Optional[EventEnvelope]) -> bool:
                 return True
         return False
 
-    def get_upstreams(self, graph: AcrylDataHubGraph, entity_urn: str) -> List[str]:
-        """
-        Fetch the upstreams for an dataset or schema field.
-        Note that this DOES NOT support DataJob upstreams, or any intermediate nodes.
-        """
-        import urllib.parse
-
-        url_frag = f"/relationships?direction=OUTGOING&types=List(DownstreamOf)&urn={urllib.parse.quote(entity_urn)}"
-        url = f"{graph.graph._gms_server}{url_frag}"
-        response = graph.graph._get_generic(url)
-        if response["count"] > 0:
-            relnships = response["relationships"]
-            entities = [x["entity"] for x in relnships]
-            return entities
-        return []
-
     def _only_one_upstream_field(
         self,
         graph: AcrylDataHubGraph,
@@ -469,11 +456,7 @@ def _only_one_upstream_field(
         TODO: We should cache upstreams because we make this fetch upstreams call
         FOR EVERY downstream that must be propagated to.
         """
-        upstreams = (
-            graph.get_upstreams(entity_urn=downstream_field)
-            if hasattr(graph, "get_upstreams")
-            else self.get_upstreams(graph, downstream_field)
-        )
+        upstreams = graph.get_upstreams(entity_urn=downstream_field)
         # Use a set here in case there are duplicated upstream edges
         upstream_fields = list(
             {x for x in upstreams if x.startswith("urn:li:schemaField")}
diff --git a/smoke-test/requests_wrapper/utils_requests_wrapper.py b/smoke-test/requests_wrapper/utils_requests_wrapper.py
index d6bf0d4a..36b997d1 100644
--- a/smoke-test/requests_wrapper/utils_requests_wrapper.py
+++ b/smoke-test/requests_wrapper/utils_requests_wrapper.py
@@ -1,3 +1,4 @@
+# Copied largely without modification from datahub-project/datahub/smoke-test/requests_wrapper
 import requests
 from tests.consistency_utils import wait_for_writes_to_sync
 
diff --git a/smoke-test/tests/actions/doc_propagation/test_propagation.py b/smoke-test/tests/actions/doc_propagation/test_propagation.py
index eebeae5f..e9b70139 100644
--- a/smoke-test/tests/actions/doc_propagation/test_propagation.py
+++ b/smoke-test/tests/actions/doc_propagation/test_propagation.py
@@ -7,6 +7,7 @@
 
 import datahub.metadata.schema_classes as models
 import pytest
+from datahub.api.entities.dataset.dataset import Dataset
 from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
@@ -81,8 +82,6 @@ def get_urns_from_mcp(mcp: MetadataChangeProposalWrapper) -> List[str]:
                 urns.append(field_urn)
         return urns
 
-    from datahub.api.entities.dataset.dataset import Dataset
-
     mcps = []
     all_urns = []
     for dataset in Dataset.from_yaml(file=f"{test_resources_dir}/datasets.yaml"):
@@ -157,8 +156,6 @@ def add_col_col_lineage(graph):
 
 
 def add_field_description(f1, description, graph):
-    import datahub.metadata.schema_classes as models
-
     urn = Urn.from_string(f1)
     dataset_urn = urn.entity_ids[0]
     schema_metadata = graph.get_aspect(dataset_urn, models.SchemaMetadataClass)
@@ -171,8 +168,6 @@ def add_field_description(f1, description, graph):
 
 
 def check_propagated_description(downstream_field, description, graph):
-    import datahub.metadata.schema_classes as models
-
     documentation = graph.get_aspect(downstream_field, models.DocumentationClass)
     assert any(doc.documentation == description for doc in documentation.documentations)
 
diff --git a/smoke-test/tests/conftest.py b/smoke-test/tests/conftest.py
index 6794b2b9..f8058976 100644
--- a/smoke-test/tests/conftest.py
+++ b/smoke-test/tests/conftest.py
@@ -1,3 +1,5 @@
+# Copied largely without modification from datahub-project/datahub/smoke-test/tests
+
 import os
 
 import pytest
diff --git a/smoke-test/tests/consistency_utils.py b/smoke-test/tests/consistency_utils.py
index 4335e2a8..d742a60d 100644
--- a/smoke-test/tests/consistency_utils.py
+++ b/smoke-test/tests/consistency_utils.py
@@ -1,3 +1,4 @@
+# Copied largely without modification from datahub-project/datahub/smoke-test/tests
 import logging
 import os
 import subprocess
diff --git a/smoke-test/tests/utils.py b/smoke-test/tests/utils.py
index 7564f1a0..bde03099 100644
--- a/smoke-test/tests/utils.py
+++ b/smoke-test/tests/utils.py
@@ -1,3 +1,4 @@
+# Copied largely without modification from datahub-project/datahub/smoke-test/tests
 import functools
 import json
 import logging