From ccfc4e52ff41324ad61f596ab597f7a39b858209 Mon Sep 17 00:00:00 2001
From: Kshitij Aranke
Date: Wed, 29 May 2024 14:21:19 +0100
Subject: [PATCH] Move catalog artifact schema to dbt_common

---
 .gitignore                                   |   2 +-
 dbt_common/artifacts/__init__.py             |   0
 dbt_common/artifacts/exceptions/__init__.py  |   1 +
 dbt_common/artifacts/exceptions/schemas.py   |  31 ++
 dbt_common/artifacts/schemas/__init__.py     |   0
 dbt_common/artifacts/schemas/base.py         | 175 ++++++++++++
 .../artifacts/schemas/catalog/__init__.py    |   2 +
 .../artifacts/schemas/catalog/v1/__init__.py |   0
 .../artifacts/schemas/catalog/v1/catalog.py  | 108 +++++++
 dbt_common/version.py                        | 269 ++++++++++++++++++
 10 files changed, 587 insertions(+), 1 deletion(-)
 create mode 100644 dbt_common/artifacts/__init__.py
 create mode 100644 dbt_common/artifacts/exceptions/__init__.py
 create mode 100644 dbt_common/artifacts/exceptions/schemas.py
 create mode 100644 dbt_common/artifacts/schemas/__init__.py
 create mode 100644 dbt_common/artifacts/schemas/base.py
 create mode 100644 dbt_common/artifacts/schemas/catalog/__init__.py
 create mode 100644 dbt_common/artifacts/schemas/catalog/v1/__init__.py
 create mode 100644 dbt_common/artifacts/schemas/catalog/v1/catalog.py
 create mode 100644 dbt_common/version.py

diff --git a/.gitignore b/.gitignore
index 28030861..5ad1a6c2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -157,4 +157,4 @@ cython_debug/
 # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
-#.idea/
+.idea/
diff --git a/dbt_common/artifacts/__init__.py b/dbt_common/artifacts/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/dbt_common/artifacts/exceptions/__init__.py b/dbt_common/artifacts/exceptions/__init__.py
new file mode 100644
index 00000000..5acc6385
--- /dev/null
+++ b/dbt_common/artifacts/exceptions/__init__.py
@@ -0,0 +1 @@
+from dbt_common.artifacts.exceptions.schemas import IncompatibleSchemaError
diff --git a/dbt_common/artifacts/exceptions/schemas.py b/dbt_common/artifacts/exceptions/schemas.py
new file mode 100644
index 00000000..c9f1b0e1
--- /dev/null
+++ b/dbt_common/artifacts/exceptions/schemas.py
@@ -0,0 +1,31 @@
+from typing import Optional
+
+from dbt_common.exceptions import DbtRuntimeError
+
+
+class IncompatibleSchemaError(DbtRuntimeError):
+    def __init__(self, expected: str, found: Optional[str] = None) -> None:
+        self.expected = expected
+        self.found = found
+        self.filename = "input file"
+
+        super().__init__(msg=self.get_message())
+
+    def add_filename(self, filename: str):
+        self.filename = filename
+        self.msg = self.get_message()
+
+    def get_message(self) -> str:
+        found_str = "nothing"
+        if self.found is not None:
+            found_str = f'"{self.found}"'
+
+        msg = (
+            f'Expected a schema version of "{self.expected}" in '
+            f"{self.filename}, but found {found_str}. Are you running with a "
+            f"different version of dbt?"
+        )
+        return msg
+
+    CODE = 10014
+    MESSAGE = "Incompatible Schema"
diff --git a/dbt_common/artifacts/schemas/__init__.py b/dbt_common/artifacts/schemas/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/dbt_common/artifacts/schemas/base.py b/dbt_common/artifacts/schemas/base.py
new file mode 100644
index 00000000..5803acb6
--- /dev/null
+++ b/dbt_common/artifacts/schemas/base.py
@@ -0,0 +1,175 @@
+import dataclasses
+import functools
+from datetime import datetime
+from typing import Any, ClassVar, Dict, Optional, Type, TypeVar
+
+from mashumaro.jsonschema import build_json_schema
+from mashumaro.jsonschema.dialects import DRAFT_2020_12
+
+from dbt_common.artifacts.exceptions import IncompatibleSchemaError
+from dbt_common.version import __version__
+from dbt_common.clients.system import read_json, write_json
+from dbt_common.dataclass_schema import dbtClassMixin
+from dbt_common.events.functions import get_metadata_vars
+from dbt_common.exceptions import DbtInternalError, DbtRuntimeError
+from dbt_common.invocation import get_invocation_id
+
+BASE_SCHEMAS_URL = "https://schemas.getdbt.com/"
+SCHEMA_PATH = "dbt/{name}/v{version}.json"
+
+
+@dataclasses.dataclass
+class SchemaVersion:
+    name: str
+    version: int
+
+    @property
+    def path(self) -> str:
+        return SCHEMA_PATH.format(name=self.name, version=self.version)
+
+    def __str__(self) -> str:
+        return BASE_SCHEMAS_URL + self.path
+
+
+class Writable:
+    def write(self, path: str):
+        write_json(path, self.to_dict(omit_none=False, context={"artifact": True}))  # type: ignore
+
+
+class Readable:
+    @classmethod
+    def read(cls, path: str):
+        try:
+            data = read_json(path)
+        except (EnvironmentError, ValueError) as exc:
+            raise DbtRuntimeError(
+                f'Could not read {cls.__name__} at "{path}" as JSON: {exc}'
+            ) from exc
+
+        return cls.from_dict(data)  # type: ignore
+
+
+# This is used in the ManifestMetadata, RunResultsMetadata, RunOperationResultMetadata,
+# FreshnessMetadata, and CatalogMetadata classes
+@dataclasses.dataclass
+class BaseArtifactMetadata(dbtClassMixin):
+    dbt_schema_version: str
+    dbt_version: str = __version__
+    generated_at: datetime = dataclasses.field(default_factory=datetime.utcnow)
+    invocation_id: Optional[str] = dataclasses.field(default_factory=get_invocation_id)
+    env: Dict[str, str] = dataclasses.field(default_factory=get_metadata_vars)
+
+    def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
+        dct = super().__post_serialize__(dct, context)
+        if dct["generated_at"] and dct["generated_at"].endswith("+00:00"):
+            dct["generated_at"] = dct["generated_at"].replace("+00:00", "") + "Z"
+        return dct
+
+
+# This is used as a class decorator to set the schema_version in the
+# 'dbt_schema_version' class attribute. (It's copied into the metadata objects.)
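+# Example: in catalog/v1/catalog.py below, @schema_version("catalog", 1) sets
+# CatalogArtifact.dbt_schema_version = SchemaVersion("catalog", 1), whose
+# string form is https://schemas.getdbt.com/dbt/catalog/v1.json.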
+# Name attributes of SchemaVersion in classes with the 'schema_version' decorator:
+# manifest
+# run-results
+# run-operation-result
+# sources
+# catalog
+# remote-compile-result
+# remote-execution-result
+# remote-run-result
+def schema_version(name: str, version: int):
+    def inner(cls: Type[VersionedSchema]):
+        cls.dbt_schema_version = SchemaVersion(
+            name=name,
+            version=version,
+        )
+        return cls
+
+    return inner
+
+
+# This is used in the ArtifactMixin and RemoteCompileResultMixin classes
+@dataclasses.dataclass
+class VersionedSchema(dbtClassMixin):
+    dbt_schema_version: ClassVar[SchemaVersion]
+
+    @classmethod
+    @functools.lru_cache
+    def json_schema(cls) -> Dict[str, Any]:
+        json_schema_obj = build_json_schema(cls, dialect=DRAFT_2020_12, with_dialect_uri=True)
+        json_schema = json_schema_obj.to_dict()
+        json_schema["$id"] = str(cls.dbt_schema_version)
+        return json_schema
+
+    @classmethod
+    def is_compatible_version(cls, schema_version):
+        compatible_versions = [str(cls.dbt_schema_version)]
+        if hasattr(cls, "compatible_previous_versions"):
+            for name, version in cls.compatible_previous_versions():
+                compatible_versions.append(str(SchemaVersion(name, version)))
+        return str(schema_version) in compatible_versions
+
+    @classmethod
+    def read_and_check_versions(cls, path: str):
+        try:
+            data = read_json(path)
+        except (EnvironmentError, ValueError) as exc:
+            raise DbtRuntimeError(
+                f'Could not read {cls.__name__} at "{path}" as JSON: {exc}'
+            ) from exc
+
+        # Check metadata version. There is a class variable 'dbt_schema_version', but
+        # that doesn't show up in artifacts, where it only exists in the 'metadata'
+        # dictionary.
+        if hasattr(cls, "dbt_schema_version"):
+            if "metadata" in data and "dbt_schema_version" in data["metadata"]:
+                previous_schema_version = data["metadata"]["dbt_schema_version"]
+                # cls.dbt_schema_version is a SchemaVersion object
+                if not cls.is_compatible_version(previous_schema_version):
+                    raise IncompatibleSchemaError(
+                        expected=str(cls.dbt_schema_version),
+                        found=previous_schema_version,
+                    )
+
+        return cls.upgrade_schema_version(data)
+
+    @classmethod
+    def upgrade_schema_version(cls, data):
+        """This will modify the data (dictionary) passed in to match the current
+        artifact schema code, if necessary. This is the default method, which
+        just returns the instantiated object via from_dict."""
+        return cls.from_dict(data)
+
+
+T = TypeVar("T", bound="ArtifactMixin")
+
+
+# metadata should really be a Generic[T_M] where T_M is a TypeVar bound to
+# BaseArtifactMetadata. Unfortunately this isn't possible due to a mypy issue:
+# https://github.com/python/mypy/issues/7520
+# This is used in the WritableManifest, RunResultsArtifact, RunOperationResultsArtifact,
+# and CatalogArtifact
+@dataclasses.dataclass(init=False)
+class ArtifactMixin(VersionedSchema, Writable, Readable):
+    metadata: BaseArtifactMetadata
+
+    @classmethod
+    def validate(cls, data):
+        super().validate(data)
+        if cls.dbt_schema_version is None:
+            raise DbtInternalError("Cannot call from_dict with no schema version!")
+
+
+def get_artifact_schema_version(dct: dict) -> int:
+    schema_version = dct.get("metadata", {}).get("dbt_schema_version", None)
+    if not schema_version:
+        raise ValueError("Artifact is missing schema version")
+
+    # schema_version is in this format: https://schemas.getdbt.com/dbt/manifest/v10.json
+    # What the code below is doing:
+    # 1. Split on "/" – v10.json
+    # 2. Split on "." – v10
+    # 3. Skip first character – 10
+    # 4. Convert to int
+    # TODO: If this gets more complicated, turn into a regex
+    return int(schema_version.split("/")[-1].split(".")[0][1:])
diff --git a/dbt_common/artifacts/schemas/catalog/__init__.py b/dbt_common/artifacts/schemas/catalog/__init__.py
new file mode 100644
index 00000000..6ba92972
--- /dev/null
+++ b/dbt_common/artifacts/schemas/catalog/__init__.py
@@ -0,0 +1,2 @@
+# alias to latest
+from .v1.catalog import *  # noqa
diff --git a/dbt_common/artifacts/schemas/catalog/v1/__init__.py b/dbt_common/artifacts/schemas/catalog/v1/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/dbt_common/artifacts/schemas/catalog/v1/catalog.py b/dbt_common/artifacts/schemas/catalog/v1/catalog.py
new file mode 100644
index 00000000..1362b1ac
--- /dev/null
+++ b/dbt_common/artifacts/schemas/catalog/v1/catalog.py
@@ -0,0 +1,108 @@
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Any, Dict, List, NamedTuple, Optional, Union
+
+from dbt_common.artifacts.schemas.base import BaseArtifactMetadata, schema_version, ArtifactMixin
+from dbt_common.dataclass_schema import dbtClassMixin
+from dbt_common.utils.formatting import lowercase
+
+Primitive = Union[bool, str, float, None]
+PrimitiveDict = Dict[str, Primitive]
+
+CatalogKey = NamedTuple(
+    "CatalogKey", [("database", Optional[str]), ("schema", str), ("name", str)]
+)
+
+
+@dataclass
+class StatsItem(dbtClassMixin):
+    id: str
+    label: str
+    value: Primitive
+    include: bool
+    description: Optional[str] = None
+
+
+StatsDict = Dict[str, StatsItem]
+
+
+@dataclass
+class ColumnMetadata(dbtClassMixin):
+    type: str
+    index: int
+    name: str
+    comment: Optional[str] = None
+
+
+ColumnMap = Dict[str, ColumnMetadata]
+
+
+@dataclass
+class TableMetadata(dbtClassMixin):
+    type: str
+    schema: str
+    name: str
+    database: Optional[str] = None
+    comment: Optional[str] = None
+    owner: Optional[str] = None
+
+
+@dataclass
+class CatalogTable(dbtClassMixin):
+    metadata: TableMetadata
+    columns: ColumnMap
+    stats: StatsDict
+    # the same table with two unique IDs will just be listed two times
+    unique_id: Optional[str] = None
+
+    def key(self) -> CatalogKey:
+        return CatalogKey(
+            lowercase(self.metadata.database),
+            self.metadata.schema.lower(),
+            self.metadata.name.lower(),
+        )
+
+
+@dataclass
+class CatalogMetadata(BaseArtifactMetadata):
+    dbt_schema_version: str = field(
+        default_factory=lambda: str(CatalogArtifact.dbt_schema_version)
+    )
+
+
+@dataclass
+class CatalogResults(dbtClassMixin):
+    nodes: Dict[str, CatalogTable]
+    sources: Dict[str, CatalogTable]
+    errors: Optional[List[str]] = None
+    _compile_results: Optional[Any] = None
+
+    def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
+        dct = super().__post_serialize__(dct, context)
+        if "_compile_results" in dct:
+            del dct["_compile_results"]
+        return dct
+
+
+@dataclass
+@schema_version("catalog", 1)
+class CatalogArtifact(CatalogResults, ArtifactMixin):
+    metadata: CatalogMetadata
+
+    @classmethod
+    def from_results(
+        cls,
+        generated_at: datetime,
+        nodes: Dict[str, CatalogTable],
+        sources: Dict[str, CatalogTable],
+        compile_results: Optional[Any],
+        errors: Optional[List[str]],
+    ) -> "CatalogArtifact":
+        meta = CatalogMetadata(generated_at=generated_at)
+        return cls(
+            metadata=meta,
+            nodes=nodes,
+            sources=sources,
+            errors=errors,
+            _compile_results=compile_results,
+        )
diff --git a/dbt_common/version.py b/dbt_common/version.py
new file mode 100644
index 00000000..c1c5484f
--- /dev/null
+++ b/dbt_common/version.py
@@ -0,0 +1,269 @@
+import glob
+import importlib
+import importlib.util
+import json
+import os
+from datetime import date
+from pathlib import Path
+from typing import Iterator, List, Optional, Tuple
+
+import requests
+
+import dbt_common.semver as semver
+from dbt_common.ui import green, red, yellow
+
+PYPI_VERSION_URL = "https://pypi.org/pypi/dbt-core/json"
+
+
+def get_version_information() -> str:
+    installed = get_installed_version()
+    latest = get_latest_version()
+
+    core_msg_lines, core_info_msg = _get_core_msg_lines(installed, latest)
+    core_msg = _format_core_msg(core_msg_lines)
+    plugin_version_msg = _get_plugins_msg(installed)
+
+    msg_lines = [core_msg]
+
+    if core_info_msg != "":
+        msg_lines.append(core_info_msg)
+
+    msg_lines.append(plugin_version_msg)
+    msg_lines.append("")
+
+    return "\n\n".join(msg_lines)
+
+
+def get_installed_version() -> semver.VersionSpecifier:
+    return semver.VersionSpecifier.from_version_string(__version__)
+
+
+def get_latest_version(
+    version_url: str = PYPI_VERSION_URL,
+) -> Optional[semver.VersionSpecifier]:
+    try:
+        resp = requests.get(version_url, timeout=1)
+        data = resp.json()
+        version_string = data["info"]["version"]
+    except (json.JSONDecodeError, KeyError, requests.RequestException):
+        return None
+
+    return semver.VersionSpecifier.from_version_string(version_string)
+
+
+def _get_core_msg_lines(installed, latest) -> Tuple[List[List[str]], str]:
+    installed_s = installed.to_version_string(skip_matcher=True)
+    installed_line = ["installed", installed_s, ""]
+    update_info = ""
+
+    if latest is None:
+        update_info = (
+            "  The latest version of dbt-core could not be determined!\n"
+            "  Make sure that the following URL is accessible:\n"
+            f"  {PYPI_VERSION_URL}"
+        )
+        return [installed_line], update_info
+
+    latest_s = latest.to_version_string(skip_matcher=True)
+    latest_line = ["latest", latest_s, green("Up to date!")]
+
+    if installed > latest:
+        latest_line[2] = yellow("Ahead of latest version!")
+    elif installed < latest:
+        latest_line[2] = yellow("Update available!")
+        update_info = (
+            "  Your version of dbt-core is out of date!\n"
+            "  You can find instructions for upgrading here:\n"
+            "  https://docs.getdbt.com/docs/installation"
+        )
+
+    return [
+        installed_line,
+        latest_line,
+    ], update_info
+
+
+def _format_core_msg(lines: List[List[str]]) -> str:
+    msg = "Core:\n"
+    msg_lines = []
+
+    for name, version, update_msg in _pad_lines(lines, separator=":"):
+        line_msg = f"  - {name} {version}"
+        if update_msg != "":
+            line_msg += f" - {update_msg}"
+        msg_lines.append(line_msg)
+
+    return msg + "\n".join(msg_lines)
+
+
+def _get_plugins_msg(installed: semver.VersionSpecifier) -> str:
+    msg_lines = ["Plugins:"]
+
+    plugins = []
+    display_update_msg = False
+    for name, version_s in _get_dbt_plugins_info():
+        compatibility_msg, needs_update = _get_plugin_msg_info(name, version_s, installed)
+        if needs_update:
+            display_update_msg = True
+        plugins.append([name, version_s, compatibility_msg])
+
+    for plugin in _pad_lines(plugins, separator=":"):
+        msg_lines.append(_format_single_plugin(plugin, ""))
+
+    if display_update_msg:
+        update_msg = (
+            "  At least one plugin is out of date or incompatible with dbt-core.\n"
+            "  You can find instructions for upgrading here:\n"
+            "  https://docs.getdbt.com/docs/installation"
+        )
+        msg_lines += ["", update_msg]
+
+    return "\n".join(msg_lines)
+
+
+def _get_plugin_msg_info(
+    name: str, version_s: str, core: semver.VersionSpecifier
+) -> Tuple[str, bool]:
+    plugin = semver.VersionSpecifier.from_version_string(version_s)
+    latest_plugin = get_latest_version(version_url=get_package_pypi_url(name))
+
+    needs_update = False
+
+    if plugin.major != core.major or plugin.minor != core.minor:
+        compatibility_msg = red("Not compatible!")
+        needs_update = True
+        return (compatibility_msg, needs_update)
+
+    if not latest_plugin:
+        compatibility_msg = yellow("Could not determine latest version")
+        return (compatibility_msg, needs_update)
+
+    if plugin < latest_plugin:
+        compatibility_msg = yellow("Update available!")
+        needs_update = True
+    elif plugin > latest_plugin:
+        compatibility_msg = yellow("Ahead of latest version!")
+    else:
+        compatibility_msg = green("Up to date!")
+
+    return (compatibility_msg, needs_update)
+
+
+def _format_single_plugin(plugin: List[str], update_msg: str) -> str:
+    name, version_s, compatibility_msg = plugin
+    msg = f"  - {name} {version_s} - {compatibility_msg}"
+    if update_msg != "":
+        msg += f"\n{update_msg}\n"
+    return msg
+
+
+def _pad_lines(lines: List[List[str]], separator: str = "") -> List[List[str]]:
+    if len(lines) == 0:
+        return []
+
+    # count the max line length for each column in the line
+    counter = [0] * len(lines[0])
+    for line in lines:
+        for i, item in enumerate(line):
+            counter[i] = max(counter[i], len(item))
+
+    result: List[List[str]] = []
+    for i, line in enumerate(lines):
+        # add another list to hold padded strings
+        if len(result) == i:
+            result.append([""] * len(line))
+
+        # iterate over columns in the line
+        for j, item in enumerate(line):
+            # the last column does not need padding
+            if j == len(line) - 1:
+                result[i][j] = item
+                continue
+
+            # if the following column has no length
+            # the string does not need padding
+            if counter[j + 1] == 0:
+                result[i][j] = item
+                continue
+
+            # only add the separator to the first column
+            offset = 0
+            if j == 0 and separator != "":
+                item += separator
+                offset = len(separator)
+
+            result[i][j] = item.ljust(counter[j] + offset)
+
+    return result
+
+
+def get_package_pypi_url(package_name: str) -> str:
+    return f"https://pypi.org/pypi/dbt-{package_name}/json"
+
+
+def _get_dbt_plugins_info() -> Iterator[Tuple[str, str]]:
+    for plugin_name in _get_adapter_plugin_names():
+        if plugin_name == "core":
+            continue
+        try:
+            mod = importlib.import_module(f"dbt.adapters.{plugin_name}.__version__")
+        except ImportError:
+            # not an adapter
+            continue
+        yield plugin_name, mod.version  # type: ignore
+
+
+def _get_adapter_plugin_names() -> Iterator[str]:
+    spec = importlib.util.find_spec("dbt.adapters")
+    # If None, then nothing provides an importable 'dbt.adapters', so we will
+    # not be reporting plugin versions today
+    if spec is None or spec.submodule_search_locations is None:
+        return
+
+    for adapters_path in spec.submodule_search_locations:
+        version_glob = os.path.join(adapters_path, "*", "__version__.py")
+        for version_path in glob.glob(version_glob):
+            # the path is like .../dbt/adapters/{plugin_name}/__version__.py
+            # except it could be \\ on windows!
+            plugin_root, _ = os.path.split(version_path)
+            _, plugin_name = os.path.split(plugin_root)
+            yield plugin_name
+
+
+def _get_version_from_venv_cache_info() -> str:
+    # Assuming the standard structure of our released virtual environment
+    # docker image (venv cache), look for the file which contains the release
+    # number and use that release number to formulate the reported version.
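+    # Expected file contents (an assumption, inferred from the parsing below):
+    #   {"release_tag": "<positive integer release number>"}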
+    try:
+        today = date.today()
+        major = today.year
+        minor = today.month
+
+        # We find the file by relative position, since dbt is sometimes run in
+        # production in a chroot environment which makes it difficult to use a
+        # consistent absolute path.
+        venv_parent_dir = Path(__file__).parent.parent.parent.parent.parent.parent
+        venv_cache_info_file = venv_parent_dir.joinpath("venv-cache-info.json")
+
+        with open(venv_cache_info_file) as f:
+            info_json = json.load(f)
+
+        patch = int(info_json["release_tag"])
+
+        if patch < 1:
+            raise ValueError(f"Invalid release number: {patch}")
+
+        return f"{major}.{minor}.{patch}"
+    except Exception:
+        # Although the above try block should succeed in production, we can't
+        # easily log failures because logging is not initialized when this
+        # function is called, so we fall back on the same behavior we want for
+        # local development, which is to report version "2024.0.0dev"
+        return "2024.0.0dev"
+
+
+# Mantle is notionally versionless, but we assign an internal version for
+# debugging and tracking. At release-time it will be set to YYYY.MM.R
+# where R represents the incrementing GitHub release number.
+__version__: str = _get_version_from_venv_cache_info()
+installed: semver.VersionSpecifier = get_installed_version()
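
Usage sketch (illustrative, not part of the diff): building and serializing a
catalog artifact with the relocated classes. The node name and output path
below are hypothetical; the constructors and the `from_results`/`write`
signatures come from the patch above.

    from datetime import datetime, timezone

    from dbt_common.artifacts.schemas.catalog import (
        CatalogArtifact,
        CatalogTable,
        ColumnMetadata,
        TableMetadata,
    )

    # Describe one table and its columns (values are illustrative).
    table = CatalogTable(
        metadata=TableMetadata(type="BASE TABLE", schema="analytics", name="orders"),
        columns={"id": ColumnMetadata(type="integer", index=1, name="id")},
        stats={},
    )

    # from_results fills in CatalogMetadata, including the schema URL
    # https://schemas.getdbt.com/dbt/catalog/v1.json set by @schema_version.
    artifact = CatalogArtifact.from_results(
        generated_at=datetime.now(timezone.utc),
        nodes={"model.example.orders": table},  # hypothetical unique_id key
        sources={},
        compile_results=None,
        errors=None,
    )

    # Writable.write serializes via clients.system.write_json.
    artifact.write("target/catalog.json")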