Skip to content

Commit

Permalink
Update targeted-export.py
Browse files Browse the repository at this point in the history
updates yielded successful kgx export
assertion count is still hard coded -- that is the next thing to fix.
  • Loading branch information
bill-baumgartner committed Jun 21, 2024
1 parent c0ba720 commit bf3dbb1
Showing 1 changed file with 44 additions and 11 deletions.
55 changes: 44 additions & 11 deletions DAG/targeted-export.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow import models
from airflow import models, XComArg
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)

from kubernetes.client import models as k8s_models

MYSQL_DATABASE_PASSWORD=os.environ.get('MYSQL_DATABASE_PASSWORD')
Expand All @@ -17,15 +18,15 @@
UNI_BUCKET = os.environ.get('UNI_BUCKET')
TMP_BUCKET = os.environ.get('TMP_BUCKET')
FAILURE_EMAIL = os.environ.get('FAILURE_EMAIL')
START_DATE=datetime(2024, 3, 29, 0, 0)
START_DATE=datetime(2024, 6, 20, 0, 0)
EVIDENCE_LIMIT = 5
# STEP_SIZE = 75000 ### STEP_SIZE doesn't seem to be used
# ASSERTION_LIMIT = 600000 # This is the default in Edgar's original implementation so keeping it for now
# CHUNK_SIZE = '100000'
ASSERTION_LIMIT = 100000 # This is the default in Edgar's original implementation so keeping it for now
CHUNK_SIZE = '25000'

# for testing
ASSERTION_LIMIT = 25000
CHUNK_SIZE = 5000
# # for testing
# ASSERTION_LIMIT = 25000
# CHUNK_SIZE = 5000



Expand Down Expand Up @@ -76,17 +77,27 @@ def output_operations(**kwargs):
with open(kwargs['output_filename'], 'w') as outfile:
x = outfile.write(json.dumps(operations_dict))

# TODO: we are able to read the assertion count after querying for it, but not sure how to get it from XComArgs as an integer to use it in generate_edge_export_arguments
def read_assertion_count_from_file(ti, **kwargs):
file_path = kwargs['file_path']
with open(file_path, "r") as count_file:
assertion_count = int(count_file.readline().strip())
print(f"===================== ASSERTION COUNT {assertion_count}")
ti.xcom_push(key='assertion_count', value=assertion_count)

# TODO: get the assertion count dynamically. It is put into XComArgs above, but I can't seem to figure out how to retrieve it as an integer
def get_assertion_count():
# TODO: implement this as a database query - write output to the tmp bucket
return 50000
# return 50000
return 3261384

def generate_edge_export_arguments(assertion_limit, chunk_size, evidence_limit, bucket):
arguments_list = []

total_assertion_count = get_assertion_count()
# total_assertion_count = total_assertion_count# get_assertion_count()
incremental_assertion_count = 0

while incremental_assertion_count < total_assertion_count:
while incremental_assertion_count < int(total_assertion_count):
arguments_list.append(['-t', 'edges',
'-b', bucket,
'--chunk_size', str(chunk_size),
Expand Down Expand Up @@ -116,6 +127,28 @@ def generate_edge_export_arguments(assertion_limit, chunk_size, evidence_limit,
},
image='gcr.io/translator-text-workflow-dev/kgx-export:latest')

export_assertion_count = KubernetesPodOperator(
task_id='count-assertions',
name='count-assertions',
config_file="/home/airflow/composer_kube_config",
namespace='composer-user-workloads',
image_pull_policy='Always',
arguments=['-t', 'count', '-b', TMP_BUCKET],
env_vars={
'MYSQL_DATABASE_PASSWORD': MYSQL_DATABASE_PASSWORD,
'MYSQL_DATABASE_USER': MYSQL_DATABASE_USER,
'MYSQL_DATABASE_INSTANCE': MYSQL_DATABASE_INSTANCE,
},
image='gcr.io/translator-text-workflow-dev/kgx-export:latest')

read_assertion_count = PythonOperator(
task_id='read_assertion_count',
python_callable=read_assertion_count_from_file,
provide_context=True,
op_kwargs={'file_path': '/home/airflow/gcs/data/kgx-build/assertion.count'},
dag=dag)


export_edges = KubernetesPodOperator.partial(
task_id=f'targeted-edges',
name=f'edge-export',
Expand Down Expand Up @@ -169,4 +202,4 @@ def generate_edge_export_arguments(assertion_limit, chunk_size, evidence_limit,
task_id='clean-up',
bash_command=f"cd /home/airflow/gcs/data/kgx-build/ && rm *.tsv")

export_nodes >> export_edges >> cat_edge_files >> generate_bte_operations >> compress_edge_file >> generate_metadata >> publish_files >> clean_up
export_nodes >> export_assertion_count >> read_assertion_count >> export_edges >> cat_edge_files >> generate_bte_operations >> compress_edge_file >> generate_metadata >> publish_files >> clean_up

0 comments on commit bf3dbb1

Please sign in to comment.