Skip to content

Commit

Permalink
Merge branch 'main' into staging/fix_cnes_test
Browse files Browse the repository at this point in the history
  • Loading branch information
folhesgabriel authored May 16, 2024
2 parents 1f9fe03 + e83da3b commit 5c57b2b
Show file tree
Hide file tree
Showing 11 changed files with 1,818 additions and 975 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/elementary.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ name: Deploy Elementary Report
on:
push:
branches: [main, master]
pull_request:
branches: [main, master]
schedule:
- cron: 00 22 * * 1-5
workflow_dispatch:
Expand All @@ -18,9 +20,8 @@ jobs:
warehouse-type: bigquery
adapter-version: 1.5.9
profiles-yml: ${{ secrets.ELEMENTARY_PROFILES_YML }}
edr-command: edr report --file-path "report.html" && edr send-report --google-service-account-path
"/tmp/gcs_keyfile.json" --gcs-bucket-name "basedosdados" --update-bucket-website
"true"
edr-command: edr report --file-path "report.html" --days-back 90 && edr send-report --google-service-account-path
"/tmp/gcs_keyfile.json" --gcs-bucket-name "basedosdados" --update-bucket-website "true" --days-back 90
bigquery-keyfile: ${{ secrets.BIGQUERY_KEYFILE }}
gcs-keyfile: ${{ secrets.GCS_KEYFILE }}
- name: Upload report
Expand Down
166 changes: 166 additions & 0 deletions .github/workflows/scripts/triggers_flow_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
from argparse import ArgumentParser
from time import sleep
import re
from backend import Backend
from utils import expand_alls, get_datasets_tables_from_modified_files
from table_test import get_flow_run_state, get_materialization_flow_id, get_flow_status_logs

if __name__ == "__main__":
    # Command-line entry point: launches a single materialization flow run on
    # the Prefect backend for the requested dataset/table, then polls the run
    # until it reaches a terminal state. Raises if the run does not succeed.
    arg_parser = ArgumentParser()

    # GraphQL endpoint of the backend. Still required so existing callers keep
    # working, but no longer used: the Backend that was built from it was
    # overwritten by the Prefect backend before ever being queried.
    arg_parser.add_argument(
        "--graphql-url",
        type=str,
        required=True,
        help="URL of the GraphQL endpoint.",
    )

    # Dataset whose model(s) should be materialized
    arg_parser.add_argument(
        "--dataset-id",
        type=str,
        required=True,
        help="Dataset id",
    )

    # Optional table; the empty-string default is forwarded to the flow as-is
    arg_parser.add_argument(
        "--table-id",
        type=str,
        required=False,
        default="",
        help="Table id",
    )

    # NOTE(review): this option is parsed as a string, so the default reaches
    # the flow as the text "False" rather than a boolean - confirm that the
    # flow's `dbt_alias` parameter expects that.
    arg_parser.add_argument(
        "--alias",
        type=str,
        required=False,
        default="False",
        help="DBT alias",
    )

    # Prefect backend (API) URL, used to launch and monitor the flow run
    arg_parser.add_argument(
        "--prefect-backend-url",
        type=str,
        required=False,
        default="https://prefect.basedosdados.org/api",
        help="Prefect backend URL.",
    )

    # Prefect base URL, only used to build human-readable flow-run links
    arg_parser.add_argument(
        "--prefect-base-url",
        type=str,
        required=False,
        default="https://prefect.basedosdados.org",
        help="Prefect base URL.",
    )

    # Prefect API token, sent as a Bearer token on every request
    arg_parser.add_argument(
        "--prefect-backend-token",
        type=str,
        required=True,
        help="Prefect backend token.",
    )

    # Materialization mode, forwarded as the flow's `mode` parameter
    arg_parser.add_argument(
        "--materialization-mode",
        type=str,
        required=False,
        default="prod",
        help="Materialization mode.",
    )

    # Label attached to the created flow run
    arg_parser.add_argument(
        "--materialization-label",
        type=str,
        required=False,
        default="basedosdados",
        help="Materialization label.",
    )

    # dbt command, forwarded as the flow's `dbt_command` parameter
    arg_parser.add_argument(
        "--dbt-command",
        type=str,
        required=False,
        default="run",
        help="DBT command to run.",
    )

    args = arg_parser.parse_args()

    # Launch the materialization flow
    backend = Backend(args.prefect_backend_url)
    flow_id = get_materialization_flow_id(backend, args.prefect_backend_token)
    print(
        f"Launching materialization flow for {args.dataset_id}.{args.table_id}"
    )
    parameters = {
        "dataset_id": args.dataset_id,
        "dbt_alias": args.alias,
        "mode": args.materialization_mode,
        "table_id": args.table_id,
        "dbt_command": args.dbt_command,
        "disable_elementary": False,
    }

    mutation = """
    mutation ($flow_id: UUID, $parameters: JSON, $label: String!) {
        create_flow_run (input: {
            flow_id: $flow_id,
            parameters: $parameters,
            labels: [$label],
        }) {
            id
        }
    }
    """
    variables = {
        "flow_id": flow_id,
        "parameters": parameters,
        "label": args.materialization_label,
    }

    response = backend._execute_query(
        mutation,
        variables,
        headers={"Authorization": f"Bearer {args.prefect_backend_token}"},
    )

    flow_run_id = response["create_flow_run"]["id"]
    flow_run_url = f"{args.prefect_base_url}/flow-run/{flow_run_id}"
    print(f" - Materialization flow run launched: {flow_run_url}")

    # Poll every 5 seconds until the run reports one of the terminal states
    terminal_states = ["Success", "Failed", "Cancelled"]
    print(f"Monitoring flow run {flow_run_id}...")
    flow_run_state = get_flow_run_state(
        flow_run_id=flow_run_id,
        backend=backend,
        auth_token=args.prefect_backend_token,
    )
    while flow_run_state not in terminal_states:
        sleep(5)
        flow_run_state = get_flow_run_state(
            flow_run_id=flow_run_id,
            backend=backend,
            auth_token=args.prefect_backend_token,
        )

    # Anything other than "Success" fails the script (and therefore the CI job)
    if flow_run_state != "Success":
        raise Exception(
            f'Flow run {flow_run_id} finished with state "{flow_run_state}". '
            f"Check the logs at {args.prefect_base_url}/flow-run/{flow_run_id}"
        )
    print("Congrats! Everything seems fine!")
