Skip to content

Commit

Permalink
mimic get_catalog behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
mikealfare committed Dec 19, 2023
1 parent c3e608d commit 5221389
Showing 1 changed file with 17 additions and 20 deletions.
37 changes: 17 additions & 20 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def _get_columns_for_catalog(self, relation: BaseRelation) -> Iterable[Dict[str,
def get_catalog(
self, manifest: Manifest, selected_nodes: Optional[Set] = None
) -> Tuple[agate.Table, List[Exception]]:
schema_map = self._get_catalog_schemas(manifest)
schema_map = self._get_catalog_schemas(manifest) # info_schema: [relation_schema]
if len(schema_map) > 1:
raise dbt.exceptions.CompilationError(
f"Expected only one database in get_catalog, found " f"{list(schema_map)}"
Expand All @@ -385,30 +385,27 @@ def get_catalog_by_relations(
self, manifest: Manifest, relations: Set[BaseRelation]
) -> Tuple[agate.Table, List[Exception]]:
info_schemas = {r.information_schema() for r in relations}
if len(info_schemas) > 1:
raise dbt.exceptions.CompilationError(
f"Expected only one database in get_catalog, found " f"{list(info_schemas)}"
)

if len(info_schemas) == 0:
return catch_as_completed([])

elif len(info_schemas) == 1:
info_schema = info_schemas.pop()
with executor(self.config) as tpe:
return catch_as_completed(
[
with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
for info_schema in info_schemas:
for relation in relations:
futures.append(
tpe.submit_connected(
self,
"list_schemas",
str(relation),
self._get_one_catalog_by_relations,
info_schema,
relations,
manifest,
information_schema=info_schema,
relations=[relation],
manifest=manifest,
)
]
)

else:
raise dbt.exceptions.CompilationError(
f"Expected only one database in get_catalog_by_relations, found {list(info_schemas)}"
)
)
catalogs, exceptions = catch_as_completed(futures)
return catalogs, exceptions

def _get_one_catalog(
self,
Expand Down

0 comments on commit 5221389

Please sign in to comment.