Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[add] Action test dbt model #518

Merged
merged 12 commits into from
Mar 26, 2024
236 changes: 236 additions & 0 deletions .github/workflows/scripts/table_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
from argparse import ArgumentParser
from time import sleep
import re
from backend import Backend
from utils import expand_alls, get_datasets_tables_from_modified_files


def get_flow_run_state(flow_run_id: str, backend: Backend, auth_token: str):
    """Return the current Prefect state string (e.g. "Success") of a flow run.

    Queries the Prefect GraphQL API through ``backend`` using a bearer token.
    """
    state_query = """
        query ($flow_run_id: uuid!) {
            flow_run_by_pk (id: $flow_run_id) {
                state
            }
        }
    """
    auth_headers = {"Authorization": f"Bearer {auth_token}"}
    result = backend._execute_query(
        state_query,
        variables={"flow_run_id": flow_run_id},
        headers=auth_headers,
    )
    return result["flow_run_by_pk"]["state"]

def get_flow_status_logs(flow_run_id: str, backend: Backend, auth_token: str):
    """Extract dbt result counters from a flow run's "Done." summary log line.

    Searches the flow run's logs for the dbt summary message (e.g.
    "Done. PASS=10 WARN=0 ERROR=0 SKIP=2 TOTAL=12") and returns a dict with
    the integer counts under the keys "pass", "skip" and "warn".

    Raises:
        IndexError: if a PASS/SKIP/WARN counter is absent from the message.
    """
    query = """query ($flow_run_id: uuid!){
        log(where:{
            flow_run_id:{_eq:$flow_run_id},
            message:{_like:"%Done.%"}}){
                message
        }
    }"""
    response = backend._execute_query(
        query,
        variables={"flow_run_id": flow_run_id},
        headers={"Authorization": f"Bearer {auth_token}"},
    )
    # NOTE(review): Hasura-style `log` queries usually return a *list* of rows;
    # this lookup assumes `_execute_query` unwraps it to a single object —
    # confirm, otherwise an index (`["log"][0]`) is needed here.
    message = response["log"]["message"]
    # Raw strings keep `\d` a regex token rather than an invalid string escape
    # (which emits SyntaxWarning on modern Python). The capture group returns
    # the digits directly, replacing the previous findall(...).split("=") dance.
    return {
        key: int(re.findall(rf"{key.upper()}=(\d+)", message)[0])
        for key in ("pass", "skip", "warn")
    }


def get_materialization_flow_id(backend: Backend, auth_token: str):
    """Return the id of the active "BD template: Executa DBT model" flow.

    Looks up the non-archived flow of that name in the "main" Prefect
    project; the first match wins.
    """
    flow_query = """
        query {
            flow (where: {
                name: {
                    _like: "BD template: Executa DBT model"
                },
                archived: {
                    _eq: false
                },
                project: {
                    name: {_eq: "main"}
                }
            }) {
                id
            }
        }
    """
    auth_headers = {"Authorization": f"Bearer {auth_token}"}
    matching_flows = backend._execute_query(flow_query, headers=auth_headers)["flow"]
    return matching_flows[0]["id"]


