
Commit

Merge branch 'dev' into sap_rfc_local_config_to_key_voult_change
adrian-wojcik authored Apr 2, 2024
2 parents f0d218a + c935cf6 commit 922ae5a
Showing 7 changed files with 15 additions and 91 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
@@ -8,10 +8,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added an option for the `sap_rfc` connector to get credentials from Azure Key Vault or to pass them directly as a dictionary inside the flow (see the sketch after this changelog section).

### Fixed
- Fixed the `if_exists` parameter definition in the `CreateTableFromBlob` task.
- Bumped the `dbt-sqlserver` version in `requirements.txt` to fix a bug with the `MAXRECURSION` error in `dbt_run`


### Changed

### Removed
- Removed `dbt-core==1.3.2` from `requirements.txt`

## [0.4.25] - 2024-01-30
### Added
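The changelog entry above describes the new `sap_rfc` credentials option only at a high level. Below is a minimal sketch of how it might look in practice, assuming the connector is exposed in `viadot.sources` and takes a pyrfc-style credentials dictionary; the `sap_credentials_key` parameter name and the secret layout are assumptions for illustration, not the confirmed API.

```python
# Illustrative sketch only: the names flagged below are assumptions, not viadot's confirmed API.
from viadot.sources import SAPRFC  # assumes the connector is exposed here

# Option 1: pass the credentials dictionary directly inside the flow.
sap = SAPRFC(
    credentials={
        "ashost": "sap.example.com",  # hypothetical host
        "sysnr": "00",
        "user": "rfc_user",
        "passwd": "***",
    }
)

# Option 2: reference an Azure Key Vault secret that stores the same dictionary as JSON.
sap_from_kv = SAPRFC(sap_credentials_key="sap-rfc-credentials")  # hypothetical parameter name
```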
3 changes: 1 addition & 2 deletions requirements.txt
@@ -39,8 +39,7 @@ aiolimiter==1.0.0
protobuf>=3.19.0, <3.20
avro-python3==1.10.2
pygit2>=1.10.1, <1.11.0
dbt-core==1.3.2
dbt-sqlserver==1.3.1
dbt-sqlserver @ git+https://github.com/djagoda881/[email protected]_option_clause
lumaCLI==0.0.19
Office365-REST-Python-Client==2.4.4
TM1py==1.11.3
45 changes: 0 additions & 45 deletions tests/integration/flows/test_adls_to_azure_sql.py
@@ -9,51 +9,6 @@
from viadot.flows.adls_to_azure_sql import check_dtypes_sort, df_to_csv_task


def test_get_promoted_adls_path_csv_file():
adls_path_file = "raw/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.csv"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_file)
promoted_path = flow.get_promoted_path(env="conformed")
assert (
promoted_path
== "conformed/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.csv"
)


def test_get_promoted_adls_path_parquet_file():
adls_path_file = "raw/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.parquet"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_file)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_get_promoted_adls_path_file_starts_with_slash():
adls_path_dir_starts_with_slash = "/raw/supermetrics/adls_ga_load_times_fr_test/"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir_starts_with_slash)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_get_promoted_adls_path_dir_slash():
adls_path_dir_slash = "raw/supermetrics/adls_ga_load_times_fr_test/"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir_slash)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_get_promoted_adls_path_dir():
adls_path_dir = "raw/supermetrics/adls_ga_load_times_fr_test"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_get_promoted_adls_path_dir_starts_with_slash():
adls_path_dir_starts_with_slash = "/raw/supermetrics/adls_ga_load_times_fr_test/"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir_starts_with_slash)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_df_to_csv_task():
d = {"col1": ["rat", "\tdog"], "col2": ["cat", 4]}
df = pd.DataFrame(data=d)
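For reference, the path-promotion behaviour these deleted tests covered can be reproduced outside the flow. The sketch below adapts the `get_promoted_path` method removed further down in this diff and re-checks the two file-path cases; the directory variants depended on how the flow preprocessed `adls_path`, so they are not repeated here.

```python
import os


def get_promoted_path(adls_path: str, env: str) -> str:
    """Standalone sketch of the removed ADLSToAzureSQL.get_promoted_path logic."""
    adls_path_clean = adls_path.strip("/")
    extension = adls_path_clean.split(".")[-1].strip()
    if extension == "parquet":
        # Parquet files are promoted as a CSV named after their parent directory.
        file_name = adls_path_clean.split("/")[-2] + ".csv"
        common_path = "/".join(adls_path_clean.split("/")[1:-2])
    else:
        file_name = adls_path_clean.split("/")[-1]
        common_path = "/".join(adls_path_clean.split("/")[1:-1])
    return os.path.join(env, common_path, file_name)


# Mirrors the two removed file-path tests.
assert (
    get_promoted_path(
        "raw/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.csv",
        env="conformed",
    )
    == "conformed/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.csv"
)
assert (
    get_promoted_path(
        "raw/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.parquet",
        env="conformed",
    )
    == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"
)
```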
2 changes: 1 addition & 1 deletion tests/test_viadot.py
@@ -2,4 +2,4 @@


def test_version():
assert __version__ == "0.4.25"
assert __version__ == "0.4.26"
2 changes: 1 addition & 1 deletion viadot/__init__.py
@@ -1 +1 @@
__version__ = "0.4.25"
__version__ = "0.4.26"
40 changes: 2 additions & 38 deletions viadot/flows/adls_to_azure_sql.py
@@ -221,8 +221,6 @@ def __init__(
self.overwrite_adls = overwrite_adls
self.if_empty = if_empty
self.adls_sp_credentials_secret = adls_sp_credentials_secret
self.adls_path_conformed = self.get_promoted_path(env="conformed")
self.adls_path_operations = self.get_promoted_path(env="operations")

# AzureSQLCreateTable
self.table = table
@@ -257,20 +255,6 @@ def _map_if_exists(if_exists: str) -> str:
def slugify(name):
return name.replace(" ", "_").lower()

def get_promoted_path(self, env: str) -> str:
adls_path_clean = self.adls_path.strip("/")
extension = adls_path_clean.split(".")[-1].strip()
if extension == "parquet":
file_name = adls_path_clean.split("/")[-2] + ".csv"
common_path = "/".join(adls_path_clean.split("/")[1:-2])
else:
file_name = adls_path_clean.split("/")[-1]
common_path = "/".join(adls_path_clean.split("/")[1:-1])

promoted_path = os.path.join(env, common_path, file_name)

return promoted_path

def gen_flow(self) -> Flow:
lake_to_df_task = AzureDataLakeToDF(timeout=self.timeout)
df = lake_to_df_task.bind(
@@ -327,22 +311,6 @@ def gen_flow(self) -> Flow:
flow=self,
)

promote_to_conformed_task = AzureDataLakeCopy(timeout=self.timeout)
promote_to_conformed_task.bind(
from_path=self.adls_path,
to_path=self.adls_path_conformed,
sp_credentials_secret=self.adls_sp_credentials_secret,
vault_name=self.vault_name,
flow=self,
)
promote_to_operations_task = AzureDataLakeCopy(timeout=self.timeout)
promote_to_operations_task.bind(
from_path=self.adls_path_conformed,
to_path=self.adls_path_operations,
sp_credentials_secret=self.adls_sp_credentials_secret,
vault_name=self.vault_name,
flow=self,
)
create_table_task = AzureSQLCreateTable(timeout=self.timeout)
create_table_task.bind(
schema=self.schema,
@@ -368,13 +336,9 @@ def gen_flow(self) -> Flow:
# data validation function (optional)
if self.validate_df_dict:
validate_df.bind(df=df, tests=self.validate_df_dict, flow=self)
validate_df.set_upstream(lake_to_df_task, flow=self)
df_reorder.set_upstream(validate_df, flow=self)

df_reorder.set_upstream(lake_to_df_task, flow=self)
df_to_csv.set_upstream(dtypes, flow=self)
df_to_csv.set_upstream(df_reorder, flow=self)
promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
create_table_task.set_upstream(df_to_csv, flow=self)
promote_to_operations_task.set_upstream(
promote_to_conformed_task, flow=self
)
bulk_insert_task.set_upstream(create_table_task, flow=self)
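With the conformed/operations promotion tasks gone, the remaining `set_upstream` calls above describe a single chain from the lake read to the bulk insert. A minimal, self-contained Prefect 1.x sketch of that ordering follows, using placeholder tasks rather than viadot's.

```python
# Minimal Prefect 1.x sketch of the simplified ordering; task bodies are placeholders.
from prefect import Flow, task


@task
def lake_to_df() -> str:
    return "dataframe"


@task
def df_to_csv(df: str) -> str:
    return "data.csv"


@task
def create_table() -> str:
    return "schema.table"


@task
def bulk_insert(table: str, csv_path: str) -> str:
    return f"loaded {csv_path} into {table}"


with Flow("adls_to_azure_sql_simplified") as flow:
    df = lake_to_df()
    csv_path = df_to_csv(df)
    table = create_table()
    # Mirrors create_table_task.set_upstream(df_to_csv, flow=self) above:
    # table creation waits for the CSV dump.
    table.set_upstream(csv_path)
    # bulk_insert depends on both of its inputs, matching
    # bulk_insert_task.set_upstream(create_table_task, flow=self).
    loaded = bulk_insert(table, csv_path)

# flow.run()  # would execute the chain locally if Prefect 1.x is installed
```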
6 changes: 3 additions & 3 deletions viadot/tasks/azure_sql.py
@@ -64,7 +64,7 @@ def run(
table: str,
dtypes: Dict[str, Any],
sep: str = None,
if_exists: Literal = ["fail", "replace", "append", "delete"],
if_exists: Literal["fail", "replace", "append", "delete"] = "fail",
):
"""
Create a table from an Azure Blob object.
@@ -75,8 +75,8 @@
schema (str): Destination schema.
table (str): Destination table.
dtypes (Dict[str, Any]): Data types to force.
sep (str): The separator to use to read the CSV file.
if_exists (Literal, optional): What to do if the table already exists.
sep (str, optional): The separator to use to read the CSV file. Defaults to None.
if_exists (Literal["fail", "replace", "append", "delete"], optional): What to do if the table already exists. Defaults to "fail".
"""

fqn = f"{schema}.{table}" if schema else table
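The signature change above fixes an invalid annotation: the old form gave `if_exists` the bare `Literal` type and a list as its default value, so nothing actually constrained the accepted strings. A small self-contained illustration of the corrected pattern (placeholder function, not the viadot task itself):

```python
from typing import Literal


# Placeholder signature illustrating the corrected annotation.
def create_table(
    if_exists: Literal["fail", "replace", "append", "delete"] = "fail",
) -> str:
    # Static type checkers now flag values outside the four allowed strings,
    # and the default is the string "fail" instead of a list of options.
    return f"strategy: {if_exists}"


print(create_table())           # strategy: fail
print(create_table("replace"))  # strategy: replace
```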
