From bf3dbb198bdf106ae062cf324a11ef959551de2c Mon Sep 17 00:00:00 2001 From: Bill Baumgartner Date: Thu, 20 Jun 2024 22:29:09 -0600 Subject: [PATCH] Update targeted-export.py updates yielded successful kgx export assertion count is still hard coded -- that is the next thing to fix. --- DAG/targeted-export.py | 55 +++++++++++++++++++++++++++++++++--------- 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/DAG/targeted-export.py b/DAG/targeted-export.py index 4a409e1..b90f726 100644 --- a/DAG/targeted-export.py +++ b/DAG/targeted-export.py @@ -4,10 +4,11 @@ from airflow.operators.python_operator import PythonOperator from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta -from airflow import models +from airflow import models, XComArg from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, ) + from kubernetes.client import models as k8s_models MYSQL_DATABASE_PASSWORD=os.environ.get('MYSQL_DATABASE_PASSWORD') @@ -17,15 +18,15 @@ UNI_BUCKET = os.environ.get('UNI_BUCKET') TMP_BUCKET = os.environ.get('TMP_BUCKET') FAILURE_EMAIL = os.environ.get('FAILURE_EMAIL') -START_DATE=datetime(2024, 3, 29, 0, 0) +START_DATE=datetime(2024, 6, 20, 0, 0) EVIDENCE_LIMIT = 5 # STEP_SIZE = 75000 ### STEP_SIZE doesn't seem to be used -# ASSERTION_LIMIT = 600000 # This is the default in Edgar's original implementation so keeping it for now -# CHUNK_SIZE = '100000' +ASSERTION_LIMIT = 100000 # This is the default in Edgar's original implementation so keeping it for now +CHUNK_SIZE = '25000' -# for testing -ASSERTION_LIMIT = 25000 -CHUNK_SIZE = 5000 +# # for testing +# ASSERTION_LIMIT = 25000 +# CHUNK_SIZE = 5000 @@ -76,17 +77,27 @@ def output_operations(**kwargs): with open(kwargs['output_filename'], 'w') as outfile: x = outfile.write(json.dumps(operations_dict)) +# TODO: we are able to read the assertion count after querying for it, but not sure how to get it from XComArgs as an integer to use it in generate_edge_export_arguments +def read_assertion_count_from_file(ti, **kwargs): + file_path = kwargs['file_path'] + with open(file_path, "r") as count_file: + assertion_count = int(count_file.readline().strip()) + print(f"===================== ASSERTION COUNT {assertion_count}") + ti.xcom_push(key='assertion_count', value=assertion_count) +# TODO: get the assertion count dynamically. It is put into XComArgs above, but I can't seem to figure out how to retrieve it as an integer def get_assertion_count(): - # TODO: implement this as a database query - write output to the tmp bucket - return 50000 + # return 50000 + return 3261384 def generate_edge_export_arguments(assertion_limit, chunk_size, evidence_limit, bucket): arguments_list = [] + total_assertion_count = get_assertion_count() + # total_assertion_count = total_assertion_count# get_assertion_count() incremental_assertion_count = 0 - while incremental_assertion_count < total_assertion_count: + while incremental_assertion_count < int(total_assertion_count): arguments_list.append(['-t', 'edges', '-b', bucket, '--chunk_size', str(chunk_size), @@ -116,6 +127,28 @@ def generate_edge_export_arguments(assertion_limit, chunk_size, evidence_limit, }, image='gcr.io/translator-text-workflow-dev/kgx-export:latest') + export_assertion_count = KubernetesPodOperator( + task_id='count-assertions', + name='count-assertions', + config_file="/home/airflow/composer_kube_config", + namespace='composer-user-workloads', + image_pull_policy='Always', + arguments=['-t', 'count', '-b', TMP_BUCKET], + env_vars={ + 'MYSQL_DATABASE_PASSWORD': MYSQL_DATABASE_PASSWORD, + 'MYSQL_DATABASE_USER': MYSQL_DATABASE_USER, + 'MYSQL_DATABASE_INSTANCE': MYSQL_DATABASE_INSTANCE, + }, + image='gcr.io/translator-text-workflow-dev/kgx-export:latest') + + read_assertion_count = PythonOperator( + task_id='read_assertion_count', + python_callable=read_assertion_count_from_file, + provide_context=True, + op_kwargs={'file_path': '/home/airflow/gcs/data/kgx-build/assertion.count'}, + dag=dag) + + export_edges = KubernetesPodOperator.partial( task_id=f'targeted-edges', name=f'edge-export', @@ -169,4 +202,4 @@ def generate_edge_export_arguments(assertion_limit, chunk_size, evidence_limit, task_id='clean-up', bash_command=f"cd /home/airflow/gcs/data/kgx-build/ && rm *.tsv") - export_nodes >> export_edges >> cat_edge_files >> generate_bte_operations >> compress_edge_file >> generate_metadata >> publish_files >> clean_up + export_nodes >> export_assertion_count >> read_assertion_count >> export_edges >> cat_edge_files >> generate_bte_operations >> compress_edge_file >> generate_metadata >> publish_files >> clean_up