Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] br_cvm_fi #892

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions pipelines/datasets/br_cvm_fi/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import os
import re
import zipfile
from datetime import datetime
from datetime import datetime, timedelta
from typing import Tuple

import pandas as pd
Expand All @@ -30,9 +30,12 @@
sheet_to_df,
)
from pipelines.utils.utils import log, to_partitions
from pipelines.constants import constants


@task # noqa
@task(
max_retries=2,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
) # noqa
def download_unzip_csv(
url: str, files, chunk_size: int = 128, mkdir: bool = True, id="teste"
) -> str:
Expand Down Expand Up @@ -114,7 +117,10 @@ def download_unzip_csv(
return f"/tmp/data/br_cvm_fi/{id}/input/"


@task
@task(
max_retries=2,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def extract_links_and_dates(url) -> Tuple[pd.DataFrame, str]:
"""
Extracts all file names and their respective last update dates in a pandas dataframe.
Expand Down Expand Up @@ -169,7 +175,10 @@ def extract_links_and_dates(url) -> Tuple[pd.DataFrame, str]:
return df, data_maxima


@task
@task(
max_retries=2,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def generate_links_to_download(df: pd.DataFrame, max_date: datetime) -> list[str]:
"""
Checks for outdated tables.
Expand All @@ -182,7 +191,10 @@ def generate_links_to_download(df: pd.DataFrame, max_date: datetime) -> list[str
return lists


@task
@task(
max_retries=2,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def check_for_updates(df: pd.DataFrame):
"""
Checks for outdated tables.
Expand All @@ -193,7 +205,10 @@ def check_for_updates(df: pd.DataFrame):



@task
@task(
max_retries=2,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def check_for_updates_ext(df):
"""
Checks for outdated tables in documentos_extratos_informacoes table.
Expand Down
Loading