Merge branch 'datahub-project:master' into master
anshbansal authored Oct 12, 2023
2 parents e7efcc2 + 8813ae2 commit b4f835e
Showing 65 changed files with 845 additions and 956 deletions.
@@ -1292,7 +1292,8 @@ private void configureCorpUserResolvers(final RuntimeWiring.Builder builder) {
*/
private void configureCorpGroupResolvers(final RuntimeWiring.Builder builder) {
builder.type("CorpGroup", typeWiring -> typeWiring
.dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient)));
.dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient))
.dataFetcher("exists", new EntityExistsResolver(entityService)));
builder.type("CorpGroupInfo", typeWiring -> typeWiring
.dataFetcher("admins",
new LoadableTypeBatchResolver<>(corpUserType,
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
@@ -3788,6 +3788,11 @@ type CorpGroup implements Entity {
Additional read only info about the group
"""
info: CorpGroupInfo @deprecated

"""
Whether or not this entity exists on DataHub
"""
exists: Boolean
}

"""
4 changes: 4 additions & 0 deletions datahub-web-react/src/app/entity/group/GroupProfile.tsx
@@ -11,6 +11,7 @@ import { RoutedTabs } from '../../shared/RoutedTabs';
import GroupInfoSidebar from './GroupInfoSideBar';
import { GroupAssets } from './GroupAssets';
import { ErrorSection } from '../../shared/error/ErrorSection';
import NonExistentEntityPage from '../shared/entity/NonExistentEntityPage';

const messageStyle = { marginTop: '10%' };

@@ -110,6 +111,9 @@ export default function GroupProfile() {
urn,
};

if (data?.corpGroup?.exists === false) {
return <NonExistentEntityPage />;
}
return (
<>
{error && <ErrorSection />}
1 change: 1 addition & 0 deletions datahub-web-react/src/graphql/group.graphql
@@ -3,6 +3,7 @@ query getGroup($urn: String!, $membersCount: Int!) {
urn
type
name
exists
origin {
type
externalType
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
@@ -7,6 +7,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
### Breaking Changes

- #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now.
- #8942 - Removed `urn:li:corpuser:datahub` owner for the `Measure`, `Dimension` and `Temporal` tags emitted
by Looker and LookML source connectors.
- #8853 - The Airflow plugin no longer supports Airflow 2.0.x or Python 3.7. See the docs for more details.
- #8853 - Introduced the Airflow plugin v2. If you're using Airflow 2.3+, the v2 plugin will be enabled by default, and so you'll need to switch your requirements to include `pip install 'acryl-datahub-airflow-plugin[plugin-v2]'`. To continue using the v1 plugin, set the `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN` environment variable to `true`.
- #8943 The Unity Catalog ingestion source has a new option `include_metastore`, which will cause all urns to be changed when disabled.
7 changes: 7 additions & 0 deletions metadata-ingestion/src/datahub/configuration/source_common.py
@@ -54,6 +54,13 @@ class DatasetSourceConfigMixin(PlatformInstanceConfigMixin, EnvConfigMixin):
"""


class LowerCaseDatasetUrnConfigMixin(ConfigModel):
convert_urns_to_lowercase: bool = Field(
default=False,
description="Whether to convert dataset urns to lowercase.",
)


class DatasetLineageProviderConfigBase(EnvConfigMixin):
"""
Any non-Dataset source that produces lineage to Datasets should inherit this class.
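The new mixin is an ordinary pydantic config fragment; the Kafka and SQL-common configs opt into it further down in this diff. As a rough standalone sketch (the toy config class and its connection field below are hypothetical, not part of the commit):

```python
# Hypothetical illustration: a source config opts into URN lowercasing by inheriting the mixin.
from pydantic import Field

from datahub.configuration.source_common import LowerCaseDatasetUrnConfigMixin


class ToySourceConfig(LowerCaseDatasetUrnConfigMixin):
    # Made-up extra field, just to show the mixin composes with source-specific settings.
    connection_uri: str = Field(description="Where the toy source reads from.")


config = ToySourceConfig.parse_obj(
    {"connection_uri": "sqlite:///tmp.db", "convert_urns_to_lowercase": True}
)
assert config.convert_urns_to_lowercase is True
```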
24 changes: 24 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -29,6 +29,7 @@
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
auto_lowercase_urns,
auto_materialize_referenced_tags,
auto_status_aspect,
auto_workunit_reporter,
@@ -192,7 +193,30 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run
)

auto_lowercase_dataset_urns: Optional[MetadataWorkUnitProcessor] = None
if (
self.ctx.pipeline_config
and self.ctx.pipeline_config.source
and self.ctx.pipeline_config.source.config
and (
(
hasattr(
self.ctx.pipeline_config.source.config,
"convert_urns_to_lowercase",
)
and self.ctx.pipeline_config.source.config.convert_urns_to_lowercase
)
or (
hasattr(self.ctx.pipeline_config.source.config, "get")
and self.ctx.pipeline_config.source.config.get(
"convert_urns_to_lowercase"
)
)
)
):
auto_lowercase_dataset_urns = auto_lowercase_urns
return [
auto_lowercase_dataset_urns,
auto_status_aspect,
auto_materialize_referenced_tags,
browse_path_processor,
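The conditional above is dense because, at this point in the pipeline, the source config may be either a parsed pydantic model or a raw dict; the hasattr/.get pair appears to cover both shapes. A toy restatement of that check (everything below is illustrative, not from the commit):

```python
# Illustrative only: the processor should be enabled for both config shapes.
def wants_lowercasing(config: object) -> bool:
    return bool(
        getattr(config, "convert_urns_to_lowercase", False)
        or (hasattr(config, "get") and config.get("convert_urns_to_lowercase"))  # type: ignore[attr-defined]
    )


class FakeTypedConfig:
    convert_urns_to_lowercase = True


assert wants_lowercasing(FakeTypedConfig())                       # parsed config object
assert wants_lowercasing({"convert_urns_to_lowercase": True})     # raw dict config
assert not wants_lowercasing({"convert_urns_to_lowercase": False})
```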
20 changes: 18 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
@@ -35,7 +35,7 @@
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.tag_urn import TagUrn
from datahub.utilities.urns.urn import guess_entity_type
from datahub.utilities.urns.urn_iter import list_urns
from datahub.utilities.urns.urn_iter import list_urns, lowercase_dataset_urns

if TYPE_CHECKING:
from datahub.ingestion.api.source import SourceReport
@@ -70,7 +70,6 @@ def auto_status_aspect(
for wu in stream:
urn = wu.get_urn()
all_urns.add(urn)

if not wu.is_primary_source:
# If this is a non-primary source, we pretend like we've seen the status
# aspect so that we don't try to emit a removal for it.
@@ -173,6 +172,23 @@ def auto_materialize_referenced_tags(
).as_workunit()


def auto_lowercase_urns(
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
"""Lowercase all dataset urns"""

for wu in stream:
try:
old_urn = wu.get_urn()
lowercase_dataset_urns(wu.metadata)
wu.id = wu.id.replace(old_urn, wu.get_urn())

yield wu
except Exception as e:
logger.warning(f"Failed to lowercase urns for {wu}: {e}", exc_info=True)
yield wu


def auto_browse_path_v2(
stream: Iterable[MetadataWorkUnit],
*,
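For reference, a minimal sketch of exercising the new auto_lowercase_urns helper on a single work unit, assuming the usual MetadataChangeProposalWrapper(...).as_workunit() construction; the dataset urn is made up:

```python
# Minimal sketch: run the new processor over one hand-built work unit.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_lowercase_urns
from datahub.metadata.schema_classes import StatusClass

wu = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,MyProject.MyDataset.MyTable,PROD)",
    aspect=StatusClass(removed=False),
).as_workunit()

for lowered in auto_lowercase_urns([wu]):
    # The dataset-name portion of the urn should come back lowercased (our reading of the helper).
    print(lowered.get_urn())
```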
@@ -16,7 +16,6 @@
make_dataplatform_instance_urn,
make_dataset_urn,
make_tag_urn,
set_dataset_urn_to_lower,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey
@@ -218,8 +217,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
if self.config.enable_legacy_sharded_table_support:
BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = ""

set_dataset_urn_to_lower(self.config.convert_urns_to_lowercase)

self.bigquery_data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf, self.config.get_bigquery_client()
)
@@ -1057,6 +1054,7 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
):
field.description = col.comment
schema_fields[idx] = field
break
else:
tags = []
if col.is_partition_column:
@@ -20,7 +20,13 @@

logger: logging.Logger = logging.getLogger(__name__)

_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX = "((.+)[_$])?(\\d{8})$"
# Regexp for sharded tables.
# A sharded table is a table that has a suffix of the form _yyyymmdd or yyyymmdd, where yyyymmdd is a date.
# The regexp checks for valid dates in the suffix (e.g. 20200101, 20200229, 20201231) and if the date is not valid
# then it is not a sharded table.
_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX = (
"((.+\\D)[_$]?)?(\\d\\d\\d\\d(?:0[1-9]|1[0-2])(?:0[1-9]|[12][0-9]|3[01]))$"
)
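A quick, self-contained check of the new default pattern (the table names below are made up): valid yyyymmdd suffixes match with the shard in group 3, while impossible dates fall through to the non-sharded path.

```python
# Illustration of the new shard regex; only valid dates are treated as shard suffixes.
import re

_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX = (
    "((.+\\D)[_$]?)?(\\d\\d\\d\\d(?:0[1-9]|1[0-2])(?:0[1-9]|[12][0-9]|3[01]))$"
)

for name in ["daily_events_20230101", "20230101", "daily_events_20231301", "daily_events_v2"]:
    match = re.match(_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX, name, re.IGNORECASE)
    # Prints the shard for the first two names and None for the last two.
    print(name, "->", match.group(3) if match else None)
```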


@dataclass(frozen=True, order=True)
@@ -40,7 +46,7 @@ class BigqueryTableIdentifier:
_BQ_SHARDED_TABLE_SUFFIX: str = "_yyyymmdd"

@staticmethod
def get_table_and_shard(table_name: str) -> Tuple[str, Optional[str]]:
def get_table_and_shard(table_name: str) -> Tuple[Optional[str], Optional[str]]:
"""
Args:
table_name:
@@ -53,16 +59,25 @@ def get_table_and_shard(table_name: str) -> Tuple[str, Optional[str]]:
In case of non-sharded tables, returns (<table-id>, None)
In case of sharded tables, returns (<table-prefix>, shard)
"""
new_table_name = table_name
match = re.match(
BigqueryTableIdentifier._BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX,
table_name,
re.IGNORECASE,
)
if match:
table_name = match.group(2)
shard = match.group(3)
return table_name, shard
return table_name, None
shard: str = match[3]
if shard:
if table_name.endswith(shard):
new_table_name = table_name[: -len(shard)]

new_table_name = (
new_table_name.rstrip("_") if new_table_name else new_table_name
)
if new_table_name.endswith("."):
new_table_name = table_name
return (new_table_name, shard) if new_table_name else (None, shard)
return new_table_name, None

@classmethod
def from_string_name(cls, table: str) -> "BigqueryTableIdentifier":
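To make the new contract concrete, a hedged sketch of the expected return values (table names are made up, and we assume BigqueryTableIdentifier lives in datahub.ingestion.source.bigquery_v2.bigquery_audit, which this view of the diff does not show):

```python
# Expected behaviour of the reworked get_table_and_shard, per the docstring above.
# The import path is an assumption; adjust if the class lives elsewhere.
from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier

print(BigqueryTableIdentifier.get_table_and_shard("my_table"))           # ('my_table', None)
print(BigqueryTableIdentifier.get_table_and_shard("my_table_20230101"))  # ('my_table', '20230101')
print(BigqueryTableIdentifier.get_table_and_shard("20230101"))           # (None, '20230101')
```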
@@ -206,11 +206,6 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
description="This flag enables the data lineage extraction from Data Lineage API exposed by Google Data Catalog. NOTE: This extractor can't build views lineage. It's recommended to enable the view's DDL parsing. Read the docs to have more information about: https://cloud.google.com/data-catalog/docs/concepts/about-data-lineage",
)

convert_urns_to_lowercase: bool = Field(
default=False,
description="Convert urns to lowercase.",
)

enable_legacy_sharded_table_support: bool = Field(
default=True,
description="Use the legacy sharded table urn suffix added.",
@@ -51,8 +51,8 @@ class BigqueryQuery:
p.max_partition_id,
p.active_billable_bytes,
p.long_term_billable_bytes,
REGEXP_EXTRACT(t.table_name, r".*_(\\d+)$") as table_suffix,
REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base
REGEXP_EXTRACT(t.table_name, r"(?:(?:.+\\D)[_$]?)(\\d\\d\\d\\d(?:0[1-9]|1[012])(?:0[1-9]|[12][0-9]|3[01]))$") as table_suffix,
REGEXP_REPLACE(t.table_name, r"(?:[_$]?)(\\d\\d\\d\\d(?:0[1-9]|1[012])(?:0[1-9]|[12][0-9]|3[01]))$", "") as table_base
FROM
`{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
@@ -92,8 +92,8 @@
tos.OPTION_VALUE as comment,
t.is_insertable_into,
t.ddl,
REGEXP_EXTRACT(t.table_name, r".*_(\\d+)$") as table_suffix,
REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base
REGEXP_EXTRACT(t.table_name, r"(?:(?:.+\\D)[_$]?)(\\d\\d\\d\\d(?:0[1-9]|1[012])(?:0[1-9]|[12][0-9]|3[01]))$") as table_suffix,
REGEXP_REPLACE(t.table_name, r"(?:[_$]?)(\\d\\d\\d\\d(?:0[1-9]|1[012])(?:0[1-9]|[12][0-9]|3[01]))$", "") as table_base
FROM
`{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
@@ -18,7 +18,14 @@
logger: logging.Logger = logging.getLogger(__name__)

SUPPORTED_FILE_TYPES: List[str] = ["csv", "tsv", "json", "parquet", "avro"]
SUPPORTED_COMPRESSIONS: List[str] = ["gz", "bz2"]

# These come from the smart_open library.
SUPPORTED_COMPRESSIONS: List[str] = [
"gz",
"bz2",
# We have a monkeypatch on smart_open that aliases .gzip to .gz.
"gzip",
]


class PathSpec(ConfigModel):
11 changes: 9 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -18,7 +18,10 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import (
make_data_platform_urn,
@@ -76,7 +79,11 @@ class KafkaTopicConfigKeys(str, Enum):
UNCLEAN_LEADER_ELECTION_CONFIG = "unclean.leader.election.enable"


class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
class KafkaSourceConfig(
StatefulIngestionConfigBase,
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()

topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
@@ -81,9 +81,6 @@
EnumTypeClass,
FineGrainedLineageClass,
GlobalTagsClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
SchemaMetadataClass,
StatusClass,
SubTypesClass,
@@ -453,17 +450,9 @@ def _get_schema(
@staticmethod
def _get_tag_mce_for_urn(tag_urn: str) -> MetadataChangeEvent:
assert tag_urn in LookerUtil.tag_definitions
ownership = OwnershipClass(
owners=[
OwnerClass(
owner="urn:li:corpuser:datahub",
type=OwnershipTypeClass.DATAOWNER,
)
]
)
return MetadataChangeEvent(
proposedSnapshot=TagSnapshotClass(
urn=tag_urn, aspects=[ownership, LookerUtil.tag_definitions[tag_urn]]
urn=tag_urn, aspects=[LookerUtil.tag_definitions[tag_urn]]
)
)

8 changes: 7 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -10,6 +10,7 @@
from pathlib import PurePath
from typing import Any, Dict, Iterable, List, Optional, Tuple

import smart_open.compression as so_compression
from more_itertools import peekable
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
@@ -120,6 +121,9 @@
}
PAGE_SIZE = 1000

# Hack to support the .gzip extension with smart_open.
so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"])


def get_column_type(
report: SourceReport, dataset_name: str, column_type: str
@@ -407,7 +411,9 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List:
table_data.full_path, "rb", transport_params={"client": s3_client}
)
else:
file = open(table_data.full_path, "rb")
# We still use smart_open here to take advantage of the compression
# capabilities of smart_open.
file = smart_open(table_data.full_path, "rb")

fields = []

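The .gzip alias registered near the top of this file means smart_open now handles both spellings; a small hedged sketch (the file path below is made up):

```python
# Sketch: after registering the ".gzip" alias, smart_open decompresses it like ".gz".
import smart_open
import smart_open.compression as so_compression

so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"])

with smart_open.open("/tmp/example.json.gzip", "rb") as f:  # assumes such a file exists
    print(f.readline())
```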
@@ -7,7 +7,10 @@
from pydantic import Field

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.state.stale_entity_removal_handler import (
@@ -21,7 +24,11 @@
logger: logging.Logger = logging.getLogger(__name__)


class SQLCommonConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
class SQLCommonConfig(
StatefulIngestionConfigBase,
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
options: dict = pydantic.Field(
default_factory=dict,
description="Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs.",