From 8f5348af799fc63f94fa9be7682d325d5954356d Mon Sep 17 00:00:00 2001
From: dominikjedlinski
Date: Tue, 24 Oct 2023 16:56:31 +0200
Subject: [PATCH 1/8] added df validation to aselite

---
 .../integration/flows/test_aselite_to_adls.py | 62 ++++++++++++++++++-
 viadot/flows/aselite_to_adls.py               | 16 ++++-
 2 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/tests/integration/flows/test_aselite_to_adls.py b/tests/integration/flows/test_aselite_to_adls.py
index 48146c293..32a31aca7 100644
--- a/tests/integration/flows/test_aselite_to_adls.py
+++ b/tests/integration/flows/test_aselite_to_adls.py
@@ -8,7 +8,7 @@
 from prefect.tasks.secrets import PrefectSecret
 
 from viadot.flows.aselite_to_adls import ASELiteToADLS
-from viadot.task_utils import df_converts_bytes_to_int, df_to_csv
+from viadot.task_utils import df_converts_bytes_to_int, df_to_csv, validate_df
 from viadot.tasks import AzureDataLakeUpload
 from viadot.tasks.aselite import ASELiteToDF
 
@@ -62,3 +62,63 @@ def test_aselite_to_adls():
     assert MAIN_DF.shape == (10, 17)
 
     os.remove(TMP_FILE_NAME)
+
+
+def test_aselite_to_adls_validate_df():
+    credentials_secret = PrefectSecret("aselite").run()
+    vault_name = PrefectSecret("AZURE_DEFAULT_KEYVAULT").run()
+
+    query_designer = """SELECT TOP 10 [ID]
+        ,[SpracheText]
+        ,[SpracheKat]
+        ,[SpracheMM]
+        ,[KatSprache]
+        ,[KatBasisSprache]
+        ,[CodePage]
+        ,[Font]
+        ,[Neu]
+        ,[Upd]
+        ,[UpdL]
+        ,[LosKZ]
+        ,[AstNr]
+        ,[KomKz]
+        ,[RKZ]
+        ,[ParentLanguageNo]
+        ,[UPD_FIELD]
+    FROM [UCRMDEV].[dbo].[CRM_00]"""
+
+    validate_df_dict = {
+        "column_size": {"ParentLanguageNo": 1},
+        "column_unique_values": ["ID"],
+        "column_list_to_match": [
+            "SpracheText",
+            "SpracheMM",
+            "KatSprache",
+            "KatBasisSprache",
+            "CodePage",
+        ],
+        "dataset_row_count": {"min": 10, "max": 10},
+        "column_match_regex": {"SpracheText": r"TE_.*"},
+    }
+
+    flow = ASELiteToADLS(
+        "Test flow",
+        query=query_designer,
+        sqldb_credentials_secret=credentials_secret,
+        vault_name=vault_name,
+        file_path=TMP_FILE_NAME,
+        validate_df_dict=validate_df_dict,
+        to_path="raw/supermetrics/mp/result_df_flow_at_des_m.csv",
+        run_config=None,
+    )
+
+    result = flow.run()
+    assert result.is_successful()
+
+    MAIN_DF = pd.read_csv(TMP_FILE_NAME, delimiter="\t")
+
+    assert isinstance(MAIN_DF, pd.DataFrame)
+
+    assert MAIN_DF.shape == (10, 17)
+
+    os.remove(TMP_FILE_NAME)
diff --git a/viadot/flows/aselite_to_adls.py b/viadot/flows/aselite_to_adls.py
index 86e9b215b..8fcdaddb3 100644
--- a/viadot/flows/aselite_to_adls.py
+++ b/viadot/flows/aselite_to_adls.py
@@ -2,7 +2,12 @@
 
 from prefect import Flow
 
-from viadot.task_utils import df_clean_column, df_converts_bytes_to_int, df_to_csv
+from viadot.task_utils import (
+    df_clean_column,
+    df_converts_bytes_to_int,
+    df_to_csv,
+    validate_df,
+)
 from viadot.tasks import AzureDataLakeUpload
 from viadot.tasks.aselite import ASELiteToDF
 
@@ -19,6 +24,7 @@ def __init__(
         to_path: str = None,
         if_exists: Literal["replace", "append", "delete"] = "replace",
         overwrite: bool = True,
+        validate_df_dict: dict = None,
         convert_bytes: bool = False,
         sp_credentials_secret: str = None,
         remove_special_characters: bool = None,
@@ -41,6 +47,7 @@ def __init__(
            to_path (str): The path to an ADLS file. Defaults to None.
            if_exists (Literal, optional): What to do if the table exists. Defaults to "replace".
            overwrite (bool, optional): Whether to overwrite the destination file. Defaults to True.
+           validate_df_dict (Dict[str], optional): A dictionary with an optional list of tests to verify the output dataframe. 
If defined, triggers the `validate_df` task from task_utils. Defaults to None.
           sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
               ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
           remove_special_characters (str, optional): Call a function that removes special characters like escape symbols. Defaults to None.
@@ -53,6 +60,7 @@ def __init__(
         self.sqldb_credentials_secret = sqldb_credentials_secret
         self.vault_name = vault_name
         self.overwrite = overwrite
+        self.validate_df_dict = validate_df_dict
 
         self.file_path = file_path
         self.sep = sep
@@ -83,6 +91,11 @@ def gen_flow(self) -> Flow:
         if self.remove_special_characters == True:
             df = df_clean_column(df, columns_to_clean=self.columns_to_clean, flow=self)
 
+        if self.validate_df_dict:
+            validation_task = validate_df.bind(
+                df, tests=self.validate_df_dict, flow=self
+            )
+
         create_csv = df_to_csv.bind(
             df,
             path=self.file_path,
@@ -100,5 +113,6 @@ def gen_flow(self) -> Flow:
             flow=self,
         )
 
+        # validation_task.set_upstream(df, flow=self)
         create_csv.set_upstream(df, flow=self)
         adls_upload.set_upstream(create_csv, flow=self)

From 99cbcecf0157f7347f9495c18e8e5d97c65909a2 Mon Sep 17 00:00:00 2001
From: dominikjedlinski
Date: Wed, 25 Oct 2023 13:53:05 +0200
Subject: [PATCH 2/8] added validate_df

---
 .../flows/test_salesforce_to_adls.py | 27 +++++++++++++++++++
 viadot/flows/salesforce_to_adls.py   | 12 +++++++++
 2 files changed, 39 insertions(+)

diff --git a/tests/integration/flows/test_salesforce_to_adls.py b/tests/integration/flows/test_salesforce_to_adls.py
index 60ec41aaa..ec68a1227 100644
--- a/tests/integration/flows/test_salesforce_to_adls.py
+++ b/tests/integration/flows/test_salesforce_to_adls.py
@@ -4,6 +4,7 @@
 
 from viadot.flows import SalesforceToADLS
 from viadot.tasks import AzureDataLakeRemove
+from viadot.exceptions import ValidationError
 
 ADLS_FILE_NAME = "test_salesforce.parquet"
 ADLS_DIR_PATH = "raw/tests/"
@@ -32,3 +33,29 @@ def test_salesforce_to_adls():
         vault_name="azuwevelcrkeyv001s",
     )
     rm.run(sp_credentials_secret=credentials_secret)
+
+
+def test_salesforce_to_adls_validate_success():
+    credentials_secret = PrefectSecret(
+        "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
+    ).run()
+
+    flow = SalesforceToADLS(
+        "test_salesforce_to_adls_run_flow",
+        query="SELECT IsDeleted, FiscalYear FROM Opportunity LIMIT 50",
+        adls_sp_credentials_secret=credentials_secret,
+        adls_dir_path=ADLS_DIR_PATH,
+        adls_file_name=ADLS_FILE_NAME,
+        validate_df_dict={"column_list_to_match": ["IsDeleted", "FiscalYear"]},
+    )
+
+    result = flow.run()
+    assert result.is_successful()
+
+    os.remove("test_salesforce_to_adls_run_flow.parquet")
+    os.remove("test_salesforce_to_adls_run_flow.json")
+    rm = AzureDataLakeRemove(
+        path=ADLS_DIR_PATH + ADLS_FILE_NAME,
+        vault_name="azuwevelcrkeyv001s",
+    )
+    rm.run(sp_credentials_secret=credentials_secret)
diff --git a/viadot/flows/salesforce_to_adls.py b/viadot/flows/salesforce_to_adls.py
index fe84be381..1ace9aa5a 100644
--- a/viadot/flows/salesforce_to_adls.py
+++ b/viadot/flows/salesforce_to_adls.py
@@ -16,6 +16,7 @@
     df_to_parquet,
     dtypes_to_json_task,
     update_dtypes_dict,
+    validate_df,
 )
 from viadot.tasks import AzureDataLakeUpload, SalesforceToDF
 
@@ -41,6 +42,7 @@ def __init__(
         adls_file_name: str = None,
         adls_sp_credentials_secret: str = None,
         if_exists: str = "replace",
+        validate_df_dict: Dict[str, Any] = None,
         timeout: int = 3600,
         *args: List[Any],
         **kwargs: Dict[str, Any],
@@ -70,6 +72,8 @@ def __init__(
                ACCOUNT_NAME 
and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None.
            if_exists (str, optional): What to do if the file exists. Defaults to "replace".
+            validate_df_dict (Dict[str,Any], optional): A dictionary with an optional list of tests to verify the output
+                dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None.
            timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs.
                Defaults to 3600.
        """
 
@@ -82,6 +86,7 @@ def __init__(
         self.env = env
         self.vault_name = vault_name
         self.credentials_secret = credentials_secret
+        self.validate_df_dict = validate_df_dict
 
         # AzureDataLakeUpload
         self.adls_sp_credentials_secret = adls_sp_credentials_secret
@@ -135,6 +140,13 @@ def gen_flow(self) -> Flow:
         df_clean = df_clean_column.bind(df=df, flow=self)
         df_with_metadata = add_ingestion_metadata_task.bind(df_clean, flow=self)
         dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self)
+
+        if self.validate_df_dict:
+            validation_task = validate_df.bind(
+                df, tests=self.validate_df_dict, flow=self
+            )
+            validation_task.set_upstream(df, flow=self)
+
         df_to_be_loaded = df_map_mixed_dtypes_for_parquet(
             df_with_metadata, dtypes_dict, flow=self
         )

From c9cc2005a1202395cea04500fcdfc24231b5165c Mon Sep 17 00:00:00 2001
From: dominikjedlinski
Date: Wed, 25 Oct 2023 14:39:23 +0200
Subject: [PATCH 3/8] updated validation_task for aselite

---
 tests/integration/flows/test_aselite_to_adls.py | 11 ++---------
 viadot/flows/aselite_to_adls.py                 |  7 ++++---
 2 files changed, 6 insertions(+), 12 deletions(-)

diff --git a/tests/integration/flows/test_aselite_to_adls.py b/tests/integration/flows/test_aselite_to_adls.py
index 32a31aca7..60022f47e 100644
--- a/tests/integration/flows/test_aselite_to_adls.py
+++ b/tests/integration/flows/test_aselite_to_adls.py
@@ -64,7 +64,7 @@ def test_aselite_to_adls():
     os.remove(TMP_FILE_NAME)
 
 
-def test_aselite_to_adls_validate_df():
+def test_aselite_to_adls_validate_success():
     credentials_secret = PrefectSecret("aselite").run()
     vault_name = PrefectSecret("AZURE_DEFAULT_KEYVAULT").run()
 
@@ -90,14 +90,7 @@ def test_aselite_to_adls_validate_df():
     validate_df_dict = {
         "column_size": {"ParentLanguageNo": 1},
         "column_unique_values": ["ID"],
-        "column_list_to_match": [
-            "SpracheText",
-            "SpracheMM",
-            "KatSprache",
-            "KatBasisSprache",
-            "CodePage",
-        ],
-        "dataset_row_count": {"min": 10, "max": 10},
+        "dataset_row_count": {"min": 0, "max": 10},
         "column_match_regex": {"SpracheText": r"TE_.*"},
     }
 
diff --git a/viadot/flows/aselite_to_adls.py b/viadot/flows/aselite_to_adls.py
index 8fcdaddb3..bd77cf40f 100644
--- a/viadot/flows/aselite_to_adls.py
+++ b/viadot/flows/aselite_to_adls.py
@@ -24,7 +24,7 @@ def __init__(
         to_path: str = None,
         if_exists: Literal["replace", "append", "delete"] = "replace",
         overwrite: bool = True,
-        validate_df_dict: dict = None,
+        validate_df_dict: Dict[str, Any] = None,
         convert_bytes: bool = False,
         sp_credentials_secret: str = None,
         remove_special_characters: bool = None,
@@ -47,7 +47,8 @@ def __init__(
            to_path (str): The path to an ADLS file. Defaults to None.
            if_exists (Literal, optional): What to do if the table exists. Defaults to "replace".
            overwrite (bool, optional): Whether to overwrite the destination file. Defaults to True.
-           validate_df_dict (Dict[str], optional): A dictionary with an optional list of tests to verify the output dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. 
+           validate_df_dict (Dict[str], optional): A dictionary with an optional list of tests to verify the output
+               dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None.
           sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
               ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
           remove_special_characters (str, optional): Call a function that removes special characters like escape symbols. Defaults to None.
@@ -95,6 +96,7 @@ def gen_flow(self) -> Flow:
             validation_task = validate_df.bind(
                 df, tests=self.validate_df_dict, flow=self
             )
+            validation_task.set_upstream(df, flow=self)
 
         create_csv = df_to_csv.bind(
             df,
@@ -113,6 +115,5 @@ def gen_flow(self) -> Flow:
             flow=self,
         )
 
-        # validation_task.set_upstream(df, flow=self)
         create_csv.set_upstream(df, flow=self)
         adls_upload.set_upstream(create_csv, flow=self)

From f41f523a001ca8b5427e310ce5860c3fd2f580f5 Mon Sep 17 00:00:00 2001
From: Angelika Tarnawa
Date: Wed, 25 Oct 2023 16:16:58 +0200
Subject: [PATCH 4/8] =?UTF-8?q?=E2=9C=A8=20Added=20validate=5Fdf=5Fdict=20?=
 =?UTF-8?q?parameter?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 viadot/flows/genesys_to_adls.py |  7 ++++++-
 viadot/tasks/genesys.py         | 11 +++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/viadot/flows/genesys_to_adls.py b/viadot/flows/genesys_to_adls.py
index 4f8c54f3e..830c02c71 100644
--- a/viadot/flows/genesys_to_adls.py
+++ b/viadot/flows/genesys_to_adls.py
@@ -5,7 +5,7 @@
 import pandas as pd
 from prefect import Flow, task
 
-from viadot.task_utils import add_ingestion_metadata_task, adls_bulk_upload
+from viadot.task_utils import add_ingestion_metadata_task, adls_bulk_upload, validate_df
 from viadot.tasks.genesys import GenesysToCSV
 
 
@@ -95,6 +95,7 @@ def __init__(
         overwrite_adls: bool = True,
         adls_sp_credentials_secret: str = None,
         credentials_genesys: Dict[str, Any] = None,
+        validate_df_dict: Dict[str, Any] = None,
         timeout: int = 3600,
         *args: List[any],
         **kwargs: Dict[str, Any],
@@ -143,6 +144,8 @@ def __init__(
            adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
               ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
            credentials (dict, optional): Credentials for the Genesys API. Defaults to None.
+           validate_df_dict (Dict[str,Any], optional): A dictionary with an optional list of tests to verify the output dataframe. If defined, triggers
+               the `validate_df` task from task_utils. Defaults to None.
            timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. 
""" @@ -165,6 +168,7 @@ def __init__( self.start_date = start_date self.end_date = end_date self.sep = sep + self.validate_df_dict = validate_df_dict self.timeout = timeout # AzureDataLake @@ -183,6 +187,7 @@ def gen_flow(self) -> Flow: timeout=self.timeout, local_file_path=self.local_file_path, sep=self.sep, + validate_df_dict=self.validate_df_dict, ) file_names = to_csv.bind( diff --git a/viadot/tasks/genesys.py b/viadot/tasks/genesys.py index f39bff6d2..b7386d815 100644 --- a/viadot/tasks/genesys.py +++ b/viadot/tasks/genesys.py @@ -10,6 +10,7 @@ from prefect.engine import signals from prefect.utilities import logging from prefect.utilities.tasks import defaults_from_attrs +from viadot.task_utils import validate_df from viadot.exceptions import APIError from viadot.sources import Genesys @@ -33,6 +34,7 @@ def __init__( conversationId_list: List[str] = None, key_list: List[str] = None, credentials_genesys: Dict[str, Any] = None, + validate_df_dict: Dict[str, Any] = None, timeout: int = 3600, *args: List[Any], **kwargs: Dict[str, Any], @@ -54,6 +56,8 @@ def __init__( sep (str, optional): Separator in csv file. Defaults to "\t". conversationId_list (List[str], optional): List of conversationId passed as attribute of GET method. Defaults to None. key_list (List[str], optional): List of keys needed to specify the columns in the GET request method. Defaults to None. + validate_df_dict (Dict[str,Any], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers + the `validate_df` task from task_utils. Defaults to None. timeout(int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. """ @@ -72,6 +76,7 @@ def __init__( self.sep = sep self.conversationId_list = conversationId_list self.key_list = key_list + self.validate_df_dict = validate_df_dict super().__init__( name=self.report_name, @@ -293,6 +298,7 @@ def merge_conversations_dfs(self, data_to_merge: list) -> DataFrame: "credentials_genesys", "conversationId_list", "key_list", + "validate_df_dict", ) def run( self, @@ -309,6 +315,7 @@ def run( conversationId_list: List[str] = None, key_list: List[str] = None, credentials_genesys: Dict[str, Any] = None, + validate_df_dict: Dict[str, Any] = None, ) -> List[str]: """ Task for downloading data from the Genesys API to DF. @@ -327,6 +334,8 @@ def run( report_columns (List[str], optional): List of exisiting column in report. Defaults to None. conversationId_list (List[str], optional): List of conversationId passed as attribute of GET method. Defaults to None. key_list (List[str], optional): List of keys needed to specify the columns in the GET request method. Defaults to None. + validate_df_dict (Dict[str,Any], optional): A dictionary with optional list of tests to verify the output dataframe. If defined, triggers + the `validate_df` task from task_utils. Defaults to None. Returns: List[str]: List of file names. 
@@ -488,6 +497,8 @@ def run( end = end_date.replace("-", "") file_name = f"WEBMESSAGE_{start}-{end}.csv" + if validate_df_dict: + validate_df(df=df, tests=validate_df_dict) df.to_csv( os.path.join(file_name), index=False, From 4f482a7bbdc2d6c21c0da2eebeb60ac8289fd16a Mon Sep 17 00:00:00 2001 From: Angelika Tarnawa Date: Wed, 25 Oct 2023 16:31:05 +0200 Subject: [PATCH 5/8] =?UTF-8?q?=F0=9F=8E=A8=20changed=20import?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/tasks/genesys.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/viadot/tasks/genesys.py b/viadot/tasks/genesys.py index b7386d815..6588810be 100644 --- a/viadot/tasks/genesys.py +++ b/viadot/tasks/genesys.py @@ -10,7 +10,7 @@ from prefect.engine import signals from prefect.utilities import logging from prefect.utilities.tasks import defaults_from_attrs -from viadot.task_utils import validate_df +from viadot.task_utils import * from viadot.exceptions import APIError from viadot.sources import Genesys From a8de722acd31887024f27aa4d658895a9507b1ab Mon Sep 17 00:00:00 2001 From: Angelika Tarnawa Date: Thu, 26 Oct 2023 08:42:32 +0200 Subject: [PATCH 6/8] =?UTF-8?q?=F0=9F=8E=A8=20Updated=20validate=5Fdf?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/tasks/genesys.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/viadot/tasks/genesys.py b/viadot/tasks/genesys.py index 6588810be..7eec313b6 100644 --- a/viadot/tasks/genesys.py +++ b/viadot/tasks/genesys.py @@ -459,7 +459,8 @@ def run( date = start_date.replace("-", "") file_name = f"conversations_detail_{date}".upper() + ".csv" - + if validate_df_dict: + validate_df(df=final_df, tests=validate_df_dict) final_df.to_csv( os.path.join(self.local_file_path, file_name), index=False, From 5e49816e20487c2da7424aa097108e147f02f211 Mon Sep 17 00:00:00 2001 From: Angelika Tarnawa Date: Thu, 26 Oct 2023 08:44:25 +0200 Subject: [PATCH 7/8] =?UTF-8?q?=F0=9F=8E=A8=20Added=20run=20to=20task?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/tasks/genesys.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/viadot/tasks/genesys.py b/viadot/tasks/genesys.py index 7eec313b6..de47ddebf 100644 --- a/viadot/tasks/genesys.py +++ b/viadot/tasks/genesys.py @@ -460,7 +460,7 @@ def run( date = start_date.replace("-", "") file_name = f"conversations_detail_{date}".upper() + ".csv" if validate_df_dict: - validate_df(df=final_df, tests=validate_df_dict) + validate_df.run(df=final_df, tests=validate_df_dict) final_df.to_csv( os.path.join(self.local_file_path, file_name), index=False, @@ -499,7 +499,7 @@ def run( file_name = f"WEBMESSAGE_{start}-{end}.csv" if validate_df_dict: - validate_df(df=df, tests=validate_df_dict) + validate_df.run(df=df, tests=validate_df_dict) df.to_csv( os.path.join(file_name), index=False, From ee2071dcf5ad68742f8a451b76ba8f1e745e5835 Mon Sep 17 00:00:00 2001 From: dominikjedlinski Date: Thu, 26 Oct 2023 12:28:26 +0200 Subject: [PATCH 8/8] reverted test_aselite_to_adls --- .../integration/flows/test_aselite_to_adls.py | 55 +------------------ 1 file changed, 1 insertion(+), 54 deletions(-) diff --git a/tests/integration/flows/test_aselite_to_adls.py b/tests/integration/flows/test_aselite_to_adls.py index 60022f47e..48146c293 100644 --- a/tests/integration/flows/test_aselite_to_adls.py +++ b/tests/integration/flows/test_aselite_to_adls.py @@ -8,7 +8,7 @@ from 
prefect.tasks.secrets import PrefectSecret
 
 from viadot.flows.aselite_to_adls import ASELiteToADLS
-from viadot.task_utils import df_converts_bytes_to_int, df_to_csv, validate_df
+from viadot.task_utils import df_converts_bytes_to_int, df_to_csv
 from viadot.tasks import AzureDataLakeUpload
 from viadot.tasks.aselite import ASELiteToDF
 
@@ -62,56 +62,3 @@ def test_aselite_to_adls():
     assert MAIN_DF.shape == (10, 17)
 
     os.remove(TMP_FILE_NAME)
-
-
-def test_aselite_to_adls_validate_success():
-    credentials_secret = PrefectSecret("aselite").run()
-    vault_name = PrefectSecret("AZURE_DEFAULT_KEYVAULT").run()
-
-    query_designer = """SELECT TOP 10 [ID]
-        ,[SpracheText]
-        ,[SpracheKat]
-        ,[SpracheMM]
-        ,[KatSprache]
-        ,[KatBasisSprache]
-        ,[CodePage]
-        ,[Font]
-        ,[Neu]
-        ,[Upd]
-        ,[UpdL]
-        ,[LosKZ]
-        ,[AstNr]
-        ,[KomKz]
-        ,[RKZ]
-        ,[ParentLanguageNo]
-        ,[UPD_FIELD]
-    FROM [UCRMDEV].[dbo].[CRM_00]"""
-
-    validate_df_dict = {
-        "column_size": {"ParentLanguageNo": 1},
-        "column_unique_values": ["ID"],
-        "dataset_row_count": {"min": 0, "max": 10},
-        "column_match_regex": {"SpracheText": r"TE_.*"},
-    }
-
-    flow = ASELiteToADLS(
-        "Test flow",
-        query=query_designer,
-        sqldb_credentials_secret=credentials_secret,
-        vault_name=vault_name,
-        file_path=TMP_FILE_NAME,
-        validate_df_dict=validate_df_dict,
-        to_path="raw/supermetrics/mp/result_df_flow_at_des_m.csv",
-        run_config=None,
-    )
-
-    result = flow.run()
-    assert result.is_successful()
-
-    MAIN_DF = pd.read_csv(TMP_FILE_NAME, delimiter="\t")
-
-    assert isinstance(MAIN_DF, pd.DataFrame)
-
-    assert MAIN_DF.shape == (10, 17)
-
-    os.remove(TMP_FILE_NAME)
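
A note on the `validate_df_dict` parameter these patches thread through the flows: it is a plain dictionary of checks that the `validate_df` task runs against the extracted DataFrame before the load step. The sketch below is a minimal, standalone approximation of the semantics the tests above rely on (`column_size`, `column_unique_values`, `dataset_row_count`, `column_match_regex`); it is not viadot's implementation, and the real task's behavior and error types may differ. It also shows why `column_match_regex` takes a dict mapping column names to patterns rather than a set:

```python
import pandas as pd


def validate_df_sketch(df: pd.DataFrame, tests: dict) -> None:
    """Raise ValueError on the first failed check."""
    # every value in the column must serialize to exactly `size` characters
    for column, size in tests.get("column_size", {}).items():
        if df[column].astype(str).str.len().ne(size).any():
            raise ValueError(f"column_size failed for {column!r}")

    # the column must contain no duplicate values
    for column in tests.get("column_unique_values", []):
        if df[column].duplicated().any():
            raise ValueError(f"column_unique_values failed for {column!r}")

    # the total row count must fall inside the [min, max] window
    bounds = tests.get("dataset_row_count")
    if bounds and not (bounds["min"] <= len(df) <= bounds["max"]):
        raise ValueError("dataset_row_count out of bounds")

    # every value in the column must match the given regex
    for column, pattern in tests.get("column_match_regex", {}).items():
        if not df[column].astype(str).str.match(pattern).all():
            raise ValueError(f"column_match_regex failed for {column!r}")


df = pd.DataFrame({"ID": [1, 2], "ParentLanguageNo": ["1", "2"]})
validate_df_sketch(
    df,
    {
        "column_size": {"ParentLanguageNo": 1},
        "column_unique_values": ["ID"],
        "dataset_row_count": {"min": 0, "max": 10},
    },
)
print("all checks passed")
```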
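Patches 6 and 7 also encode a Prefect 1.x subtlety worth spelling out: `GenesysToCSV.run()` executes as plain Python, so another task such as `validate_df` must be invoked there through its `.run()` method — calling the task object directly only tries to register it in an active flow graph, and fails when there is none, which is what the "Added run to task" commit corrects. A toy illustration under that assumption (prefect<2 and pandas installed; the task and class names here are invented, not viadot's):

```python
import pandas as pd
from prefect import Task, task


@task
def validate(df: pd.DataFrame) -> None:
    # stand-in for a validation task like viadot's validate_df
    if df.empty:
        raise ValueError("empty dataframe")


class ToCSV(Task):
    """Stand-in for a task that validates, then writes, a DataFrame."""

    def run(self, df: pd.DataFrame, validate_first: bool = True) -> None:
        if validate_first:
            # Plain Python here: invoke the task's logic via .run().
            # `validate(df)` would instead try to bind the task into an
            # active Flow context and raise outside of one.
            validate.run(df=df)
        df.to_csv("out.csv", index=False)


ToCSV().run(df=pd.DataFrame({"a": [1, 2]}))
```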