Skip to content

Commit

Permalink
use dbt-adapters.git@feature/externalCatalogConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk committed Dec 18, 2024
1 parent 4a3e5e2 commit f0d7606
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 12 deletions.
14 changes: 7 additions & 7 deletions core/dbt/cli/requires.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,23 +370,23 @@ def setup_manifest(ctx: Context, write: bool = True, write_perf_info: bool = Fal

runtime_config = ctx.obj["runtime_config"]

catalogs = ctx.obj["catalogs"] if "catalogs" in ctx.obj else None
catalog_integrations = (
catalogs.get_active_adapter_write_catalog_integrations() if catalogs else []
)

# if a manifest has already been set on the context, don't overwrite it
if ctx.obj.get("manifest") is None:
ctx.obj["manifest"] = parse_manifest(
runtime_config, write_perf_info, write, ctx.obj["flags"].write_json
)

adapter = get_adapter(runtime_config)
catalogs = ctx.obj["catalogs"].catalogs if "catalogs" in ctx.obj else []
for catalog in catalogs:
adapter.set_catalog_integration(catalog.name, catalog.active_write_integration)
adapter.add_catalog_integrations(catalog_integrations)
else:
register_adapter(runtime_config, get_mp_context())
adapter = get_adapter(runtime_config)
adapter.set_macro_context_generator(generate_runtime_macro_context) # type: ignore[arg-type]
adapter.set_macro_resolver(ctx.obj["manifest"])
query_header_context = generate_query_header_context(adapter.config, ctx.obj["manifest"]) # type: ignore[attr-defined]
adapter.connections.set_query_header(query_header_context)
catalogs = ctx.obj["catalogs"].catalogs if "catalogs" in ctx.obj else []
for catalog in catalogs:
adapter.set_catalog_integration(catalog.name, catalog.active_write_integration)
adapter.add_catalog_integrations(catalog_integrations)
48 changes: 47 additions & 1 deletion core/dbt/config/catalogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,35 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

from dbt.adapters.contracts.catalog_integration import CatalogIntegration
from dbt.adapters.contracts.catalog import CatalogIntegrationType
from dbt.adapters.relation_configs.formats import TableFormat
from dbt.clients.yaml_helper import load_yaml_text
from dbt.config.renderer import SecretRenderer
from dbt_common.clients.system import load_file_contents
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.exceptions import CompilationError, DbtValidationError


@dataclass
class CatalogIntegration(dbtClassMixin):
name: str
external_volume: str
table_format: TableFormat
catalog_type: CatalogIntegrationType


# satisfies dbt.adapters.protocol.CatalogIntegrationConfig
@dataclass
class AdapterCatalogIntegration:
catalog_name: str
integration_name: str
table_format: str
catalog_type: str
external_volume: Optional[str]
namespace: Optional[str]
adapter_configs: Optional[Dict]


@dataclass
class Catalog(dbtClassMixin):
name: str
Expand Down Expand Up @@ -75,6 +96,31 @@ def load(cls, catalog_dir: str, profile: str, cli_vars: Dict[str, Any]) -> "Cata

return cls(catalogs=catalogs)

def get_active_adapter_write_catalog_integrations(self):
adapter_catalog_integrations: List[AdapterCatalogIntegration] = []

for catalog in self.catalogs:
active_write_integration = list(
filter(
lambda c: c.name == catalog.active_write_integration,
catalog.write_integrations,
)
)[0]

adapter_catalog_integrations.append(
AdapterCatalogIntegration(
catalog_name=catalog.name,
integration_name=catalog.active_write_integration,
table_format=active_write_integration.table_format,
catalog_type=active_write_integration.catalog_type,
external_volume=active_write_integration.external_volume,
namespace=None, # namespaces on write_integrations are not yet supported
adapter_configs={}, # configs on write_integrations not yet supported
)
)

return adapter_catalog_integrations

@classmethod
def _read_catalogs(cls, catalog_dir: str) -> Dict[str, Any]:
path = os.path.join(catalog_dir, "catalogs.yml")
Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
git+https://github.com/dbt-labs/dbt-adapters.git@catalog-integrations
git+https://github.com/dbt-labs/dbt-adapters.git@feature/externalCatalogConfig
git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git@main
git+https://github.com/dbt-labs/dbt-postgres.git@main
Expand Down
6 changes: 3 additions & 3 deletions tests/functional/catalogs/test_catalogs_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ def catalogs(self):
def test_catalog_parsing_adapter_initialialization(self, catalogs, project):
write_config_file(catalogs, project.project_root, "catalogs.yml")

mock_set_catalog_integration = mock.Mock()
mock_add_catalog_integration = mock.Mock()
with mock.patch.object(
type(project.adapter), "set_catalog_integration", mock_set_catalog_integration
type(project.adapter), "add_catalog_integrations", mock_add_catalog_integration
):
run_dbt(["run"])
mock_set_catalog_integration.assert_called_once()
mock_add_catalog_integration.assert_called_once()

0 comments on commit f0d7606

Please sign in to comment.