diff --git a/.github/workflows/scripts/table_test.py b/.github/workflows/scripts/table_test.py new file mode 100644 index 00000000..c5262c7f --- /dev/null +++ b/.github/workflows/scripts/table_test.py @@ -0,0 +1,259 @@ +from argparse import ArgumentParser +from pathlib import Path +import sys +from time import sleep +import traceback +import shutil +import re + +import basedosdados as bd +from basedosdados import Dataset, Storage + +from backend import Backend +from utils import expand_alls, get_datasets_tables_from_modified_files + + +def get_flow_run_state(flow_run_id: str, backend: Backend, auth_token: str): + query = """ + query ($flow_run_id: uuid!) { + flow_run_by_pk (id: $flow_run_id) { + state + } + } + """ + response = backend._execute_query( + query, + variables={"flow_run_id": flow_run_id}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + return response["flow_run_by_pk"]["state"] + +def get_flow_status_logs(flow_run_id: str, backend: Backend, auth_token: str): + query = """query ($flow_run_id: uuid!){ + log(where:{ + flow_run_id:{_eq:$flow_run_id}, + message:{_like:"%Done.%"}}){ + message + } + }""" + response = backend._execute_query( + query, + variables={"flow_run_id": flow_run_id}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + print(response) + message = response['log']['message'] + result = {} + result['pass'] = int(re.findall("PASS=\d+", message)[0].split('=')[1]) + result['skip'] = int(re.findall("SKIP=\d+", message)[0].split('=')[1]) + result['warn'] = int(re.findall("WARN=\d+", message)[0].split('=')[1]) + + return result + + +def get_materialization_flow_id(backend: Backend, auth_token: str): + query = """ + query { + flow (where: { + name: { + _like: "BD template: Executa DBT model" + }, + archived: { + _eq: false + }, + project: { + name: {_eq: "main"} + } + }) { + id + } + } + """ + response = backend._execute_query( + query, headers={"Authorization": f"Bearer {auth_token}"} + ) + return response["flow"][0]["id"] + + +if __name__ == "__main__": + # Start argument parser + arg_parser = ArgumentParser() + + # Add GraphQL URL argument + arg_parser.add_argument( + "--graphql-url", + type=str, + required=True, + help="URL of the GraphQL endpoint.", + ) + + # Add list of modified files argument + arg_parser.add_argument( + "--modified-files", + type=str, + required=True, + help="List of modified files.", + ) + + + # Add Prefect backend URL argument + arg_parser.add_argument( + "--prefect-backend-url", + type=str, + required=False, + default="https://prefect.basedosdados.org/api", + help="Prefect backend URL.", + ) + + # Add prefect base URL argument + arg_parser.add_argument( + "--prefect-base-url", + type=str, + required=False, + default="https://prefect.basedosdados.org", + help="Prefect base URL.", + ) + + # Add Prefect API token argument + arg_parser.add_argument( + "--prefect-backend-token", + type=str, + required=True, + help="Prefect backend token.", + ) + + # Add materialization mode argument + arg_parser.add_argument( + "--materialization-mode", + type=str, + required=False, + default="dev", + help="Materialization mode.", + ) + + # Add materialization label argument + arg_parser.add_argument( + "--materialization-label", + type=str, + required=False, + default="basedosdados-dev", + help="Materialization label.", + ) + + # Add dbt command label argument + arg_parser.add_argument( + "--dbt-command", + type=str, + required=False, + default = "test", + help="Materialization label.", + ) + + # Get arguments + args = arg_parser.parse_args() + + # Get datasets and tables from modified files + modified_files = args.modified_files.split(",") + datasets_tables = get_datasets_tables_from_modified_files( + modified_files, show_details=True + ) + # Split deleted datasets and tables + deleted_datasets_tables = [] + existing_datasets_tables = [] + for dataset_id, table_id, exists, alias in datasets_tables: + if exists: + existing_datasets_tables.append((dataset_id, table_id, alias)) + else: + deleted_datasets_tables.append((dataset_id, table_id, alias)) + # Expand `__all__` tables + backend = Backend(args.graphql_url) + expanded_existing_datasets_tables = [] + for dataset_id, table_id, alias in existing_datasets_tables: + expanded_table_ids = expand_alls(dataset_id, table_id, backend) + for expanded_dataset_id, expanded_table_id in expanded_table_ids: + expanded_existing_datasets_tables.append( + (expanded_dataset_id, expanded_table_id, alias) + ) + existing_datasets_tables = expanded_existing_datasets_tables + + # Launch materialization flows + backend = Backend(args.prefect_backend_url) + flow_id = get_materialization_flow_id(backend, args.prefect_backend_token) + launched_flow_run_ids = [] + for dataset_id, table_id, alias in existing_datasets_tables: + print( + f"Launching materialization flow for {dataset_id}.{table_id} (alias={alias})..." + ) + parameters = { + "dataset_id": dataset_id, + "dbt_alias": alias, + "mode": args.materialization_mode, + "table_id": table_id, + "dbt_command": args.dbt_command + } + + mutation = """ + mutation ($flow_id: UUID, $parameters: JSON, $label: String!) { + create_flow_run (input: { + flow_id: $flow_id, + parameters: $parameters, + labels: [$label], + }) { + id + } + } + """ + variables = { + "flow_id": flow_id, + "parameters": parameters, + "label": args.materialization_label, + } + + response = backend._execute_query( + mutation, + variables, + headers={"Authorization": f"Bearer {args.prefect_backend_token}"}, + ) + + flow_run_id = response["create_flow_run"]["id"] + launched_flow_run_ids.append(flow_run_id) + flow_run_url = f"{args.prefect_base_url}/flow-run/{flow_run_id}" + print(f" - Materialization flow run launched: {flow_run_url}") + + # Keep monitoring the launched flow runs until they are finished + for launched_flow_run_id in launched_flow_run_ids: + print(f"Monitoring flow run {launched_flow_run_id}...") + flow_run_state = get_flow_run_state( + flow_run_id=launched_flow_run_id, + backend=backend, + auth_token=args.prefect_backend_token, + ) + while flow_run_state not in ["Success", "Failed", "Cancelled"]: + sleep(5) + flow_run_state = get_flow_run_state( + flow_run_id=launched_flow_run_id, + backend=backend, + auth_token=args.prefect_backend_token, + ) + if flow_run_state != "Success": + raise Exception( + f'Flow run {launched_flow_run_id} finished with state "{flow_run_state}". ' + f"Check the logs at {args.prefect_base_url}/flow-run/{launched_flow_run_id}" + ) + else: + test_results = get_flow_status_logs( + flow_run_id=launched_flow_run_id, + backend=backend, + auth_token=args.prefect_backend_token, + ) + + if test_results['warn'] > 0: + raise Exception( + f"Test got {test_results['warn']} warns\n" + f"Check the logs at {args.prefect_base_url}/flow-run/{launched_flow_run_id}" + ) + + else: + print("Congrats! Everything seems fine!") + print(f"{test_results['pass']} tests passed") + print(f"{test_results['skip']} tests skiped") diff --git a/.github/workflows/test_dbt_model.yaml b/.github/workflows/test_dbt_model.yaml new file mode 100644 index 00000000..a73356a5 --- /dev/null +++ b/.github/workflows/test_dbt_model.yaml @@ -0,0 +1,33 @@ +--- +name: Test DBT model +on: + pull_request: + types: [labeled, opened] + branches: [main] + paths: [models/**, .github/workflows/test_dbt_model.yaml] +jobs: + test_dbt_model: + if: contains(github.event.pull_request.labels.*.name, 'test-dev-model') + name: Test DBT dev model + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + ref: ${{ github.head_ref }} + - name: Get all changed files using a comma separator + id: changed-files + uses: tj-actions/changed-files@v35 + with: + separator: ',' + - name: Setup Python 3.9 + uses: actions/setup-python@v2 + with: + python-version: 3.9 + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-actions.txt + - name: Run script to test DBT model + run: |- + python .github/workflows/scripts/table_test.py --modified-files ${{ steps.changed-files.outputs.all_modified_files }} --graphql-url ${{ secrets.BACKEND_GRAPHQL_URL }} --prefect-backend-token ${{ secrets.PREFECT_BACKEND_TOKEN }}