diff --git a/CHANGELOG.md b/CHANGELOG.md index fb82fdead..7730d3950 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed ### Removed +- Removed `VidClub` related code and tests. ## [0.4.26] - 2024-04-11 diff --git a/tests/integration/flows/test_vidclub_to_adls.py b/tests/integration/flows/test_vidclub_to_adls.py deleted file mode 100644 index c3a7dcaf4..000000000 --- a/tests/integration/flows/test_vidclub_to_adls.py +++ /dev/null @@ -1,88 +0,0 @@ -import os -from unittest import mock - -import pandas as pd -import pytest - -from viadot.exceptions import ValidationError -from viadot.flows import VidClubToADLS - -DATA = {"col1": ["aaa", "bbb", "ccc"], "col2": [11, 22, 33]} -ADLS_FILE_NAME = "test_vid_club.parquet" -ADLS_FILE_NAME2 = "test_vid_club_validate_df.parquet" -ADLS_DIR_PATH = "raw/test/" - - -@mock.patch( - "viadot.tasks.VidClubToDF.run", - return_value=pd.DataFrame(data=DATA), -) -@pytest.mark.run -def test_vidclub_to_adls_run_flow(mocked_class): - flow = VidClubToADLS( - "test_vidclub_to_adls_flow_run", - source=["test"], - from_date="2023-06-05", - overwrite_adls=True, - adls_dir_path=ADLS_DIR_PATH, - adls_file_name=ADLS_FILE_NAME, - ) - result = flow.run() - assert result.is_successful() - os.remove("test_vidclub_to_adls_flow_run.parquet") - os.remove("test_vidclub_to_adls_flow_run.json") - - -def test_vidclub_validate_df_task_success(caplog): - flow = VidClubToADLS( - "test_vidclub_validate_df_task_success", - source="product", - cols_to_drop=[ - "submissionProductID", - "submissionProductDate", - "brand", - "productCode", - ], - from_date="2023-10-25", - to_date="2023-10-25", - adls_dir_path="raw/tests", - adls_file_name="test.parquet", - overwrite_adls=True, - validate_df_dict={ - "column_size": {"submissionID": 5}, - "column_list_to_match": [ - "submissionID", - "regionID", - "productQuantity", - "unit", - ], - }, - ) - - result = flow.run() - assert result.is_successful() - - -def test_vidclub_validate_df_task_fail(caplog): - flow = VidClubToADLS( - "test_vidclub_validate_df_task_fail", - source="product", - cols_to_drop=[ - "submissionProductID", - "submissionProductDate", - "brand", - "productCode", - ], - from_date="2023-10-25", - to_date="2023-10-25", - adls_dir_path="raw/tests", - adls_file_name="test.parquet", - overwrite_adls=True, - validate_df_dict={ - "column_size": {"submissionID": 5}, - "column_unique_values": ["id"], - }, - ) - - result = flow.run() - assert result.is_failed() diff --git a/tests/integration/tasks/test_vid_club.py b/tests/integration/tasks/test_vid_club.py deleted file mode 100644 index 6ba849c13..000000000 --- a/tests/integration/tasks/test_vid_club.py +++ /dev/null @@ -1,120 +0,0 @@ -from unittest import mock - -import pandas as pd -import pytest - -from viadot.task_utils import credentials_loader -from viadot.tasks import VidClubToDF - -CREDENTIALS = credentials_loader.run(credentials_secret="VIDCLUB") - - -class MockVidClubResponse: - response_data = pd.DataFrame() - - -@pytest.fixture(scope="session") -def var_dictionary(): - variables = { - "source": "jobs", - "from_date": "2022-03-23", - "to_date": "2022-03-24", - "items_per_page": 1, - "days_interval": 1, - } - yield variables - - -@mock.patch( - "viadot.tasks.VidClubToDF.run", return_value=MockVidClubResponse.response_data -) -def test_vid_club_to_df(var_dictionary): - """ - Checks if run method returns DataFrame. 
- - Args: - var_dictionary: Dictionary with example arguments for run method. - """ - vc_to_df = VidClubToDF(credentials=CREDENTIALS) - - df = vc_to_df.run( - source=var_dictionary["source"], - to_date=var_dictionary["to_date"], - from_date=var_dictionary["from_date"], - items_per_page=var_dictionary["items_per_page"], - days_interval=var_dictionary["days_interval"], - ) - - assert isinstance(df, pd.DataFrame) - - -@pytest.mark.drop_cols -def test_drop_columns(var_dictionary): - """ - Tests cols_to_drop argument in function. - - Args: - var_dictionary: Dictionary with example arguments for run method. - """ - cols_to_drop = ["__v", "status"] - vc_to_df = VidClubToDF(credentials=CREDENTIALS) - - output_with_dropped = vc_to_df.run( - source=var_dictionary["source"], - to_date=var_dictionary["to_date"], - from_date=var_dictionary["from_date"], - items_per_page=var_dictionary["items_per_page"], - days_interval=var_dictionary["days_interval"], - cols_to_drop=cols_to_drop, - ) - - assert all(col not in output_with_dropped.columns for col in cols_to_drop) - - -@pytest.mark.drop_cols -def test_drop_columns_KeyError(var_dictionary, caplog): - """ - Tests if in case of KeyError (when passed columns in cols_to_drop are not included in DataFrame), there is returned error logger.. - - Args: - var_dictionary: Dictionary with example arguments for run method. - """ - cols_to_drop = ["Test", "submissionDate"] - vc_to_df = VidClubToDF(credentials=CREDENTIALS) - - vc_to_df.run( - source=var_dictionary["source"], - to_date=var_dictionary["to_date"], - from_date=var_dictionary["from_date"], - items_per_page=var_dictionary["items_per_page"], - days_interval=var_dictionary["days_interval"], - cols_to_drop=cols_to_drop, - ) - assert len(caplog.records) == 1 - assert caplog.records[0].levelname == "ERROR" - assert ( - f"Column(s): {cols_to_drop} don't exist in the DataFrame" - in caplog.records[0].message - ) - - -@pytest.mark.drop_cols -def test_drop_columns_TypeError(var_dictionary): - """ - Tests raising TypeError if passed columns in cols_to_drop is not a List. - - Args: - var_dictionary: Dictionary with example arguments for run method. - """ - with pytest.raises(TypeError, match="Provide columns to drop in a List."): - cols_to_drop = "Test" - vc_to_df = VidClubToDF(credentials=CREDENTIALS) - - output_with_dropped = vc_to_df.run( - source=var_dictionary["source"], - to_date=var_dictionary["to_date"], - from_date=var_dictionary["from_date"], - items_per_page=var_dictionary["items_per_page"], - days_interval=var_dictionary["days_interval"], - cols_to_drop=cols_to_drop, - ) diff --git a/tests/integration/test_vid_club.py b/tests/integration/test_vid_club.py deleted file mode 100644 index 50c3015cf..000000000 --- a/tests/integration/test_vid_club.py +++ /dev/null @@ -1,243 +0,0 @@ -from datetime import datetime, timedelta -from unittest import mock - -import pandas as pd -import pytest - -from viadot.exceptions import ValidationError -from viadot.sources import VidClub -from viadot.task_utils import credentials_loader - -CREDENTIALS = credentials_loader.run(credentials_secret="VIDCLUB") -vc = VidClub(credentials=CREDENTIALS) - - -@pytest.fixture -def var_dictionary(): - variables = {} - - return variables - - -class MockClass: - status_code = 200 - - def json(): - df = pd.DataFrame() - return df - - -@pytest.mark.init -def test_default_credential_param(): - """ - Checks if credentials are loaded from Azure Key Vault or PrefectSecret or from local config ursing credentials_loader and if it's dictionary type. 
- """ - assert vc.credentials is not None and isinstance(vc.credentials, dict) - - -@pytest.mark.build_query -def test_build_query_wrong_source(): - """ - Checks if passing different source than Literal["jobs", "product", "company", "survey"] is catched and returns error. - """ - with pytest.raises( - ValidationError, match=r"Pick one these sources: jobs, product, company, survey" - ): - query = vc.build_query( - source="test", - from_date="2023-03-24", - to_date="2023-03-24", - api_url="test", - items_per_page=1, - ) - - -@pytest.mark.build_query -def test_url_string(): - """ - Checks if fucntion generates URL with needed parameters. - """ - source = "jobs" - from_date = "2023-03-24" - to_date = "2023-03-24" - api_url = "https://api/test/" - items_per_page = 1 - - expected_elements = [ - f"from={from_date}", - f"to={to_date}", - f"limit={items_per_page}", - api_url, - ] - - query = vc.build_query( - source=source, - from_date=from_date, - to_date=to_date, - api_url=api_url, - items_per_page=items_per_page, - ) - - assert all(ex in query for ex in expected_elements) - - -@pytest.mark.intervals -def test_intervals_split(): - """ - Checks if prrovided date range with days_interval creates list with expected split. - """ - from_date = "2022-01-01" - to_date = "2022-01-19" - days_interval = 5 - expected_starts = ["2022-01-01", "2022-01-06", "2022-01-11", "2022-01-16"] - expected_ends = ["2022-01-06", "2022-01-11", "2022-01-16", "2022-01-19"] - starts, ends = vc.intervals( - from_date=from_date, to_date=to_date, days_interval=days_interval - ) - - assert starts == expected_starts - assert ends == expected_ends - - -@pytest.mark.connection_check -def test_check_connection(): - """ - Checks if check_connection method returns tuple with dictionary and string. - """ - output = vc.check_connection( - source="jobs", - from_date="2023-03-24", - to_date="2023-03-24", - items_per_page=1, - ) - - response, first_url = vc.check_connection( - source="jobs", - from_date="2023-03-24", - to_date="2023-03-24", - items_per_page=1, - ) - - assert isinstance(output, tuple) - assert isinstance(response, dict) - assert isinstance(first_url, str) - - -@pytest.mark.proper -def test_get_response_wrong_source(): - """ - Checks if ValidationError is returned when passing wrong source name. - """ - with pytest.raises( - ValidationError, match=r"The source has to be: jobs, product, company or survey" - ): - query = vc.get_response(source="test") - - -@mock.patch( - "viadot.sources.vid_club.VidClub.get_response", return_value=MockClass.json() -) -@pytest.mark.parametrize("source", ["jobs", "company", "product", "survey"]) -@pytest.mark.proper -def test_get_response_sources(mock_api_response, source): - """ - Checks if get_response method returnes DataFrame for each of the 4 possible sources. - Test assert that the mock was called exactly once. - - Args: - mock_api_response: Mocked return_value for get_response method. - source: The endpoint source to be accessed. - """ - query = vc.get_response(source=source, to_date="2023-03-24", from_date="2023-03-24") - - assert isinstance(query, pd.DataFrame) - mock_api_response.assert_called_once() - - -@pytest.mark.proper -def test_get_response_wrong_date(): - """ - Checks if ValidationError is returned when passing from_date earlier than 2022-03-22. 
- """ - with pytest.raises( - ValidationError, match=r"from_date cannot be earlier than 2022-03-22" - ): - vc.get_response(source="jobs", from_date="2021-05-09") - - -@pytest.mark.proper -def test_get_response_wrong_date_range(): - """ - Checks if ValidationError is returned when passing to_date earlier than from_date. - """ - with pytest.raises( - ValidationError, match=r"to_date cannot be earlier than from_date" - ): - vc.get_response(source="jobs", to_date="2022-05-04", from_date="2022-05-05") - - -@pytest.mark.total_load -def test_total_load_for_the_same_dates(): - """ - total_load method includes logic for situation when from_date == to_date. In this scenario interval split is skipped and used just get_response method. - This test checks if this logic is executed without error and returned object if DataFrame. - """ - from_date = "2022-04-01" - to_date = "2022-04-01" - days_interval = 10 - source = "jobs" - df = vc.total_load( - from_date=from_date, to_date=to_date, days_interval=days_interval, source=source - ) - - assert isinstance(df, pd.DataFrame) - - -@pytest.mark.total_load -def test_total_load_for_intervals(): - """ - Checks if interval function is properly looped in the total_load method. At first we check if returned object is DataFrame, - then we check if returned DataFrame for smaller date range contains less rows than DataFrame returned for bigger date range. - """ - from_date = "2022-04-01" - to_date = "2022-04-12" - days_interval = 2 - source = "jobs" - - date_object = datetime.strptime(from_date, "%Y-%m-%d") + timedelta( - days=days_interval - ) - one_interval = date_object.strftime("%Y-%m-%d") - - df = vc.total_load( - from_date=from_date, to_date=to_date, days_interval=days_interval, source=source - ) - df_one_interval = vc.total_load( - from_date=from_date, - to_date=one_interval, - days_interval=days_interval, - source=source, - ) - - assert isinstance(df, pd.DataFrame) - assert len(df) > len(df_one_interval) - - -@pytest.mark.total_load -def test_drop_duplicates(): - """ - Checks logic for dropping duplicated rows, that is included in total_load method. - Test checks if returned DataFrame has duplicates. 
- """ - from_date = "2022-04-01" - to_date = "2022-04-12" - days_interval = 2 - source = "jobs" - - df = vc.total_load( - from_date=from_date, to_date=to_date, days_interval=days_interval, source=source - ) - dups_mask = df.duplicated() - df_check = df[dups_mask] - - assert len(df_check) == 0 diff --git a/viadot/flows/__init__.py b/viadot/flows/__init__.py index 2f30c04d8..157df4d94 100644 --- a/viadot/flows/__init__.py +++ b/viadot/flows/__init__.py @@ -46,4 +46,3 @@ from .sql_server_to_parquet import SQLServerToParquet from .sql_server_transform import SQLServerTransform from .transform_and_catalog import TransformAndCatalogToLuma -from .vid_club_to_adls import VidClubToADLS diff --git a/viadot/flows/vid_club_to_adls.py b/viadot/flows/vid_club_to_adls.py deleted file mode 100644 index de7267479..000000000 --- a/viadot/flows/vid_club_to_adls.py +++ /dev/null @@ -1,212 +0,0 @@ -import os -from pathlib import Path -from typing import Any, Dict, List, Literal - -import pandas as pd -import pendulum -from prefect import Flow -from prefect.backend import set_key_value -from prefect.utilities import logging - -from viadot.task_utils import ( - add_ingestion_metadata_task, - df_get_data_types_task, - df_map_mixed_dtypes_for_parquet, - df_to_csv, - df_to_parquet, - dtypes_to_json_task, - update_dtypes_dict, - validate_df, -) -from viadot.tasks import AzureDataLakeUpload, VidClubToDF - -logger = logging.get_logger(__name__) - - -class VidClubToADLS(Flow): - def __init__( - self, - name: str, - source: Literal["jobs", "product", "company", "survey"] = None, - from_date: str = "2022-03-22", - to_date: str = None, - items_per_page: int = 100, - region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, - days_interval: int = 30, - cols_to_drop: List[str] = None, - vid_club_credentials: Dict[str, Any] = None, - vidclub_credentials_secret: str = "VIDCLUB", - vidclub_vault_name: str = None, - output_file_extension: str = ".parquet", - adls_dir_path: str = None, - local_file_path: str = None, - adls_file_name: str = None, - vault_name: str = None, - adls_sp_credentials_secret: str = None, - overwrite_adls: bool = False, - if_exists: str = "replace", - validate_df_dict: dict = None, - timeout: int = 3600, - *args: List[Any], - **kwargs: Dict[str, Any] - ): - """ - Flow for downloading data from the Vid Club via API to a CSV or Parquet file. - Then upload it to Azure Data Lake. - - Args: - name (str): The name of the flow. - source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. - from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. - to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code. - items_per_page (int, optional): Number of entries per page. Defaults to 100. - region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to None (parameter is not used in url). [December 2023 status: value 'all' does not work for company and jobs] - days_interval (int, optional): Days specified in date range per API call (test showed that 30-40 is optimal for performance). Defaults to 30. - cols_to_drop (List[str], optional): List of columns to drop. Defaults to None. - vid_club_credentials (Dict[str, Any], optional): Stores the credentials information. Defaults to None. 
- vidclub_credentials_secret (str, optional): The name of the secret in Azure Key Vault or Prefect or local_config file. Defaults to "VIDCLUB". - vidclub_vault_name (str, optional): For Vid Club credentials stored in Azure Key Vault. The name of the vault from which to obtain the secret. Defaults to None. - output_file_extension (str, optional): Output file extension - to allow selection of .csv for data - which is not easy to handle with parquet. Defaults to ".parquet". - adls_dir_path (str, optional): Azure Data Lake destination folder/catalog path. Defaults to None. - local_file_path (str, optional): Local destination path. Defaults to None. - adls_file_name (str, optional): Name of file in ADLS. Defaults to None. - vault_name (str, optional): For ADLS credentials stored in Azure Key Vault. The name of the vault from which to obtain the secret. Defaults to None. - adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with - ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. - Defaults to None. - overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. - if_exists (str, optional): What to do if the file exists. Defaults to "replace". - validate_df_dict (dict, optional): A dictionary with optional list of tests to verify the output - dataframe. If defined, triggers the `validate_df` task from task_utils. Defaults to None. - timeout (int, optional): The time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. - """ - # VidClubToDF - self.source = source - self.from_date = from_date - self.to_date = to_date - self.items_per_page = items_per_page - self.region = region - self.days_interval = days_interval - self.cols_to_drop = cols_to_drop - self.vid_club_credentials = vid_club_credentials - self.vidclub_credentials_secret = vidclub_credentials_secret - self.vidclub_vault_name = vidclub_vault_name - - # validate_df - self.validate_df_dict = validate_df_dict - - # AzureDataLakeUpload - self.adls_file_name = adls_file_name - self.adls_dir_path = adls_dir_path - self.local_file_path = local_file_path - self.overwrite = overwrite_adls - self.vault_name = vault_name - self.adls_sp_credentials_secret = adls_sp_credentials_secret - self.if_exists = if_exists - self.output_file_extension = output_file_extension - self.timeout = timeout - self.now = str(pendulum.now("utc")) - - self.local_file_path = ( - local_file_path or self.slugify(name) + self.output_file_extension - ) - self.local_json_path = self.slugify(name) + ".json" - self.adls_dir_path = adls_dir_path - - if adls_file_name is not None: - self.adls_file_path = os.path.join(adls_dir_path, adls_file_name) - self.adls_schema_file_dir_file = os.path.join( - adls_dir_path, "schema", Path(adls_file_name).stem + ".json" - ) - else: - self.adls_file_path = os.path.join( - adls_dir_path, self.now + self.output_file_extension - ) - self.adls_schema_file_dir_file = os.path.join( - adls_dir_path, "schema", self.now + ".json" - ) - - super().__init__(*args, name=name, **kwargs) - - self.gen_flow() - - @staticmethod - def slugify(name): - return name.replace(" ", "_").lower() - - def gen_flow(self) -> Flow: - vid_club_df_task = VidClubToDF( - timeout=self.timeout, - source=self.source, - credentials=self.vid_club_credentials, - credentials_secret=self.vidclub_credentials_secret, - vault_name=self.vidclub_vault_name, - ) - - vid_club_df = vid_club_df_task.bind( - 
from_date=self.from_date, - to_date=self.to_date, - items_per_page=self.items_per_page, - region=self.region, - days_interval=self.days_interval, - cols_to_drop=self.cols_to_drop, - flow=self, - ) - - if self.validate_df_dict: - validation_task = validate_df.bind( - vid_club_df, tests=self.validate_df_dict, flow=self - ) - validation_task.set_upstream(vid_club_df, flow=self) - - df_with_metadata = add_ingestion_metadata_task.bind(vid_club_df, flow=self) - - dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) - df_mapped = df_map_mixed_dtypes_for_parquet.bind( - df_with_metadata, dtypes_dict, flow=self - ) - - if self.output_file_extension == ".parquet": - df_to_file = df_to_parquet.bind( - df=df_mapped, - path=self.local_file_path, - if_exists=self.if_exists, - flow=self, - ) - else: - df_to_file = df_to_csv.bind( - df=df_with_metadata, - path=self.local_file_path, - if_exists=self.if_exists, - flow=self, - ) - - file_to_adls_task = AzureDataLakeUpload(timeout=self.timeout) - file_to_adls_task.bind( - from_path=self.local_file_path, - to_path=self.adls_file_path, - overwrite=self.overwrite, - sp_credentials_secret=self.adls_sp_credentials_secret, - flow=self, - ) - - dtypes_updated = update_dtypes_dict(dtypes_dict, flow=self) - dtypes_to_json_task.bind( - dtypes_dict=dtypes_updated, local_json_path=self.local_json_path, flow=self - ) - json_to_adls_task = AzureDataLakeUpload(timeout=self.timeout) - json_to_adls_task.bind( - from_path=self.local_json_path, - to_path=self.adls_schema_file_dir_file, - overwrite=self.overwrite, - sp_credentials_secret=self.adls_sp_credentials_secret, - flow=self, - ) - - if self.validate_df_dict: - df_with_metadata.set_upstream(validation_task, flow=self) - - file_to_adls_task.set_upstream(df_to_file, flow=self) - json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) - set_key_value(key=self.adls_dir_path, value=self.adls_file_path) diff --git a/viadot/sources/__init__.py b/viadot/sources/__init__.py index 7f0bf6f51..4e852da58 100644 --- a/viadot/sources/__init__.py +++ b/viadot/sources/__init__.py @@ -34,4 +34,4 @@ # APIS from .uk_carbon_intensity import UKCarbonIntensity -from .vid_club import VidClub + diff --git a/viadot/sources/vid_club.py b/viadot/sources/vid_club.py deleted file mode 100644 index 6cd3c00a5..000000000 --- a/viadot/sources/vid_club.py +++ /dev/null @@ -1,319 +0,0 @@ -from datetime import datetime, timedelta -from typing import Any, Dict, List, Literal, Tuple - -import pandas as pd -from prefect.utilities import logging - -from ..exceptions import ValidationError -from ..utils import handle_api_response -from .base import Source - -logger = logging.get_logger() - - -class VidClub(Source): - """ - A class implementing the Vid Club API. - - Documentation for this API is located at: https://evps01.envoo.net/vipapi/ - There are 4 endpoints where to get the data. - """ - - def __init__(self, credentials: Dict[str, Any], *args, **kwargs): - """ - Create an instance of VidClub. - - Args: - credentials (Dict[str, Any]): Credentials to Vid Club APIs containing token. - - Raises: - CredentialError: If credentials are not provided as a parameter. 
- """ - self.headers = { - "Authorization": "Bearer " + credentials["token"], - "Content-Type": "application/json", - } - - super().__init__(*args, credentials=credentials, **kwargs) - - def build_query( - self, - from_date: str, - to_date: str, - api_url: str, - items_per_page: int, - source: Literal["jobs", "product", "company", "survey"] = None, - region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, - ) -> str: - """ - Builds the query from the inputs. - - Args: - from_date (str): Start date for the query. - to_date (str): End date for the query, if empty, will be executed as datetime.today().strftime("%Y-%m-%d"). - api_url (str): Generic part of the URL to Vid Club API. - items_per_page (int): number of entries per page. - source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. - region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to None (parameter is not used in url). [December 2023 status: value 'all' does not work for company and jobs] - - Returns: - str: Final query with all filters added. - - Raises: - ValidationError: If any source different than the ones in the list are used. - """ - if source in ["jobs", "product", "company"]: - region_url_string = f"®ion={region}" if region else "" - url = f"{api_url}{source}?from={from_date}&to={to_date}{region_url_string}&limit={items_per_page}" - elif source == "survey": - url = f"{api_url}{source}?language=en&type=question" - else: - raise ValidationError( - "Pick one these sources: jobs, product, company, survey" - ) - return url - - def intervals( - self, from_date: str, to_date: str, days_interval: int - ) -> Tuple[List[str], List[str]]: - """ - Breaks dates range into smaller by provided days interval. - - Args: - from_date (str): Start date for the query in "%Y-%m-%d" format. - to_date (str): End date for the query, if empty, will be executed as datetime.today().strftime("%Y-%m-%d"). - days_interval (int): Days specified in date range per api call (test showed that 30-40 is optimal for performance). - - Returns: - List[str], List[str]: Starts and Ends lists that contains information about date ranges for specific period and time interval. - - Raises: - ValidationError: If the final date of the query is before the start date. 
- """ - - if to_date == None: - to_date = datetime.today().strftime("%Y-%m-%d") - - end_date = datetime.strptime(to_date, "%Y-%m-%d").date() - start_date = datetime.strptime(from_date, "%Y-%m-%d").date() - - from_date_obj = datetime.strptime(from_date, "%Y-%m-%d") - - to_date_obj = datetime.strptime(to_date, "%Y-%m-%d") - delta = to_date_obj - from_date_obj - - if delta.days < 0: - raise ValidationError("to_date cannot be earlier than from_date.") - - interval = timedelta(days=days_interval) - starts = [] - ends = [] - - period_start = start_date - while period_start < end_date: - period_end = min(period_start + interval, end_date) - starts.append(period_start.strftime("%Y-%m-%d")) - ends.append(period_end.strftime("%Y-%m-%d")) - period_start = period_end - if len(starts) == 0 and len(ends) == 0: - starts.append(from_date) - ends.append(to_date) - return starts, ends - - def check_connection( - self, - source: Literal["jobs", "product", "company", "survey"] = None, - from_date: str = "2022-03-22", - to_date: str = None, - items_per_page: int = 100, - region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, - url: str = None, - ) -> Tuple[Dict[str, Any], str]: - """ - Initiate first connection to API to retrieve piece of data with information about type of pagination in API URL. - This option is added because type of pagination for endpoints is being changed in the future from page number to 'next' id. - - Args: - source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. - from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. - to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code. - items_per_page (int, optional): Number of entries per page. 100 entries by default. - region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to None (parameter is not used in url). [December 2023 status: value 'all' does not work for company and jobs] - url (str, optional): Generic part of the URL to Vid Club API. Defaults to None. - - Returns: - Tuple[Dict[str, Any], str]: Dictionary with first response from API with JSON containing data and used URL string. - - Raises: - ValidationError: If from_date is earlier than 2022-03-22. - ValidationError: If to_date is earlier than from_date. - """ - - if from_date < "2022-03-22": - raise ValidationError("from_date cannot be earlier than 2022-03-22.") - - if to_date < from_date: - raise ValidationError("to_date cannot be earlier than from_date.") - - if url is None: - url = self.credentials["url"] - - first_url = self.build_query( - source=source, - from_date=from_date, - to_date=to_date, - api_url=url, - items_per_page=items_per_page, - region=region, - ) - headers = self.headers - response = handle_api_response( - url=first_url, headers=headers, method="GET", verify=False - ) - response = response.json() - return (response, first_url) - - def get_response( - self, - source: Literal["jobs", "product", "company", "survey"] = None, - from_date: str = "2022-03-22", - to_date: str = None, - items_per_page: int = 100, - region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, - ) -> pd.DataFrame: - """ - Basing on the pagination type retrieved using check_connection function, gets the response from the API queried and transforms it into DataFrame. 
- - Args: - source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. - from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. - to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code. - items_per_page (int, optional): Number of entries per page. 100 entries by default. - region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to None (parameter is not used in url). [December 2023 status: value 'all' does not work for company and jobs] - - Returns: - pd.DataFrame: Table of the data carried in the response. - - Raises: - ValidationError: If any source different than the ones in the list are used. - """ - headers = self.headers - if source not in ["jobs", "product", "company", "survey"]: - raise ValidationError( - "The source has to be: jobs, product, company or survey" - ) - if to_date == None: - to_date = datetime.today().strftime("%Y-%m-%d") - - response, first_url = self.check_connection( - source=source, - from_date=from_date, - to_date=to_date, - items_per_page=items_per_page, - region=region, - ) - - if isinstance(response, dict): - keys_list = list(response.keys()) - elif isinstance(response, list): - keys_list = list(response[0].keys()) - else: - keys_list = [] - - if "next" in keys_list: - ind = True - else: - ind = False - - if "data" in keys_list: - df = pd.json_normalize(response["data"]) - df = pd.DataFrame(df) - length = df.shape[0] - page = 1 - - while length == items_per_page: - if ind == True: - next = response["next"] - url = f"{first_url}&next={next}" - else: - page += 1 - url = f"{first_url}&page={page}" - r = handle_api_response( - url=url, headers=headers, method="GET", verify=False - ) - response = r.json() - df_page = pd.json_normalize(response["data"]) - df_page = pd.DataFrame(df_page) - if source == "product": - df_page = df_page.transpose() - length = df_page.shape[0] - df = pd.concat((df, df_page), axis=0) - else: - df = pd.DataFrame(response) - - return df - - def total_load( - self, - source: Literal["jobs", "product", "company", "survey"] = None, - from_date: str = "2022-03-22", - to_date: str = None, - items_per_page: int = 100, - region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, - days_interval: int = 30, - ) -> pd.DataFrame: - """ - Looping get_response and iterating by date ranges defined in intervals. Stores outputs as DataFrames in a list. - At the end, daframes are concatenated in one and dropped duplicates that would appear when quering. - - Args: - source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. - from_date (str, optional): Start date for the query, by default is the oldest date in the data 2022-03-22. - to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code. - items_per_page (int, optional): Number of entries per page. 100 entries by default. - region (Literal["bg", "hu", "hr", "pl", "ro", "si", "all"], optional): Region filter for the query. Defaults to None (parameter is not used in url). [December 2023 status: value 'all' does not work for company and jobs] - days_interval (int, optional): Days specified in date range per api call (test showed that 30-40 is optimal for performance). Defaults to 30. 
- - Returns: - pd.DataFrame: Dataframe of the concatanated data carried in the responses. - """ - - starts, ends = self.intervals( - from_date=from_date, to_date=to_date, days_interval=days_interval - ) - - dfs_list = [] - if len(starts) > 0 and len(ends) > 0: - for start, end in zip(starts, ends): - logger.info(f"ingesting data for dates [{start}]-[{end}]...") - df = self.get_response( - source=source, - from_date=start, - to_date=end, - items_per_page=items_per_page, - region=region, - ) - dfs_list.append(df) - if len(dfs_list) > 1: - df = pd.concat(dfs_list, axis=0, ignore_index=True) - else: - df = pd.DataFrame(dfs_list[0]) - else: - df = self.get_response( - source=source, - from_date=from_date, - to_date=to_date, - items_per_page=items_per_page, - region=region, - ) - list_columns = df.columns[ - df.applymap(lambda x: isinstance(x, list)).any() - ].tolist() - for i in list_columns: - df[i] = df[i].apply(lambda x: tuple(x) if isinstance(x, list) else x) - df.drop_duplicates(inplace=True) - - if df.empty: - logger.error("No data for this date range") - - return df diff --git a/viadot/tasks/__init__.py b/viadot/tasks/__init__.py index 7dc3d61cd..d7023cb7e 100644 --- a/viadot/tasks/__init__.py +++ b/viadot/tasks/__init__.py @@ -58,4 +58,3 @@ from .sftp import SftpList, SftpToDF from .sql_server import SQLServerCreateTable, SQLServerQuery, SQLServerToDF from .tm1 import TM1ToDF -from .vid_club import VidClubToDF diff --git a/viadot/tasks/vid_club.py b/viadot/tasks/vid_club.py deleted file mode 100644 index aff0e09ea..000000000 --- a/viadot/tasks/vid_club.py +++ /dev/null @@ -1,138 +0,0 @@ -import copy -import json -import os -from datetime import timedelta -from typing import Any, Dict, List, Literal - -import pandas as pd -from prefect import Task -from prefect.tasks.secrets import PrefectSecret -from prefect.utilities import logging -from prefect.utilities.tasks import defaults_from_attrs - -from viadot.task_utils import * - -from ..sources import VidClub - -logger = logging.get_logger() - - -class VidClubToDF(Task): - def __init__( - self, - source: Literal["jobs", "product", "company", "survey"] = None, - credentials: Dict[str, Any] = None, - credentials_secret: str = "VIDCLUB", - vault_name: str = None, - from_date: str = "2022-03-22", - to_date: str = None, - timeout: int = 3600, - report_name: str = "vid_club_to_df", - *args: List[Any], - **kwargs: Dict[str, Any], - ): - """ - Task to downloading data from Vid Club APIs to Pandas DataFrame. - - Args: - source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. - credentials (Dict[str, Any], optional): Stores the credentials information. Defaults to None. - credentials_secret (str, optional): The name of the secret in Azure Key Vault or Prefect or local_config file. Defaults to "VIDCLUB". - vault_name (str, optional): For credentials stored in Azure Key Vault. The name of the vault from which to obtain the secret. Defaults to None. - from_date (str): Start date for the query, by default is the oldest date in the data, '2022-03-22'. - to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code. - timeout (int, optional): The amount of time (in seconds) to wait while running this task before - a timeout occurs. Defaults to 3600. - report_name (str, optional): Stores the report name. Defaults to "vid_club_to_df". 
- - Returns: Pandas DataFrame - """ - self.source = source - self.from_date = from_date - self.to_date = to_date - self.report_name = report_name - self.credentials_secret = credentials_secret - self.vault_name = vault_name - - if credentials is None: - self.credentials = credentials_loader.run( - credentials_secret=credentials_secret, vault_name=vault_name - ) - else: - self.credentials = credentials - - super().__init__( - name=self.report_name, - timeout=timeout, - *args, - **kwargs, - ) - - def __call__(self, *args, **kwargs): - """Download Vid Club data to Pandas DataFrame""" - return super().__call__(*args, **kwargs) - - @defaults_from_attrs( - "source", - "credentials", - "from_date", - "to_date", - ) - def run( - self, - source: Literal["jobs", "product", "company", "survey"] = None, - credentials: Dict[str, Any] = None, - from_date: str = "2022-03-22", - to_date: str = None, - items_per_page: int = 100, - region: Literal["bg", "hu", "hr", "pl", "ro", "si", "all"] = None, - days_interval: int = 30, - cols_to_drop: List[str] = None, - ) -> pd.DataFrame: - """ - Task run method. - - Args: - source (Literal["jobs", "product", "company", "survey"], optional): The endpoint source to be accessed. Defaults to None. - credentials (Dict[str, Any], optional): Stores the credentials information. Defaults to None. - from_date (str, optional): Start date for the query, by default is the oldest date in the data, '2022-03-22'. - to_date (str, optional): End date for the query. By default None, which will be executed as datetime.today().strftime("%Y-%m-%d") in code. - items_per_page (int, optional): Number of entries per page. 100 entries by default. - region (str, optional): Region filter for the query. Valid inputs: ["bg", "hu", "hr", "pl", "ro", "si", "all"]. Defaults to None. - days_interval (int, optional): Days specified in date range per api call (test showed that 30-40 is optimal for performance). Defaults to 30. - cols_to_drop (List[str], optional): List of columns to drop. Defaults to None. - - Raises: - KeyError: When DataFrame doesn't contain columns provided in the list of columns to drop. - TypeError: When cols_to_drop is not a list type. - - Returns: - pd.DataFrame: The query result as a pandas DataFrame. - """ - - vc_obj = VidClub(credentials=credentials) - - vc_dataframe = vc_obj.total_load( - source=source, - from_date=from_date, - to_date=to_date, - items_per_page=items_per_page, - region=region, - days_interval=days_interval, - ) - if cols_to_drop is not None: - if isinstance(cols_to_drop, list): - try: - logger.info(f"Dropping following columns: {cols_to_drop}...") - vc_dataframe.drop( - columns=cols_to_drop, inplace=True, errors="raise" - ) - except KeyError as ke: - logger.error( - f"Column(s): {cols_to_drop} don't exist in the DataFrame. No columns were dropped. Returning full DataFrame..." - ) - logger.info(f"Existing columns: {vc_dataframe.columns}") - else: - raise TypeError("Provide columns to drop in a List.") - - return vc_dataframe