diff --git a/core/dbt/cli/requires.py b/core/dbt/cli/requires.py index 892c065f4a1..d7a93ffe8d9 100644 --- a/core/dbt/cli/requires.py +++ b/core/dbt/cli/requires.py @@ -370,16 +370,18 @@ def setup_manifest(ctx: Context, write: bool = True, write_perf_info: bool = Fal runtime_config = ctx.obj["runtime_config"] + catalogs = ctx.obj["catalogs"] if "catalogs" in ctx.obj else None + catalog_integrations = ( + catalogs.get_active_adapter_write_catalog_integrations() if catalogs else [] + ) + # if a manifest has already been set on the context, don't overwrite it if ctx.obj.get("manifest") is None: ctx.obj["manifest"] = parse_manifest( runtime_config, write_perf_info, write, ctx.obj["flags"].write_json ) - adapter = get_adapter(runtime_config) - catalogs = ctx.obj["catalogs"].catalogs if "catalogs" in ctx.obj else [] - for catalog in catalogs: - adapter.set_catalog_integration(catalog.name, catalog.active_write_integration) + adapter.add_catalog_integrations(catalog_integrations) else: register_adapter(runtime_config, get_mp_context()) adapter = get_adapter(runtime_config) @@ -387,6 +389,4 @@ def setup_manifest(ctx: Context, write: bool = True, write_perf_info: bool = Fal adapter.set_macro_resolver(ctx.obj["manifest"]) query_header_context = generate_query_header_context(adapter.config, ctx.obj["manifest"]) # type: ignore[attr-defined] adapter.connections.set_query_header(query_header_context) - catalogs = ctx.obj["catalogs"].catalogs if "catalogs" in ctx.obj else [] - for catalog in catalogs: - adapter.set_catalog_integration(catalog.name, catalog.active_write_integration) + adapter.add_catalog_integrations(catalog_integrations) diff --git a/core/dbt/config/catalogs.py b/core/dbt/config/catalogs.py index e48896ba29f..ee0480be7e7 100644 --- a/core/dbt/config/catalogs.py +++ b/core/dbt/config/catalogs.py @@ -2,7 +2,8 @@ from dataclasses import dataclass, field from typing import Any, Dict, List, Optional -from dbt.adapters.contracts.catalog_integration import CatalogIntegration +from dbt.adapters.contracts.catalog import CatalogIntegrationType +from dbt.adapters.relation_configs.formats import TableFormat from dbt.clients.yaml_helper import load_yaml_text from dbt.config.renderer import SecretRenderer from dbt_common.clients.system import load_file_contents @@ -10,6 +11,26 @@ from dbt_common.exceptions import CompilationError, DbtValidationError +@dataclass +class CatalogIntegration(dbtClassMixin): + name: str + external_volume: str + table_format: TableFormat + catalog_type: CatalogIntegrationType + + +# satisfies dbt.adapters.protocol.CatalogIntegrationConfig +@dataclass +class AdapterCatalogIntegration: + catalog_name: str + integration_name: str + table_format: str + catalog_type: str + external_volume: Optional[str] + namespace: Optional[str] + adapter_configs: Optional[Dict] + + @dataclass class Catalog(dbtClassMixin): name: str @@ -75,6 +96,31 @@ def load(cls, catalog_dir: str, profile: str, cli_vars: Dict[str, Any]) -> "Cata return cls(catalogs=catalogs) + def get_active_adapter_write_catalog_integrations(self): + adapter_catalog_integrations: List[AdapterCatalogIntegration] = [] + + for catalog in self.catalogs: + active_write_integration = list( + filter( + lambda c: c.name == catalog.active_write_integration, + catalog.write_integrations, + ) + )[0] + + adapter_catalog_integrations.append( + AdapterCatalogIntegration( + catalog_name=catalog.name, + integration_name=catalog.active_write_integration, + table_format=active_write_integration.table_format, + catalog_type=active_write_integration.catalog_type, + external_volume=active_write_integration.external_volume, + namespace=None, # namespaces on write_integrations are not yet supported + adapter_configs={}, # configs on write_integrations not yet supported + ) + ) + + return adapter_catalog_integrations + @classmethod def _read_catalogs(cls, catalog_dir: str) -> Dict[str, Any]: path = os.path.join(catalog_dir, "catalogs.yml") diff --git a/dev-requirements.txt b/dev-requirements.txt index faed6888767..91aba73ef33 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,4 +1,4 @@ -git+https://github.com/dbt-labs/dbt-adapters.git@catalog-integrations +git+https://github.com/dbt-labs/dbt-adapters.git@feature/externalCatalogConfig git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git@main git+https://github.com/dbt-labs/dbt-postgres.git@main diff --git a/tests/functional/catalogs/test_catalogs_parsing.py b/tests/functional/catalogs/test_catalogs_parsing.py index 3bb71d1ddc7..bbd53dd2c6b 100644 --- a/tests/functional/catalogs/test_catalogs_parsing.py +++ b/tests/functional/catalogs/test_catalogs_parsing.py @@ -28,9 +28,9 @@ def catalogs(self): def test_catalog_parsing_adapter_initialialization(self, catalogs, project): write_config_file(catalogs, project.project_root, "catalogs.yml") - mock_set_catalog_integration = mock.Mock() + mock_add_catalog_integration = mock.Mock() with mock.patch.object( - type(project.adapter), "set_catalog_integration", mock_set_catalog_integration + type(project.adapter), "add_catalog_integrations", mock_add_catalog_integration ): run_dbt(["run"]) - mock_set_catalog_integration.assert_called_once() + mock_add_catalog_integration.assert_called_once()