diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_entities.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_entities.py index 3cb48b578324c..52fcbc3715f9f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_entities.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_entities.py @@ -4,7 +4,7 @@ import re import uuid from collections import deque -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Any, Deque, Dict, List, Optional @@ -72,6 +72,38 @@ "DATA_MANIPULATION": ["INSERT INTO", "MERGE INTO", "CREATE TABLE"], } +@dataclass +class DremioContainerResponse: + container_type: str + name: str + id: str + path: Optional[List[str]] = None + source_type: Optional[str] = None + root_path: Optional[str] = None + database_name: Optional[str] = None + +@dataclass +class DremioDatasetResponse: + resource_id: str + table_name: str + table_schema: str + full_table_path: str + view_definition: Optional[str] = None + columns: List[Dict[str, Any]] = field(default_factory=list) + owner: Optional[str] = None + owner_type: Optional[str] = None + location_id: Optional[str] = None + format_type: Optional[str] = None + created: Optional[str] = None + +@dataclass +class DremioQueryResponse: + job_id: str + user_name: str + submitted_ts: str + query: str + queried_datasets: str + affected_datasets: Optional[str] = None class DremioDatasetType(Enum): VIEW = "View" diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py index 586c377f1d7bc..58b95c8913cfa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py @@ -7,7 +7,6 @@ from datetime import datetime from typing import Any, Dict, Iterable, List, 
Optional -import datahub.sql_parsing.sqlglot_utils from datahub.emitter.mce_builder import ( make_data_platform_urn, make_dataset_urn_with_platform_instance, @@ -127,6 +126,7 @@ class DremioSource(StatefulIngestionSourceBase): def __init__(self, config: DremioSourceConfig, ctx: Any): super().__init__(config, ctx) + self.default_db = "dremio" self.config = config self.report = DremioSourceReport() self.source_map: Dict[str, DremioSourceMapping] = defaultdict() @@ -373,7 +373,7 @@ def process_dataset( self.sql_parsing_aggregator.add_view_definition( view_urn=dataset_urn, view_definition=dataset_info.sql_definition, - default_db="dremio", + default_db=self.default_db, ) elif dataset_info.dataset_type == DremioDatasetType.TABLE: @@ -508,7 +508,8 @@ def process_query(self, query: DremioQuery) -> None: query_text=query.query, upstreams=upstream_urns, downstream=downstream_urn, - ) + ), + merge_lineage=True, ) # Add observed query @@ -517,7 +518,7 @@ def process_query(self, query: DremioQuery) -> None: query=query.query, timestamp=query.submitted_ts, user=CorpUserUrn(username=query.username), - default_db="dremio", + default_db=self.default_db, ) ) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py index 71245353101f6..b13a4b5524853 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py @@ -35,6 +35,9 @@ def _get_dialect_str(platform: str) -> str: # let the fuzzy resolution logic handle it. # MariaDB is a fork of MySQL, so we reuse the same dialect. return "mysql, normalization_strategy = lowercase" + # Dremio is based on Drill, though it is not 100% compatible. + elif platform == "dremio": + return "drill" else: return platform