diff --git a/CHANGELOG.md b/CHANGELOG.md
index c022d5dc6..9256b1ccf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,10 +8,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 - Added option for sap_rfc connector to get credentials from Azure KeyVault or directly passing dictionary inside flow.
 
-### Fixed
+
+### Fixed
+- Fixed the `if_exists` parameter definition in the `CreateTableFromBlob` task.
+- Bumped the `dbt-sqlserver` version in `requirements.txt` to fix the `MAXRECURSION` error in dbt_run.
+
 ### Changed
 
+### Removed
+- Removed `dbt-core==1.3.2` from `requirements.txt`.
 
 
 ## [0.4.25] - 2024-01-30
 ### Added
diff --git a/requirements.txt b/requirements.txt
index 72bbb20b2..990e4a89c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -39,8 +39,7 @@ aiolimiter==1.0.0
 protobuf>=3.19.0, <3.20
 avro-python3==1.10.2
 pygit2>=1.10.1, <1.11.0
-dbt-core==1.3.2
-dbt-sqlserver==1.3.1
+dbt-sqlserver @ git+https://github.com/djagoda881/dbt-sqlserver.git@v1.3.latest_option_clause
 lumaCLI==0.0.19
 Office365-REST-Python-Client==2.4.4
 TM1py==1.11.3
diff --git a/tests/integration/flows/test_adls_to_azure_sql.py b/tests/integration/flows/test_adls_to_azure_sql.py
index e3ae45623..34cef2f9e 100644
--- a/tests/integration/flows/test_adls_to_azure_sql.py
+++ b/tests/integration/flows/test_adls_to_azure_sql.py
@@ -9,51 +9,6 @@ from viadot.flows.adls_to_azure_sql import check_dtypes_sort, df_to_csv_task
 
 
 
-def test_get_promoted_adls_path_csv_file():
-    adls_path_file = "raw/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.csv"
-    flow = ADLSToAzureSQL(name="test", adls_path=adls_path_file)
-    promoted_path = flow.get_promoted_path(env="conformed")
-    assert (
-        promoted_path
-        == "conformed/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.csv"
-    )
-
-
-def test_get_promoted_adls_path_parquet_file():
-    adls_path_file = "raw/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.parquet"
-    flow = ADLSToAzureSQL(name="test", adls_path=adls_path_file)
-    promoted_path = flow.get_promoted_path(env="conformed")
-    assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"
-
-
-def test_get_promoted_adls_path_file_starts_with_slash():
-    adls_path_dir_starts_with_slash = "/raw/supermetrics/adls_ga_load_times_fr_test/"
-    flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir_starts_with_slash)
-    promoted_path = flow.get_promoted_path(env="conformed")
-    assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"
-
-
-def test_get_promoted_adls_path_dir_slash():
-    adls_path_dir_slash = "raw/supermetrics/adls_ga_load_times_fr_test/"
-    flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir_slash)
-    promoted_path = flow.get_promoted_path(env="conformed")
-    assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"
-
-
-def test_get_promoted_adls_path_dir():
-    adls_path_dir = "raw/supermetrics/adls_ga_load_times_fr_test"
-    flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir)
-    promoted_path = flow.get_promoted_path(env="conformed")
-    assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"
-
-
-def test_get_promoted_adls_path_dir_starts_with_slash():
-    adls_path_dir_starts_with_slash = "/raw/supermetrics/adls_ga_load_times_fr_test/"
-    flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir_starts_with_slash)
-    promoted_path = flow.get_promoted_path(env="conformed")
-    assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"
-
-
 def test_df_to_csv_task():
     d = {"col1": ["rat", "\tdog"], "col2": ["cat", 4]}
     df = pd.DataFrame(data=d)
diff --git a/tests/test_viadot.py b/tests/test_viadot.py
index 71c3d6187..29cd4e622 100644
--- a/tests/test_viadot.py
+++ b/tests/test_viadot.py
@@ -2,4 +2,4 @@
 
 
 def test_version():
-    assert __version__ == "0.4.25"
+    assert __version__ == "0.4.26"
diff --git a/viadot/__init__.py b/viadot/__init__.py
index 1cc3baa70..9c8003d45 100644
--- a/viadot/__init__.py
+++ b/viadot/__init__.py
@@ -1 +1 @@
-__version__ = "0.4.25"
+__version__ = "0.4.26"
diff --git a/viadot/flows/adls_to_azure_sql.py b/viadot/flows/adls_to_azure_sql.py
index c12cc7e1d..dd467ea86 100644
--- a/viadot/flows/adls_to_azure_sql.py
+++ b/viadot/flows/adls_to_azure_sql.py
@@ -221,8 +221,6 @@ def __init__(
         self.overwrite_adls = overwrite_adls
         self.if_empty = if_empty
         self.adls_sp_credentials_secret = adls_sp_credentials_secret
-        self.adls_path_conformed = self.get_promoted_path(env="conformed")
-        self.adls_path_operations = self.get_promoted_path(env="operations")
 
         # AzureSQLCreateTable
         self.table = table
@@ -257,20 +255,6 @@ def _map_if_exists(if_exists: str) -> str:
     def slugify(name):
         return name.replace(" ", "_").lower()
 
-    def get_promoted_path(self, env: str) -> str:
-        adls_path_clean = self.adls_path.strip("/")
-        extension = adls_path_clean.split(".")[-1].strip()
-        if extension == "parquet":
-            file_name = adls_path_clean.split("/")[-2] + ".csv"
-            common_path = "/".join(adls_path_clean.split("/")[1:-2])
-        else:
-            file_name = adls_path_clean.split("/")[-1]
-            common_path = "/".join(adls_path_clean.split("/")[1:-1])
-
-        promoted_path = os.path.join(env, common_path, file_name)
-
-        return promoted_path
-
     def gen_flow(self) -> Flow:
         lake_to_df_task = AzureDataLakeToDF(timeout=self.timeout)
         df = lake_to_df_task.bind(
@@ -327,22 +311,6 @@ def gen_flow(self) -> Flow:
             flow=self,
         )
 
-        promote_to_conformed_task = AzureDataLakeCopy(timeout=self.timeout)
-        promote_to_conformed_task.bind(
-            from_path=self.adls_path,
-            to_path=self.adls_path_conformed,
-            sp_credentials_secret=self.adls_sp_credentials_secret,
-            vault_name=self.vault_name,
-            flow=self,
-        )
-        promote_to_operations_task = AzureDataLakeCopy(timeout=self.timeout)
-        promote_to_operations_task.bind(
-            from_path=self.adls_path_conformed,
-            to_path=self.adls_path_operations,
-            sp_credentials_secret=self.adls_sp_credentials_secret,
-            vault_name=self.vault_name,
-            flow=self,
-        )
         create_table_task = AzureSQLCreateTable(timeout=self.timeout)
         create_table_task.bind(
             schema=self.schema,
@@ -368,13 +336,9 @@ def gen_flow(self) -> Flow:
         # data validation function (optional)
         if self.validate_df_dict:
             validate_df.bind(df=df, tests=self.validate_df_dict, flow=self)
-            validate_df.set_upstream(lake_to_df_task, flow=self)
+            df_reorder.set_upstream(validate_df, flow=self)
 
-        df_reorder.set_upstream(lake_to_df_task, flow=self)
+        df_to_csv.set_upstream(dtypes, flow=self)
         df_to_csv.set_upstream(df_reorder, flow=self)
-        promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
         create_table_task.set_upstream(df_to_csv, flow=self)
-        promote_to_operations_task.set_upstream(
-            promote_to_conformed_task, flow=self
-        )
         bulk_insert_task.set_upstream(create_table_task, flow=self)
diff --git a/viadot/tasks/azure_sql.py b/viadot/tasks/azure_sql.py
index b6481975a..c4bc00484 100644
--- a/viadot/tasks/azure_sql.py
+++ b/viadot/tasks/azure_sql.py
@@ -64,7 +64,7 @@ def run(
         table: str,
         dtypes: Dict[str, Any],
         sep: str = None,
-        if_exists: Literal = ["fail", "replace", "append", "delete"],
+        if_exists: Literal["fail", "replace", "append", "delete"] = "fail",
     ):
         """
         Create a table from an Azure Blob object.
@@ -75,8 +75,8 @@ def run(
             schema (str): Destination schema.
             table (str): Destination table.
             dtypes (Dict[str, Any]): Data types to force.
-            sep (str): The separator to use to read the CSV file.
-            if_exists (Literal, optional): What to do if the table already exists.
+            sep (str, optional): The separator to use to read the CSV file. Defaults to None.
+            if_exists (Literal["fail", "replace", "append", "delete"], optional): What to do if the table already exists. Defaults to "fail".
         """
         fqn = f"{schema}.{table}" if schema else table
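Note on the `if_exists` fix in `viadot/tasks/azure_sql.py`: the old annotation `if_exists: Literal = ["fail", "replace", "append", "delete"]` declares a bare `Literal` type and quietly makes the whole list the default value, so a call that omits `if_exists` passes a list rather than a single mode string downstream. The corrected signature constrains the argument to the four allowed strings and defaults to `"fail"`. A minimal sketch of the corrected pattern (illustrative only; `create_table_stub` is a made-up name, not viadot code):

from typing import Literal

def create_table_stub(
    if_exists: Literal["fail", "replace", "append", "delete"] = "fail",
) -> str:
    # With the corrected annotation, omitting the argument yields the single
    # string "fail" instead of the entire list of allowed values.
    return if_exists

assert create_table_stub() == "fail"
assert create_table_stub("replace") == "replace"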