Skip to content

Commit

Permalink
Fix typing for Python 3.8
Browse files Browse the repository at this point in the history
  • Loading branch information
hkad98 committed Sep 22, 2023
1 parent cc057b1 commit 5442e28
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
12 changes: 6 additions & 6 deletions gooddata-dbt/gooddata_dbt/dbt/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import time
from pathlib import Path
from typing import Dict, Tuple, Union
from typing import Dict, List, Tuple, Union

import attrs
import requests
Expand Down Expand Up @@ -119,7 +119,7 @@ def __attrs_post_init__(self) -> None:
if not can_connect:
raise ValueError("Cannot connect to dbt Cloud. Please, check credentials.")

def _post_rest(self, url: str, data: dict) -> dict:
def _post_rest(self, url: str, data: Dict) -> Dict:
response = requests.post(url, headers=self.credentials.bearer_token_header, data=data)
if response.status_code != 200:
raise ValueError(
Expand Down Expand Up @@ -273,7 +273,7 @@ def make_profiles(environment: Environment, path_to_profiles: Path, profile_file
def string_camel_to_snake(element: str) -> str:
return "".join(["_" + c.lower() if c.isupper() else c for c in element]).lstrip("_")

def dict_camel_to_snake(self, data: Union[dict, list]) -> Union[dict, list]:
def dict_camel_to_snake(self, data: Union[Dict, List]) -> Union[Dict, List]:
if isinstance(data, list):
result = []
for record in data:
Expand All @@ -290,15 +290,15 @@ def dict_camel_to_snake(self, data: Union[dict, list]) -> Union[dict, list]:
result[self.string_camel_to_snake(key)] = value # type: ignore
return result

def get_last_execution(self, environment_id: str, model_count: int) -> list[DbtExecution]:
def get_last_execution(self, environment_id: str, model_count: int) -> List[DbtExecution]:
variables = {"environmentId": environment_id, "first": model_count}
result = self._post_graphql(self.graphql_applied_models, variables)
model_edges = self.dict_camel_to_snake(safeget(result, ["data", "environment", "applied", "models", "edges"]))
return [DbtExecution.from_dict(m["node"]) for m in model_edges]

def get_average_times(
self, logger: logging.Logger, models: list[DbtExecution], environment_id: str, history_count: int
) -> dict[str, float]:
self, logger: logging.Logger, models: List[DbtExecution], environment_id: str, history_count: int
) -> Dict[str, float]:
models_history_avg_execution_times = {}
for model in models:
variables = {"environmentId": environment_id, "modelId": model.unique_id, "first": history_count}
Expand Down
4 changes: 2 additions & 2 deletions gooddata-dbt/gooddata_dbt/dbt_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def deploy_models(
def dbt_cloud_stats(
args: Namespace,
logger: logging.Logger,
all_model_ids: list[str],
all_model_ids: List[str],
environment_id: str,
) -> None:
logger.info("Get stats for last execution...")
Expand Down Expand Up @@ -231,7 +231,7 @@ def dbt_cloud_stats(
report_message_to_merge_request(degradation_md)


def dbt_cloud_run(args: Namespace, logger: logging.Logger, all_model_ids: list[str]) -> None:
def dbt_cloud_run(args: Namespace, logger: logging.Logger, all_model_ids: List[str]) -> None:
dbt_conn = DbtConnection(credentials=DbtCredentials(account_id=args.account_id, token=args.token))
logger.info("#" * 80)
logger.info(f"Starting job in dbt cloud with job_id={args.job_id}")
Expand Down

0 comments on commit 5442e28

Please sign in to comment.