Merge branch 'master' into feat/dremio-connector-source
acrylJonny authored Oct 21, 2024
2 parents b9d7b8a + 554288b commit 230fbd7
Showing 60 changed files with 1,592 additions and 503 deletions.
11 changes: 10 additions & 1 deletion datahub-web-react/src/app/ingest/source/builder/constants.ts
@@ -36,6 +36,7 @@ import csvLogo from '../../../../images/csv-logo.png';
import qlikLogo from '../../../../images/qliklogo.png';
import sigmaLogo from '../../../../images/sigmalogo.png';
import sacLogo from '../../../../images/saclogo.svg';
import datahubLogo from '../../../../images/datahublogo.png';

export const ATHENA = 'athena';
export const ATHENA_URN = `urn:li:dataPlatform:${ATHENA}`;
@@ -128,6 +129,11 @@ export const SIGMA = 'sigma';
export const SIGMA_URN = `urn:li:dataPlatform:${SIGMA}`;
export const SAC = 'sac';
export const SAC_URN = `urn:li:dataPlatform:${SAC}`;
export const DATAHUB = 'datahub';
export const DATAHUB_GC = 'datahub-gc';
export const DATAHUB_LINEAGE_FILE = 'datahub-lineage-file';
export const DATAHUB_BUSINESS_GLOSSARY = 'datahub-business-glossary';
export const DATAHUB_URN = `urn:li:dataPlatform:${DATAHUB}`;

export const PLATFORM_URN_TO_LOGO = {
    [ATHENA_URN]: athenaLogo,
@@ -169,6 +175,7 @@ export const PLATFORM_URN_TO_LOGO = {
    [QLIK_SENSE_URN]: qlikLogo,
    [SIGMA_URN]: sigmaLogo,
    [SAC_URN]: sacLogo,
    [DATAHUB_URN]: datahubLogo,
};

export const SOURCE_TO_PLATFORM_URN = {
@@ -182,5 +189,7 @@ export const SOURCE_TO_PLATFORM_URN = {
    [SNOWFLAKE_USAGE]: SNOWFLAKE_URN,
    [STARBURST_TRINO_USAGE]: TRINO_URN,
    [DBT_CLOUD]: DBT_URN,
    [VERTICA]: VERTICA_URN,
    [DATAHUB_GC]: DATAHUB_URN,
    [DATAHUB_LINEAGE_FILE]: DATAHUB_URN,
    [DATAHUB_BUSINESS_GLOSSARY]: DATAHUB_URN,
};
1 change: 0 additions & 1 deletion docs/businessattributes.md
@@ -28,7 +28,6 @@ Taking the example of "United States- Social Security Number", if an application
What you need to create/update business attributes and associate them with dataset schema fields:

* **Manage Business Attributes** platform privilege to create/update/delete business attributes.
* **Edit Dataset Column Business Attribute** metadata privilege to associate business attributes to dataset schema field.

## Using Business Attributes
As of now, Business Attributes can only be created through the UI.
6 changes: 5 additions & 1 deletion docs/lineage/airflow.md
@@ -132,7 +132,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default
```

| Name | Default value | Description |
|----------------------------|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| -------------------------- | -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| enabled | true | If the plugin should be enabled. |
| conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. |
| cluster | prod | name of the airflow cluster |
@@ -191,6 +191,10 @@ These operators are supported by OpenLineage, but we haven't tested them yet:
There are also a few operators (e.g. BashOperator, PythonOperator) that have custom extractors, but those extractors don't generate lineage.
-->

Known limitations:

- We do not fully support operators that run multiple SQL statements at once. In these cases, we'll only capture lineage from the first SQL statement (see the sketch below).
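
For illustration, a hypothetical multi-statement task (the operator choice, connection id, and table names are assumptions, not from this commit) where only the first `INSERT` would yield lineage:

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Hypothetical example: with the current plugin behaviour, lineage would be
# captured for the first INSERT only; the second statement would be missed.
load_orders = SQLExecuteQueryOperator(
    task_id="load_orders",
    conn_id="my_database",  # assumed connection id
    sql="""
        INSERT INTO analytics.orders SELECT * FROM staging.orders;
        INSERT INTO analytics.orders_audit SELECT * FROM staging.orders;
    """,
)
```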

## Manual Lineage Annotation

### Using `inlets` and `outlets`
@@ -12,7 +12,13 @@
    TableSchemaMetadataValue,
)
from dagster._core.execution.stats import RunStepKeyStatsSnapshot, StepEventStatus
from dagster._core.snap import JobSnapshot

try:
    from dagster._core.snap import JobSnapshot  # type: ignore[attr-defined]
except ImportError:
    # The import path changed in Dagster 1.8.12; see https://github.com/dagster-io/dagster/commit/29a37d1f0260cfd112849633d1096ffc916d6c95
    from dagster._core.snap import JobSnap as JobSnapshot

from dagster._core.snap.node import OpDefSnap
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatsSnapshot
from datahub.api.entities.datajob import DataFlow, DataJob
28 changes: 15 additions & 13 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
@@ -122,12 +122,13 @@ transformers:
```
## Simple Add Dataset ownership
### Config Details
| Field | Required | Type | Default | Description |
|--------------------|----------|--------------|-------------|---------------------------------------------------------------------|
| `owner_urns` | ✅ | list[string] | | List of owner urns. |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove ownership from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| Field | Required | Type | Default | Description |
|--------------------|----------|--------------|-------------|------------------------------------------------------------------------------------------------------------|
| `owner_urns` | ✅ | list[string] | | List of owner urns. |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove ownership from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| `on_conflict`      |          | enum         | `DO_UPDATE` | Whether to make changes if owners already exist. If set to DO_NOTHING, the `semantics` setting is irrelevant.|

For transformer behaviour on `replace_existing` and `semantics`, please refer to the section [Relationship Between replace_existing And semantics](#relationship-between-replace_existing-and-semantics).
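
A minimal recipe sketch (the owner urn and ownership type here are placeholder values) showing the new `on_conflict` option alongside the existing fields:

```yaml
transformers:
  - type: "simple_add_dataset_ownership"
    config:
      owner_urns:
        - "urn:li:corpuser:username1" # placeholder owner
      ownership_type: "PRODUCER"
      on_conflict: "DO_NOTHING" # leave datasets that already have owners untouched
```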

@@ -191,13 +192,14 @@

## Pattern Add Dataset ownership
### Config Details
| Field | Required | Type | Default | Description |
|--------------------|----------|----------------------|-------------|-----------------------------------------------------------------------------------------|
| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| `is_container` | | bool | `false` | Whether to also consider a container or not. If true, then ownership will be attached to both the dataset and its container. |
| Field | Required | Type | Default | Description |
|--------------------|----------|----------------------|-------------|------------------------------------------------------------------------------------------------------------------------------|
| `owner_pattern`    | ✅        | map[regex, list[urn]] |             | Map of entity urn regex patterns to the list of owner urns applied to matching entities.                                       |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| `is_container` | | bool | `false` | Whether to also consider a container or not. If true, then ownership will be attached to both the dataset and its container. |
| `on_conflict`      |          | enum                 | `DO_UPDATE` | Whether to make changes if owners already exist. If set to DO_NOTHING, the `semantics` setting is irrelevant.                  |

Let’s suppose we’d like to append a series of users who we know own a dataset but aren’t detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module that’s included in the ingestion framework. This will match the pattern against the dataset `urn` and assign the respective owners, as in the sketch below.
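
For example, a sketch of such a recipe (the regex patterns and user urns are placeholders):

```yaml
transformers:
  - type: "pattern_add_dataset_ownership"
    config:
      owner_pattern:
        rules:
          ".*example1.*": ["urn:li:corpuser:username1"] # placeholder pattern/owner
          ".*example2.*": ["urn:li:corpuser:username2"]
      ownership_type: "DEVELOPER"
      on_conflict: "DO_NOTHING" # optional: skip datasets that already have owners
```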

2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -101,7 +101,7 @@
sqlglot_lib = {
    # Using an Acryl fork of sqlglot.
    # https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:main?expand=1
    "acryl-sqlglot[rs]==25.20.2.dev6",
    "acryl-sqlglot[rs]==25.25.2.dev9",
}

classification_lib = {
@@ -1,5 +1,5 @@
import logging
from typing import Dict, Iterable, List, Optional, Union
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Type, Union, cast

from avrogen.dict_wrapper import DictWrapper
from pydantic import BaseModel
@@ -14,7 +14,14 @@
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import DatahubKey
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.urns import PlatformResourceUrn
from datahub.metadata.urns import DataPlatformUrn, PlatformResourceUrn, Urn
from datahub.utilities.openapi_utils import OpenAPIGraphClient
from datahub.utilities.search_utils import (
    ElasticDocumentQuery,
    ElasticsearchQueryBuilder,
    LogicalOperator,
    SearchField,
)

logger = logging.getLogger(__name__)

@@ -69,54 +76,75 @@ def to_resource_info(self) -> models.PlatformResourceInfoClass:
         )
 
 
-class OpenAPIGraphClient:
-
-    ENTITY_KEY_ASPECT_MAP = {
-        aspect_type.ASPECT_INFO.get("keyForEntity"): name
-        for name, aspect_type in models.ASPECT_NAME_MAP.items()
-        if aspect_type.ASPECT_INFO.get("keyForEntity")
-    }
-
-    def __init__(self, graph: DataHubGraph):
-        self.graph = graph
-        self.openapi_base = graph._gms_server.rstrip("/") + "/openapi/v3"
-
-    def scroll_urns_by_filter(
-        self,
-        entity_type: str,
-        extra_or_filters: List[Dict[str, str]],
-    ) -> Iterable[str]:
-        """
-        Scroll through all urns that match the given filters
-        """
-
-        key_aspect = self.ENTITY_KEY_ASPECT_MAP.get(entity_type)
-        assert key_aspect, f"No key aspect found for entity type {entity_type}"
-
-        count = 1000
-        query = " OR ".join(
-            [f"{filter['field']}:{filter['value']}" for filter in extra_or_filters]
-        )
-        scroll_id = None
-        while True:
-            response = self.graph._get_generic(
-                self.openapi_base + f"/entity/{entity_type.lower()}",
-                params={
-                    "systemMetadata": "false",
-                    "includeSoftDelete": "false",
-                    "skipCache": "false",
-                    "aspects": [key_aspect],
-                    "scrollId": scroll_id,
-                    "count": count,
-                    "query": query,
-                },
-            )
-            entities = response.get("entities", [])
-            scroll_id = response.get("scrollId")
-            for entity in entities:
-                yield entity["urn"]
-            if not scroll_id:
-                break
+class DataPlatformInstanceUrn:
+    """
+    A simple implementation of a URN class for DataPlatformInstance.
+    Since this is not present in the URN registry, we need to implement it here.
+    """
+
+    @staticmethod
+    def create_from_id(platform_instance_urn: str) -> Urn:
+        if platform_instance_urn.startswith("urn:li:platformInstance:"):
+            string_urn = platform_instance_urn
+        else:
+            string_urn = f"urn:li:platformInstance:{platform_instance_urn}"
+        return Urn.from_string(string_urn)
+
+
+class UrnSearchField(SearchField):
+    """
+    A search field that supports URN values.
+    TODO: Move this to search_utils after we make this more generic.
+    """
+
+    def __init__(self, field_name: str, urn_value_extractor: Callable[[str], Urn]):
+        self.urn_value_extractor = urn_value_extractor
+        super().__init__(field_name)
+
+    def get_search_value(self, value: str) -> str:
+        return str(self.urn_value_extractor(value))
+
+
+class PlatformResourceSearchField(SearchField):
+    def __init__(self, field_name: str):
+        super().__init__(field_name)
+
+    @classmethod
+    def from_search_field(
+        cls, search_field: SearchField
+    ) -> "PlatformResourceSearchField":
+        # pretends to be a class method, but just returns the input
+        return search_field  # type: ignore
+
+
+class PlatformResourceSearchFields:
+    PRIMARY_KEY = PlatformResourceSearchField("primaryKey")
+    RESOURCE_TYPE = PlatformResourceSearchField("resourceType")
+    SECONDARY_KEYS = PlatformResourceSearchField("secondaryKeys")
+    PLATFORM = PlatformResourceSearchField.from_search_field(
+        UrnSearchField(
+            field_name="platform.keyword",
+            urn_value_extractor=DataPlatformUrn.create_from_id,
+        )
+    )
+    PLATFORM_INSTANCE = PlatformResourceSearchField.from_search_field(
+        UrnSearchField(
+            field_name="platformInstance.keyword",
+            urn_value_extractor=DataPlatformInstanceUrn.create_from_id,
+        )
+    )
+
+
+class ElasticPlatformResourceQuery(ElasticDocumentQuery[PlatformResourceSearchField]):
+    def __init__(self):
+        super().__init__()
+
+    @classmethod
+    def create_from(
+        cls: Type["ElasticPlatformResourceQuery"],
+        *args: Tuple[Union[str, PlatformResourceSearchField], str],
+    ) -> "ElasticPlatformResourceQuery":
+        return cast(ElasticPlatformResourceQuery, super().create_from(*args))

class PlatformResource(BaseModel):
@@ -130,6 +158,12 @@ def remove(
        cls,
        key: PlatformResourceKey,
    ) -> "PlatformResource":
        """
        Creates a PlatformResource object with the removed status set to True.
        Removed PlatformResource objects are used to soft-delete resources from
        the graph.
        To hard-delete a resource, use the delete method.
        """
        return cls(
            id=key.id,
            removed=True,
@@ -223,32 +257,60 @@ def from_datahub(
 
     @staticmethod
     def search_by_key(
-        graph_client: DataHubGraph, key: str, primary: bool = True
+        graph_client: DataHubGraph,
+        key: str,
+        primary: bool = True,
+        is_exact: bool = True,
     ) -> Iterable["PlatformResource"]:
-        extra_or_filters = []
-        extra_or_filters.append(
-            {
-                "field": "primaryKey",
-                "condition": "EQUAL",
-                "value": key,
-            }
-        )
+        """
+        Searches for PlatformResource entities by primary or secondary key.
+        :param graph_client: DataHubGraph client
+        :param key: The key to search for
+        :param primary: Whether to search for primary only or expand the search
+        to secondary keys (default: True)
+        :param is_exact: Whether to search for an exact match (default: True)
+        :return: An iterable of PlatformResource objects
+        """
+
+        elastic_platform_resource_group = (
+            ElasticPlatformResourceQuery.create_from()
+            .group(LogicalOperator.OR)
+            .add_field_match(
+                PlatformResourceSearchFields.PRIMARY_KEY, key, is_exact=is_exact
+            )
+        )
         if not primary:  # we expand the search to secondary keys
-            extra_or_filters.append(
-                {
-                    "field": "secondaryKeys",
-                    "condition": "EQUAL",
-                    "value": key,
-                }
-            )
+            elastic_platform_resource_group.add_field_match(
+                PlatformResourceSearchFields.SECONDARY_KEYS, key, is_exact=is_exact
+            )
+        query = elastic_platform_resource_group.end()
         openapi_client = OpenAPIGraphClient(graph_client)
         for urn in openapi_client.scroll_urns_by_filter(
             entity_type="platformResource",
-            extra_or_filters=extra_or_filters,
+            query=query,
         ):
             platform_resource = PlatformResource.from_datahub(graph_client, urn)
             if platform_resource:
                 yield platform_resource
 
     def delete(self, graph_client: DataHubGraph, hard: bool = True) -> None:
         graph_client.delete_entity(str(PlatformResourceUrn(self.id)), hard=hard)
+
+    @staticmethod
+    def search_by_filters(
+        graph_client: DataHubGraph,
+        query: Union[
+            ElasticPlatformResourceQuery,
+            ElasticDocumentQuery,
+            ElasticsearchQueryBuilder,
+        ],
+    ) -> Iterable["PlatformResource"]:
+        openapi_client = OpenAPIGraphClient(graph_client)
+        for urn in openapi_client.scroll_urns_by_filter(
+            entity_type="platformResource",
+            query=query,
+        ):
+            platform_resource = PlatformResource.from_datahub(graph_client, urn)
+            if platform_resource:
+                yield platform_resource
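
To sketch how these pieces compose (assuming `graph_client` is an already-initialized `DataHubGraph`; the key values and result handling are placeholders), a caller might build a query with the new builder and feed it to `search_by_filters`:

```python
query = (
    ElasticPlatformResourceQuery.create_from()
    .group(LogicalOperator.OR)
    .add_field_match(PlatformResourceSearchFields.PRIMARY_KEY, "my-key")
    .add_field_match(PlatformResourceSearchFields.SECONDARY_KEYS, "my-key")
    .end()
)
for resource in PlatformResource.search_by_filters(graph_client, query):
    print(resource.id)  # placeholder handling of each match
```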
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -351,6 +351,7 @@ def get_tags(self, entity_urn: str) -> Optional[GlobalTagsClass]:
    def get_glossary_terms(self, entity_urn: str) -> Optional[GlossaryTermsClass]:
        return self.get_aspect(entity_urn=entity_urn, aspect_type=GlossaryTermsClass)

    @functools.lru_cache(maxsize=1)
    def get_domain(self, entity_urn: str) -> Optional[DomainsClass]:
        return self.get_aspect(entity_urn=entity_urn, aspect_type=DomainsClass)
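
For context on the new decorator: `functools.lru_cache(maxsize=1)` memoizes only the most recent call, so repeated domain lookups for the same urn skip the network round-trip. A standalone sketch (the function and urns are hypothetical) of that eviction behaviour:

```python
import functools

@functools.lru_cache(maxsize=1)
def fetch_domain(entity_urn: str) -> str:
    print(f"fetching {entity_urn}")  # side effect makes cache hits visible
    return f"domain-for-{entity_urn}"

fetch_domain("urn:li:dataset:a")  # miss: prints, result cached
fetch_domain("urn:li:dataset:a")  # hit: served from cache, no print
fetch_domain("urn:li:dataset:b")  # miss: evicts the dataset:a entry
```

Note that on an instance method like `get_domain`, `self` participates in the cache key, so the cache is per-client as well as per-urn.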
