Merge pull request #499 from basedosdados/staging/br_anp
[fix] br_anp_precos_combustiveis
tricktx authored Oct 3, 2023
2 parents 17db897 + 6b71725 commit 14670d6

Showing 2 changed files with 16 additions and 7 deletions.
10 changes: 6 additions & 4 deletions pipelines/datasets/br_anp_precos_combustiveis/flows.py
@@ -66,7 +66,6 @@
with case(dados_desatualizados, True):
df = download_and_transform(upstream_tasks=[rename_flow_run])
output_path = make_partitions(df=df, upstream_tasks=[df])
get_date_max_pro = data_max_bd_pro(df=df)

# pylint: disable=C0103
wait_upload_table = create_table_and_upload_to_gcs(
@@ -91,6 +90,7 @@
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
upstream_tasks=[wait_upload_table],
)

wait_for_materialization = wait_for_flow_run(
@@ -105,22 +105,24 @@
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

get_date_max_pro = data_max_bd_pro(
df=df, upstream_tasks=[wait_upload_table]
)
with case(update_metadata, True):
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
bq_table_last_year_month=True,
bq_table_last_year_month=False,
api_mode="prod",
date_format="yy-mm-dd",
is_bd_pro=True,
is_free=True,
time_delta=6,
time_unit="weeks",
_last_date=get_date_max_pro,
upstream_tasks=[wait_upload_table],
upstream_tasks=[get_date_max_pro],
)
anp_microdados.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
anp_microdados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
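The flows.py change reorders the Prefect dependencies: data_max_bd_pro is no longer computed right after the download, but only once create_table_and_upload_to_gcs (wait_upload_table) has finished, and update_django_metadata now waits on get_date_max_pro instead of the upload task, so the metadata update always sees the freshly uploaded data. Below is a minimal sketch of this explicit-ordering pattern with Prefect 1.x's upstream_tasks keyword; the task names are illustrative stand-ins, not the pipeline's real tasks.

from prefect import Flow, task

@task
def upload_to_gcs():
    # stand-in for create_table_and_upload_to_gcs
    return "uploaded"

@task
def get_max_date():
    # stand-in for data_max_bd_pro
    return "2023-09-30"

@task
def update_metadata(last_date):
    # stand-in for update_django_metadata
    print(f"metadata updated with {last_date}")

with Flow("ordering-sketch") as flow:
    wait_upload = upload_to_gcs()
    # upstream_tasks forces get_max_date to run only after the upload,
    # even though no data flows between the two tasks.
    max_date = get_max_date(upstream_tasks=[wait_upload])
    # update_metadata already consumes max_date's result; the diff makes the
    # ordering explicit in the same way with upstream_tasks=[get_date_max_pro].
    update_metadata(max_date, upstream_tasks=[max_date])

# flow.run()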
13 changes: 10 additions & 3 deletions pipelines/datasets/br_anp_precos_combustiveis/tasks.py
@@ -40,9 +40,16 @@ def check_for_updates(dataset_id, table_id):
# Get the most recent date from the site
download_files(anatel_constants.URLS_DATA.value, anatel_constants.PATH_INPUT.value)
df = pd.read_csv(anatel_constants.URL_GLP.value, sep=";", encoding="utf-8")
data_obj = pd.to_datetime(df["Data da Coleta"]).max()
data_obj = data_obj.date()

data_obj = (
df["Data da Coleta"].str[6:10]
+ "-"
+ df["Data da Coleta"].str[3:5]
+ "-"
+ df["Data da Coleta"].str[0:2]
)
data_obj = data_obj.apply(lambda x: pd.to_datetime(x).strftime("%Y-%m-%d"))
data_obj = pd.to_datetime(data_obj, format="%Y-%m-%d").dt.date
data_obj = data_obj.max()
# Get the latest date available on the BD side
data_bq_obj = extract_last_date(
dataset_id, table_id, "yy-mm-dd", "basedosdados", data="data_coleta"
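The tasks.py change replaces the bare pd.to_datetime call, which can misread the day-first "Data da Coleta" strings (dd/mm/yyyy), with an explicit rebuild of each value into ISO order before parsing. The following sketch reproduces the same idea on invented sample data, and shows format="%d/%m/%Y" as an equivalent, more direct alternative under the assumption that the column is consistently dd/mm/yyyy.

import pandas as pd

# Invented sample values in the dd/mm/yyyy layout used by the source files.
df = pd.DataFrame({"Data da Coleta": ["01/09/2023", "15/08/2023", "03/10/2023"]})

# Approach taken in the diff: slice the string into yyyy-mm-dd order first.
iso = (
    df["Data da Coleta"].str[6:10]
    + "-"
    + df["Data da Coleta"].str[3:5]
    + "-"
    + df["Data da Coleta"].str[0:2]
)
max_date = pd.to_datetime(iso, format="%Y-%m-%d").dt.date.max()
print(max_date)  # 2023-10-03

# Equivalent, assuming every value really is dd/mm/yyyy:
max_date_alt = pd.to_datetime(df["Data da Coleta"], format="%d/%m/%Y").dt.date.max()
assert max_date == max_date_alt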