28 changes: 28 additions & 0 deletions .github/workflows/triggers-elementary-model.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
# Weekly job that asks the Prefect backend to run the `elementary` dbt models
# through .github/workflows/scripts/triggers_flow_execution.py.
name: Triggers Elementary Models
on:
  schedule:
    # Mondays at 23:00 (GitHub evaluates schedules in UTC)
    - cron: '00 23 * * 1'
  workflow_dispatch:

jobs:
  run_elementary_models:
    name: Run elementary model
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v3
        with:
          # github.head_ref is only populated for pull-request events; for the
          # triggers above it is empty and checkout falls back to its default.
          ref: ${{ github.head_ref }}
      - name: Set up poetry
        run: pipx install poetry
      - name: Set up python
        uses: actions/setup-python@v4
        with:
          cache: poetry
          python-version: '3.9'
      - name: Install requirements
        run: poetry install --only=dev
      - name: Run script to test DBT model
        # Secrets go through the environment and are quoted in the command so
        # that shell metacharacters in their values cannot split or alter it.
        env:
          BACKEND_GRAPHQL_URL: ${{ secrets.BACKEND_GRAPHQL_URL }}
          PREFECT_BACKEND_TOKEN: ${{ secrets.PREFECT_BACKEND_TOKEN }}
        run: |-
          poetry run python .github/workflows/scripts/triggers_flow_execution.py \
            --dataset-id elementary \
            --graphql-url "$BACKEND_GRAPHQL_URL" \
            --prefect-backend-token "$PREFECT_BACKEND_TOKEN"
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{{
    config(
        materialized="table",
        schema="br_bd_diretorios_brasil",
        alias="cbo_1994",
    )
}}

-- Directory table built from the cbo_1994 staging table: both columns are
-- cast to string, and the description has initcap applied before the cast.
select
    safe_cast(cbo_1994 as string) as cbo_1994,
    safe_cast(initcap(descricao) as string) as descricao
from `basedosdados-staging.br_bd_diretorios_brasil_staging.cbo_1994` as t
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

select
safe_cast(cbo_2002 as string) cbo_2002,
safe_cast(descricao as string) descricao,
safe_cast(initcap(descricao) as string) descricao,
safe_cast(familia as string) familia,
safe_cast(descricao_familia as string) descricao_familia,
safe_cast(subgrupo as string) subgrupo,
Expand All @@ -19,5 +19,6 @@ select
initcap(descricao_subgrupo_principal) as string
) descricao_subgrupo_principal,
safe_cast(grande_grupo as string) grande_grupo,
safe_cast(initcap(descricao_grande_grupo) as string) descricao_grande_grupo
safe_cast(initcap(descricao_grande_grupo) as string) descricao_grande_grupo,
safe_cast(indicador_cbo_2002_ativa as int64) indicador_cbo_2002_ativa
from `basedosdados-staging.br_bd_diretorios_brasil_staging.cbo_2002` as t
3 changes: 3 additions & 0 deletions models/br_bd_diretorios_brasil/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ models:
description: Grande Grupo
- name: descricao_grande_grupo
description: Descrição do Grande Grupo
- name: indicador_cbo_2002_ativa
description: Indica se o código de 6 dígitos da CBO de 2002 permanece ativo
ou não
- name: br_bd_diretorios_brasil__empresa
description: A tabela apresenta informações do Cadastro Nacional da Pessoa Jurídica
(CNPJ), que é um banco de dados administrado pela Secretaria Especial da Receita
Expand Down
Loading

0 comments on commit 5c57b2b

Please sign in to comment.