Skip to content

Commit

Permalink
Move catalog artifact schema to dbt_common
Browse files Browse the repository at this point in the history
  • Loading branch information
aranke committed May 29, 2024
1 parent df4b4c0 commit ccfc4e5
Show file tree
Hide file tree
Showing 10 changed files with 587 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,4 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.idea/
Empty file.
1 change: 1 addition & 0 deletions dbt_common/artifacts/exceptions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from dbt_common.artifacts.exceptions.schemas import IncompatibleSchemaError
31 changes: 31 additions & 0 deletions dbt_common/artifacts/exceptions/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Optional

from dbt_common.exceptions import DbtRuntimeError


class IncompatibleSchemaError(DbtRuntimeError):
def __init__(self, expected: str, found: Optional[str] = None) -> None:
self.expected = expected
self.found = found
self.filename = "input file"

super().__init__(msg=self.get_message())

def add_filename(self, filename: str):
self.filename = filename
self.msg = self.get_message()

def get_message(self) -> str:
found_str = "nothing"
if self.found is not None:
found_str = f'"{self.found}"'

msg = (
f'Expected a schema version of "{self.expected}" in '
f"{self.filename}, but found {found_str}. Are you running with a "
f"different version of dbt?"
)
return msg

CODE = 10014
MESSAGE = "Incompatible Schema"
Empty file.
175 changes: 175 additions & 0 deletions dbt_common/artifacts/schemas/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import dataclasses
import functools
from datetime import datetime
from typing import Any, ClassVar, Dict, Optional, Type, TypeVar

from mashumaro.jsonschema import build_json_schema
from mashumaro.jsonschema.dialects import DRAFT_2020_12

from dbt_common.artifacts.exceptions import IncompatibleSchemaError
from dbt_common.version import __version__
from dbt_common.clients.system import read_json, write_json
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.events.functions import get_metadata_vars
from dbt_common.exceptions import DbtInternalError, DbtRuntimeError
from dbt_common.invocation import get_invocation_id

BASE_SCHEMAS_URL = "https://schemas.getdbt.com/"
SCHEMA_PATH = "dbt/{name}/v{version}.json"


@dataclasses.dataclass
class SchemaVersion:
name: str
version: int

@property
def path(self) -> str:
return SCHEMA_PATH.format(name=self.name, version=self.version)

def __str__(self) -> str:
return BASE_SCHEMAS_URL + self.path


class Writable:
def write(self, path: str):
write_json(path, self.to_dict(omit_none=False, context={"artifact": True})) # type: ignore


class Readable:
@classmethod
def read(cls, path: str):
try:
data = read_json(path)
except (EnvironmentError, ValueError) as exc:
raise DbtRuntimeError(
f'Could not read {cls.__name__} at "{path}" as JSON: {exc}'
) from exc

return cls.from_dict(data) # type: ignore


# This is used in the ManifestMetadata, RunResultsMetadata, RunOperationResultMetadata,
# FreshnessMetadata, and CatalogMetadata classes
@dataclasses.dataclass
class BaseArtifactMetadata(dbtClassMixin):
dbt_schema_version: str
dbt_version: str = __version__
generated_at: datetime = dataclasses.field(default_factory=datetime.utcnow)
invocation_id: Optional[str] = dataclasses.field(default_factory=get_invocation_id)
env: Dict[str, str] = dataclasses.field(default_factory=get_metadata_vars)

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
if dct["generated_at"] and dct["generated_at"].endswith("+00:00"):
dct["generated_at"] = dct["generated_at"].replace("+00:00", "") + "Z"
return dct


# This is used as a class decorator to set the schema_version in the
# 'dbt_schema_version' class attribute. (It's copied into the metadata objects.)
# Name attributes of SchemaVersion in classes with the 'schema_version' decorator:
# manifest
# run-results
# run-operation-result
# sources
# catalog
# remote-compile-result
# remote-execution-result
# remote-run-result
def schema_version(name: str, version: int):
def inner(cls: Type[VersionedSchema]):
cls.dbt_schema_version = SchemaVersion(
name=name,
version=version,
)
return cls

return inner


# This is used in the ArtifactMixin and RemoteCompileResultMixin classes
@dataclasses.dataclass
class VersionedSchema(dbtClassMixin):
dbt_schema_version: ClassVar[SchemaVersion]

@classmethod
@functools.lru_cache
def json_schema(cls) -> Dict[str, Any]:
json_schema_obj = build_json_schema(cls, dialect=DRAFT_2020_12, with_dialect_uri=True)
json_schema = json_schema_obj.to_dict()
json_schema["$id"] = str(cls.dbt_schema_version)
return json_schema

@classmethod
def is_compatible_version(cls, schema_version):
compatible_versions = [str(cls.dbt_schema_version)]
if hasattr(cls, "compatible_previous_versions"):
for name, version in cls.compatible_previous_versions():
compatible_versions.append(str(SchemaVersion(name, version)))
return str(schema_version) in compatible_versions

