add: recreate table_test action
laura-l-amaral committed Mar 25, 2024
1 parent 42bd48c commit 72b45e2
Showing 2 changed files with 292 additions and 0 deletions.
259 changes: 259 additions & 0 deletions .github/workflows/scripts/table_test.py
@@ -0,0 +1,259 @@
from argparse import ArgumentParser
from time import sleep
import re

from backend import Backend
from utils import expand_alls, get_datasets_tables_from_modified_files


def get_flow_run_state(flow_run_id: str, backend: Backend, auth_token: str):
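    """Return the current state (e.g. "Success", "Failed") of a Prefect flow run."""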
query = """
query ($flow_run_id: uuid!) {
flow_run_by_pk (id: $flow_run_id) {
state
}
}
"""
response = backend._execute_query(
query,
variables={"flow_run_id": flow_run_id},
headers={"Authorization": f"Bearer {auth_token}"},
)
return response["flow_run_by_pk"]["state"]


def get_flow_status_logs(flow_run_id: str, backend: Backend, auth_token: str):
    """Parse dbt's test summary out of the flow run logs.

    dbt prints a summary line such as
    "Done. PASS=12 WARN=0 ERROR=0 SKIP=1 TOTAL=13";
    the "%Done.%" filter below matches that line.
    Returns a dict with the PASS, SKIP and WARN counts.
    """
    query = """query ($flow_run_id: uuid!){
        log(where:{
            flow_run_id:{_eq:$flow_run_id},
            message:{_like:"%Done.%"}}){
            message
        }
    }"""
    response = backend._execute_query(
        query,
        variables={"flow_run_id": flow_run_id},
        headers={"Authorization": f"Bearer {auth_token}"},
    )
    # Debug output: surface the raw GraphQL response in the CI logs
    print(response)
    # `log` is a list of matching entries; take the message of the first one
    message = response["log"][0]["message"]
    result = {}
    result["pass"] = int(re.findall(r"PASS=\d+", message)[0].split("=")[1])
    result["skip"] = int(re.findall(r"SKIP=\d+", message)[0].split("=")[1])
    result["warn"] = int(re.findall(r"WARN=\d+", message)[0].split("=")[1])

    return result


def get_materialization_flow_id(backend: Backend, auth_token: str):
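    """Return the ID of the active "BD template: Executa DBT model" flow in the "main" project."""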
query = """
query {
flow (where: {
name: {
_like: "BD template: Executa DBT model"
},
archived: {
_eq: false
},
project: {
name: {_eq: "main"}
}
}) {
id
}
}
"""
response = backend._execute_query(
query, headers={"Authorization": f"Bearer {auth_token}"}
)
return response["flow"][0]["id"]


if __name__ == "__main__":
# Start argument parser
arg_parser = ArgumentParser()

# Add GraphQL URL argument
arg_parser.add_argument(
"--graphql-url",
type=str,
required=True,
help="URL of the GraphQL endpoint.",
)

# Add list of modified files argument
arg_parser.add_argument(
"--modified-files",
type=str,
required=True,
help="List of modified files.",
    )

# Add Prefect backend URL argument
arg_parser.add_argument(
"--prefect-backend-url",
type=str,
required=False,
default="https://prefect.basedosdados.org/api",
help="Prefect backend URL.",
)

# Add prefect base URL argument
arg_parser.add_argument(
"--prefect-base-url",
type=str,
required=False,
default="https://prefect.basedosdados.org",
help="Prefect base URL.",
)

# Add Prefect API token argument
arg_parser.add_argument(
"--prefect-backend-token",
type=str,
required=True,
help="Prefect backend token.",
)

# Add materialization mode argument
arg_parser.add_argument(
"--materialization-mode",
type=str,
required=False,
default="dev",
help="Materialization mode.",
)

# Add materialization label argument
arg_parser.add_argument(
"--materialization-label",
type=str,
required=False,
default="basedosdados-dev",
help="Materialization label.",
)

    # Add dbt command argument
    arg_parser.add_argument(
        "--dbt-command",
        type=str,
        required=False,
        default="test",
        help="dbt command to run.",
    )

# Get arguments
args = arg_parser.parse_args()

# Get datasets and tables from modified files
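    # The CI workflow passes the changed files as one comma-separated string
    # (tj-actions/changed-files is configured with separator: ',')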
modified_files = args.modified_files.split(",")
datasets_tables = get_datasets_tables_from_modified_files(
modified_files, show_details=True
)
# Split deleted datasets and tables
deleted_datasets_tables = []
existing_datasets_tables = []
for dataset_id, table_id, exists, alias in datasets_tables:
if exists:
existing_datasets_tables.append((dataset_id, table_id, alias))
else:
deleted_datasets_tables.append((dataset_id, table_id, alias))
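    # Deleted tables are collected here but not used below; only tables that
    # still exist are tested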
# Expand `__all__` tables
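    # A `table_id` of `__all__` stands for every table in the dataset; expand it
    # into concrete (dataset_id, table_id) pairs via the GraphQL backend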
backend = Backend(args.graphql_url)
expanded_existing_datasets_tables = []
for dataset_id, table_id, alias in existing_datasets_tables:
expanded_table_ids = expand_alls(dataset_id, table_id, backend)
for expanded_dataset_id, expanded_table_id in expanded_table_ids:
expanded_existing_datasets_tables.append(
(expanded_dataset_id, expanded_table_id, alias)
)
existing_datasets_tables = expanded_existing_datasets_tables

# Launch materialization flows
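    # Reuse the Backend client, now pointed at the Prefect GraphQL API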
backend = Backend(args.prefect_backend_url)
flow_id = get_materialization_flow_id(backend, args.prefect_backend_token)
launched_flow_run_ids = []
for dataset_id, table_id, alias in existing_datasets_tables:
print(
f"Launching materialization flow for {dataset_id}.{table_id} (alias={alias})..."
)
parameters = {
"dataset_id": dataset_id,
"dbt_alias": alias,
"mode": args.materialization_mode,
"table_id": table_id,
"dbt_command": args.dbt_command
}

mutation = """
mutation ($flow_id: UUID, $parameters: JSON, $label: String!) {
create_flow_run (input: {
flow_id: $flow_id,
parameters: $parameters,
labels: [$label],
}) {
id
}
}
"""
variables = {
"flow_id": flow_id,
"parameters": parameters,
"label": args.materialization_label,
}

response = backend._execute_query(
mutation,
variables,
headers={"Authorization": f"Bearer {args.prefect_backend_token}"},
)

flow_run_id = response["create_flow_run"]["id"]
launched_flow_run_ids.append(flow_run_id)
flow_run_url = f"{args.prefect_base_url}/flow-run/{flow_run_id}"
print(f" - Materialization flow run launched: {flow_run_url}")

# Keep monitoring the launched flow runs until they are finished
for launched_flow_run_id in launched_flow_run_ids:
print(f"Monitoring flow run {launched_flow_run_id}...")
flow_run_state = get_flow_run_state(
flow_run_id=launched_flow_run_id,
backend=backend,
auth_token=args.prefect_backend_token,
)
while flow_run_state not in ["Success", "Failed", "Cancelled"]:
sleep(5)
flow_run_state = get_flow_run_state(
flow_run_id=launched_flow_run_id,
backend=backend,
auth_token=args.prefect_backend_token,
)
if flow_run_state != "Success":
raise Exception(
f'Flow run {launched_flow_run_id} finished with state "{flow_run_state}". '
f"Check the logs at {args.prefect_base_url}/flow-run/{launched_flow_run_id}"
)
else:
test_results = get_flow_status_logs(
flow_run_id=launched_flow_run_id,
backend=backend,
auth_token=args.prefect_backend_token,
)

            if test_results["warn"] > 0:
                raise Exception(
                    f"Tests finished with {test_results['warn']} warnings.\n"
                    f"Check the logs at {args.prefect_base_url}/flow-run/{launched_flow_run_id}"
                )
            else:
                print("Congrats! Everything seems fine!")
                print(f"{test_results['pass']} tests passed")
                print(f"{test_results['skip']} tests skipped")
33 changes: 33 additions & 0 deletions .github/workflows/test_dbt_model.yaml
@@ -0,0 +1,33 @@
---
name: Test DBT model
on:
pull_request:
types: [labeled, opened]
branches: [main]
paths: [models/**, .github/workflows/test_dbt_model.yaml]
jobs:
test_dbt_model:
if: contains(github.event.pull_request.labels.*.name, 'test-dev-model')
name: Test DBT dev model
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
with:
ref: ${{ github.head_ref }}
- name: Get all changed files using a comma separator
id: changed-files
uses: tj-actions/changed-files@v35
with:
separator: ','
- name: Setup Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements-actions.txt
- name: Run script to test DBT model
run: |-
          python .github/workflows/scripts/table_test.py \
            --modified-files '${{ steps.changed-files.outputs.all_modified_files }}' \
            --graphql-url '${{ secrets.BACKEND_GRAPHQL_URL }}' \
            --prefect-backend-token '${{ secrets.PREFECT_BACKEND_TOKEN }}'
