From 72b45e2bde43673f61f44b4168c2d524248eec73 Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Mon, 25 Mar 2024 11:49:34 -0300 Subject: [PATCH 1/9] add: recreate table_test action --- .github/workflows/scripts/table_test.py | 259 ++++++++++++++++++++++++ .github/workflows/test_dbt_model.yaml | 33 +++ 2 files changed, 292 insertions(+) create mode 100644 .github/workflows/scripts/table_test.py create mode 100644 .github/workflows/test_dbt_model.yaml diff --git a/.github/workflows/scripts/table_test.py b/.github/workflows/scripts/table_test.py new file mode 100644 index 00000000..c5262c7f --- /dev/null +++ b/.github/workflows/scripts/table_test.py @@ -0,0 +1,259 @@ +from argparse import ArgumentParser +from pathlib import Path +import sys +from time import sleep +import traceback +import shutil +import re + +import basedosdados as bd +from basedosdados import Dataset, Storage + +from backend import Backend +from utils import expand_alls, get_datasets_tables_from_modified_files + + +def get_flow_run_state(flow_run_id: str, backend: Backend, auth_token: str): + query = """ + query ($flow_run_id: uuid!) { + flow_run_by_pk (id: $flow_run_id) { + state + } + } + """ + response = backend._execute_query( + query, + variables={"flow_run_id": flow_run_id}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + return response["flow_run_by_pk"]["state"] + +def get_flow_status_logs(flow_run_id: str, backend: Backend, auth_token: str): + query = """query ($flow_run_id: uuid!){ + log(where:{ + flow_run_id:{_eq:$flow_run_id}, + message:{_like:"%Done.%"}}){ + message + } + }""" + response = backend._execute_query( + query, + variables={"flow_run_id": flow_run_id}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + print(response) + message = response['log']['message'] + result = {} + result['pass'] = int(re.findall("PASS=\d+", message)[0].split('=')[1]) + result['skip'] = int(re.findall("SKIP=\d+", message)[0].split('=')[1]) + result['warn'] = int(re.findall("WARN=\d+", message)[0].split('=')[1]) + + return result + + +def get_materialization_flow_id(backend: Backend, auth_token: str): + query = """ + query { + flow (where: { + name: { + _like: "BD template: Executa DBT model" + }, + archived: { + _eq: false + }, + project: { + name: {_eq: "main"} + } + }) { + id + } + } + """ + response = backend._execute_query( + query, headers={"Authorization": f"Bearer {auth_token}"} + ) + return response["flow"][0]["id"] + + +if __name__ == "__main__": + # Start argument parser + arg_parser = ArgumentParser() + + # Add GraphQL URL argument + arg_parser.add_argument( + "--graphql-url", + type=str, + required=True, + help="URL of the GraphQL endpoint.", + ) + + # Add list of modified files argument + arg_parser.add_argument( + "--modified-files", + type=str, + required=True, + help="List of modified files.", + ) + + + # Add Prefect backend URL argument + arg_parser.add_argument( + "--prefect-backend-url", + type=str, + required=False, + default="https://prefect.basedosdados.org/api", + help="Prefect backend URL.", + ) + + # Add prefect base URL argument + arg_parser.add_argument( + "--prefect-base-url", + type=str, + required=False, + default="https://prefect.basedosdados.org", + help="Prefect base URL.", + ) + + # Add Prefect API token argument + arg_parser.add_argument( + "--prefect-backend-token", + type=str, + required=True, + help="Prefect backend token.", + ) + + # Add materialization mode argument + arg_parser.add_argument( + "--materialization-mode", + type=str, + required=False, + default="dev", + help="Materialization mode.", + ) + + # Add materialization label argument + arg_parser.add_argument( + "--materialization-label", + type=str, + required=False, + default="basedosdados-dev", + help="Materialization label.", + ) + + # Add dbt command label argument + arg_parser.add_argument( + "--dbt-command", + type=str, + required=False, + default = "test", + help="Materialization label.", + ) + + # Get arguments + args = arg_parser.parse_args() + + # Get datasets and tables from modified files + modified_files = args.modified_files.split(",") + datasets_tables = get_datasets_tables_from_modified_files( + modified_files, show_details=True + ) + # Split deleted datasets and tables + deleted_datasets_tables = [] + existing_datasets_tables = [] + for dataset_id, table_id, exists, alias in datasets_tables: + if exists: + existing_datasets_tables.append((dataset_id, table_id, alias)) + else: + deleted_datasets_tables.append((dataset_id, table_id, alias)) + # Expand `__all__` tables + backend = Backend(args.graphql_url) + expanded_existing_datasets_tables = [] + for dataset_id, table_id, alias in existing_datasets_tables: + expanded_table_ids = expand_alls(dataset_id, table_id, backend) + for expanded_dataset_id, expanded_table_id in expanded_table_ids: + expanded_existing_datasets_tables.append( + (expanded_dataset_id, expanded_table_id, alias) + ) + existing_datasets_tables = expanded_existing_datasets_tables + + # Launch materialization flows + backend = Backend(args.prefect_backend_url) + flow_id = get_materialization_flow_id(backend, args.prefect_backend_token) + launched_flow_run_ids = [] + for dataset_id, table_id, alias in existing_datasets_tables: + print( + f"Launching materialization flow for {dataset_id}.{table_id} (alias={alias})..." + ) + parameters = { + "dataset_id": dataset_id, + "dbt_alias": alias, + "mode": args.materialization_mode, + "table_id": table_id, + "dbt_command": args.dbt_command + } + + mutation = """ + mutation ($flow_id: UUID, $parameters: JSON, $label: String!) { + create_flow_run (input: { + flow_id: $flow_id, + parameters: $parameters, + labels: [$label], + }) { + id + } + } + """ + variables = { + "flow_id": flow_id, + "parameters": parameters, + "label": args.materialization_label, + } + + response = backend._execute_query( + mutation, + variables, + headers={"Authorization": f"Bearer {args.prefect_backend_token}"}, + ) + + flow_run_id = response["create_flow_run"]["id"] + launched_flow_run_ids.append(flow_run_id) + flow_run_url = f"{args.prefect_base_url}/flow-run/{flow_run_id}" + print(f" - Materialization flow run launched: {flow_run_url}") + + # Keep monitoring the launched flow runs until they are finished + for launched_flow_run_id in launched_flow_run_ids: + print(f"Monitoring flow run {launched_flow_run_id}...") + flow_run_state = get_flow_run_state( + flow_run_id=launched_flow_run_id, + backend=backend, + auth_token=args.prefect_backend_token, + ) + while flow_run_state not in ["Success", "Failed", "Cancelled"]: + sleep(5) + flow_run_state = get_flow_run_state( + flow_run_id=launched_flow_run_id, + backend=backend, + auth_token=args.prefect_backend_token, + ) + if flow_run_state != "Success": + raise Exception( + f'Flow run {launched_flow_run_id} finished with state "{flow_run_state}". ' + f"Check the logs at {args.prefect_base_url}/flow-run/{launched_flow_run_id}" + ) + else: + test_results = get_flow_status_logs( + flow_run_id=launched_flow_run_id, + backend=backend, + auth_token=args.prefect_backend_token, + ) + + if test_results['warn'] > 0: + raise Exception( + f"Test got {test_results['warn']} warns\n" + f"Check the logs at {args.prefect_base_url}/flow-run/{launched_flow_run_id}" + ) + + else: + print("Congrats! Everything seems fine!") + print(f"{test_results['pass']} tests passed") + print(f"{test_results['skip']} tests skiped") diff --git a/.github/workflows/test_dbt_model.yaml b/.github/workflows/test_dbt_model.yaml new file mode 100644 index 00000000..a73356a5 --- /dev/null +++ b/.github/workflows/test_dbt_model.yaml @@ -0,0 +1,33 @@ +--- +name: Test DBT model +on: + pull_request: + types: [labeled, opened] + branches: [main] + paths: [models/**, .github/workflows/test_dbt_model.yaml] +jobs: + test_dbt_model: + if: contains(github.event.pull_request.labels.*.name, 'test-dev-model') + name: Test DBT dev model + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + ref: ${{ github.head_ref }} + - name: Get all changed files using a comma separator + id: changed-files + uses: tj-actions/changed-files@v35 + with: + separator: ',' + - name: Setup Python 3.9 + uses: actions/setup-python@v2 + with: + python-version: 3.9 + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-actions.txt + - name: Run script to test DBT model + run: |- + python .github/workflows/scripts/table_test.py --modified-files ${{ steps.changed-files.outputs.all_modified_files }} --graphql-url ${{ secrets.BACKEND_GRAPHQL_URL }} --prefect-backend-token ${{ secrets.PREFECT_BACKEND_TOKEN }} From 7cc93bf802a96f69f8a38af7cc8c3d1dfc9792c3 Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Mon, 25 Mar 2024 11:54:57 -0300 Subject: [PATCH 2/9] fix: change set up --- .github/workflows/test_dbt_model.yaml | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/.github/workflows/test_dbt_model.yaml b/.github/workflows/test_dbt_model.yaml index a73356a5..2dccf8bc 100644 --- a/.github/workflows/test_dbt_model.yaml +++ b/.github/workflows/test_dbt_model.yaml @@ -20,14 +20,15 @@ jobs: uses: tj-actions/changed-files@v35 with: separator: ',' - - name: Setup Python 3.9 - uses: actions/setup-python@v2 + - name: Set up poetry + run: pipx install poetry + - name: Set up python + uses: actions/setup-python@v4 with: - python-version: 3.9 - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -r requirements-actions.txt + cache: poetry + python-version: '3.9' + - name: Install requirements + run: poetry install --only=dev - name: Run script to test DBT model run: |- python .github/workflows/scripts/table_test.py --modified-files ${{ steps.changed-files.outputs.all_modified_files }} --graphql-url ${{ secrets.BACKEND_GRAPHQL_URL }} --prefect-backend-token ${{ secrets.PREFECT_BACKEND_TOKEN }} From 60797fe8c43e1cc6c60fda8dd52d31262d5a4c4b Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Mon, 25 Mar 2024 11:59:36 -0300 Subject: [PATCH 3/9] fix: poetry install --- .github/workflows/test_dbt_model.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_dbt_model.yaml b/.github/workflows/test_dbt_model.yaml index 2dccf8bc..d83f925a 100644 --- a/.github/workflows/test_dbt_model.yaml +++ b/.github/workflows/test_dbt_model.yaml @@ -28,7 +28,7 @@ jobs: cache: poetry python-version: '3.9' - name: Install requirements - run: poetry install --only=dev + run: poetry install --with=dev - name: Run script to test DBT model run: |- python .github/workflows/scripts/table_test.py --modified-files ${{ steps.changed-files.outputs.all_modified_files }} --graphql-url ${{ secrets.BACKEND_GRAPHQL_URL }} --prefect-backend-token ${{ secrets.PREFECT_BACKEND_TOKEN }} From 80a72cd0c859ba9e299d398f23b231d48781696c Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Tue, 26 Mar 2024 08:55:28 -0300 Subject: [PATCH 4/9] fix: poetry install --- .github/workflows/test_dbt_model.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_dbt_model.yaml b/.github/workflows/test_dbt_model.yaml index d83f925a..2dccf8bc 100644 --- a/.github/workflows/test_dbt_model.yaml +++ b/.github/workflows/test_dbt_model.yaml @@ -28,7 +28,7 @@ jobs: cache: poetry python-version: '3.9' - name: Install requirements - run: poetry install --with=dev + run: poetry install --only=dev - name: Run script to test DBT model run: |- python .github/workflows/scripts/table_test.py --modified-files ${{ steps.changed-files.outputs.all_modified_files }} --graphql-url ${{ secrets.BACKEND_GRAPHQL_URL }} --prefect-backend-token ${{ secrets.PREFECT_BACKEND_TOKEN }} From 886fd230341495bf39741b2d00fb67b96fbddb09 Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Tue, 26 Mar 2024 09:04:39 -0300 Subject: [PATCH 5/9] fix: run python --- .github/workflows/test_dbt_model.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_dbt_model.yaml b/.github/workflows/test_dbt_model.yaml index 2dccf8bc..5111e0d1 100644 --- a/.github/workflows/test_dbt_model.yaml +++ b/.github/workflows/test_dbt_model.yaml @@ -31,4 +31,4 @@ jobs: run: poetry install --only=dev - name: Run script to test DBT model run: |- - python .github/workflows/scripts/table_test.py --modified-files ${{ steps.changed-files.outputs.all_modified_files }} --graphql-url ${{ secrets.BACKEND_GRAPHQL_URL }} --prefect-backend-token ${{ secrets.PREFECT_BACKEND_TOKEN }} + poetry run python .github/workflows/scripts/table_test.py --modified-files ${{ steps.changed-files.outputs.all_modified_files }} --graphql-url ${{ secrets.BACKEND_GRAPHQL_URL }} --prefect-backend-token ${{ secrets.PREFECT_BACKEND_TOKEN }} From 20a03a43c353459031cdc8956ad373e1540be8c6 Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Tue, 26 Mar 2024 09:10:25 -0300 Subject: [PATCH 6/9] add: modified file to test action --- .../br_mme_consumo_energia_eletrica__uf.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/br_mme_consumo_energia_eletrica/br_mme_consumo_energia_eletrica__uf.sql b/models/br_mme_consumo_energia_eletrica/br_mme_consumo_energia_eletrica__uf.sql index 620f7a43..607cd4f0 100644 --- a/models/br_mme_consumo_energia_eletrica/br_mme_consumo_energia_eletrica__uf.sql +++ b/models/br_mme_consumo_energia_eletrica/br_mme_consumo_energia_eletrica__uf.sql @@ -15,5 +15,5 @@ select then null else safe_cast(numero_consumidores as int64) end as numero_consumidores, - safe_cast(consumo as int64) as consumo + safe_cast(consumo as float64) as consumo from `basedosdados-staging.br_mme_consumo_energia_eletrica_staging.uf` as t From 04a6f5c6164e0eaefefd133b533da2113cdfc464 Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Tue, 26 Mar 2024 09:18:14 -0300 Subject: [PATCH 7/9] fix: stop getting logs --- .github/workflows/scripts/table_test.py | 25 +------------------------ 1 file changed, 1 insertion(+), 24 deletions(-) diff --git a/.github/workflows/scripts/table_test.py b/.github/workflows/scripts/table_test.py index c5262c7f..de8fbc42 100644 --- a/.github/workflows/scripts/table_test.py +++ b/.github/workflows/scripts/table_test.py @@ -1,14 +1,6 @@ from argparse import ArgumentParser -from pathlib import Path -import sys from time import sleep -import traceback -import shutil import re - -import basedosdados as bd -from basedosdados import Dataset, Storage - from backend import Backend from utils import expand_alls, get_datasets_tables_from_modified_files @@ -241,19 +233,4 @@ def get_materialization_flow_id(backend: Backend, auth_token: str): f"Check the logs at {args.prefect_base_url}/flow-run/{launched_flow_run_id}" ) else: - test_results = get_flow_status_logs( - flow_run_id=launched_flow_run_id, - backend=backend, - auth_token=args.prefect_backend_token, - ) - - if test_results['warn'] > 0: - raise Exception( - f"Test got {test_results['warn']} warns\n" - f"Check the logs at {args.prefect_base_url}/flow-run/{launched_flow_run_id}" - ) - - else: - print("Congrats! Everything seems fine!") - print(f"{test_results['pass']} tests passed") - print(f"{test_results['skip']} tests skiped") + print("Congrats! Everything seems fine!") From cc020f8a5e6b06a1b4e4b058864227ca27888110 Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Tue, 26 Mar 2024 09:39:09 -0300 Subject: [PATCH 8/9] add: test fail --- models/br_bcb_taxa_cambio/taxa_cambio.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models/br_bcb_taxa_cambio/taxa_cambio.sql b/models/br_bcb_taxa_cambio/taxa_cambio.sql index 45303091..9a1c0c1f 100644 --- a/models/br_bcb_taxa_cambio/taxa_cambio.sql +++ b/models/br_bcb_taxa_cambio/taxa_cambio.sql @@ -4,7 +4,7 @@ ) }} select - safe_cast(ano as string) ano, + safe_cast(ano as string) as ano, safe_cast(data_cotacao as date) data_cotacao, safe_cast(hora_cotacao as time) hora_cotacao, safe_cast(moeda as string) moeda, From 7e9ef654fc310afee274955d9203277117846979 Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Tue, 26 Mar 2024 09:46:24 -0300 Subject: [PATCH 9/9] remove test files --- models/br_bcb_taxa_cambio/taxa_cambio.sql | 2 +- .../br_mme_consumo_energia_eletrica__uf.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/models/br_bcb_taxa_cambio/taxa_cambio.sql b/models/br_bcb_taxa_cambio/taxa_cambio.sql index 9a1c0c1f..45303091 100644 --- a/models/br_bcb_taxa_cambio/taxa_cambio.sql +++ b/models/br_bcb_taxa_cambio/taxa_cambio.sql @@ -4,7 +4,7 @@ ) }} select - safe_cast(ano as string) as ano, + safe_cast(ano as string) ano, safe_cast(data_cotacao as date) data_cotacao, safe_cast(hora_cotacao as time) hora_cotacao, safe_cast(moeda as string) moeda, diff --git a/models/br_mme_consumo_energia_eletrica/br_mme_consumo_energia_eletrica__uf.sql b/models/br_mme_consumo_energia_eletrica/br_mme_consumo_energia_eletrica__uf.sql index 607cd4f0..620f7a43 100644 --- a/models/br_mme_consumo_energia_eletrica/br_mme_consumo_energia_eletrica__uf.sql +++ b/models/br_mme_consumo_energia_eletrica/br_mme_consumo_energia_eletrica__uf.sql @@ -15,5 +15,5 @@ select then null else safe_cast(numero_consumidores as int64) end as numero_consumidores, - safe_cast(consumo as float64) as consumo + safe_cast(consumo as int64) as consumo from `basedosdados-staging.br_mme_consumo_energia_eletrica_staging.uf` as t