@classmethod
def read_and_check_versions(cls, path: str):
try:
data = read_json(path)
except (EnvironmentError, ValueError) as exc:
raise DbtRuntimeError(
f'Could not read {cls.__name__} at "{path}" as JSON: {exc}'
) from exc

# Check metadata version. There is a class variable 'dbt_schema_version', but
# that doesn't show up in artifacts, where it only exists in the 'metadata'
# dictionary.
if hasattr(cls, "dbt_schema_version"):
if "metadata" in data and "dbt_schema_version" in data["metadata"]:
previous_schema_version = data["metadata"]["dbt_schema_version"]
# cls.dbt_schema_version is a SchemaVersion object
if not cls.is_compatible_version(previous_schema_version):
raise IncompatibleSchemaError(
expected=str(cls.dbt_schema_version),
found=previous_schema_version,
)

return cls.upgrade_schema_version(data)

@classmethod
def upgrade_schema_version(cls, data):
"""This will modify the data (dictionary) passed in to match the current
artifact schema code, if necessary. This is the default method, which
just returns the instantiated object via from_dict."""
return cls.from_dict(data)


T = TypeVar("T", bound="ArtifactMixin")


# metadata should really be a Generic[T_M] where T_M is a TypeVar bound to
# BaseArtifactMetadata. Unfortunately this isn't possible due to a mypy issue:
# https://github.com/python/mypy/issues/7520
# This is used in the WritableManifest, RunResultsArtifact, RunOperationResultsArtifact,
# and CatalogArtifact
@dataclasses.dataclass(init=False)
class ArtifactMixin(VersionedSchema, Writable, Readable):
metadata: BaseArtifactMetadata

@classmethod
def validate(cls, data):
super().validate(data)
if cls.dbt_schema_version is None:
raise DbtInternalError("Cannot call from_dict with no schema version!")


def get_artifact_schema_version(dct: dict) -> int:
schema_version = dct.get("metadata", {}).get("dbt_schema_version", None)
if not schema_version:
raise ValueError("Artifact is missing schema version")

# schema_version is in this format: https://schemas.getdbt.com/dbt/manifest/v10.json
# What the code below is doing:
# 1. Split on "/" – v10.json
# 2. Split on "." – v10
# 3. Skip first character – 10
# 4. Convert to int
# TODO: If this gets more complicated, turn into a regex
return int(schema_version.split("/")[-1].split(".")[0][1:])
2 changes: 2 additions & 0 deletions dbt_common/artifacts/schemas/catalog/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# alias to latest
from .v1.catalog import * # noqa
Empty file.
108 changes: 108 additions & 0 deletions dbt_common/artifacts/schemas/catalog/v1/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, NamedTuple, Optional, Union

from dbt_common.artifacts.schemas.base import BaseArtifactMetadata, schema_version, ArtifactMixin
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.utils.formatting import lowercase

Primitive = Union[bool, str, float, None]
PrimitiveDict = Dict[str, Primitive]

CatalogKey = NamedTuple(
"CatalogKey", [("database", Optional[str]), ("schema", str), ("name", str)]
)


@dataclass
class StatsItem(dbtClassMixin):
id: str
label: str
value: Primitive
include: bool
description: Optional[str] = None


StatsDict = Dict[str, StatsItem]


@dataclass
class ColumnMetadata(dbtClassMixin):
type: str
index: int
name: str
comment: Optional[str] = None


ColumnMap = Dict[str, ColumnMetadata]


@dataclass
class TableMetadata(dbtClassMixin):
type: str
schema: str
name: str
database: Optional[str] = None
comment: Optional[str] = None
owner: Optional[str] = None


@dataclass
class CatalogTable(dbtClassMixin):
metadata: TableMetadata
columns: ColumnMap
stats: StatsDict
# the same table with two unique IDs will just be listed two times
unique_id: Optional[str] = None

def key(self) -> CatalogKey:
return CatalogKey(
lowercase(self.metadata.database),
self.metadata.schema.lower(),
self.metadata.name.lower(),
)


@dataclass
class CatalogMetadata(BaseArtifactMetadata):
dbt_schema_version: str = field(
default_factory=lambda: str(CatalogArtifact.dbt_schema_version)
)


@dataclass
class CatalogResults(dbtClassMixin):
nodes: Dict[str, CatalogTable]
sources: Dict[str, CatalogTable]
errors: Optional[List[str]] = None
_compile_results: Optional[Any] = None

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
if "_compile_results" in dct:
del dct["_compile_results"]
return dct


@dataclass
@schema_version("catalog", 1)
class CatalogArtifact(CatalogResults, ArtifactMixin):
metadata: CatalogMetadata

@classmethod
def from_results(
cls,
generated_at: datetime,
nodes: Dict[str, CatalogTable],
sources: Dict[str, CatalogTable],
compile_results: Optional[Any],
errors: Optional[List[str]],
) -> "CatalogArtifact":
meta = CatalogMetadata(generated_at=generated_at)
return cls(
metadata=meta,
nodes=nodes,
sources=sources,
errors=errors,
_compile_results=compile_results,
)
Loading

0 comments on commit ccfc4e5

Please sign in to comment.