
Address review comments

shirshanka committed Sep 11, 2024
1 parent 5e17db2 commit 6fc4fb6
Showing 6 changed files with 13 additions and 30 deletions.
@@ -15,7 +15,7 @@
 import json
 import logging
 import time
-from typing import Any, Iterable, List, Optional
+from typing import Any, Iterable, Optional
 
 from datahub.configuration.common import ConfigModel
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -31,6 +31,7 @@
     MetadataAttributionClass,
     MetadataChangeLogClass,
 )
+from datahub.metadata.urns import DatasetUrn
 from datahub.utilities.urns.urn import Urn
 from pydantic import BaseModel, Field, validator
 
@@ -314,9 +315,11 @@ def modify_docs_on_columns(
             # No need to propagate to self
             return None
 
-        if not dataset_urn.startswith("urn:li:dataset"):
+        try:
+            DatasetUrn.from_string(dataset_urn)
+        except Exception as e:
             logger.error(
-                f"Invalid dataset urn {dataset_urn}. Must start with urn:li:dataset"
+                f"Invalid dataset urn {dataset_urn}. {e}. Skipping documentation propagation."
             )
             return None
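(A note on this change: the new check parses the urn rather than only testing its prefix, so strings that start with "urn:li:dataset" but are structurally invalid are now rejected too. A minimal sketch of the difference, assuming the current DatasetUrn API; the exact exception type raised by the parser is an implementation detail, hence the broad except:)

from datahub.metadata.urns import DatasetUrn

# Passes the old prefix check, but is not a structurally valid dataset urn.
malformed = "urn:li:dataset:missing-the-key-tuple"
assert malformed.startswith("urn:li:dataset")  # old check would have accepted this

try:
    DatasetUrn.from_string(malformed)  # new check: the parser rejects it
except Exception as e:
    print(f"rejected: {e}")

# A well-formed urn parses into a typed object.
urn = DatasetUrn.from_string("urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)")
print(urn.platform, urn.name, urn.env)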

@@ -441,22 +444,6 @@ def _is_settings_change(self, event: Optional[EventEnvelope]) -> bool:
             return True
         return False
 
-    def get_upstreams(self, graph: AcrylDataHubGraph, entity_urn: str) -> List[str]:
-        """
-        Fetch the upstreams for an dataset or schema field.
-        Note that this DOES NOT support DataJob upstreams, or any intermediate nodes.
-        """
-        import urllib.parse
-
-        url_frag = f"/relationships?direction=OUTGOING&types=List(DownstreamOf)&urn={urllib.parse.quote(entity_urn)}"
-        url = f"{graph.graph._gms_server}{url_frag}"
-        response = graph.graph._get_generic(url)
-        if response["count"] > 0:
-            relnships = response["relationships"]
-            entities = [x["entity"] for x in relnships]
-            return entities
-        return []
-
     def _only_one_upstream_field(
         self,
         graph: AcrylDataHubGraph,
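(The helper deleted above queried the GMS relationships endpoint by hand; callers now delegate to AcrylDataHubGraph.get_upstreams instead. For reference, a standalone equivalent of the removed call, with an assumed local GMS address; add an Authorization header if your deployment requires a token:)

import urllib.parse

import requests

GMS_SERVER = "http://localhost:8080"  # assumed address, not taken from this diff
entity_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)"

url = (
    f"{GMS_SERVER}/relationships?direction=OUTGOING"
    f"&types=List(DownstreamOf)&urn={urllib.parse.quote(entity_urn)}"
)
response = requests.get(url).json()
# Same response shape the removed helper relied on.
upstreams = (
    [rel["entity"] for rel in response["relationships"]]
    if response["count"] > 0
    else []
)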
@@ -469,11 +456,7 @@
         TODO: We should cache upstreams because we make this fetch upstreams call FOR EVERY downstream that must be propagated to.
         """
-        upstreams = (
-            graph.get_upstreams(entity_urn=downstream_field)
-            if hasattr(graph, "get_upstreams")
-            else self.get_upstreams(graph, downstream_field)
-        )
+        upstreams = graph.get_upstreams(entity_urn=downstream_field)
         # Use a set here in case there are duplicated upstream edges
         upstream_fields = list(
             {x for x in upstreams if x.startswith("urn:li:schemaField")}
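(As the TODO above notes, upstreams are refetched for every downstream field that gets propagated to. A minimal per-run cache would remove the repeated fetches; this sketch assumes the get_upstreams(entity_urn=...) signature shown in the diff, and the class name is hypothetical:)

from typing import Dict, List


class CachingUpstreamResolver:
    """Memoizes upstream lookups for the lifetime of one propagation pass."""

    def __init__(self, graph):
        self._graph = graph
        self._cache: Dict[str, List[str]] = {}

    def get_upstreams(self, entity_urn: str) -> List[str]:
        # Fetch once per urn; repeated downstream fields hit the cache.
        if entity_urn not in self._cache:
            self._cache[entity_urn] = self._graph.get_upstreams(entity_urn=entity_urn)
        return self._cache[entity_urn]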
1 change: 1 addition & 0 deletions smoke-test/requests_wrapper/utils_requests_wrapper.py
@@ -1,3 +1,4 @@
+# Copied largely without modification from datahub-project/datahub/smoke-test/requests_wrapper
 import requests
 from tests.consistency_utils import wait_for_writes_to_sync
 
7 changes: 1 addition & 6 deletions smoke-test/tests/actions/doc_propagation/test_propagation.py
@@ -7,6 +7,7 @@

 import datahub.metadata.schema_classes as models
 import pytest
+from datahub.api.entities.dataset.dataset import Dataset
 from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
@@ -81,8 +82,6 @@ def get_urns_from_mcp(mcp: MetadataChangeProposalWrapper) -> List[str]:
             urns.append(field_urn)
         return urns
 
-    from datahub.api.entities.dataset.dataset import Dataset
-
     mcps = []
     all_urns = []
     for dataset in Dataset.from_yaml(file=f"{test_resources_dir}/datasets.yaml"):
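(On the import that moved to the top of the file: Dataset.from_yaml yields one Dataset per entry in the YAML file, and each spec can be expanded into metadata change proposals. A usage sketch; the path is illustrative and generate_mcp() is assumed from the Dataset entity API:)

from datahub.api.entities.dataset.dataset import Dataset

# Load dataset specs from YAML and expand each into MCPs.
for dataset in Dataset.from_yaml(file="resources/datasets.yaml"):  # illustrative path
    for mcp in dataset.generate_mcp():
        print(mcp.entityUrn)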
@@ -157,8 +156,6 @@ def add_col_col_lineage(graph):


 def add_field_description(f1, description, graph):
-    import datahub.metadata.schema_classes as models
-
     urn = Urn.from_string(f1)
     dataset_urn = urn.entity_ids[0]
     schema_metadata = graph.get_aspect(dataset_urn, models.SchemaMetadataClass)
@@ -171,8 +168,6 @@ def add_field_description(f1, description, graph):


 def check_propagated_description(downstream_field, description, graph):
-    import datahub.metadata.schema_classes as models
-
     documentation = graph.get_aspect(downstream_field, models.DocumentationClass)
     assert any(doc.documentation == description for doc in documentation.documentations)

2 changes: 2 additions & 0 deletions smoke-test/tests/conftest.py
@@ -1,3 +1,5 @@
+# Copied largely without modification from datahub-project/datahub/smoke-test/tests
+
 import os
 
 import pytest
1 change: 1 addition & 0 deletions smoke-test/tests/consistency_utils.py
@@ -1,3 +1,4 @@
+# Copied largely without modification from datahub-project/datahub/smoke-test/tests
 import logging
 import os
 import subprocess
1 change: 1 addition & 0 deletions smoke-test/tests/utils.py
@@ -1,3 +1,4 @@
+# Copied largely without modification from datahub-project/datahub/smoke-test/tests
 import functools
 import json
 import logging
