fix: no objects to concat
aspeddro committed Sep 26, 2023
1 parent 4276314 commit f3242f9
Showing 3 changed files with 61 additions and 43 deletions.
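
For context: the commit title refers to pandas' ValueError ("No objects to concatenate"), which pd.concat raises when it is handed an empty sequence, presumably hit here when a table had no downloaded source files for the requested period. A minimal sketch of the failure and of the overall guard strategy the commit applies (the snippet is illustrative, not taken from the pipeline):

import pandas as pd

dfs = []  # e.g. no CSV files were found for a table in the requested period
try:
    pd.concat(dfs)
except ValueError as err:
    print(err)  # "No objects to concatenate"

# The changes below avoid ever reaching this state: sheets with nothing to
# download are filtered out before concatenation (valid_sheets in tasks.py),
# and a missing CSV falls back to an empty DataFrame instead of breaking the
# read (utils.py).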
50 changes: 19 additions & 31 deletions pipelines/datasets/br_cgu_servidores_executivo_federal/flows.py
@@ -53,7 +53,7 @@
         "materialization_mode", default="dev", required=False
     )
     materialize_after_dump = Parameter(
-        "materialize after dump", default=True, required=False
+        "materialize after dump", default=False, required=False
     )
     dbt_alias = Parameter("dbt_alias", default=False, required=False)

@@ -76,32 +76,32 @@
     outputs_path_by_table = make_partitions(
         data_clean_by_table, upstream_tasks=[data_clean_by_table]
     )
-    log_task(f"Partitions done. {outputs_path_by_table=}")
+    log_task("Partitions done")

-    create_table_and_upload_to_gcs(
-        data_path=outputs_path_by_table["aposentados_cadastro"],
-        dataset_id=dataset_id,
-        table_id="aposentados_cadastro",
-        dump_mode="append",
-        wait=outputs_path_by_table,
-    )
-
-    create_table_and_upload_to_gcs(
-        data_path=outputs_path_by_table["pensionistas_cadastro"],
-        dataset_id=dataset_id,
-        table_id="pensionistas_cadastro",
-        dump_mode="append",
-        wait=outputs_path_by_table,
-    )
+    # create_table_and_upload_to_gcs(
+    #     data_path=outputs_path_by_table["aposentados_cadastro"],
+    #     dataset_id=dataset_id,
+    #     table_id="aposentados_cadastro",
+    #     dump_mode="append",
+    #     wait=outputs_path_by_table,
+    # )

     # create_table_and_upload_to_gcs(
-    #     data_path=outputs_path_by_table["servidores_cadastro"],
+    #     data_path=outputs_path_by_table["pensionistas_cadastro"],
     #     dataset_id=dataset_id,
-    #     table_id="servidores_cadastro",
+    #     table_id="pensionistas_cadastro",
     #     dump_mode="append",
     #     wait=outputs_path_by_table,
     # )

+    create_table_and_upload_to_gcs(
+        data_path=outputs_path_by_table["servidores_cadastro"],
+        dataset_id=dataset_id,
+        table_id="servidores_cadastro",
+        dump_mode="append",
+        wait=outputs_path_by_table,
+    )
+
     # create_table_and_upload_to_gcs(
     #     data_path=outputs_path_by_table["reserva_reforma_militares_cadastro"],
     #     dataset_id=dataset_id,
@@ -188,15 +188,3 @@
     image=constants.DOCKER_IMAGE.value
 )
 datasets_br_cgu_servidores_executivo_federal_flow.schedule = every_month
-
-
-# run_local(
-#     datasets_br_cgu_servidores_executivo_federal_flow,
-#     parameters={
-#         "dataset_id": "br_cgu_servidores_executivo_federal",
-#         "dbt_alias": False,
-#         "materialization_mode": "dev",
-#         "table_id": list(cgu_constants.TABLES.value.keys()),
-#         "update_metadata": False,
-#     },
-# )
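
A side effect of the flows.py changes worth flagging: with "materialize after dump" now defaulting to False, a run no longer triggers materialization unless the parameter is passed explicitly. A hedged sketch of how a caller could opt back in, in Prefect 1.x style; the parameter names come from the flow above, but the call itself is illustrative and may need the flow's other required parameters:

# Illustrative only: overriding the new default for a single local run.
datasets_br_cgu_servidores_executivo_federal_flow.run(
    parameters={
        "dataset_id": "br_cgu_servidores_executivo_federal",
        "materialize after dump": True,  # the flow now defaults this to False
    }
)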
34 changes: 26 additions & 8 deletions pipelines/datasets/br_cgu_servidores_executivo_federal/tasks.py
@@ -5,7 +5,7 @@

 from prefect import task

-from pipelines.utils.utils import to_partitions
+from pipelines.utils.utils import to_partitions, log

 import pandas as pd
 import datetime
@@ -41,13 +41,21 @@ def download_files(date_start: datetime.date, date_end: datetime.date):
         if key in urls_for_sheets_before_2020:
             urls_for_sheets_after_2019[key].extend(urls_for_sheets_before_2020[key])

+    valid_sheets = {
+        sheet: payload
+        for (sheet, payload) in urls_for_sheets_after_2019.items()
+        if len(payload) > 0
+    }
+
+    log(f"{valid_sheets=}")
+
     if not os.path.exists(constants.INPUT.value):
         os.mkdir(constants.INPUT.value)

-    for sheet_name in urls_for_sheets_after_2019:
-        download_zip_files_for_sheet(sheet_name, urls_for_sheets_after_2019[sheet_name])
+    for sheet_name in valid_sheets:
+        download_zip_files_for_sheet(sheet_name, valid_sheets[sheet_name])

-    return urls_for_sheets_after_2019
+    return valid_sheets


 @task
@@ -60,12 +68,22 @@ def get_sheets_by_sources(table_name: str, sources: list[str]):
         ]
         return {"table_name": table_name, "sources": sources, "dates": dates[0]}

-    sheets_by_source = [
-        get_sheets_by_sources(table_name, sources)
-        for table_name, sources in constants.TABLES.value.items()
+    tables = set(
+        [
+            table_name
+            for table_name, sources in constants.TABLES.value.items()
+            for source in sources
+            if source in list(sheets_info.keys())
+        ]
+    )
+
+    table_and_source = [
+        get_sheets_by_sources(table, constants.TABLES.value[table]) for table in tables
     ]

-    return [process_table(table_info) for table_info in sheets_by_source]
+    log(f"{table_and_source=}")
+
+    return [process_table(table_info) for table_info in table_and_source]


 @task
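As a quick illustration of what the two new filters in tasks.py accomplish, condensed into one snippet with made-up sheet and table names (the real keys live in the pipeline's constants):

# Toy stand-ins for the structures built by download_files / constants.TABLES.
sheets_info = {"Aposentados_SIAPE": ["zip-url-1"], "Militares_SIAPE": []}
TABLES = {"aposentados_cadastro": ["Aposentados_SIAPE"], "militares_cadastro": ["Militares_SIAPE"]}

# 1. Drop sheets with nothing to download (valid_sheets in the diff above).
valid_sheets = {sheet: urls for sheet, urls in sheets_info.items() if len(urls) > 0}
# -> {"Aposentados_SIAPE": ["zip-url-1"]}

# 2. Keep only tables whose sources were actually downloaded, so process_table
#    never ends up concatenating an empty list of DataFrames.
tables = {t for t, sources in TABLES.items() if any(s in valid_sheets for s in sources)}
# -> {"aposentados_cadastro"}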
20 changes: 16 additions & 4 deletions pipelines/datasets/br_cgu_servidores_executivo_federal/utils.py
@@ -136,7 +136,15 @@ def get_source(table_name: str, source: str) -> str:
 def read_and_clean_csv(
     table_name: str, source: str, date: datetime.date
 ) -> pd.DataFrame:
-    path = f"{constants.INPUT.value}/{source}/{date.year}-{date.month}/{get_csv_file_by_table_name_and_date(table_name, date)}"
+    csv_path = get_csv_file_by_table_name_and_date(table_name, date)
+
+    path = f"{constants.INPUT.value}/{source}/{date.year}-{date.month}/{csv_path}"
+
+    log(f"Reading {table_name=}, {source=}, {date=} {path=}")
+
+    if not os.path.exists(path):
+        log(f"File {path=} dont exists")
+        return pd.DataFrame()

     df = pd.read_csv(
         path,
@@ -175,10 +183,14 @@ def process_table(table_info: dict) -> tuple[str, pd.DataFrame]:
     sources: list[str] = table_info["sources"]
     dates: list[datetime.date] = table_info["dates"]

-    def read_csv_by_source(source):
+    def read_csv_by_source(source: str):
         dfs = [read_and_clean_csv(table_name, source, date) for date in dates]
+
         return pd.concat(dfs)

-    log(f"{table_name=}, {sources=}")
+    log(f"Processing {table_name=}, {sources=}")

-    return (table_name, pd.concat([read_csv_by_source(source) for source in sources]))
+    return (
+        table_name,
+        pd.concat([read_csv_by_source(source) for source in sources]),
+    )
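
Worth noting about the guard added to read_and_clean_csv: returning an empty DataFrame for a missing file keeps the downstream pd.concat calls working, because a non-empty list that merely contains empty frames concatenates fine; only an empty list raises. A minimal check, illustrative only (recent pandas versions may warn about empty entries):

import pandas as pd

frames = [pd.DataFrame({"id": [1, 2]}), pd.DataFrame()]  # one real frame, one empty fallback
print(pd.concat(frames))  # prints the rows of the real frame; the empty fallback adds nothing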