if __name__ == "__main__":
    # CLI entry point used by the GitHub Actions workflow: for every modified
    # dbt model, launch a "BD template: Executa DBT model" flow run on Prefect
    # and block until every launched run finishes.
    arg_parser = ArgumentParser()

    # GraphQL URL of the basedosdados backend (used to expand `__all__`).
    arg_parser.add_argument(
        "--graphql-url",
        type=str,
        required=True,
        help="URL of the GraphQL endpoint.",
    )

    # Comma-separated list of files modified in the pull request.
    arg_parser.add_argument(
        "--modified-files",
        type=str,
        required=True,
        help="List of modified files.",
    )

    # Prefect backend (API) URL.
    arg_parser.add_argument(
        "--prefect-backend-url",
        type=str,
        required=False,
        default="https://prefect.basedosdados.org/api",
        help="Prefect backend URL.",
    )

    # Prefect UI base URL, used only to build human-readable flow-run links.
    arg_parser.add_argument(
        "--prefect-base-url",
        type=str,
        required=False,
        default="https://prefect.basedosdados.org",
        help="Prefect base URL.",
    )

    # Bearer token for the Prefect API.
    arg_parser.add_argument(
        "--prefect-backend-token",
        type=str,
        required=True,
        help="Prefect backend token.",
    )

    # Target environment for the materialization ("dev" by default).
    arg_parser.add_argument(
        "--materialization-mode",
        type=str,
        required=False,
        default="dev",
        help="Materialization mode.",
    )

    # Prefect agent label that will pick up the flow runs.
    arg_parser.add_argument(
        "--materialization-label",
        type=str,
        required=False,
        default="basedosdados-dev",
        help="Materialization label.",
    )

    # dbt command executed by the flow (fixed: help text was a copy-paste of
    # "Materialization label."; also PEP8 spacing around `default=`).
    arg_parser.add_argument(
        "--dbt-command",
        type=str,
        required=False,
        default="test",
        help="dbt command to run (e.g. test).",
    )

    # Get arguments
    args = arg_parser.parse_args()

    # Get datasets and tables from modified files
    modified_files = args.modified_files.split(",")
    datasets_tables = get_datasets_tables_from_modified_files(
        modified_files, show_details=True
    )

    # Split deleted datasets and tables; deleted ones are ignored below since
    # there is nothing to test for them.
    deleted_datasets_tables = []
    existing_datasets_tables = []
    for dataset_id, table_id, exists, alias in datasets_tables:
        if exists:
            existing_datasets_tables.append((dataset_id, table_id, alias))
        else:
            deleted_datasets_tables.append((dataset_id, table_id, alias))

    # Expand `__all__` tables into the concrete table ids of each dataset.
    backend = Backend(args.graphql_url)
    expanded_existing_datasets_tables = []
    for dataset_id, table_id, alias in existing_datasets_tables:
        expanded_table_ids = expand_alls(dataset_id, table_id, backend)
        for expanded_dataset_id, expanded_table_id in expanded_table_ids:
            expanded_existing_datasets_tables.append(
                (expanded_dataset_id, expanded_table_id, alias)
            )
    existing_datasets_tables = expanded_existing_datasets_tables

    # Launch one materialization flow run per (dataset, table).
    backend = Backend(args.prefect_backend_url)
    flow_id = get_materialization_flow_id(backend, args.prefect_backend_token)
    launched_flow_run_ids = []
    for dataset_id, table_id, alias in existing_datasets_tables:
        print(
            f"Launching materialization flow for {dataset_id}.{table_id} (alias={alias})..."
        )
        parameters = {
            "dataset_id": dataset_id,
            "dbt_alias": alias,
            "mode": args.materialization_mode,
            "table_id": table_id,
            "dbt_command": args.dbt_command,
        }

        mutation = """
        mutation ($flow_id: UUID, $parameters: JSON, $label: String!) {
            create_flow_run (input: {
                flow_id: $flow_id,
                parameters: $parameters,
                labels: [$label],
            }) {
                id
            }
        }
        """
        variables = {
            "flow_id": flow_id,
            "parameters": parameters,
            "label": args.materialization_label,
        }

        response = backend._execute_query(
            mutation,
            variables,
            headers={"Authorization": f"Bearer {args.prefect_backend_token}"},
        )

        flow_run_id = response["create_flow_run"]["id"]
        launched_flow_run_ids.append(flow_run_id)
        flow_run_url = f"{args.prefect_base_url}/flow-run/{flow_run_id}"
        print(f" - Materialization flow run launched: {flow_run_url}")

    # Keep monitoring the launched flow runs until they are finished. The
    # first non-successful run fails the job immediately (remaining runs are
    # left unmonitored on Prefect).
    for launched_flow_run_id in launched_flow_run_ids:
        print(f"Monitoring flow run {launched_flow_run_id}...")
        flow_run_state = get_flow_run_state(
            flow_run_id=launched_flow_run_id,
            backend=backend,
            auth_token=args.prefect_backend_token,
        )
        # Poll every 5 seconds until the run reaches a terminal state.
        while flow_run_state not in ["Success", "Failed", "Cancelled"]:
            sleep(5)
            flow_run_state = get_flow_run_state(
                flow_run_id=launched_flow_run_id,
                backend=backend,
                auth_token=args.prefect_backend_token,
            )
        if flow_run_state != "Success":
            raise Exception(
                f'Flow run {launched_flow_run_id} finished with state "{flow_run_state}". '
                f"Check the logs at {args.prefect_base_url}/flow-run/{launched_flow_run_id}"
            )
        else:
            print("Congrats! Everything seems fine!")
34 changes: 34 additions & 0 deletions .github/workflows/test_dbt_model.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
---
name: Test DBT model
on:
  pull_request:
    types: [labeled, opened]
    branches: [main]
    paths: [models/**, .github/workflows/test_dbt_model.yaml]
jobs:
  test_dbt_model:
    if: contains(github.event.pull_request.labels.*.name, 'test-dev-model')
    name: Test DBT dev model
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v3
        with:
          ref: ${{ github.head_ref }}
      - name: Get all changed files using a comma separator
        id: changed-files
        uses: tj-actions/changed-files@v35
        with:
          separator: ','
      - name: Set up poetry
        run: pipx install poetry
      - name: Set up python
        uses: actions/setup-python@v4
        with:
          cache: poetry
          python-version: '3.9'
      - name: Install requirements
        run: poetry install --only=dev
      - name: Run script to test DBT model
        # Pass PR-controlled values through env vars and quote them in the
        # shell: interpolating `steps.changed-files` output directly into
        # `run:` is a script-injection vector (file names are attacker
        # controlled), and unquoted expansion breaks on spaces.
        env:
          MODIFIED_FILES: ${{ steps.changed-files.outputs.all_modified_files }}
          GRAPHQL_URL: ${{ secrets.BACKEND_GRAPHQL_URL }}
          PREFECT_BACKEND_TOKEN: ${{ secrets.PREFECT_BACKEND_TOKEN }}
        run: |-
          poetry run python .github/workflows/scripts/table_test.py --modified-files "$MODIFIED_FILES" --graphql-url "$GRAPHQL_URL" --prefect-backend-token "$PREFECT_BACKEND_TOKEN"
Loading