diff --git a/gooddata-dbt/gooddata_dbt/dbt/cloud.py b/gooddata-dbt/gooddata_dbt/dbt/cloud.py index 2b1e033fb..a097252cc 100644 --- a/gooddata-dbt/gooddata_dbt/dbt/cloud.py +++ b/gooddata-dbt/gooddata_dbt/dbt/cloud.py @@ -4,7 +4,7 @@ import os import time from pathlib import Path -from typing import Dict, Tuple, Union +from typing import Dict, List, Tuple, Union import attrs import requests @@ -119,7 +119,7 @@ def __attrs_post_init__(self) -> None: if not can_connect: raise ValueError("Cannot connect to dbt Cloud. Please, check credentials.") - def _post_rest(self, url: str, data: dict) -> dict: + def _post_rest(self, url: str, data: Dict) -> Dict: response = requests.post(url, headers=self.credentials.bearer_token_header, data=data) if response.status_code != 200: raise ValueError( @@ -273,7 +273,7 @@ def make_profiles(environment: Environment, path_to_profiles: Path, profile_file def string_camel_to_snake(element: str) -> str: return "".join(["_" + c.lower() if c.isupper() else c for c in element]).lstrip("_") - def dict_camel_to_snake(self, data: Union[dict, list]) -> Union[dict, list]: + def dict_camel_to_snake(self, data: Union[Dict, List]) -> Union[Dict, List]: if isinstance(data, list): result = [] for record in data: @@ -290,15 +290,15 @@ def dict_camel_to_snake(self, data: Union[dict, list]) -> Union[dict, list]: result[self.string_camel_to_snake(key)] = value # type: ignore return result - def get_last_execution(self, environment_id: str, model_count: int) -> list[DbtExecution]: + def get_last_execution(self, environment_id: str, model_count: int) -> List[DbtExecution]: variables = {"environmentId": environment_id, "first": model_count} result = self._post_graphql(self.graphql_applied_models, variables) model_edges = self.dict_camel_to_snake(safeget(result, ["data", "environment", "applied", "models", "edges"])) return [DbtExecution.from_dict(m["node"]) for m in model_edges] def get_average_times( - self, logger: logging.Logger, models: list[DbtExecution], environment_id: str, history_count: int - ) -> dict[str, float]: + self, logger: logging.Logger, models: List[DbtExecution], environment_id: str, history_count: int + ) -> Dict[str, float]: models_history_avg_execution_times = {} for model in models: variables = {"environmentId": environment_id, "modelId": model.unique_id, "first": history_count} diff --git a/gooddata-dbt/gooddata_dbt/dbt_plugin.py b/gooddata-dbt/gooddata_dbt/dbt_plugin.py index 715ed8333..b9bf34b7b 100644 --- a/gooddata-dbt/gooddata_dbt/dbt_plugin.py +++ b/gooddata-dbt/gooddata_dbt/dbt_plugin.py @@ -182,7 +182,7 @@ def deploy_models( def dbt_cloud_stats( args: Namespace, logger: logging.Logger, - all_model_ids: list[str], + all_model_ids: List[str], environment_id: str, ) -> None: logger.info("Get stats for last execution...") @@ -231,7 +231,7 @@ def dbt_cloud_stats( report_message_to_merge_request(degradation_md) -def dbt_cloud_run(args: Namespace, logger: logging.Logger, all_model_ids: list[str]) -> None: +def dbt_cloud_run(args: Namespace, logger: logging.Logger, all_model_ids: List[str]) -> None: dbt_conn = DbtConnection(credentials=DbtCredentials(account_id=args.account_id, token=args.token)) logger.info("#" * 80) logger.info(f"Starting job in dbt cloud with job_id={args.job_id}")