diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 631296a1f..45ab5d960 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -358,7 +358,7 @@ def _get_columns_for_catalog(self, relation: BaseRelation) -> Iterable[Dict[str, def get_catalog( self, manifest: Manifest, selected_nodes: Optional[Set] = None ) -> Tuple[agate.Table, List[Exception]]: - schema_map = self._get_catalog_schemas(manifest) + schema_map = self._get_catalog_schemas(manifest) # info_schema: [relation_schema] if len(schema_map) > 1: raise dbt.exceptions.CompilationError( f"Expected only one database in get_catalog, found " f"{list(schema_map)}" @@ -385,30 +385,27 @@ def get_catalog_by_relations( self, manifest: Manifest, relations: Set[BaseRelation] ) -> Tuple[agate.Table, List[Exception]]: info_schemas = {r.information_schema() for r in relations} + if len(info_schemas) > 1: + raise dbt.exceptions.CompilationError( + f"Expected only one database in get_catalog, found " f"{list(info_schemas)}" + ) - if len(info_schemas) == 0: - return catch_as_completed([]) - - elif len(info_schemas) == 1: - info_schema = info_schemas.pop() - with executor(self.config) as tpe: - return catch_as_completed( - [ + with executor(self.config) as tpe: + futures: List[Future[agate.Table]] = [] + for info_schema in info_schemas: + for relation in relations: + futures.append( tpe.submit_connected( self, - "list_schemas", + str(relation), self._get_one_catalog_by_relations, - info_schema, - relations, - manifest, + information_schema=info_schema, + relations=[relation], + manifest=manifest, ) - ] - ) - - else: - raise dbt.exceptions.CompilationError( - f"Expected only one database in get_catalog_by_relations, found {list(info_schemas)}" - ) + ) + catalogs, exceptions = catch_as_completed(futures) + return catalogs, exceptions def _get_one_catalog( self,